Source Code Cross Referenced for DefaultMessageListenerContainer.java in  » J2EE » spring-framework-2.0.6 » org » springframework » jms » listener » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » J2EE » spring framework 2.0.6 » org.springframework.jms.listener 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright 2002-2007 the original author or authors.
003:         *
004:         * Licensed under the Apache License, Version 2.0 (the "License");
005:         * you may not use this file except in compliance with the License.
006:         * You may obtain a copy of the License at
007:         *
008:         *      http://www.apache.org/licenses/LICENSE-2.0
009:         *
010:         * Unless required by applicable law or agreed to in writing, software
011:         * distributed under the License is distributed on an "AS IS" BASIS,
012:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013:         * See the License for the specific language governing permissions and
014:         * limitations under the License.
015:         */
016:
017:        package org.springframework.jms.listener;
018:
019:        import java.util.HashSet;
020:        import java.util.Iterator;
021:        import java.util.Set;
022:
023:        import javax.jms.Connection;
024:        import javax.jms.JMSException;
025:        import javax.jms.Message;
026:        import javax.jms.MessageConsumer;
027:        import javax.jms.Session;
028:
029:        import org.springframework.core.Constants;
030:        import org.springframework.core.task.SimpleAsyncTaskExecutor;
031:        import org.springframework.core.task.TaskExecutor;
032:        import org.springframework.jms.support.JmsUtils;
033:        import org.springframework.jms.support.destination.CachingDestinationResolver;
034:        import org.springframework.jms.support.destination.DestinationResolver;
035:        import org.springframework.scheduling.SchedulingAwareRunnable;
036:        import org.springframework.scheduling.SchedulingTaskExecutor;
037:        import org.springframework.util.Assert;
038:        import org.springframework.util.ClassUtils;
039:
040:        /**
041:         * Message listener container variant that uses plain JMS client API, specifically
042:         * a loop of <code>MessageConsumer.receive()</code> calls that also allow for
043:         * transactional reception of messages (registering them with XA transactions).
044:         * Designed to work in a native JMS environment as well as in a J2EE environment,
045:         * with only minimal differences in configuration.
046:         *
047:         * <p>This is a simple but nevertheless powerful form of message listener container.
048:         * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
049:         * and optionally allows for dynamic adaptation at runtime (up until a maximum number).
050:         * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
051:         * of runtime complexity, in particular the minimal requirements on the JMS provider:
052:         * Not even the JMS ServerSessionPool facility is required. Beyond that, it is
053:         * fully self-recovering in case of the broker being temporarily unavailable,
054:         * and allows for stops/restarts as well as runtime changes to its configuration.
055:         *
056:         * <p>Actual MessageListener execution happens in asynchronous work units which are
057:         * created through Spring's {@link org.springframework.core.task.TaskExecutor}
058:         * abstraction. By default, the specified number of invoker tasks will be created
059:         * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
060:         * setting. Specify an alternative TaskExecutor to integrate with an existing
061:         * thread pool facility (such as a J2EE server's), for example using a
062:         * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
063:         * With a native JMS setup, each of those listener threads is going to use a
064:         * cached JMS Session and MessageConsumer (only refreshed in case of failure),
065:         * using the JMS provider's resources as efficiently as possible.
066:         *
067:         * <p>Message reception and listener execution can automatically be wrapped
068:         * in transactions through passing a Spring
069:         * {@link org.springframework.transaction.PlatformTransactionManager} into the
070:         * {@link #setTransactionManager "transactionManager"} property. This will usually
071:         * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
072:         * J2EE enviroment, in combination with a JTA-aware JMS ConnectionFactory obtained
073:         * from JNDI (check your J2EE server's documentation). Note that this listener
074:         * container will automatically reobtain all JMS handles for each transaction
075:         * in case of an external transaction manager specified, for compatibility with
076:         * all J2EE servers (in particular JBoss). This non-caching behavior can be
077:         * overridden through the {@link #setCacheLevel "cacheLevel"} /
078:         * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching
079:         * of the Connection (or also Session and MessageConsumer) even in case of
080:         * an external transaction manager being involved.
081:         *
082:         * <p>Dynamic scaling of the number of concurrent invokers can be activated
083:         * through specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
084:         * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
085:         * value. Since the latter's default is 1, you can also simply specify a
086:         * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
087:         * 5 concurrent consumers in case of increasing message load, as well as dynamic
088:         * shrinking back to the standard number of consumers once the load decreases.
089:         * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
090:         * setting to control the lifespan of each new task, to avoid frequent scaling up
091:         * and down, in particular if the ConnectionFactory does not pool JMS Sessions
092:         * and/or the TaskExecutor does not pool threads (check your configuration!).
093:         * Note that dynamic scaling only really makes sense for a queue in the first
094:         * place; for a topic, you will typically stick with the default number of 1
095:         * consumer, else you'd receive the same message multiple times on the same node.
096:         *
097:         * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
098:         * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
099:         * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
100:         * javadoc for details on acknowledge modes and native transaction options,
101:         * as well as the {@link AbstractPollingMessageListenerContainer} javadoc
102:         * for details on configuring an external transaction manager.
103:         *
104:         * <p>This class requires a JMS 1.1+ provider, because it builds on the
105:         * domain-independent API. <b>Use the {@link DefaultMessageListenerContainer102}
106:         * subclass for JMS 1.0.2 providers.</b>
107:         *
108:         * @author Juergen Hoeller
109:         * @since 2.0
110:         * @see #setTransactionManager
111:         * @see #setCacheLevel
112:         * @see javax.jms.MessageConsumer#receive(long)
113:         * @see SimpleMessageListenerContainer
114:         * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
115:         * @see DefaultMessageListenerContainer102
116:         */
117:        public class DefaultMessageListenerContainer extends
118:                AbstractPollingMessageListenerContainer {
119:
120:            /**
121:             * Default thread name prefix: "DefaultMessageListenerContainer-".
122:             */
123:            public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils
124:                    .getShortName(DefaultMessageListenerContainer.class)
125:                    + "-";
126:
127:            /**
128:             * The default recovery interval: 5000 ms = 5 seconds.
129:             */
130:            public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
131:
132:            /**
133:             * Constant that indicates to cache no JMS resources at all.
134:             * @see #setCacheLevel
135:             */
136:            public static final int CACHE_NONE = 0;
137:
138:            /**
139:             * Constant that indicates to cache a shared JMS Connection.
140:             * @see #setCacheLevel
141:             */
142:            public static final int CACHE_CONNECTION = 1;
143:
144:            /**
145:             * Constant that indicates to cache a shared JMS Connection
146:             * and a JMS Session for each listener thread.
147:             * @see #setCacheLevel
148:             */
149:            public static final int CACHE_SESSION = 2;
150:
151:            /**
152:             * Constant that indicates to cache a shared JMS Connection
153:             * and a JMS Session for each listener thread, as well as
154:             * a JMS MessageConsumer for each listener thread.
155:             * @see #setCacheLevel
156:             */
157:            public static final int CACHE_CONSUMER = 3;
158:
159:            private static final Constants constants = new Constants(
160:                    DefaultMessageListenerContainer.class);
161:
162:            private TaskExecutor taskExecutor;
163:
164:            private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
165:
166:            private Integer cacheLevel;
167:
168:            private int concurrentConsumers = 1;
169:
170:            private int maxConcurrentConsumers = 1;
171:
172:            private int maxMessagesPerTask = Integer.MIN_VALUE;
173:
174:            private int idleTaskExecutionLimit = 1;
175:
176:            private final Set scheduledInvokers = new HashSet();
177:
178:            private int activeInvokerCount = 0;
179:
180:            private final Object activeInvokerMonitor = new Object();
181:
182:            private Object currentRecoveryMarker = new Object();
183:
184:            private final Object recoveryMonitor = new Object();
185:
186:            /**
187:             * Set the Spring TaskExecutor to use for running the listener threads.
188:             * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
189:             * starting up a number of new threads, according to the specified number
190:             * of concurrent consumers.
191:             * <p>Specify an alternative TaskExecutor for integration with an existing
192:             * thread pool. Note that this really only adds value if the threads are
193:             * managed in a specific fashion, for example within a J2EE environment.
194:             * A plain thread pool does not add much value, as this listener container
195:             * will occupy a number of threads for its entire lifetime.
196:             * @see #setConcurrentConsumers
197:             * @see org.springframework.core.task.SimpleAsyncTaskExecutor
198:             * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
199:             */
200:            public void setTaskExecutor(TaskExecutor taskExecutor) {
201:                this .taskExecutor = taskExecutor;
202:            }
203:
204:            /**
205:             * Specify the interval between recovery attempts, in <b>milliseconds</b>.
206:             * The default is 5000 ms, that is, 5 seconds.
207:             * @see #handleListenerSetupFailure
208:             */
209:            public void setRecoveryInterval(long recoveryInterval) {
210:                this .recoveryInterval = recoveryInterval;
211:            }
212:
213:            /**
214:             * Specify the level of caching that this listener container is allowed to apply,
215:             * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
216:             * @see #setCacheLevel
217:             */
218:            public void setCacheLevelName(String constantName)
219:                    throws IllegalArgumentException {
220:                if (constantName == null || !constantName.startsWith("CACHE_")) {
221:                    throw new IllegalArgumentException(
222:                            "Only cache constants allowed");
223:                }
224:                setCacheLevel(constants.asNumber(constantName).intValue());
225:            }
226:
227:            /**
228:             * Specify the level of caching that this listener container is allowed to apply.
229:             * <p>Default is CACHE_NONE if an external transaction manager has been specified
230:             * (to reobtain all resources freshly within the scope of the external transaction),
231:             * and CACHE_CONSUMER else (operating with local JMS resources).
232:             * <p>Some J2EE servers only register their JMS resources with an ongoing XA
233:             * transaction in case of a freshly obtained JMS Connection and Session,
234:             * which is why this listener container does by default not cache any of those.
235:             * However, if you want to optimize for a specific server, consider switching
236:             * this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
237:             * conjunction with an external transaction manager.
238:             * <p>Currently known servers that absolutely require CACHE_NONE for XA
239:             * transaction processing: JBoss 4. For any others, consider raising the
240:             * cache level.
241:             * @see #CACHE_NONE
242:             * @see #CACHE_CONNECTION
243:             * @see #CACHE_SESSION
244:             * @see #CACHE_CONSUMER
245:             * @see #setCacheLevelName
246:             * @see #setTransactionManager
247:             */
248:            public void setCacheLevel(int cacheLevel) {
249:                this .cacheLevel = new Integer(cacheLevel);
250:            }
251:
252:            /**
253:             * Return the level of caching that this listener container is allowed to apply.
254:             */
255:            public int getCacheLevel() {
256:                return (this .cacheLevel != null ? this .cacheLevel.intValue()
257:                        : CACHE_NONE);
258:            }
259:
260:            /**
261:             * Specify the number of concurrent consumers to create. Default is 1.
262:             * <p>Specifying a higher value for this setting will increase the standard
263:             * level of scheduled concurrent consumers at runtime: This is effectively
264:             * the minimum number of concurrent consumers which will be scheduled
265:             * at any given time. This is a static setting; for dynamic scaling,
266:             * consider specifying the "maxConcurrentConsumers" setting instead.
267:             * <p>Raising the number of concurrent consumers is recommendable in order
268:             * to scale the consumption of messages coming in from a queue. However,
269:             * note that any ordering guarantees are lost once multiple consumers are
270:             * registered. In general, stick with 1 consumer for low-volume queues.
271:             * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
272:             * This would lead to concurrent consumption of the same message,
273:             * which is hardly ever desirable.
274:             * <p><b>This setting can be modified at runtime, for example through JMX.</b>
275:             * @see #setMaxConcurrentConsumers
276:             */
277:            public void setConcurrentConsumers(int concurrentConsumers) {
278:                Assert.isTrue(concurrentConsumers > 0,
279:                        "'concurrentConsumers' value must be at least 1 (one)");
280:                synchronized (this .activeInvokerMonitor) {
281:                    this .concurrentConsumers = concurrentConsumers;
282:                    if (this .maxConcurrentConsumers < concurrentConsumers) {
283:                        this .maxConcurrentConsumers = concurrentConsumers;
284:                    }
285:                }
286:            }
287:
288:            /**
289:             * Return the "concurrentConsumer" setting.
290:             * <p>This returns the currently configured "concurrentConsumers" value;
291:             * the number of currently scheduled/active consumers might differ.
292:             * @see #getScheduledConsumerCount()
293:             * @see #getActiveConsumerCount()
294:             */
295:            public final int getConcurrentConsumers() {
296:                synchronized (this .activeInvokerMonitor) {
297:                    return this .concurrentConsumers;
298:                }
299:            }
300:
301:            /**
302:             * Specify the maximum number of concurrent consumers to create. Default is 1.
303:             * <p>If this setting is higher than "concurrentConsumers", the listener container
304:             * will dynamically schedule new consumers at runtime, provided that enough
305:             * incoming messages are encountered. Once the load goes down again, the number of
306:             * consumers will be reduced to the standard level ("concurrentConsumers") again.
307:             * <p>Raising the number of concurrent consumers is recommendable in order
308:             * to scale the consumption of messages coming in from a queue. However,
309:             * note that any ordering guarantees are lost once multiple consumers are
310:             * registered. In general, stick with 1 consumer for low-volume queues.
311:             * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
312:             * This would lead to concurrent consumption of the same message,
313:             * which is hardly ever desirable.
314:             * <p><b>This setting can be modified at runtime, for example through JMX.</b>
315:             * @see #setConcurrentConsumers
316:             */
317:            public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
318:                Assert
319:                        .isTrue(maxConcurrentConsumers > 0,
320:                                "'maxConcurrentConsumers' value must be at least 1 (one)");
321:                synchronized (this .activeInvokerMonitor) {
322:                    this .maxConcurrentConsumers = (maxConcurrentConsumers > this .concurrentConsumers ? maxConcurrentConsumers
323:                            : this .concurrentConsumers);
324:                }
325:            }
326:
327:            /**
328:             * Return the "maxConcurrentConsumer" setting.
329:             * <p>This returns the currently configured "maxConcurrentConsumers" value;
330:             * the number of currently scheduled/active consumers might differ.
331:             * @see #getScheduledConsumerCount()
332:             * @see #getActiveConsumerCount()
333:             */
334:            public final int getMaxConcurrentConsumers() {
335:                synchronized (this .activeInvokerMonitor) {
336:                    return this .maxConcurrentConsumers;
337:                }
338:            }
339:
340:            /**
341:             * Specify the maximum number of messages to process in one task.
342:             * More concretely, this limits the number of message reception attempts per
343:             * task, which includes receive iterations that did not actually pick up a
344:             * message until they hit their timeout (see "receiveTimeout" property).
345:             * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
346:             * and 1 in case of a SchedulingTaskExecutor that indicates a preference for
347:             * short-lived tasks. Specify a number of 10 to 100 messages to balance
348:             * between extremely long-lived and extremely short-lived tasks here.
349:             * <p>Long-lived tasks avoid frequent thread context switches through
350:             * sticking with the same thread all the way through, while short-lived
351:             * tasks allow thread pools to control the scheduling. Hence, thread
352:             * pools will usually prefer short-lived tasks.
353:             * <p><b>This setting can be modified at runtime, for example through JMX.</b>
354:             * @see #setTaskExecutor
355:             * @see #setReceiveTimeout
356:             * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
357:             */
358:            public void setMaxMessagesPerTask(int maxMessagesPerTask) {
359:                Assert.isTrue(maxMessagesPerTask != 0,
360:                        "'maxMessagesPerTask' must not be 0");
361:                synchronized (this .activeInvokerMonitor) {
362:                    this .maxMessagesPerTask = maxMessagesPerTask;
363:                }
364:            }
365:
366:            /**
367:             * Return the maximum number of messages to process in one task.
368:             */
369:            public int getMaxMessagesPerTask() {
370:                synchronized (this .activeInvokerMonitor) {
371:                    return this .maxMessagesPerTask;
372:                }
373:            }
374:
375:            /**
376:             * Specify the limit for idle executions of a receive task, not having
377:             * received any message within its execution. If this limit is reached,
378:             * the task will shut down and leave receiving to other executing tasks
379:             * (in case of dynamic scheduling; see the "maxConcurrentConsumers" setting).
380:             * Default is 1.
381:             * <p>Within each task execution, a number of message reception attempts
382:             * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
383:             * message (according to the "receiveTimeout" setting). If all of those receive
384:             * attempts in a given task return without a message, the task is considered
385:             * idle with respect to received messages. Such a task may still be rescheduled;
386:             * however, once it reached the specified "idleTaskExecutionLimit", it will
387:             * shut down (in case of dynamic scaling).
388:             * <p>Raise this limit if you encounter too frequent scaling up and down.
389:             * With this limit being higher, an idle consumer will be kept around longer,
390:             * avoiding the restart of a consumer once a new load of messages comes in.
391:             * Alternatively, specify a higher "maxMessagePerTask" and/or "receiveTimeout" value,
392:             * which will also lead to idle consumers being kept around for a longer time
393:             * (while also increasing the average execution time of each scheduled task).
394:             * <p><b>This setting can be modified at runtime, for example through JMX.</b>
395:             * @see #setMaxMessagesPerTask
396:             * @see #setReceiveTimeout
397:             */
398:            public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
399:                Assert.isTrue(idleTaskExecutionLimit > 0,
400:                        "'idleTaskExecutionLimit' must be 1 or higher");
401:                synchronized (this .activeInvokerMonitor) {
402:                    this .idleTaskExecutionLimit = idleTaskExecutionLimit;
403:                }
404:            }
405:
406:            /**
407:             * Return the limit for idle executions of a receive task.
408:             */
409:            public int getIdleTaskExecutionLimit() {
410:                synchronized (this .activeInvokerMonitor) {
411:                    return this .idleTaskExecutionLimit;
412:                }
413:            }
414:
415:            protected void validateConfiguration() {
416:                super .validateConfiguration();
417:                synchronized (this .activeInvokerMonitor) {
418:                    if (isSubscriptionDurable()
419:                            && this .concurrentConsumers != 1) {
420:                        throw new IllegalArgumentException(
421:                                "Only 1 concurrent consumer supported for durable subscription");
422:                    }
423:                }
424:            }
425:
426:            //-------------------------------------------------------------------------
427:            // Implementation of AbstractMessageListenerContainer's template methods
428:            //-------------------------------------------------------------------------
429:
430:            public void initialize() {
431:                // Adapt default cache level.
432:                if (getTransactionManager() != null) {
433:                    if (this .cacheLevel == null) {
434:                        this .cacheLevel = new Integer(CACHE_NONE);
435:                    }
436:                } else {
437:                    if (this .cacheLevel == null) {
438:                        this .cacheLevel = new Integer(CACHE_CONSUMER);
439:                    }
440:                }
441:
442:                // Prepare taskExecutor and maxMessagesPerTask.
443:                synchronized (this .activeInvokerMonitor) {
444:                    if (this .taskExecutor == null) {
445:                        this .taskExecutor = createDefaultTaskExecutor();
446:                    } else if (this .taskExecutor instanceof  SchedulingTaskExecutor
447:                            && ((SchedulingTaskExecutor) this .taskExecutor)
448:                                    .prefersShortLivedTasks()
449:                            && this .maxMessagesPerTask == Integer.MIN_VALUE) {
450:                        // TaskExecutor indicated a preference for short-lived tasks. According to
451:                        // setMaxMessagesPerTask javadoc, we'll use 1 message per task in this case
452:                        // unless the user specified a custom value.
453:                        this .maxMessagesPerTask = 1;
454:                    }
455:                }
456:
457:                // Proceed with actual listener initialization.
458:                super .initialize();
459:            }
460:
461:            /**
462:             * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
463:             * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
464:             * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
465:             * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
466:             */
467:            protected TaskExecutor createDefaultTaskExecutor() {
468:                String beanName = getBeanName();
469:                String threadNamePrefix = (beanName != null ? beanName + "-"
470:                        : DEFAULT_THREAD_NAME_PREFIX);
471:                return new SimpleAsyncTaskExecutor(threadNamePrefix);
472:            }
473:
474:            /**
475:             * Use a shared JMS Connection depending on the "cacheLevel" setting.
476:             * @see #setCacheLevel
477:             * @see #CACHE_CONNECTION
478:             */
479:            protected final boolean sharedConnectionEnabled() {
480:                return (getCacheLevel() >= CACHE_CONNECTION);
481:            }
482:
483:            /**
484:             * Creates the specified number of concurrent consumers,
485:             * in the form of a JMS Session plus associated MessageConsumer
486:             * running in a separate thread.
487:             * @see #scheduleNewInvoker
488:             * @see #setTaskExecutor
489:             */
490:            protected void doInitialize() throws JMSException {
491:                synchronized (this .activeInvokerMonitor) {
492:                    for (int i = 0; i < this .concurrentConsumers; i++) {
493:                        scheduleNewInvoker();
494:                    }
495:                }
496:            }
497:
498:            /**
499:             * Re-executes the given task via this listener container's TaskExecutor.
500:             * @see #setTaskExecutor
501:             */
502:            protected void doRescheduleTask(Object task) {
503:                this .taskExecutor.execute((Runnable) task);
504:            }
505:
506:            protected void messageReceived(Message message, Session session) {
507:                scheduleNewInvokerIfAppropriate();
508:            }
509:
510:            /**
511:             * Schedule a new invoker, increasing the total number of scheduled
512:             * invokers for this listener container, but only if the specified
513:             * "maxConcurrentConsumers" limit has not been reached yet, and only
514:             * if this listener container does not currently have idle invokers
515:             * that are waiting for new messages already.
516:             * <p>Called once a message has been received, to scale up while
517:             * processing the message in the invoker that originally received it.
518:             * @see #setTaskExecutor
519:             * @see #getMaxConcurrentConsumers()
520:             */
521:            protected void scheduleNewInvokerIfAppropriate() {
522:                if (isRunning()) {
523:                    synchronized (this .activeInvokerMonitor) {
524:                        if (this .scheduledInvokers.size() < this .maxConcurrentConsumers
525:                                && !hasIdleInvokers()) {
526:                            scheduleNewInvoker();
527:                            if (logger.isDebugEnabled()) {
528:                                logger.debug("Raised scheduled invoker count: "
529:                                        + scheduledInvokers.size());
530:                            }
531:                        }
532:                    }
533:                }
534:            }
535:
536:            /**
537:             * Schedule a new invoker, increasing the total number of scheduled
538:             * invokers for this listener container.
539:             */
540:            private void scheduleNewInvoker() {
541:                AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
542:                if (rescheduleTaskIfNecessary(invoker)) {
543:                    // This should always be true, since we're only calling this when active.
544:                    this .scheduledInvokers.add(invoker);
545:                }
546:            }
547:
548:            /**
549:             * Determine whether this listener container currently has any
550:             * idle instances among its scheduled invokers.
551:             */
552:            private boolean hasIdleInvokers() {
553:                for (Iterator it = this .scheduledInvokers.iterator(); it
554:                        .hasNext();) {
555:                    AsyncMessageListenerInvoker invoker = (AsyncMessageListenerInvoker) it
556:                            .next();
557:                    if (invoker.isIdle()) {
558:                        return true;
559:                    }
560:                }
561:                return false;
562:            }
563:
564:            /**
565:             * Determine whether the current invoker should be rescheduled,
566:             * given that it might not have received a message in a while.
567:             * @param idleTaskExecutionCount the number of idle executions
568:             * that this invoker task has already accumulated (in a row)
569:             */
570:            private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
571:                synchronized (this .activeInvokerMonitor) {
572:                    boolean idle = (idleTaskExecutionCount >= this .idleTaskExecutionLimit);
573:                    return (this .scheduledInvokers.size() <= (idle ? this .concurrentConsumers
574:                            : this .maxConcurrentConsumers));
575:                }
576:            }
577:
578:            /**
579:             * Return the number of currently scheduled consumers.
580:             * <p>This number will always be inbetween "concurrentConsumers" and
581:             * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
582:             * (in case of some consumers being scheduled but not executed at the moment).
583:             * @see #getConcurrentConsumers()
584:             * @see #getMaxConcurrentConsumers()
585:             * @see #getActiveConsumerCount()
586:             */
587:            public final int getScheduledConsumerCount() {
588:                synchronized (this .activeInvokerMonitor) {
589:                    return this .scheduledInvokers.size();
590:                }
591:            }
592:
593:            /**
594:             * Return the number of currently active consumers.
595:             * <p>This number will always be inbetween "concurrentConsumers" and
596:             * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount".
597:             * (in case of some consumers being scheduled but not executed at the moment).
598:             * @see #getConcurrentConsumers()
599:             * @see #getMaxConcurrentConsumers()
600:             * @see #getActiveConsumerCount()
601:             */
602:            public final int getActiveConsumerCount() {
603:                synchronized (this .activeInvokerMonitor) {
604:                    return this .activeInvokerCount;
605:                }
606:            }
607:
608:            /**
609:             * Overridden to accept a failure in the initial setup - leaving it up to the
610:             * asynchronous invokers to establish the shared Connection on first access.
611:             * @see #refreshConnectionUntilSuccessful()
612:             */
613:            protected void establishSharedConnection() {
614:                try {
615:                    super .establishSharedConnection();
616:                } catch (JMSException ex) {
617:                    logger
618:                            .debug(
619:                                    "Could not establish shared JMS Connection - "
620:                                            + "leaving it up to asynchronous invokers to establish a Connection as soon as possible",
621:                                    ex);
622:                }
623:            }
624:
625:            /**
626:             * This implementations proceeds even after an exception thrown from
627:             * <code>Connection.start()</code>, relying on listeners to perform
628:             * appropriate recovery.
629:             */
630:            protected void startSharedConnection() {
631:                try {
632:                    super .startSharedConnection();
633:                } catch (JMSException ex) {
634:                    logger
635:                            .debug(
636:                                    "Connection start failed - relying on listeners to perform recovery",
637:                                    ex);
638:                }
639:            }
640:
641:            /**
642:             * This implementations proceeds even after an exception thrown from
643:             * <code>Connection.stop()</code>, relying on listeners to perform
644:             * appropriate recovery after a restart.
645:             */
646:            protected void stopSharedConnection() {
647:                try {
648:                    super .stopSharedConnection();
649:                } catch (JMSException ex) {
650:                    logger
651:                            .debug(
652:                                    "Connection stop failed - relying on listeners to perform recovery after restart",
653:                                    ex);
654:                }
655:            }
656:
657:            /**
658:             * Handle the given exception that arose during setup of a listener.
659:             * Called for every such exception in every concurrent listener.
660:             * <p>The default implementation logs the exception at error level
661:             * if not recovered yet, and at debug level if already recovered.
662:             * Can be overridden in subclasses.
663:             * @param ex the exception to handle
664:             * @param alreadyRecovered whether a previously executing listener
665:             * already recovered from the present listener setup failure
666:             * (this usually indicates a follow-up failure than be ignored
667:             * other than for debug log purposes)
668:             * @see #recoverAfterListenerSetupFailure()
669:             */
670:            protected void handleListenerSetupFailure(Throwable ex,
671:                    boolean alreadyRecovered) {
672:                if (ex instanceof  JMSException) {
673:                    invokeExceptionListener((JMSException) ex);
674:                }
675:                if (ex instanceof  SharedConnectionNotInitializedException) {
676:                    if (!alreadyRecovered) {
677:                        logger
678:                                .debug("JMS message listener invoker needs to establish shared Connection");
679:                    }
680:                } else {
681:                    if (alreadyRecovered) {
682:                        logger
683:                                .debug(
684:                                        "Setup of JMS message listener invoker failed - already recovered by other invoker",
685:                                        ex);
686:                    } else {
687:                        logger
688:                                .error(
689:                                        "Setup of JMS message listener invoker failed - trying to recover",
690:                                        ex);
691:                    }
692:                }
693:            }
694:
695:            /**
696:             * Recover this listener container after a listener failed to set itself up,
697:             * for example reestablishing the underlying Connection.
698:             * <p>The default implementation delegates to DefaultMessageListenerContainer's
699:             * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
700:             * try to re-establish a Connection to the JMS provider both for the shared
701:             * and the non-shared Connection case.
702:             * @see #refreshConnectionUntilSuccessful()
703:             * @see #refreshDestination()
704:             */
705:            protected void recoverAfterListenerSetupFailure() {
706:                refreshConnectionUntilSuccessful();
707:                refreshDestination();
708:            }
709:
710:            /**
711:             * Refresh the underlying Connection, not returning before an attempt has been
712:             * successful. Called in case of a shared Connection as well as without shared
713:             * Connection, so either needs to operate on the shared Connection or on a
714:             * temporary Connection that just gets established for validation purposes.
715:             * <p>The default implementation retries until it successfully established a
716:             * Connection, for as long as this message listener container is active.
717:             * Applies the specified recovery interval between retries.
718:             * @see #setRecoveryInterval
719:             */
720:            protected void refreshConnectionUntilSuccessful() {
721:                while (isRunning()) {
722:                    try {
723:                        if (sharedConnectionEnabled()) {
724:                            refreshSharedConnection();
725:                            startSharedConnection();
726:                        } else {
727:                            Connection con = createConnection();
728:                            JmsUtils.closeConnection(con);
729:                        }
730:                        logger.info("Successfully refreshed JMS Connection");
731:                        break;
732:                    } catch (Exception ex) {
733:                        if (logger.isInfoEnabled()) {
734:                            logger
735:                                    .info(
736:                                            "Could not refresh JMS Connection - retrying in "
737:                                                    + this .recoveryInterval
738:                                                    + " ms", ex);
739:                        }
740:                    }
741:                    sleepInbetweenRecoveryAttempts();
742:                }
743:            }
744:
745:            /**
746:             * Refresh the JMS destination that this listener container operates on.
747:             * <p>Called after listener setup failure, assuming that a cached Destination
748:             * object might have become invalid (a typical case on WebLogic JMS).
749:             * <p>The default implementation removes the destination from a
750:             * DestinationResolver's cache, in case of a CachingDestinationResolver.
751:             * @see #setDestinationName
752:             * @see org.springframework.jms.support.destination.CachingDestinationResolver
753:             */
754:            protected void refreshDestination() {
755:                String destName = getDestinationName();
756:                if (destName != null) {
757:                    DestinationResolver destResolver = getDestinationResolver();
758:                    if (destResolver instanceof  CachingDestinationResolver) {
759:                        ((CachingDestinationResolver) destResolver)
760:                                .removeFromCache(destName);
761:                    }
762:                }
763:            }
764:
765:            /**
766:             * Sleep according to the specified recovery interval.
767:             * Called inbetween recovery attempts.
768:             */
769:            protected void sleepInbetweenRecoveryAttempts() {
770:                if (this .recoveryInterval > 0) {
771:                    try {
772:                        Thread.sleep(this .recoveryInterval);
773:                    } catch (InterruptedException interEx) {
774:                        // Re-interrupt current thread, to allow other threads to react.
775:                        Thread.currentThread().interrupt();
776:                    }
777:                }
778:            }
779:
780:            /**
781:             * Destroy the registered JMS Sessions and associated MessageConsumers.
782:             */
783:            protected void doShutdown() throws JMSException {
784:                logger
785:                        .debug("Waiting for shutdown of message listener invokers");
786:                synchronized (this .activeInvokerMonitor) {
787:                    while (this .activeInvokerCount > 0) {
788:                        if (logger.isDebugEnabled()) {
789:                            logger.debug("Still waiting for shutdown of "
790:                                    + this .activeInvokerCount
791:                                    + " message listener invokers");
792:                        }
793:                        try {
794:                            this .activeInvokerMonitor.wait();
795:                        } catch (InterruptedException interEx) {
796:                            // Re-interrupt current thread, to allow other threads to react.
797:                            Thread.currentThread().interrupt();
798:                        }
799:                    }
800:                }
801:            }
802:
803:            //-------------------------------------------------------------------------
804:            // Inner classes used as internal adapters
805:            //-------------------------------------------------------------------------
806:
807:            /**
808:             * Runnable that performs looped <code>MessageConsumer.receive()</code> calls.
809:             */
810:            private class AsyncMessageListenerInvoker implements 
811:                    SchedulingAwareRunnable {
812:
813:                private Session session;
814:
815:                private MessageConsumer consumer;
816:
817:                private Object lastRecoveryMarker;
818:
819:                private boolean lastMessageSucceeded;
820:
821:                private int idleTaskExecutionCount = 0;
822:
823:                private volatile boolean idle = true;
824:
825:                public void run() {
826:                    synchronized (activeInvokerMonitor) {
827:                        activeInvokerCount++;
828:                        activeInvokerMonitor.notifyAll();
829:                    }
830:                    boolean messageReceived = false;
831:                    try {
832:                        if (maxMessagesPerTask < 0) {
833:                            while (isActive()) {
834:                                waitWhileNotRunning();
835:                                if (isActive()) {
836:                                    messageReceived = invokeListener();
837:                                }
838:                            }
839:                        } else {
840:                            int messageCount = 0;
841:                            while (isRunning()
842:                                    && messageCount < maxMessagesPerTask) {
843:                                messageReceived = (invokeListener() || messageReceived);
844:                                messageCount++;
845:                            }
846:                        }
847:                    } catch (Throwable ex) {
848:                        clearResources();
849:                        if (!this .lastMessageSucceeded) {
850:                            // We failed more than once in a row - sleep for recovery interval
851:                            // even before first recovery attempt.
852:                            sleepInbetweenRecoveryAttempts();
853:                        }
854:                        this .lastMessageSucceeded = false;
855:                        boolean alreadyRecovered = false;
856:                        synchronized (recoveryMonitor) {
857:                            if (this .lastRecoveryMarker == currentRecoveryMarker) {
858:                                handleListenerSetupFailure(ex, false);
859:                                recoverAfterListenerSetupFailure();
860:                                currentRecoveryMarker = new Object();
861:                            } else {
862:                                alreadyRecovered = true;
863:                            }
864:                        }
865:                        if (alreadyRecovered) {
866:                            handleListenerSetupFailure(ex, true);
867:                        }
868:                    }
869:                    synchronized (activeInvokerMonitor) {
870:                        activeInvokerCount--;
871:                        activeInvokerMonitor.notifyAll();
872:                    }
873:                    if (!messageReceived) {
874:                        this .idleTaskExecutionCount++;
875:                    } else {
876:                        this .idleTaskExecutionCount = 0;
877:                    }
878:                    if (!shouldRescheduleInvoker(this .idleTaskExecutionCount)
879:                            || !rescheduleTaskIfNecessary(this )) {
880:                        // We're shutting down completely.
881:                        synchronized (activeInvokerMonitor) {
882:                            scheduledInvokers.remove(this );
883:                            if (logger.isDebugEnabled()) {
884:                                logger
885:                                        .debug("Lowered scheduled invoker count: "
886:                                                + scheduledInvokers.size());
887:                            }
888:                            activeInvokerMonitor.notifyAll();
889:                        }
890:                        clearResources();
891:                    }
892:                }
893:
894:                private boolean invokeListener() throws JMSException {
895:                    initResourcesIfNecessary();
896:                    boolean messageReceived = receiveAndExecute(this .session,
897:                            this .consumer);
898:                    this .lastMessageSucceeded = true;
899:                    this .idle = !messageReceived;
900:                    return messageReceived;
901:                }
902:
903:                private void initResourcesIfNecessary() throws JMSException {
904:                    if (getCacheLevel() <= CACHE_CONNECTION) {
905:                        updateRecoveryMarker();
906:                    } else {
907:                        if (this .session == null
908:                                && getCacheLevel() >= CACHE_SESSION) {
909:                            updateRecoveryMarker();
910:                            this .session = createSession(getSharedConnection());
911:                        }
912:                        if (this .consumer == null
913:                                && getCacheLevel() >= CACHE_CONSUMER) {
914:                            this .consumer = createListenerConsumer(this .session);
915:                        }
916:                    }
917:                }
918:
919:                private void updateRecoveryMarker() {
920:                    synchronized (recoveryMonitor) {
921:                        this .lastRecoveryMarker = currentRecoveryMarker;
922:                    }
923:                }
924:
925:                private void clearResources() {
926:                    JmsUtils.closeMessageConsumer(this .consumer);
927:                    JmsUtils.closeSession(this .session);
928:                    this .consumer = null;
929:                    this .session = null;
930:                }
931:
932:                public boolean isLongLived() {
933:                    return (maxMessagesPerTask < 0);
934:                }
935:
936:                public boolean isIdle() {
937:                    return this.idle;
938:                }
939:            }
940:
941:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.