001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms.inflow;
023:
024: import java.lang.reflect.Method;
025:
026: import javax.jms.Connection;
027: import javax.jms.Destination;
028: import javax.jms.ExceptionListener;
029: import javax.jms.JMSException;
030: import javax.jms.Message;
031: import javax.jms.MessageListener;
032: import javax.jms.Queue;
033: import javax.jms.QueueConnection;
034: import javax.jms.QueueConnectionFactory;
035: import javax.jms.Topic;
036: import javax.jms.TopicConnection;
037: import javax.jms.TopicConnectionFactory;
038: import javax.jms.XAQueueConnectionFactory;
039: import javax.jms.XATopicConnectionFactory;
040: import javax.naming.Context;
041: import javax.resource.ResourceException;
042: import javax.resource.spi.endpoint.MessageEndpointFactory;
043: import javax.resource.spi.work.Work;
044: import javax.resource.spi.work.WorkManager;
045: import javax.transaction.TransactionManager;
046:
047: import org.jboss.jms.jndi.JMSProviderAdapter;
048: import org.jboss.logging.Logger;
049: import org.jboss.resource.adapter.jms.JmsResourceAdapter;
050: import org.jboss.tm.TransactionManagerLocator;
051: import org.jboss.util.Strings;
052: import org.jboss.util.naming.Util;
053:
054: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
055:
056: /**
057: * A generic jms Activation.
058: *
059: * @author <a href="adrian@jboss.com">Adrian Brock</a>
060: * @version $Revision: 60397 $
061: */
062: public class JmsActivation implements ExceptionListener {
063: /** The log */
064: private static final Logger log = Logger
065: .getLogger(JmsActivation.class);
066:
067: /** The onMessage method */
068: public static final Method ONMESSAGE;
069:
070: /** The resource adapter */
071: protected JmsResourceAdapter ra;
072:
073: /** The activation spec */
074: protected JmsActivationSpec spec;
075:
076: /** The message endpoint factory */
077: protected MessageEndpointFactory endpointFactory;
078:
079: /** Whether delivery is active */
080: protected SynchronizedBoolean deliveryActive;
081:
082: /** The jms provider adapter */
083: protected JMSProviderAdapter adapter;
084:
085: /** The destination */
086: protected Destination destination;
087:
088: /** The connection */
089: protected Connection connection;
090:
091: /** The server session pool */
092: protected JmsServerSessionPool pool;
093:
094: /** Is the delivery transacted */
095: protected boolean isDeliveryTransacted;
096:
097: /** The DLQ handler */
098: protected DLQHandler dlqHandler;
099:
100: /** The TransactionManager */
101: protected TransactionManager tm;
102:
103: static {
104: try {
105: ONMESSAGE = MessageListener.class.getMethod("onMessage",
106: new Class[] { Message.class });
107: } catch (Exception e) {
108: throw new RuntimeException(e);
109: }
110: }
111:
112: public JmsActivation(JmsResourceAdapter ra,
113: MessageEndpointFactory endpointFactory,
114: JmsActivationSpec spec) throws ResourceException {
115: this .ra = ra;
116: this .endpointFactory = endpointFactory;
117: this .spec = spec;
118: try {
119: this .isDeliveryTransacted = endpointFactory
120: .isDeliveryTransacted(ONMESSAGE);
121: } catch (Exception e) {
122: throw new ResourceException(e);
123: }
124: }
125:
126: /**
127: * @return the activation spec
128: */
129: public JmsActivationSpec getActivationSpec() {
130: return spec;
131: }
132:
133: /**
134: * @return the message endpoint factory
135: */
136: public MessageEndpointFactory getMessageEndpointFactory() {
137: return endpointFactory;
138: }
139:
140: /**
141: * @return whether delivery is transacted
142: */
143: public boolean isDeliveryTransacted() {
144: return isDeliveryTransacted;
145: }
146:
147: /**
148: * @return the work manager
149: */
150: public WorkManager getWorkManager() {
151: return ra.getWorkManager();
152: }
153:
154: public TransactionManager getTransactionManager() {
155: if (tm == null) {
156: tm = TransactionManagerLocator.getInstance().locate();
157:
158: }
159:
160: return tm;
161: }
162:
163: /**
164: * @return the connection
165: */
166: public Connection getConnection() {
167: return connection;
168: }
169:
170: /**
171: * @return the destination
172: */
173: public Destination getDestination() {
174: return destination;
175: }
176:
177: /**
178: * @return the provider adapter
179: */
180: public JMSProviderAdapter getProviderAdapter() {
181: return adapter;
182: }
183:
184: /**
185: * @return the dlq handler
186: */
187: public DLQHandler getDLQHandler() {
188: return dlqHandler;
189: }
190:
191: /**
192: * Start the activation
193: *
194: * @throws ResourceException for any error
195: */
196: public void start() throws ResourceException {
197: deliveryActive = new SynchronizedBoolean(true);
198: ra.getWorkManager().scheduleWork(new SetupActivation());
199: }
200:
201: /**
202: * Stop the activation
203: */
204: public void stop() {
205: deliveryActive.set(false);
206: teardown();
207: }
208:
209: /**
210: * Handles any failure by trying to reconnect
211: */
212: public void handleFailure(Throwable failure) {
213: log.warn("Failure in jms activation " + spec, failure);
214:
215: while (deliveryActive.get()) {
216: teardown();
217: try {
218: Thread.sleep(spec.getReconnectIntervalLong());
219: } catch (InterruptedException e) {
220: log.debug("Interrupted trying to reconnect " + spec, e);
221: break;
222: }
223:
224: log.info("Attempting to reconnect " + spec);
225: try {
226: setup();
227: log.info("Reconnected with messaging provider.");
228: break;
229: } catch (Throwable t) {
230: log.error("Unable to reconnect " + spec, t);
231: }
232:
233: }
234: }
235:
236: public void onException(JMSException exception) {
237: handleFailure(exception);
238: }
239:
240: public String toString() {
241: StringBuffer buffer = new StringBuffer();
242: buffer.append(Strings.defaultToString(this )).append('(');
243: buffer.append("spec=").append(Strings.defaultToString(spec));
244: buffer.append(" mepf=").append(
245: Strings.defaultToString(endpointFactory));
246: buffer.append(" active=").append(deliveryActive.get());
247: if (destination != null)
248: buffer.append(" destination=").append(destination);
249: if (connection != null)
250: buffer.append(" connection=").append(connection);
251: if (pool != null)
252: buffer.append(" pool=").append(
253: Strings.defaultToString(pool));
254: if (dlqHandler != null)
255: buffer.append(" dlq=").append(
256: Strings.defaultToString(dlqHandler));
257: buffer.append(" transacted=").append(isDeliveryTransacted);
258: buffer.append(')');
259: return buffer.toString();
260: }
261:
262: /**
263: * Setup the activation
264: *
265: * @throws Exception for any error
266: */
267: protected void setup() throws Exception {
268: log.debug("Setting up " + spec);
269:
270: setupJMSProviderAdapter();
271: Context ctx = adapter.getInitialContext();
272: log.debug("Using context " + ctx.getEnvironment() + " for "
273: + spec);
274: try {
275: setupDLQ(ctx);
276: setupDestination(ctx);
277: setupConnection(ctx);
278: } finally {
279: ctx.close();
280: }
281: setupSessionPool();
282:
283: log.debug("Setup complete " + this );
284: }
285:
286: /**
287: * Teardown the activation
288: */
289: protected void teardown() {
290: log.debug("Tearing down " + spec);
291:
292: teardownSessionPool();
293: teardownConnection();
294: teardownDestination();
295: teardownDLQ();
296:
297: log.debug("Tearing down complete " + this );
298: }
299:
300: /**
301: * Get the jms provider
302: */
303: protected void setupJMSProviderAdapter() throws Exception {
304: String providerAdapterJNDI = spec.getProviderAdapterJNDI();
305: if (providerAdapterJNDI.startsWith("java:") == false)
306: providerAdapterJNDI = "java:" + providerAdapterJNDI;
307:
308: log.debug("Retrieving the jms provider adapter "
309: + providerAdapterJNDI + " for " + this );
310: adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI,
311: JMSProviderAdapter.class);
312: log.debug("Using jms provider adapter " + adapter + " for "
313: + this );
314: }
315:
316: /**
317: * Setup the DLQ
318: *
319: * @param ctx the naming context
320: * @throws Exception for any error
321: */
322: protected void setupDLQ(Context ctx) throws Exception {
323: if (spec.isUseDLQ()) {
324: Class clazz = Thread.currentThread()
325: .getContextClassLoader().loadClass(
326: spec.getDLQHandler());
327: dlqHandler = (DLQHandler) clazz.newInstance();
328: dlqHandler.setup(this , ctx);
329: }
330:
331: log.debug("Setup DLQ " + this );
332: }
333:
334: /**
335: * Teardown the DLQ
336: */
337: protected void teardownDLQ() {
338: log.debug("Removing DLQ " + this );
339: try {
340: if (dlqHandler != null)
341: dlqHandler.teardown();
342: } catch (Throwable t) {
343: log.debug("Error tearing down the DLQ " + dlqHandler, t);
344: }
345: dlqHandler = null;
346: }
347:
348: /**
349: * Setup the Destination
350: *
351: * @param ctx the naming context
352: * @throws Exception for any error
353: */
354: protected void setupDestination(Context ctx) throws Exception {
355: Class destinationType;
356: if (spec.isTopic())
357: destinationType = Topic.class;
358: else
359: destinationType = Queue.class;
360:
361: String destinationName = spec.getDestination();
362: log.debug("Retrieving destination " + destinationName
363: + " of type " + destinationType.getName());
364: destination = (Destination) Util.lookup(ctx, destinationName,
365: destinationType);
366: log.debug("Got destination " + destination + " from "
367: + destinationName);
368: }
369:
370: /**
371: * Teardown the destination
372: */
373: protected void teardownDestination() {
374: }
375:
376: /**
377: * Setup the Connection
378: *
379: * @param ctx the naming context
380: * @throws Exception for any error
381: */
382: protected void setupConnection(Context ctx) throws Exception {
383: log.debug("setup connection " + this );
384:
385: String user = spec.getUser();
386: String pass = spec.getPassword();
387: String clientID = spec.getClientId();
388: if (spec.isTopic())
389: connection = setupTopicConnection(ctx, user, pass, clientID);
390: else
391: connection = setupQueueConnection(ctx, user, pass, clientID);
392:
393: log.debug("established connection " + this );
394: }
395:
396: /**
397: * Setup a Queue Connection
398: *
399: * @param ctx the naming context
400: * @param user the user
401: * @param pass the password
402: * @param clientID the client id
403: * @throws Exception for any error
404: */
405: protected QueueConnection setupQueueConnection(Context ctx,
406: String user, String pass, String clientID) throws Exception {
407: String queueFactoryRef = adapter.getQueueFactoryRef();
408: log.debug("Attempting to lookup queue connection factory "
409: + queueFactoryRef);
410: QueueConnectionFactory qcf = (QueueConnectionFactory) Util
411: .lookup(ctx, queueFactoryRef,
412: QueueConnectionFactory.class);
413: log.debug("Got queue connection factory " + qcf + " from "
414: + queueFactoryRef);
415: log.debug("Attempting to create queue connection with user "
416: + user);
417: QueueConnection result;
418: if (qcf instanceof XAQueueConnectionFactory
419: && isDeliveryTransacted) {
420: XAQueueConnectionFactory xaqcf = (XAQueueConnectionFactory) qcf;
421: if (user != null)
422: result = xaqcf.createXAQueueConnection(user, pass);
423: else
424: result = xaqcf.createXAQueueConnection();
425: } else {
426: if (user != null)
427: result = qcf.createQueueConnection(user, pass);
428: else
429: result = qcf.createQueueConnection();
430: }
431: if (clientID != null)
432: result.setClientID(clientID);
433: result.setExceptionListener(this );
434: log.debug("Using queue connection " + result);
435: return result;
436: }
437:
438: /**
439: * Setup a Topic Connection
440: *
441: * @param ctx the naming context
442: * @param user the user
443: * @param pass the password
444: * @param clientID the client id
445: * @throws Exception for any error
446: */
447: protected TopicConnection setupTopicConnection(Context ctx,
448: String user, String pass, String clientID) throws Exception {
449: String topicFactoryRef = adapter.getTopicFactoryRef();
450: log.debug("Attempting to lookup topic connection factory "
451: + topicFactoryRef);
452: TopicConnectionFactory tcf = (TopicConnectionFactory) Util
453: .lookup(ctx, topicFactoryRef,
454: TopicConnectionFactory.class);
455: log.debug("Got topic connection factory " + tcf + " from "
456: + topicFactoryRef);
457: log.debug("Attempting to create topic connection with user "
458: + user);
459: TopicConnection result;
460: if (tcf instanceof XATopicConnectionFactory
461: && isDeliveryTransacted) {
462: XATopicConnectionFactory xatcf = (XATopicConnectionFactory) tcf;
463: if (user != null)
464: result = xatcf.createXATopicConnection(user, pass);
465: else
466: result = xatcf.createXATopicConnection();
467: } else {
468: if (user != null)
469: result = tcf.createTopicConnection(user, pass);
470: else
471: result = tcf.createTopicConnection();
472: }
473: if (clientID != null)
474: result.setClientID(clientID);
475: result.setExceptionListener(this );
476: log.debug("Using topic connection " + result);
477: return result;
478: }
479:
480: /**
481: * Teardown the connection
482: */
483: protected void teardownConnection() {
484: try {
485: if (connection != null) {
486: log.debug("Closing the " + connection);
487: connection.close();
488: }
489: } catch (Throwable t) {
490: log.debug("Error closing the connection " + connection, t);
491: }
492: connection = null;
493: }
494:
495: /**
496: * Setup the server session pool
497: *
498: * @throws Exception for any error
499: */
500: protected void setupSessionPool() throws Exception {
501: pool = new JmsServerSessionPool(this );
502: log.debug("Created session pool " + pool);
503:
504: log.debug("Starting session pool " + pool);
505: pool.start();
506: log.debug("Started session pool " + pool);
507:
508: log.debug("Starting delivery " + connection);
509: connection.start();
510: log.debug("Started delivery " + connection);
511: }
512:
513: /**
514: * Teardown the server session pool
515: */
516: protected void teardownSessionPool() {
517: try {
518: if (connection != null) {
519: log.debug("Stopping delivery " + connection);
520: connection.stop();
521: }
522: } catch (Throwable t) {
523: log.debug("Error stopping delivery " + connection, t);
524: }
525:
526: try {
527: if (pool != null) {
528: log.debug("Stopping the session pool " + pool);
529: pool.stop();
530: }
531: } catch (Throwable t) {
532: log.debug("Error clearing the pool " + pool, t);
533: }
534: }
535:
536: /**
537: * Handles the setup
538: */
539: private class SetupActivation implements Work {
540: public void run() {
541: try {
542: setup();
543: } catch (Throwable t) {
544: handleFailure(t);
545: }
546: }
547:
548: public void release() {
549: }
550: }
551: }
|