001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.connection;
018:
019: import java.lang.reflect.InvocationHandler;
020: import java.lang.reflect.InvocationTargetException;
021: import java.lang.reflect.Method;
022: import java.lang.reflect.Proxy;
023: import java.util.ArrayList;
024: import java.util.List;
025:
026: import javax.jms.Connection;
027: import javax.jms.ConnectionFactory;
028: import javax.jms.ExceptionListener;
029: import javax.jms.JMSException;
030: import javax.jms.QueueConnection;
031: import javax.jms.QueueConnectionFactory;
032: import javax.jms.TopicConnection;
033: import javax.jms.TopicConnectionFactory;
034:
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037:
038: import org.springframework.beans.factory.DisposableBean;
039: import org.springframework.beans.factory.InitializingBean;
040: import org.springframework.util.Assert;
041:
042: /**
043: * A JMS ConnectionFactory adapter that returns the same Connection
044: * from all {@link #createConnection()} calls, and ignores calls to
045: * {@link javax.jms.Connection#close()}. According to the JMS Connection
046: * model, this is perfectly thread-safe (in contrast to e.g. JDBC).
047: *
048: * <p>You can either pass in a specific JMS Connection directly or let this
049: * factory lazily create a Connection via a given target ConnectionFactory.
050: * In the latter case, this factory just works with JMS 1.1; use
051: * {@link SingleConnectionFactory102} for JMS 1.0.2.
052: *
053: * <p>Useful for testing and standalone environments in order to keep using the
054: * same Connection for multiple {@link org.springframework.jms.core.JmsTemplate}
055: * calls, without having a pooling ConnectionFactory underneath. This may span
056: * any number of transactions, even concurrently executing transactions.
057: *
058: * <p>Note that Spring's message listener containers support the use of
059: * a shared Connection within each listener container instance. Using
060: * SingleConnectionFactory in combination only really makes sense for
061: * sharing a single JMS Connection <i>across multiple listener containers</i>.
062: *
063: * @author Mark Pollack
064: * @author Juergen Hoeller
065: * @since 1.1
066: * @see org.springframework.jms.core.JmsTemplate
067: * @see org.springframework.jms.listener.SimpleMessageListenerContainer
068: * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel
069: */
070: public class SingleConnectionFactory implements ConnectionFactory,
071: QueueConnectionFactory, TopicConnectionFactory,
072: ExceptionListener, InitializingBean, DisposableBean {
073:
074: protected final Log logger = LogFactory.getLog(getClass());
075:
076: private ConnectionFactory targetConnectionFactory;
077:
078: private String clientId;
079:
080: private ExceptionListener exceptionListener;
081:
082: private boolean reconnectOnException = false;
083:
084: /** Wrapped Connection */
085: private Connection target;
086:
087: /** Proxy Connection */
088: private Connection connection;
089:
090: /** Synchronization monitor for the shared Connection */
091: private final Object connectionMonitor = new Object();
092:
093: /**
094: * Create a new SingleConnectionFactory for bean-style usage.
095: * @see #setTargetConnectionFactory
096: */
097: public SingleConnectionFactory() {
098: }
099:
100: /**
101: * Create a new SingleConnectionFactory that always returns the
102: * given Connection. Works with both JMS 1.1 and 1.0.2.
103: * @param target the single Connection
104: */
105: public SingleConnectionFactory(Connection target) {
106: Assert.notNull(target, "Target Connection must not be null");
107: this .target = target;
108: this .connection = getSharedConnectionProxy(target);
109: }
110:
111: /**
112: * Create a new SingleConnectionFactory that always returns a single
113: * Connection that it will lazily create via the given target
114: * ConnectionFactory.
115: * @param targetConnectionFactory the target ConnectionFactory
116: */
117: public SingleConnectionFactory(
118: ConnectionFactory targetConnectionFactory) {
119: Assert.notNull(targetConnectionFactory,
120: "Target ConnectionFactory must not be null");
121: this .targetConnectionFactory = targetConnectionFactory;
122: }
123:
124: /**
125: * Set the target ConnectionFactory which will be used to lazily
126: * create a single Connection.
127: */
128: public void setTargetConnectionFactory(
129: ConnectionFactory targetConnectionFactory) {
130: this .targetConnectionFactory = targetConnectionFactory;
131: }
132:
133: /**
134: * Return the target ConnectionFactory which will be used to lazily
135: * create a single Connection, if any.
136: */
137: public ConnectionFactory getTargetConnectionFactory() {
138: return this .targetConnectionFactory;
139: }
140:
141: /**
142: * Specify a JMS client ID for the single Connection created and exposed
143: * by this ConnectionFactory.
144: * <p>Note that client IDs need to be unique among all active Connections
145: * of the underlying JMS provider. Furthermore, a client ID can only be
146: * assigned if the original ConnectionFactory hasn't already assigned one.
147: * @see javax.jms.Connection#setClientID
148: * @see #setTargetConnectionFactory
149: */
150: public void setClientId(String clientId) {
151: this .clientId = clientId;
152: }
153:
154: /**
155: * Return a JMS client ID for the single Connection created and exposed
156: * by this ConnectionFactory, if any.
157: */
158: protected String getClientId() {
159: return this .clientId;
160: }
161:
162: /**
163: * Specify an JMS ExceptionListener implementation that should be
164: * registered with with the single Connection created by this factory.
165: * @see #setReconnectOnException
166: */
167: public void setExceptionListener(ExceptionListener exceptionListener) {
168: this .exceptionListener = exceptionListener;
169: }
170:
171: /**
172: * Return the JMS ExceptionListener implementation that should be registered
173: * with with the single Connection created by this factory, if any.
174: */
175: protected ExceptionListener getExceptionListener() {
176: return this .exceptionListener;
177: }
178:
179: /**
180: * Specify whether the single Connection should be reset (to be subsequently renewed)
181: * when a JMSException is reported by the underlying Connection.
182: * <p>Default is "false". Switch this to "true" to automatically trigger
183: * recovery based on your JMS provider's exception notifications.
184: * <p>Internally, this will lead to a special JMS ExceptionListener
185: * (this SingleConnectionFactory itself) being registered with the
186: * underlying Connection. This can also be combined with a
187: * user-specified ExceptionListener, if desired.
188: * @see #setExceptionListener
189: */
190: public void setReconnectOnException(boolean reconnectOnException) {
191: this .reconnectOnException = reconnectOnException;
192: }
193:
194: /**
195: * Return whether the single Connection should be renewed when
196: * a JMSException is reported by the underlying Connection.
197: */
198: protected boolean isReconnectOnException() {
199: return this .reconnectOnException;
200: }
201:
202: /**
203: * Make sure a Connection or ConnectionFactory has been set.
204: */
205: public void afterPropertiesSet() {
206: if (this .connection == null
207: && getTargetConnectionFactory() == null) {
208: throw new IllegalArgumentException(
209: "Connection or 'targetConnectionFactory' is required");
210: }
211: }
212:
213: public Connection createConnection() throws JMSException {
214: synchronized (this .connectionMonitor) {
215: if (this .connection == null) {
216: initConnection();
217: }
218: return this .connection;
219: }
220: }
221:
222: public Connection createConnection(String username, String password)
223: throws JMSException {
224: throw new javax.jms.IllegalStateException(
225: "SingleConnectionFactory does not support custom username and password");
226: }
227:
228: public QueueConnection createQueueConnection() throws JMSException {
229: Connection con = createConnection();
230: if (!(con instanceof QueueConnection)) {
231: throw new javax.jms.IllegalStateException(
232: "This SingleConnectionFactory does not hold a QueueConnection but rather: "
233: + con);
234: }
235: return ((QueueConnection) con);
236: }
237:
238: public QueueConnection createQueueConnection(String username,
239: String password) throws JMSException {
240: throw new javax.jms.IllegalStateException(
241: "SingleConnectionFactory does not support custom username and password");
242: }
243:
244: public TopicConnection createTopicConnection() throws JMSException {
245: Connection con = createConnection();
246: if (!(con instanceof TopicConnection)) {
247: throw new javax.jms.IllegalStateException(
248: "This SingleConnectionFactory does not hold a TopicConnection but rather: "
249: + con);
250: }
251: return ((TopicConnection) con);
252: }
253:
254: public TopicConnection createTopicConnection(String username,
255: String password) throws JMSException {
256: throw new javax.jms.IllegalStateException(
257: "SingleConnectionFactory does not support custom username and password");
258: }
259:
260: /**
261: * Exception listener callback that renews the underlying single Connection.
262: */
263: public void onException(JMSException ex) {
264: resetConnection();
265: }
266:
267: /**
268: * Close the underlying shared connection.
269: * The provider of this ConnectionFactory needs to care for proper shutdown.
270: * <p>As this bean implements DisposableBean, a bean factory will
271: * automatically invoke this on destruction of its cached singletons.
272: */
273: public void destroy() {
274: resetConnection();
275: }
276:
277: /**
278: * Initialize the underlying shared Connection.
279: * <p>Closes and reinitializes the Connection if an underlying
280: * Connection is present already.
281: * @throws javax.jms.JMSException if thrown by JMS API methods
282: */
283: public void initConnection() throws JMSException {
284: if (getTargetConnectionFactory() == null) {
285: throw new IllegalStateException(
286: "'targetConnectionFactory' is required for lazily initializing a Connection");
287: }
288: synchronized (this .connectionMonitor) {
289: if (this .target != null) {
290: closeConnection(this .target);
291: }
292: this .target = doCreateConnection();
293: prepareConnection(this .target);
294: if (logger.isInfoEnabled()) {
295: logger.info("Established shared JMS Connection: "
296: + this .target);
297: }
298: this .connection = getSharedConnectionProxy(this .target);
299: }
300: }
301:
302: /**
303: * Reset the underlying shared Connection, to be reinitialized on next access.
304: */
305: public void resetConnection() {
306: synchronized (this .connectionMonitor) {
307: if (this .target != null) {
308: closeConnection(this .target);
309: }
310: this .target = null;
311: this .connection = null;
312: }
313: }
314:
315: /**
316: * Create a JMS Connection via this template's ConnectionFactory.
317: * <p>This implementation uses JMS 1.1 API.
318: * @return the new JMS Connection
319: * @throws javax.jms.JMSException if thrown by JMS API methods
320: */
321: protected Connection doCreateConnection() throws JMSException {
322: return getTargetConnectionFactory().createConnection();
323: }
324:
325: /**
326: * Prepare the given Connection before it is exposed.
327: * <p>The default implementation applies ExceptionListener and client id.
328: * Can be overridden in subclasses.
329: * @param con the Connection to prepare
330: * @throws JMSException if thrown by JMS API methods
331: * @see #setExceptionListener
332: * @see #setReconnectOnException
333: */
334: protected void prepareConnection(Connection con)
335: throws JMSException {
336: if (getClientId() != null) {
337: con.setClientID(getClientId());
338: }
339: if (getExceptionListener() != null || isReconnectOnException()) {
340: ExceptionListener listenerToUse = getExceptionListener();
341: if (isReconnectOnException()) {
342: listenerToUse = new InternalChainedExceptionListener(
343: this , listenerToUse);
344: }
345: con.setExceptionListener(listenerToUse);
346: }
347: }
348:
349: /**
350: * Close the given Connection.
351: * @param con the Connection to close
352: */
353: protected void closeConnection(Connection con) {
354: try {
355: try {
356: con.stop();
357: } finally {
358: con.close();
359: }
360: } catch (Throwable ex) {
361: logger.warn("Could not close shared JMS Connection", ex);
362: }
363: }
364:
365: /**
366: * Wrap the given Connection with a proxy that delegates every method call to it
367: * but suppresses close calls. This is useful for allowing application code to
368: * handle a special framework Connection just like an ordinary Connection from a
369: * JMS ConnectionFactory.
370: * @param target the original Connection to wrap
371: * @return the wrapped Connection
372: */
373: protected Connection getSharedConnectionProxy(Connection target) {
374: List classes = new ArrayList(3);
375: classes.add(Connection.class);
376: if (target instanceof QueueConnection) {
377: classes.add(QueueConnection.class);
378: }
379: if (target instanceof TopicConnection) {
380: classes.add(TopicConnection.class);
381: }
382: return (Connection) Proxy.newProxyInstance(getClass()
383: .getClassLoader(), (Class[]) classes
384: .toArray(new Class[classes.size()]),
385: new SharedConnectionInvocationHandler(target));
386: }
387:
388: /**
389: * Invocation handler that suppresses close calls on JMS Connections.
390: */
391: private static class SharedConnectionInvocationHandler implements
392: InvocationHandler {
393:
394: private final Connection target;
395:
396: private SharedConnectionInvocationHandler(Connection target) {
397: this .target = target;
398: }
399:
400: public Object invoke(Object proxy, Method method, Object[] args)
401: throws Throwable {
402: if (method.getName().equals("equals")) {
403: // Only consider equal when proxies are identical.
404: return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);
405: } else if (method.getName().equals("hashCode")) {
406: // Use hashCode of Connection proxy.
407: return new Integer(hashCode());
408: } else if (method.getName().equals("setClientID")) {
409: // Handle setExceptionListener method: throw exception.
410: throw new javax.jms.IllegalStateException(
411: "setClientID call not supported on proxy for shared Connection. "
412: + "Set the 'clientId' property on the SingleConnectionFactory instead.");
413: } else if (method.getName().equals("setExceptionListener")) {
414: // Handle setExceptionListener method: throw exception.
415: throw new javax.jms.IllegalStateException(
416: "setExceptionListener call not supported on proxy for shared Connection. "
417: + "Set the 'exceptionListener' property on the SingleConnectionFactory instead.");
418: } else if (method.getName().equals("stop")) {
419: // Handle stop method: don't pass the call on.
420: return null;
421: } else if (method.getName().equals("close")) {
422: // Handle close method: don't pass the call on.
423: return null;
424: }
425: try {
426: Object retVal = method.invoke(this .target, args);
427: if (method.getName().equals("getExceptionListener")
428: && retVal instanceof InternalChainedExceptionListener) {
429: // Handle getExceptionListener method: hide internal chain.
430: InternalChainedExceptionListener listener = (InternalChainedExceptionListener) retVal;
431: return listener.getUserListener();
432: } else {
433: return retVal;
434: }
435: } catch (InvocationTargetException ex) {
436: throw ex.getTargetException();
437: }
438: }
439: }
440:
441: /**
442: * Internal chained ExceptionListener for handling the internal recovery listener
443: * in combination with a user-specified listener.
444: */
445: private static class InternalChainedExceptionListener extends
446: ChainedExceptionListener {
447:
448: public InternalChainedExceptionListener(
449: ExceptionListener internalListener,
450: ExceptionListener userListener) {
451: addDelegate(internalListener);
452: if (userListener != null) {
453: addDelegate(userListener);
454: }
455: }
456:
457: public ExceptionListener getUserListener() {
458: ExceptionListener[] delegates = getDelegates();
459: return (delegates.length > 1 ? delegates[1] : null);
460: }
461: }
462:
463: }
|