001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.listener;
018:
019: import java.util.Iterator;
020: import java.util.LinkedList;
021: import java.util.List;
022:
023: import javax.jms.Connection;
024: import javax.jms.JMSException;
025:
026: import org.springframework.beans.factory.BeanNameAware;
027: import org.springframework.beans.factory.DisposableBean;
028: import org.springframework.context.Lifecycle;
029: import org.springframework.jms.JmsException;
030: import org.springframework.jms.connection.ConnectionFactoryUtils;
031: import org.springframework.jms.support.JmsUtils;
032: import org.springframework.jms.support.destination.JmsDestinationAccessor;
033: import org.springframework.util.Assert;
034: import org.springframework.util.ClassUtils;
035:
036: /**
037: * Common base class for all containers which need to implement listening
038: * based on a JMS Connection (either shared or freshly obtained for each attempt).
039: * Inherits basic Connection and Session configuration handling from the
040: * {@link org.springframework.jms.support.JmsAccessor} base class.
041: *
042: * <p>This class provides basic lifecycle management, in particular management
043: * of a shared JMS Connection. Subclasses are supposed to plug into this
044: * lifecycle, implementing the {@link #sharedConnectionEnabled()} as well
045: * as the {@link #doInitialize()} and {@link #doShutdown()} template methods.
046: *
047: * <p>This base class does not assume any specific listener programming model
048: * or listener invoker mechanism. It just provides the general runtime
049: * lifecycle management needed for any kind of JMS-based listening mechanism
050: * that operates on a JMS Connection/Session.
051: *
052: * <p>For a concrete listener programming model, check out the
053: * {@link AbstractMessageListenerContainer} subclass. For a concrete listener
054: * invoker mechanism, check out the {@link DefaultMessageListenerContainer} class.
055: *
056: * @author Juergen Hoeller
057: * @since 2.0.3
058: * @see #sharedConnectionEnabled()
059: * @see #doInitialize()
060: * @see #doShutdown()
061: */
062: public abstract class AbstractJmsListeningContainer extends
063: JmsDestinationAccessor implements Lifecycle, BeanNameAware,
064: DisposableBean {
065:
066: private String clientId;
067:
068: private boolean autoStartup = true;
069:
070: private String beanName;
071:
072: private Connection sharedConnection;
073:
074: protected final Object sharedConnectionMonitor = new Object();
075:
076: private boolean active = false;
077:
078: private boolean running = false;
079:
080: private final List pausedTasks = new LinkedList();
081:
082: protected final Object lifecycleMonitor = new Object();
083:
084: /**
085: * Specify the JMS client id for a shared Connection created and used
086: * by this container.
087: * <p>Note that client ids need to be unique among all active Connections
088: * of the underlying JMS provider. Furthermore, a client id can only be
089: * assigned if the original ConnectionFactory hasn't already assigned one.
090: * @see javax.jms.Connection#setClientID
091: * @see #setConnectionFactory
092: */
093: public void setClientId(String clientId) {
094: this .clientId = clientId;
095: }
096:
097: /**
098: * Return the JMS client ID for the shared Connection created and used
099: * by this container, if any.
100: */
101: public String getClientId() {
102: return this .clientId;
103: }
104:
105: /**
106: * Set whether to automatically start the container after initialization.
107: * <p>Default is "true"; set this to "false" to allow for manual startup
108: * through the {@link #start()} method.
109: */
110: public void setAutoStartup(boolean autoStartup) {
111: this .autoStartup = autoStartup;
112: }
113:
114: public void setBeanName(String beanName) {
115: this .beanName = beanName;
116: }
117:
118: /**
119: * Return the bean name that this listener container has been assigned
120: * in its containing bean factory, if any.
121: */
122: protected final String getBeanName() {
123: return this .beanName;
124: }
125:
126: /**
127: * Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
128: */
129: public void afterPropertiesSet() {
130: super .afterPropertiesSet();
131: validateConfiguration();
132: initialize();
133: }
134:
135: /**
136: * Validate the configuration of this container.
137: * <p>The default implementation is empty. To be overridden in subclasses.
138: */
139: protected void validateConfiguration() {
140: }
141:
142: /**
143: * Initialize this container.
144: * <p>Creates a JMS Connection, starts the {@link javax.jms.Connection}
145: * (if {@link #setAutoStartup(boolean) "autoStartup"} hasn't been turned off),
146: * and calls {@link #doInitialize()}.
147: * @throws org.springframework.jms.JmsException if startup failed
148: */
149: public void initialize() throws JmsException {
150: try {
151: synchronized (this .lifecycleMonitor) {
152: this .active = true;
153: this .lifecycleMonitor.notifyAll();
154: }
155: if (this .autoStartup) {
156: doStart();
157: }
158: doInitialize();
159: } catch (JMSException ex) {
160: synchronized (this .sharedConnectionMonitor) {
161: ConnectionFactoryUtils.releaseConnection(
162: this .sharedConnection, getConnectionFactory(),
163: this .autoStartup);
164: }
165: throw convertJmsAccessException(ex);
166: }
167: }
168:
169: /**
170: * Establish a shared Connection for this container.
171: * <p>The default implementation delegates to {@link #createSharedConnection()},
172: * which does one immediate attempt and throws an exception if it fails.
173: * Can be overridden to have a recovery process in place, retrying
174: * until a Connection can be successfully established.
175: * @throws JMSException if thrown by JMS API methods
176: */
177: protected void establishSharedConnection() throws JMSException {
178: synchronized (this .sharedConnectionMonitor) {
179: if (this .sharedConnection == null) {
180: this .sharedConnection = createSharedConnection();
181: logger.debug("Established shared JMS Connection");
182: }
183: }
184: }
185:
186: /**
187: * Refresh the shared Connection that this container holds.
188: * <p>Called on startup and also after an infrastructure exception
189: * that occurred during invoker setup and/or execution.
190: * @throws JMSException if thrown by JMS API methods
191: */
192: protected final void refreshSharedConnection() throws JMSException {
193: boolean running = isRunning();
194: synchronized (this .sharedConnectionMonitor) {
195: ConnectionFactoryUtils.releaseConnection(
196: this .sharedConnection, getConnectionFactory(),
197: running);
198: this .sharedConnection = createSharedConnection();
199: }
200: }
201:
202: /**
203: * Create a shared Connection for this container.
204: * <p>The default implementation creates a standard Connection
205: * and prepares it through {@link #prepareSharedConnection}.
206: * @return the prepared Connection
207: * @throws JMSException if the creation failed
208: */
209: protected Connection createSharedConnection() throws JMSException {
210: Connection con = createConnection();
211: try {
212: prepareSharedConnection(con);
213: return con;
214: } catch (JMSException ex) {
215: JmsUtils.closeConnection(con);
216: throw ex;
217: }
218: }
219:
220: /**
221: * Prepare the given Connection, which is about to be registered
222: * as shared Connection for this container.
223: * <p>The default implementation sets the specified client id, if any.
224: * Subclasses can override this to apply further settings.
225: * @param connection the Connection to prepare
226: * @throws JMSException if the preparation efforts failed
227: * @see #getClientId()
228: */
229: protected void prepareSharedConnection(Connection connection)
230: throws JMSException {
231: String clientId = getClientId();
232: if (clientId != null) {
233: connection.setClientID(clientId);
234: }
235: }
236:
237: /**
238: * Return the shared JMS Connection maintained by this container.
239: * Available after initialization.
240: * @return the shared Connection (never <code>null</code>)
241: * @throws IllegalStateException if this container does not maintain a
242: * shared Connection, or if the Connection hasn't been initialized yet
243: * @see #sharedConnectionEnabled()
244: */
245: protected final Connection getSharedConnection() {
246: if (!sharedConnectionEnabled()) {
247: throw new IllegalStateException(
248: "This listener container does not maintain a shared Connection");
249: }
250: synchronized (this .sharedConnectionMonitor) {
251: if (this .sharedConnection == null) {
252: throw new SharedConnectionNotInitializedException(
253: "This listener container's shared Connection has not been initialized yet");
254: }
255: return this .sharedConnection;
256: }
257: }
258:
259: /**
260: * Calls {@link #shutdown()} when the BeanFactory destroys the container instance.
261: * @see #shutdown()
262: */
263: public void destroy() {
264: shutdown();
265: }
266:
267: /**
268: * Stop the shared Connection, call {@link #doShutdown()},
269: * and close this container.
270: * @throws JmsException if shutdown failed
271: */
272: public void shutdown() throws JmsException {
273: logger.debug("Shutting down JMS listener container");
274: boolean wasRunning = false;
275: synchronized (this .lifecycleMonitor) {
276: wasRunning = this .running;
277: this .running = false;
278: this .active = false;
279: this .lifecycleMonitor.notifyAll();
280: }
281:
282: // Stop shared Connection early, if necessary.
283: if (wasRunning && sharedConnectionEnabled()) {
284: try {
285: stopSharedConnection();
286: } catch (Throwable ex) {
287: logger
288: .debug(
289: "Could not stop JMS Connection on shutdown",
290: ex);
291: }
292: }
293:
294: // Shut down the invokers.
295: try {
296: doShutdown();
297: } catch (JMSException ex) {
298: throw convertJmsAccessException(ex);
299: } finally {
300: if (sharedConnectionEnabled()) {
301: synchronized (this .sharedConnectionMonitor) {
302: ConnectionFactoryUtils.releaseConnection(
303: this .sharedConnection,
304: getConnectionFactory(), false);
305: }
306: }
307: }
308: }
309:
310: /**
311: * Return whether this container is currently active,
312: * that is, whether it has been set up but not shut down yet.
313: */
314: public final boolean isActive() {
315: synchronized (this .lifecycleMonitor) {
316: return this .active;
317: }
318: }
319:
320: //-------------------------------------------------------------------------
321: // Lifecycle methods for dynamically starting and stopping the container
322: //-------------------------------------------------------------------------
323:
324: /**
325: * Start this container.
326: * @throws JmsException if starting failed
327: * @see #doStart
328: */
329: public void start() throws JmsException {
330: try {
331: doStart();
332: } catch (JMSException ex) {
333: throw convertJmsAccessException(ex);
334: }
335: }
336:
337: /**
338: * Start the shared Connection, if any, and notify all invoker tasks.
339: * @throws JMSException if thrown by JMS API methods
340: * @see #startSharedConnection
341: */
342: protected void doStart() throws JMSException {
343: // Lazily establish a shared Connection, if necessary.
344: if (sharedConnectionEnabled()) {
345: establishSharedConnection();
346: }
347:
348: // Reschedule paused tasks, if any.
349: synchronized (this .lifecycleMonitor) {
350: this .running = true;
351: this .lifecycleMonitor.notifyAll();
352: for (Iterator it = this .pausedTasks.iterator(); it
353: .hasNext();) {
354: doRescheduleTask(it.next());
355: it.remove();
356: }
357: }
358:
359: // Start the shared Connection, if any.
360: if (sharedConnectionEnabled()) {
361: startSharedConnection();
362: }
363: }
364:
365: /**
366: * Start the shared Connection.
367: * @throws JMSException if thrown by JMS API methods
368: * @see javax.jms.Connection#start()
369: */
370: protected void startSharedConnection() throws JMSException {
371: synchronized (this .sharedConnectionMonitor) {
372: if (this .sharedConnection != null) {
373: try {
374: this .sharedConnection.start();
375: } catch (javax.jms.IllegalStateException ex) {
376: logger
377: .debug(
378: "Ignoring Connection start exception - assuming already started",
379: ex);
380: }
381: }
382: }
383: }
384:
385: /**
386: * Stop this container.
387: * @throws JmsException if stopping failed
388: * @see #doStop
389: */
390: public void stop() throws JmsException {
391: try {
392: doStop();
393: } catch (JMSException ex) {
394: throw convertJmsAccessException(ex);
395: }
396: }
397:
398: /**
399: * Notify all invoker tasks and stop the shared Connection, if any.
400: * @throws JMSException if thrown by JMS API methods
401: * @see #stopSharedConnection
402: */
403: protected void doStop() throws JMSException {
404: synchronized (this .lifecycleMonitor) {
405: this .running = false;
406: this .lifecycleMonitor.notifyAll();
407: }
408:
409: if (sharedConnectionEnabled()) {
410: stopSharedConnection();
411: }
412: }
413:
414: /**
415: * Stop the shared Connection.
416: * @throws JMSException if thrown by JMS API methods
417: * @see javax.jms.Connection#start()
418: */
419: protected void stopSharedConnection() throws JMSException {
420: synchronized (this .sharedConnectionMonitor) {
421: if (this .sharedConnection != null) {
422: try {
423: this .sharedConnection.stop();
424: } catch (javax.jms.IllegalStateException ex) {
425: logger
426: .debug(
427: "Ignoring Connection stop exception - assuming already stopped",
428: ex);
429: }
430: }
431: }
432: }
433:
434: /**
435: * Return whether this container is currently running,
436: * that is, whether it has been started and not stopped yet.
437: */
438: public final boolean isRunning() {
439: synchronized (this .lifecycleMonitor) {
440: return this .running;
441: }
442: }
443:
444: /**
445: * Wait while this container is not running.
446: * <p>To be called by asynchronous tasks that want to block
447: * while the container is in stopped state.
448: */
449: protected final void waitWhileNotRunning() {
450: synchronized (this .lifecycleMonitor) {
451: while (this .active && !this .running) {
452: try {
453: this .lifecycleMonitor.wait();
454: } catch (InterruptedException ex) {
455: // Re-interrupt current thread, to allow other threads to react.
456: Thread.currentThread().interrupt();
457: }
458: }
459: }
460: }
461:
462: /**
463: * Take the given task object and reschedule it, either immediately if
464: * this container is currently running, or later once this container
465: * has been restarted.
466: * <p>If this container has already been shut down, the task will not
467: * get rescheduled at all.
468: * @param task the task object to reschedule
469: * @return whether the task has been rescheduled
470: * (either immediately or for a restart of this container)
471: * @see #doRescheduleTask
472: */
473: protected final boolean rescheduleTaskIfNecessary(Object task) {
474: Assert.notNull(task, "Task object must not be null");
475: synchronized (this .lifecycleMonitor) {
476: if (this .running) {
477: doRescheduleTask(task);
478: return true;
479: } else if (this .active) {
480: this .pausedTasks.add(task);
481: return true;
482: } else {
483: return false;
484: }
485: }
486: }
487:
488: /**
489: * Reschedule the given task object immediately.
490: * <p>To be implemented by subclasses if they ever call
491: * <code>rescheduleTaskIfNecessary</code>.
492: * This implementation throws an UnsupportedOperationException.
493: * @param task the task object to reschedule
494: * @see #rescheduleTaskIfNecessary
495: */
496: protected void doRescheduleTask(Object task) {
497: throw new UnsupportedOperationException(ClassUtils
498: .getShortName(getClass())
499: + " does not support rescheduling of tasks");
500: }
501:
502: //-------------------------------------------------------------------------
503: // Template methods to be implemented by subclasses
504: //-------------------------------------------------------------------------
505:
506: /**
507: * Return whether a shared JMS Connection should be maintained
508: * by this container base class.
509: * @see #getSharedConnection()
510: */
511: protected abstract boolean sharedConnectionEnabled();
512:
513: /**
514: * Register any invokers within this container.
515: * <p>Subclasses need to implement this method for their specific
516: * invoker management process.
517: * <p>A shared JMS Connection, if any, will already have been
518: * started at this point.
519: * @throws JMSException if registration failed
520: * @see #getSharedConnection()
521: */
522: protected abstract void doInitialize() throws JMSException;
523:
524: /**
525: * Close the registered invokers.
526: * <p>Subclasses need to implement this method for their specific
527: * invoker management process.
528: * <p>A shared JMS Connection, if any, will automatically be closed
529: * <i>afterwards</i>.
530: * @throws JMSException if shutdown failed
531: * @see #shutdown()
532: */
533: protected abstract void doShutdown() throws JMSException;
534:
535: /**
536: * Exception that indicates that the initial setup of this container's
537: * shared JMS Connection failed. This is indicating to invokers that they need
538: * to establish the shared Connection themselves on first access.
539: */
540: public static class SharedConnectionNotInitializedException extends
541: RuntimeException {
542:
543: /**
544: * Create a new SharedConnectionNotInitializedException.
545: * @param msg the detail message
546: */
547: protected SharedConnectionNotInitializedException(String msg) {
548: super(msg);
549: }
550: }
551:
552: }
|