001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.transport.jms;
020:
021: import org.apache.commons.logging.Log;
022: import org.apache.commons.logging.LogFactory;
023: import org.apache.axis2.addressing.EndpointReference;
024:
025: import javax.jms.*;
026: import javax.naming.Context;
027: import javax.naming.InitialContext;
028: import javax.naming.NameNotFoundException;
029: import javax.naming.NamingException;
030: import java.util.HashMap;
031: import java.util.Hashtable;
032: import java.util.Iterator;
033: import java.util.Map;
034:
035: /**
036: * Encapsulate a JMS Connection factory definition within an Axis2.xml
037: * <p/>
038: * More than one JMS connection factory could be defined within an Axis2 XML
039: * specifying the JMSListener as the transportReceiver.
040: * <p/>
041: * These connection factories are created at the initialization of the
042: * transportReceiver, and any service interested in using any of these could
043: * specify the name of the factory and the destination through Parameters named
044: * JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below.
045: * <p/>
046: * <parameter name="transport.jms.ConnectionFactory" locked="true">myQueueConnectionFactory</parameter>
047: * <parameter name="transport.jms.Destination" locked="true">TestQueue</parameter>
048: * <p/>
049: * If a connection factory is defined by a parameter named
050: * JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not
051: * explicitly specify a connection factory will be defaulted to it - if it is
052: * defined in the Axis2 configuration.
053: * <p/>
054: * e.g.
055: * <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
056: * <parameter name="myTopicConnectionFactory" locked="false">
057: * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
058: * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
059: * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
060: * <parameter name="transport.jms.Destination" locked="false">myTopicOne, myTopicTwo</parameter>
061: * </parameter>
062: * <parameter name="myQueueConnectionFactory" locked="false">
063: * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
064: * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
065: * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
066: * <parameter name="transport.jms.Destination" locked="false">myQueueOne, myQueueTwo</parameter>
067: * </parameter>
068: * <parameter name="default" locked="false">
069: * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
070: * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
071: * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
072: * <parameter name="transport.jms.Destination" locked="false">myDestinationOne, myDestinationTwo</parameter>
073: * </parameter>
074: * </transportReceiver>
075: */
076: public class JMSConnectionFactory {
077:
078: private static final Log log = LogFactory
079: .getLog(JMSConnectionFactory.class);
080:
081: /**
082: * The name used for the connection factory definition within Axis2
083: */
084: private String name = null;
085: /**
086: * The JNDI name of the actual connection factory
087: */
088: private String jndiName = null;
089: /**
090: * The JNDI name of the actual connection factory username
091: */
092: private String jndiUser = null;
093: /**
094: * The JNDI name of the actual connection factory password
095: */
096: private String jndiPass = null;
097: /**
098: * Map of destination JNDI names to service names
099: */
100: private Map serviceJNDINameMapping = null;
101: /**
102: * Map of destinations to service names
103: */
104: private Map serviceDestinationMapping = null;
105: /**
106: * The JMS Sessions listening for messages
107: */
108: private Map jmsSessions = null;
109: /**
110: * Properties of the connection factory
111: */
112: private Hashtable properties = null;
113: /**
114: * The JNDI Context used
115: */
116: private Context context = null;
117: /**
118: * The actual ConnectionFactory instance held within
119: */
120: private ConnectionFactory conFactory = null;
121: /**
122: * The JMS Connection is opened.
123: */
124: private Connection connection = null;
125: /**
126: * The JMS Message receiver for this connection factory
127: */
128: private JMSMessageReceiver msgRcvr = null;
129: /**
130: * The actual password for the connection factory after retrieval from JNDI.
131: * If this is not supplied, the OS username will be used by default
132: */
133: private String user = null;
134: /**
135: * The actual password for the connection factory after retrieval from JNDI.
136: * If this is not supplied, the OS credentials will be used by default
137: */
138: private String pass = null;
139:
140: /**
141: * Create a JMSConnectionFactory for the given Axis2 name and JNDI name
142: *
143: * @param name the local Axis2 name of the connection factory
144: * @param jndiName the JNDI name of the actual connection factory used
145: */
146: JMSConnectionFactory(String name, String jndiName) {
147: this .name = name;
148: this .jndiName = jndiName;
149: serviceJNDINameMapping = new HashMap();
150: serviceDestinationMapping = new HashMap();
151: properties = new Hashtable();
152: jmsSessions = new HashMap();
153: }
154:
155: /**
156: * Create a JMSConnectionFactory for the given Axis2 name
157: *
158: * @param name the local Axis2 name of the connection factory
159: */
160: JMSConnectionFactory(String name) {
161: this (name, null);
162: }
163:
164: /**
165: * Connect to the actual JMS connection factory specified by the JNDI name
166: *
167: * @throws NamingException if the connection factory cannot be found
168: */
169: public void connect() throws NamingException {
170: if (context == null) {
171: createInitialContext();
172: }
173: conFactory = (ConnectionFactory) context.lookup(jndiName);
174:
175: if (jndiUser != null)
176: user = (String) context.lookup(jndiUser);
177:
178: if (jndiPass != null)
179: pass = (String) context.lookup(jndiPass);
180:
181: log.debug("Connected to the actual connection factory : "
182: + jndiName);
183: }
184:
185: /**
186: * Creates the initial context using the set properties
187: *
188: * @throws NamingException
189: */
190: private void createInitialContext() throws NamingException {
191: context = new InitialContext(properties);
192: }
193:
194: /**
195: * Set the JNDI connection factory name
196: *
197: * @param jndiName
198: */
199: public void setJndiName(String jndiName) {
200: this .jndiName = jndiName;
201: }
202:
203: /**
204: * Get the JNDI name of the actual factory username
205: *
206: * @return the jndi name of the actual connection factory username
207: */
208: public void setJndiUser(String jndiUser) {
209: this .jndiUser = jndiUser;
210: }
211:
212: /**
213: * Get the JNDI name of the actual factory password
214: *
215: * @return the jndi name of the actual connection factory password
216: */
217: public void setJndiPass(String jndiPass) {
218: this .jndiPass = jndiPass;
219: }
220:
221: /**
222: * Add a listen destination on this connection factory on behalf of the given service
223: *
224: * @param destinationJndi destination JNDI name
225: * @param serviceName the service to which it belongs
226: */
227: public void addDestination(String destinationJndi,
228: String serviceName) {
229:
230: serviceJNDINameMapping.put(destinationJndi, serviceName);
231: String destinationName = getDestinationName(destinationJndi);
232:
233: if (destinationName == null) {
234: log.warn("JMS Destination with JNDI name : "
235: + destinationJndi + " does not exist");
236:
237: Connection con = null;
238: try {
239: if ((jndiUser == null) || (jndiPass == null)) {
240: // Use the OS username and credentials
241: con = conFactory.createConnection();
242: } else {
243: // use an explicit username and password
244: con = conFactory.createConnection(user, pass);
245: }
246: Session session = con.createSession(false,
247: Session.AUTO_ACKNOWLEDGE);
248: Queue queue = session.createQueue(destinationJndi);
249: destinationName = queue.getQueueName();
250: log.warn("JMS Destination with JNDI name : "
251: + destinationJndi + " created");
252:
253: } catch (JMSException e) {
254: log.error(
255: "Unable to create a Destination with JNDI name : "
256: + destinationJndi, e);
257: // mark service as faulty
258: JMSUtils.markServiceAsFaulty(
259: (String) serviceJNDINameMapping
260: .get(destinationJndi),
261: "Error looking up JMS destination : "
262: + destinationJndi, msgRcvr
263: .getAxisConf().getAxisConfiguration());
264:
265: } finally {
266: if (con != null) {
267: try {
268: con.close();
269: } catch (JMSException ignore) {
270: }
271: }
272: }
273: }
274:
275: serviceDestinationMapping.put(destinationName, serviceName);
276: log.info("Mapping JNDI name : " + destinationJndi
277: + " and JMS Destination name : " + destinationName
278: + " against service : " + serviceName);
279: }
280:
281: /**
282: * Remove listen destination on this connection factory
283: *
284: * @param destinationJndi the JMS destination to be removed
285: * @throws if an error occurs trying to stop listening for messages before removal
286: */
287: public void removeDestination(String destinationJndi)
288: throws JMSException {
289: // find and save provider specific Destination name before we delete
290: String providerSpecificDestination = getDestinationName(destinationJndi);
291: stoplistenOnDestination(destinationJndi);
292: serviceJNDINameMapping.remove(destinationJndi);
293: if (providerSpecificDestination != null) {
294: serviceDestinationMapping
295: .remove(providerSpecificDestination);
296: }
297: }
298:
299: /**
300: * Add a property to the connection factory
301: *
302: * @param key
303: * @param value
304: */
305: public void addProperty(String key, String value) {
306: properties.put(key, value);
307: }
308:
309: /**
310: * Return the name of the connection factory
311: *
312: * @return the Axis2 name of this factory
313: */
314: public String getName() {
315: return name;
316: }
317:
318: /**
319: * Get the JNDI name of the actual factory
320: *
321: * @return the jndi name of the actual connection factory
322: */
323: public String getJndiName() {
324: return jndiName;
325: }
326:
327: /**
328: * Get the JNDI name of the actual factory username
329: *
330: * @return the jndi name of the actual connection factory username
331: */
332: public String getJndiUser() {
333: return jndiUser;
334: }
335:
336: /**
337: * Get the JNDI name of the actual factory password
338: *
339: * @return the jndi name of the actual connection factory password
340: */
341: public String getJndiPass() {
342: return jndiPass;
343: }
344:
345: /**
346: * This is the real password for the connection factory after the JNDI lookup
347: *
348: * @return the real password for the connection factory after the JNDI lookup
349: */
350: public String getPass() {
351: return pass;
352: }
353:
354: /**
355: * This is the real username for the connection factory after the JNDI lookup
356: *
357: * @return the eal username for the connection factory after the JNDI lookup
358: */
359: public String getUser() {
360: return user;
361: }
362:
363: /**
364: * Get the actual underlying connection factory
365: *
366: * @return actual connection factory
367: */
368: public ConnectionFactory getConFactory() {
369: return conFactory;
370: }
371:
372: /**
373: * Get the list of destinations (JNDI) associated with this connection factory
374: *
375: * @return destinations to service maping
376: */
377: public Map getDestinations() {
378: return serviceJNDINameMapping;
379: }
380:
381: /**
382: * Get the connection factory properties
383: *
384: * @return properties
385: */
386: public Hashtable getProperties() {
387: return properties;
388: }
389:
390: /**
391: * Begin listening for messages on the list of destinations associated
392: * with this connection factory. (Called during Axis2 initialization of
393: * the Transport receivers)
394: *
395: * @param msgRcvr the message receiver which will process received messages
396: * @throws JMSException on exceptions
397: */
398: public void listen(JMSMessageReceiver msgRcvr) throws JMSException {
399:
400: // save a reference to the message receiver
401: this .msgRcvr = msgRcvr;
402:
403: log.debug("Connection factory : " + name + " initializing...");
404:
405: if (conFactory == null || context == null) {
406: handleException("Connection factory must be 'connected' before listening");
407: } else {
408: try {
409: if ((jndiUser == null) || (jndiPass == null)) {
410: // User the OS username and credentials
411: connection = conFactory.createConnection();
412: } else {
413: // use an explicit username and password
414: connection = conFactory
415: .createConnection(user, pass);
416: }
417: } catch (JMSException e) {
418: handleException(
419: "Error creating a JMS connection using the "
420: + "factory : " + jndiName, e);
421: }
422: }
423:
424: Iterator iter = serviceJNDINameMapping.keySet().iterator();
425: while (iter.hasNext()) {
426: String destinationJndi = (String) iter.next();
427: listenOnDestination(destinationJndi);
428: }
429:
430: // start the connection
431: connection.start();
432: log.info("Connection factory : " + name + " initialized...");
433: }
434:
435: /**
436: * Listen on the given destination from this connection factory. Used to
437: * start listening on a destination associated with a newly deployed service
438: *
439: * @param destinationJndi the JMS destination to listen on
440: * @throws JMSException on exception
441: */
442: public void listenOnDestination(String destinationJndi)
443: throws JMSException {
444: Session session = connection.createSession(false,
445: Session.AUTO_ACKNOWLEDGE);
446: Destination destination = null;
447: try {
448: Object o = context.lookup(destinationJndi);
449: destination = (Destination) o;
450:
451: } catch (NameNotFoundException e) {
452: log.warn("Cannot find destination : " + destinationJndi
453: + " Creating a Queue with this name");
454: destination = session.createQueue(destinationJndi);
455:
456: } catch (NamingException e) {
457: log.warn("Error looking up destination : "
458: + destinationJndi, e);
459: // mark service as faulty
460: JMSUtils.markServiceAsFaulty(
461: (String) serviceJNDINameMapping
462: .get(destinationJndi),
463: "Error looking up JMS destination : "
464: + destinationJndi, this .msgRcvr
465: .getAxisConf().getAxisConfiguration());
466: }
467:
468: MessageConsumer consumer = session.createConsumer(destination);
469: consumer.setMessageListener(this .msgRcvr);
470: jmsSessions.put(destinationJndi, session);
471: }
472:
473: /**
474: * Stop listening on the given destination - for undeployment of services
475: *
476: * @param destinationJndi the JNDI name of the JMS destination
477: * @throws JMSException on exception
478: */
479: private void stoplistenOnDestination(String destinationJndi)
480: throws JMSException {
481: ((Session) jmsSessions.get(destinationJndi)).close();
482: }
483:
484: /**
485: * Return the service name using this destination
486: *
487: * @param destination the destination name
488: * @return the service which uses the given destination, or null
489: */
490: public String getServiceNameForDestination(String destination) {
491:
492: return (String) serviceJNDINameMapping.get(destination);
493: }
494:
495: /**
496: * Close all connections, sessions etc.. and stop this connection factory
497: */
498: public void stop() {
499: try {
500: connection.close();
501: } catch (JMSException e) {
502: log.warn(
503: "Error shutting down connection factory : " + name,
504: e);
505: }
506: }
507:
508: /**
509: * Return the provider specific Destination name if any for the destination with the given
510: * JNDI name
511: * @param destinationJndi the JNDI name of the destination
512: * @return the provider specific Destination name or null if cannot be found
513: */
514: public String getDestinationName(String destinationJndi) {
515: try {
516: Destination destination = (Destination) context
517: .lookup(destinationJndi);
518: if (destination != null && destination instanceof Queue) {
519: return ((Queue) destination).getQueueName();
520: } else if (destination != null
521: && destination instanceof Topic) {
522: return ((Topic) destination).getTopicName();
523: }
524: } catch (JMSException e) {
525: log.warn(
526: "Error reading provider specific JMS destination name for destination "
527: + "with JNDI name : " + destinationJndi, e);
528: } catch (NamingException e) {
529: log
530: .warn("Error looking up destination with JNDI name : "
531: + destinationJndi
532: + " to map its corresponding provider specific Destination name");
533: }
534: return null;
535: }
536:
537: /**
538: * Return the EPR for the JMS Destination with the given JNDI name and using
539: * this connection factory
540: * @param destination the JNDI name of the JMS Destionation
541: * @return the EPR
542: */
543: public EndpointReference getEPRForDestination(String destination) {
544:
545: StringBuffer sb = new StringBuffer();
546: sb.append(JMSConstants.JMS_PREFIX).append(destination);
547: sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM)
548: .append("=").append(getJndiName());
549: Iterator props = getProperties().keySet().iterator();
550: while (props.hasNext()) {
551: String key = (String) props.next();
552: String value = (String) getProperties().get(key);
553: sb.append("&").append(key).append("=").append(value);
554: }
555:
556: return new EndpointReference(sb.toString());
557: }
558:
559: public String getServiceByDestination(String destinationName) {
560: return (String) serviceDestinationMapping.get(destinationName);
561: }
562:
563: private void handleException(String msg) throws AxisJMSException {
564: log.error(msg);
565: throw new AxisJMSException(msg);
566: }
567:
568: private void handleException(String msg, Exception e)
569: throws AxisJMSException {
570: log.error(msg, e);
571: throw new AxisJMSException(msg, e);
572: }
573: }
|