| org.springframework.jms.listener.AbstractJmsListeningContainer org.springframework.jms.listener.AbstractMessageListenerContainer org.springframework.jms.listener.AbstractPollingMessageListenerContainer org.springframework.jms.listener.DefaultMessageListenerContainer
All known Subclasses: org.springframework.jms.listener.DefaultMessageListenerContainer102,
DefaultMessageListenerContainer | public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer (Code) | | Message listener container variant that uses plain JMS client API, specifically
a loop of MessageConsumer.receive() calls that also allow for
transactional reception of messages (registering them with XA transactions).
Designed to work in a native JMS environment as well as in a J2EE environment,
with only minimal differences in configuration.
This is a simple but nevertheless powerful form of message listener container.
On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
and optionally allows for dynamic adaptation at runtime (up until a maximum number).
Like
SimpleMessageListenerContainer , its main advantage is its low level
of runtime complexity, in particular the minimal requirements on the JMS provider:
Not even the JMS ServerSessionPool facility is required. Beyond that, it is
fully self-recovering in case of the broker being temporarily unavailable,
and allows for stops/restarts as well as runtime changes to its configuration.
Actual MessageListener execution happens in asynchronous work units which are
created through Spring's
org.springframework.core.task.TaskExecutor abstraction. By default, the specified number of invoker tasks will be created
on startup, according to the
DefaultMessageListenerContainer.setConcurrentConsumers "concurrentConsumers" setting. Specify an alternative TaskExecutor to integrate with an existing
thread pool facility (such as a J2EE server's), for example using a
org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager .
With a native JMS setup, each of those listener threads is going to use a
cached JMS Session and MessageConsumer (only refreshed in case of failure),
using the JMS provider's resources as efficiently as possible.
Message reception and listener execution can automatically be wrapped
in transactions through passing a Spring
org.springframework.transaction.PlatformTransactionManager into the
DefaultMessageListenerContainer.setTransactionManager "transactionManager" property. This will usually
be a
org.springframework.transaction.jta.JtaTransactionManager in a
J2EE enviroment, in combination with a JTA-aware JMS ConnectionFactory obtained
from JNDI (check your J2EE server's documentation). Note that this listener
container will automatically reobtain all JMS handles for each transaction
in case of an external transaction manager specified, for compatibility with
all J2EE servers (in particular JBoss). This non-caching behavior can be
overridden through the
DefaultMessageListenerContainer.setCacheLevel "cacheLevel" /
DefaultMessageListenerContainer.setCacheLevelName "cacheLevelName" property, enforcing caching
of the Connection (or also Session and MessageConsumer) even in case of
an external transaction manager being involved.
Dynamic scaling of the number of concurrent invokers can be activated
through specifying a
DefaultMessageListenerContainer.setMaxConcurrentConsumers "maxConcurrentConsumers" value that is higher than the
DefaultMessageListenerContainer.setConcurrentConsumers "concurrentConsumers" value. Since the latter's default is 1, you can also simply specify a
"maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
5 concurrent consumers in case of increasing message load, as well as dynamic
shrinking back to the standard number of consumers once the load decreases.
Consider adapting the
DefaultMessageListenerContainer.setIdleTaskExecutionLimit "idleTaskExecutionLimit" setting to control the lifespan of each new task, to avoid frequent scaling up
and down, in particular if the ConnectionFactory does not pool JMS Sessions
and/or the TaskExecutor does not pool threads (check your configuration!).
Note that dynamic scaling only really makes sense for a queue in the first
place; for a topic, you will typically stick with the default number of 1
consumer, else you'd receive the same message multiple times on the same node.
It is strongly recommended to either set
DefaultMessageListenerContainer.setSessionTransacted"sessionTransacted" to "true" or specify an external
DefaultMessageListenerContainer.setTransactionManager"transactionManager" . See the
AbstractMessageListenerContainer javadoc for details on acknowledge modes and native transaction options,
as well as the
AbstractPollingMessageListenerContainer javadoc
for details on configuring an external transaction manager.
This class requires a JMS 1.1+ provider, because it builds on the
domain-independent API. Use the
DefaultMessageListenerContainer102 subclass for JMS 1.0.2 providers.
author: Juergen Hoeller since: 2.0 See Also: DefaultMessageListenerContainer.setTransactionManager See Also: DefaultMessageListenerContainer.setCacheLevel See Also: javax.jms.MessageConsumer.receive(long) See Also: SimpleMessageListenerContainer See Also: org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer See Also: DefaultMessageListenerContainer102 |
Field Summary | |
final public static int | CACHE_CONNECTION Constant that indicates to cache a shared JMS Connection. | final public static int | CACHE_CONSUMER Constant that indicates to cache a shared JMS Connection
and a JMS Session for each listener thread, as well as
a JMS MessageConsumer for each listener thread. | final public static int | CACHE_NONE Constant that indicates to cache no JMS resources at all. | final public static int | CACHE_SESSION Constant that indicates to cache a shared JMS Connection
and a JMS Session for each listener thread. | final public static long | DEFAULT_RECOVERY_INTERVAL The default recovery interval: 5000 ms = 5 seconds. | final public static String | DEFAULT_THREAD_NAME_PREFIX Default thread name prefix: "DefaultMessageListenerContainer-". |
Method Summary | |
protected TaskExecutor | createDefaultTaskExecutor() Create a default TaskExecutor. | protected void | doInitialize() Creates the specified number of concurrent consumers,
in the form of a JMS Session plus associated MessageConsumer
running in a separate thread. | protected void | doRescheduleTask(Object task) Re-executes the given task via this listener container's TaskExecutor. | protected void | doShutdown() Destroy the registered JMS Sessions and associated MessageConsumers. | protected void | establishSharedConnection() Overridden to accept a failure in the initial setup - leaving it up to the
asynchronous invokers to establish the shared Connection on first access. | final public int | getActiveConsumerCount() Return the number of currently active consumers. | public int | getCacheLevel() Return the level of caching that this listener container is allowed to apply. | final public int | getConcurrentConsumers() Return the "concurrentConsumer" setting. | public int | getIdleTaskExecutionLimit() Return the limit for idle executions of a receive task. | final public int | getMaxConcurrentConsumers() Return the "maxConcurrentConsumer" setting. | public int | getMaxMessagesPerTask() Return the maximum number of messages to process in one task. | final public int | getScheduledConsumerCount() Return the number of currently scheduled consumers. | protected void | handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) Handle the given exception that arose during setup of a listener. | public void | initialize() | protected void | messageReceived(Message message, Session session) | protected void | recoverAfterListenerSetupFailure() Recover this listener container after a listener failed to set itself up,
for example reestablishing the underlying Connection. | protected void | refreshConnectionUntilSuccessful() Refresh the underlying Connection, not returning before an attempt has been
successful. | protected void | refreshDestination() Refresh the JMS destination that this listener container operates on. | protected void | scheduleNewInvokerIfAppropriate() Schedule a new invoker, increasing the total number of scheduled
invokers for this listener container, but only if the specified
"maxConcurrentConsumers" limit has not been reached yet, and only
if this listener container does not currently have idle invokers
that are waiting for new messages already. | public void | setCacheLevel(int cacheLevel) Specify the level of caching that this listener container is allowed to apply.
Default is CACHE_NONE if an external transaction manager has been specified
(to reobtain all resources freshly within the scope of the external transaction),
and CACHE_CONSUMER else (operating with local JMS resources).
Some J2EE servers only register their JMS resources with an ongoing XA
transaction in case of a freshly obtained JMS Connection and Session,
which is why this listener container does by default not cache any of those.
However, if you want to optimize for a specific server, consider switching
this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
conjunction with an external transaction manager.
Currently known servers that absolutely require CACHE_NONE for XA
transaction processing: JBoss 4. | public void | setCacheLevelName(String constantName) Specify the level of caching that this listener container is allowed to apply,
in the form of the name of the corresponding constant: e.g. | public void | setConcurrentConsumers(int concurrentConsumers) Specify the number of concurrent consumers to create. | public void | setIdleTaskExecutionLimit(int idleTaskExecutionLimit) Specify the limit for idle executions of a receive task, not having
received any message within its execution. | public void | setMaxConcurrentConsumers(int maxConcurrentConsumers) Specify the maximum number of concurrent consumers to create. | public void | setMaxMessagesPerTask(int maxMessagesPerTask) Specify the maximum number of messages to process in one task.
More concretely, this limits the number of message reception attempts per
task, which includes receive iterations that did not actually pick up a
message until they hit their timeout (see "receiveTimeout" property).
Default is unlimited (-1) in case of a standard TaskExecutor,
and 1 in case of a SchedulingTaskExecutor that indicates a preference for
short-lived tasks. | public void | setRecoveryInterval(long recoveryInterval) Specify the interval between recovery attempts, in milliseconds. | public void | setTaskExecutor(TaskExecutor taskExecutor) Set the Spring TaskExecutor to use for running the listener threads.
Default is a
org.springframework.core.task.SimpleAsyncTaskExecutor ,
starting up a number of new threads, according to the specified number
of concurrent consumers.
Specify an alternative TaskExecutor for integration with an existing
thread pool. | final protected boolean | sharedConnectionEnabled() Use a shared JMS Connection depending on the "cacheLevel" setting. | protected void | sleepInbetweenRecoveryAttempts() Sleep according to the specified recovery interval. | protected void | startSharedConnection() This implementations proceeds even after an exception thrown from
Connection.start() , relying on listeners to perform
appropriate recovery. | protected void | stopSharedConnection() This implementations proceeds even after an exception thrown from
Connection.stop() , relying on listeners to perform
appropriate recovery after a restart. | protected void | validateConfiguration() |
CACHE_CONSUMER | final public static int CACHE_CONSUMER(Code) | | Constant that indicates to cache a shared JMS Connection
and a JMS Session for each listener thread, as well as
a JMS MessageConsumer for each listener thread.
See Also: DefaultMessageListenerContainer.setCacheLevel |
DEFAULT_RECOVERY_INTERVAL | final public static long DEFAULT_RECOVERY_INTERVAL(Code) | | The default recovery interval: 5000 ms = 5 seconds.
|
DEFAULT_THREAD_NAME_PREFIX | final public static String DEFAULT_THREAD_NAME_PREFIX(Code) | | Default thread name prefix: "DefaultMessageListenerContainer-".
|
doShutdown | protected void doShutdown() throws JMSException(Code) | | Destroy the registered JMS Sessions and associated MessageConsumers.
|
getCacheLevel | public int getCacheLevel()(Code) | | Return the level of caching that this listener container is allowed to apply.
|
getIdleTaskExecutionLimit | public int getIdleTaskExecutionLimit()(Code) | | Return the limit for idle executions of a receive task.
|
getMaxMessagesPerTask | public int getMaxMessagesPerTask()(Code) | | Return the maximum number of messages to process in one task.
|
handleListenerSetupFailure | protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered)(Code) | | Handle the given exception that arose during setup of a listener.
Called for every such exception in every concurrent listener.
The default implementation logs the exception at error level
if not recovered yet, and at debug level if already recovered.
Can be overridden in subclasses.
Parameters: ex - the exception to handle Parameters: alreadyRecovered - whether a previously executing listeneralready recovered from the present listener setup failure(this usually indicates a follow-up failure than be ignoredother than for debug log purposes) See Also: DefaultMessageListenerContainer.recoverAfterListenerSetupFailure() |
initialize | public void initialize()(Code) | | |
messageReceived | protected void messageReceived(Message message, Session session)(Code) | | |
refreshConnectionUntilSuccessful | protected void refreshConnectionUntilSuccessful()(Code) | | Refresh the underlying Connection, not returning before an attempt has been
successful. Called in case of a shared Connection as well as without shared
Connection, so either needs to operate on the shared Connection or on a
temporary Connection that just gets established for validation purposes.
The default implementation retries until it successfully established a
Connection, for as long as this message listener container is active.
Applies the specified recovery interval between retries.
See Also: DefaultMessageListenerContainer.setRecoveryInterval |
scheduleNewInvokerIfAppropriate | protected void scheduleNewInvokerIfAppropriate()(Code) | | Schedule a new invoker, increasing the total number of scheduled
invokers for this listener container, but only if the specified
"maxConcurrentConsumers" limit has not been reached yet, and only
if this listener container does not currently have idle invokers
that are waiting for new messages already.
Called once a message has been received, to scale up while
processing the message in the invoker that originally received it.
See Also: DefaultMessageListenerContainer.setTaskExecutor See Also: DefaultMessageListenerContainer.getMaxConcurrentConsumers() |
setConcurrentConsumers | public void setConcurrentConsumers(int concurrentConsumers)(Code) | | Specify the number of concurrent consumers to create. Default is 1.
Specifying a higher value for this setting will increase the standard
level of scheduled concurrent consumers at runtime: This is effectively
the minimum number of concurrent consumers which will be scheduled
at any given time. This is a static setting; for dynamic scaling,
consider specifying the "maxConcurrentConsumers" setting instead.
Raising the number of concurrent consumers is recommendable in order
to scale the consumption of messages coming in from a queue. However,
note that any ordering guarantees are lost once multiple consumers are
registered. In general, stick with 1 consumer for low-volume queues.
Do not raise the number of concurrent consumers for a topic.
This would lead to concurrent consumption of the same message,
which is hardly ever desirable.
This setting can be modified at runtime, for example through JMX.
See Also: DefaultMessageListenerContainer.setMaxConcurrentConsumers |
setIdleTaskExecutionLimit | public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit)(Code) | | Specify the limit for idle executions of a receive task, not having
received any message within its execution. If this limit is reached,
the task will shut down and leave receiving to other executing tasks
(in case of dynamic scheduling; see the "maxConcurrentConsumers" setting).
Default is 1.
Within each task execution, a number of message reception attempts
(according to the "maxMessagesPerTask" setting) will each wait for an incoming
message (according to the "receiveTimeout" setting). If all of those receive
attempts in a given task return without a message, the task is considered
idle with respect to received messages. Such a task may still be rescheduled;
however, once it reached the specified "idleTaskExecutionLimit", it will
shut down (in case of dynamic scaling).
Raise this limit if you encounter too frequent scaling up and down.
With this limit being higher, an idle consumer will be kept around longer,
avoiding the restart of a consumer once a new load of messages comes in.
Alternatively, specify a higher "maxMessagePerTask" and/or "receiveTimeout" value,
which will also lead to idle consumers being kept around for a longer time
(while also increasing the average execution time of each scheduled task).
This setting can be modified at runtime, for example through JMX.
See Also: DefaultMessageListenerContainer.setMaxMessagesPerTask See Also: DefaultMessageListenerContainer.setReceiveTimeout |
setMaxConcurrentConsumers | public void setMaxConcurrentConsumers(int maxConcurrentConsumers)(Code) | | Specify the maximum number of concurrent consumers to create. Default is 1.
If this setting is higher than "concurrentConsumers", the listener container
will dynamically schedule new consumers at runtime, provided that enough
incoming messages are encountered. Once the load goes down again, the number of
consumers will be reduced to the standard level ("concurrentConsumers") again.
Raising the number of concurrent consumers is recommendable in order
to scale the consumption of messages coming in from a queue. However,
note that any ordering guarantees are lost once multiple consumers are
registered. In general, stick with 1 consumer for low-volume queues.
Do not raise the number of concurrent consumers for a topic.
This would lead to concurrent consumption of the same message,
which is hardly ever desirable.
This setting can be modified at runtime, for example through JMX.
See Also: DefaultMessageListenerContainer.setConcurrentConsumers |
setMaxMessagesPerTask | public void setMaxMessagesPerTask(int maxMessagesPerTask)(Code) | | Specify the maximum number of messages to process in one task.
More concretely, this limits the number of message reception attempts per
task, which includes receive iterations that did not actually pick up a
message until they hit their timeout (see "receiveTimeout" property).
Default is unlimited (-1) in case of a standard TaskExecutor,
and 1 in case of a SchedulingTaskExecutor that indicates a preference for
short-lived tasks. Specify a number of 10 to 100 messages to balance
between extremely long-lived and extremely short-lived tasks here.
Long-lived tasks avoid frequent thread context switches through
sticking with the same thread all the way through, while short-lived
tasks allow thread pools to control the scheduling. Hence, thread
pools will usually prefer short-lived tasks.
This setting can be modified at runtime, for example through JMX.
See Also: DefaultMessageListenerContainer.setTaskExecutor See Also: DefaultMessageListenerContainer.setReceiveTimeout See Also: org.springframework.scheduling.SchedulingTaskExecutor.prefersShortLivedTasks |
sleepInbetweenRecoveryAttempts | protected void sleepInbetweenRecoveryAttempts()(Code) | | Sleep according to the specified recovery interval.
Called inbetween recovery attempts.
|
startSharedConnection | protected void startSharedConnection()(Code) | | This implementations proceeds even after an exception thrown from
Connection.start() , relying on listeners to perform
appropriate recovery.
|
stopSharedConnection | protected void stopSharedConnection()(Code) | | This implementations proceeds even after an exception thrown from
Connection.stop() , relying on listeners to perform
appropriate recovery after a restart.
|
validateConfiguration | protected void validateConfiguration()(Code) | | |
Fields inherited from org.springframework.jms.listener.AbstractPollingMessageListenerContainer | final public static long DEFAULT_RECEIVE_TIMEOUT(Code)(Java Doc)
|
|
|