001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.transport.jms;
020:
021: import org.apache.axiom.om.OMElement;
022: import org.apache.axiom.om.OMOutputFormat;
023: import org.apache.axis2.AxisFault;
024: import org.apache.axis2.Constants;
025: import org.apache.axis2.context.ConfigurationContext;
026: import org.apache.axis2.context.MessageContext;
027: import org.apache.axis2.description.TransportOutDescription;
028: import org.apache.axis2.description.WSDL2Constants;
029: import org.apache.axis2.description.Parameter;
030: import org.apache.axis2.handlers.AbstractHandler;
031: import org.apache.axis2.transport.TransportSender;
032: import org.apache.commons.logging.Log;
033: import org.apache.commons.logging.LogFactory;
034:
035: import javax.jms.*;
036: import javax.xml.stream.XMLStreamException;
037: import javax.naming.InitialContext;
038: import javax.naming.NamingException;
039: import javax.naming.Context;
040: import javax.naming.NameNotFoundException;
041: import java.io.ByteArrayOutputStream;
042: import java.io.IOException;
043: import java.util.Hashtable;
044:
045: /**
046: * The TransportSender for JMS
047: */
048: public class JMSSender extends AbstractHandler implements
049: TransportSender {
050:
051: private static final Log log = LogFactory.getLog(JMSSender.class);
052:
053: /**
054: * Performs the actual sending of the JMS message
055: *
056: * @param msgContext the message context to be sent
057: * @throws AxisFault on exception
058: */
059: public InvocationResponse invoke(MessageContext msgContext)
060: throws AxisFault {
061:
062: log.debug("JMSSender invoke()");
063:
064: JMSOutTransportInfo transportInfo = null;
065: String targetAddress = null;
066:
067: // is there a transport url? which may be different from the WS-A To..
068: targetAddress = (String) msgContext
069: .getProperty(Constants.Configuration.TRANSPORT_URL);
070:
071: if (targetAddress != null) {
072: transportInfo = new JMSOutTransportInfo(targetAddress);
073: } else if (targetAddress == null && msgContext.getTo() != null
074: && !msgContext.getTo().hasAnonymousAddress()) {
075: targetAddress = msgContext.getTo().getAddress();
076:
077: if (!msgContext.getTo().hasNoneAddress()) {
078: transportInfo = new JMSOutTransportInfo(targetAddress);
079: } else {
080: //Don't send the message.
081: return InvocationResponse.CONTINUE;
082: }
083: } else if (msgContext.isServerSide()) {
084: // get the jms ReplyTo
085: transportInfo = (JMSOutTransportInfo) msgContext
086: .getProperty(Constants.OUT_TRANSPORT_INFO);
087: }
088:
089: // get the ConnectionFactory to be used for the send
090: ConnectionFactory connectionFac = transportInfo
091: .getConnectionFactory();
092:
093: Connection con = null;
094: try {
095: String user = transportInfo.getConnectionFactoryUser();
096: String password = transportInfo
097: .getConnectionFactoryPassword();
098:
099: if ((user == null) || (password == null)) {
100: // Use the OS username and credentials
101: con = connectionFac.createConnection();
102: } else {
103: // use an explicit username and password
104: con = connectionFac.createConnection(user, password);
105: }
106:
107: Session session = con.createSession(false,
108: Session.AUTO_ACKNOWLEDGE);
109: Message message = createJMSMessage(msgContext, session);
110:
111: // get the JMS destination for the message being sent
112: Destination dest = transportInfo.getDestination();
113:
114: if (dest == null) {
115: if (targetAddress != null) {
116:
117: // if it does not exist, create it
118: String name = JMSUtils
119: .getDestination(targetAddress);
120: if (log.isDebugEnabled()) {
121: log.debug("Creating JMS Destination : " + name);
122: }
123:
124: try {
125: dest = session.createQueue(name);
126: } catch (JMSException e) {
127: handleException(
128: "Error creating destination Queue : "
129: + name, e);
130: }
131: } else {
132: handleException("Cannot send reply to unknown JMS Destination");
133: }
134: }
135:
136: MessageProducer producer = session.createProducer(dest);
137: Destination replyDest = null;
138:
139: boolean waitForResponse = msgContext.getOperationContext() != null
140: && WSDL2Constants.MEP_URI_OUT_IN.equals(msgContext
141: .getOperationContext().getAxisOperation()
142: .getMessageExchangePattern());
143:
144: if (waitForResponse) {
145: String replyToJNDIName = (String) msgContext
146: .getProperty(JMSConstants.REPLY_PARAM);
147: if (replyToJNDIName != null
148: && replyToJNDIName.length() > 0) {
149: Context context = null;
150: Hashtable props = JMSUtils
151: .getProperties(targetAddress);
152: try {
153: context = new InitialContext(props);
154: } catch (NamingException e) {
155: handleException(
156: "Could not get the initial context", e);
157: }
158:
159: try {
160: replyDest = (Destination) context
161: .lookup(replyToJNDIName);
162:
163: } catch (NameNotFoundException e) {
164: log
165: .warn("Cannot get or lookup JMS response destination : "
166: + replyToJNDIName
167: + " : "
168: + e.getMessage()
169: + ". Attempting to create a Queue named : "
170: + replyToJNDIName);
171: replyDest = session
172: .createQueue(replyToJNDIName);
173:
174: } catch (NamingException e) {
175: handleException(
176: "Cannot get JMS response destination : "
177: + replyToJNDIName + " : ", e);
178: }
179:
180: } else {
181: try {
182: // create temporary queue to receive reply
183: replyDest = session.createTemporaryQueue();
184: } catch (JMSException e) {
185: handleException("Error creating temporary queue for response");
186: }
187: }
188: message.setJMSReplyTo(replyDest);
189: if (log.isDebugEnabled()) {
190: log
191: .debug("Expecting a response to JMS Destination : "
192: + (replyDest instanceof Queue ? ((Queue) replyDest)
193: .getQueueName()
194: : ((Topic) replyDest)
195: .getTopicName()));
196: }
197: }
198:
199: try {
200: log.debug("["
201: + (msgContext.isServerSide() ? "Server"
202: : "Client")
203: + "]Sending message to destination : " + dest);
204: producer.send(message);
205: producer.close();
206:
207: } catch (JMSException e) {
208: handleException(
209: "Error sending JMS message to destination : "
210: + dest.toString(), e);
211: }
212:
213: if (waitForResponse) {
214: try {
215: // wait for reply
216: MessageConsumer consumer = session
217: .createConsumer(replyDest);
218:
219: long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
220: Long waitReply = (Long) msgContext
221: .getProperty(JMSConstants.JMS_WAIT_REPLY);
222: if (waitReply != null) {
223: timeout = waitReply.longValue();
224: }
225:
226: log
227: .debug("Waiting for a maximum of "
228: + timeout
229: + "ms for a response message to destination : "
230: + replyDest);
231: con.start();
232: Message reply = consumer.receive(timeout);
233:
234: if (reply != null) {
235: msgContext.setProperty(
236: MessageContext.TRANSPORT_IN, JMSUtils
237: .getInputStream(reply));
238: } else {
239: log
240: .warn("Did not receive a JMS response within "
241: + timeout
242: + " ms to destination : "
243: + dest);
244: }
245:
246: } catch (JMSException e) {
247: handleException(
248: "Error reading response from temporary "
249: + "queue : " + replyDest, e);
250: }
251: }
252: } catch (JMSException e) {
253: handleException(
254: "Error preparing to send message to destination", e);
255:
256: } finally {
257: if (con != null) {
258: try {
259: con.close(); // closes all sessions, producers, temp Q's etc
260: } catch (JMSException e) {
261: } // ignore
262: }
263: }
264: return InvocationResponse.CONTINUE;
265: }
266:
267: public void cleanup(MessageContext msgContext) throws AxisFault {
268: // do nothing
269: }
270:
271: public void init(ConfigurationContext confContext,
272: TransportOutDescription transportOut) throws AxisFault {
273: // do nothing
274: }
275:
276: public void stop() {
277: // do nothing
278: }
279:
280: /**
281: * Create a JMS Message from the given MessageContext and using the given
282: * session
283: *
284: * @param msgContext the MessageContext
285: * @param session the JMS session
286: * @return a JMS message from the context and session
287: * @throws JMSException on exception
288: */
289: private Message createJMSMessage(MessageContext msgContext,
290: Session session) throws JMSException {
291:
292: Message message = null;
293: String msgType = getProperty(msgContext,
294: JMSConstants.JMS_MESSAGE_TYPE);
295:
296: OMElement msgElement = msgContext.getEnvelope();
297: if (msgContext.isDoingREST()) {
298: msgElement = msgContext.getEnvelope().getBody()
299: .getFirstElement();
300: }
301:
302: if (msgType != null
303: && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType)) {
304:
305: message = session.createBytesMessage();
306: BytesMessage bytesMsg = (BytesMessage) message;
307: ByteArrayOutputStream baos = new ByteArrayOutputStream();
308: OMOutputFormat format = new OMOutputFormat();
309: format.setCharSetEncoding(getProperty(msgContext,
310: Constants.Configuration.CHARACTER_SET_ENCODING));
311: format.setDoOptimize(msgContext.isDoingMTOM());
312: try {
313: msgElement.serializeAndConsume(baos, format);
314: baos.flush();
315: } catch (XMLStreamException e) {
316: handleException(
317: "XML serialization error creating BytesMessage",
318: e);
319: } catch (IOException e) {
320: handleException("IO Error while creating BytesMessage",
321: e);
322: }
323: bytesMsg.writeBytes(baos.toByteArray());
324:
325: } else {
326: message = session.createTextMessage(); // default
327: TextMessage txtMsg = (TextMessage) message;
328: txtMsg.setText(msgElement.toString());
329: }
330:
331: // set the JMS correlation ID if specified
332: String correlationId = getProperty(msgContext,
333: JMSConstants.JMS_COORELATION_ID);
334: if (correlationId == null && msgContext.getRelatesTo() != null) {
335: correlationId = msgContext.getRelatesTo().getValue();
336: }
337:
338: if (correlationId != null) {
339: message.setJMSCorrelationID(correlationId);
340: }
341:
342: if (msgContext.isServerSide()) {
343: // set SOAP Action and context type as properties on the JMS message
344: setProperty(message, msgContext, JMSConstants.SOAPACTION);
345: setProperty(message, msgContext, JMSConstants.CONTENT_TYPE);
346: } else {
347: String action = msgContext.getOptions().getAction();
348: if (action != null) {
349: message.setStringProperty(JMSConstants.SOAPACTION,
350: action);
351: }
352: }
353:
354: return message;
355: }
356:
357: private void setProperty(Message message, MessageContext msgCtx,
358: String key) {
359:
360: String value = getProperty(msgCtx, key);
361: if (value != null) {
362: try {
363: message.setStringProperty(key, value);
364: } catch (JMSException e) {
365: log.warn("Couldn't set message property : " + key
366: + " = " + value, e);
367: }
368: }
369: }
370:
371: private String getProperty(MessageContext mc, String key) {
372: return (String) mc.getProperty(key);
373: }
374:
375: private static void handleException(String s) {
376: log.error(s);
377: throw new AxisJMSException(s);
378: }
379:
380: private static void handleException(String s, Exception e) {
381: log.error(s, e);
382: throw new AxisJMSException(s, e);
383: }
384:
385: }
|