001: /*
002: * $Id: JmsMessageDispatcher.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.MuleEvent;
015: import org.mule.api.MuleMessage;
016: import org.mule.api.endpoint.EndpointURI;
017: import org.mule.api.endpoint.OutboundEndpoint;
018: import org.mule.api.transport.Connector;
019: import org.mule.api.transport.DispatchException;
020: import org.mule.api.transport.MessageAdapter;
021: import org.mule.transaction.IllegalTransactionStateException;
022: import org.mule.transport.AbstractMessageDispatcher;
023: import org.mule.transport.jms.i18n.JmsMessages;
024: import org.mule.util.ClassUtils;
025: import org.mule.util.NumberUtils;
026: import org.mule.util.StringUtils;
027: import org.mule.util.concurrent.Latch;
028: import org.mule.util.concurrent.WaitableBoolean;
029:
030: import javax.jms.DeliveryMode;
031: import javax.jms.Destination;
032: import javax.jms.Message;
033: import javax.jms.MessageConsumer;
034: import javax.jms.MessageListener;
035: import javax.jms.MessageProducer;
036: import javax.jms.Session;
037: import javax.jms.TemporaryQueue;
038: import javax.jms.TemporaryTopic;
039:
040: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
041:
042: import org.apache.commons.lang.BooleanUtils;
043:
044: /**
045: * <code>JmsMessageDispatcher</code> is responsible for dispatching messages to JMS
046: * destinations. All JMS semantics apply and settings such as replyTo and QoS
047: * properties are read from the event properties or defaults are used (according to
048: * the JMS specification)
049: */
050: public class JmsMessageDispatcher extends AbstractMessageDispatcher {
051:
052: private JmsConnector connector;
053: private Session cachedSession;
054:
055: public JmsMessageDispatcher(OutboundEndpoint endpoint) {
056: super (endpoint);
057: this .connector = (JmsConnector) endpoint.getConnector();
058: }
059:
060: protected void doDispatch(MuleEvent event) throws Exception {
061: dispatchMessage(event);
062: }
063:
064: protected void doConnect() throws Exception {
065: // template method
066: }
067:
068: protected void doDisconnect() throws Exception {
069: // template method
070: }
071:
072: private MuleMessage dispatchMessage(MuleEvent event)
073: throws Exception {
074: Session session = null;
075: MessageProducer producer = null;
076: MessageConsumer consumer = null;
077: Destination replyTo = null;
078: boolean transacted = false;
079: boolean cached = false;
080: boolean remoteSync = useRemoteSync(event);
081:
082: if (logger.isDebugEnabled()) {
083: logger.debug("dispatching on endpoint: "
084: + event.getEndpoint().getEndpointURI()
085: + ". MuleEvent id is: " + event.getId()
086: + ". Outbound transformers are: "
087: + event.getEndpoint().getTransformers());
088: }
089:
090: try {
091: session = connector.getSessionFromTransaction();
092: if (session != null) {
093: transacted = true;
094:
095: // If a transaction is running, we can not receive any messages
096: // in the same transaction.
097: if (remoteSync) {
098: throw new IllegalTransactionStateException(
099: JmsMessages
100: .connectorDoesNotSupportSyncReceiveWhenTransacted());
101: }
102: }
103: // Should we be caching sessions? Note this is not part of the JMS spec.
104: // and is turned off by default.
105: else if (event.getMessage().getBooleanProperty(
106: JmsConstants.CACHE_JMS_SESSIONS_PROPERTY,
107: connector.isCacheJmsSessions())) {
108: cached = true;
109: if (cachedSession != null) {
110: session = cachedSession;
111: } else {
112: session = connector.getSession(event.getEndpoint());
113: cachedSession = session;
114: }
115: } else {
116: session = connector.getSession(event.getEndpoint());
117: if (event.getEndpoint().getTransactionConfig()
118: .isTransacted()) {
119: transacted = true;
120: }
121: }
122:
123: EndpointURI endpointUri = event.getEndpoint()
124: .getEndpointURI();
125:
126: boolean topic = connector.getTopicResolver().isTopic(
127: event.getEndpoint(), true);
128:
129: Destination dest = connector.getJmsSupport()
130: .createDestination(session,
131: endpointUri.getAddress(), topic);
132: producer = connector.getJmsSupport().createProducer(
133: session, dest, topic);
134:
135: Object message = event.transformMessage();
136: if (!(message instanceof Message)) {
137: throw new DispatchException(JmsMessages
138: .checkTransformer("JMS message", message
139: .getClass(), connector.getName()),
140: event.getMessage(), event.getEndpoint());
141: }
142:
143: Message msg = (Message) message;
144: if (event.getMessage().getCorrelationId() != null) {
145: msg.setJMSCorrelationID(event.getMessage()
146: .getCorrelationId());
147: }
148:
149: MuleMessage eventMsg = event.getMessage();
150:
151: // Some JMS implementations might not support the ReplyTo property.
152: if (connector.supportsProperty(JmsConstants.JMS_REPLY_TO)) {
153: Object tempReplyTo = eventMsg
154: .removeProperty(JmsConstants.JMS_REPLY_TO);
155: if (tempReplyTo != null) {
156: if (tempReplyTo instanceof Destination) {
157: replyTo = (Destination) tempReplyTo;
158: } else {
159: // TODO AP should this drill-down be moved into the resolver as well?
160: boolean replyToTopic = false;
161: String reply = tempReplyTo.toString();
162: int i = reply.indexOf(":");
163: if (i > -1) {
164: // TODO MULE-1409 this check will not work for ActiveMQ 4.x,
165: // as they have temp-queue://<destination> and temp-topic://<destination> URIs
166: // Extract to a custom resolver for ActiveMQ4.x
167: // The code path can be exercised, e.g. by a LoanBrokerESBTestCase
168: String qtype = reply.substring(0, i);
169: replyToTopic = JmsConstants.TOPIC_PROPERTY
170: .equalsIgnoreCase(qtype);
171: reply = reply.substring(i + 1);
172: }
173: replyTo = connector.getJmsSupport()
174: .createDestination(session, reply,
175: replyToTopic);
176: }
177: }
178: // Are we going to wait for a return event ?
179: if (remoteSync && replyTo == null) {
180: replyTo = connector.getJmsSupport()
181: .createTemporaryDestination(session, topic);
182: }
183: // Set the replyTo property
184: if (replyTo != null) {
185: msg.setJMSReplyTo(replyTo);
186: }
187:
188: // Are we going to wait for a return event ?
189: if (remoteSync) {
190: consumer = connector.getJmsSupport()
191: .createConsumer(session, replyTo, topic);
192: }
193: }
194:
195: // QoS support
196: String ttlString = (String) eventMsg
197: .removeProperty(JmsConstants.TIME_TO_LIVE_PROPERTY);
198: String priorityString = (String) eventMsg
199: .removeProperty(JmsConstants.PRIORITY_PROPERTY);
200: String persistentDeliveryString = (String) eventMsg
201: .removeProperty(JmsConstants.PERSISTENT_DELIVERY_PROPERTY);
202:
203: long ttl = StringUtils.isNotBlank(ttlString) ? NumberUtils
204: .toLong(ttlString) : Message.DEFAULT_TIME_TO_LIVE;
205: int priority = StringUtils.isNotBlank(priorityString) ? NumberUtils
206: .toInt(priorityString)
207: : Message.DEFAULT_PRIORITY;
208: boolean persistent = StringUtils
209: .isNotBlank(persistentDeliveryString) ? BooleanUtils
210: .toBoolean(persistentDeliveryString)
211: : connector.isPersistentDelivery();
212:
213: if (connector.isHonorQosHeaders()) {
214: int priorityProp = eventMsg.getIntProperty(
215: JmsConstants.JMS_PRIORITY,
216: Connector.INT_VALUE_NOT_SET);
217: int deliveryModeProp = eventMsg.getIntProperty(
218: JmsConstants.JMS_DELIVERY_MODE,
219: Connector.INT_VALUE_NOT_SET);
220:
221: if (priorityProp != Connector.INT_VALUE_NOT_SET) {
222: priority = priorityProp;
223: }
224: if (deliveryModeProp != Connector.INT_VALUE_NOT_SET) {
225: persistent = deliveryModeProp == DeliveryMode.PERSISTENT;
226: }
227: }
228:
229: if (logger.isDebugEnabled()) {
230: logger.debug("Sending message of type "
231: + ClassUtils.getSimpleName(msg.getClass()));
232: }
233:
234: if (consumer != null && topic) {
235: // need to register a listener for a topic
236: Latch l = new Latch();
237: ReplyToListener listener = new ReplyToListener(l);
238: consumer.setMessageListener(listener);
239:
240: connector.getJmsSupport().send(producer, msg,
241: persistent, priority, ttl, topic);
242:
243: int timeout = event.getTimeout();
244:
245: if (logger.isDebugEnabled()) {
246: logger.debug("Waiting for return event for: "
247: + timeout + " ms on " + replyTo);
248: }
249:
250: l.await(timeout, TimeUnit.MILLISECONDS);
251: consumer.setMessageListener(null);
252: listener.release();
253: Message result = listener.getMessage();
254: if (result == null) {
255: logger
256: .debug("No message was returned via replyTo destination");
257: return null;
258: } else {
259: MessageAdapter adapter = connector
260: .getMessageAdapter(result);
261: return new DefaultMuleMessage(JmsMessageUtils
262: .toObject(result, connector
263: .getSpecification()), adapter);
264: }
265: } else {
266: connector.getJmsSupport().send(producer, msg,
267: persistent, priority, ttl, topic);
268: if (consumer != null) {
269: int timeout = event.getTimeout();
270:
271: if (logger.isDebugEnabled()) {
272: logger.debug("Waiting for return event for: "
273: + timeout + " ms on " + replyTo);
274: }
275:
276: Message result = consumer.receive(timeout);
277: if (result == null) {
278: logger
279: .debug("No message was returned via replyTo destination");
280: return null;
281: } else {
282: MessageAdapter adapter = connector
283: .getMessageAdapter(result);
284: return new DefaultMuleMessage(JmsMessageUtils
285: .toObject(result, connector
286: .getSpecification()), adapter);
287: }
288: }
289: }
290: return null;
291: } finally {
292: connector.closeQuietly(producer);
293: connector.closeQuietly(consumer);
294:
295: // TODO AP check if TopicResolver is to be utilized for temp destinations as well
296: if (replyTo != null
297: && (replyTo instanceof TemporaryQueue || replyTo instanceof TemporaryTopic)) {
298: if (replyTo instanceof TemporaryQueue) {
299: connector.closeQuietly((TemporaryQueue) replyTo);
300: } else {
301: // hope there are no more non-standard tricks from JMS vendors
302: // here ;)
303: connector.closeQuietly((TemporaryTopic) replyTo);
304: }
305: }
306:
307: // If the session is from the current transaction, it is up to the
308: // transaction to close it.
309: if (session != null && !cached && !transacted) {
310: connector.closeQuietly(session);
311: }
312: }
313: }
314:
315: protected MuleMessage doSend(MuleEvent event) throws Exception {
316: MuleMessage message = dispatchMessage(event);
317: return message;
318: }
319:
320: protected void doDispose() {
321: // template method
322: }
323:
324: private class ReplyToListener implements MessageListener {
325: private final Latch latch;
326: private volatile Message message;
327: private final WaitableBoolean released = new WaitableBoolean(
328: false);
329:
330: public ReplyToListener(Latch latch) {
331: this .latch = latch;
332: }
333:
334: public Message getMessage() {
335: return message;
336: }
337:
338: public void release() {
339: released.set(true);
340: }
341:
342: public void onMessage(Message message) {
343: this .message = message;
344: latch.countDown();
345: try {
346: released.whenTrue(null);
347: } catch (InterruptedException e) {
348: // ignored
349: }
350: }
351: }
352:
353: }
|