001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.listener;
018:
019: import java.util.HashSet;
020: import java.util.Iterator;
021: import java.util.Set;
022:
023: import javax.jms.Destination;
024: import javax.jms.JMSException;
025: import javax.jms.Message;
026: import javax.jms.MessageConsumer;
027: import javax.jms.MessageListener;
028: import javax.jms.Session;
029: import javax.jms.Topic;
030:
031: import org.springframework.core.task.TaskExecutor;
032: import org.springframework.jms.support.JmsUtils;
033: import org.springframework.util.Assert;
034:
035: /**
036: * Message listener container that uses the plain JMS client API's
037: * <code>MessageConsumer.setMessageListener()</code> method to
038: * create concurrent MessageConsumers for the specified listeners.
039: *
040: * <p>This is the simplest form of a message listener container.
041: * It creates a fixed number of JMS Sessions to invoke the listener,
042: * not allowing for dynamic adaptation to runtime demands. Its main
043: * advantage is its low level of complexity and the minimum requirements
044: * on the JMS provider: Not even the ServerSessionPool facility is required.
045: *
046: * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
047: * on acknowledge modes and transaction options.
048: *
049: * <p>For a different style of MessageListener handling, through looped
050: * <code>MessageConsumer.receive()</code> calls that also allow for
051: * transactional reception of messages (registering them with XA transactions),
052: * see {@link DefaultMessageListenerContainer}. For dynamic adaptation of the
053: * active number of Sessions, consider using
054: * {@link org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer}.
055: *
056: * <p>This class requires a JMS 1.1+ provider, because it builds on the
057: * domain-independent API. <b>Use the {@link SimpleMessageListenerContainer102}
058: * subclass for JMS 1.0.2 providers.</b>
059: *
060: * @author Juergen Hoeller
061: * @since 2.0
062: * @see javax.jms.MessageConsumer#setMessageListener
063: * @see DefaultMessageListenerContainer
064: * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
065: * @see SimpleMessageListenerContainer102
066: */
067: public class SimpleMessageListenerContainer extends
068: AbstractMessageListenerContainer {
069:
070: private boolean pubSubNoLocal = false;
071:
072: private int concurrentConsumers = 1;
073:
074: private TaskExecutor taskExecutor;
075:
076: private Set sessions;
077:
078: private Set consumers;
079:
080: /**
081: * Set whether to inhibit the delivery of messages published by its own connection.
082: * Default is "false".
083: * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
084: */
085: public void setPubSubNoLocal(boolean pubSubNoLocal) {
086: this .pubSubNoLocal = pubSubNoLocal;
087: }
088:
089: /**
090: * Return whether to inhibit the delivery of messages published by its own connection.
091: */
092: protected boolean isPubSubNoLocal() {
093: return this .pubSubNoLocal;
094: }
095:
096: /**
097: * Specify the number of concurrent consumers to create. Default is 1.
098: * <p>Raising the number of concurrent consumers is recommendable in order
099: * to scale the consumption of messages coming in from a queue. However,
100: * note that any ordering guarantees are lost once multiple consumers are
101: * registered. In general, stick with 1 consumer for low-volume queues.
102: * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
103: * This would lead to concurrent consumption of the same message,
104: * which is hardly ever desirable.
105: */
106: public void setConcurrentConsumers(int concurrentConsumers) {
107: Assert.isTrue(concurrentConsumers > 0,
108: "'concurrentConsumers' value must be at least 1 (one)");
109: this .concurrentConsumers = concurrentConsumers;
110: }
111:
112: /**
113: * Set the Spring TaskExecutor to use for executing the listener once
114: * a message has been received by the provider.
115: * <p>Default is none, that is, to run in the JMS provider's own receive thread,
116: * blocking the provider's receive endpoint while executing the listener.
117: * <p>Specify a TaskExecutor for executing the listener in a different thread,
118: * rather than blocking the JMS provider, usually integrating with an existing
119: * thread pool. This allows to keep the number of concurrent consumers low (1)
120: * while still processing messages concurrently (decoupled from receiving!).
121: * <p><b>NOTE: Specifying a TaskExecutor for listener execution affects
122: * acknowledgement semantics.</b> Messages will then always get acknowledged
123: * before listener execution, with the underlying Session immediately reused
124: * for receiving the next message. Using this in combination with a transacted
125: * session or with client acknowledgement will lead to unspecified results!
126: * <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead
127: * to concurrent processing of messages that have been received by the same
128: * underlying Session.</b> As a consequence, it is not recommended to use
129: * this setting with a {@link SessionAwareMessageListener}, at least not
130: * if the latter performs actual work on the given Session. A standard
131: * {@link javax.jms.MessageListener} will work fine, in general.
132: * @see #setConcurrentConsumers
133: * @see org.springframework.core.task.SimpleAsyncTaskExecutor
134: * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
135: */
136: public void setTaskExecutor(TaskExecutor taskExecutor) {
137: this .taskExecutor = taskExecutor;
138: }
139:
140: protected void validateConfiguration() {
141: super .validateConfiguration();
142: if (isSubscriptionDurable() && this .concurrentConsumers != 1) {
143: throw new IllegalArgumentException(
144: "Only 1 concurrent consumer supported for durable subscription");
145: }
146: }
147:
148: //-------------------------------------------------------------------------
149: // Implementation of AbstractMessageListenerContainer's template methods
150: //-------------------------------------------------------------------------
151:
152: /**
153: * Always use a shared JMS Connection.
154: */
155: protected final boolean sharedConnectionEnabled() {
156: return true;
157: }
158:
159: /**
160: * Creates the specified number of concurrent consumers,
161: * in the form of a JMS Session plus associated MessageConsumer.
162: * @see #createListenerConsumer
163: */
164: protected void doInitialize() throws JMSException {
165: establishSharedConnection();
166:
167: // Register Sessions and MessageConsumers.
168: this .sessions = new HashSet(this .concurrentConsumers);
169: this .consumers = new HashSet(this .concurrentConsumers);
170: for (int i = 0; i < this .concurrentConsumers; i++) {
171: Session session = createSession(getSharedConnection());
172: MessageConsumer consumer = createListenerConsumer(session);
173: this .sessions.add(session);
174: this .consumers.add(consumer);
175: }
176: }
177:
178: /**
179: * Create a MessageConsumer for the given JMS Session,
180: * registering a MessageListener for the specified listener.
181: * @param session the JMS Session to work on
182: * @return the MessageConsumer
183: * @throws JMSException if thrown by JMS methods
184: * @see #executeListener
185: */
186: protected MessageConsumer createListenerConsumer(
187: final Session session) throws JMSException {
188: Destination destination = getDestination();
189: if (destination == null) {
190: destination = resolveDestinationName(session,
191: getDestinationName());
192: }
193: MessageConsumer consumer = createConsumer(session, destination);
194: if (this .taskExecutor != null) {
195: consumer.setMessageListener(new MessageListener() {
196: public void onMessage(final Message message) {
197: taskExecutor.execute(new Runnable() {
198: public void run() {
199: executeListener(session, message);
200: }
201: });
202: }
203: });
204: } else {
205: consumer.setMessageListener(new MessageListener() {
206: public void onMessage(Message message) {
207: executeListener(session, message);
208: }
209: });
210: }
211: return consumer;
212: }
213:
214: /**
215: * Destroy the registered JMS Sessions and associated MessageConsumers.
216: */
217: protected void doShutdown() throws JMSException {
218: logger.debug("Closing JMS MessageConsumers");
219: for (Iterator it = this .consumers.iterator(); it.hasNext();) {
220: MessageConsumer consumer = (MessageConsumer) it.next();
221: JmsUtils.closeMessageConsumer(consumer);
222: }
223: logger.debug("Closing JMS Sessions");
224: for (Iterator it = this .sessions.iterator(); it.hasNext();) {
225: Session session = (Session) it.next();
226: JmsUtils.closeSession(session);
227: }
228: }
229:
230: //-------------------------------------------------------------------------
231: // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
232: //-------------------------------------------------------------------------
233:
234: /**
235: * Create a JMS MessageConsumer for the given Session and Destination.
236: * <p>This implementation uses JMS 1.1 API.
237: * @param session the JMS Session to create a MessageConsumer for
238: * @param destination the JMS Destination to create a MessageConsumer for
239: * @return the new JMS MessageConsumer
240: * @throws JMSException if thrown by JMS API methods
241: */
242: protected MessageConsumer createConsumer(Session session,
243: Destination destination) throws JMSException {
244: // Only pass in the NoLocal flag in case of a Topic:
245: // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
246: // in case of the NoLocal flag being specified for a Queue.
247: if (isPubSubDomain()) {
248: if (isSubscriptionDurable() && destination instanceof Topic) {
249: return session.createDurableSubscriber(
250: (Topic) destination,
251: getDurableSubscriptionName(),
252: getMessageSelector(), isPubSubNoLocal());
253: } else {
254: return session.createConsumer(destination,
255: getMessageSelector(), isPubSubNoLocal());
256: }
257: } else {
258: return session.createConsumer(destination,
259: getMessageSelector());
260: }
261: }
262:
263: }
|