001: /*
002: * Copyright 2004,2005 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.synapse.transport.jms;
017:
018: import org.apache.axiom.om.OMOutputFormat;
019: import org.apache.axiom.om.OMText;
020: import org.apache.axiom.om.OMElement;
021: import org.apache.axiom.om.util.StAXUtils;
022: import org.apache.axiom.om.impl.builder.StAXBuilder;
023: import org.apache.axiom.om.impl.builder.StAXOMBuilder;
024: import org.apache.axiom.om.impl.llom.OMTextImpl;
025: import org.apache.axiom.soap.SOAP11Constants;
026: import org.apache.axiom.soap.SOAP12Constants;
027: import org.apache.axiom.soap.SOAPEnvelope;
028: import org.apache.axiom.soap.SOAPFactory;
029: import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
030: import org.apache.axiom.attachments.ByteArrayDataSource;
031: import org.apache.axis2.AxisFault;
032: import org.apache.axis2.Constants;
033: import org.apache.axis2.builder.BuilderUtil;
034: import org.apache.axis2.context.MessageContext;
035: import org.apache.axis2.description.AxisService;
036: import org.apache.axis2.description.Parameter;
037: import org.apache.axis2.description.AxisOperation;
038: import org.apache.axis2.description.ParameterIncludeImpl;
039: import org.apache.axis2.transport.http.HTTPTransportUtils;
040: import org.apache.synapse.transport.base.BaseUtils;
041: import org.apache.synapse.transport.base.BaseConstants;
042: import org.apache.commons.logging.Log;
043: import org.apache.commons.logging.LogFactory;
044:
045: import javax.jms.*;
046: import javax.jms.Queue;
047: import javax.xml.stream.XMLStreamException;
048: import javax.xml.namespace.QName;
049: import javax.activation.DataHandler;
050: import javax.naming.Context;
051: import java.io.*;
052: import java.util.*;
053: import java.nio.ByteBuffer;
054:
/**
 * Miscellaneous methods used for the JMS transport
 */
