001: /*
002: * Copyright 2004,2005 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.synapse.transport.jms;
017:
018: import org.apache.axis2.AxisFault;
019: import org.apache.axis2.Constants;
020: import org.apache.synapse.transport.base.threads.WorkerPool;
021: import org.apache.synapse.transport.base.BaseUtils;
022: import org.apache.synapse.transport.base.BaseConstants;
023: import org.apache.axis2.description.Parameter;
024: import org.apache.axis2.description.AxisService;
025: import org.apache.axis2.description.AxisOperation;
026: import org.apache.axis2.context.ConfigurationContext;
027: import org.apache.axis2.context.MessageContext;
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030:
031: import javax.jms.*;
032: import javax.xml.namespace.QName;
033:
034: /**
035: * This is the actual receiver which listens for and accepts JMS messages, and
036: * hands them over to be processed by a worker thread. An instance of this
037: * class is created for each JMSConnectionFactory, but all instances may and
038: * will share the same worker thread pool held by the JMSListener
039: */
040: public class JMSMessageReceiver implements MessageListener {
041:
042: private static final Log log = LogFactory
043: .getLog(JMSMessageReceiver.class);
044:
045: /** The JMSListener */
046: private JMSListener jmsListener = null;
047: /** The thread pool of workers */
048: private WorkerPool workerPool = null;
049: /** The Axis configuration context */
050: private ConfigurationContext cfgCtx = null;
051: /** A reference to the JMS Connection Factory to which this applies */
052: private JMSConnectionFactory jmsConnectionFactory = null;
053:
054: /**
055: * Create a new JMSMessage receiver
056: *
057: * @param jmsListener the JMS transport Listener
058: * @param jmsConFac the JMS connection factory we are associated with
059: * @param workerPool the worker thead pool to be used
060: * @param cfgCtx the axis ConfigurationContext
061: */
062: JMSMessageReceiver(JMSListener jmsListener,
063: JMSConnectionFactory jmsConFac, WorkerPool workerPool,
064: ConfigurationContext cfgCtx) {
065: this .jmsListener = jmsListener;
066: this .jmsConnectionFactory = jmsConFac;
067: this .workerPool = workerPool;
068: this .cfgCtx = cfgCtx;
069: }
070:
071: /**
072: * The entry point on the recepit of each JMS message
073: *
074: * @param message the JMS message received
075: */
076: public void onMessage(Message message) {
077: // directly create a new worker and delegate processing
078: try {
079: if (log.isDebugEnabled()) {
080: StringBuffer sb = new StringBuffer();
081: sb.append("Received JMS message to destination : "
082: + message.getJMSDestination());
083: sb
084: .append("\nMessage ID : "
085: + message.getJMSMessageID());
086: sb.append("\nCorrelation ID : "
087: + message.getJMSCorrelationID());
088: sb.append("\nReplyTo ID : " + message.getJMSReplyTo());
089: log.debug(sb.toString());
090: if (log.isTraceEnabled()
091: && message instanceof TextMessage) {
092: log.trace("\nMessage : "
093: + ((TextMessage) message).getText());
094: }
095: }
096: } catch (JMSException e) {
097: if (log.isDebugEnabled()) {
098: log
099: .debug(
100: "Error reading JMS message headers for debug logging",
101: e);
102: }
103: }
104:
105: // has this message already expired? expiration time == 0 means never expires
106: try {
107: long expiryTime = message.getJMSExpiration();
108: if (expiryTime > 0
109: && System.currentTimeMillis() > expiryTime) {
110: if (log.isDebugEnabled()) {
111: log.debug("Discard expired message with ID : "
112: + message.getJMSMessageID());
113: }
114: return;
115: }
116: } catch (JMSException ignore) {
117: }
118:
119: workerPool.execute(new Worker(message));
120: }
121:
122: private void handleException(String msg, Exception e) {
123: log.error(msg, e);
124: throw new AxisJMSException(msg, e);
125: }
126:
127: private void handleException(String msg) {
128: log.error(msg);
129: throw new AxisJMSException(msg);
130: }
131:
132: /**
133: * The actual Worker implementation which will process the
134: * received JMS messages in the worker thread pool
135: */
136: class Worker implements Runnable {
137:
138: private Message message = null;
139:
140: Worker(Message message) {
141: this .message = message;
142: }
143:
144: public void run() {
145:
146: MessageContext msgContext = jmsListener
147: .createMessageContext();
148:
149: // set the JMS Message ID as the Message ID of the MessageContext
150: try {
151: msgContext.setMessageID(message.getJMSMessageID());
152: msgContext.setProperty(JMSConstants.JMS_COORELATION_ID,
153: message.getJMSMessageID());
154: } catch (JMSException ignore) {
155: }
156:
157: try {
158: Destination dest = message.getJMSDestination();
159: String destinationName = null;
160: if (dest instanceof Queue) {
161: destinationName = ((Queue) dest).getQueueName();
162: } else if (dest instanceof Topic) {
163: destinationName = ((Topic) dest).getTopicName();
164: }
165:
166: String serviceName = jmsConnectionFactory
167: .getServiceNameForDestination(dest,
168: destinationName);
169: String soapAction = JMSUtils.getInstace().getProperty(
170: message, BaseConstants.SOAPACTION);
171: AxisService service = null;
172:
173: // set to bypass dispatching if we know the service - we already should!
174: if (serviceName != null) {
175: service = cfgCtx.getAxisConfiguration().getService(
176: serviceName);
177: msgContext.setAxisService(service);
178:
179: // find the operation for the message, or default to one
180: Parameter operationParam = service
181: .getParameter(BaseConstants.OPERATION_PARAM);
182: QName operationQName = (operationParam != null ? BaseUtils
183: .getQNameFromString(operationParam
184: .getValue())
185: : BaseConstants.DEFAULT_OPERATION);
186:
187: AxisOperation operation = service
188: .getOperation(operationQName);
189: if (operation != null) {
190: msgContext.setAxisOperation(operation);
191: msgContext.setSoapAction("urn:"
192: + operation.getName().getLocalPart());
193: }
194: }
195:
196: // set the message property OUT_TRANSPORT_INFO
197: // the reply is assumed to be over the JMSReplyTo destination, using
198: // the same incoming connection factory, if a JMSReplyTo is available
199: if (message.getJMSReplyTo() != null) {
200: msgContext.setProperty(
201: Constants.OUT_TRANSPORT_INFO,
202: new JMSOutTransportInfo(
203: jmsConnectionFactory, message
204: .getJMSReplyTo()));
205:
206: } else if (service != null) {
207: // does the service specify a default reply destination ?
208: Parameter param = service
209: .getParameter(JMSConstants.REPLY_PARAM);
210: if (param != null && param.getValue() != null) {
211: msgContext
212: .setProperty(
213: Constants.OUT_TRANSPORT_INFO,
214: new JMSOutTransportInfo(
215: jmsConnectionFactory,
216: jmsConnectionFactory
217: .getDestination((String) param
218: .getValue())));
219: }
220: }
221:
222: String contentType = JMSUtils.getInstace().getProperty(
223: message, BaseConstants.CONTENT_TYPE);
224:
225: // set the message payload to the message context
226: JMSUtils.getInstace().setSOAPEnvelope(message,
227: msgContext, contentType);
228:
229: jmsListener.handleIncomingMessage(msgContext, JMSUtils
230: .getTransportHeaders(message), soapAction,
231: contentType);
232:
233: } catch (JMSException e) {
234: handleException(
235: "JMS Exception reading the message Destination or JMS ReplyTo",
236: e);
237: } catch (AxisFault e) {
238: handleException("Axis fault creating a MessageContext",
239: e);
240: }
241: }
242: }
243: }
|