001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.connection;
018:
019: import java.lang.reflect.InvocationHandler;
020: import java.lang.reflect.InvocationTargetException;
021: import java.lang.reflect.Method;
022: import java.lang.reflect.Proxy;
023: import java.util.ArrayList;
024: import java.util.List;
025:
026: import javax.jms.Connection;
027: import javax.jms.ConnectionFactory;
028: import javax.jms.JMSException;
029: import javax.jms.QueueConnection;
030: import javax.jms.QueueConnectionFactory;
031: import javax.jms.QueueSession;
032: import javax.jms.Session;
033: import javax.jms.TopicConnection;
034: import javax.jms.TopicConnectionFactory;
035: import javax.jms.TopicSession;
036: import javax.jms.TransactionInProgressException;
037:
038: import org.springframework.util.Assert;
039:
040: /**
041: * Proxy for a target JMS {@link javax.jms.ConnectionFactory}, adding awareness of
042: * Spring-managed transactions. Similar to a transactional JNDI ConnectionFactory
043: * as provided by a J2EE server.
044: *
045: * <p>Messaging code that should remain unaware of Spring's JMS support can work
046: * with this proxy to seamlessly participate in Spring-managed transactions.
047: * Note that the transaction manager, for example {@link JmsTransactionManager},
048: * still needs to work with the underlying ConnectionFactory, <i>not</i> with
049: * this proxy.
050: *
051: * <p><b>Make sure that TransactionAwareConnectionFactoryProxy is the outermost
052: * ConnectionFactory of a chain of ConnectionFactory proxies/adapters.</b>
053: * TransactionAwareConnectionFactoryProxy can delegate either directly to the
054: * target factory or to some intermediary adapter like
055: * {@link UserCredentialsConnectionFactoryAdapter}.
056: *
057: * <p>Delegates to {@link ConnectionFactoryUtils} for automatically participating
058: * in thread-bound transactions, for example managed by {@link JmsTransactionManager}.
059: * <code>createSession</code> calls and <code>close</code> calls on returned Sessions
060: * will behave properly within a transaction, that is, always work on the transactional
061: * Session. If not within a transaction, normal ConnectionFactory behavior applies.
062: *
063: * <p>Note that transactional JMS Sessions will be registered on a per-Connection
064: * basis. To share the same JMS Session across a transaction, make sure that you
065: * operate on the same JMS Connection handle - either through reusing the handle
066: * or through configuring a {@link SingleConnectionFactory} underneath.
067: *
068: * <p>Returned transactional Session proxies will implement the {@link SessionProxy}
069: * interface to allow for access to the underlying target Session. This is only
070: * intended for accessing vendor-specific Session API or for testing purposes
071: * (e.g. to perform manual transaction control). For typical application purposes,
072: * simply use the standard JMS Session interface.
073: *
074: * @author Juergen Hoeller
075: * @since 2.0
076: * @see UserCredentialsConnectionFactoryAdapter
077: * @see SingleConnectionFactory
078: */
079: public class TransactionAwareConnectionFactoryProxy implements
080: ConnectionFactory, QueueConnectionFactory,
081: TopicConnectionFactory {
082:
083: private boolean synchedLocalTransactionAllowed = false;
084:
085: private ConnectionFactory targetConnectionFactory;
086:
087: /**
088: * Create a new TransactionAwareConnectionFactoryProxy.
089: */
090: public TransactionAwareConnectionFactoryProxy() {
091: }
092:
093: /**
094: * Create a new TransactionAwareConnectionFactoryProxy.
095: * @param targetConnectionFactory the target ConnectionFactory
096: */
097: public TransactionAwareConnectionFactoryProxy(
098: ConnectionFactory targetConnectionFactory) {
099: setTargetConnectionFactory(targetConnectionFactory);
100: }
101:
102: /**
103: * Set the target ConnectionFactory that this ConnectionFactory should delegate to.
104: */
105: public final void setTargetConnectionFactory(
106: ConnectionFactory targetConnectionFactory) {
107: Assert.notNull(targetConnectionFactory,
108: "targetConnectionFactory must not be nul");
109: this .targetConnectionFactory = targetConnectionFactory;
110: }
111:
112: /**
113: * Return the target ConnectionFactory that this ConnectionFactory should delegate to.
114: */
115: protected ConnectionFactory getTargetConnectionFactory() {
116: return this .targetConnectionFactory;
117: }
118:
119: /**
120: * Set whether to allow for a local JMS transaction that is synchronized with a
121: * Spring-managed transaction (where the main transaction might be a JDBC-based
122: * one for a specific DataSource, for example), with the JMS transaction committing
123: * right after the main transaction. If not allowed, the given ConnectionFactory
124: * needs to handle transaction enlistment underneath the covers.
125: * <p>Default is "false": If not within a managed transaction that encompasses
126: * the underlying JMS ConnectionFactory, standard Sessions will be returned.
127: * Turn this flag on to allow participation in any Spring-managed transaction,
128: * with a local JMS transaction synchronized with the main transaction.
129: */
130: public void setSynchedLocalTransactionAllowed(
131: boolean synchedLocalTransactionAllowed) {
132: this .synchedLocalTransactionAllowed = synchedLocalTransactionAllowed;
133: }
134:
135: /**
136: * Return whether to allow for a local JMS transaction that is synchronized
137: * with a Spring-managed transaction.
138: */
139: protected boolean isSynchedLocalTransactionAllowed() {
140: return this .synchedLocalTransactionAllowed;
141: }
142:
143: public Connection createConnection() throws JMSException {
144: Connection targetConnection = this .targetConnectionFactory
145: .createConnection();
146: return getTransactionAwareConnectionProxy(targetConnection);
147: }
148:
149: public Connection createConnection(String username, String password)
150: throws JMSException {
151: Connection targetConnection = this .targetConnectionFactory
152: .createConnection(username, password);
153: return getTransactionAwareConnectionProxy(targetConnection);
154: }
155:
156: public QueueConnection createQueueConnection() throws JMSException {
157: if (!(this .targetConnectionFactory instanceof QueueConnectionFactory)) {
158: throw new javax.jms.IllegalStateException(
159: "'targetConnectionFactory' is no QueueConnectionFactory");
160: }
161: QueueConnection targetConnection = ((QueueConnectionFactory) this .targetConnectionFactory)
162: .createQueueConnection();
163: return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection);
164: }
165:
166: public QueueConnection createQueueConnection(String username,
167: String password) throws JMSException {
168: if (!(this .targetConnectionFactory instanceof QueueConnectionFactory)) {
169: throw new javax.jms.IllegalStateException(
170: "'targetConnectionFactory' is no QueueConnectionFactory");
171: }
172: QueueConnection targetConnection = ((QueueConnectionFactory) this .targetConnectionFactory)
173: .createQueueConnection(username, password);
174: return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection);
175: }
176:
177: public TopicConnection createTopicConnection() throws JMSException {
178: if (!(this .targetConnectionFactory instanceof TopicConnectionFactory)) {
179: throw new javax.jms.IllegalStateException(
180: "'targetConnectionFactory' is no TopicConnectionFactory");
181: }
182: TopicConnection targetConnection = ((TopicConnectionFactory) this .targetConnectionFactory)
183: .createTopicConnection();
184: return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
185: }
186:
187: public TopicConnection createTopicConnection(String username,
188: String password) throws JMSException {
189: if (!(this .targetConnectionFactory instanceof TopicConnectionFactory)) {
190: throw new javax.jms.IllegalStateException(
191: "'targetConnectionFactory' is no TopicConnectionFactory");
192: }
193: TopicConnection targetConnection = ((TopicConnectionFactory) this .targetConnectionFactory)
194: .createTopicConnection(username, password);
195: return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
196: }
197:
198: /**
199: * Wrap the given Connection with a proxy that delegates every method call to it
200: * but handles Session lookup in a transaction-aware fashion.
201: * @param target the original Connection to wrap
202: * @return the wrapped Connection
203: */
204: private Connection getTransactionAwareConnectionProxy(
205: Connection target) {
206: List classes = new ArrayList(3);
207: classes.add(Connection.class);
208: if (target instanceof QueueConnection) {
209: classes.add(QueueConnection.class);
210: }
211: if (target instanceof TopicConnection) {
212: classes.add(TopicConnection.class);
213: }
214: return (Connection) Proxy
215: .newProxyInstance(
216: getClass().getClassLoader(),
217: (Class[]) classes.toArray(new Class[classes
218: .size()]),
219: new TransactionAwareConnectionInvocationHandler(
220: target));
221: }
222:
223: /**
224: * Invocation handler that exposes transactional Sessions for the underlying Connection.
225: */
226: private class TransactionAwareConnectionInvocationHandler implements
227: InvocationHandler {
228:
229: private final Connection target;
230:
231: public TransactionAwareConnectionInvocationHandler(
232: Connection target) {
233: this .target = target;
234: }
235:
236: public Object invoke(Object proxy, Method method, Object[] args)
237: throws Throwable {
238: // Invocation on ConnectionProxy interface coming in...
239:
240: if (Session.class.equals(method.getReturnType())) {
241: Session session = ConnectionFactoryUtils
242: .getTransactionalSession(
243: getTargetConnectionFactory(),
244: this .target,
245: isSynchedLocalTransactionAllowed());
246: if (session != null) {
247: return getCloseSuppressingSessionProxy(session);
248: }
249: } else if (QueueSession.class
250: .equals(method.getReturnType())) {
251: QueueSession session = ConnectionFactoryUtils
252: .getTransactionalQueueSession(
253: (QueueConnectionFactory) getTargetConnectionFactory(),
254: (QueueConnection) this .target,
255: isSynchedLocalTransactionAllowed());
256: if (session != null) {
257: return getCloseSuppressingSessionProxy(session);
258: }
259: } else if (TopicSession.class
260: .equals(method.getReturnType())) {
261: TopicSession session = ConnectionFactoryUtils
262: .getTransactionalTopicSession(
263: (TopicConnectionFactory) getTargetConnectionFactory(),
264: (TopicConnection) this .target,
265: isSynchedLocalTransactionAllowed());
266: if (session != null) {
267: return getCloseSuppressingSessionProxy(session);
268: }
269: } else if (method.getName().equals("equals")) {
270: // Only consider equal when proxies are identical.
271: return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);
272: } else if (method.getName().equals("hashCode")) {
273: // Use hashCode of Connection proxy.
274: return new Integer(hashCode());
275: }
276:
277: // Invoke method on target Connection.
278: try {
279: return method.invoke(this .target, args);
280: } catch (InvocationTargetException ex) {
281: throw ex.getTargetException();
282: }
283: }
284:
285: private Session getCloseSuppressingSessionProxy(Session target) {
286: List classes = new ArrayList(3);
287: classes.add(SessionProxy.class);
288: if (target instanceof QueueSession) {
289: classes.add(QueueSession.class);
290: }
291: if (target instanceof TopicSession) {
292: classes.add(TopicSession.class);
293: }
294: return (Session) Proxy
295: .newProxyInstance(
296: SessionProxy.class.getClassLoader(),
297: (Class[]) classes.toArray(new Class[classes
298: .size()]),
299: new CloseSuppressingSessionInvocationHandler(
300: target));
301: }
302: }
303:
304: /**
305: * Invocation handler that suppresses close calls for a transactional JMS Session.
306: */
307: private static class CloseSuppressingSessionInvocationHandler
308: implements InvocationHandler {
309:
310: private final Session target;
311:
312: public CloseSuppressingSessionInvocationHandler(Session target) {
313: this .target = target;
314: }
315:
316: public Object invoke(Object proxy, Method method, Object[] args)
317: throws Throwable {
318: // Invocation on SessionProxy interface coming in...
319:
320: if (method.getName().equals("getTargetSession")) {
321: // Handle getTargetSession method: return underlying Session.
322: return this .target;
323: } else if (method.getName().equals("equals")) {
324: // Only consider equal when proxies are identical.
325: return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);
326: } else if (method.getName().equals("hashCode")) {
327: // Use hashCode of Connection proxy.
328: return new Integer(hashCode());
329: } else if (method.getName().equals("commit")) {
330: throw new TransactionInProgressException(
331: "Commit call not allowed within a managed transaction");
332: } else if (method.getName().equals("rollback")) {
333: throw new TransactionInProgressException(
334: "Rollback call not allowed within a managed transaction");
335: } else if (method.getName().equals("close")) {
336: // Handle close method: not to be closed within a transaction.
337: return null;
338: }
339:
340: // Invoke method on target Session.
341: try {
342: return method.invoke(this .target, args);
343: } catch (InvocationTargetException ex) {
344: throw ex.getTargetException();
345: }
346: }
347: }
348:
349: }
|