001: /*
002: * Copyright 2004,2005 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.synapse.transport.jms;
017:
018: import org.apache.axis2.AxisFault;
019: import org.apache.axis2.Constants;
020: import org.apache.axis2.addressing.EndpointReference;
021: import org.apache.axis2.context.ConfigurationContext;
022: import org.apache.axis2.description.*;
023: import org.apache.synapse.transport.base.AbstractTransportListener;
024: import org.apache.synapse.transport.base.BaseUtils;
025: import org.apache.commons.logging.LogFactory;
026:
027: import javax.jms.JMSException;
028: import javax.naming.NamingException;
029: import java.util.*;
030:
031: /**
032: * The JMS Transport listener implementation. A JMS Listner will hold one or
033: * more JMS connection factories, which would be created at initialization
034: * time. This implementation does not support the creation of connection
035: * factories at runtime. This JMS Listener registers with Axis to be notified
036: * of service deployment/undeployment/start and stop, and enables or disables
037: * listening for messages on the destinations as appropriate.
038: * <p/>
039: * A Service could state the JMS connection factory name and the destination
040: * name for use as Parameters in its services.xml as shown in the example
041: * below. If the connection name was not specified, it will use the connection
042: * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a
043: * factory is defined in the Axis2.xml. If the destination name is not specified
044: * it will default to a JMS queue by the name of the service. If the destination
045: * should be a Topic, it should be created on the JMS implementation, and
046: * specified in the services.xml of the service.
047: * <p/>
048: * <parameter name="transport.jms.ConnectionFactory" locked="true">
049: * myTopicConnectionFactory</parameter>
050: * <parameter name="transport.jms.Destination" locked="true">
051: * dynamicTopics/something.TestTopic</parameter>
052: */
053: public class JMSListener extends AbstractTransportListener {
054:
055: public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
056:
057: /** A Map containing the JMS connection factories managed by this, keyed by name */
058: private Map connectionFactories = new HashMap();
059: /** A Map of service name to the JMS EPR addresses */
060: private Map serviceNameToEPRMap = new HashMap();
061:
062: /**
063: * This is the TransportListener initialization method invoked by Axis2
064: *
065: * @param cfgCtx the Axis configuration context
066: * @param trpInDesc the TransportIn description
067: */
068: public void init(ConfigurationContext cfgCtx,
069: TransportInDescription trpInDesc) throws AxisFault {
070: setTransportName(TRANSPORT_NAME);
071: super .init(cfgCtx, trpInDesc);
072:
073: // read the connection factory definitions and create them
074: loadConnectionFactoryDefinitions(trpInDesc);
075:
076: // if no connection factories are defined, we cannot listen for any messages
077: if (connectionFactories.isEmpty()) {
078: log
079: .warn("No JMS connection factories are defined. Cannot listen for JMS");
080: return;
081: }
082:
083: log.info("JMS Transport Receiver/Listener initialized...");
084: }
085:
086: /**
087: * Start this JMS Listener (Transport Listener)
088: *
089: * @throws AxisFault
090: */
091: public void start() throws AxisFault {
092:
093: Iterator iter = connectionFactories.values().iterator();
094: while (iter.hasNext()) {
095: JMSConnectionFactory conFac = (JMSConnectionFactory) iter
096: .next();
097: conFac.setJmsMessageReceiver(new JMSMessageReceiver(this ,
098: conFac, workerPool, cfgCtx));
099:
100: try {
101: conFac.connectAndListen();
102: } catch (JMSException e) {
103: handleException("Error starting connection factory : "
104: + conFac.getName(), e);
105: } catch (NamingException e) {
106: handleException("Error starting connection factory : "
107: + conFac.getName(), e);
108: }
109: }
110:
111: super .start();
112: }
113:
114: /**
115: * Stop the JMS Listener, and shutdown all of the connection factories
116: */
117: public void stop() throws AxisFault {
118: super .stop();
119: Iterator iter = connectionFactories.values().iterator();
120: while (iter.hasNext()) {
121: ((JMSConnectionFactory) iter.next()).stop();
122: }
123: }
124:
125: /**
126: * Returns EPRs for the given service and IP over the JMS transport
127: *
128: * @param serviceName service name
129: * @param ip ignored
130: * @return the EPR for the service
131: * @throws AxisFault not used
132: */
133: public EndpointReference[] getEPRsForService(String serviceName,
134: String ip) throws AxisFault {
135: //Strip out the operation name
136: if (serviceName.indexOf('/') != -1) {
137: serviceName = serviceName.substring(0, serviceName
138: .indexOf('/'));
139: }
140: return new EndpointReference[] { new EndpointReference(
141: (String) serviceNameToEPRMap.get(serviceName)) };
142: }
143:
144: /**
145: * Prepare to listen for JMS messages on behalf of the given service
146: *
147: * @param service the service for which to listen for messages
148: */
149: protected void startListeningForService(AxisService service) {
150: if (service.getName().startsWith("__")) {
151: return;
152: }
153:
154: JMSConnectionFactory cf = getConnectionFactory(service);
155: if (cf == null) {
156: String msg = "Service "
157: + service.getName()
158: + " does not specify"
159: + "a JMS connection factory or refers to an invalid factory. "
160: + "This service is being marked as faulty and will not be "
161: + "available over the JMS transport";
162: log.warn(msg);
163: BaseUtils.markServiceAsFaulty(service.getName(), msg,
164: service.getAxisConfiguration());
165: return;
166: }
167:
168: // compute service EPR and keep for later use
169: String destinationName = JMSUtils
170: .getJNDIDestinationNameForService(service);
171: serviceNameToEPRMap.put(service.getName(), JMSUtils.getEPR(cf,
172: destinationName));
173:
174: String destinationType = JMSUtils
175: .getDestinationTypeForService(service);
176:
177: log.info("Starting to listen on destination : "
178: + destinationName + " of type " + destinationType
179: + " for service " + service.getName());
180: cf.addDestination(destinationName, destinationType, service
181: .getName());
182: cf
183: .startListeningOnDestination(destinationName,
184: destinationType);
185: }
186:
187: /**
188: * Stops listening for messages for the service thats undeployed or stopped
189: *
190: * @param service the service that was undeployed or stopped
191: */
192: protected void stopListeningForService(AxisService service) {
193:
194: JMSConnectionFactory cf = getConnectionFactory(service);
195: if (cf != null) {
196: // remove from the serviceNameToEprMap
197: serviceNameToEPRMap.remove(service.getName());
198:
199: String destination = JMSUtils
200: .getJNDIDestinationNameForService(service);
201: cf.removeDestination(destination);
202: }
203: }
204:
205: /**
206: * Return the connection factory name for this service. If this service
207: * refers to an invalid factory or defaults to a non-existent default
208: * factory, this returns null
209: *
210: * @param service the AxisService
211: * @return the JMSConnectionFactory to be used, or null if reference is invalid
212: */
213: private JMSConnectionFactory getConnectionFactory(
214: AxisService service) {
215: Parameter conFacParam = service
216: .getParameter(JMSConstants.CONFAC_PARAM);
217:
218: // validate connection factory name (specified or default)
219: if (conFacParam != null) {
220: String conFac = (String) conFacParam.getValue();
221: if (connectionFactories.containsKey(conFac)) {
222: return (JMSConnectionFactory) connectionFactories
223: .get(conFac);
224: } else {
225: return null;
226: }
227:
228: } else if (connectionFactories
229: .containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) {
230: return (JMSConnectionFactory) connectionFactories
231: .get(JMSConstants.DEFAULT_CONFAC_NAME);
232:
233: } else {
234: return null;
235: }
236: }
237:
238: /**
239: * Create JMSConnectionFactory instances for the definitions in the transport listener,
240: * and add these into our collection of connectionFactories map keyed by name
241: *
242: * @param transprtIn the transport-in description for JMS
243: */
244: private void loadConnectionFactoryDefinitions(
245: TransportInDescription transprtIn) {
246:
247: // iterate through all defined connection factories
248: Iterator conFacIter = transprtIn.getParameters().iterator();
249:
250: while (conFacIter.hasNext()) {
251: Parameter conFacParams = (Parameter) conFacIter.next();
252:
253: JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(
254: conFacParams.getName(), cfgCtx);
255: JMSUtils.setConnectionFactoryParameters(conFacParams,
256: jmsConFactory);
257:
258: connectionFactories.put(jmsConFactory.getName(),
259: jmsConFactory);
260: }
261: }
262:
263: }
|