001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jms.endpoints;
018:
019: import javax.jbi.management.DeploymentException;
020: import javax.jbi.messaging.MessageExchange;
021: import javax.jbi.messaging.NormalizedMessage;
022: import javax.jms.ConnectionFactory;
023: import javax.jms.Destination;
024: import javax.jms.JMSException;
025: import javax.jms.Message;
026: import javax.jms.Session;
027:
028: import org.apache.servicemix.common.endpoints.ProviderEndpoint;
029: import org.apache.servicemix.jms.JmsEndpointType;
030: import org.apache.servicemix.store.Store;
031: import org.apache.servicemix.store.StoreFactory;
032: import org.apache.servicemix.store.memory.MemoryStoreFactory;
033: import org.springframework.jms.UncategorizedJmsException;
034: import org.springframework.jms.core.JmsTemplate;
035: import org.springframework.jms.core.JmsTemplate102;
036: import org.springframework.jms.core.MessageCreator;
037: import org.springframework.jms.core.SessionCallback;
038: import org.springframework.jms.support.destination.DestinationResolver;
039: import org.springframework.jms.support.destination.DynamicDestinationResolver;
040:
041: /**
042: *
043: * @author gnodet
044: * @org.apache.xbean.XBean element="provider"
045: * @since 3.2
046: */
047: public class JmsProviderEndpoint extends ProviderEndpoint implements
048: JmsEndpointType {
049:
050: private static final String MSG_SELECTOR_START = "JMSCorrelationID='";
051: private static final String MSG_SELECTOR_END = "'";
052:
053: private JmsProviderMarshaler marshaler = new DefaultProviderMarshaler();
054: private DestinationChooser destinationChooser = new SimpleDestinationChooser();
055: private JmsTemplate template;
056:
057: private boolean jms102;
058: private ConnectionFactory connectionFactory;
059: private boolean pubSubDomain;
060: private DestinationResolver destinationResolver = new DynamicDestinationResolver();
061: private Destination destination;
062: private String destinationName;
063: private boolean messageIdEnabled = true;
064: private boolean messageTimestampEnabled = true;
065: private boolean pubSubNoLocal;
066: private long receiveTimeout = JmsTemplate.DEFAULT_RECEIVE_TIMEOUT;
067: private boolean explicitQosEnabled;
068: private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
069: private int priority = Message.DEFAULT_PRIORITY;
070: private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
071:
072: private Destination replyDestination;
073: private String replyDestinationName;
074:
075: private boolean stateless;
076: private StoreFactory storeFactory;
077: private Store store;
078:
079: /**
080: * @return the destination
081: */
082: public Destination getDestination() {
083: return destination;
084: }
085:
086: /**
087: * @param destination the destination to set
088: */
089: public void setDestination(Destination destination) {
090: this .destination = destination;
091: }
092:
093: /**
094: * @return the destinationName
095: */
096: public String getDestinationName() {
097: return destinationName;
098: }
099:
100: /**
101: * @param destinationName the destinationName to set
102: */
103: public void setDestinationName(String destinationName) {
104: this .destinationName = destinationName;
105: }
106:
107: /**
108: * @return the jms102
109: */
110: public boolean isJms102() {
111: return jms102;
112: }
113:
114: /**
115: * @param jms102 the jms102 to set
116: */
117: public void setJms102(boolean jms102) {
118: this .jms102 = jms102;
119: }
120:
121: /**
122: * @return the connectionFactory
123: */
124: public ConnectionFactory getConnectionFactory() {
125: return connectionFactory;
126: }
127:
128: /**
129: * @param connectionFactory the connectionFactory to set
130: */
131: public void setConnectionFactory(ConnectionFactory connectionFactory) {
132: this .connectionFactory = connectionFactory;
133: }
134:
135: /**
136: * @return the deliveryMode
137: */
138: public int getDeliveryMode() {
139: return deliveryMode;
140: }
141:
142: /**
143: * @param deliveryMode the deliveryMode to set
144: */
145: public void setDeliveryMode(int deliveryMode) {
146: this .deliveryMode = deliveryMode;
147: }
148:
149: /**
150: * @return the destinationChooser
151: */
152: public DestinationChooser getDestinationChooser() {
153: return destinationChooser;
154: }
155:
156: /**
157: * @param destinationChooser the destinationChooser to set
158: */
159: public void setDestinationChooser(
160: DestinationChooser destinationChooser) {
161: if (destinationChooser == null) {
162: throw new NullPointerException("destinationChooser is null");
163: }
164: this .destinationChooser = destinationChooser;
165: }
166:
167: /**
168: * @return the destinationResolver
169: */
170: public DestinationResolver getDestinationResolver() {
171: return destinationResolver;
172: }
173:
174: /**
175: * @param destinationResolver the destinationResolver to set
176: */
177: public void setDestinationResolver(
178: DestinationResolver destinationResolver) {
179: this .destinationResolver = destinationResolver;
180: }
181:
182: /**
183: * @return the explicitQosEnabled
184: */
185: public boolean isExplicitQosEnabled() {
186: return explicitQosEnabled;
187: }
188:
189: /**
190: * @param explicitQosEnabled the explicitQosEnabled to set
191: */
192: public void setExplicitQosEnabled(boolean explicitQosEnabled) {
193: this .explicitQosEnabled = explicitQosEnabled;
194: }
195:
196: /**
197: * @return the marshaler
198: */
199: public JmsProviderMarshaler getMarshaler() {
200: return marshaler;
201: }
202:
203: /**
204: * @param marshaler the marshaler to set
205: */
206: public void setMarshaler(JmsProviderMarshaler marshaler) {
207: if (marshaler == null) {
208: throw new NullPointerException("marshaler is null");
209: }
210: this .marshaler = marshaler;
211: }
212:
213: /**
214: * @return the messageIdEnabled
215: */
216: public boolean isMessageIdEnabled() {
217: return messageIdEnabled;
218: }
219:
220: /**
221: * @param messageIdEnabled the messageIdEnabled to set
222: */
223: public void setMessageIdEnabled(boolean messageIdEnabled) {
224: this .messageIdEnabled = messageIdEnabled;
225: }
226:
227: /**
228: * @return the messageTimestampEnabled
229: */
230: public boolean isMessageTimestampEnabled() {
231: return messageTimestampEnabled;
232: }
233:
234: /**
235: * @param messageTimestampEnabled the messageTimestampEnabled to set
236: */
237: public void setMessageTimestampEnabled(
238: boolean messageTimestampEnabled) {
239: this .messageTimestampEnabled = messageTimestampEnabled;
240: }
241:
242: /**
243: * @return the priority
244: */
245: public int getPriority() {
246: return priority;
247: }
248:
249: /**
250: * @param priority the priority to set
251: */
252: public void setPriority(int priority) {
253: this .priority = priority;
254: }
255:
256: /**
257: * @return the pubSubDomain
258: */
259: public boolean isPubSubDomain() {
260: return pubSubDomain;
261: }
262:
263: /**
264: * @param pubSubDomain the pubSubDomain to set
265: */
266: public void setPubSubDomain(boolean pubSubDomain) {
267: this .pubSubDomain = pubSubDomain;
268: }
269:
270: /**
271: * @return the pubSubNoLocal
272: */
273: public boolean isPubSubNoLocal() {
274: return pubSubNoLocal;
275: }
276:
277: /**
278: * @param pubSubNoLocal the pubSubNoLocal to set
279: */
280: public void setPubSubNoLocal(boolean pubSubNoLocal) {
281: this .pubSubNoLocal = pubSubNoLocal;
282: }
283:
284: /**
285: * @return the receiveTimeout
286: */
287: public long getReceiveTimeout() {
288: return receiveTimeout;
289: }
290:
291: /**
292: * @param receiveTimeout the receiveTimeout to set
293: */
294: public void setReceiveTimeout(long receiveTimeout) {
295: this .receiveTimeout = receiveTimeout;
296: }
297:
298: /**
299: * @return the timeToLive
300: */
301: public long getTimeToLive() {
302: return timeToLive;
303: }
304:
305: /**
306: * @param timeToLive the timeToLive to set
307: */
308: public void setTimeToLive(long timeToLive) {
309: this .timeToLive = timeToLive;
310: }
311:
312: public boolean isStateless() {
313: return stateless;
314: }
315:
316: public void setStateless(boolean stateless) {
317: this .stateless = stateless;
318: }
319:
320: public StoreFactory getStoreFactory() {
321: return storeFactory;
322: }
323:
324: public void setStoreFactory(StoreFactory storeFactory) {
325: this .storeFactory = storeFactory;
326: }
327:
328: public Store getStore() {
329: return store;
330: }
331:
332: public void setStore(Store store) {
333: this .store = store;
334: }
335:
336: public Destination getReplyDestination() {
337: return replyDestination;
338: }
339:
340: public void setReplyDestination(Destination replyDestination) {
341: this .replyDestination = replyDestination;
342: }
343:
344: public String getReplyDestinationName() {
345: return replyDestinationName;
346: }
347:
348: public void setReplyDestinationName(String replyDestinationName) {
349: this .replyDestinationName = replyDestinationName;
350: }
351:
352: protected void processInOnly(final MessageExchange exchange,
353: final NormalizedMessage in) throws Exception {
354: MessageCreator creator = new MessageCreator() {
355: public Message createMessage(Session session)
356: throws JMSException {
357: try {
358: Message message = marshaler.createMessage(exchange,
359: in, session);
360: if (logger.isTraceEnabled()) {
361: logger.trace("Sending message to: "
362: + template.getDefaultDestinationName()
363: + " message: " + message);
364: }
365: return message;
366: } catch (Exception e) {
367: JMSException jmsEx = new JMSException(
368: "Failed to create JMS Message: " + e);
369: jmsEx.setLinkedException(e);
370: jmsEx.initCause(e);
371: throw jmsEx;
372: }
373: }
374: };
375: Object dest = destinationChooser
376: .chooseDestination(exchange, in);
377: if (dest instanceof Destination) {
378: template.send((Destination) dest, creator);
379: } else if (dest instanceof String) {
380: template.send((String) dest, creator);
381: } else {
382: template.send(creator);
383: }
384: }
385:
386: protected void processInOut(final MessageExchange exchange,
387: final NormalizedMessage in, final NormalizedMessage out)
388: throws Exception {
389: SessionCallback callback = new SessionCallback() {
390: public Object doInJms(Session session) throws JMSException {
391: try {
392: processInOutInSession(exchange, in, out, session);
393: return null;
394: } catch (JMSException e) {
395: throw e;
396: } catch (RuntimeException e) {
397: throw e;
398: } catch (Exception e) {
399: throw new UncategorizedJmsException(e);
400: }
401: }
402: };
403: template.execute(callback, true);
404: }
405:
406: protected void processInOutInSession(
407: final MessageExchange exchange, final NormalizedMessage in,
408: final NormalizedMessage out, final Session session)
409: throws Exception {
410: // Create destinations
411: final Destination dest = getDestination(exchange, in, session);
412: final Destination replyDest = getReplyDestination(exchange,
413: out, session);
414: // Create message and send it
415: final Message sendJmsMsg = marshaler.createMessage(exchange,
416: in, session);
417: //setCorrelationID(sendJmsMsg, exchange);
418: sendJmsMsg.setJMSReplyTo(replyDest);
419: template.send(dest, new MessageCreator() {
420: public Message createMessage(Session session)
421: throws JMSException {
422: return sendJmsMsg;
423: }
424: });
425: // Create selector
426: String jmsId = sendJmsMsg.getJMSMessageID();
427: String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
428: //Receiving JMS Message, Creating and Returning NormalizedMessage out
429: Message receiveJmsMsg = template.receiveSelected(replyDest,
430: selector);
431: if (receiveJmsMsg == null) {
432: throw new IllegalStateException(
433: "Unable to receive response");
434: }
435: marshaler.populateMessage(receiveJmsMsg, exchange, out);
436: }
437:
438: protected Destination getDestination(MessageExchange exchange,
439: Object message, Session session) throws JMSException {
440: Object dest = null;
441: // Let the destinationChooser a chance to choose the destination
442: if (destinationChooser != null) {
443: dest = destinationChooser.chooseDestination(exchange,
444: message);
445: }
446: // Default to destinationName properties
447: if (dest == null) {
448: dest = destinationName;
449: }
450: // Resolve destination if needed
451: if (dest instanceof Destination) {
452: return (Destination) dest;
453: } else if (dest instanceof String) {
454: return destinationResolver.resolveDestinationName(session,
455: (String) dest, isPubSubDomain());
456: }
457: throw new IllegalStateException(
458: "Unable to choose destination for exchange " + exchange);
459: }
460:
461: protected Destination getReplyDestination(MessageExchange exchange,
462: Object message, Session session) throws JMSException {
463: Object dest = null;
464: // Let the destinationChooser a chance to choose the destination
465: if (destinationChooser != null) {
466: dest = destinationChooser.chooseDestination(exchange,
467: message);
468: }
469: // Default to replyDestination / replyDestinationName properties
470: if (dest == null) {
471: dest = replyDestination;
472: }
473: if (dest == null) {
474: dest = replyDestinationName;
475: }
476: // Resolve destination if needed
477: if (dest instanceof Destination) {
478: return (Destination) dest;
479: } else if (dest instanceof String) {
480: return destinationResolver.resolveDestinationName(session,
481: (String) dest, isPubSubDomain());
482: }
483: throw new IllegalStateException(
484: "Unable to choose replyDestination for exchange "
485: + exchange);
486: }
487:
488: public synchronized void start() throws Exception {
489: template = createTemplate();
490: if (store == null && !stateless) {
491: if (storeFactory == null) {
492: storeFactory = new MemoryStoreFactory();
493: }
494: store = storeFactory.open(getService().toString()
495: + getEndpoint());
496: }
497: super .start();
498: }
499:
500: public synchronized void stop() throws Exception {
501: if (store != null) {
502: if (storeFactory != null) {
503: storeFactory.close(store);
504: }
505: store = null;
506: }
507: super .stop();
508: }
509:
510: public void validate() throws DeploymentException {
511: // TODO: check service, endpoint
512: super .validate();
513: if (getService() == null) {
514: throw new DeploymentException("service must be set");
515: }
516: if (getEndpoint() == null) {
517: throw new DeploymentException("endpoint must be set");
518: }
519: if (getConnectionFactory() == null) {
520: throw new DeploymentException(
521: "connectionFactory is required");
522: }
523: }
524:
525: protected JmsTemplate createTemplate() {
526: JmsTemplate tplt;
527: if (isJms102()) {
528: tplt = new JmsTemplate102();
529: } else {
530: tplt = new JmsTemplate();
531: }
532: tplt.setConnectionFactory(getConnectionFactory());
533: if (getDestination() != null) {
534: tplt.setDefaultDestination(getDestination());
535: } else if (getDestinationName() != null) {
536: tplt.setDefaultDestinationName(getDestinationName());
537: }
538: tplt.setDeliveryMode(getDeliveryMode());
539: if (getDestinationResolver() != null) {
540: tplt.setDestinationResolver(getDestinationResolver());
541: }
542: tplt.setExplicitQosEnabled(isExplicitQosEnabled());
543: tplt.setMessageIdEnabled(isMessageIdEnabled());
544: tplt.setMessageTimestampEnabled(isMessageTimestampEnabled());
545: tplt.setPriority(getPriority());
546: tplt.setPubSubDomain(isPubSubDomain());
547: tplt.setPubSubNoLocal(isPubSubNoLocal());
548: tplt.setTimeToLive(getTimeToLive());
549: tplt.setReceiveTimeout(getReceiveTimeout());
550: return tplt;
551: }
552: }
|