001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.core;
018:
019: import javax.jms.Connection;
020: import javax.jms.ConnectionFactory;
021: import javax.jms.DeliveryMode;
022: import javax.jms.Destination;
023: import javax.jms.JMSException;
024: import javax.jms.Message;
025: import javax.jms.MessageConsumer;
026: import javax.jms.MessageProducer;
027: import javax.jms.Session;
028:
029: import org.springframework.jms.JmsException;
030: import org.springframework.jms.connection.ConnectionFactoryUtils;
031: import org.springframework.jms.connection.JmsResourceHolder;
032: import org.springframework.jms.support.JmsUtils;
033: import org.springframework.jms.support.converter.MessageConverter;
034: import org.springframework.jms.support.converter.SimpleMessageConverter;
035: import org.springframework.jms.support.destination.JmsDestinationAccessor;
036: import org.springframework.transaction.support.TransactionSynchronizationManager;
037: import org.springframework.util.Assert;
038:
039: /**
040: * Helper class that simplifies synchronous JMS access code.
041: *
042: * <p><b>NOTE:</b> This class requires a JMS 1.1+ provider because it builds
043: * on the domain-independent API. <b>Use the {@link JmsTemplate102} subclass
044: * for a JMS 1.0.2 provider, e.g. when running on a J2EE 1.3 server.</b>
045: *
046: * <p>If you want to use dynamic destination creation, you must specify
047: * the type of JMS destination to create, using the "pubSubDomain" property.
048: * For other operations, this is not necessary, in contrast to when working
049: * with JmsTemplate102. Point-to-Point (Queues) is the default domain.
050: *
051: * <p>Default settings for JMS Sessions are "not transacted" and "auto-acknowledge".
052: * As defined by the J2EE specification, the transaction and acknowledgement
053: * parameters are ignored when a JMS Session is created inside an active
054: * transaction, no matter if a JTA transaction or a Spring-managed transaction.
055: * To configure them for native JMS usage, specify appropriate values for
056: * the "sessionTransacted" and "sessionAcknowledgeMode" bean properties.
057: *
058: * <p>This template uses a
059: * {@link org.springframework.jms.support.destination.DynamicDestinationResolver}
060: * and a {@link org.springframework.jms.support.converter.SimpleMessageConverter}
061: * as default strategies for resolving a destination name or converting a message,
062: * respectively. These defaults can be overridden through the "destinationResolver"
063: * and "messageConverter" bean properties.
064: *
065: * @author Mark Pollack
066: * @author Juergen Hoeller
067: * @since 1.1
068: * @see #setConnectionFactory
069: * @see #setPubSubDomain
070: * @see #setDestinationResolver
071: * @see #setMessageConverter
072: * @see JmsTemplate102
073: * @see javax.jms.MessageProducer
074: * @see javax.jms.MessageConsumer
075: */
076: public class JmsTemplate extends JmsDestinationAccessor implements
077: JmsOperations {
078:
079: /**
080: * Timeout value indicating that a receive operation should
081: * check if a message is immediately available without blocking.
082: */
083: public static final long RECEIVE_TIMEOUT_NO_WAIT = -1;
084:
085: /**
086: * Timeout value indicating a blocking receive without timeout.
087: */
088: public static final long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0;
089:
090: /** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils */
091: private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();
092:
093: private Object defaultDestination;
094:
095: private MessageConverter messageConverter;
096:
097: private boolean messageIdEnabled = true;
098:
099: private boolean messageTimestampEnabled = true;
100:
101: private boolean pubSubNoLocal = false;
102:
103: private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
104:
105: private boolean explicitQosEnabled = false;
106:
107: private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
108:
109: private int priority = Message.DEFAULT_PRIORITY;
110:
111: private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
112:
113: /**
114: * Create a new JmsTemplate for bean-style usage.
115: * <p>Note: The ConnectionFactory has to be set before using the instance.
116: * This constructor can be used to prepare a JmsTemplate via a BeanFactory,
117: * typically setting the ConnectionFactory via setConnectionFactory.
118: * @see #setConnectionFactory
119: */
120: public JmsTemplate() {
121: initDefaultStrategies();
122: }
123:
124: /**
125: * Create a new JmsTemplate, given a ConnectionFactory.
126: * @param connectionFactory the ConnectionFactory to obtain Connections from
127: */
128: public JmsTemplate(ConnectionFactory connectionFactory) {
129: this ();
130: setConnectionFactory(connectionFactory);
131: afterPropertiesSet();
132: }
133:
134: /**
135: * Initialize the default implementations for the template's strategies:
136: * DynamicDestinationResolver and SimpleMessageConverter.
137: * @see #setDestinationResolver
138: * @see #setMessageConverter
139: * @see org.springframework.jms.support.destination.DynamicDestinationResolver
140: * @see org.springframework.jms.support.converter.SimpleMessageConverter
141: */
142: protected void initDefaultStrategies() {
143: setMessageConverter(new SimpleMessageConverter());
144: }
145:
146: /**
147: * Set the destination to be used on send/receive operations that do not
148: * have a destination parameter.
149: * <p>Alternatively, specify a "defaultDestinationName", to be
150: * dynamically resolved via the DestinationResolver.
151: * @see #send(MessageCreator)
152: * @see #convertAndSend(Object)
153: * @see #convertAndSend(Object, MessagePostProcessor)
154: * @see #setDefaultDestinationName(String)
155: */
156: public void setDefaultDestination(Destination destination) {
157: this .defaultDestination = destination;
158: }
159:
160: /**
161: * Return the destination to be used on send/receive operations that do not
162: * have a destination parameter.
163: */
164: public Destination getDefaultDestination() {
165: return (this .defaultDestination instanceof Destination ? (Destination) this .defaultDestination
166: : null);
167: }
168:
169: /**
170: * Set the destination name to be used on send/receive operations that
171: * do not have a destination parameter. The specified name will be
172: * dynamically resolved via the DestinationResolver.
173: * <p>Alternatively, specify a JMS Destination object as "defaultDestination".
174: * @see #send(MessageCreator)
175: * @see #convertAndSend(Object)
176: * @see #convertAndSend(Object, MessagePostProcessor)
177: * @see #setDestinationResolver
178: * @see #setDefaultDestination(javax.jms.Destination)
179: */
180: public void setDefaultDestinationName(String destinationName) {
181: this .defaultDestination = destinationName;
182: }
183:
184: /**
185: * Return the destination name to be used on send/receive operations that
186: * do not have a destination parameter.
187: */
188: public String getDefaultDestinationName() {
189: return (this .defaultDestination instanceof String ? (String) this .defaultDestination
190: : null);
191: }
192:
193: /**
194: * Set the message converter for this template. Used to resolve
195: * Object parameters to convertAndSend methods and Object results
196: * from receiveAndConvert methods.
197: * <p>The default converter is a SimpleMessageConverter, which is able
198: * to handle BytesMessages, TextMessages and ObjectMessages.
199: * @see #convertAndSend
200: * @see #receiveAndConvert
201: * @see org.springframework.jms.support.converter.SimpleMessageConverter
202: */
203: public void setMessageConverter(MessageConverter messageConverter) {
204: this .messageConverter = messageConverter;
205: }
206:
207: /**
208: * Return the message converter for this template.
209: */
210: public MessageConverter getMessageConverter() {
211: return this .messageConverter;
212: }
213:
214: /**
215: * Set whether message IDs are enabled. Default is "true".
216: * <p>This is only a hint to the JMS producer.
217: * See the JMS javadocs for details.
218: * @see javax.jms.MessageProducer#setDisableMessageID
219: */
220: public void setMessageIdEnabled(boolean messageIdEnabled) {
221: this .messageIdEnabled = messageIdEnabled;
222: }
223:
224: /**
225: * Return whether message IDs are enabled.
226: */
227: public boolean isMessageIdEnabled() {
228: return this .messageIdEnabled;
229: }
230:
231: /**
232: * Set whether message timestamps are enabled. Default is "true".
233: * <p>This is only a hint to the JMS producer.
234: * See the JMS javadocs for details.
235: * @see javax.jms.MessageProducer#setDisableMessageTimestamp
236: */
237: public void setMessageTimestampEnabled(
238: boolean messageTimestampEnabled) {
239: this .messageTimestampEnabled = messageTimestampEnabled;
240: }
241:
242: /**
243: * Return whether message timestamps are enabled.
244: */
245: public boolean isMessageTimestampEnabled() {
246: return this .messageTimestampEnabled;
247: }
248:
249: /**
250: * Set whether to inhibit the delivery of messages published by its own connection.
251: * Default is "false".
252: * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
253: */
254: public void setPubSubNoLocal(boolean pubSubNoLocal) {
255: this .pubSubNoLocal = pubSubNoLocal;
256: }
257:
258: /**
259: * Return whether to inhibit the delivery of messages published by its own connection.
260: */
261: public boolean isPubSubNoLocal() {
262: return this .pubSubNoLocal;
263: }
264:
265: /**
266: * Set the timeout to use for receive calls (in milliseconds).
267: * <p>The default is {@link #RECEIVE_TIMEOUT_INDEFINITE_WAIT}, which indicates
268: * a blocking receive without timeout.
269: * <p>Specify {@link #RECEIVE_TIMEOUT_NO_WAIT} to inidicate that a receive operation
270: * should check if a message is immediately available without blocking.
271: * @see javax.jms.MessageConsumer#receive(long)
272: * @see javax.jms.MessageConsumer#receive()
273: * @see javax.jms.MessageConsumer#receiveNoWait()
274: */
275: public void setReceiveTimeout(long receiveTimeout) {
276: this .receiveTimeout = receiveTimeout;
277: }
278:
279: /**
280: * Return the timeout to use for receive calls (in milliseconds).
281: */
282: public long getReceiveTimeout() {
283: return this .receiveTimeout;
284: }
285:
286: /**
287: * Set if the QOS values (deliveryMode, priority, timeToLive)
288: * should be used for sending a message.
289: * @see #setDeliveryMode
290: * @see #setPriority
291: * @see #setTimeToLive
292: */
293: public void setExplicitQosEnabled(boolean explicitQosEnabled) {
294: this .explicitQosEnabled = explicitQosEnabled;
295: }
296:
297: /**
298: * If "true", then the values of deliveryMode, priority, and timeToLive
299: * will be used when sending a message. Otherwise, the default values,
300: * that may be set administratively, will be used.
301: * @return true if overriding default values of QOS parameters
302: * (deliveryMode, priority, and timeToLive)
303: * @see #setDeliveryMode
304: * @see #setPriority
305: * @see #setTimeToLive
306: */
307: public boolean isExplicitQosEnabled() {
308: return this .explicitQosEnabled;
309: }
310:
311: /**
312: * Set whether message delivery should be persistent or non-persistent,
313: * specified as boolean value ("true" or "false"). This will set the delivery
314: * mode accordingly, to either "PERSISTENT" (1) or "NON_PERSISTENT" (2).
315: * <p>Default it "true" aka delivery mode "PERSISTENT".
316: * @see #setDeliveryMode(int)
317: * @see javax.jms.DeliveryMode#PERSISTENT
318: * @see javax.jms.DeliveryMode#NON_PERSISTENT
319: */
320: public void setDeliveryPersistent(boolean deliveryPersistent) {
321: this .deliveryMode = (deliveryPersistent ? DeliveryMode.PERSISTENT
322: : DeliveryMode.NON_PERSISTENT);
323: }
324:
325: /**
326: * Set the delivery mode to use when sending a message.
327: * Default is the Message default: "PERSISTENT".
328: * <p>Since a default value may be defined administratively,
329: * this is only used when "isExplicitQosEnabled" equals "true".
330: * @param deliveryMode the delivery mode to use
331: * @see #isExplicitQosEnabled
332: * @see javax.jms.DeliveryMode#PERSISTENT
333: * @see javax.jms.DeliveryMode#NON_PERSISTENT
334: * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
335: * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
336: */
337: public void setDeliveryMode(int deliveryMode) {
338: this .deliveryMode = deliveryMode;
339: }
340:
341: /**
342: * Return the delivery mode to use when sending a message.
343: */
344: public int getDeliveryMode() {
345: return this .deliveryMode;
346: }
347:
348: /**
349: * Set the priority of a message when sending.
350: * <p>Since a default value may be defined administratively,
351: * this is only used when "isExplicitQosEnabled" equals "true".
352: * @see #isExplicitQosEnabled
353: * @see javax.jms.Message#DEFAULT_PRIORITY
354: * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
355: */
356: public void setPriority(int priority) {
357: this .priority = priority;
358: }
359:
360: /**
361: * Return the priority of a message when sending.
362: */
363: public int getPriority() {
364: return this .priority;
365: }
366:
367: /**
368: * Set the time-to-live of the message when sending.
369: * <p>Since a default value may be defined administratively,
370: * this is only used when "isExplicitQosEnabled" equals "true".
371: * @param timeToLive the message's lifetime (in milliseconds)
372: * @see #isExplicitQosEnabled
373: * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
374: * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
375: */
376: public void setTimeToLive(long timeToLive) {
377: this .timeToLive = timeToLive;
378: }
379:
380: /**
381: * Return the time-to-live of the message when sending.
382: */
383: public long getTimeToLive() {
384: return this .timeToLive;
385: }
386:
387: private void checkDefaultDestination() throws IllegalStateException {
388: if (this .defaultDestination == null) {
389: throw new IllegalStateException(
390: "No defaultDestination or defaultDestinationName specified. Check configuration of JmsTemplate.");
391: }
392: }
393:
394: private void checkMessageConverter() throws IllegalStateException {
395: if (getMessageConverter() == null) {
396: throw new IllegalStateException(
397: "No messageConverter registered. Check configuration of JmsTemplate.");
398: }
399: }
400:
401: /**
402: * Execute the action specified by the given action object within a
403: * JMS Session. Generalized version of <code>execute(SessionCallback)</code>,
404: * allowing the JMS Connection to be started on the fly.
405: * <p>Use <code>execute(SessionCallback)</code> for the general case.
406: * Starting the JMS Connection is just necessary for receiving messages,
407: * which is preferably achieved through the <code>receive</code> methods.
408: * @param action callback object that exposes the Session
409: * @param startConnection whether to start the Connection
410: * @return the result object from working with the Session
411: * @throws JmsException if there is any problem
412: * @see #execute(SessionCallback)
413: * @see #receive
414: */
415: public Object execute(SessionCallback action,
416: boolean startConnection) throws JmsException {
417: Assert.notNull(action, "Callback object must not be null");
418:
419: Connection conToClose = null;
420: Session sessionToClose = null;
421: try {
422: Session sessionToUse = ConnectionFactoryUtils
423: .doGetTransactionalSession(getConnectionFactory(),
424: this .transactionalResourceFactory,
425: startConnection);
426: if (sessionToUse == null) {
427: conToClose = createConnection();
428: sessionToClose = createSession(conToClose);
429: if (startConnection) {
430: conToClose.start();
431: }
432: sessionToUse = sessionToClose;
433: }
434: if (logger.isDebugEnabled()) {
435: logger.debug("Executing callback on JMS Session ["
436: + sessionToUse + "]");
437: }
438: return action.doInJms(sessionToUse);
439: } catch (JMSException ex) {
440: throw convertJmsAccessException(ex);
441: } finally {
442: JmsUtils.closeSession(sessionToClose);
443: ConnectionFactoryUtils.releaseConnection(conToClose,
444: getConnectionFactory(), startConnection);
445: }
446: }
447:
448: public Object execute(SessionCallback action) throws JmsException {
449: return execute(action, false);
450: }
451:
452: public Object execute(final ProducerCallback action)
453: throws JmsException {
454: Assert.notNull(action, "Callback object must not be null");
455:
456: return execute(new SessionCallback() {
457: public Object doInJms(Session session) throws JMSException {
458: MessageProducer producer = createProducer(session, null);
459: try {
460: return action.doInJms(session, producer);
461: } finally {
462: JmsUtils.closeMessageProducer(producer);
463: }
464: }
465: }, false);
466: }
467:
468: //-------------------------------------------------------------------------
469: // Convenience methods for sending messages
470: //-------------------------------------------------------------------------
471:
472: public void send(MessageCreator messageCreator) throws JmsException {
473: checkDefaultDestination();
474: if (getDefaultDestination() != null) {
475: send(getDefaultDestination(), messageCreator);
476: } else {
477: send(getDefaultDestinationName(), messageCreator);
478: }
479: }
480:
481: public void send(final Destination destination,
482: final MessageCreator messageCreator) throws JmsException {
483: execute(new SessionCallback() {
484: public Object doInJms(Session session) throws JMSException {
485: doSend(session, destination, messageCreator);
486: return null;
487: }
488: }, false);
489: }
490:
491: public void send(final String destinationName,
492: final MessageCreator messageCreator) throws JmsException {
493: execute(new SessionCallback() {
494: public Object doInJms(Session session) throws JMSException {
495: Destination destination = resolveDestinationName(
496: session, destinationName);
497: doSend(session, destination, messageCreator);
498: return null;
499: }
500: }, false);
501: }
502:
503: /**
504: * Send the given JMS message.
505: * @param session the JMS Session to operate on
506: * @param destination the JMS Destination to send to
507: * @param messageCreator callback to create a JMS Message
508: * @throws JMSException if thrown by JMS API methods
509: */
510: protected void doSend(Session session, Destination destination,
511: MessageCreator messageCreator) throws JMSException {
512:
513: Assert.notNull(messageCreator,
514: "MessageCreator must not be null");
515:
516: MessageProducer producer = createProducer(session, destination);
517: try {
518: Message message = messageCreator.createMessage(session);
519: if (logger.isDebugEnabled()) {
520: logger.debug("Sending created message [" + message
521: + "]");
522: }
523: doSend(producer, message);
524: // Check commit - avoid commit call within a JTA transaction.
525: if (session.getTransacted()
526: && isSessionLocallyTransacted(session)) {
527: // Transacted session created by this template -> commit.
528: JmsUtils.commitIfNecessary(session);
529: }
530: } finally {
531: JmsUtils.closeMessageProducer(producer);
532: }
533: }
534:
535: /**
536: * Actually send the given JMS message.
537: * @param producer the JMS MessageProducer to send with
538: * @param message the JMS Message to send
539: * @throws JMSException if thrown by JMS API methods
540: */
541: protected void doSend(MessageProducer producer, Message message)
542: throws JMSException {
543: if (isExplicitQosEnabled()) {
544: producer.send(message, getDeliveryMode(), getPriority(),
545: getTimeToLive());
546: } else {
547: producer.send(message);
548: }
549: }
550:
551: //-------------------------------------------------------------------------
552: // Convenience methods for sending auto-converted messages
553: //-------------------------------------------------------------------------
554:
555: public void convertAndSend(Object message) throws JmsException {
556: checkDefaultDestination();
557: if (getDefaultDestination() != null) {
558: convertAndSend(getDefaultDestination(), message);
559: } else {
560: convertAndSend(getDefaultDestinationName(), message);
561: }
562: }
563:
564: public void convertAndSend(Destination destination,
565: final Object message) throws JmsException {
566: checkMessageConverter();
567: send(destination, new MessageCreator() {
568: public Message createMessage(Session session)
569: throws JMSException {
570: return getMessageConverter()
571: .toMessage(message, session);
572: }
573: });
574: }
575:
576: public void convertAndSend(String destinationName,
577: final Object message) throws JmsException {
578: checkMessageConverter();
579: send(destinationName, new MessageCreator() {
580: public Message createMessage(Session session)
581: throws JMSException {
582: return getMessageConverter()
583: .toMessage(message, session);
584: }
585: });
586: }
587:
588: public void convertAndSend(Object message,
589: MessagePostProcessor postProcessor) throws JmsException {
590: checkDefaultDestination();
591: if (getDefaultDestination() != null) {
592: convertAndSend(getDefaultDestination(), message,
593: postProcessor);
594: } else {
595: convertAndSend(getDefaultDestinationName(), message,
596: postProcessor);
597: }
598: }
599:
600: public void convertAndSend(Destination destination,
601: final Object message,
602: final MessagePostProcessor postProcessor)
603: throws JmsException {
604:
605: checkMessageConverter();
606: send(destination, new MessageCreator() {
607: public Message createMessage(Session session)
608: throws JMSException {
609: Message msg = getMessageConverter().toMessage(message,
610: session);
611: return postProcessor.postProcessMessage(msg);
612: }
613: });
614: }
615:
616: public void convertAndSend(String destinationName,
617: final Object message,
618: final MessagePostProcessor postProcessor)
619: throws JmsException {
620:
621: checkMessageConverter();
622: send(destinationName, new MessageCreator() {
623: public Message createMessage(Session session)
624: throws JMSException {
625: Message msg = getMessageConverter().toMessage(message,
626: session);
627: return postProcessor.postProcessMessage(msg);
628: }
629: });
630: }
631:
632: //-------------------------------------------------------------------------
633: // Convenience methods for receiving messages
634: //-------------------------------------------------------------------------
635:
636: public Message receive() throws JmsException {
637: checkDefaultDestination();
638: if (getDefaultDestination() != null) {
639: return receive(getDefaultDestination());
640: } else {
641: return receive(getDefaultDestinationName());
642: }
643: }
644:
645: public Message receive(final Destination destination)
646: throws JmsException {
647: return (Message) execute(new SessionCallback() {
648: public Object doInJms(Session session) throws JMSException {
649: return doReceive(session, destination, null);
650: }
651: }, true);
652: }
653:
654: public Message receive(final String destinationName)
655: throws JmsException {
656: return (Message) execute(new SessionCallback() {
657: public Object doInJms(Session session) throws JMSException {
658: Destination destination = resolveDestinationName(
659: session, destinationName);
660: return doReceive(session, destination, null);
661: }
662: }, true);
663: }
664:
665: public Message receiveSelected(String messageSelector)
666: throws JmsException {
667: checkDefaultDestination();
668: if (getDefaultDestination() != null) {
669: return receiveSelected(getDefaultDestination(),
670: messageSelector);
671: } else {
672: return receiveSelected(getDefaultDestinationName(),
673: messageSelector);
674: }
675: }
676:
677: public Message receiveSelected(final Destination destination,
678: final String messageSelector) throws JmsException {
679: return (Message) execute(new SessionCallback() {
680: public Object doInJms(Session session) throws JMSException {
681: return doReceive(session, destination, messageSelector);
682: }
683: }, true);
684: }
685:
686: public Message receiveSelected(final String destinationName,
687: final String messageSelector) throws JmsException {
688: return (Message) execute(new SessionCallback() {
689: public Object doInJms(Session session) throws JMSException {
690: Destination destination = resolveDestinationName(
691: session, destinationName);
692: return doReceive(session, destination, messageSelector);
693: }
694: }, true);
695: }
696:
697: /**
698: * Receive a JMS message.
699: * @param session the JMS Session to operate on
700: * @param destination the JMS Destination to receive from
701: * @param messageSelector the message selector for this consumer (can be <code>null</code>)
702: * @return the JMS Message received, or <code>null</code> if none
703: * @throws JMSException if thrown by JMS API methods
704: */
705: protected Message doReceive(Session session,
706: Destination destination, String messageSelector)
707: throws JMSException {
708:
709: return doReceive(session, createConsumer(session, destination,
710: messageSelector));
711: }
712:
713: /**
714: * Actually receive a JMS message.
715: * @param session the JMS Session to operate on
716: * @param consumer the JMS MessageConsumer to send with
717: * @return the JMS Message received, or <code>null</code> if none
718: * @throws JMSException if thrown by JMS API methods
719: */
720: protected Message doReceive(Session session,
721: MessageConsumer consumer) throws JMSException {
722: try {
723: // Use transaction timeout (if available).
724: long timeout = getReceiveTimeout();
725: JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
726: .getResource(getConnectionFactory());
727: if (resourceHolder != null && resourceHolder.hasTimeout()) {
728: timeout = resourceHolder.getTimeToLiveInMillis();
729: }
730:
731: Message message = null;
732: if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
733: message = consumer.receiveNoWait();
734: } else if (timeout > 0) {
735: message = consumer.receive(timeout);
736: } else {
737: message = consumer.receive();
738: }
739:
740: if (session.getTransacted()) {
741: // Commit necessary - but avoid commit call within a JTA transaction.
742: if (isSessionLocallyTransacted(session)) {
743: // Transacted session created by this template -> commit.
744: JmsUtils.commitIfNecessary(session);
745: }
746: } else if (isClientAcknowledge(session)) {
747: // Manually acknowledge message, if any.
748: if (message != null) {
749: message.acknowledge();
750: }
751: }
752: return message;
753: } finally {
754: JmsUtils.closeMessageConsumer(consumer);
755: }
756: }
757:
758: //-------------------------------------------------------------------------
759: // Convenience methods for receiving auto-converted messages
760: //-------------------------------------------------------------------------
761:
762: public Object receiveAndConvert() throws JmsException {
763: checkMessageConverter();
764: return doConvertFromMessage(receive());
765: }
766:
767: public Object receiveAndConvert(Destination destination)
768: throws JmsException {
769: checkMessageConverter();
770: return doConvertFromMessage(receive(destination));
771: }
772:
773: public Object receiveAndConvert(String destinationName)
774: throws JmsException {
775: checkMessageConverter();
776: return doConvertFromMessage(receive(destinationName));
777: }
778:
779: public Object receiveSelectedAndConvert(String messageSelector)
780: throws JmsException {
781: checkMessageConverter();
782: return doConvertFromMessage(receiveSelected(messageSelector));
783: }
784:
785: public Object receiveSelectedAndConvert(Destination destination,
786: String messageSelector) throws JmsException {
787: checkMessageConverter();
788: return doConvertFromMessage(receiveSelected(destination,
789: messageSelector));
790: }
791:
792: public Object receiveSelectedAndConvert(String destinationName,
793: String messageSelector) throws JmsException {
794: checkMessageConverter();
795: return doConvertFromMessage(receiveSelected(destinationName,
796: messageSelector));
797: }
798:
799: /**
800: * Extract the content from the given JMS message.
801: * @param message the JMS Message to convert (can be <code>null</code>)
802: * @return the content of the message, or <code>null</code> if none
803: */
804: protected Object doConvertFromMessage(Message message) {
805: if (message != null) {
806: try {
807: return getMessageConverter().fromMessage(message);
808: } catch (JMSException ex) {
809: throw convertJmsAccessException(ex);
810: }
811: }
812: return null;
813: }
814:
815: //-------------------------------------------------------------------------
816: // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
817: //-------------------------------------------------------------------------
818:
819: /**
820: * Fetch an appropriate Connection from the given JmsResourceHolder.
821: * <p>This implementation accepts any JMS 1.1 Connection.
822: * @param holder the JmsResourceHolder
823: * @return an appropriate Connection fetched from the holder,
824: * or <code>null</code> if none found
825: */
826: protected Connection getConnection(JmsResourceHolder holder) {
827: return holder.getConnection();
828: }
829:
830: /**
831: * Fetch an appropriate Session from the given JmsResourceHolder.
832: * <p>This implementation accepts any JMS 1.1 Session.
833: * @param holder the JmsResourceHolder
834: * @return an appropriate Session fetched from the holder,
835: * or <code>null</code> if none found
836: */
837: protected Session getSession(JmsResourceHolder holder) {
838: return holder.getSession();
839: }
840:
841: /**
842: * Check whether the given Session is locally transacted, that is, whether
843: * its transaction is managed by this listener container's Session handling
844: * and not by an external transaction coordinator.
845: * <p>Note: The Session's own transacted flag will already have been checked
846: * before. This method is about finding out whether the Session's transaction
847: * is local or externally coordinated.
848: * @param session the Session to check
849: * @return whether the given Session is locally transacted
850: * @see #isSessionTransacted()
851: * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
852: */
853: protected boolean isSessionLocallyTransacted(Session session) {
854: return isSessionTransacted()
855: && !ConnectionFactoryUtils.isSessionTransactional(
856: session, getConnectionFactory());
857: }
858:
859: /**
860: * Create a JMS MessageProducer for the given Session and Destination,
861: * configuring it to disable message ids and/or timestamps (if necessary).
862: * <p>Delegates to <code>doCreateProducer</code> for creation of the raw
863: * JMS MessageProducer, which needs to be specific to JMS 1.1 or 1.0.2.
864: * @param session the JMS Session to create a MessageProducer for
865: * @param destination the JMS Destination to create a MessageProducer for
866: * @return the new JMS MessageProducer
867: * @throws JMSException if thrown by JMS API methods
868: * @see #doCreateProducer
869: * @see #setMessageIdEnabled
870: * @see #setMessageTimestampEnabled
871: */
872: protected MessageProducer createProducer(Session session,
873: Destination destination) throws JMSException {
874: MessageProducer producer = doCreateProducer(session,
875: destination);
876: if (!isMessageIdEnabled()) {
877: producer.setDisableMessageID(true);
878: }
879: if (!isMessageTimestampEnabled()) {
880: producer.setDisableMessageTimestamp(true);
881: }
882: return producer;
883: }
884:
885: /**
886: * Create a raw JMS MessageProducer for the given Session and Destination.
887: * <p>This implementation uses JMS 1.1 API.
888: * @param session the JMS Session to create a MessageProducer for
889: * @param destination the JMS Destination to create a MessageProducer for
890: * @return the new JMS MessageProducer
891: * @throws JMSException if thrown by JMS API methods
892: */
893: protected MessageProducer doCreateProducer(Session session,
894: Destination destination) throws JMSException {
895: return session.createProducer(destination);
896: }
897:
898: /**
899: * Create a JMS MessageConsumer for the given Session and Destination.
900: * <p>This implementation uses JMS 1.1 API.
901: * @param session the JMS Session to create a MessageConsumer for
902: * @param destination the JMS Destination to create a MessageConsumer for
903: * @param messageSelector the message selector for this consumer (can be <code>null</code>)
904: * @return the new JMS MessageConsumer
905: * @throws JMSException if thrown by JMS API methods
906: */
907: protected MessageConsumer createConsumer(Session session,
908: Destination destination, String messageSelector)
909: throws JMSException {
910:
911: // Only pass in the NoLocal flag in case of a Topic:
912: // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
913: // in case of the NoLocal flag being specified for a Queue.
914: if (isPubSubDomain()) {
915: return session.createConsumer(destination, messageSelector,
916: isPubSubNoLocal());
917: } else {
918: return session.createConsumer(destination, messageSelector);
919: }
920: }
921:
922: /**
923: * ResourceFactory implementation that delegates to this template's protected callback methods.
924: */
925: private class JmsTemplateResourceFactory implements
926: ConnectionFactoryUtils.ResourceFactory {
927:
928: public Connection getConnection(JmsResourceHolder holder) {
929: return JmsTemplate.this .getConnection(holder);
930: }
931:
932: public Session getSession(JmsResourceHolder holder) {
933: return JmsTemplate.this .getSession(holder);
934: }
935:
936: public Connection createConnection() throws JMSException {
937: return JmsTemplate.this .createConnection();
938: }
939:
940: public Session createSession(Connection con)
941: throws JMSException {
942: return JmsTemplate.this .createSession(con);
943: }
944:
945: public boolean isSynchedLocalTransactionAllowed() {
946: return JmsTemplate.this.isSessionTransacted();
947: }
948: }
949:
950: }
|