001: /*
002: * Copyright 2004,2005 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.synapse.transport.jms;
017:
018: import org.apache.commons.logging.Log;
019: import org.apache.commons.logging.LogFactory;
020: import org.apache.axis2.addressing.EndpointReference;
021: import org.apache.axis2.context.ConfigurationContext;
022: import org.apache.synapse.transport.base.BaseUtils;
023:
024: import javax.jms.*;
025: import javax.naming.Context;
026: import javax.naming.InitialContext;
027: import javax.naming.NameNotFoundException;
028: import javax.naming.NamingException;
029: import java.util.HashMap;
030: import java.util.Hashtable;
031: import java.util.Iterator;
032: import java.util.Map;
033:
034: /**
035: * Encapsulate a JMS Connection factory definition within an Axis2.xml
036: * <p/>
037: * More than one JMS connection factory could be defined within an Axis2 XML
038: * specifying the JMSListener as the transportReceiver.
039: * <p/>
040: * These connection factories are created at the initialization of the
041: * transportReceiver, and any service interested in using any of these could
042: * specify the name of the factory and the destination through Parameters named
043: * JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below.
044: * <p/>
045: * <parameter name="transport.jms.ConnectionFactory" locked="true">myQueueConnectionFactory</parameter>
046: * <parameter name="transport.jms.Destination" locked="true">TestQueue</parameter>
047: * <p/>
048: * If a connection factory is defined by a parameter named
049: * JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not
050: * explicitly specify a connection factory will be defaulted to it - if it is
051: * defined in the Axis2 configuration.
052: * <p/>
053: * e.g.
054: * <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
055: * <parameter name="myTopicConnectionFactory" locked="false">
056: * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
057: * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
058: * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
059: * <parameter name="transport.jms.Destination" locked="false">myTopicOne, myTopicTwo</parameter>
060: * </parameter>
061: * <parameter name="myQueueConnectionFactory" locked="false">
062: * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
063: * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
064: * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
065: * <parameter name="transport.jms.Destination" locked="false">myQueueOne, myQueueTwo</parameter>
066: * </parameter>
067: * <parameter name="default" locked="false">
068: * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
069: * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
070: * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
071: * <parameter name="transport.jms.Destination" locked="false">myDestinationOne, myDestinationTwo</parameter>
072: * </parameter>
073: * </transportReceiver>
074: */
075: public class JMSConnectionFactory implements ExceptionListener {
076:
077: private static final Log log = LogFactory
078: .getLog(JMSConnectionFactory.class);
079:
080: /** The name used for the connection factory definition within Axis2 */
081: private String name = null;
082: /** The JNDI name of the actual connection factory */
083: private String connFactoryJNDIName = null;
084: /** Map of destination JNDI names to service names */
085: private Map serviceJNDINameMapping = null;
086: /** Map of destination JNDI names to destination types*/
087: private Map destinationTypeMapping = null;
088: /** Map of JMS destination names to service names */
089: private Map serviceDestinationNameMapping = null;
090: /** JMS Sessions currently active. One session for each Destination / Service */
091: private Map jmsSessions = null;
092: /** Properties of the connection factory to acquire the initial context */
093: private Hashtable jndiProperties = null;
094: /** The JNDI Context used - created using the properties */
095: private Context context = null;
096: /** The actual ConnectionFactory instance held within */
097: private ConnectionFactory conFactory = null;
098: /** The JMS connection factory type */
099: private String connectionFactoryType = null;
100: /** The JMS Connection opened */
101: private Connection connection = null;
102: /** The JMS Message receiver for this connection factory */
103: private JMSMessageReceiver jmsMessageReceiver = null;
104: /** The axis2 configuration context */
105: private ConfigurationContext cfgCtx = null;
106: /** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
107: private long reconnectTimeout = 30000;
108:
109: /**
110: * Create a JMSConnectionFactory for the given [axis2] name the
111: * JNDI name of the actual ConnectionFactory
112: *
113: * @param name the connection factory name specified in the axis2.xml for the
114: * TransportListener or the TransportSender using this
115: * @param cfgCtx the axis2 configuration context
116: */
117: public JMSConnectionFactory(String name, ConfigurationContext cfgCtx) {
118: this .name = name;
119: this .cfgCtx = cfgCtx;
120: serviceJNDINameMapping = new HashMap();
121: destinationTypeMapping = new HashMap();
122: serviceDestinationNameMapping = new HashMap();
123: jndiProperties = new Hashtable();
124: jmsSessions = new HashMap();
125: }
126:
127: /**
128: * Add a listen destination on this connection factory on behalf of the given service
129: *
130: * @param destinationJNDIName destination JNDI name
131: * @param serviceName the service to which it belongs
132: */
133: public void addDestination(String destinationJNDIName,
134: String destinationType, String serviceName) {
135:
136: String destinationName = getPhysicalDestinationName(destinationJNDIName);
137:
138: if (destinationName == null) {
139: log.warn("JMS Destination with JNDI name : "
140: + destinationJNDIName + " does not exist");
141:
142: try {
143: log
144: .info("Creating a JMS Queue with the JNDI name : "
145: + destinationJNDIName
146: + " using the connection factory definition named : "
147: + name);
148: JMSUtils.createDestination(conFactory,
149: destinationJNDIName, destinationType);
150:
151: destinationName = getPhysicalDestinationName(destinationJNDIName);
152:
153: } catch (JMSException e) {
154: log.error(
155: "Unable to create Destination with JNDI name : "
156: + destinationJNDIName, e);
157: BaseUtils.markServiceAsFaulty(serviceName,
158: "Error creating JMS destination : "
159: + destinationJNDIName, cfgCtx
160: .getAxisConfiguration());
161: return;
162: }
163: }
164:
165: serviceJNDINameMapping.put(destinationJNDIName, serviceName);
166: destinationTypeMapping
167: .put(destinationJNDIName, destinationType);
168: serviceDestinationNameMapping.put(destinationName, serviceName);
169:
170: log.info("Mapped JNDI name : " + destinationJNDIName
171: + " and JMS Destination name : " + destinationName
172: + " against service : " + serviceName);
173: }
174:
175: /**
176: * Abort listening on the JMS destination from this connection factory
177: *
178: * @param jndiDestinationName the JNDI name of the JMS destination to be removed
179: */
180: public void removeDestination(String jndiDestinationName) {
181:
182: // find and save provider specific Destination name before we delete
183: String providerSpecificDestination = getPhysicalDestinationName(jndiDestinationName);
184: stoplisteningOnDestination(jndiDestinationName);
185:
186: serviceJNDINameMapping.remove(jndiDestinationName);
187: if (providerSpecificDestination != null) {
188: serviceDestinationNameMapping
189: .remove(providerSpecificDestination);
190: }
191: }
192:
193: /**
194: * Begin [or restart] listening for messages on the list of destinations associated
195: * with this connection factory. (Called during Axis2 initialization of
196: * the Transport receivers, or after a disconnection has been detected)
197: *
198: * When called from the JMS transport sender, this call simply acquires the actual
199: * JMS connection factory from the JNDI, creates a new connection and starts it.
200: *
201: * @throws JMSException on exceptions
202: * @throws NamingException on exceptions
203: */
204: public synchronized void connectAndListen() throws JMSException,
205: NamingException {
206:
207: // if this is a reconnection/re-initialization effort after the detection of a
208: // disconnection, close all sessions and the CF connection and re-initialize
209: if (connection != null) {
210: log.info("Re-initializing the JMS connection factory : "
211: + name);
212:
213: Iterator sessionIter = jmsSessions.values().iterator();
214: while (sessionIter.hasNext()) {
215: try {
216: ((Session) sessionIter.next()).close();
217: } catch (JMSException ignore) {
218: }
219: }
220: try {
221: connection.stop();
222: } catch (JMSException ignore) {
223: }
224:
225: } else {
226: if (log.isDebugEnabled()) {
227: log.debug("Initializing the JMS connection factory : "
228: + name);
229: }
230: }
231:
232: // get the CF reference freshly [again] from JNDI
233: context = new InitialContext(jndiProperties);
234: conFactory = (ConnectionFactory) context
235: .lookup(connFactoryJNDIName);
236: log.info("Connected to the JMS connection factory : "
237: + connFactoryJNDIName);
238:
239: try {
240: ConnectionFactory conFac = null;
241: QueueConnectionFactory qConFac = null;
242: TopicConnectionFactory tConFac = null;
243: if (JMSConstants.DESTINATION_TYPE_QUEUE
244: .equals(getConnectionFactoryType())) {
245: qConFac = (QueueConnectionFactory) conFactory;
246: } else if (JMSConstants.DESTINATION_TYPE_TOPIC
247: .equals(getConnectionFactoryType())) {
248: tConFac = (TopicConnectionFactory) conFactory;
249: } else {
250: handleException(
251: "Unable to determine type of Connection Factory - i.e. Queue/Topic",
252: null);
253: }
254:
255: String user = (String) jndiProperties
256: .get(Context.SECURITY_PRINCIPAL);
257: String pass = (String) jndiProperties
258: .get(Context.SECURITY_CREDENTIALS);
259:
260: if (user != null && pass != null) {
261: if (qConFac != null) {
262: connection = qConFac.createQueueConnection(user,
263: pass);
264: } else if (tConFac != null) {
265: connection = tConFac.createTopicConnection(user,
266: pass);
267: }
268: } else {
269: if (qConFac != null) {
270: connection = qConFac.createQueueConnection();
271: } else if (tConFac != null) {
272: connection = tConFac.createTopicConnection();
273: }
274: }
275:
276: connection.setExceptionListener(this );
277:
278: } catch (JMSException e) {
279: handleException("Error connecting to Connection Factory : "
280: + connFactoryJNDIName, e);
281: }
282:
283: Iterator destJNDINameIter = serviceJNDINameMapping.keySet()
284: .iterator();
285: while (destJNDINameIter.hasNext()) {
286: String destJNDIName = (String) destJNDINameIter.next();
287: String destinationType = (String) destinationTypeMapping
288: .get(destJNDIName);
289: startListeningOnDestination(destJNDIName, destinationType);
290: }
291:
292: connection.start(); // indicate readyness to start receiving messages
293: log.info("Connection factory : " + name + " initialized...");
294: }
295:
296: /**
297: * Create a session for sending to the given destination and save it on the jmsSessions Map
298: * keyed by the destinatin JNDI name
299: * @param destinationJNDIname the destination JNDI name
300: * @return a JMS Session to send messages to the destination using this connection factory
301: */
302: public Session getSessionForDestination(String destinationJNDIname) {
303:
304: Session session = (Session) jmsSessions
305: .get(destinationJNDIname);
306:
307: if (session == null) {
308: try {
309: Destination dest = (Destination) getPhysicalDestination(destinationJNDIname);
310:
311: if (dest instanceof Topic) {
312: session = ((TopicConnection) connection)
313: .createTopicSession(false,
314: Session.AUTO_ACKNOWLEDGE);
315: } else {
316: session = ((QueueConnection) connection)
317: .createQueueSession(false,
318: Session.AUTO_ACKNOWLEDGE);
319: }
320:
321: jmsSessions.put(destinationJNDIname, session);
322:
323: } catch (JMSException e) {
324: handleException(
325: "Unable to create a session using connection factory : "
326: + name, e);
327: }
328: }
329: return session;
330: }
331:
332: /**
333: * Listen on the given destination from this connection factory. Used to
334: * start listening on a destination associated with a newly deployed service
335: *
336: * @param destinationJNDIname the JMS destination to listen on
337: */
338: public void startListeningOnDestination(String destinationJNDIname,
339: String destinationType) {
340:
341: Session session = (Session) jmsSessions
342: .get(destinationJNDIname);
343: // if we already had a session open, close it first
344: if (session != null) {
345: try {
346: session.close();
347: } catch (JMSException ignore) {
348: }
349: }
350:
351: try {
352: session = JMSUtils.createSession(connection, false,
353: Session.AUTO_ACKNOWLEDGE, destinationType);
354: Destination destination = null;
355:
356: try {
357: destination = (Destination) context
358: .lookup(destinationJNDIname);
359:
360: } catch (NameNotFoundException e) {
361: log.warn("Cannot find destination : "
362: + destinationJNDIname + ". Creating a Queue");
363: destination = JMSUtils.createDestination(session,
364: destinationJNDIname, destinationType);
365: }
366:
367: MessageConsumer consumer = JMSUtils.createConsumer(session,
368: destination);
369: consumer.setMessageListener(jmsMessageReceiver);
370: jmsSessions.put(destinationJNDIname, session);
371:
372: // catches NameNotFound and JMSExceptions and marks service as faulty
373: } catch (Exception e) {
374: if (session != null) {
375: try {
376: session.close();
377: } catch (JMSException ignore) {
378: }
379: }
380:
381: BaseUtils.markServiceAsFaulty(
382: (String) serviceJNDINameMapping
383: .get(destinationJNDIname),
384: "Error looking up JMS destination : "
385: + destinationJNDIname, cfgCtx
386: .getAxisConfiguration());
387: }
388: }
389:
390: /**
391: * Stop listening on the given destination - for undeployment or stopping of services
392: * closes the underlying Session opened to subscribe to the destination
393: *
394: * @param destinationJNDIname the JNDI name of the JMS destination
395: */
396: private void stoplisteningOnDestination(String destinationJNDIname) {
397: Session session = (Session) jmsSessions
398: .get(destinationJNDIname);
399: if (session != null) {
400: try {
401: session.close();
402: } catch (JMSException ignore) {
403: }
404: }
405: }
406:
407: /**
408: * Close all connections, sessions etc.. and stop this connection factory
409: */
410: public void stop() {
411: if (connection != null) {
412: Iterator sessionIter = jmsSessions.values().iterator();
413: while (sessionIter.hasNext()) {
414: try {
415: ((Session) sessionIter.next()).close();
416: } catch (JMSException ignore) {
417: }
418: }
419: try {
420: connection.close();
421: } catch (JMSException e) {
422: log.warn("Error shutting down connection factory : "
423: + name, e);
424: }
425: }
426: }
427:
428: /**
429: * Return the provider specific [physical] Destination name if any
430: * for the destination with the given JNDI name
431: *
432: * @param destinationJndi the JNDI name of the destination
433: * @return the provider specific Destination name or null if cannot be found
434: */
435: private String getPhysicalDestinationName(String destinationJndi) {
436: Destination destination = getPhysicalDestination(destinationJndi);
437:
438: if (destination != null) {
439: try {
440: if (destination instanceof Queue) {
441: return ((Queue) destination).getQueueName();
442: } else if (destination instanceof Topic) {
443: return ((Topic) destination).getTopicName();
444: }
445: } catch (JMSException e) {
446: log.warn(
447: "Error reading Destination name for JNDI destination : "
448: + destinationJndi, e);
449: }
450: }
451: return null;
452: }
453:
454: /**
455: * Return the provider specific [physical] Destination if any
456: * for the destination with the given JNDI name
457: *
458: * @param destinationJndi the JNDI name of the destination
459: * @return the provider specific Destination or null if cannot be found
460: */
461: private Destination getPhysicalDestination(String destinationJndi) {
462: Destination destination = null;
463:
464: try {
465: destination = (Destination) context.lookup(destinationJndi);
466: } catch (NamingException e) {
467:
468: // if we are using ActiveMQ, check for dynamic Queues and Topics
469: String provider = (String) jndiProperties
470: .get(Context.INITIAL_CONTEXT_FACTORY);
471: if (provider.indexOf("activemq") != -1) {
472: try {
473: destination = (Destination) context
474: .lookup(JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE
475: + destinationJndi);
476: } catch (NamingException ne) {
477: try {
478: destination = (Destination) context
479: .lookup(JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC
480: + destinationJndi);
481: } catch (NamingException e1) {
482: log
483: .warn("Error looking up destination for JNDI name : "
484: + destinationJndi);
485: }
486: }
487: }
488: }
489:
490: return destination;
491: }
492:
493: /**
494: * Return the EPR for the JMS Destination with the given JNDI name
495: * when using this connection factory
496: * @param jndiDestination the JNDI name of the JMS Destionation
497: * @return the EPR for a service using this destination
498: */
499: public EndpointReference getEPRForDestination(String jndiDestination) {
500:
501: StringBuffer sb = new StringBuffer();
502: sb.append(JMSConstants.JMS_PREFIX).append(jndiDestination);
503: sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM)
504: .append("=").append(getConnFactoryJNDIName());
505: Iterator props = getJndiProperties().keySet().iterator();
506: while (props.hasNext()) {
507: String key = (String) props.next();
508: String value = (String) getJndiProperties().get(key);
509: sb.append("&").append(key).append("=").append(value);
510: }
511:
512: return new EndpointReference(sb.toString());
513: }
514:
515: /**
516: * Is this connection factory referring to the same underlying connection factory passed in
517: *
518: * @param o a JMSOutTransport object which specifies a connection factory
519: * @return true if this instance could be substituted for the out-transport
520: */
521: public boolean equals(Object o) {
522: if (o instanceof JMSOutTransportInfo) {
523: JMSOutTransportInfo trpInfo = (JMSOutTransportInfo) o;
524:
525: Map trpProps = trpInfo.getProperties();
526: if (equals(trpProps
527: .get(JMSConstants.CONFAC_JNDI_NAME_PARAM),
528: jndiProperties
529: .get(JMSConstants.CONFAC_JNDI_NAME_PARAM))
530: && equals(
531: trpProps
532: .get(Context.INITIAL_CONTEXT_FACTORY),
533: jndiProperties
534: .get(Context.INITIAL_CONTEXT_FACTORY))
535: && equals(trpProps.get(Context.PROVIDER_URL),
536: jndiProperties.get(Context.PROVIDER_URL))
537: && equals(trpProps.get(Context.SECURITY_PRINCIPAL),
538: jndiProperties
539: .get(Context.SECURITY_PRINCIPAL))
540: && equals(trpProps
541: .get(Context.SECURITY_CREDENTIALS),
542: jndiProperties
543: .get(Context.SECURITY_CREDENTIALS))) {
544: return true;
545: }
546: }
547: return false;
548: }
549:
550: /**
551: * Prevents NullPointerException when s1 is null.
552: * If both values are null this returns true
553: */
554: private boolean equals(Object s1, Object s2) {
555: if (s1 == s2) {
556: return true;
557: } else if (s1 != null && s1.equals(s2)) {
558: return true;
559: } else {
560: return false;
561: }
562: }
563:
564: // -------------------- getters and setters and trivial methods --------------------
565: /**
566: * Return the service name using the JMS destination given by the JNDI name
567: *
568: * @param jmsDestinationName the JMS destination name
569: * @return the name of the service using the destination
570: */
571: public String getServiceNameForDestinationName(
572: String jmsDestinationName) {
573: return (String) serviceDestinationNameMapping
574: .get(jmsDestinationName);
575: }
576:
577: /**
578: * Return the service name using the JMS destination and its JNDI name
579: *
580: * @param dest the JMS Destination Queue or Topic
581: * @param jmsDestinationName the JMS destination name
582: * @return the name of the service using the destination
583: */
584: public String getServiceNameForDestination(Destination dest,
585: String jmsDestinationName) {
586: String serviceName = (String) serviceDestinationNameMapping
587: .get(jmsDestinationName);
588:
589: // hack to get around the crazy Active MQ dynamic queue and topic issues
590: if (serviceName == null) {
591: String provider = (String) getJndiProperties().get(
592: Context.INITIAL_CONTEXT_FACTORY);
593: if (provider.indexOf("activemq") != -1) {
594: serviceName = getServiceNameForJNDIName((dest instanceof Queue ? JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE
595: : JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC)
596: + jmsDestinationName);
597: }
598: }
599: return serviceName;
600: }
601:
602: /**
603: * Return the service name using the JMS destination given by the JNDI name
604: *
605: * @param jndiDestinationName the JNDI name of the destination
606: * @return the name of the service using the destination
607: */
608: public String getServiceNameForJNDIName(String jndiDestinationName) {
609: return (String) serviceJNDINameMapping.get(jndiDestinationName);
610: }
611:
612: public void setConnFactoryJNDIName(String connFactoryJNDIName) {
613: this .connFactoryJNDIName = connFactoryJNDIName;
614: }
615:
616: public Destination getDestination(String destinationJNDIName) {
617: try {
618: return (Destination) context.lookup(destinationJNDIName);
619: } catch (NamingException ignore) {
620: }
621: return null;
622: }
623:
624: public void addJNDIContextProperty(String key, String value) {
625: jndiProperties.put(key, value);
626: }
627:
628: public String getName() {
629: return name;
630: }
631:
632: public String getConnFactoryJNDIName() {
633: return connFactoryJNDIName;
634: }
635:
636: public ConnectionFactory getConFactory() {
637: return conFactory;
638: }
639:
640: public Hashtable getJndiProperties() {
641: return jndiProperties;
642: }
643:
644: public JMSMessageReceiver getJmsMessageReceiver() {
645: return jmsMessageReceiver;
646: }
647:
648: public Context getContext() {
649: return context;
650: }
651:
652: public void setJmsMessageReceiver(
653: JMSMessageReceiver jmsMessageReceiver) {
654: this .jmsMessageReceiver = jmsMessageReceiver;
655: }
656:
657: private void handleException(String msg, Exception e)
658: throws AxisJMSException {
659: log.error(msg, e);
660: throw new AxisJMSException(msg, e);
661: }
662:
663: public String getConnectionFactoryType() {
664: return connectionFactoryType;
665: }
666:
667: public void setConnectionFactoryType(String connectionFactoryType) {
668: this .connectionFactoryType = connectionFactoryType;
669: }
670:
671: public long getReconnectTimeout() {
672: return reconnectTimeout;
673: }
674:
675: public void setReconnectTimeout(long reconnectTimeout) {
676: this .reconnectTimeout = reconnectTimeout;
677: }
678:
679: public void onException(JMSException e) {
680: log.error("JMS connection factory " + name
681: + " encountered an error", e);
682: boolean wasError = true;
683:
684: // try to connect
685: // if error occurs wait and try again
686: while (wasError == true) {
687:
688: try {
689: connectAndListen();
690: wasError = false;
691:
692: } catch (Exception e1) {
693: log.warn(
694: "JMS reconnection attempt failed for connection factory : "
695: + name, e);
696: }
697:
698: if (wasError == true) {
699: try {
700: log
701: .info("Attempting reconnection for connection factory "
702: + name
703: + " in "
704: + getReconnectTimeout()
705: / 1000
706: + " seconds");
707: Thread.sleep(getReconnectTimeout());
708: } catch (InterruptedException ignore) {
709: }
710: }
711: } // wasError
712:
713: }
714: }
|