058: public class JMSUtils extends BaseUtils {
059:
060: private static final Log log = LogFactory.getLog(JMSUtils.class);
061:
062: private static BaseUtils _instance = new JMSUtils();
063:
064: public static BaseUtils getInstace() {
065: return _instance;
066: }
067:
068: /**
069: * Create a JMS Queue using the given connection with the JNDI destination name, and return the
070: * JMS Destination name of the created queue
071: *
072: * @param con the JMS Connection to be used
073: * @param destinationJNDIName the JNDI name of the Queue to be created
074: * @return the JMS Destination name of the created Queue
075: * @throws JMSException on error
076: */
077: public static String createJMSQueue(Connection con,
078: String destinationJNDIName) throws JMSException {
079: try {
080: QueueSession session = ((QueueConnection) con)
081: .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
082: Queue queue = session.createQueue(destinationJNDIName);
083: log.info("JMS Queue with JNDI name : "
084: + destinationJNDIName + " created");
085: return queue.getQueueName();
086:
087: } finally {
088: try {
089: con.close();
090: } catch (JMSException ignore) {
091: }
092: }
093: }
094:
095: /**
096: * Create a JMS Topic using the given connection with the JNDI destination name, and return the
097: * JMS Destination name of the created queue
098: *
099: * @param con the JMS Connection to be used
100: * @param destinationJNDIName the JNDI name of the Topic to be created
101: * @return the JMS Destination name of the created Topic
102: * @throws JMSException on error
103: */
104: public static String createJMSTopic(Connection con,
105: String destinationJNDIName) throws JMSException {
106: try {
107: TopicSession session = ((TopicConnection) con)
108: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
109: Topic topic = session.createTopic(destinationJNDIName);
110: log.info("JMS Topic with JNDI name : "
111: + destinationJNDIName + " created");
112: return topic.getTopicName();
113:
114: } finally {
115: try {
116: con.close();
117: } catch (JMSException ignore) {
118: }
119: }
120: }
121:
122: /**
123: * Should this service be enabled over the JMS transport?
124: *
125: * @param service the Axis service
126: * @return true if JMS should be enabled
127: */
128: public static boolean isJMSService(AxisService service) {
129: if (service.isEnableAllTransports()) {
130: return true;
131:
132: } else {
133: List transports = service.getExposedTransports();
134: for (int i = 0; i < transports.size(); i++) {
135: if (JMSListener.TRANSPORT_NAME
136: .equals(transports.get(i))) {
137: return true;
138: }
139: }
140: }
141: return false;
142: }
143:
144: /**
145: * Get the JMS destination used by this service
146: *
147: * @param service the Axis Service
148: * @return the name of the JMS destination
149: */
150: public static String getJNDIDestinationNameForService(
151: AxisService service) {
152: Parameter destParam = service
153: .getParameter(JMSConstants.DEST_PARAM);
154: if (destParam != null) {
155: return (String) destParam.getValue();
156: } else {
157: return service.getName();
158: }
159: }
160:
161: /**
162: * Get the JMS destination type of this service
163: *
164: * @param service the Axis Service
165: * @return the name of the JMS destination
166: */
167: public static String getDestinationTypeForService(
168: AxisService service) {
169: Parameter destTypeParam = service
170: .getParameter(JMSConstants.DEST_PARAM_TYPE);
171: if (destTypeParam != null) {
172: String paramValue = (String) destTypeParam.getValue();
173: if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue)
174: || JMSConstants.DESTINATION_TYPE_TOPIC
175: .equals(paramValue)) {
176: return paramValue;
177: } else {
178: handleException("Invalid destinaton type value "
179: + paramValue);
180: return null;
181: }
182: } else {
183: log.debug("JMS destination type not given. default queue");
184: return JMSConstants.DESTINATION_TYPE_QUEUE;
185: }
186: }
187:
188: /**
189: * Extract connection factory properties from a given URL
190: *
191: * @param url a JMS URL of the form jms:/<destination>?[<key>=<value>&]*
192: * @return a Hashtable of extracted properties
193: */
194: public static Hashtable getProperties(String url) {
195: Hashtable h = new Hashtable();
196: int propPos = url.indexOf("?");
197: if (propPos != -1) {
198: StringTokenizer st = new StringTokenizer(url
199: .substring(propPos + 1), "&");
200: while (st.hasMoreTokens()) {
201: String token = st.nextToken();
202: int sep = token.indexOf("=");
203: if (sep != -1) {
204: h.put(token.substring(0, sep), token
205: .substring(sep + 1));
206: } else {
207: continue; // ignore, what else can we do?
208: }
209: }
210: }
211: return h;
212: }
213:
214: /**
215: * Get the EPR for the given JMS connection factory and destination
216: * the form of the URL is
217: * jms:/<destination>?[<key>=<value>&]*
218: *
219: * @param cf the Axis2 JMS connection factory
220: * @param destination the JNDI name of the destination
221: * @return the EPR as a String
222: */
223: static String getEPR(JMSConnectionFactory cf, String destination) {
224: StringBuffer sb = new StringBuffer();
225: sb.append(JMSConstants.JMS_PREFIX).append(destination);
226: sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM)
227: .append("=").append(cf.getConnFactoryJNDIName());
228: Iterator props = cf.getJndiProperties().keySet().iterator();
229: while (props.hasNext()) {
230: String key = (String) props.next();
231: String value = (String) cf.getJndiProperties().get(key);
232: sb.append("&").append(key).append("=").append(value);
233: }
234: return sb.toString();
235: }
236:
237: /**
238: * Get a String property from the JMS message
239: *
240: * @param message JMS message
241: * @param property property name
242: * @return property value
243: */
244: public String getProperty(Object message, String property) {
245: try {
246: return ((Message) message).getStringProperty(property);
247: } catch (JMSException e) {
248: return null;
249: }
250: }
251:
252: /**
253: * Return the destination name from the given URL
254: *
255: * @param url the URL
256: * @return the destination name
257: */
258: public static String getDestination(String url) {
259: String tempUrl = url
260: .substring(JMSConstants.JMS_PREFIX.length());
261: int propPos = tempUrl.indexOf("?");
262:
263: if (propPos == -1) {
264: return tempUrl;
265: } else {
266: return tempUrl.substring(0, propPos);
267: }
268: }
269:
270: /**
271: * Set JNDI properties and any other connection factory parameters to the connection factory
272: * passed in, looing at the parameter in axis2.xml
273: * @param param the axis parameter that holds the connection factory settings
274: * @param jmsConFactory the JMS connection factory to which the parameters should be applied
275: */
276: public static void setConnectionFactoryParameters(Parameter param,
277: JMSConnectionFactory jmsConFactory) {
278:
279: ParameterIncludeImpl pi = new ParameterIncludeImpl();
280: try {
281: pi.deserializeParameters((OMElement) param.getValue());
282: } catch (AxisFault axisFault) {
283: log.error(
284: "Error reading parameters for JMS connection factory"
285: + jmsConFactory.getName(), axisFault);
286: }
287:
288: Iterator params = pi.getParameters().iterator();
289: while (params.hasNext()) {
290:
291: Parameter p = (Parameter) params.next();
292:
293: if (JMSConstants.CONFAC_TYPE.equals(p.getName())) {
294: String connectionFactoryType = (String) p.getValue();
295: jmsConFactory
296: .setConnectionFactoryType(connectionFactoryType);
297:
298: } else if (JMSConstants.RECONNECT_TIMEOUT.equals(p
299: .getName())) {
300: String strTimeout = (String) p.getValue();
301: int reconnectTimeoutSeconds = Integer
302: .parseInt(strTimeout);
303: long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000;
304: jmsConFactory
305: .setReconnectTimeout(reconnectTimeoutMillis);
306:
307: } else if (Context.INITIAL_CONTEXT_FACTORY.equals(p
308: .getName())) {
309: jmsConFactory.addJNDIContextProperty(
310: Context.INITIAL_CONTEXT_FACTORY, (String) p
311: .getValue());
312: } else if (Context.PROVIDER_URL.equals(p.getName())) {
313: jmsConFactory.addJNDIContextProperty(
314: Context.PROVIDER_URL, (String) p.getValue());
315: } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) {
316: jmsConFactory.addJNDIContextProperty(
317: Context.SECURITY_PRINCIPAL, (String) p
318: .getValue());
319: } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) {
320: jmsConFactory.addJNDIContextProperty(
321: Context.SECURITY_CREDENTIALS, (String) p
322: .getValue());
323: } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p
324: .getName())) {
325: jmsConFactory.setConnFactoryJNDIName((String) p
326: .getValue());
327: jmsConFactory.addJNDIContextProperty(
328: JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p
329: .getValue());
330: }
331: }
332: }
333:
334: /**
335: * Get an InputStream to the JMS message payload
336: *
337: * @param message the JMS message
338: * @return an InputStream to the payload
339: */
340: public InputStream getInputStream(Object message) {
341:
342: try {
343: if (message instanceof BytesMessage) {
344: byte[] buffer = new byte[1024];
345: ByteArrayOutputStream out = new ByteArrayOutputStream();
346:
347: BytesMessage byteMsg = (BytesMessage) message;
348: for (int bytesRead = byteMsg.readBytes(buffer); bytesRead != -1; bytesRead = byteMsg
349: .readBytes(buffer)) {
350: out.write(buffer, 0, bytesRead);
351: }
352: return new ByteArrayInputStream(out.toByteArray());
353:
354: } else if (message instanceof TextMessage) {
355: TextMessage txtMsg = (TextMessage) message;
356: String contentType = getProperty(txtMsg,
357: BaseConstants.CONTENT_TYPE);
358:
359: if (contentType != null) {
360: return new ByteArrayInputStream(
361: txtMsg
362: .getText()
363: .getBytes(
364: BuilderUtil
365: .getCharSetEncoding(contentType)));
366: } else {
367: return new ByteArrayInputStream(txtMsg.getText()
368: .getBytes());
369: }
370:
371: } else {
372: handleException("Unsupported JMS message type : "
373: + message.getClass().getName());
374: }
375:
376: } catch (JMSException e) {
377: handleException("JMS Exception reading message payload", e);
378: } catch (UnsupportedEncodingException e) {
379: handleException(
380: "Encoding exception getting InputStream into message",
381: e);
382: }
383: return null;
384: }
385:
386: /**
387: * Set the JMS ReplyTo for the message
388: *
389: * @param replyDestination the JMS Destination where the reply is expected
390: * @param session the session to use to create a temp Queue if a response is expected
391: * but a Destination has not been specified
392: * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo
393: * @return the JMS ReplyTo Destination for the message
394: */
395: public static Destination setReplyDestination(
396: Destination replyDestination, Session session,
397: Message message) {
398: if (replyDestination == null) {
399: try {
400: // create temporary queue to receive the reply
401: replyDestination = createTemporaryDestination(session);
402: } catch (JMSException e) {
403: handleException("Error creating temporary queue for response");
404: }
405: }
406:
407: try {
408: message.setJMSReplyTo(replyDestination);
409: } catch (JMSException e) {
410: log.warn("Error setting JMS ReplyTo destination to : "
411: + replyDestination, e);
412: }
413:
414: if (log.isDebugEnabled()) {
415: try {
416: log
417: .debug("Expecting a response to JMS Destination : "
418: + (replyDestination instanceof Queue ? ((Queue) replyDestination)
419: .getQueueName()
420: : ((Topic) replyDestination)
421: .getTopicName()));
422: } catch (JMSException ignore) {
423: }
424: }
425: return replyDestination;
426: }
427:
428: /**
429: * When trying to send a message to a destination, if it does not exist, try to create it
430: *
431: * @param destination the JMS destination to send messages
432: * @param targetAddress the target JMS EPR to find the Destination to be created if required
433: * @param session the JMS session to use
434: * @return the JMS Destination where messages could be posted
435: * @throws AxisFault if the target Destination does not exist and cannot be created
436: */
437: public static Destination createDestinationIfRequired(
438: Destination destination, String destinationType,
439: String targetAddress, Session session) throws AxisFault {
440: if (destination == null) {
441: if (targetAddress != null) {
442: String name = JMSUtils.getDestination(targetAddress);
443: if (log.isDebugEnabled()) {
444: log.debug("Creating JMS Destination : " + name);
445: }
446:
447: try {
448: destination = createDestination(session, name,
449: destinationType);
450: } catch (JMSException e) {
451: handleException(
452: "Error creating destination Queue : "
453: + name, e);
454: }
455: } else {
456: handleException("Cannot send reply to null JMS Destination");
457: }
458: }
459: return destination;
460: }
461:
462: /**
463: * Send the given message to the Destination using the given session
464: * @param session the session to use to send
465: * @param destination the Destination
466: * @param message the JMS Message
467: * @throws AxisFault on error
468: */
469: public static void sendMessageToJMSDestination(Session session,
470: Destination destination, String destinationType,
471: Message message) throws AxisFault {
472:
473: MessageProducer producer = null;
474: try {
475: if (log.isDebugEnabled()) {
476: log.debug("Sending message to destination : "
477: + destination);
478: }
479:
480: if (JMSConstants.DESTINATION_TYPE_TOPIC
481: .equals(destinationType)) {
482: producer = ((TopicSession) session)
483: .createPublisher((Topic) destination);
484: ((TopicPublisher) producer).publish(message);
485: } else {
486: producer = ((QueueSession) session)
487: .createSender((Queue) destination);
488: ((QueueSender) producer).send(message);
489: }
490:
491: if (log.isDebugEnabled()) {
492: log.debug("Sent message to destination : "
493: + destination + "\nMessage ID : "
494: + message.getJMSMessageID()
495: + "\nCorrelation ID : "
496: + message.getJMSCorrelationID()
497: + "\nReplyTo ID : " + message.getJMSReplyTo());
498: }
499:
500: } catch (JMSException e) {
501: handleException(
502: "Error creating a producer or sending to : "
503: + destination, e);
504: } finally {
505: if (producer != null) {
506: try {
507: producer.close();
508: } catch (JMSException ignore) {
509: }
510: }
511: }
512: }
513:
514: /**
515: * Set transport headers from the axis message context, into the JMS message
516: *
517: * @param msgContext the axis message context
518: * @param message the JMS Message
519: * @throws JMSException on exception
520: */
521: public static void setTransportHeaders(MessageContext msgContext,
522: Message message) throws JMSException {
523:
524: Map headerMap = (Map) msgContext
525: .getProperty(MessageContext.TRANSPORT_HEADERS);
526:
527: if (headerMap == null) {
528: return;
529: }
530:
531: Iterator iter = headerMap.keySet().iterator();
532: while (iter.hasNext()) {
533:
534: String name = (String) iter.next();
535:
536: if (JMSConstants.JMS_COORELATION_ID.equals(name)) {
537: message.setJMSCorrelationID((String) headerMap
538: .get(JMSConstants.JMS_COORELATION_ID));
539: } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) {
540: Object o = headerMap
541: .get(JMSConstants.JMS_DELIVERY_MODE);
542: if (o instanceof Integer) {
543: message
544: .setJMSDeliveryMode(((Integer) o)
545: .intValue());
546: } else if (o instanceof String) {
547: try {
548: message.setJMSDeliveryMode(Integer
549: .parseInt((String) o));
550: } catch (NumberFormatException nfe) {
551: log.warn(
552: "Invalid delivery mode ignored : " + o,
553: nfe);
554: }
555: } else {
556: log.warn("Invalid delivery mode ignored : " + o);
557: }
558: } else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
559: message.setJMSExpiration(Long
560: .parseLong((String) headerMap
561: .get(JMSConstants.JMS_EXPIRATION)));
562: } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
563: message.setJMSMessageID((String) headerMap
564: .get(JMSConstants.JMS_MESSAGE_ID));
565: } else if (JMSConstants.JMS_PRIORITY.equals(name)) {
566: message.setJMSPriority(Integer
567: .parseInt((String) headerMap
568: .get(JMSConstants.JMS_PRIORITY)));
569: } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
570: message.setJMSTimestamp(Long
571: .parseLong((String) headerMap
572: .get(JMSConstants.JMS_TIMESTAMP)));
573: } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
574: message.setJMSType((String) headerMap
575: .get(JMSConstants.JMS_MESSAGE_TYPE));
576: } else {
577: Object value = headerMap.get(name);
578: if (value instanceof String) {
579: message.setStringProperty(name, (String) value);
580: } else if (value instanceof Boolean) {
581: message.setBooleanProperty(name, ((Boolean) value)
582: .booleanValue());
583: } else if (value instanceof Integer) {
584: message.setIntProperty(name, ((Integer) value)
585: .intValue());
586: } else if (value instanceof Long) {
587: message.setLongProperty(name, ((Long) value)
588: .longValue());
589: } else if (value instanceof Double) {
590: message.setDoubleProperty(name, ((Double) value)
591: .doubleValue());
592: } else if (value instanceof Float) {
593: message.setFloatProperty(name, ((Float) value)
594: .floatValue());
595: }
596: }
597: }
598: }
599:
600: /**
601: * Read the transport headers from the JMS Message and set them to the axis2 message context
602: *
603: * @param message the JMS Message received
604: * @param responseMsgCtx the axis message context
605: * @throws AxisFault on error
606: */
607: public static void loadTransportHeaders(Message message,
608: MessageContext responseMsgCtx) throws AxisFault {
609: responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS,
610: getTransportHeaders(message));
611: }
612:
613: /**
614: * Extract transport level headers for JMS from the given message into a Map
615: *
616: * @param message the JMS message
617: * @return a Map of the transport headers
618: */
619: public static Map getTransportHeaders(Message message) {
620: // create a Map to hold transport headers
621: Map map = new HashMap();
622:
623: // correlation ID
624: try {
625: if (message.getJMSCorrelationID() != null) {
626: map.put(JMSConstants.JMS_COORELATION_ID, message
627: .getJMSCorrelationID());
628: }
629: } catch (JMSException ignore) {
630: }
631:
632: // set the delivery mode as persistent or not
633: try {
634: map.put(JMSConstants.JMS_DELIVERY_MODE, Integer
635: .toString(message.getJMSDeliveryMode()));
636: } catch (JMSException ignore) {
637: }
638:
639: // destination name
640: try {
641: if (message.getJMSDestination() != null) {
642: Destination dest = message.getJMSDestination();
643: map.put(JMSConstants.JMS_DESTINATION,
644: dest instanceof Queue ? ((Queue) dest)
645: .getQueueName() : ((Topic) dest)
646: .getTopicName());
647: }
648: } catch (JMSException ignore) {
649: }
650:
651: // expiration
652: try {
653: map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message
654: .getJMSExpiration()));
655: } catch (JMSException ignore) {
656: }
657:
658: // if a JMS message ID is found
659: try {
660: if (message.getJMSMessageID() != null) {
661: map.put(JMSConstants.JMS_MESSAGE_ID, message
662: .getJMSMessageID());
663: }
664: } catch (JMSException ignore) {
665: }
666:
667: // priority
668: try {
669: map.put(JMSConstants.JMS_PRIORITY, Long.toString(message
670: .getJMSPriority()));
671: } catch (JMSException ignore) {
672: }
673:
674: // redelivered
675: try {
676: map.put(JMSConstants.JMS_REDELIVERED, Boolean
677: .toString(message.getJMSRedelivered()));
678: } catch (JMSException ignore) {
679: }
680:
681: // replyto destination name
682: try {
683: if (message.getJMSReplyTo() != null) {
684: Destination dest = message.getJMSReplyTo();
685: map.put(JMSConstants.JMS_REPLY_TO,
686: dest instanceof Queue ? ((Queue) dest)
687: .getQueueName() : ((Topic) dest)
688: .getTopicName());
689: }
690: } catch (JMSException ignore) {
691: }
692:
693: // priority
694: try {
695: map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message
696: .getJMSTimestamp()));
697: } catch (JMSException ignore) {
698: }
699:
700: // message type
701: try {
702: if (message.getJMSType() != null) {
703: map.put(JMSConstants.JMS_TYPE, message.getJMSType());
704: }
705: } catch (JMSException ignore) {
706: }
707:
708: // any other transport properties / headers
709: Enumeration e = null;
710: try {
711: e = message.getPropertyNames();
712: } catch (JMSException ignore) {
713: }
714:
715: if (e != null) {
716: while (e.hasMoreElements()) {
717: String headerName = (String) e.nextElement();
718: try {
719: map.put(headerName, message
720: .getStringProperty(headerName));
721: continue;
722: } catch (JMSException ignore) {
723: }
724: try {
725: map.put(headerName, Boolean.valueOf(message
726: .getBooleanProperty(headerName)));
727: continue;
728: } catch (JMSException ignore) {
729: }
730: try {
731: map.put(headerName, new Integer(message
732: .getIntProperty(headerName)));
733: continue;
734: } catch (JMSException ignore) {
735: }
736: try {
737: map.put(headerName, new Long(message
738: .getLongProperty(headerName)));
739: continue;
740: } catch (JMSException ignore) {
741: }
742: try {
743: map.put(headerName, new Double(message
744: .getDoubleProperty(headerName)));
745: continue;
746: } catch (JMSException ignore) {
747: }
748: try {
749: map.put(headerName, new Float(message
750: .getFloatProperty(headerName)));
751: continue;
752: } catch (JMSException ignore) {
753: }
754: }
755: }
756:
757: return map;
758: }
759:
760: public String getMessageTextPayload(Object message) {
761: if (message instanceof TextMessage) {
762: try {
763: return ((TextMessage) message).getText();
764: } catch (JMSException e) {
765: handleException(
766: "Error reading JMS text message payload", e);
767: }
768: }
769: return null;
770: }
771:
772: public byte[] getMessageBinaryPayload(Object message) {
773:
774: if (message instanceof BytesMessage) {
775: BytesMessage bytesMessage = (BytesMessage) message;
776:
777: try {
778: bytesMessage.reset();
779:
780: byte[] buffer = new byte[1024];
781: ByteArrayOutputStream out = new ByteArrayOutputStream();
782:
783: for (int bytesRead = bytesMessage.readBytes(buffer); bytesRead != -1; bytesRead = bytesMessage
784: .readBytes(buffer)) {
785: out.write(buffer, 0, bytesRead);
786: }
787: return out.toByteArray();
788:
789: } catch (JMSException e) {
790: handleException(
791: "Error reading JMS binary message payload", e);
792: }
793: }
794: return null;
795: }
796:
797: // ----------- JMS 1.0.2b compatibility methods -------------
798: public static Connection createConnection(
799: ConnectionFactory conFactory, String user, String pass,
800: String destinationType) throws JMSException {
801:
802: if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
803: if (user != null && pass != null) {
804: return ((QueueConnectionFactory) conFactory)
805: .createQueueConnection(user, pass);
806: } else {
807: return ((QueueConnectionFactory) conFactory)
808: .createQueueConnection();
809: }
810:
811: } else if (JMSConstants.DESTINATION_TYPE_TOPIC
812: .equals(destinationType)) {
813: if (user != null && pass != null) {
814: return ((TopicConnectionFactory) conFactory)
815: .createTopicConnection(user, pass);
816: } else {
817: return ((TopicConnectionFactory) conFactory)
818: .createTopicConnection();
819: }
820: } else {
821: handleException("Unable to determine type of JMS Connection Factory - i.e Queue/Topic");
822: }
823: return null;
824: }
825:
826: public static Session createSession(Connection con,
827: boolean transacted, int acknowledgeMode,
828: String destinationType) throws JMSException {
829:
830: if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
831: return ((QueueConnection) con).createQueueSession(
832: transacted, acknowledgeMode);
833: } else if (JMSConstants.DESTINATION_TYPE_TOPIC
834: .equals(destinationType)) {
835: return ((TopicConnection) con).createTopicSession(
836: transacted, acknowledgeMode);
837: } else {
838: log
839: .debug("JMS destination type not given or invalid. default queue. was "
840: + destinationType);
841: return ((QueueConnection) con).createQueueSession(
842: transacted, acknowledgeMode);
843: }
844: }
845:
846: public static Destination createDestination(Session session,
847: String destName, String destinationType)
848: throws JMSException {
849:
850: if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
851: return ((QueueSession) session).createQueue(destName);
852: } else if (JMSConstants.DESTINATION_TYPE_TOPIC
853: .equals(destinationType)) {
854: return ((TopicSession) session).createTopic(destName);
855: } else {
856: log
857: .debug("JMS destination type not given or invalid. default queue. was "
858: + destinationType);
859: return ((QueueSession) session).createQueue(destName);
860: }
861: }
862:
863: public static void createDestination(ConnectionFactory conFactory,
864: String destinationJNDIName, String destinationType)
865: throws JMSException {
866:
867: if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
868: JMSUtils.createJMSQueue(
869: ((QueueConnectionFactory) conFactory)
870: .createQueueConnection(),
871: destinationJNDIName);
872: } else if (JMSConstants.DESTINATION_TYPE_TOPIC
873: .equals(destinationType)) {
874: JMSUtils.createJMSTopic(
875: ((TopicConnectionFactory) conFactory)
876: .createTopicConnection(),
877: destinationJNDIName);
878: }
879: }
880:
881: public static MessageConsumer createConsumer(Session session,
882: Destination dest) throws JMSException {
883:
884: if (dest instanceof Queue) {
885: return ((QueueSession) session)
886: .createReceiver((Queue) dest);
887: } else {
888: return ((TopicSession) session)
889: .createSubscriber((Topic) dest);
890: }
891: }
892:
893: public static Destination createTemporaryDestination(Session session)
894: throws JMSException {
895:
896: if (session instanceof QueueSession) {
897: return ((QueueSession) session).createTemporaryQueue();
898: } else {
899: return ((TopicSession) session).createTemporaryTopic();
900: }
901: }
902: }
|