001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jms.endpoints;
018:
019: import java.util.Map;
020:
021: import javax.jbi.JBIException;
022: import javax.jbi.messaging.ExchangeStatus;
023: import javax.jbi.messaging.InOnly;
024: import javax.jbi.messaging.MessageExchange;
025: import javax.jbi.servicedesc.ServiceEndpoint;
026: import javax.jms.ConnectionFactory;
027: import javax.jms.Destination;
028: import javax.jms.JMSException;
029: import javax.jms.Message;
030: import javax.jms.MessageProducer;
031: import javax.jms.Session;
032: import javax.xml.namespace.QName;
033:
034: import org.apache.servicemix.common.DefaultComponent;
035: import org.apache.servicemix.common.ServiceUnit;
036: import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
037: import org.apache.servicemix.jms.endpoints.JmsConsumerMarshaler.JmsContext;
038: import org.apache.servicemix.store.Store;
039: import org.apache.servicemix.store.StoreFactory;
040: import org.apache.servicemix.store.memory.MemoryStoreFactory;
041: import org.springframework.jms.core.JmsTemplate;
042: import org.springframework.jms.core.SessionCallback;
043: import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
044: import org.springframework.jms.support.JmsUtils;
045: import org.springframework.jms.support.destination.DestinationResolver;
046: import org.springframework.jms.support.destination.DynamicDestinationResolver;
047:
048: public abstract class AbstractConsumerEndpoint extends ConsumerEndpoint {
049:
050: protected static final String PROP_JMS_CONTEXT = JmsContext.class
051: .getName();
052:
053: private JmsConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
054: private boolean synchronous = true;
055: private DestinationChooser destinationChooser;
056: private DestinationResolver destinationResolver = new DynamicDestinationResolver();
057: private boolean pubSubDomain;
058: private ConnectionFactory connectionFactory;
059: private JmsTemplate template;
060:
061: // Reply properties
062: private Boolean useMessageIdInResponse;
063: private Destination replyDestination;
064: private String replyDestinationName;
065: private boolean replyExplicitQosEnabled;
066: private int replyDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
067: private int replyPriority = Message.DEFAULT_PRIORITY;
068: private long replyTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
069: private Map<String, Object> replyProperties;
070:
071: private boolean stateless;
072: private StoreFactory storeFactory;
073: private Store store;
074:
075: public AbstractConsumerEndpoint() {
076: super ();
077: }
078:
079: public AbstractConsumerEndpoint(DefaultComponent component,
080: ServiceEndpoint endpoint) {
081: super (component, endpoint);
082: }
083:
084: public AbstractConsumerEndpoint(ServiceUnit serviceUnit,
085: QName service, String endpoint) {
086: super (serviceUnit, service, endpoint);
087: }
088:
089: /**
090: * @return the destinationChooser
091: */
092: public DestinationChooser getDestinationChooser() {
093: return destinationChooser;
094: }
095:
096: /**
097: * @param destinationChooser the destinationChooser to set
098: */
099: public void setDestinationChooser(
100: DestinationChooser destinationChooser) {
101: this .destinationChooser = destinationChooser;
102: }
103:
104: /**
105: * @return the replyDeliveryMode
106: */
107: public int getReplyDeliveryMode() {
108: return replyDeliveryMode;
109: }
110:
111: /**
112: * @param replyDeliveryMode the replyDeliveryMode to set
113: */
114: public void setReplyDeliveryMode(int replyDeliveryMode) {
115: this .replyDeliveryMode = replyDeliveryMode;
116: }
117:
118: /**
119: * @return the replyDestination
120: */
121: public Destination getReplyDestination() {
122: return replyDestination;
123: }
124:
125: /**
126: * @param replyDestination the replyDestination to set
127: */
128: public void setReplyDestination(Destination replyDestination) {
129: this .replyDestination = replyDestination;
130: }
131:
132: /**
133: * @return the replyDestinationName
134: */
135: public String getReplyDestinationName() {
136: return replyDestinationName;
137: }
138:
139: /**
140: * @param replyDestinationName the replyDestinationName to set
141: */
142: public void setReplyDestinationName(String replyDestinationName) {
143: this .replyDestinationName = replyDestinationName;
144: }
145:
146: /**
147: * @return the replyExplicitQosEnabled
148: */
149: public boolean isReplyExplicitQosEnabled() {
150: return replyExplicitQosEnabled;
151: }
152:
153: /**
154: * @param replyExplicitQosEnabled the replyExplicitQosEnabled to set
155: */
156: public void setReplyExplicitQosEnabled(
157: boolean replyExplicitQosEnabled) {
158: this .replyExplicitQosEnabled = replyExplicitQosEnabled;
159: }
160:
161: /**
162: * @return the replyPriority
163: */
164: public int getReplyPriority() {
165: return replyPriority;
166: }
167:
168: /**
169: * @param replyPriority the replyPriority to set
170: */
171: public void setReplyPriority(int replyPriority) {
172: this .replyPriority = replyPriority;
173: }
174:
175: /**
176: * @return the replyProperties
177: */
178: public Map<String, Object> getReplyProperties() {
179: return replyProperties;
180: }
181:
182: /**
183: * @param replyProperties the replyProperties to set
184: */
185: public void setReplyProperties(Map<String, Object> replyProperties) {
186: this .replyProperties = replyProperties;
187: }
188:
189: /**
190: * @return the replyTimeToLive
191: */
192: public long getReplyTimeToLive() {
193: return replyTimeToLive;
194: }
195:
196: /**
197: * @param replyTimeToLive the replyTimeToLive to set
198: */
199: public void setReplyTimeToLive(long replyTimeToLive) {
200: this .replyTimeToLive = replyTimeToLive;
201: }
202:
203: /**
204: * @return the useMessageIdInResponse
205: */
206: public Boolean getUseMessageIdInResponse() {
207: return useMessageIdInResponse;
208: }
209:
210: /**
211: * @param useMessageIdInResponse the useMessageIdInResponse to set
212: */
213: public void setUseMessageIdInResponse(Boolean useMessageIdInResponse) {
214: this .useMessageIdInResponse = useMessageIdInResponse;
215: }
216:
217: /**
218: * @return the connectionFactory
219: */
220: public ConnectionFactory getConnectionFactory() {
221: return connectionFactory;
222: }
223:
224: /**
225: * @param connectionFactory the connectionFactory to set
226: */
227: public void setConnectionFactory(ConnectionFactory connectionFactory) {
228: this .connectionFactory = connectionFactory;
229: }
230:
231: /**
232: * @return the pubSubDomain
233: */
234: public boolean isPubSubDomain() {
235: return pubSubDomain;
236: }
237:
238: /**
239: * @param pubSubDomain the pubSubDomain to set
240: */
241: public void setPubSubDomain(boolean pubSubDomain) {
242: this .pubSubDomain = pubSubDomain;
243: }
244:
245: /**
246: * @return the destinationResolver
247: */
248: public DestinationResolver getDestinationResolver() {
249: return destinationResolver;
250: }
251:
252: /**
253: * @param destinationResolver the destinationResolver to set
254: */
255: public void setDestinationResolver(
256: DestinationResolver destinationResolver) {
257: this .destinationResolver = destinationResolver;
258: }
259:
260: /**
261: * @return the marshaler
262: */
263: public JmsConsumerMarshaler getMarshaler() {
264: return marshaler;
265: }
266:
267: /**
268: * @param marshaler the marshaler to set
269: */
270: public void setMarshaler(JmsConsumerMarshaler marshaler) {
271: this .marshaler = marshaler;
272: }
273:
274: /**
275: * @return the synchronous
276: */
277: public boolean isSynchronous() {
278: return synchronous;
279: }
280:
281: /**
282: * @param synchronous the synchronous to set
283: */
284: public void setSynchronous(boolean synchronous) {
285: this .synchronous = synchronous;
286: }
287:
288: public boolean isStateless() {
289: return stateless;
290: }
291:
292: public void setStateless(boolean stateless) {
293: this .stateless = stateless;
294: }
295:
296: public Store getStore() {
297: return store;
298: }
299:
300: public void setStore(Store store) {
301: this .store = store;
302: }
303:
304: public StoreFactory getStoreFactory() {
305: return storeFactory;
306: }
307:
308: public void setStoreFactory(StoreFactory storeFactory) {
309: this .storeFactory = storeFactory;
310: }
311:
312: public String getLocationURI() {
313: // TODO: Need to return a real URI
314: return getService() + "#" + getEndpoint();
315: }
316:
317: public synchronized void start() throws Exception {
318: super .start();
319: if (template == null) {
320: template = new JmsTemplate(getConnectionFactory());
321: }
322: if (store == null && !stateless) {
323: if (storeFactory == null) {
324: storeFactory = new MemoryStoreFactory();
325: }
326: store = storeFactory.open(getService().toString()
327: + getEndpoint());
328: }
329: }
330:
331: public synchronized void stop() throws Exception {
332: if (store != null) {
333: if (storeFactory != null) {
334: storeFactory.close(store);
335: }
336: store = null;
337: }
338: super .stop();
339: }
340:
341: public void process(MessageExchange exchange) throws Exception {
342: JmsContext context;
343: if (stateless) {
344: context = (JmsContext) exchange
345: .getProperty(PROP_JMS_CONTEXT);
346: } else {
347: context = (JmsContext) store.load(exchange.getExchangeId());
348: }
349: processExchange(exchange, null, context);
350: }
351:
352: protected void processExchange(final MessageExchange exchange,
353: final Session session, final JmsContext context)
354: throws Exception {
355: // Ignore DONE exchanges
356: if (exchange.getStatus() == ExchangeStatus.DONE) {
357: return;
358: }
359: // Create session if needed
360: if (session == null) {
361: template.execute(new SessionCallback() {
362: public Object doInJms(Session session)
363: throws JMSException {
364: try {
365: processExchange(exchange, session, context);
366: } catch (Exception e) {
367: throw new ListenerExecutionFailedException(
368: "Exchange processing failed", e);
369: }
370: return null;
371: }
372: });
373: return;
374: }
375: // Handle exchanges
376: Message msg = null;
377: Destination dest = null;
378: if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
379: if (exchange.getFault() != null) {
380: msg = marshaler.createFault(exchange, exchange
381: .getFault(), session, context);
382: dest = getReplyDestination(exchange, exchange
383: .getFault(), session, context);
384: } else if (exchange.getMessage("out") != null) {
385: msg = marshaler.createOut(exchange, exchange
386: .getMessage("out"), session, context);
387: dest = getReplyDestination(exchange, exchange
388: .getMessage("out"), session, context);
389: }
390: if (msg == null) {
391: throw new IllegalStateException(
392: "Unable to send back answer or fault");
393: }
394: setCorrelationId(context.getMessage(), msg);
395: try {
396: send(msg, session, dest);
397: done(exchange);
398: } catch (Exception e) {
399: fail(exchange, e);
400: throw e;
401: }
402: } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
403: Exception error = exchange.getError();
404: if (error == null) {
405: error = new JBIException(
406: "Exchange in ERROR state, but no exception provided");
407: }
408: msg = marshaler.createError(exchange, error, session,
409: context);
410: dest = getReplyDestination(exchange, error, session,
411: context);
412: setCorrelationId(context.getMessage(), msg);
413: send(msg, session, dest);
414: } else {
415: throw new IllegalStateException(
416: "Unrecognized exchange status");
417: }
418: }
419:
420: protected void send(Message msg, Session session, Destination dest)
421: throws JMSException {
422: MessageProducer producer = session.createProducer(dest);
423: try {
424: if (replyProperties != null) {
425: for (Map.Entry<String, Object> e : replyProperties
426: .entrySet()) {
427: msg.setObjectProperty(e.getKey(), e.getValue());
428: }
429: }
430: if (replyExplicitQosEnabled) {
431: producer.send(msg, replyDeliveryMode, replyPriority,
432: replyTimeToLive);
433: } else {
434: producer.send(msg);
435: }
436: } finally {
437: JmsUtils.closeMessageProducer(producer);
438: }
439: }
440:
441: protected void onMessage(Message jmsMessage, Session session)
442: throws JMSException {
443: if (logger.isTraceEnabled()) {
444: logger.trace("Received: " + jmsMessage);
445: }
446: try {
447: JmsContext context = marshaler.createContext(jmsMessage);
448: MessageExchange exchange = marshaler.createExchange(
449: context, getContext());
450: configureExchangeTarget(exchange);
451: if (synchronous) {
452: try {
453: sendSync(exchange);
454: } catch (Exception e) {
455: handleException(exchange, e, session, context);
456: }
457: if (exchange.getStatus() != ExchangeStatus.DONE) {
458: processExchange(exchange, session, context);
459: }
460: } else {
461: if (stateless) {
462: exchange.setProperty(PROP_JMS_CONTEXT, context);
463: } else {
464: store.store(exchange.getExchangeId(), context);
465: }
466: boolean success = false;
467: try {
468: send(exchange);
469: success = true;
470: } catch (Exception e) {
471: handleException(exchange, e, session, context);
472: } finally {
473: if (!success && !stateless) {
474: store.load(exchange.getExchangeId());
475: }
476: }
477: }
478: } catch (JMSException e) {
479: throw e;
480: } catch (Exception e) {
481: throw (JMSException) new JMSException(
482: "Error sending JBI exchange").initCause(e);
483: }
484: }
485:
486: protected Destination getReplyDestination(MessageExchange exchange,
487: Object message, Session session, JmsContext context)
488: throws JMSException {
489: // If a JMS ReplyTo property is set, use it
490: if (context.getMessage().getJMSReplyTo() != null) {
491: return context.getMessage().getJMSReplyTo();
492: }
493: Object dest = null;
494: // Let the destinationChooser a chance to choose the destination
495: if (destinationChooser != null) {
496: dest = destinationChooser.chooseDestination(exchange,
497: message);
498: }
499: // Default to replyDestination / replyDestinationName properties
500: if (dest == null) {
501: dest = replyDestination;
502: }
503: if (dest == null) {
504: dest = replyDestinationName;
505: }
506: // Resolve destination if needed
507: if (dest instanceof Destination) {
508: return (Destination) dest;
509: } else if (dest instanceof String) {
510: return destinationResolver.resolveDestinationName(session,
511: (String) dest, isPubSubDomain());
512: }
513: throw new IllegalStateException(
514: "Unable to choose destination for exchange " + exchange);
515: }
516:
517: protected void setCorrelationId(Message query, Message reply)
518: throws Exception {
519: if (useMessageIdInResponse == null) {
520: if (query.getJMSCorrelationID() != null) {
521: reply.setJMSCorrelationID(query.getJMSCorrelationID());
522: } else if (query.getJMSMessageID() != null) {
523: reply.setJMSCorrelationID(query.getJMSMessageID());
524: } else {
525: throw new IllegalStateException(
526: "No JMSCorrelationID or JMSMessageID set on query message");
527: }
528: } else if (useMessageIdInResponse.booleanValue()) {
529: if (query.getJMSMessageID() != null) {
530: reply.setJMSCorrelationID(query.getJMSMessageID());
531: } else {
532: throw new IllegalStateException(
533: "No JMSMessageID set on query message");
534: }
535: } else {
536: if (query.getJMSCorrelationID() != null) {
537: reply.setJMSCorrelationID(query.getJMSCorrelationID());
538: } else {
539: throw new IllegalStateException(
540: "No JMSCorrelationID set on query message");
541: }
542: }
543: }
544:
545: protected void handleException(MessageExchange exchange,
546: Exception error, Session session, JmsContext context)
547: throws Exception {
548: // For InOnly, the consumer does not expect any response back, so
549: // just rethrow it and let the fault behavior
550: if (exchange instanceof InOnly) {
551: throw error;
552: }
553: // Check if the exception should lead to an error back
554: if (treatExceptionAsFault(error)) {
555: sendError(exchange, error, session, context);
556: } else {
557: throw error;
558: }
559: }
560:
561: protected boolean treatExceptionAsFault(Exception error) {
562: return error instanceof SecurityException;
563: }
564:
565: protected void sendError(final MessageExchange exchange,
566: final Exception error, Session session,
567: final JmsContext context) throws Exception {
568: // Create session if needed
569: if (session == null) {
570: template.execute(new SessionCallback() {
571: public Object doInJms(Session session)
572: throws JMSException {
573: try {
574: sendError(exchange, error, session, context);
575: } catch (Exception e) {
576: throw new ListenerExecutionFailedException(
577: "Exchange processing failed", e);
578: }
579: return null;
580: }
581: });
582: return;
583: }
584: Message msg = marshaler.createError(exchange, error, session,
585: context);
586: Destination dest = getReplyDestination(exchange, error,
587: session, context);
588: setCorrelationId(context.getMessage(), msg);
589: send(msg, session, dest);
590: }
591:
592: }
|