Source Code Cross Referenced for SimpleMessageListenerContainer.java in J2EE » spring-framework-2.0.6 » org » springframework » jms » listener

/*
 * Copyright 2002-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.jms.listener;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.support.JmsUtils;
import org.springframework.util.Assert;

/**
 * Message listener container that uses the plain JMS client API's
 * <code>MessageConsumer.setMessageListener()</code> method to
 * create concurrent MessageConsumers for the specified listeners.
 *
 * <p>This is the simplest form of a message listener container.
 * It creates a fixed number of JMS Sessions to invoke the listener,
 * not allowing for dynamic adaptation to runtime demands. Its main
 * advantage is its low level of complexity and the minimum requirements
 * on the JMS provider: Not even the ServerSessionPool facility is required.
 *
 * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
 * on acknowledge modes and transaction options.
 *
 * <p>For a different style of MessageListener handling, through looped
 * <code>MessageConsumer.receive()</code> calls that also allow for
 * transactional reception of messages (registering them with XA transactions),
 * see {@link DefaultMessageListenerContainer}. For dynamic adaptation of the
 * active number of Sessions, consider using
 * {@link org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer}.
 *
 * <p>This class requires a JMS 1.1+ provider, because it builds on the
 * domain-independent API. <b>Use the {@link SimpleMessageListenerContainer102}
 * subclass for JMS 1.0.2 providers.</b>
 *
 * @author Juergen Hoeller
 * @since 2.0
 * @see javax.jms.MessageConsumer#setMessageListener
 * @see DefaultMessageListenerContainer
 * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
 * @see SimpleMessageListenerContainer102
 */
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {

    private boolean pubSubNoLocal = false;

    private int concurrentConsumers = 1;

    private TaskExecutor taskExecutor;

    private Set sessions;

    private Set consumers;

    /**
     * Set whether to inhibit the delivery of messages published by its own connection.
     * Default is "false".
     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
     */
    public void setPubSubNoLocal(boolean pubSubNoLocal) {
        this.pubSubNoLocal = pubSubNoLocal;
    }

    /**
     * Return whether to inhibit the delivery of messages published by its own connection.
     */
    protected boolean isPubSubNoLocal() {
        return this.pubSubNoLocal;
    }

    /**
     * Specify the number of concurrent consumers to create. Default is 1.
     * <p>Raising the number of concurrent consumers is recommended in order
     * to scale the consumption of messages coming in from a queue. However,
     * note that any ordering guarantees are lost once multiple consumers are
     * registered. In general, stick with 1 consumer for low-volume queues.
     * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
     * This would lead to concurrent consumption of the same message,
     * which is hardly ever desirable.
     */
    public void setConcurrentConsumers(int concurrentConsumers) {
        Assert.isTrue(concurrentConsumers > 0,
                "'concurrentConsumers' value must be at least 1 (one)");
        this.concurrentConsumers = concurrentConsumers;
    }

    /**
     * Set the Spring TaskExecutor to use for executing the listener once
     * a message has been received by the provider.
     * <p>Default is none, that is, to run in the JMS provider's own receive thread,
     * blocking the provider's receive endpoint while executing the listener.
     * <p>Specify a TaskExecutor for executing the listener in a different thread,
     * rather than blocking the JMS provider, usually integrating with an existing
     * thread pool. This makes it possible to keep the number of concurrent consumers
     * low (1) while still processing messages concurrently (decoupled from receiving!).
     * <p><b>NOTE: Specifying a TaskExecutor for listener execution affects
     * acknowledgement semantics.</b> Messages will then always get acknowledged
     * before listener execution, with the underlying Session immediately reused
     * for receiving the next message. Using this in combination with a transacted
     * session or with client acknowledgement will lead to unspecified results!
     * <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead
     * to concurrent processing of messages that have been received by the same
     * underlying Session.</b> As a consequence, it is not recommended to use
     * this setting with a {@link SessionAwareMessageListener}, at least not
     * if the latter performs actual work on the given Session. A standard
     * {@link javax.jms.MessageListener} will work fine, in general.
     * @see #setConcurrentConsumers
     * @see org.springframework.core.task.SimpleAsyncTaskExecutor
     * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    protected void validateConfiguration() {
        super.validateConfiguration();
        if (isSubscriptionDurable() && this.concurrentConsumers != 1) {
            throw new IllegalArgumentException(
                    "Only 1 concurrent consumer supported for durable subscription");
        }
    }

    //-------------------------------------------------------------------------
    // Implementation of AbstractMessageListenerContainer's template methods
    //-------------------------------------------------------------------------

    /**
     * Always use a shared JMS Connection.
     */
    protected final boolean sharedConnectionEnabled() {
        return true;
    }

    /**
     * Creates the specified number of concurrent consumers,
     * in the form of a JMS Session plus associated MessageConsumer.
     * @see #createListenerConsumer
     */
    protected void doInitialize() throws JMSException {
        establishSharedConnection();

        // Register Sessions and MessageConsumers.
        this.sessions = new HashSet(this.concurrentConsumers);
        this.consumers = new HashSet(this.concurrentConsumers);
        for (int i = 0; i < this.concurrentConsumers; i++) {
            Session session = createSession(getSharedConnection());
            MessageConsumer consumer = createListenerConsumer(session);
            this.sessions.add(session);
            this.consumers.add(consumer);
        }
    }

    /**
     * Create a MessageConsumer for the given JMS Session,
     * registering a MessageListener for the specified listener.
     * @param session the JMS Session to work on
     * @return the MessageConsumer
     * @throws JMSException if thrown by JMS methods
     * @see #executeListener
     */
    protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
        Destination destination = getDestination();
        if (destination == null) {
            destination = resolveDestinationName(session, getDestinationName());
        }
        MessageConsumer consumer = createConsumer(session, destination);
        if (this.taskExecutor != null) {
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(final Message message) {
                    taskExecutor.execute(new Runnable() {
                        public void run() {
                            executeListener(session, message);
                        }
                    });
                }
            });
        } else {
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    executeListener(session, message);
                }
            });
        }
        return consumer;
    }

    /**
     * Destroy the registered JMS Sessions and associated MessageConsumers.
     */
    protected void doShutdown() throws JMSException {
        logger.debug("Closing JMS MessageConsumers");
        for (Iterator it = this.consumers.iterator(); it.hasNext();) {
            MessageConsumer consumer = (MessageConsumer) it.next();
            JmsUtils.closeMessageConsumer(consumer);
        }
        logger.debug("Closing JMS Sessions");
        for (Iterator it = this.sessions.iterator(); it.hasNext();) {
            Session session = (Session) it.next();
            JmsUtils.closeSession(session);
        }
    }

    //-------------------------------------------------------------------------
    // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
    //-------------------------------------------------------------------------

    /**
     * Create a JMS MessageConsumer for the given Session and Destination.
     * <p>This implementation uses JMS 1.1 API.
     * @param session the JMS Session to create a MessageConsumer for
     * @param destination the JMS Destination to create a MessageConsumer for
     * @return the new JMS MessageConsumer
     * @throws JMSException if thrown by JMS API methods
     */
    protected MessageConsumer createConsumer(Session session, Destination destination)
            throws JMSException {
        // Only pass in the NoLocal flag in case of a Topic:
        // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
        // in case of the NoLocal flag being specified for a Queue.
        if (isPubSubDomain()) {
            if (isSubscriptionDurable() && destination instanceof Topic) {
                return session.createDurableSubscriber(
                        (Topic) destination,
                        getDurableSubscriptionName(),
                        getMessageSelector(), isPubSubNoLocal());
            } else {
                return session.createConsumer(destination,
                        getMessageSelector(), isPubSubNoLocal());
            }
        } else {
            return session.createConsumer(destination, getMessageSelector());
        }
    }

}
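
The two branches in createListenerConsumer() differ only in which thread ends up running the listener. The following is a minimal, JDK-only sketch of that dispatch pattern, not the Spring implementation itself. The names DispatchSketch, Listener, wrap and deliver are hypothetical and exist only for illustration. java.util.concurrent.Executor stands in for Spring's TaskExecutor, and plain String payloads stand in for javax.jms.Message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener, using String payloads. */
    interface Listener {
        void onMessage(String message);
    }

    /**
     * Assumed analogue of the branch in createListenerConsumer(): with an executor,
     * hand the message off and return immediately; without one, the target listener
     * runs in the calling (provider) thread.
     */
    static Listener wrap(final Listener target, final Executor executor) {
        if (executor == null) {
            return target;
        }
        return new Listener() {
            public void onMessage(final String message) {
                executor.execute(new Runnable() {
                    public void run() {
                        target.onMessage(message);
                    }
                });
            }
        };
    }

    /**
     * Delivers all messages from the calling thread and returns the names of the
     * threads that actually ran the listener, one entry per message.
     */
    static List<String> deliver(List<String> messages, ExecutorService executor) {
        final List<String> threads = Collections.synchronizedList(new ArrayList<String>());
        Listener listener = wrap(new Listener() {
            public void onMessage(String message) {
                threads.add(Thread.currentThread().getName());
            }
        }, executor);
        for (String message : messages) {
            listener.onMessage(message);
        }
        if (executor != null) {
            // Wait for the handed-off work so the result is complete.
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for listeners", ex);
            }
        }
        return threads;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c");
        System.out.println("direct:   " + deliver(messages, null));
        System.out.println("executor: " + deliver(messages, Executors.newFixedThreadPool(2)));
    }
}
```

In the direct case every entry is the caller's thread name, which corresponds to blocking the provider's receive thread while the listener runs. In the executor case the entries are pool thread names, and onMessage returns before the listener has finished. This is why the setTaskExecutor() Javadoc warns that messages get acknowledged before listener execution.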