001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms.inflow.dlq;
023:
024: import java.util.Enumeration;
025: import java.util.HashMap;
026: import java.util.Iterator;
027:
028: import javax.jms.Destination;
029: import javax.jms.ExceptionListener;
030: import javax.jms.JMSException;
031: import javax.jms.Message;
032: import javax.jms.Queue;
033: import javax.jms.QueueConnection;
034: import javax.jms.QueueConnectionFactory;
035: import javax.jms.QueueSender;
036: import javax.jms.QueueSession;
037: import javax.jms.TopicConnectionFactory;
038: import javax.naming.Context;
039:
040: import org.jboss.jms.jndi.JMSProviderAdapter;
041: import org.jboss.logging.Logger;
042: import org.jboss.resource.adapter.jms.inflow.DLQHandler;
043: import org.jboss.resource.adapter.jms.inflow.JmsActivation;
044: import org.jboss.resource.adapter.jms.inflow.JmsActivationSpec;
045: import org.jboss.util.naming.Util;
046:
047: /**
048: * An abstract DLQ handler.
049: *
050: * @author <a href="adrian@jboss.com">Adrian Brock</a>
051: * @version $Revision: 57189 $
052: */
053: public abstract class AbstractDLQHandler implements DLQHandler,
054: ExceptionListener {
055: /** The logger */
056: protected static final Logger log = Logger
057: .getLogger(AbstractDLQHandler.class);
058:
059: /** The activation */
060: protected JmsActivation activation;
061:
062: /** The DLQ */
063: protected Queue dlq;
064:
065: /** The DLQ Connection*/
066: protected QueueConnection connection;
067:
068: public boolean handleRedeliveredMessage(Message msg) {
069: boolean handled = handleDelivery(msg);
070: if (handled)
071: sendToDLQ(msg);
072: return handled;
073: }
074:
075: public void messageDelivered(Message msg) {
076: }
077:
078: public void setup(JmsActivation activation, Context ctx)
079: throws Exception {
080: this .activation = activation;
081: setupDLQDestination(ctx);
082: setupDLQConnection(ctx);
083: }
084:
085: public void teardown() {
086: teardownDLQConnection();
087: teardownDLQDestination();
088: }
089:
090: public void onException(JMSException exception) {
091: activation.handleFailure(exception);
092: }
093:
094: /**
095: * Setup the DLQ Destination
096: *
097: * @param ctx the naming context
098: * @throws Exception for any error
099: */
100: protected void setupDLQDestination(Context ctx) throws Exception {
101: String name = activation.getActivationSpec().getDLQJNDIName();
102: dlq = (Queue) Util.lookup(ctx, name, Queue.class);
103: }
104:
105: /**
106: * Teardown the DLQ Destination
107: */
108: protected void teardownDLQDestination() {
109: }
110:
111: /**
112: * Setup the DLQ Connection
113: *
114: * @param ctx the naming context
115: * @throws Exception for any error
116: */
117: protected void setupDLQConnection(Context ctx) throws Exception {
118: JmsActivationSpec spec = activation.getActivationSpec();
119: String user = spec.getDLQUser();
120: String pass = spec.getDLQPassword();
121: String clientID = spec.getDLQClientID();
122: JMSProviderAdapter adapter = activation.getProviderAdapter();
123: String queueFactoryRef = adapter.getQueueFactoryRef();
124: log.debug("Attempting to lookup dlq connection factory "
125: + queueFactoryRef);
126: QueueConnectionFactory qcf = (QueueConnectionFactory) Util
127: .lookup(ctx, queueFactoryRef,
128: QueueConnectionFactory.class);
129: log.debug("Got dlq connection factory " + qcf + " from "
130: + queueFactoryRef);
131: log.debug("Attempting to create queue connection with user "
132: + user);
133: if (user != null)
134: connection = qcf.createQueueConnection(user, pass);
135: else
136: connection = qcf.createQueueConnection();
137: if (clientID != null)
138: connection.setClientID(clientID);
139: connection.setExceptionListener(this );
140: log.debug("Using queue connection " + connection);
141: }
142:
143: /**
144: * Teardown the DLQ Connection
145: */
146: protected void teardownDLQConnection() {
147: try {
148: if (connection != null) {
149: log.debug("Closing the " + connection);
150: connection.close();
151: }
152: } catch (Throwable t) {
153: log.debug("Error closing the connection " + connection, t);
154: }
155: }
156:
157: /**
158: * Do we handle the message?
159: *
160: * @param msg the message to handle
161: * @return true when we handle it
162: */
163: protected abstract boolean handleDelivery(Message msg);
164:
165: /**
166: * Warn that a message is being handled by the DLQ
167: *
168: * @param msg
169: * @param count the number of redelivers
170: * @param max the maximum number of redeliveries
171: */
172: protected void warnDLQ(Message msg, int count, int max) {
173: log.warn("Message redelivered=" + count + " max=" + max
174: + " sending it to the dlq " + msg);
175: }
176:
177: /**
178: * Send the message to the dlq
179: *
180: * @param msg message to send
181: */
182: protected void sendToDLQ(Message msg) {
183: int deliveryMode = getDeliveryMode(msg);
184: int priority = getPriority(msg);
185: long timeToLive = getTimeToLive(msg);
186:
187: // If we get a negative time to live that means the message has expired
188: if (timeToLive < 0) {
189: if (log.isTraceEnabled())
190: log
191: .trace("Not sending the message to the DLQ, it has expired "
192: + msg);
193: return;
194: }
195:
196: Message copy = makeWritable(msg);
197: if (copy != null)
198: doSend(copy, deliveryMode, priority, timeToLive);
199: }
200:
201: /**
202: * Get the delivery mode for the DLQ message
203: *
204: * @param msg the message
205: * @return the delivery mode
206: */
207: protected int getDeliveryMode(Message msg) {
208: try {
209: return msg.getJMSDeliveryMode();
210: } catch (Throwable t) {
211: return Message.DEFAULT_DELIVERY_MODE;
212: }
213: }
214:
215: /**
216: * Get the priority for the DLQ message
217: *
218: * @param msg the message
219: * @return the priority
220: */
221: protected int getPriority(Message msg) {
222: try {
223: return msg.getJMSPriority();
224: } catch (Throwable t) {
225: return Message.DEFAULT_PRIORITY;
226: }
227: }
228:
229: /**
230: * Get the time to live for the DLQ message
231: *
232: * @param msg the message
233: * @return the time to live
234: */
235: protected long getTimeToLive(Message msg) {
236: try {
237: long expires = msg.getJMSExpiration();
238: if (expires == Message.DEFAULT_TIME_TO_LIVE)
239: return Message.DEFAULT_TIME_TO_LIVE;
240: return expires - System.currentTimeMillis();
241: } catch (Throwable t) {
242: return Message.DEFAULT_TIME_TO_LIVE;
243: }
244: }
245:
246: /**
247: * Make a writable copy of the message
248: *
249: * @param msg the message
250: * @return the copied message
251: */
252: protected Message makeWritable(Message msg) {
253: boolean trace = log.isTraceEnabled();
254:
255: try {
256: HashMap tmp = new HashMap();
257:
258: // Save properties
259: for (Enumeration en = msg.getPropertyNames(); en
260: .hasMoreElements();) {
261: String key = (String) en.nextElement();
262: tmp.put(key, msg.getObjectProperty(key));
263: }
264:
265: // Make them writable
266: msg.clearProperties();
267:
268: for (Iterator i = tmp.keySet().iterator(); i.hasNext();) {
269: String key = (String) i.next();
270: try {
271: msg.setObjectProperty(key, tmp.get(key));
272: } catch (JMSException ignored) {
273: if (trace)
274: log.trace("Could not copy message property "
275: + key, ignored);
276: }
277: }
278:
279: msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg
280: .getJMSMessageID());
281: Destination destination = msg.getJMSDestination();
282: if (destination != null)
283: msg.setStringProperty(JBOSS_ORIG_DESTINATION,
284: destination.toString());
285:
286: return msg;
287: } catch (Throwable t) {
288: log.error("Unable to make writable " + msg, t);
289: return null;
290: }
291: }
292:
293: /**
294: * Do the message send
295: *
296: * @param msg the message
297: */
298: protected void doSend(Message msg, int deliveryMode, int priority,
299: long timeToLive) {
300: QueueSession session = null;
301: try {
302: session = connection.createQueueSession(false,
303: QueueSession.AUTO_ACKNOWLEDGE);
304: QueueSender sender = session.createSender(dlq);
305: sender.send(msg, deliveryMode, priority, timeToLive);
306: } catch (Throwable t) {
307: handleSendError(msg, t);
308: } finally {
309: if (session != null) {
310: try {
311: session.close();
312: } catch (Throwable t) {
313: log.trace("Ignored ", t);
314: }
315: }
316: }
317: }
318:
319: /**
320: * Handle a failure to send the message to the dlq
321: *
322: * @param msg the message
323: * @param t the error
324: */
325: protected void handleSendError(Message msg, Throwable t) {
326: log.error("DLQ " + dlq + " error sending message " + msg, t);
327: }
328: }
|