001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.listener;
018:
019: import java.util.HashSet;
020: import java.util.Iterator;
021: import java.util.Set;
022:
023: import javax.jms.Connection;
024: import javax.jms.JMSException;
025: import javax.jms.Message;
026: import javax.jms.MessageConsumer;
027: import javax.jms.Session;
028:
029: import org.springframework.core.Constants;
030: import org.springframework.core.task.SimpleAsyncTaskExecutor;
031: import org.springframework.core.task.TaskExecutor;
032: import org.springframework.jms.support.JmsUtils;
033: import org.springframework.jms.support.destination.CachingDestinationResolver;
034: import org.springframework.jms.support.destination.DestinationResolver;
035: import org.springframework.scheduling.SchedulingAwareRunnable;
036: import org.springframework.scheduling.SchedulingTaskExecutor;
037: import org.springframework.util.Assert;
038: import org.springframework.util.ClassUtils;
039:
040: /**
041: * Message listener container variant that uses plain JMS client API, specifically
042: * a loop of <code>MessageConsumer.receive()</code> calls that also allow for
043: * transactional reception of messages (registering them with XA transactions).
044: * Designed to work in a native JMS environment as well as in a J2EE environment,
045: * with only minimal differences in configuration.
046: *
047: * <p>This is a simple but nevertheless powerful form of message listener container.
048: * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
049: * and optionally allows for dynamic adaptation at runtime (up until a maximum number).
050: * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
051: * of runtime complexity, in particular the minimal requirements on the JMS provider:
052: * Not even the JMS ServerSessionPool facility is required. Beyond that, it is
053: * fully self-recovering in case of the broker being temporarily unavailable,
054: * and allows for stops/restarts as well as runtime changes to its configuration.
055: *
056: * <p>Actual MessageListener execution happens in asynchronous work units which are
057: * created through Spring's {@link org.springframework.core.task.TaskExecutor}
058: * abstraction. By default, the specified number of invoker tasks will be created
059: * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
060: * setting. Specify an alternative TaskExecutor to integrate with an existing
061: * thread pool facility (such as a J2EE server's), for example using a
062: * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
063: * With a native JMS setup, each of those listener threads is going to use a
064: * cached JMS Session and MessageConsumer (only refreshed in case of failure),
065: * using the JMS provider's resources as efficiently as possible.
066: *
067: * <p>Message reception and listener execution can automatically be wrapped
068: * in transactions through passing a Spring
069: * {@link org.springframework.transaction.PlatformTransactionManager} into the
070: * {@link #setTransactionManager "transactionManager"} property. This will usually
071: * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
072: * J2EE enviroment, in combination with a JTA-aware JMS ConnectionFactory obtained
073: * from JNDI (check your J2EE server's documentation). Note that this listener
074: * container will automatically reobtain all JMS handles for each transaction
075: * in case of an external transaction manager specified, for compatibility with
076: * all J2EE servers (in particular JBoss). This non-caching behavior can be
077: * overridden through the {@link #setCacheLevel "cacheLevel"} /
078: * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching
079: * of the Connection (or also Session and MessageConsumer) even in case of
080: * an external transaction manager being involved.
081: *
082: * <p>Dynamic scaling of the number of concurrent invokers can be activated
083: * through specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
084: * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
085: * value. Since the latter's default is 1, you can also simply specify a
086: * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
087: * 5 concurrent consumers in case of increasing message load, as well as dynamic
088: * shrinking back to the standard number of consumers once the load decreases.
089: * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
090: * setting to control the lifespan of each new task, to avoid frequent scaling up
091: * and down, in particular if the ConnectionFactory does not pool JMS Sessions
092: * and/or the TaskExecutor does not pool threads (check your configuration!).
093: * Note that dynamic scaling only really makes sense for a queue in the first
094: * place; for a topic, you will typically stick with the default number of 1
095: * consumer, else you'd receive the same message multiple times on the same node.
096: *
097: * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
098: * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
099: * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
100: * javadoc for details on acknowledge modes and native transaction options,
101: * as well as the {@link AbstractPollingMessageListenerContainer} javadoc
102: * for details on configuring an external transaction manager.
103: *
104: * <p>This class requires a JMS 1.1+ provider, because it builds on the
105: * domain-independent API. <b>Use the {@link DefaultMessageListenerContainer102}
106: * subclass for JMS 1.0.2 providers.</b>
107: *
108: * @author Juergen Hoeller
109: * @since 2.0
110: * @see #setTransactionManager
111: * @see #setCacheLevel
112: * @see javax.jms.MessageConsumer#receive(long)
113: * @see SimpleMessageListenerContainer
114: * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
115: * @see DefaultMessageListenerContainer102
116: */
117: public class DefaultMessageListenerContainer extends
118: AbstractPollingMessageListenerContainer {
119:
120: /**
121: * Default thread name prefix: "DefaultMessageListenerContainer-".
122: */
123: public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils
124: .getShortName(DefaultMessageListenerContainer.class)
125: + "-";
126:
127: /**
128: * The default recovery interval: 5000 ms = 5 seconds.
129: */
130: public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
131:
132: /**
133: * Constant that indicates to cache no JMS resources at all.
134: * @see #setCacheLevel
135: */
136: public static final int CACHE_NONE = 0;
137:
138: /**
139: * Constant that indicates to cache a shared JMS Connection.
140: * @see #setCacheLevel
141: */
142: public static final int CACHE_CONNECTION = 1;
143:
144: /**
145: * Constant that indicates to cache a shared JMS Connection
146: * and a JMS Session for each listener thread.
147: * @see #setCacheLevel
148: */
149: public static final int CACHE_SESSION = 2;
150:
151: /**
152: * Constant that indicates to cache a shared JMS Connection
153: * and a JMS Session for each listener thread, as well as
154: * a JMS MessageConsumer for each listener thread.
155: * @see #setCacheLevel
156: */
157: public static final int CACHE_CONSUMER = 3;
158:
159: private static final Constants constants = new Constants(
160: DefaultMessageListenerContainer.class);
161:
162: private TaskExecutor taskExecutor;
163:
164: private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
165:
166: private Integer cacheLevel;
167:
168: private int concurrentConsumers = 1;
169:
170: private int maxConcurrentConsumers = 1;
171:
172: private int maxMessagesPerTask = Integer.MIN_VALUE;
173:
174: private int idleTaskExecutionLimit = 1;
175:
176: private final Set scheduledInvokers = new HashSet();
177:
178: private int activeInvokerCount = 0;
179:
180: private final Object activeInvokerMonitor = new Object();
181:
182: private Object currentRecoveryMarker = new Object();
183:
184: private final Object recoveryMonitor = new Object();
185:
186: /**
187: * Set the Spring TaskExecutor to use for running the listener threads.
188: * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
189: * starting up a number of new threads, according to the specified number
190: * of concurrent consumers.
191: * <p>Specify an alternative TaskExecutor for integration with an existing
192: * thread pool. Note that this really only adds value if the threads are
193: * managed in a specific fashion, for example within a J2EE environment.
194: * A plain thread pool does not add much value, as this listener container
195: * will occupy a number of threads for its entire lifetime.
196: * @see #setConcurrentConsumers
197: * @see org.springframework.core.task.SimpleAsyncTaskExecutor
198: * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
199: */
200: public void setTaskExecutor(TaskExecutor taskExecutor) {
201: this .taskExecutor = taskExecutor;
202: }
203:
204: /**
205: * Specify the interval between recovery attempts, in <b>milliseconds</b>.
206: * The default is 5000 ms, that is, 5 seconds.
207: * @see #handleListenerSetupFailure
208: */
209: public void setRecoveryInterval(long recoveryInterval) {
210: this .recoveryInterval = recoveryInterval;
211: }
212:
213: /**
214: * Specify the level of caching that this listener container is allowed to apply,
215: * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
216: * @see #setCacheLevel
217: */
218: public void setCacheLevelName(String constantName)
219: throws IllegalArgumentException {
220: if (constantName == null || !constantName.startsWith("CACHE_")) {
221: throw new IllegalArgumentException(
222: "Only cache constants allowed");
223: }
224: setCacheLevel(constants.asNumber(constantName).intValue());
225: }
226:
227: /**
228: * Specify the level of caching that this listener container is allowed to apply.
229: * <p>Default is CACHE_NONE if an external transaction manager has been specified
230: * (to reobtain all resources freshly within the scope of the external transaction),
231: * and CACHE_CONSUMER else (operating with local JMS resources).
232: * <p>Some J2EE servers only register their JMS resources with an ongoing XA
233: * transaction in case of a freshly obtained JMS Connection and Session,
234: * which is why this listener container does by default not cache any of those.
235: * However, if you want to optimize for a specific server, consider switching
236: * this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
237: * conjunction with an external transaction manager.
238: * <p>Currently known servers that absolutely require CACHE_NONE for XA
239: * transaction processing: JBoss 4. For any others, consider raising the
240: * cache level.
241: * @see #CACHE_NONE
242: * @see #CACHE_CONNECTION
243: * @see #CACHE_SESSION
244: * @see #CACHE_CONSUMER
245: * @see #setCacheLevelName
246: * @see #setTransactionManager
247: */
248: public void setCacheLevel(int cacheLevel) {
249: this .cacheLevel = new Integer(cacheLevel);
250: }
251:
252: /**
253: * Return the level of caching that this listener container is allowed to apply.
254: */
255: public int getCacheLevel() {
256: return (this .cacheLevel != null ? this .cacheLevel.intValue()
257: : CACHE_NONE);
258: }
259:
260: /**
261: * Specify the number of concurrent consumers to create. Default is 1.
262: * <p>Specifying a higher value for this setting will increase the standard
263: * level of scheduled concurrent consumers at runtime: This is effectively
264: * the minimum number of concurrent consumers which will be scheduled
265: * at any given time. This is a static setting; for dynamic scaling,
266: * consider specifying the "maxConcurrentConsumers" setting instead.
267: * <p>Raising the number of concurrent consumers is recommendable in order
268: * to scale the consumption of messages coming in from a queue. However,
269: * note that any ordering guarantees are lost once multiple consumers are
270: * registered. In general, stick with 1 consumer for low-volume queues.
271: * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
272: * This would lead to concurrent consumption of the same message,
273: * which is hardly ever desirable.
274: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
275: * @see #setMaxConcurrentConsumers
276: */
277: public void setConcurrentConsumers(int concurrentConsumers) {
278: Assert.isTrue(concurrentConsumers > 0,
279: "'concurrentConsumers' value must be at least 1 (one)");
280: synchronized (this .activeInvokerMonitor) {
281: this .concurrentConsumers = concurrentConsumers;
282: if (this .maxConcurrentConsumers < concurrentConsumers) {
283: this .maxConcurrentConsumers = concurrentConsumers;
284: }
285: }
286: }
287:
288: /**
289: * Return the "concurrentConsumer" setting.
290: * <p>This returns the currently configured "concurrentConsumers" value;
291: * the number of currently scheduled/active consumers might differ.
292: * @see #getScheduledConsumerCount()
293: * @see #getActiveConsumerCount()
294: */
295: public final int getConcurrentConsumers() {
296: synchronized (this .activeInvokerMonitor) {
297: return this .concurrentConsumers;
298: }
299: }
300:
301: /**
302: * Specify the maximum number of concurrent consumers to create. Default is 1.
303: * <p>If this setting is higher than "concurrentConsumers", the listener container
304: * will dynamically schedule new consumers at runtime, provided that enough
305: * incoming messages are encountered. Once the load goes down again, the number of
306: * consumers will be reduced to the standard level ("concurrentConsumers") again.
307: * <p>Raising the number of concurrent consumers is recommendable in order
308: * to scale the consumption of messages coming in from a queue. However,
309: * note that any ordering guarantees are lost once multiple consumers are
310: * registered. In general, stick with 1 consumer for low-volume queues.
311: * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
312: * This would lead to concurrent consumption of the same message,
313: * which is hardly ever desirable.
314: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
315: * @see #setConcurrentConsumers
316: */
317: public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
318: Assert
319: .isTrue(maxConcurrentConsumers > 0,
320: "'maxConcurrentConsumers' value must be at least 1 (one)");
321: synchronized (this .activeInvokerMonitor) {
322: this .maxConcurrentConsumers = (maxConcurrentConsumers > this .concurrentConsumers ? maxConcurrentConsumers
323: : this .concurrentConsumers);
324: }
325: }
326:
327: /**
328: * Return the "maxConcurrentConsumer" setting.
329: * <p>This returns the currently configured "maxConcurrentConsumers" value;
330: * the number of currently scheduled/active consumers might differ.
331: * @see #getScheduledConsumerCount()
332: * @see #getActiveConsumerCount()
333: */
334: public final int getMaxConcurrentConsumers() {
335: synchronized (this .activeInvokerMonitor) {
336: return this .maxConcurrentConsumers;
337: }
338: }
339:
340: /**
341: * Specify the maximum number of messages to process in one task.
342: * More concretely, this limits the number of message reception attempts per
343: * task, which includes receive iterations that did not actually pick up a
344: * message until they hit their timeout (see "receiveTimeout" property).
345: * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
346: * and 1 in case of a SchedulingTaskExecutor that indicates a preference for
347: * short-lived tasks. Specify a number of 10 to 100 messages to balance
348: * between extremely long-lived and extremely short-lived tasks here.
349: * <p>Long-lived tasks avoid frequent thread context switches through
350: * sticking with the same thread all the way through, while short-lived
351: * tasks allow thread pools to control the scheduling. Hence, thread
352: * pools will usually prefer short-lived tasks.
353: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
354: * @see #setTaskExecutor
355: * @see #setReceiveTimeout
356: * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
357: */
358: public void setMaxMessagesPerTask(int maxMessagesPerTask) {
359: Assert.isTrue(maxMessagesPerTask != 0,
360: "'maxMessagesPerTask' must not be 0");
361: synchronized (this .activeInvokerMonitor) {
362: this .maxMessagesPerTask = maxMessagesPerTask;
363: }
364: }
365:
366: /**
367: * Return the maximum number of messages to process in one task.
368: */
369: public int getMaxMessagesPerTask() {
370: synchronized (this .activeInvokerMonitor) {
371: return this .maxMessagesPerTask;
372: }
373: }
374:
375: /**
376: * Specify the limit for idle executions of a receive task, not having
377: * received any message within its execution. If this limit is reached,
378: * the task will shut down and leave receiving to other executing tasks
379: * (in case of dynamic scheduling; see the "maxConcurrentConsumers" setting).
380: * Default is 1.
381: * <p>Within each task execution, a number of message reception attempts
382: * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
383: * message (according to the "receiveTimeout" setting). If all of those receive
384: * attempts in a given task return without a message, the task is considered
385: * idle with respect to received messages. Such a task may still be rescheduled;
386: * however, once it reached the specified "idleTaskExecutionLimit", it will
387: * shut down (in case of dynamic scaling).
388: * <p>Raise this limit if you encounter too frequent scaling up and down.
389: * With this limit being higher, an idle consumer will be kept around longer,
390: * avoiding the restart of a consumer once a new load of messages comes in.
391: * Alternatively, specify a higher "maxMessagePerTask" and/or "receiveTimeout" value,
392: * which will also lead to idle consumers being kept around for a longer time
393: * (while also increasing the average execution time of each scheduled task).
394: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
395: * @see #setMaxMessagesPerTask
396: * @see #setReceiveTimeout
397: */
398: public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
399: Assert.isTrue(idleTaskExecutionLimit > 0,
400: "'idleTaskExecutionLimit' must be 1 or higher");
401: synchronized (this .activeInvokerMonitor) {
402: this .idleTaskExecutionLimit = idleTaskExecutionLimit;
403: }
404: }
405:
406: /**
407: * Return the limit for idle executions of a receive task.
408: */
409: public int getIdleTaskExecutionLimit() {
410: synchronized (this .activeInvokerMonitor) {
411: return this .idleTaskExecutionLimit;
412: }
413: }
414:
415: protected void validateConfiguration() {
416: super .validateConfiguration();
417: synchronized (this .activeInvokerMonitor) {
418: if (isSubscriptionDurable()
419: && this .concurrentConsumers != 1) {
420: throw new IllegalArgumentException(
421: "Only 1 concurrent consumer supported for durable subscription");
422: }
423: }
424: }
425:
426: //-------------------------------------------------------------------------
427: // Implementation of AbstractMessageListenerContainer's template methods
428: //-------------------------------------------------------------------------
429:
430: public void initialize() {
431: // Adapt default cache level.
432: if (getTransactionManager() != null) {
433: if (this .cacheLevel == null) {
434: this .cacheLevel = new Integer(CACHE_NONE);
435: }
436: } else {
437: if (this .cacheLevel == null) {
438: this .cacheLevel = new Integer(CACHE_CONSUMER);
439: }
440: }
441:
442: // Prepare taskExecutor and maxMessagesPerTask.
443: synchronized (this .activeInvokerMonitor) {
444: if (this .taskExecutor == null) {
445: this .taskExecutor = createDefaultTaskExecutor();
446: } else if (this .taskExecutor instanceof SchedulingTaskExecutor
447: && ((SchedulingTaskExecutor) this .taskExecutor)
448: .prefersShortLivedTasks()
449: && this .maxMessagesPerTask == Integer.MIN_VALUE) {
450: // TaskExecutor indicated a preference for short-lived tasks. According to
451: // setMaxMessagesPerTask javadoc, we'll use 1 message per task in this case
452: // unless the user specified a custom value.
453: this .maxMessagesPerTask = 1;
454: }
455: }
456:
457: // Proceed with actual listener initialization.
458: super .initialize();
459: }
460:
461: /**
462: * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
463: * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
464: * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
465: * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
466: */
467: protected TaskExecutor createDefaultTaskExecutor() {
468: String beanName = getBeanName();
469: String threadNamePrefix = (beanName != null ? beanName + "-"
470: : DEFAULT_THREAD_NAME_PREFIX);
471: return new SimpleAsyncTaskExecutor(threadNamePrefix);
472: }
473:
474: /**
475: * Use a shared JMS Connection depending on the "cacheLevel" setting.
476: * @see #setCacheLevel
477: * @see #CACHE_CONNECTION
478: */
479: protected final boolean sharedConnectionEnabled() {
480: return (getCacheLevel() >= CACHE_CONNECTION);
481: }
482:
483: /**
484: * Creates the specified number of concurrent consumers,
485: * in the form of a JMS Session plus associated MessageConsumer
486: * running in a separate thread.
487: * @see #scheduleNewInvoker
488: * @see #setTaskExecutor
489: */
490: protected void doInitialize() throws JMSException {
491: synchronized (this .activeInvokerMonitor) {
492: for (int i = 0; i < this .concurrentConsumers; i++) {
493: scheduleNewInvoker();
494: }
495: }
496: }
497:
498: /**
499: * Re-executes the given task via this listener container's TaskExecutor.
500: * @see #setTaskExecutor
501: */
502: protected void doRescheduleTask(Object task) {
503: this .taskExecutor.execute((Runnable) task);
504: }
505:
506: protected void messageReceived(Message message, Session session) {
507: scheduleNewInvokerIfAppropriate();
508: }
509:
510: /**
511: * Schedule a new invoker, increasing the total number of scheduled
512: * invokers for this listener container, but only if the specified
513: * "maxConcurrentConsumers" limit has not been reached yet, and only
514: * if this listener container does not currently have idle invokers
515: * that are waiting for new messages already.
516: * <p>Called once a message has been received, to scale up while
517: * processing the message in the invoker that originally received it.
518: * @see #setTaskExecutor
519: * @see #getMaxConcurrentConsumers()
520: */
521: protected void scheduleNewInvokerIfAppropriate() {
522: if (isRunning()) {
523: synchronized (this .activeInvokerMonitor) {
524: if (this .scheduledInvokers.size() < this .maxConcurrentConsumers
525: && !hasIdleInvokers()) {
526: scheduleNewInvoker();
527: if (logger.isDebugEnabled()) {
528: logger.debug("Raised scheduled invoker count: "
529: + scheduledInvokers.size());
530: }
531: }
532: }
533: }
534: }
535:
536: /**
537: * Schedule a new invoker, increasing the total number of scheduled
538: * invokers for this listener container.
539: */
540: private void scheduleNewInvoker() {
541: AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
542: if (rescheduleTaskIfNecessary(invoker)) {
543: // This should always be true, since we're only calling this when active.
544: this .scheduledInvokers.add(invoker);
545: }
546: }
547:
548: /**
549: * Determine whether this listener container currently has any
550: * idle instances among its scheduled invokers.
551: */
552: private boolean hasIdleInvokers() {
553: for (Iterator it = this .scheduledInvokers.iterator(); it
554: .hasNext();) {
555: AsyncMessageListenerInvoker invoker = (AsyncMessageListenerInvoker) it
556: .next();
557: if (invoker.isIdle()) {
558: return true;
559: }
560: }
561: return false;
562: }
563:
564: /**
565: * Determine whether the current invoker should be rescheduled,
566: * given that it might not have received a message in a while.
567: * @param idleTaskExecutionCount the number of idle executions
568: * that this invoker task has already accumulated (in a row)
569: */
570: private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
571: synchronized (this .activeInvokerMonitor) {
572: boolean idle = (idleTaskExecutionCount >= this .idleTaskExecutionLimit);
573: return (this .scheduledInvokers.size() <= (idle ? this .concurrentConsumers
574: : this .maxConcurrentConsumers));
575: }
576: }
577:
578: /**
579: * Return the number of currently scheduled consumers.
580: * <p>This number will always be inbetween "concurrentConsumers" and
581: * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
582: * (in case of some consumers being scheduled but not executed at the moment).
583: * @see #getConcurrentConsumers()
584: * @see #getMaxConcurrentConsumers()
585: * @see #getActiveConsumerCount()
586: */
587: public final int getScheduledConsumerCount() {
588: synchronized (this .activeInvokerMonitor) {
589: return this .scheduledInvokers.size();
590: }
591: }
592:
593: /**
594: * Return the number of currently active consumers.
595: * <p>This number will always be inbetween "concurrentConsumers" and
596: * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount".
597: * (in case of some consumers being scheduled but not executed at the moment).
598: * @see #getConcurrentConsumers()
599: * @see #getMaxConcurrentConsumers()
600: * @see #getActiveConsumerCount()
601: */
602: public final int getActiveConsumerCount() {
603: synchronized (this .activeInvokerMonitor) {
604: return this .activeInvokerCount;
605: }
606: }
607:
608: /**
609: * Overridden to accept a failure in the initial setup - leaving it up to the
610: * asynchronous invokers to establish the shared Connection on first access.
611: * @see #refreshConnectionUntilSuccessful()
612: */
613: protected void establishSharedConnection() {
614: try {
615: super .establishSharedConnection();
616: } catch (JMSException ex) {
617: logger
618: .debug(
619: "Could not establish shared JMS Connection - "
620: + "leaving it up to asynchronous invokers to establish a Connection as soon as possible",
621: ex);
622: }
623: }
624:
625: /**
626: * This implementations proceeds even after an exception thrown from
627: * <code>Connection.start()</code>, relying on listeners to perform
628: * appropriate recovery.
629: */
630: protected void startSharedConnection() {
631: try {
632: super .startSharedConnection();
633: } catch (JMSException ex) {
634: logger
635: .debug(
636: "Connection start failed - relying on listeners to perform recovery",
637: ex);
638: }
639: }
640:
641: /**
642: * This implementations proceeds even after an exception thrown from
643: * <code>Connection.stop()</code>, relying on listeners to perform
644: * appropriate recovery after a restart.
645: */
646: protected void stopSharedConnection() {
647: try {
648: super .stopSharedConnection();
649: } catch (JMSException ex) {
650: logger
651: .debug(
652: "Connection stop failed - relying on listeners to perform recovery after restart",
653: ex);
654: }
655: }
656:
657: /**
658: * Handle the given exception that arose during setup of a listener.
659: * Called for every such exception in every concurrent listener.
660: * <p>The default implementation logs the exception at error level
661: * if not recovered yet, and at debug level if already recovered.
662: * Can be overridden in subclasses.
663: * @param ex the exception to handle
664: * @param alreadyRecovered whether a previously executing listener
665: * already recovered from the present listener setup failure
666: * (this usually indicates a follow-up failure than be ignored
667: * other than for debug log purposes)
668: * @see #recoverAfterListenerSetupFailure()
669: */
670: protected void handleListenerSetupFailure(Throwable ex,
671: boolean alreadyRecovered) {
672: if (ex instanceof JMSException) {
673: invokeExceptionListener((JMSException) ex);
674: }
675: if (ex instanceof SharedConnectionNotInitializedException) {
676: if (!alreadyRecovered) {
677: logger
678: .debug("JMS message listener invoker needs to establish shared Connection");
679: }
680: } else {
681: if (alreadyRecovered) {
682: logger
683: .debug(
684: "Setup of JMS message listener invoker failed - already recovered by other invoker",
685: ex);
686: } else {
687: logger
688: .error(
689: "Setup of JMS message listener invoker failed - trying to recover",
690: ex);
691: }
692: }
693: }
694:
695: /**
696: * Recover this listener container after a listener failed to set itself up,
697: * for example reestablishing the underlying Connection.
698: * <p>The default implementation delegates to DefaultMessageListenerContainer's
699: * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
700: * try to re-establish a Connection to the JMS provider both for the shared
701: * and the non-shared Connection case.
702: * @see #refreshConnectionUntilSuccessful()
703: * @see #refreshDestination()
704: */
705: protected void recoverAfterListenerSetupFailure() {
706: refreshConnectionUntilSuccessful();
707: refreshDestination();
708: }
709:
710: /**
711: * Refresh the underlying Connection, not returning before an attempt has been
712: * successful. Called in case of a shared Connection as well as without shared
713: * Connection, so either needs to operate on the shared Connection or on a
714: * temporary Connection that just gets established for validation purposes.
715: * <p>The default implementation retries until it successfully established a
716: * Connection, for as long as this message listener container is active.
717: * Applies the specified recovery interval between retries.
718: * @see #setRecoveryInterval
719: */
720: protected void refreshConnectionUntilSuccessful() {
721: while (isRunning()) {
722: try {
723: if (sharedConnectionEnabled()) {
724: refreshSharedConnection();
725: startSharedConnection();
726: } else {
727: Connection con = createConnection();
728: JmsUtils.closeConnection(con);
729: }
730: logger.info("Successfully refreshed JMS Connection");
731: break;
732: } catch (Exception ex) {
733: if (logger.isInfoEnabled()) {
734: logger
735: .info(
736: "Could not refresh JMS Connection - retrying in "
737: + this .recoveryInterval
738: + " ms", ex);
739: }
740: }
741: sleepInbetweenRecoveryAttempts();
742: }
743: }
744:
745: /**
746: * Refresh the JMS destination that this listener container operates on.
747: * <p>Called after listener setup failure, assuming that a cached Destination
748: * object might have become invalid (a typical case on WebLogic JMS).
749: * <p>The default implementation removes the destination from a
750: * DestinationResolver's cache, in case of a CachingDestinationResolver.
751: * @see #setDestinationName
752: * @see org.springframework.jms.support.destination.CachingDestinationResolver
753: */
754: protected void refreshDestination() {
755: String destName = getDestinationName();
756: if (destName != null) {
757: DestinationResolver destResolver = getDestinationResolver();
758: if (destResolver instanceof CachingDestinationResolver) {
759: ((CachingDestinationResolver) destResolver)
760: .removeFromCache(destName);
761: }
762: }
763: }
764:
765: /**
766: * Sleep according to the specified recovery interval.
767: * Called inbetween recovery attempts.
768: */
769: protected void sleepInbetweenRecoveryAttempts() {
770: if (this .recoveryInterval > 0) {
771: try {
772: Thread.sleep(this .recoveryInterval);
773: } catch (InterruptedException interEx) {
774: // Re-interrupt current thread, to allow other threads to react.
775: Thread.currentThread().interrupt();
776: }
777: }
778: }
779:
780: /**
781: * Destroy the registered JMS Sessions and associated MessageConsumers.
782: */
783: protected void doShutdown() throws JMSException {
784: logger
785: .debug("Waiting for shutdown of message listener invokers");
786: synchronized (this .activeInvokerMonitor) {
787: while (this .activeInvokerCount > 0) {
788: if (logger.isDebugEnabled()) {
789: logger.debug("Still waiting for shutdown of "
790: + this .activeInvokerCount
791: + " message listener invokers");
792: }
793: try {
794: this .activeInvokerMonitor.wait();
795: } catch (InterruptedException interEx) {
796: // Re-interrupt current thread, to allow other threads to react.
797: Thread.currentThread().interrupt();
798: }
799: }
800: }
801: }
802:
803: //-------------------------------------------------------------------------
804: // Inner classes used as internal adapters
805: //-------------------------------------------------------------------------
806:
807: /**
808: * Runnable that performs looped <code>MessageConsumer.receive()</code> calls.
809: */
810: private class AsyncMessageListenerInvoker implements
811: SchedulingAwareRunnable {
812:
813: private Session session;
814:
815: private MessageConsumer consumer;
816:
817: private Object lastRecoveryMarker;
818:
819: private boolean lastMessageSucceeded;
820:
821: private int idleTaskExecutionCount = 0;
822:
823: private volatile boolean idle = true;
824:
825: public void run() {
826: synchronized (activeInvokerMonitor) {
827: activeInvokerCount++;
828: activeInvokerMonitor.notifyAll();
829: }
830: boolean messageReceived = false;
831: try {
832: if (maxMessagesPerTask < 0) {
833: while (isActive()) {
834: waitWhileNotRunning();
835: if (isActive()) {
836: messageReceived = invokeListener();
837: }
838: }
839: } else {
840: int messageCount = 0;
841: while (isRunning()
842: && messageCount < maxMessagesPerTask) {
843: messageReceived = (invokeListener() || messageReceived);
844: messageCount++;
845: }
846: }
847: } catch (Throwable ex) {
848: clearResources();
849: if (!this .lastMessageSucceeded) {
850: // We failed more than once in a row - sleep for recovery interval
851: // even before first recovery attempt.
852: sleepInbetweenRecoveryAttempts();
853: }
854: this .lastMessageSucceeded = false;
855: boolean alreadyRecovered = false;
856: synchronized (recoveryMonitor) {
857: if (this .lastRecoveryMarker == currentRecoveryMarker) {
858: handleListenerSetupFailure(ex, false);
859: recoverAfterListenerSetupFailure();
860: currentRecoveryMarker = new Object();
861: } else {
862: alreadyRecovered = true;
863: }
864: }
865: if (alreadyRecovered) {
866: handleListenerSetupFailure(ex, true);
867: }
868: }
869: synchronized (activeInvokerMonitor) {
870: activeInvokerCount--;
871: activeInvokerMonitor.notifyAll();
872: }
873: if (!messageReceived) {
874: this .idleTaskExecutionCount++;
875: } else {
876: this .idleTaskExecutionCount = 0;
877: }
878: if (!shouldRescheduleInvoker(this .idleTaskExecutionCount)
879: || !rescheduleTaskIfNecessary(this )) {
880: // We're shutting down completely.
881: synchronized (activeInvokerMonitor) {
882: scheduledInvokers.remove(this );
883: if (logger.isDebugEnabled()) {
884: logger
885: .debug("Lowered scheduled invoker count: "
886: + scheduledInvokers.size());
887: }
888: activeInvokerMonitor.notifyAll();
889: }
890: clearResources();
891: }
892: }
893:
894: private boolean invokeListener() throws JMSException {
895: initResourcesIfNecessary();
896: boolean messageReceived = receiveAndExecute(this .session,
897: this .consumer);
898: this .lastMessageSucceeded = true;
899: this .idle = !messageReceived;
900: return messageReceived;
901: }
902:
903: private void initResourcesIfNecessary() throws JMSException {
904: if (getCacheLevel() <= CACHE_CONNECTION) {
905: updateRecoveryMarker();
906: } else {
907: if (this .session == null
908: && getCacheLevel() >= CACHE_SESSION) {
909: updateRecoveryMarker();
910: this .session = createSession(getSharedConnection());
911: }
912: if (this .consumer == null
913: && getCacheLevel() >= CACHE_CONSUMER) {
914: this .consumer = createListenerConsumer(this .session);
915: }
916: }
917: }
918:
919: private void updateRecoveryMarker() {
920: synchronized (recoveryMonitor) {
921: this .lastRecoveryMarker = currentRecoveryMarker;
922: }
923: }
924:
925: private void clearResources() {
926: JmsUtils.closeMessageConsumer(this .consumer);
927: JmsUtils.closeSession(this .session);
928: this .consumer = null;
929: this .session = null;
930: }
931:
932: public boolean isLongLived() {
933: return (maxMessagesPerTask < 0);
934: }
935:
936: public boolean isIdle() {
937: return this.idle;
938: }
939: }
940:
941: }
|