001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.transport.jms;
020:
021: import edu.emory.mathcs.backport.java.util.concurrent.Executor;
022: import org.apache.axiom.om.util.UUIDGenerator;
023: import org.apache.axis2.AxisFault;
024: import org.apache.axis2.Constants;
025: import org.apache.axis2.description.Parameter;
026: import org.apache.axis2.addressing.RelatesTo;
027: import org.apache.axis2.context.ConfigurationContext;
028: import org.apache.axis2.context.MessageContext;
029: import org.apache.axis2.engine.AxisEngine;
030: import org.apache.axis2.util.MessageContextBuilder;
031: import org.apache.commons.logging.Log;
032: import org.apache.commons.logging.LogFactory;
033:
034: import javax.jms.Destination;
035: import javax.jms.JMSException;
036: import javax.jms.Message;
037: import javax.jms.MessageListener;
038: import javax.jms.Queue;
039: import javax.jms.Topic;
040: import javax.naming.Context;
041: import javax.xml.stream.XMLStreamException;
042: import java.io.InputStream;
043:
044: /**
045: * This is the actual receiver which listens for and accepts JMS messages, and
046: * hands them over to be processed by a worker thread. An instance of this
047: * class is created for each JMSConnectionFactory, but all instances may and
048: * will share the same worker thread pool.
049: */
050: public class JMSMessageReceiver implements MessageListener {
051:
052: private static final Log log = LogFactory
053: .getLog(JMSMessageReceiver.class);
054:
055: /**
056: * The thread pool of workers
057: */
058: private Executor workerPool = null;
059: /**
060: * The Axis configuration context
061: */
062: private ConfigurationContext axisConf = null;
063: /**
064: * A reference to the JMS Connection Factory
065: */
066: private JMSConnectionFactory jmsConFac = null;
067:
068: /**
069: * Create a new JMSMessage receiver
070: *
071: * @param jmsConFac the JMS connection factory associated with
072: * @param workerPool the worker thead pool to be used
073: * @param axisConf the Axis2 configuration
074: */
075: JMSMessageReceiver(JMSConnectionFactory jmsConFac,
076: Executor workerPool, ConfigurationContext axisConf) {
077: this .jmsConFac = jmsConFac;
078: this .workerPool = workerPool;
079: this .axisConf = axisConf;
080: }
081:
082: /**
083: * Return the Axis configuration
084: *
085: * @return the Axis configuration
086: */
087: public ConfigurationContext getAxisConf() {
088: return axisConf;
089: }
090:
091: /**
092: * Set the worker thread pool
093: *
094: * @param workerPool the worker thead pool
095: */
096: public void setWorkerPool(Executor workerPool) {
097: this .workerPool = workerPool;
098: }
099:
100: /**
101: * The entry point on the recepit of each JMS message
102: *
103: * @param message the JMS message received
104: */
105: public void onMessage(Message message) {
106: // directly create a new worker and delegate processing
107: try {
108: if (log.isDebugEnabled()) {
109: StringBuffer sb = new StringBuffer();
110: sb.append("Received JMS message to destination : "
111: + message.getJMSDestination());
112: sb
113: .append("\nMessage ID : "
114: + message.getJMSMessageID());
115: sb.append("\nCorrelation ID : "
116: + message.getJMSCorrelationID());
117: sb.append("\nReplyTo ID : " + message.getJMSReplyTo());
118: log.debug(sb.toString());
119: }
120: } catch (JMSException e) {
121: e.printStackTrace();
122: }
123: workerPool.execute(new Worker(message));
124: }
125:
126: /**
127: * Creates an Axis MessageContext for the received JMS message and
128: * sets up the transports and various properties
129: *
130: * @param message the JMS message
131: * @return the Axis MessageContext
132: */
133: private MessageContext createMessageContext(Message message) {
134:
135: InputStream in = JMSUtils.getInputStream(message);
136:
137: try {
138: MessageContext msgContext = axisConf.createMessageContext();
139:
140: // get destination and create correct EPR
141: Destination dest = message.getJMSDestination();
142: String destinationName = null;
143: if (dest instanceof Queue) {
144: destinationName = ((Queue) dest).getQueueName();
145: } else if (dest instanceof Topic) {
146: destinationName = ((Topic) dest).getTopicName();
147: }
148:
149: String serviceName = jmsConFac
150: .getServiceByDestination(destinationName);
151:
152: // hack to get around the crazy Active MQ dynamic queue and topic issues
153: if (serviceName == null) {
154: String provider = (String) jmsConFac.getProperties()
155: .get(Context.INITIAL_CONTEXT_FACTORY);
156: if (provider.indexOf("activemq") != -1) {
157: serviceName = jmsConFac
158: .getServiceNameForDestination(((dest instanceof Queue ? JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE
159: : JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC) + destinationName));
160: }
161: }
162:
163: if (serviceName != null) {
164: // set to bypass dispatching and handover directly to this service
165: msgContext
166: .setAxisService(axisConf.getAxisConfiguration()
167: .getService(serviceName));
168: }
169:
170: msgContext
171: .setIncomingTransportName(Constants.TRANSPORT_JMS);
172: msgContext.setTransportIn(axisConf.getAxisConfiguration()
173: .getTransportIn(Constants.TRANSPORT_JMS));
174:
175: msgContext.setTransportOut(axisConf.getAxisConfiguration()
176: .getTransportOut(Constants.TRANSPORT_JMS));
177: // the reply is assumed to be on the JMSReplyTo destination, using
178: // the same incoming connection factory
179:
180: JMSOutTransportInfo jmsOutTransportInfo;
181:
182: if ((jmsConFac.getJndiUser() == null)
183: || (jmsConFac.getJndiPass() == null))
184: jmsOutTransportInfo = new JMSOutTransportInfo(jmsConFac
185: .getConFactory(), message.getJMSReplyTo());
186: else
187: jmsOutTransportInfo = new JMSOutTransportInfo(jmsConFac
188: .getConFactory(), jmsConFac.getUser(),
189: jmsConFac.getPass(), message.getJMSReplyTo());
190:
191: msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
192: jmsOutTransportInfo);
193:
194: msgContext.setServerSide(true);
195: msgContext.setMessageID(message.getJMSMessageID());
196:
197: Destination replyTo = message.getJMSReplyTo();
198: String jndiDestinationName = null;
199: if (replyTo == null) {
200: Parameter param = msgContext.getAxisService()
201: .getParameter(JMSConstants.REPLY_PARAM);
202: if (param != null && param.getValue() != null) {
203: jndiDestinationName = (String) param.getValue();
204: }
205: }
206:
207: if (jndiDestinationName != null) {
208: msgContext.setReplyTo(jmsConFac
209: .getEPRForDestination(jndiDestinationName));
210: }
211:
212: String soapAction = JMSUtils.getProperty(message,
213: JMSConstants.SOAPACTION);
214: if (soapAction != null) {
215: msgContext.setSoapAction(soapAction);
216: }
217:
218: msgContext.setEnvelope(JMSUtils.getSOAPEnvelope(message,
219: msgContext, in));
220:
221: // set correlation id
222: String correlationId = message.getJMSCorrelationID();
223: if (correlationId != null && correlationId.length() > 0) {
224: msgContext.setProperty(JMSConstants.JMS_COORELATION_ID,
225: correlationId);
226: msgContext
227: .setRelationships(new RelatesTo[] { new RelatesTo(
228: correlationId) });
229: }
230:
231: return msgContext;
232:
233: } catch (JMSException e) {
234: handleException(
235: "JMS Exception reading the destination name", e);
236: } catch (AxisFault e) {
237: handleException("Axis fault creating the MessageContext", e);
238: } catch (XMLStreamException e) {
239: handleException("Error reading the SOAP envelope", e);
240: }
241: return null;
242: }
243:
244: private void handleException(String msg, Exception e) {
245: log.error(msg, e);
246: throw new AxisJMSException(msg, e);
247: }
248:
249: /**
250: * The actual Runnable Worker implementation which will process the
251: * received JMS messages in the worker thread pool
252: */
253: class Worker implements Runnable {
254:
255: private Message message = null;
256:
257: Worker(Message message) {
258: this .message = message;
259: }
260:
261: public void run() {
262: MessageContext msgCtx = createMessageContext(message);
263:
264: AxisEngine engine = new AxisEngine(msgCtx
265: .getConfigurationContext());
266: try {
267: log
268: .debug("Delegating JMS message for processing to the Axis engine");
269: try {
270: engine.receive(msgCtx);
271: } catch (AxisFault e) {
272: log
273: .debug(
274: "Exception occured when receiving the SOAP message",
275: e);
276: if (msgCtx.isServerSide()) {
277: MessageContext faultContext = MessageContextBuilder
278: .createFaultMessageContext(msgCtx, e);
279: engine.sendFault(faultContext);
280: }
281: }
282: } catch (AxisFault af) {
283: log.error("JMS Worker ["
284: + Thread.currentThread().getName()
285: + "] Encountered an Axis Fault : "
286: + af.getMessage(), af);
287: }
288: }
289: }
290: }
|