001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.listener;
018:
019: import javax.jms.Connection;
020: import javax.jms.Destination;
021: import javax.jms.JMSException;
022: import javax.jms.Message;
023: import javax.jms.MessageConsumer;
024: import javax.jms.Session;
025: import javax.jms.Topic;
026:
027: import org.springframework.beans.factory.BeanNameAware;
028: import org.springframework.jms.connection.ConnectionFactoryUtils;
029: import org.springframework.jms.connection.JmsResourceHolder;
030: import org.springframework.jms.support.JmsUtils;
031: import org.springframework.transaction.PlatformTransactionManager;
032: import org.springframework.transaction.TransactionStatus;
033: import org.springframework.transaction.support.DefaultTransactionDefinition;
034: import org.springframework.transaction.support.ResourceTransactionManager;
035:
036: /**
037: * Base class for listener container implementations which are based on polling.
038: * Provides support for listener handling based on {@link javax.jms.MessageConsumer},
039: * optionally participating in externally managed transactions.
040: *
041: * <p>This listener container variant is built for repeated polling attempts,
042: * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
043: * may be reobtained for each attempt or cached in between attempts; this is up
044: * to the concrete implementation. The receive timeout for each attempt can be
045: * configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
046: *
047: * <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
048: * which is perfectly compatible with both native JMS and JMS in a J2EE environment.
049: * Neither the JMS <code>MessageConsumer.setMessageListener</code> facility
050: * nor the JMS ServerSessionPool facility is required. A further advantage
051: * of this approach is full control over the listening process, allowing for
052: * custom scaling and throttling of concurrent message processing
053: * (which is up to concrete subclasses).
054: *
055: * <p>Message reception and listener execution can automatically be wrapped
056: * in transactions through passing a Spring
057: * {@link org.springframework.transaction.PlatformTransactionManager} into the
058: * {@link #setTransactionManager "transactionManager"} property. This will usually
059: * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
060: * J2EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained
061: * from JNDI (check your J2EE server's documentation).
062: *
063: * <p>This base class does not assume any specific mechanism for asynchronous
064: * execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
065: * for a concrete implementation which is based on Spring's
066: * {@link org.springframework.core.task.TaskExecutor} abstraction,
067: * including dynamic scaling of concurrent consumers and automatic self recovery.
068: *
069: * @author Juergen Hoeller
070: * @since 2.0.3
071: * @see #createListenerConsumer(javax.jms.Session)
072: * @see #receiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer)
073: * @see #setTransactionManager
074: */
075: public abstract class AbstractPollingMessageListenerContainer extends
076: AbstractMessageListenerContainer implements BeanNameAware {
077:
078: /**
079: * The default receive timeout: 1000 ms = 1 second.
080: */
081: public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
082:
083: private final MessageListenerContainerResourceFactory transactionalResourceFactory = new MessageListenerContainerResourceFactory();
084:
085: private boolean sessionTransactedCalled = false;
086:
087: private boolean pubSubNoLocal = false;
088:
089: private PlatformTransactionManager transactionManager;
090:
091: private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
092:
093: private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
094:
095: public void setSessionTransacted(boolean sessionTransacted) {
096: super .setSessionTransacted(sessionTransacted);
097: this .sessionTransactedCalled = true;
098: }
099:
100: /**
101: * Set whether to inhibit the delivery of messages published by its own connection.
102: * Default is "false".
103: * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
104: */
105: public void setPubSubNoLocal(boolean pubSubNoLocal) {
106: this .pubSubNoLocal = pubSubNoLocal;
107: }
108:
109: /**
110: * Return whether to inhibit the delivery of messages published by its own connection.
111: */
112: protected boolean isPubSubNoLocal() {
113: return this .pubSubNoLocal;
114: }
115:
116: /**
117: * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
118: * to use for transactional wrapping of message reception plus listener execution.
119: * <p>Default is none, not performing any transactional wrapping.
120: * If specified, this will usually be a Spring
121: * {@link org.springframework.transaction.jta.JtaTransactionManager} or one
122: * of its subclasses, in combination with a JTA-aware ConnectionFactory that
123: * this message listener container obtains its Connections from.
124: * <p><b>Note: Consider the use of local JMS transactions instead.</b>
125: * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
126: * to "true" in order to use a locally transacted JMS Session for the entire
127: * receive processing, including any Session operations performed by a
128: * {@link SessionAwareMessageListener} (e.g. sending a response message).
129: * Alternatively, a {@link org.springframework.jms.connection.JmsTransactionManager}
130: * may be used for fully synchronized Spring transactions based on local JMS
131: * transactions. Check {@link AbstractMessageListenerContainer}'s javadoc for
132: * a discussion of transaction choices and message redelivery scenarios.
133: * @see org.springframework.transaction.jta.JtaTransactionManager
134: * @see org.springframework.jms.connection.JmsTransactionManager
135: */
136: public void setTransactionManager(
137: PlatformTransactionManager transactionManager) {
138: this .transactionManager = transactionManager;
139: }
140:
141: /**
142: * Return the Spring PlatformTransactionManager to use for transactional
143: * wrapping of message reception plus listener execution.
144: */
145: protected final PlatformTransactionManager getTransactionManager() {
146: return this .transactionManager;
147: }
148:
149: /**
150: * Specify the transaction name to use for transactional wrapping.
151: * Default is the bean name of this listener container, if any.
152: * @see org.springframework.transaction.TransactionDefinition#getName()
153: */
154: public void setTransactionName(String transactionName) {
155: this .transactionDefinition.setName(transactionName);
156: }
157:
158: /**
159: * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
160: * Default is none, using the transaction manager's default timeout.
161: * @see org.springframework.transaction.TransactionDefinition#getTimeout()
162: * @see #setReceiveTimeout
163: */
164: public void setTransactionTimeout(int transactionTimeout) {
165: this .transactionDefinition.setTimeout(transactionTimeout);
166: }
167:
168: /**
169: * Set the timeout to use for receive calls, in <b>milliseconds</b>.
170: * The default is 1000 ms, that is, 1 second.
171: * <p><b>NOTE:</b> This value needs to be smaller than the transaction
172: * timeout used by the transaction manager (in the appropriate unit,
173: * of course). -1 indicates no timeout at all; however, this is only
174: * feasible if not running within a transaction manager.
175: * @see javax.jms.MessageConsumer#receive(long)
176: * @see javax.jms.MessageConsumer#receive()
177: * @see #setTransactionTimeout
178: */
179: public void setReceiveTimeout(long receiveTimeout) {
180: this .receiveTimeout = receiveTimeout;
181: }
182:
183: public void initialize() {
184: // Set sessionTransacted=true in case of a non-JTA transaction manager.
185: if (!this .sessionTransactedCalled
186: && this .transactionManager instanceof ResourceTransactionManager
187: && ((ResourceTransactionManager) this .transactionManager)
188: .getResourceFactory() != getConnectionFactory()) {
189: super .setSessionTransacted(true);
190: }
191:
192: // Use bean name as default transaction name.
193: if (this .transactionDefinition.getName() == null) {
194: this .transactionDefinition.setName(getBeanName());
195: }
196:
197: // Proceed with superclass initialization.
198: super .initialize();
199: }
200:
201: /**
202: * Create a MessageConsumer for the given JMS Session,
203: * registering a MessageListener for the specified listener.
204: * @param session the JMS Session to work on
205: * @return the MessageConsumer
206: * @throws javax.jms.JMSException if thrown by JMS methods
207: * @see #receiveAndExecute
208: */
209: protected MessageConsumer createListenerConsumer(Session session)
210: throws JMSException {
211: Destination destination = getDestination();
212: if (destination == null) {
213: destination = resolveDestinationName(session,
214: getDestinationName());
215: }
216: return createConsumer(session, destination);
217: }
218:
219: /**
220: * Execute the listener for a message received from the given consumer,
221: * wrapping the entire operation in an external transaction if demanded.
222: * @param session the JMS Session to work on
223: * @param consumer the MessageConsumer to work on
224: * @return whether a message has been received
225: * @throws JMSException if thrown by JMS methods
226: * @see #doReceiveAndExecute
227: */
228: protected boolean receiveAndExecute(Session session,
229: MessageConsumer consumer) throws JMSException {
230: if (this .transactionManager != null) {
231: // Execute receive within transaction.
232: TransactionStatus status = this .transactionManager
233: .getTransaction(this .transactionDefinition);
234: boolean messageReceived = true;
235: try {
236: messageReceived = doReceiveAndExecute(session,
237: consumer, status);
238: } catch (JMSException ex) {
239: rollbackOnException(status, ex);
240: throw ex;
241: } catch (RuntimeException ex) {
242: rollbackOnException(status, ex);
243: throw ex;
244: } catch (Error err) {
245: rollbackOnException(status, err);
246: throw err;
247: }
248: this .transactionManager.commit(status);
249: return messageReceived;
250: }
251:
252: else {
253: // Execute receive outside of transaction.
254: return doReceiveAndExecute(session, consumer, null);
255: }
256: }
257:
258: /**
259: * Actually execute the listener for a message received from the given consumer,
260: * fetching all requires resources and invoking the listener.
261: * @param session the JMS Session to work on
262: * @param consumer the MessageConsumer to work on
263: * @param status the TransactionStatus (may be <code>null</code>)
264: * @return whether a message has been received
265: * @throws JMSException if thrown by JMS methods
266: * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
267: */
268: protected boolean doReceiveAndExecute(Session session,
269: MessageConsumer consumer, TransactionStatus status)
270: throws JMSException {
271:
272: Connection conToClose = null;
273: Session sessionToClose = null;
274: MessageConsumer consumerToClose = null;
275: try {
276: Session sessionToUse = session;
277: boolean transactional = false;
278: if (sessionToUse == null) {
279: sessionToUse = ConnectionFactoryUtils
280: .doGetTransactionalSession(
281: getConnectionFactory(),
282: this .transactionalResourceFactory);
283: transactional = (sessionToUse != null);
284: }
285: if (sessionToUse == null) {
286: Connection conToUse = null;
287: if (sharedConnectionEnabled()) {
288: conToUse = getSharedConnection();
289: } else {
290: conToUse = createConnection();
291: conToClose = conToUse;
292: conToUse.start();
293: }
294: sessionToUse = createSession(conToUse);
295: sessionToClose = sessionToUse;
296: }
297: MessageConsumer consumerToUse = consumer;
298: if (consumerToUse == null) {
299: consumerToUse = createListenerConsumer(sessionToUse);
300: consumerToClose = consumerToUse;
301: }
302: Message message = receiveMessage(consumerToUse);
303: if (message != null) {
304: if (logger.isDebugEnabled()) {
305: logger.debug("Received message of type ["
306: + message.getClass() + "] from consumer ["
307: + consumerToUse + "] of "
308: + (transactional ? "transactional " : "")
309: + "session [" + sessionToUse + "]");
310: }
311: messageReceived(message, session);
312: try {
313: doExecuteListener(sessionToUse, message);
314: } catch (Throwable ex) {
315: if (status != null) {
316: if (logger.isDebugEnabled()) {
317: logger
318: .debug("Rolling back transaction because of listener exception thrown: "
319: + ex);
320: }
321: status.setRollbackOnly();
322: }
323: handleListenerException(ex);
324: }
325: return true;
326: } else {
327: if (logger.isDebugEnabled()) {
328: logger.debug("Consumer [" + consumerToUse + "] of "
329: + (transactional ? "transactional " : "")
330: + "session [" + sessionToUse
331: + "] did not receive a message");
332: }
333: return false;
334: }
335: } finally {
336: JmsUtils.closeMessageConsumer(consumerToClose);
337: JmsUtils.closeSession(sessionToClose);
338: ConnectionFactoryUtils.releaseConnection(conToClose,
339: getConnectionFactory(), true);
340: }
341: }
342:
343: /**
344: * This implementation checks whether the Session is externally synchronized.
345: * In this case, the Session is not locally transacted, despite the listener
346: * container's "sessionTransacted" flag being set to "true".
347: * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
348: */
349: protected boolean isSessionLocallyTransacted(Session session) {
350: return super .isSessionLocallyTransacted(session)
351: && !ConnectionFactoryUtils.isSessionTransactional(
352: session, getConnectionFactory());
353: }
354:
355: /**
356: * Perform a rollback, handling rollback exceptions properly.
357: * @param status object representing the transaction
358: * @param ex the thrown application exception or error
359: */
360: private void rollbackOnException(TransactionStatus status,
361: Throwable ex) {
362: logger
363: .debug(
364: "Initiating transaction rollback on application exception",
365: ex);
366: try {
367: this .transactionManager.rollback(status);
368: } catch (RuntimeException ex2) {
369: logger
370: .error(
371: "Application exception overridden by rollback exception",
372: ex);
373: throw ex2;
374: } catch (Error err) {
375: logger
376: .error(
377: "Application exception overridden by rollback error",
378: ex);
379: throw err;
380: }
381: }
382:
383: /**
384: * Receive a message from the given consumer.
385: * @param consumer the MessageConsumer to use
386: * @return the Message, or <code>null</code> if none
387: * @throws JMSException if thrown by JMS methods
388: */
389: protected Message receiveMessage(MessageConsumer consumer)
390: throws JMSException {
391: return (this .receiveTimeout < 0 ? consumer.receive() : consumer
392: .receive(this .receiveTimeout));
393: }
394:
395: /**
396: * Template method that gets called right when a new message has been received,
397: * before attempting to process it. Allows subclasses to react to the event
398: * of an actual incoming message, for example adapting their consumer count.
399: * @param message the received message
400: * @param session the receiving JMS Session
401: */
402: protected void messageReceived(Message message, Session session) {
403: }
404:
405: //-------------------------------------------------------------------------
406: // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
407: //-------------------------------------------------------------------------
408:
409: /**
410: * Fetch an appropriate Connection from the given JmsResourceHolder.
411: * <p>This implementation accepts any JMS 1.1 Connection.
412: * @param holder the JmsResourceHolder
413: * @return an appropriate Connection fetched from the holder,
414: * or <code>null</code> if none found
415: */
416: protected Connection getConnection(JmsResourceHolder holder) {
417: return holder.getConnection();
418: }
419:
420: /**
421: * Fetch an appropriate Session from the given JmsResourceHolder.
422: * <p>This implementation accepts any JMS 1.1 Session.
423: * @param holder the JmsResourceHolder
424: * @return an appropriate Session fetched from the holder,
425: * or <code>null</code> if none found
426: */
427: protected Session getSession(JmsResourceHolder holder) {
428: return holder.getSession();
429: }
430:
431: /**
432: * Create a JMS MessageConsumer for the given Session and Destination.
433: * <p>This implementation uses JMS 1.1 API.
434: * @param session the JMS Session to create a MessageConsumer for
435: * @param destination the JMS Destination to create a MessageConsumer for
436: * @return the new JMS MessageConsumer
437: * @throws javax.jms.JMSException if thrown by JMS API methods
438: */
439: protected MessageConsumer createConsumer(Session session,
440: Destination destination) throws JMSException {
441: // Only pass in the NoLocal flag in case of a Topic:
442: // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
443: // in case of the NoLocal flag being specified for a Queue.
444: if (isPubSubDomain()) {
445: if (isSubscriptionDurable() && destination instanceof Topic) {
446: return session.createDurableSubscriber(
447: (Topic) destination,
448: getDurableSubscriptionName(),
449: getMessageSelector(), isPubSubNoLocal());
450: } else {
451: return session.createConsumer(destination,
452: getMessageSelector(), isPubSubNoLocal());
453: }
454: } else {
455: return session.createConsumer(destination,
456: getMessageSelector());
457: }
458: }
459:
460: /**
461: * ResourceFactory implementation that delegates to this listener container's protected callback methods.
462: */
463: private class MessageListenerContainerResourceFactory implements
464: ConnectionFactoryUtils.ResourceFactory {
465:
466: public Connection getConnection(JmsResourceHolder holder) {
467: return AbstractPollingMessageListenerContainer.this
468: .getConnection(holder);
469: }
470:
471: public Session getSession(JmsResourceHolder holder) {
472: return AbstractPollingMessageListenerContainer.this
473: .getSession(holder);
474: }
475:
476: public Connection createConnection() throws JMSException {
477: if (AbstractPollingMessageListenerContainer.this
478: .sharedConnectionEnabled()) {
479: return AbstractPollingMessageListenerContainer.this
480: .getSharedConnection();
481: } else {
482: return AbstractPollingMessageListenerContainer.this
483: .createConnection();
484: }
485: }
486:
487: public Session createSession(Connection con)
488: throws JMSException {
489: return AbstractPollingMessageListenerContainer.this
490: .createSession(con);
491: }
492:
493: public boolean isSynchedLocalTransactionAllowed() {
494: return AbstractPollingMessageListenerContainer.this
495: .isSessionTransacted();
496: }
497: }
498:
499: }
|