001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.jms;
019:
020: import java.io.ByteArrayInputStream;
021: import java.io.ByteArrayOutputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.OutputStream;
025: import java.util.Calendar;
026: import java.util.GregorianCalendar;
027: import java.util.SimpleTimeZone;
028: import java.util.TimeZone;
029: import java.util.concurrent.Executor;
030: import java.util.concurrent.RejectedExecutionException;
031: import java.util.logging.Level;
032: import java.util.logging.Logger;
033:
034: import javax.jms.JMSException;
035: import javax.jms.Queue;
036: import javax.jms.QueueSender;
037: import javax.jms.TextMessage;
038: import javax.naming.NamingException;
039:
040: import org.apache.cxf.Bus;
041: import org.apache.cxf.common.logging.LogUtils;
042: import org.apache.cxf.configuration.Configurable;
043: import org.apache.cxf.configuration.Configurer;
044: import org.apache.cxf.io.CachedOutputStream;
045: import org.apache.cxf.message.Message;
046: import org.apache.cxf.message.MessageImpl;
047: import org.apache.cxf.service.model.EndpointInfo;
048: import org.apache.cxf.transport.AbstractConduit;
049: import org.apache.cxf.transport.AbstractMultiplexDestination;
050: import org.apache.cxf.transport.Conduit;
051: import org.apache.cxf.transport.ConduitInitiator;
052: import org.apache.cxf.transport.MessageObserver;
053: import org.apache.cxf.workqueue.WorkQueueManager;
054: import org.apache.cxf.ws.addressing.EndpointReferenceType;
055: import org.apache.cxf.wsdl.EndpointReferenceUtils;
056:
057: public class JMSDestination extends AbstractMultiplexDestination
058: implements Configurable, JMSTransport {
059:
060: protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
061:
062: private static final Logger LOG = LogUtils
063: .getL7dLogger(JMSDestination.class);
064:
065: protected ServerConfig serverConfig;
066: protected ServerBehaviorPolicyType runtimePolicy;
067: protected AddressType address;
068: protected SessionPoolType sessionPool;
069:
070: final ConduitInitiator conduitInitiator;
071: final JMSTransportBase base;
072:
073: PooledSession listenerSession;
074: JMSListenerThread listenerThread;
075:
/**
 * Creates a JMS destination for the given endpoint.
 * <p>
 * After delegating to the superclass, this builds the shared
 * {@link JMSTransportBase} (handing it this destination), remembers the
 * conduit initiator and finally loads the endpoint configuration through
 * {@code initConfig()}.
 *
 * @param b the bus the destination belongs to
 * @param ci the conduit initiator kept by this destination
 * @param info the endpoint description this destination serves
 * @throws IOException declared by the constructor signature
 */
public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info)
    throws IOException {
    super(b, getTargetReference(info, b), info);

    // The transport base receives 'this' before construction has finished;
    // the statement order below is kept exactly as it was.
    this.base = new JMSTransportBase(
        b, endpointInfo, true, BASE_BEAN_NAME_SUFFIX, this);
    this.conduitInitiator = ci;

    initConfig();
}
087:
088: protected Logger getLogger() {
089: return LOG;
090: }
091:
092: /**
093: * @param inMessage the incoming message
094: * @return the inbuilt backchannel
095: */
096: protected Conduit getInbuiltBackChannel(Message inMessage) {
097: return new BackChannelConduit(EndpointReferenceUtils
098: .getAnonymousEndpointReference(), inMessage);
099: }
100:
101: public void activate() {
102: getLogger().log(Level.INFO,
103: "JMSServerTransport activate().... ");
104:
105: try {
106: getLogger().log(Level.FINE, "establishing JMS connection");
107: JMSProviderHub.connect(this , serverConfig, runtimePolicy);
108: //Get a non-pooled session.
109: listenerSession = base.sessionFactory
110: .get(base.targetDestination);
111: listenerThread = new JMSListenerThread(listenerSession);
112: listenerThread.start();
113: } catch (JMSException ex) {
114: getLogger().log(Level.SEVERE,
115: "JMS connect failed with JMSException : ", ex);
116: } catch (NamingException nex) {
117: getLogger().log(Level.SEVERE,
118: "JMS connect failed with NamingException : ", nex);
119: }
120: }
121:
122: public void deactivate() {
123: try {
124: listenerSession.consumer().close();
125: if (listenerThread != null) {
126: listenerThread.join();
127: }
128: base.sessionFactory.shutdown();
129: } catch (InterruptedException e) {
130: //Do nothing here
131: } catch (JMSException ex) {
132: //Do nothing here
133: }
134: }
135:
136: public void shutdown() {
137: getLogger().log(Level.FINE, "JMSDestination shutdown()");
138: this .deactivate();
139: }
140:
141: public Queue getReplyToDestination(Message inMessage)
142: throws JMSException, NamingException {
143: Queue replyTo;
144: javax.jms.Message message = (javax.jms.Message) inMessage
145: .get(JMSConstants.JMS_REQUEST_MESSAGE);
146: // If WS-Addressing had set the replyTo header.
147: if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
148: replyTo = base.sessionFactory
149: .getQueueFromInitialContext((String) inMessage
150: .get(JMSConstants.JMS_REBASED_REPLY_TO));
151: } else {
152: replyTo = (null != message.getJMSReplyTo()) ? (Queue) message
153: .getJMSReplyTo()
154: : (Queue) base.replyDestination;
155: }
156: return replyTo;
157: }
158:
159: public void setReplyCorrelationID(javax.jms.Message request,
160: javax.jms.Message reply) throws JMSException {
161:
162: String correlationID = request.getJMSCorrelationID();
163:
164: if (correlationID == null || "".equals(correlationID)
165: && getRuntimePolicy().isUseMessageIDAsCorrelationID()) {
166: correlationID = request.getJMSMessageID();
167: }
168:
169: if (correlationID != null && !"".equals(correlationID)) {
170: reply.setJMSCorrelationID(correlationID);
171: }
172: }
173:
174: protected void incoming(javax.jms.Message message)
175: throws IOException {
176: try {
177: getLogger().log(Level.FINE, "server received request: ",
178: message);
179:
180: String msgType = message instanceof TextMessage ? JMSConstants.TEXT_MESSAGE_TYPE
181: : JMSConstants.BINARY_MESSAGE_TYPE;
182: Object request = base.unmarshal(message, msgType);
183: getLogger().log(Level.FINE,
184: "The Request Message is [ " + request + "]");
185: byte[] bytes = null;
186:
187: if (JMSConstants.TEXT_MESSAGE_TYPE.equals(msgType)) {
188: String requestString = (String) request;
189: getLogger().log(Level.FINE,
190: "server received request: ", requestString);
191: bytes = requestString.getBytes();
192: } else {
193: bytes = (byte[]) request;
194: }
195:
196: // get the message to be interceptor
197: MessageImpl inMessage = new MessageImpl();
198: inMessage.setContent(InputStream.class,
199: new ByteArrayInputStream(bytes));
200: base.populateIncomingContext(message, inMessage,
201: JMSConstants.JMS_SERVER_REQUEST_HEADERS);
202: inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS,
203: new JMSMessageHeadersType());
204: inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
205:
206: inMessage.setDestination(this );
207:
208: //handle the incoming message
209: incomingObserver.onMessage(inMessage);
210:
211: } catch (JMSException jmsex) {
212: //TODO: need to revisit for which exception should we throw.
213: throw new IOException(jmsex.getMessage());
214: }
215: }
216:
217: public void connected(javax.jms.Destination target,
218: javax.jms.Destination reply, JMSSessionFactory factory) {
219: base.connected(target, reply, factory);
220: }
221:
222: public String getBeanName() {
223: return endpointInfo.getName().toString() + ".jms-destination";
224: }
225:
226: private void initConfig() {
227: this .runtimePolicy = endpointInfo.getTraversedExtensor(
228: new ServerBehaviorPolicyType(),
229: ServerBehaviorPolicyType.class);
230: this .serverConfig = endpointInfo.getTraversedExtensor(
231: new ServerConfig(), ServerConfig.class);
232: this .address = endpointInfo.getTraversedExtensor(
233: new AddressType(), AddressType.class);
234: this .sessionPool = endpointInfo.getTraversedExtensor(
235: new SessionPoolType(), SessionPoolType.class);
236:
237: Configurer configurer = base.bus.getExtension(Configurer.class);
238: if (null != configurer) {
239: configurer.configureBean(this );
240: }
241: }
242:
243: public AddressType getJMSAddress() {
244: return address;
245: }
246:
247: public void setJMSAddress(AddressType a) {
248: this .address = a;
249: }
250:
251: public ServerBehaviorPolicyType getRuntimePolicy() {
252: return runtimePolicy;
253: }
254:
255: public void setRuntimePolicy(ServerBehaviorPolicyType runtimePolicy) {
256: this .runtimePolicy = runtimePolicy;
257: }
258:
259: public ServerConfig getServerConfig() {
260: return serverConfig;
261: }
262:
263: public void setServerConfig(ServerConfig serverConfig) {
264: this .serverConfig = serverConfig;
265: }
266:
267: public SessionPoolType getSessionPool() {
268: return sessionPool;
269: }
270:
271: public void setSessionPool(SessionPoolType sessionPool) {
272: this .sessionPool = sessionPool;
273: }
274:
275: protected class JMSListenerThread extends Thread {
276: private final PooledSession listenSession;
277:
278: public JMSListenerThread(PooledSession session) {
279: listenSession = session;
280: }
281:
282: public void run() {
283: try {
284: while (true) {
285: javax.jms.Message message = listenSession
286: .consumer().receive();
287: if (message == null) {
288: getLogger()
289: .log(
290: Level.WARNING,
291: "Null message received from message consumer.",
292: " Exiting ListenerThread::run().");
293: return;
294: }
295: while (message != null) {
296: //REVISIT to get the thread pool
297: //Executor executor = jmsDestination.callback.getExecutor();
298: Executor executor = null;
299: if (executor == null) {
300: WorkQueueManager wqm = base.bus
301: .getExtension(WorkQueueManager.class);
302: if (null != wqm) {
303: executor = wqm.getAutomaticWorkQueue();
304: }
305: }
306: if (executor != null) {
307: try {
308: executor.execute(new JMSExecutor(
309: message));
310: message = null;
311: } catch (RejectedExecutionException ree) {
312: //FIXME - no room left on workqueue, what to do
313: //for now, loop until it WILL fit on the queue,
314: //although we could just dispatch on this thread.
315: }
316: } else {
317: getLogger()
318: .log(Level.INFO,
319: "handle the incoming message in listener thread");
320: try {
321: incoming(message);
322: } catch (IOException ex) {
323: getLogger()
324: .log(
325: Level.WARNING,
326: "Failed to process incoming message : ",
327: ex);
328: }
329: }
330: message = null;
331: }
332: }
333: } catch (JMSException jmsex) {
334: jmsex.printStackTrace();
335: getLogger().log(Level.SEVERE,
336: "Exiting ListenerThread::run(): ",
337: jmsex.getMessage());
338: } catch (Throwable jmsex) {
339: jmsex.printStackTrace();
340: getLogger().log(Level.SEVERE,
341: "Exiting ListenerThread::run(): ",
342: jmsex.getMessage());
343: }
344: }
345: }
346:
347: protected class JMSExecutor implements Runnable {
348: javax.jms.Message message;
349:
350: JMSExecutor(javax.jms.Message m) {
351: message = m;
352: }
353:
354: public void run() {
355: getLogger().log(Level.INFO,
356: "run the incoming message in the threadpool");
357: try {
358: incoming(message);
359: } catch (IOException ex) {
360: //TODO: Decide what to do if we receive the exception.
361: getLogger().log(Level.WARNING,
362: "Failed to process incoming message : ", ex);
363: }
364: }
365:
366: }
367:
368: // this should deal with the cxf message
369: protected class BackChannelConduit extends AbstractConduit {
370:
371: protected Message inMessage;
372:
373: BackChannelConduit(EndpointReferenceType ref, Message message) {
374: super (ref);
375: inMessage = message;
376: }
377:
378: /**
379: * Register a message observer for incoming messages.
380: *
381: * @param observer the observer to notify on receipt of incoming
382: */
383: public void setMessageObserver(MessageObserver observer) {
384: // shouldn't be called for a back channel conduit
385: }
386:
387: /**
388: * Send an outbound message, assumed to contain all the name-value
389: * mappings of the corresponding input message (if any).
390: *
391: * @param message the message to be sent.
392: */
393: public void prepare(Message message) throws IOException {
394: // setup the message to be send back
395: message.put(JMSConstants.JMS_REQUEST_MESSAGE, inMessage
396: .get(JMSConstants.JMS_REQUEST_MESSAGE));
397: message.setContent(OutputStream.class, new JMSOutputStream(
398: inMessage));
399: }
400:
401: protected Logger getLogger() {
402: return LOG;
403: }
404: }
405:
406: private class JMSOutputStream extends CachedOutputStream {
407:
408: private Message inMessage;
409: private javax.jms.Message reply;
410: private Queue replyTo;
411: private QueueSender sender;
412:
413: // setup the ByteArrayStream
414: public JMSOutputStream(Message m) {
415: super ();
416: inMessage = m;
417: }
418:
419: //to prepear the message and get the send out message
420: private void commitOutputMessage() throws IOException {
421:
422: JMSMessageHeadersType headers = (JMSMessageHeadersType) inMessage
423: .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
424: javax.jms.Message request = (javax.jms.Message) inMessage
425: .get(JMSConstants.JMS_REQUEST_MESSAGE);
426:
427: PooledSession replySession = null;
428:
429: if (base.isDestinationStyleQueue()) {
430: try {
431: //setup the reply message
432: replyTo = getReplyToDestination(inMessage);
433: replySession = base.sessionFactory.get(false);
434: sender = (QueueSender) replySession.producer();
435:
436: boolean textPayload = request instanceof TextMessage ? true
437: : false;
438: if (textPayload) {
439:
440: reply = base.marshal(currentStream.toString(),
441: replySession.session(), null,
442: JMSConstants.TEXT_MESSAGE_TYPE);
443: getLogger().log(
444: Level.FINE,
445: "The response message is ["
446: + currentStream.toString()
447: + "]");
448: } else {
449: reply = base.marshal(
450: ((ByteArrayOutputStream) currentStream)
451: .toByteArray(), replySession
452: .session(), null,
453: JMSConstants.BINARY_MESSAGE_TYPE);
454: getLogger()
455: .log(
456: Level.FINE,
457: "The response message is ["
458: + new String(
459: ((ByteArrayOutputStream) currentStream)
460: .toByteArray())
461: + "]");
462: }
463:
464: setReplyCorrelationID(request, reply);
465:
466: base.setMessageProperties(headers, reply);
467:
468: sendResponse();
469:
470: } catch (JMSException ex) {
471: getLogger().log(Level.WARNING,
472: "Failed in post dispatch ...", ex);
473: throw new IOException(ex.getMessage());
474: } catch (NamingException nex) {
475: getLogger().log(Level.WARNING,
476: "Failed in post dispatch ...", nex);
477: throw new IOException(nex.getMessage());
478: } finally {
479: // house-keeping
480: if (replySession != null) {
481: base.sessionFactory.recycle(replySession);
482: }
483: }
484: } else {
485: // we will never receive a non-oneway invocation in pub-sub
486: // domain from CXF client - however a mis-behaving pure JMS
487: // client could conceivably make suce an invocation, in which
488: // case we silently discard the reply
489: getLogger().log(Level.WARNING,
490: "discarding reply for non-oneway invocation ",
491: "with 'topic' destinationStyle");
492:
493: }
494:
495: getLogger().log(Level.FINE, "just server sending reply: ",
496: reply);
497: // Check the reply time limit Stream close will call for this
498:
499: }
500:
501: private void sendResponse() throws JMSException {
502: JMSMessageHeadersType headers = (JMSMessageHeadersType) inMessage
503: .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
504: javax.jms.Message request = (javax.jms.Message) inMessage
505: .get(JMSConstants.JMS_REQUEST_MESSAGE);
506:
507: int deliveryMode = base.getJMSDeliveryMode(headers);
508: int priority = base.getJMSPriority(headers);
509: long ttl = base.getTimeToLive(headers);
510:
511: if (ttl <= 0) {
512: ttl = getServerConfig().getMessageTimeToLive();
513: }
514:
515: long timeToLive = 0;
516: if (request.getJMSExpiration() > 0) {
517: TimeZone tz = new SimpleTimeZone(0, "GMT");
518: Calendar cal = new GregorianCalendar(tz);
519: timeToLive = request.getJMSExpiration()
520: - cal.getTimeInMillis();
521: }
522:
523: if (timeToLive >= 0) {
524: ttl = ttl > 0 ? ttl : timeToLive;
525: getLogger().log(Level.FINE, "send out the message!");
526: sender
527: .send(replyTo, reply, deliveryMode, priority,
528: ttl);
529: } else {
530: // the request message had dead
531: getLogger()
532: .log(Level.INFO,
533: "Message time to live is already expired skipping response.");
534: }
535: }
536:
537: @Override
538: protected void doFlush() throws IOException {
539: // TODO Auto-generated method stub
540:
541: }
542:
543: @Override
544: protected void doClose() throws IOException {
545:
546: commitOutputMessage();
547: }
548:
549: @Override
550: protected void onWrite() throws IOException {
551: // Do nothing here
552: }
553:
554: }
555:
556: }
|