001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.connection;
018:
019: import java.lang.reflect.InvocationHandler;
020: import java.lang.reflect.InvocationTargetException;
021: import java.lang.reflect.Method;
022: import java.lang.reflect.Proxy;
023: import java.util.ArrayList;
024: import java.util.List;
025:
026: import javax.jms.Connection;
027: import javax.jms.ConnectionFactory;
028: import javax.jms.ExceptionListener;
029: import javax.jms.JMSException;
030: import javax.jms.QueueConnection;
031: import javax.jms.QueueConnectionFactory;
032: import javax.jms.TopicConnection;
033: import javax.jms.TopicConnectionFactory;
034:
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037:
038: import org.springframework.beans.factory.DisposableBean;
039: import org.springframework.beans.factory.InitializingBean;
040: import org.springframework.util.Assert;
041:
042: /**
043: * A JMS ConnectionFactory adapter that returns the same Connection on all
044: * <code>createConnection</code> calls, and ignores calls to
045: * <code>Connection.close()</code>. According to the JMS Connection model,
046: * this is even thread-safe.
047: *
048: * <p>Useful for testing and standalone environments, to keep using the same
049: * Connection for multiple JmsTemplate calls, without having a pooling
050: * ConnectionFactory, also spanning any number of transactions.
051: *
052: * <p>You can either pass in a JMS Connection directly, or let this
053: * factory lazily create a Connection via a given target ConnectionFactory.
054: * In the latter case, this factory just works with JMS 1.1; use
055: * {@link SingleConnectionFactory102} for JMS 1.0.2.
056: *
057: * @author Mark Pollack
058: * @author Juergen Hoeller
059: * @since 1.1
060: * @see #createConnection()
061: * @see javax.jms.Connection#close()
062: * @see SingleConnectionFactory102
063: * @see org.springframework.jms.core.JmsTemplate
064: */
065: public class SingleConnectionFactory implements ConnectionFactory,
066: QueueConnectionFactory, TopicConnectionFactory,
067: ExceptionListener, InitializingBean, DisposableBean {
068:
069: protected final Log logger = LogFactory.getLog(getClass());
070:
071: private ConnectionFactory targetConnectionFactory;
072:
073: private String clientId;
074:
075: private ExceptionListener exceptionListener;
076:
077: private boolean reconnectOnException = false;
078:
079: /** Wrapped Connection */
080: private Connection target;
081:
082: /** Proxy Connection */
083: private Connection connection;
084:
085: /** Synchronization monitor for the shared Connection */
086: private final Object connectionMonitor = new Object();
087:
088: /**
089: * Create a new SingleConnectionFactory for bean-style usage.
090: * @see #setTargetConnectionFactory
091: */
092: public SingleConnectionFactory() {
093: }
094:
095: /**
096: * Create a new SingleConnectionFactory that always returns the
097: * given Connection. Works with both JMS 1.1 and 1.0.2.
098: * @param target the single Connection
099: */
100: public SingleConnectionFactory(Connection target) {
101: Assert.notNull(target, "Target Connection must not be null");
102: this .target = target;
103: this .connection = getSharedConnectionProxy(target);
104: }
105:
106: /**
107: * Create a new SingleConnectionFactory that always returns a single
108: * Connection that it will lazily create via the given target
109: * ConnectionFactory.
110: * @param targetConnectionFactory the target ConnectionFactory
111: */
112: public SingleConnectionFactory(
113: ConnectionFactory targetConnectionFactory) {
114: Assert.notNull(targetConnectionFactory,
115: "Target ConnectionFactory must not be null");
116: this .targetConnectionFactory = targetConnectionFactory;
117: }
118:
119: /**
120: * Set the target ConnectionFactory which will be used to lazily
121: * create a single Connection.
122: */
123: public void setTargetConnectionFactory(
124: ConnectionFactory targetConnectionFactory) {
125: this .targetConnectionFactory = targetConnectionFactory;
126: }
127:
128: /**
129: * Return the target ConnectionFactory which will be used to lazily
130: * create a single Connection, if any.
131: */
132: public ConnectionFactory getTargetConnectionFactory() {
133: return this .targetConnectionFactory;
134: }
135:
136: /**
137: * Specify a JMS client ID for the single Connection created and exposed
138: * by this ConnectionFactory.
139: * <p>Note that client IDs need to be unique among all active Connections
140: * of the underlying JMS provider. Furthermore, a client ID can only be
141: * assigned if the original ConnectionFactory hasn't already assigned one.
142: * @see javax.jms.Connection#setClientID
143: * @see #setTargetConnectionFactory
144: */
145: public void setClientId(String clientId) {
146: this .clientId = clientId;
147: }
148:
149: /**
150: * Return a JMS client ID for the single Connection created and exposed
151: * by this ConnectionFactory, if any.
152: */
153: protected String getClientId() {
154: return this .clientId;
155: }
156:
157: /**
158: * Specify an JMS ExceptionListener implementation that should be
159: * registered with with the single Connection created by this factory.
160: * @see #setReconnectOnException
161: */
162: public void setExceptionListener(ExceptionListener exceptionListener) {
163: this .exceptionListener = exceptionListener;
164: }
165:
166: /**
167: * Return the JMS ExceptionListener implementation that should be registered
168: * with with the single Connection created by this factory, if any.
169: */
170: protected ExceptionListener getExceptionListener() {
171: return this .exceptionListener;
172: }
173:
174: /**
175: * Specify whether the single Connection should be reset (to be subsequently renewed)
176: * when a JMSException is reported by the underlying Connection.
177: * <p>Default is "false". Switch this to "true" to automatically trigger
178: * recovery based on your JMS provider's exception notifications.
179: * <p>Internally, this will lead to a special JMS ExceptionListener
180: * (this SingleConnectionFactory itself) being registered with the
181: * underlying Connection. This can also be combined with a
182: * user-specified ExceptionListener, if desired.
183: * @see #setExceptionListener
184: */
185: public void setReconnectOnException(boolean reconnectOnException) {
186: this .reconnectOnException = reconnectOnException;
187: }
188:
189: /**
190: * Return whether the single Connection should be renewed when
191: * a JMSException is reported by the underlying Connection.
192: */
193: protected boolean isReconnectOnException() {
194: return this .reconnectOnException;
195: }
196:
197: /**
198: * Make sure a Connection or ConnectionFactory has been set.
199: */
200: public void afterPropertiesSet() {
201: if (this .connection == null
202: && getTargetConnectionFactory() == null) {
203: throw new IllegalArgumentException(
204: "Connection or 'targetConnectionFactory' is required");
205: }
206: }
207:
208: public Connection createConnection() throws JMSException {
209: synchronized (this .connectionMonitor) {
210: if (this .connection == null) {
211: initConnection();
212: }
213: return this .connection;
214: }
215: }
216:
217: public Connection createConnection(String username, String password)
218: throws JMSException {
219: throw new javax.jms.IllegalStateException(
220: "SingleConnectionFactory does not support custom username and password");
221: }
222:
223: public QueueConnection createQueueConnection() throws JMSException {
224: Connection con = createConnection();
225: if (!(con instanceof QueueConnection)) {
226: throw new javax.jms.IllegalStateException(
227: "This SingleConnectionFactory does not hold a QueueConnection but rather: "
228: + con);
229: }
230: return ((QueueConnection) con);
231: }
232:
233: public QueueConnection createQueueConnection(String username,
234: String password) throws JMSException {
235: throw new javax.jms.IllegalStateException(
236: "SingleConnectionFactory does not support custom username and password");
237: }
238:
239: public TopicConnection createTopicConnection() throws JMSException {
240: Connection con = createConnection();
241: if (!(con instanceof TopicConnection)) {
242: throw new javax.jms.IllegalStateException(
243: "This SingleConnectionFactory does not hold a TopicConnection but rather: "
244: + con);
245: }
246: return ((TopicConnection) con);
247: }
248:
249: public TopicConnection createTopicConnection(String username,
250: String password) throws JMSException {
251: throw new javax.jms.IllegalStateException(
252: "SingleConnectionFactory does not support custom username and password");
253: }
254:
255: /**
256: * Exception listener callback that renews the underlying single Connection.
257: */
258: public void onException(JMSException ex) {
259: resetConnection();
260: }
261:
262: /**
263: * Close the underlying shared connection.
264: * The provider of this ConnectionFactory needs to care for proper shutdown.
265: * <p>As this bean implements DisposableBean, a bean factory will
266: * automatically invoke this on destruction of its cached singletons.
267: */
268: public void destroy() {
269: resetConnection();
270: }
271:
272: /**
273: * Initialize the underlying shared Connection.
274: * <p>Closes and reinitializes the Connection if an underlying
275: * Connection is present already.
276: * @throws javax.jms.JMSException if thrown by JMS API methods
277: */
278: public void initConnection() throws JMSException {
279: if (getTargetConnectionFactory() == null) {
280: throw new IllegalStateException(
281: "'targetConnectionFactory' is required for lazily initializing a Connection");
282: }
283: synchronized (this .connectionMonitor) {
284: if (this .target != null) {
285: closeConnection(this .target);
286: }
287: this .target = doCreateConnection();
288: prepareConnection(this .target);
289: if (logger.isInfoEnabled()) {
290: logger.info("Established shared JMS Connection: "
291: + this .target);
292: }
293: this .connection = getSharedConnectionProxy(this .target);
294: }
295: }
296:
297: /**
298: * Reset the underlying shared Connection, to be reinitialized on next access.
299: */
300: public void resetConnection() {
301: synchronized (this .connectionMonitor) {
302: if (this .target != null) {
303: closeConnection(this .target);
304: }
305: this .target = null;
306: this .connection = null;
307: }
308: }
309:
310: /**
311: * Create a JMS Connection via this template's ConnectionFactory.
312: * <p>This implementation uses JMS 1.1 API.
313: * @return the new JMS Connection
314: * @throws javax.jms.JMSException if thrown by JMS API methods
315: */
316: protected Connection doCreateConnection() throws JMSException {
317: return getTargetConnectionFactory().createConnection();
318: }
319:
320: /**
321: * Prepare the given Connection before it is exposed.
322: * <p>The default implementation applies ExceptionListener and client id.
323: * Can be overridden in subclasses.
324: * @param con the Connection to prepare
325: * @throws JMSException if thrown by JMS API methods
326: * @see #setExceptionListener
327: * @see #setReconnectOnException
328: */
329: protected void prepareConnection(Connection con)
330: throws JMSException {
331: if (getExceptionListener() != null || isReconnectOnException()) {
332: ExceptionListener listenerToUse = getExceptionListener();
333: if (isReconnectOnException()) {
334: listenerToUse = new InternalChainedExceptionListener(
335: this , listenerToUse);
336: }
337: con.setExceptionListener(listenerToUse);
338: }
339: if (getClientId() != null) {
340: con.setClientID(getClientId());
341: }
342: }
343:
344: /**
345: * Close the given Connection.
346: * @param con the Connection to close
347: */
348: protected void closeConnection(Connection con) {
349: try {
350: try {
351: con.stop();
352: } finally {
353: con.close();
354: }
355: } catch (Throwable ex) {
356: logger.warn("Could not close shared JMS Connection", ex);
357: }
358: }
359:
360: /**
361: * Wrap the given Connection with a proxy that delegates every method call to it
362: * but suppresses close calls. This is useful for allowing application code to
363: * handle a special framework Connection just like an ordinary Connection from a
364: * JMS ConnectionFactory.
365: * @param target the original Connection to wrap
366: * @return the wrapped Connection
367: */
368: protected Connection getSharedConnectionProxy(Connection target) {
369: List classes = new ArrayList(3);
370: classes.add(Connection.class);
371: if (target instanceof QueueConnection) {
372: classes.add(QueueConnection.class);
373: }
374: if (target instanceof TopicConnection) {
375: classes.add(TopicConnection.class);
376: }
377: return (Connection) Proxy.newProxyInstance(getClass()
378: .getClassLoader(), (Class[]) classes
379: .toArray(new Class[classes.size()]),
380: new SharedConnectionInvocationHandler(target));
381: }
382:
383: /**
384: * Invocation handler that suppresses close calls on JMS Connections.
385: */
386: private static class SharedConnectionInvocationHandler implements
387: InvocationHandler {
388:
389: private final Connection target;
390:
391: private SharedConnectionInvocationHandler(Connection target) {
392: this .target = target;
393: }
394:
395: public Object invoke(Object proxy, Method method, Object[] args)
396: throws Throwable {
397: if (method.getName().equals("equals")) {
398: // Only consider equal when proxies are identical.
399: return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);
400: } else if (method.getName().equals("hashCode")) {
401: // Use hashCode of Connection proxy.
402: return new Integer(hashCode());
403: } else if (method.getName().equals("setExceptionListener")) {
404: // Handle setExceptionListener method: throw exception.
405: throw new javax.jms.IllegalStateException(
406: "setExceptionListener call not supported on proxy for shared Connection. "
407: + "Set the 'exceptionListener' property on the SingleConnectionFactory instead.");
408: } else if (method.getName().equals("setClientID")) {
409: // Handle setExceptionListener method: throw exception.
410: throw new javax.jms.IllegalStateException(
411: "setClientID call not supported on proxy for shared Connection. "
412: + "Set the 'clientId' property on the SingleConnectionFactory instead.");
413: } else if (method.getName().equals("stop")) {
414: // Handle stop method: don't pass the call on.
415: return null;
416: } else if (method.getName().equals("close")) {
417: // Handle close method: don't pass the call on.
418: return null;
419: }
420: try {
421: Object retVal = method.invoke(this .target, args);
422: if (method.getName().equals("getExceptionListener")
423: && retVal instanceof InternalChainedExceptionListener) {
424: // Handle getExceptionListener method: hide internal chain.
425: InternalChainedExceptionListener listener = (InternalChainedExceptionListener) retVal;
426: return listener.getUserListener();
427: } else {
428: return retVal;
429: }
430: } catch (InvocationTargetException ex) {
431: throw ex.getTargetException();
432: }
433: }
434: }
435:
436: /**
437: * Internal chained ExceptionListener for handling the internal recovery listener
438: * in combination with a user-specified listener.
439: */
440: private static class InternalChainedExceptionListener extends
441: ChainedExceptionListener {
442:
443: public InternalChainedExceptionListener(
444: ExceptionListener internalListener,
445: ExceptionListener userListener) {
446: addDelegate(internalListener);
447: if (userListener != null) {
448: addDelegate(userListener);
449: }
450: }
451:
452: public ExceptionListener getUserListener() {
453: ExceptionListener[] delegates = getDelegates();
454: return (delegates.length > 1 ? delegates[1] : null);
455: }
456: }
457:
458: }
|