001: /*
002: * Copyright 2004,2005 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.synapse.transport.jms;
017:
018: import org.apache.axiom.om.OMOutputFormat;
019: import org.apache.axiom.om.OMElement;
020: import org.apache.axiom.om.OMText;
021: import org.apache.axiom.om.OMNode;
022: import org.apache.axiom.om.util.UUIDGenerator;
023: import org.apache.axis2.AxisFault;
024: import org.apache.axis2.context.MessageContext;
025: import org.apache.axis2.context.ConfigurationContext;
026: import org.apache.axis2.description.TransportOutDescription;
027: import org.apache.axis2.description.Parameter;
028: import org.apache.axis2.transport.TransportUtils;
029: import org.apache.axis2.transport.MessageFormatter;
030: import org.apache.axis2.transport.OutTransportInfo;
031: import org.apache.synapse.transport.base.AbstractTransportSender;
032: import org.apache.synapse.transport.base.BaseUtils;
033: import org.apache.synapse.transport.base.BaseConstants;
034: import org.apache.axis2.transport.http.HTTPConstants;
035: import org.apache.commons.logging.LogFactory;
036:
037: import javax.jms.*;
038: import javax.jms.Queue;
039: import javax.activation.DataHandler;
040: import javax.naming.Context;
041: import javax.naming.NamingException;
042: import java.io.ByteArrayOutputStream;
043: import java.io.IOException;
044: import java.util.*;
045:
046: /**
047: * The TransportSender for JMS
048: */
049: public class JMSSender extends AbstractTransportSender {
050:
051: public static final String TRANSPORT_NAME = "jms";
052:
053: /** A Map containing the JMS connection factories managed by this, keyed by name */
054: private Map connectionFactories = new HashMap();
055:
/**
 * Creates the JMS transport sender and installs a logger named after this class
 * into the {@code log} field.
 */
public JMSSender() {
    this.log = LogFactory.getLog(JMSSender.class);
}
059:
060: /**
061: * Initialize the transport sender by reading pre-defined connection factories for
062: * outgoing messages. These will create sessions (one per each destination dealth with)
063: * to be used when messages are being sent.
064: * @param cfgCtx the configuration context
065: * @param transportOut the transport sender definition from axis2.xml
066: * @throws AxisFault on error
067: */
068: public void init(ConfigurationContext cfgCtx,
069: TransportOutDescription transportOut) throws AxisFault {
070: setTransportName(TRANSPORT_NAME);
071: super .init(cfgCtx, transportOut);
072: // read the connection factory definitions and create them
073: loadConnectionFactoryDefinitions(transportOut);
074: }
075:
076: /**
077: * Get corresponding JMS connection factory defined within the transport sender for the
078: * transport-out information - usually constructed from a targetEPR
079: *
080: * @param trpInfo the transport-out information
081: * @return the corresponding JMS connection factory, if any
082: */
083: private JMSConnectionFactory getJMSConnectionFactory(
084: JMSOutTransportInfo trpInfo) {
085: if (trpInfo.getProperties() != null) {
086: String jmsConnectionFactoryName = (String) trpInfo
087: .getProperties().get(JMSConstants.CONFAC_PARAM);
088: if (jmsConnectionFactoryName != null) {
089: return (JMSConnectionFactory) connectionFactories
090: .get(jmsConnectionFactoryName);
091: }
092: }
093:
094: Iterator cfNames = connectionFactories.keySet().iterator();
095: while (cfNames.hasNext()) {
096: String cfName = (String) cfNames.next();
097: JMSConnectionFactory cf = (JMSConnectionFactory) connectionFactories
098: .get(cfName);
099: if (cf.equals(trpInfo)) {
100: return cf;
101: }
102: }
103: return null;
104: }
105:
106: /**
107: * Performs the actual sending of the JMS message
108: */
109: public void sendMessage(MessageContext msgCtx,
110: String targetAddress, OutTransportInfo outTransportInfo)
111: throws AxisFault {
112:
113: JMSConnectionFactory jmsConnectionFactory = null;
114: Connection connection = null; // holds a one time connection if used
115: JMSOutTransportInfo jmsOut = null;
116: Session session = null;
117: Destination destination = null;
118: Destination replyDestination = null;
119:
120: try {
121: if (targetAddress != null) {
122:
123: jmsOut = new JMSOutTransportInfo(targetAddress);
124: // do we have a definition for a connection factory to use for this address?
125: jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
126:
127: if (jmsConnectionFactory != null) {
128: // create new or get existing session to send to the destination from the CF
129: session = jmsConnectionFactory
130: .getSessionForDestination(JMSUtils
131: .getDestination(targetAddress));
132:
133: } else {
134: // digest the targetAddress and locate CF from the EPR
135: jmsOut.loadConnectionFactoryFromProperies();
136: try {
137: // create a one time connection and session to be used
138: Hashtable jndiProps = jmsOut.getProperties();
139: String user = (String) jndiProps
140: .get(Context.SECURITY_PRINCIPAL);
141: String pass = (String) jndiProps
142: .get(Context.SECURITY_CREDENTIALS);
143:
144: QueueConnectionFactory qConFac = null;
145: TopicConnectionFactory tConFac = null;
146: ConnectionFactory conFac = null;
147:
148: if (JMSConstants.DESTINATION_TYPE_QUEUE
149: .equals(jmsOut.getDestinationType())) {
150: qConFac = (QueueConnectionFactory) jmsOut
151: .getConnectionFactory();
152: } else if (JMSConstants.DESTINATION_TYPE_TOPIC
153: .equals(jmsOut.getDestinationType())) {
154: tConFac = (TopicConnectionFactory) jmsOut
155: .getConnectionFactory();
156: } else {
157: handleException("Unable to determine type of JMS "
158: + "Connection Factory - i.e Queue/Topic");
159: }
160:
161: if (user != null && pass != null) {
162: if (qConFac != null) {
163: connection = qConFac
164: .createQueueConnection(user,
165: pass);
166: } else if (tConFac != null) {
167: connection = tConFac
168: .createTopicConnection(user,
169: pass);
170: }
171: } else {
172: if (qConFac != null) {
173: connection = qConFac
174: .createQueueConnection();
175: } else if (tConFac != null) {
176: connection = tConFac
177: .createTopicConnection();
178: }
179: }
180:
181: if (JMSConstants.DESTINATION_TYPE_QUEUE
182: .equals(jmsOut.getDestinationType())) {
183: session = ((QueueConnection) connection)
184: .createQueueSession(false,
185: Session.AUTO_ACKNOWLEDGE);
186: } else if (JMSConstants.DESTINATION_TYPE_TOPIC
187: .equals(jmsOut.getDestinationType())) {
188: session = ((TopicConnection) connection)
189: .createTopicSession(false,
190: Session.AUTO_ACKNOWLEDGE);
191: }
192:
193: } catch (JMSException e) {
194: handleException("Error creating a connection/session for : "
195: + targetAddress);
196: }
197: }
198: destination = jmsOut.getDestination();
199:
200: } else if (outTransportInfo != null
201: && outTransportInfo instanceof JMSOutTransportInfo) {
202:
203: jmsOut = (JMSOutTransportInfo) outTransportInfo;
204: jmsConnectionFactory = jmsOut.getJmsConnectionFactory();
205:
206: session = jmsConnectionFactory
207: .getSessionForDestination(jmsOut
208: .getDestination().toString());
209: destination = jmsOut.getDestination();
210: }
211:
212: String replyDestName = (String) msgCtx
213: .getProperty(JMSConstants.JMS_REPLY_TO);
214: if (replyDestName != null) {
215: if (jmsConnectionFactory != null) {
216: replyDestination = jmsConnectionFactory
217: .getDestination(replyDestName);
218: } else {
219: replyDestination = jmsOut
220: .getReplyDestination(replyDestName);
221: }
222: }
223:
224: if (session == null) {
225: handleException("Could not create JMS session");
226: }
227:
228: // now we are going to use the JMS session, but if this was a session from a
229: // defined JMS connection factory, we need to synchronize as sessions are not
230: // thread safe
231: synchronized (session) {
232:
233: // convert the axis message context into a JMS Message that we can send over JMS
234: Message message = null;
235: String correlationId = null;
236: try {
237: message = createJMSMessage(msgCtx, session);
238: } catch (JMSException e) {
239: handleException(
240: "Error creating a JMS message from the axis message context",
241: e);
242: }
243:
244: String destinationType = jmsOut.getDestinationType();
245:
246: // if the destination does not exist, see if we can create it
247: destination = JMSUtils.createDestinationIfRequired(
248: destination, destinationType, targetAddress,
249: session);
250:
251: // should we wait for a synchronous response on this same thread?
252: boolean waitForResponse = waitForSynchronousResponse(msgCtx);
253:
254: // if this is a synchronous out-in, prepare to listen on the response destination
255: if (waitForResponse) {
256: replyDestination = JMSUtils.setReplyDestination(
257: replyDestination, session, message);
258: }
259:
260: // send the outgoing message over JMS to the destination selected
261: JMSUtils.sendMessageToJMSDestination(session,
262: destination, destinationType, message);
263:
264: // if we are expecting a synchronous response back for the message sent out
265: if (waitForResponse) {
266: try {
267: connection.start();
268: } catch (JMSException ignore) {
269: }
270: try {
271: correlationId = message.getJMSMessageID();
272: } catch (JMSException ignore) {
273: }
274: waitForResponseAndProcess(session,
275: replyDestination, msgCtx, correlationId);
276: }
277: }
278:
279: } finally {
280: if (connection != null) {
281: try {
282: connection.close();
283: } catch (JMSException ignore) {
284: }
285: }
286: }
287: }
288:
289: /**
290: * Create a Consumer for the reply destination and wait for the response JMS message
291: * synchronously. If a message arrives within the specified time interval, process it
292: * through Axis2
293: * @param session the session to use to listen for the response
294: * @param replyDestination the JMS reply Destination
295: * @param msgCtx the outgoing message for which we are expecting the response
296: * @throws AxisFault on error
297: */
298: private void waitForResponseAndProcess(Session session,
299: Destination replyDestination, MessageContext msgCtx,
300: String correlationId) throws AxisFault {
301:
302: try {
303: MessageConsumer consumer = null;
304: if (replyDestination instanceof Queue) {
305: if (correlationId != null) {
306: consumer = ((QueueSession) session).createReceiver(
307: (Queue) replyDestination,
308: "JMSCorrelationID = '" + correlationId
309: + "'");
310: } else {
311: consumer = ((QueueSession) session)
312: .createReceiver((Queue) replyDestination);
313: }
314: } else {
315: if (correlationId != null) {
316: consumer = ((TopicSession) session)
317: .createSubscriber((Topic) replyDestination,
318: correlationId, false);
319: } else {
320: consumer = ((TopicSession) session)
321: .createSubscriber((Topic) replyDestination);
322: }
323: }
324:
325: // how long are we willing to wait for the sync response
326: long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
327: String waitReply = (String) msgCtx
328: .getProperty(JMSConstants.JMS_WAIT_REPLY);
329: if (waitReply != null) {
330: timeout = Long.valueOf(waitReply).longValue();
331: }
332:
333: if (log.isDebugEnabled()) {
334: log
335: .debug("Waiting for a maximum of "
336: + timeout
337: + "ms for a response message to destination : "
338: + replyDestination
339: + " with JMS correlation ID : "
340: + correlationId);
341: }
342:
343: Message reply = consumer.receive(timeout);
344: if (reply != null) {
345: processSyncResponse(msgCtx, reply);
346:
347: } else {
348: log
349: .warn("Did not receive a JMS response within "
350: + timeout + " ms to destination : "
351: + replyDestination
352: + " with JMS correlation ID : "
353: + correlationId);
354: }
355:
356: } catch (JMSException e) {
357: handleException(
358: "Error creating consumer or receiving reply to : "
359: + replyDestination, e);
360: }
361: }
362:
363: /**
364: * Create a JMS Message from the given MessageContext and using the given
365: * session
366: *
367: * @param msgContext the MessageContext
368: * @param session the JMS session
369: * @return a JMS message from the context and session
370: * @throws JMSException on exception
371: * @throws AxisFault on exception
372: */
373: private Message createJMSMessage(MessageContext msgContext,
374: Session session) throws JMSException, AxisFault {
375:
376: Message message = null;
377: String msgType = getProperty(msgContext,
378: JMSConstants.JMS_MESSAGE_TYPE);
379:
380: // check the first element of the SOAP body, do we have content wrapped using the
381: // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
382: // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages
383: // for JMS but just get the payload in its native format
384: String jmsPayloadType = guessMessageType(msgContext);
385:
386: if (jmsPayloadType == null) {
387:
388: OMOutputFormat format = BaseUtils
389: .getOMOutputFormat(msgContext);
390: MessageFormatter messageFormatter = null;
391: try {
392: messageFormatter = TransportUtils
393: .getMessageFormatter(msgContext);
394: } catch (AxisFault axisFault) {
395: throw new JMSException(
396: "Unable to get the message formatter to use");
397: }
398:
399: String contentType = messageFormatter.getContentType(
400: msgContext, format, msgContext.getSoapAction());
401:
402: ByteArrayOutputStream baos = new ByteArrayOutputStream();
403: try {
404: messageFormatter
405: .writeTo(msgContext, format, baos, true);
406: baos.flush();
407: } catch (IOException e) {
408: handleException("IO Error while creating BytesMessage",
409: e);
410: }
411:
412: if (msgType != null
413: && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType)
414: || contentType
415: .indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
416: message = session.createBytesMessage();
417: BytesMessage bytesMsg = (BytesMessage) message;
418: bytesMsg.writeBytes(baos.toByteArray());
419: } else {
420: message = session.createTextMessage(); // default
421: TextMessage txtMsg = (TextMessage) message;
422: txtMsg.setText(new String(baos.toByteArray()));
423: }
424: message.setStringProperty(BaseConstants.CONTENT_TYPE,
425: contentType);
426:
427: } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
428: message = session.createBytesMessage();
429: BytesMessage bytesMsg = (BytesMessage) message;
430: OMElement wrapper = msgContext.getEnvelope().getBody()
431: .getFirstChildWithName(
432: BaseConstants.DEFAULT_BINARY_WRAPPER);
433: OMNode omNode = wrapper.getFirstOMChild();
434: if (omNode != null && omNode instanceof OMText) {
435: Object dh = ((OMText) omNode).getDataHandler();
436: if (dh != null && dh instanceof DataHandler) {
437: ByteArrayOutputStream baos = new ByteArrayOutputStream();
438: try {
439: ((DataHandler) dh).writeTo(baos);
440: } catch (IOException e) {
441: handleException(
442: "Error serializing binary content of element : "
443: + BaseConstants.DEFAULT_BINARY_WRAPPER,
444: e);
445: }
446: bytesMsg.writeBytes(baos.toByteArray());
447: }
448: }
449:
450: } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) {
451: message = session.createTextMessage();
452: TextMessage txtMsg = (TextMessage) message;
453: txtMsg.setText(msgContext.getEnvelope().getBody()
454: .getFirstChildWithName(
455: BaseConstants.DEFAULT_TEXT_WRAPPER)
456: .getText());
457: }
458:
459: // set the JMS correlation ID if specified
460: String correlationId = getProperty(msgContext,
461: JMSConstants.JMS_COORELATION_ID);
462: if (correlationId == null && msgContext.getRelatesTo() != null) {
463: correlationId = msgContext.getRelatesTo().getValue();
464: }
465:
466: if (correlationId != null) {
467: message.setJMSCorrelationID(correlationId);
468: }
469:
470: if (msgContext.isServerSide()) {
471: // set SOAP Action as a property on the JMS message
472: setProperty(message, msgContext, BaseConstants.SOAPACTION);
473: } else {
474: String action = msgContext.getOptions().getAction();
475: if (action != null) {
476: message.setStringProperty(BaseConstants.SOAPACTION,
477: action);
478: }
479: }
480:
481: JMSUtils.setTransportHeaders(msgContext, message);
482: return message;
483: }
484:
485: /**
486: * Guess the message type to use for JMS looking at the message contexts' envelope
487: * @param msgContext the message context
488: * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null
489: */
490: private String guessMessageType(MessageContext msgContext) {
491: OMElement firstChild = msgContext.getEnvelope().getBody()
492: .getFirstElement();
493: if (firstChild != null) {
494: if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild
495: .getQName())) {
496: return JMSConstants.JMS_BYTE_MESSAGE;
497: } else if (BaseConstants.DEFAULT_TEXT_WRAPPER
498: .equals(firstChild.getQName())) {
499: return JMSConstants.JMS_TEXT_MESSAGE;
500: }
501: }
502: return null;
503: }
504:
505: /**
506: * Creates an Axis MessageContext for the received JMS message and
507: * sets up the transports and various properties
508: *
509: * @param outMsgCtx the outgoing message for which we are expecting the response
510: * @param message the JMS response message received
511: * @throws AxisFault on error
512: */
513: private void processSyncResponse(MessageContext outMsgCtx,
514: Message message) throws AxisFault {
515:
516: MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
517:
518: // load any transport headers from received message
519: JMSUtils.loadTransportHeaders(message, responseMsgCtx);
520:
521: // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response
522: // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This
523: // question is still under debate and due to the timelines, I am commiting this
524: // workaround as Axis2 1.2 is about to be released and Synapse 1.0
525: responseMsgCtx.setServerSide(false);
526:
527: String contentType = JMSUtils.getInstace().getProperty(message,
528: BaseConstants.CONTENT_TYPE);
529:
530: JMSUtils.getInstace().setSOAPEnvelope(message, responseMsgCtx,
531: contentType);
532: responseMsgCtx.setServerSide(true);
533:
534: handleIncomingMessage(responseMsgCtx, JMSUtils
535: .getTransportHeaders(message), JMSUtils.getInstace()
536: .getProperty(message, BaseConstants.SOAPACTION),
537: contentType);
538: }
539:
540: private void setProperty(Message message, MessageContext msgCtx,
541: String key) {
542:
543: String value = getProperty(msgCtx, key);
544: if (value != null) {
545: try {
546: message.setStringProperty(key, value);
547: } catch (JMSException e) {
548: log.warn("Couldn't set message property : " + key
549: + " = " + value, e);
550: }
551: }
552: }
553:
554: private String getProperty(MessageContext mc, String key) {
555: return (String) mc.getProperty(key);
556: }
557:
558: /**
559: * Create JMSConnectionFactory instances for the definitions in the transport sender,
560: * and add these into our collection of connectionFactories map keyed by name
561: *
562: * @param transportOut the transport-in description for JMS
563: */
564: private void loadConnectionFactoryDefinitions(
565: TransportOutDescription transportOut) {
566:
567: // iterate through all defined connection factories
568: Iterator conFacIter = transportOut.getParameters().iterator();
569:
570: while (conFacIter.hasNext()) {
571: Parameter conFacParams = (Parameter) conFacIter.next();
572:
573: JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(
574: conFacParams.getName(), cfgCtx);
575: JMSUtils.setConnectionFactoryParameters(conFacParams,
576: jmsConFactory);
577:
578: try {
579: jmsConFactory.connectAndListen();
580: } catch (NamingException e) {
581: log.warn("Error looking up JMS connection factory : "
582: + jmsConFactory.getName(), e);
583: } catch (JMSException e) {
584: log.warn(
585: "Error connecting to JMS connection factory : "
586: + jmsConFactory.getName(), e);
587: }
588:
589: connectionFactories.put(jmsConFactory.getName(),
590: jmsConFactory);
591: }
592: }
593:
594: }
|