001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb.plugins.jms;
023:
024: import java.util.Hashtable;
025: import java.util.HashMap;
026: import java.util.Map;
027: import java.util.Enumeration;
028: import java.util.Iterator;
029:
030: import javax.naming.Context;
031: import javax.jms.ExceptionListener;
032: import javax.jms.Session;
033: import javax.jms.QueueConnection;
034: import javax.jms.QueueConnectionFactory;
035: import javax.jms.QueueSession;
036: import javax.jms.QueueSender;
037: import javax.jms.Queue;
038: import javax.jms.Message;
039: import javax.jms.JMSException;
040: import javax.jms.Destination;
041: import javax.transaction.Status;
042: import javax.transaction.Synchronization;
043: import javax.transaction.Transaction;
044:
045: import org.w3c.dom.Element;
046:
047: import org.jboss.deployment.DeploymentException;
048: import org.jboss.metadata.MetaData;
049: import org.jboss.jms.jndi.JMSProviderAdapter;
050: import org.jboss.system.ServiceMBeanSupport;
051:
052: /**
053: * Places redeliveded messages on a Dead Letter Queue.
054: *
055: *<p>
056: *The Dead Letter Queue handler is used to not set JBoss in an endles loop
057: * when a message is resent on and on due to transaction rollback for
058: * message receipt.
059: *
060: * <p>
061: * It sends message to a dead letter queue (configurable, defaults to
062: * queue/DLQ) when the message has been resent a configurable amount of times,
063: * defaults to 10.
064: *
065: * <p>
066: * The handler is configured through the element MDBConfig in
067: * container-invoker-conf.
068: *
069: * <p>
070: * The JMS property JBOSS_ORIG_DESTINATION in the resent message is set
071: * to the name of the original destination (Destination.toString())
072: * if it is present.
073: *
074: * <p>
075: * The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set
076: * to the id of the original message.
077: *
078: * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
079: * @author Scott.Stark@jboss.org
080: * @author Adrian Brock
081: * @version <tt>$Revision: 57209 $</tt>
082: */
083: public class DLQHandler extends ServiceMBeanSupport implements
084: ExceptionListener {
085: /** Standard property for delivery count */
086: public static final String PROPERTY_DELIVERY_COUNT = "JMSXDeliveryCount";
087:
088: /** JMS property name holding original destination. */
089: public static final String JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION";
090:
091: /** JMS property name holding original JMS message id. */
092: public static final String JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID";
093:
094: /** Properties copied from org.jboss.mq.SpyMessage */
095: private static final String JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT";
096: private static final String JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT";
097:
098: /**
099: * Destination to send dead letters to.
100: *
101: * <p>
102: * Defaults to <em>queue/DLQ</em>, configurable through
103: * <tt>DestinationQueue</tt> element.
104: */
105: private String destinationJNDI = "queue/DLQ";
106:
107: /**
108: * Maximum times a message is alowed to be resent.
109: *
110: * <p>Defaults to <em>10</em>, configurable through
111: * <tt>MaxTimesRedelivered</tt> element.
112: */
113: private int maxResent = 10;
114:
115: /**
116: * Time to live for the message.
117: *
118: * <p>
119: * Defaults to <em>{@link Message#DEFAULT_TIME_TO_LIVE}</em>,
120: * configurable through the <tt>TimeToLive</tt> element.
121: */
122: private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
123:
124: // May become configurable
125:
126: /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
127: private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
128:
129: /** Priority for the message, Message.DEFAULT_PRIORITY */
130: private int priority = Message.DEFAULT_PRIORITY;
131:
132: /** The dlq user for the connection */
133: private String dlqUser;
134:
135: /** The dlq password for the connection */
136: private String dlqPass;
137:
138: // Private stuff
139: private QueueConnection connection;
140: private Queue dlq;
141: private JMSProviderAdapter providerAdapter;
142: private JMSContainerInvoker invoker;
143: private Hashtable resentBuffer = new Hashtable();
144:
145: public DLQHandler(final JMSProviderAdapter providerAdapter,
146: final JMSContainerInvoker invoker) {
147: this .providerAdapter = providerAdapter;
148: this .invoker = invoker;
149: }
150:
151: public void onException(JMSException e) {
152: if (invoker != null && invoker.exListener != null)
153: invoker.exListener.handleFailure(e);
154: else {
155: log
156: .warn(
157: "DLQHandler got JMS Failure but there is no link to JMSContainerInvoker's exception listener.",
158: e);
159:
160: // We shouldn't get here, but if we do, we should at least close the connection
161: if (connection != null) {
162: try {
163: connection.close();
164: } catch (Throwable ignored) {
165: log.trace("Ignored error closing connection",
166: ignored);
167: }
168: connection = null;
169: }
170: }
171: }
172:
173: protected void createService() throws Exception {
174: Context ctx = providerAdapter.getInitialContext();
175:
176: try {
177: String factoryName = providerAdapter.getQueueFactoryRef();
178: QueueConnectionFactory factory = (QueueConnectionFactory) ctx
179: .lookup(factoryName);
180: log.debug("Using factory: " + factory);
181:
182: if (dlqUser == null)
183: connection = factory.createQueueConnection();
184: else
185: connection = factory.createQueueConnection(dlqUser,
186: dlqPass);
187: log.debug("Created connection: " + connection);
188:
189: dlq = (Queue) ctx.lookup(destinationJNDI);
190: log.debug("Using Queue: " + dlq);
191: } finally {
192: ctx.close();
193: }
194: }
195:
196: protected void startService() throws Exception {
197: connection.setExceptionListener(this );
198: connection.start();
199: }
200:
201: protected void stopService() throws Exception {
202: try {
203: connection.setExceptionListener(null);
204: connection.stop();
205: } catch (Throwable t) {
206: log.trace("Ignored error stopping DLQ", t);
207: }
208: }
209:
210: protected void destroyService() throws Exception {
211: // Help the GC
212: if (connection != null)
213: connection.close();
214: connection = null;
215: dlq = null;
216: providerAdapter = null;
217: }
218:
219: /**
220: * Check if a message has been redelivered to many times.
221: *
222: * If message has been redelivered to many times, send it to the
223: * dead letter queue (default to queue/DLQ).
224: *
225: * @return true if message is handled (i.e resent), false if not.
226: */
227: public boolean handleRedeliveredMessage(final Message msg,
228: final Transaction tx) {
229: boolean handled = false;
230: int max = this .maxResent;
231: String id = null;
232: boolean fromMessage = true;
233: int count = 0;
234:
235: try {
236:
237: if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
238: max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT);
239:
240: try {
241: if (msg.propertyExists(PROPERTY_DELIVERY_COUNT))
242: count = msg.getIntProperty(PROPERTY_DELIVERY_COUNT);
243: } catch (JMSException ignored) {
244: }
245: if (count > 0) {
246: // The delivery count is one too many
247: --count;
248: } else if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
249: count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
250: else {
251: id = msg.getJMSMessageID();
252: if (id == null) {
253: // if we can't get the id we are basically fucked
254: log
255: .error("Message id is null, can't handle message");
256: return false;
257: }
258: count = incrementResentCount(id);
259: fromMessage = false;
260: }
261:
262: if (count > max) {
263: id = msg.getJMSMessageID();
264: log
265: .warn("Message resent too many times; sending it to DLQ; message id="
266: + id);
267:
268: sendMessage(msg);
269: deleteFromBuffer(id);
270:
271: handled = true;
272: } else if (fromMessage == false && tx != null) {
273: // Register a synchronization to remove the buffer entry
274: // should the transaction commit
275: DLQSynchronization synch = new DLQSynchronization(id);
276: try {
277: tx.registerSynchronization(synch);
278: } catch (Exception e) {
279: log.warn(
280: "Error registering DlQ Synchronization with transaction "
281: + tx, e);
282: }
283: }
284: } catch (JMSException e) {
285: // If we can't send it ahead, we do not dare to just drop it...or?
286: log.error("Could not send message to Dead Letter Queue", e);
287: }
288:
289: return handled;
290: }
291:
292: /**
293: * Send message to the configured dead letter queue, defaults to queue/DLQ.
294: */
295: protected void sendMessage(Message msg) throws JMSException {
296: boolean trace = log.isTraceEnabled();
297:
298: QueueSession session = null;
299: QueueSender sender = null;
300:
301: try {
302: msg = makeWritable(msg, trace); // Don't know yet if we are gona clone or not
303:
304: // Set the properties
305: msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg
306: .getJMSMessageID());
307: // Some providers (say Websphere MQ) don't set this to something we can use
308: Destination d = msg.getJMSDestination();
309: if (d != null)
310: msg.setStringProperty(JBOSS_ORIG_DESTINATION, d
311: .toString());
312:
313: session = connection.createQueueSession(false,
314: Session.AUTO_ACKNOWLEDGE);
315: sender = session.createSender(dlq);
316: if (trace) {
317: log
318: .trace("Sending message to DLQ; destination="
319: + dlq + ", session=" + session
320: + ", sender=" + sender);
321: }
322:
323: sender.send(msg, deliveryMode, priority, timeToLive);
324:
325: if (trace) {
326: log.trace("Message sent.");
327: }
328:
329: } finally {
330: try {
331: if (sender != null)
332: sender.close();
333: if (session != null)
334: session.close();
335: } catch (Exception e) {
336: log.warn("Failed to close sender or session; ignoring",
337: e);
338: }
339: }
340: }
341:
342: /**
343: * Increment the counter for the specific JMS message id.
344: *
345: * @return the new counter value.
346: */
347: protected int incrementResentCount(String id) {
348: BufferEntry entry = null;
349: boolean trace = log.isTraceEnabled();
350: if (!resentBuffer.containsKey(id)) {
351: if (trace)
352: log.trace("Making new entry for id " + id);
353: entry = new BufferEntry();
354: entry.id = id;
355: entry.count = 1;
356: resentBuffer.put(id, entry);
357: } else {
358: entry = (BufferEntry) resentBuffer.get(id);
359: entry.count++;
360: if (trace)
361: log.trace("Incremented old entry for id " + id
362: + " count " + entry.count);
363: }
364: return entry.count;
365: }
366:
367: /**
368: * Delete the entry in the message counter buffer for specifyed JMS id.
369: */
370: protected void deleteFromBuffer(String id) {
371: resentBuffer.remove(id);
372: }
373:
374: /**
375: * Make the Message properties writable.
376: *
377: * @return the writable message.
378: */
379: protected Message makeWritable(Message msg, boolean trace)
380: throws JMSException {
381: HashMap tmp = new HashMap();
382:
383: // Save properties
384: for (Enumeration en = msg.getPropertyNames(); en
385: .hasMoreElements();) {
386: String key = (String) en.nextElement();
387: tmp.put(key, msg.getObjectProperty(key));
388: }
389:
390: // Make them writable
391: msg.clearProperties();
392:
393: Iterator i = tmp.entrySet().iterator();
394: while (i.hasNext()) {
395: Map.Entry me = (Map.Entry) i.next();
396: String key = (String) me.getKey();
397: try {
398: msg.setObjectProperty(key, me.getValue());
399: } catch (JMSException ignored) {
400: if (trace)
401: log.trace("Could not copy message property " + key,
402: ignored);
403: }
404: }
405:
406: return msg;
407: }
408:
409: /**
410: * Takes an MDBConfig Element
411: */
412: public void importXml(final Element element)
413: throws DeploymentException {
414: destinationJNDI = MetaData.getElementContent(MetaData
415: .getUniqueChild(element, "DestinationQueue"));
416:
417: try {
418: String mr = MetaData.getElementContent(MetaData
419: .getUniqueChild(element, "MaxTimesRedelivered"));
420: maxResent = Integer.parseInt(mr);
421: } catch (Exception ignore) {
422: }
423:
424: try {
425: String ttl = MetaData.getElementContent(MetaData
426: .getUniqueChild(element, "TimeToLive"));
427: timeToLive = Long.parseLong(ttl);
428:
429: if (timeToLive < 0) {
430: log.warn("Invalid TimeToLive: " + timeToLive
431: + "; using default");
432: timeToLive = Message.DEFAULT_TIME_TO_LIVE;
433: }
434: } catch (Exception ignore) {
435: }
436:
437: dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(
438: element, "DLQUser"));
439: dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(
440: element, "DLQPassword"));
441: }
442:
443: public String toString() {
444: return super .toString() + "{ destinationJNDI="
445: + destinationJNDI + ", maxResent=" + maxResent
446: + ", timeToLive=" + timeToLive + " }";
447: }
448:
449: private static class BufferEntry {
450: int count;
451: String id;
452: }
453:
454: /**
455: * Remove a redelivered message from the DLQ's buffer when it is acknowledged
456: */
457: protected class DLQSynchronization implements Synchronization {
458: /** The message id */
459: String id;
460:
461: public DLQSynchronization(String id) {
462: this .id = id;
463: }
464:
465: public void beforeCompletion() {
466: }
467:
468: /**
469: * Forget the message when the transaction commits
470: */
471: public void afterCompletion(int status) {
472: if (status == Status.STATUS_COMMITTED)
473: deleteFromBuffer(id);
474: }
475: }
476: }
|