001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.transport.jms;
020:
021: import org.apache.axiom.om.OMOutputFormat;
022: import org.apache.axiom.om.OMText;
023: import org.apache.axiom.om.OMElement;
024: import org.apache.axiom.om.util.StAXUtils;
025: import org.apache.axiom.om.impl.builder.StAXBuilder;
026: import org.apache.axiom.om.impl.builder.StAXOMBuilder;
027: import org.apache.axiom.om.impl.llom.OMTextImpl;
028: import org.apache.axiom.soap.SOAP11Constants;
029: import org.apache.axiom.soap.SOAP12Constants;
030: import org.apache.axiom.soap.SOAPEnvelope;
031: import org.apache.axiom.soap.SOAPFactory;
032: import org.apache.axiom.soap.impl.builder.StAXSOAPModelBuilder;
033: import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
034: import org.apache.axiom.attachments.ByteArrayDataSource;
035: import org.apache.axis2.AxisFault;
036: import org.apache.axis2.Constants;
037: import org.apache.axis2.builder.BuilderUtil;
038: import org.apache.axis2.context.MessageContext;
039: import org.apache.axis2.context.OperationContext;
040: import org.apache.axis2.description.AxisService;
041: import org.apache.axis2.description.Parameter;
042: import org.apache.axis2.description.AxisOperation;
043: import org.apache.axis2.engine.AxisConfiguration;
044: import org.apache.axis2.transport.http.HTTPConstants;
045: import org.apache.axis2.util.JavaUtils;
046: import org.apache.commons.logging.Log;
047: import org.apache.commons.logging.LogFactory;
048:
049: import javax.jms.BytesMessage;
050: import javax.jms.JMSException;
051: import javax.jms.Message;
052: import javax.jms.TextMessage;
053: import javax.xml.stream.XMLStreamException;
054: import javax.xml.stream.XMLStreamReader;
055: import javax.xml.namespace.QName;
056: import javax.activation.DataHandler;
057: import java.io.*;
058: import java.util.Hashtable;
059: import java.util.List;
060: import java.util.StringTokenizer;
061:
062: public class JMSUtils {
063:
064: private static final Log log = LogFactory.getLog(JMSUtils.class);
065:
066: /**
067: * Should this service be enabled on JMS transport?
068: *
069: * @param service the Axis service
070: * @return true if JMS should be enabled
071: */
072: public static boolean isJMSService(AxisService service) {
073: boolean process = service.isEnableAllTransports();
074: if (process) {
075: return true;
076:
077: } else {
078: List transports = service.getExposedTransports();
079: for (int i = 0; i < transports.size(); i++) {
080: if (Constants.TRANSPORT_JMS.equals(transports.get(i))) {
081: return true;
082: }
083: }
084: }
085: return false;
086: }
087:
088: /**
089: * Get the JMS destination used by this service
090: *
091: * @param service the Axis Service
092: * @return the name of the JMS destination
093: */
094: public static String getDestination(AxisService service) {
095: Parameter destParam = service
096: .getParameter(JMSConstants.DEST_PARAM);
097:
098: // validate destination
099: String destination = null;
100: if (destParam != null) {
101: destination = (String) destParam.getValue();
102: } else {
103: destination = service.getName();
104: }
105: return destination;
106: }
107:
108: /**
109: * Extract connection factory properties from a given URL
110: *
111: * @param url a JMS URL of the form jms:/<destination>?[<key>=<value>&]*
112: * @return a Hashtable of extracted properties
113: */
114: public static Hashtable getProperties(String url) {
115: Hashtable h = new Hashtable();
116: int propPos = url.indexOf("?");
117: if (propPos != -1) {
118: StringTokenizer st = new StringTokenizer(url
119: .substring(propPos + 1), "&");
120: while (st.hasMoreTokens()) {
121: String token = st.nextToken();
122: int sep = token.indexOf("=");
123: if (sep != -1) {
124: h.put(token.substring(0, sep), token
125: .substring(sep + 1));
126: } else {
127: continue; // ignore, what else can we do?
128: }
129: }
130: }
131: return h;
132: }
133:
134: /**
135: * Marks the given service as faulty with the given comment
136: *
137: * @param serviceName service name
138: * @param msg comment for being faulty
139: * @param axisCfg configuration context
140: */
141: public static void markServiceAsFaulty(String serviceName,
142: String msg, AxisConfiguration axisCfg) {
143: if (serviceName != null) {
144: try {
145: AxisService service = axisCfg.getService(serviceName);
146: axisCfg.getFaultyServices().put(service.getName(), msg);
147:
148: } catch (AxisFault axisFault) {
149: log.warn("Error marking service : " + serviceName
150: + " as faulty due to : " + msg, axisFault);
151: }
152: }
153: }
154:
155: /**
156: * Get an InputStream to the message
157: *
158: * @param message the JMS message
159: * @return an InputStream
160: */
161: public static InputStream getInputStream(Message message) {
162:
163: try {
164: // get the incoming msg content into a byte array
165: if (message instanceof BytesMessage) {
166: byte[] buffer = new byte[8 * 1024];
167: ByteArrayOutputStream out = new ByteArrayOutputStream();
168:
169: BytesMessage byteMsg = (BytesMessage) message;
170: for (int bytesRead = byteMsg.readBytes(buffer); bytesRead != -1; bytesRead = byteMsg
171: .readBytes(buffer)) {
172: out.write(buffer, 0, bytesRead);
173: }
174: return new ByteArrayInputStream(out.toByteArray());
175:
176: } else if (message instanceof TextMessage) {
177: TextMessage txtMsg = (TextMessage) message;
178: String contentType = message
179: .getStringProperty(JMSConstants.CONTENT_TYPE);
180: if (contentType != null) {
181: return new ByteArrayInputStream(
182: txtMsg
183: .getText()
184: .getBytes(
185: BuilderUtil
186: .getCharSetEncoding(contentType)));
187: } else {
188: return new ByteArrayInputStream(txtMsg.getText()
189: .getBytes());
190: }
191:
192: } else {
193: handleException("Unsupported JMS message type : "
194: + message.getClass().getName());
195: }
196:
197: } catch (JMSException e) {
198: handleException(
199: "JMS Exception getting InputStream into message", e);
200: } catch (UnsupportedEncodingException e) {
201: handleException(
202: "Encoding exception getting InputStream into message",
203: e);
204: }
205: return null;
206: }
207:
208: /**
209: * Get a String property from the JMS message
210: *
211: * @param message JMS message
212: * @param property property name
213: * @return property value
214: */
215: public static String getProperty(Message message, String property) {
216: try {
217: return message.getStringProperty(property);
218: } catch (JMSException e) {
219: return null;
220: }
221: }
222:
223: /**
224: * Get the context type from the Axis MessageContext
225: *
226: * @param msgCtx message context
227: * @return the content type
228: */
229: public static String getContentType(MessageContext msgCtx) {
230: OMOutputFormat format = new OMOutputFormat();
231: String soapActionString = getSOAPAction(msgCtx);
232: String charSetEnc = (String) msgCtx
233: .getProperty(Constants.Configuration.CHARACTER_SET_ENCODING);
234:
235: if (charSetEnc != null) {
236: format.setCharSetEncoding(charSetEnc);
237: } else {
238: OperationContext opctx = msgCtx.getOperationContext();
239: if (opctx != null) {
240: charSetEnc = (String) opctx
241: .getProperty(Constants.Configuration.CHARACTER_SET_ENCODING);
242: }
243: }
244:
245: // If the char set enc is still not found use the default
246: if (charSetEnc == null) {
247: charSetEnc = MessageContext.DEFAULT_CHAR_SET_ENCODING;
248: }
249:
250: format.setSOAP11(msgCtx.isSOAP11());
251: format.setCharSetEncoding(charSetEnc);
252:
253: String encoding = format.getCharSetEncoding();
254: String contentType = format.getContentType();
255:
256: if (encoding != null) {
257: contentType += "; charset=" + encoding;
258: }
259:
260: // action header is not mandated in SOAP 1.2. So putting it, if available
261: if (!msgCtx.isSOAP11() && soapActionString != null
262: && !"".equals(soapActionString.trim())) {
263: contentType = contentType + ";action=\"" + soapActionString
264: + "\";";
265: }
266:
267: return contentType;
268: }
269:
270: /**
271: * Get the SOAP Action from the message context
272: *
273: * @param msgCtx the MessageContext
274: * @return the SOAP Action as s String if present, or the WS-Action
275: */
276: private static String getSOAPAction(MessageContext msgCtx) {
277: String soapActionString = msgCtx.getSoapAction();
278:
279: if (soapActionString == null
280: || soapActionString.trim().length() == 0) {
281: soapActionString = msgCtx.getWSAAction();
282: }
283:
284: Object disableSoapAction = msgCtx.getOptions().getProperty(
285: Constants.Configuration.DISABLE_SOAP_ACTION);
286:
287: if (soapActionString == null
288: || JavaUtils.isTrueExplicitly(disableSoapAction)) {
289: soapActionString = "";
290: }
291:
292: return soapActionString;
293: }
294:
295: /**
296: * Return the destination name from the given URL
297: *
298: * @param url the URL
299: * @return the destination name
300: */
301: public static String getDestination(String url) {
302: String tempUrl = url
303: .substring(JMSConstants.JMS_PREFIX.length());
304: int propPos = tempUrl.indexOf("?");
305:
306: if (propPos == -1) {
307: return tempUrl;
308: } else {
309: return tempUrl.substring(0, propPos);
310: }
311: }
312:
313: /**
314: * Return a SOAPEnvelope created from the given JMS Message and Axis
315: * MessageContext, and the InputStream into the message
316: *
317: * @param message the JMS Message
318: * @param msgContext the Axis MessageContext
319: * @param in the InputStream into the message
320: * @return SOAPEnvelope for the message
321: * @throws javax.xml.stream.XMLStreamException
322: *
323: */
324: public static SOAPEnvelope getSOAPEnvelope(Message message,
325: MessageContext msgContext, InputStream in)
326: throws XMLStreamException {
327:
328: SOAPEnvelope envelope = null;
329: StAXBuilder builder = null;
330: String contentType = JMSUtils.getProperty(message,
331: JMSConstants.CONTENT_TYPE);
332:
333: if (contentType != null) {
334: if (contentType
335: .indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
336: builder = BuilderUtil.getAttachmentsBuilder(msgContext,
337: in, contentType, true);
338: envelope = (SOAPEnvelope) builder.getDocumentElement();
339: } else {
340: String charSetEnc = BuilderUtil
341: .getCharSetEncoding(contentType);
342: builder = BuilderUtil.getSOAPBuilder(in, charSetEnc);
343:
344: // Set the encoding scheme in the message context
345: msgContext.setProperty(
346: Constants.Configuration.CHARACTER_SET_ENCODING,
347: charSetEnc);
348: envelope = (SOAPEnvelope) builder.getDocumentElement();
349: }
350: }
351:
352: // handle pure plain vanilla POX and binary content (non SOAP)
353: if (builder == null) {
354: SOAPFactory soapFactory = new SOAP11Factory();
355: try {
356: XMLStreamReader xmlreader = StAXUtils
357: .createXMLStreamReader(
358: in,
359: MessageContext.DEFAULT_CHAR_SET_ENCODING);
360:
361: // Set the encoding scheme in the message context
362: msgContext.setProperty(
363: Constants.Configuration.CHARACTER_SET_ENCODING,
364: MessageContext.DEFAULT_CHAR_SET_ENCODING);
365: builder = new StAXOMBuilder(xmlreader);
366: builder.setOMBuilderFactory(soapFactory);
367:
368: String ns = builder.getDocumentElement().getNamespace()
369: .getNamespaceURI();
370: if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI
371: .equals(ns)) {
372: envelope = getEnvelope(in,
373: SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
374: } else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI
375: .equals(ns)) {
376: envelope = getEnvelope(in,
377: SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
378: } else {
379: // this is POX ... mark MC as REST
380: msgContext.setDoingREST(true);
381: envelope = soapFactory.getDefaultEnvelope();
382: envelope.getBody().addChild(
383: builder.getDocumentElement());
384: }
385: } catch (Exception e) {
386: log.debug("Non SOAP/XML JMS message received");
387:
388: Parameter operationParam = msgContext.getAxisService()
389: .getParameter(JMSConstants.OPERATION_PARAM);
390: QName operationQName = (operationParam != null ? getQName(operationParam
391: .getValue())
392: : JMSConstants.DEFAULT_OPERATION);
393:
394: AxisOperation operation = msgContext.getAxisService()
395: .getOperation(operationQName);
396: if (operation != null) {
397: msgContext.setAxisOperation(operation);
398: } else {
399: handleException("Cannot find operation : "
400: + operationQName + " on the service "
401: + msgContext.getAxisService());
402: }
403:
404: Parameter wrapperParam = msgContext.getAxisService()
405: .getParameter(JMSConstants.WRAPPER_PARAM);
406: QName wrapperQName = (wrapperParam != null ? getQName(wrapperParam
407: .getValue())
408: : JMSConstants.DEFAULT_WRAPPER);
409:
410: OMElement wrapper = soapFactory.createOMElement(
411: wrapperQName, null);
412:
413: try {
414: if (message instanceof TextMessage) {
415: OMTextImpl textData = (OMTextImpl) soapFactory
416: .createOMText(((TextMessage) message)
417: .getText());
418: wrapper.addChild(textData);
419: } else if (message instanceof BytesMessage) {
420: BytesMessage bm = (BytesMessage) message;
421: byte[] msgBytes = new byte[(int) bm
422: .getBodyLength()];
423: bm.reset();
424: bm.readBytes(msgBytes);
425: DataHandler dataHandler = new DataHandler(
426: new ByteArrayDataSource(msgBytes));
427: OMText textData = soapFactory.createOMText(
428: dataHandler, true);
429: wrapper.addChild(textData);
430: msgContext.setDoingMTOM(true);
431: } else {
432: handleException("Unsupported JMS Message format : "
433: + message.getJMSType());
434: }
435: envelope = soapFactory.getDefaultEnvelope();
436: envelope.getBody().addChild(wrapper);
437:
438: } catch (JMSException j) {
439: handleException(
440: "Error wrapping JMS message into a SOAP envelope ",
441: j);
442: }
443: }
444: }
445:
446: String charEncOfMessage = builder == null ? null : builder
447: .getDocument() == null ? null : builder.getDocument()
448: .getCharsetEncoding();
449: String charEncOfTransport = ((String) msgContext
450: .getProperty(Constants.Configuration.CHARACTER_SET_ENCODING));
451:
452: if (charEncOfMessage != null
453: && !(charEncOfMessage.trim().length() == 0)
454: && !charEncOfMessage
455: .equalsIgnoreCase(charEncOfTransport)) {
456:
457: handleException("Character Set Encoding from transport information do not "
458: + "match with character set encoding in the received "
459: + "SOAP message");
460: }
461: return envelope;
462: }
463:
464: private static SOAPEnvelope getEnvelope(InputStream in,
465: String namespace) throws XMLStreamException {
466:
467: try {
468: in.reset();
469: } catch (IOException e) {
470: throw new XMLStreamException(
471: "Error resetting message input stream", e);
472: }
473: XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader(in,
474: MessageContext.DEFAULT_CHAR_SET_ENCODING);
475: StAXBuilder builder = new StAXSOAPModelBuilder(xmlreader,
476: namespace);
477: return (SOAPEnvelope) builder.getDocumentElement();
478: }
479:
480: private static QName getQName(Object obj) {
481: String value;
482: if (obj instanceof QName) {
483: return (QName) obj;
484: } else {
485: value = obj.toString();
486: }
487: int open = value.indexOf('{');
488: int close = value.indexOf('}');
489: if (close > open && open > -1 && value.length() > close) {
490: return new QName(value.substring(open + 1, close - open),
491: value.substring(close + 1));
492: } else {
493: return new QName(value);
494: }
495: }
496:
497: private static void handleException(String s) {
498: log.error(s);
499: throw new AxisJMSException(s);
500: }
501:
502: private static void handleException(String s, Exception e) {
503: log.error(s, e);
504: throw new AxisJMSException(s, e);
505: }
506: }
|