001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.connection;
018:
019: import javax.jms.Connection;
020: import javax.jms.ConnectionFactory;
021: import javax.jms.JMSException;
022: import javax.jms.QueueConnection;
023: import javax.jms.QueueConnectionFactory;
024: import javax.jms.QueueSession;
025: import javax.jms.Session;
026: import javax.jms.TopicConnection;
027: import javax.jms.TopicConnectionFactory;
028: import javax.jms.TopicSession;
029:
030: import org.apache.commons.logging.Log;
031: import org.apache.commons.logging.LogFactory;
032:
033: import org.springframework.transaction.support.TransactionSynchronizationAdapter;
034: import org.springframework.transaction.support.TransactionSynchronizationManager;
035: import org.springframework.util.Assert;
036:
037: /**
038: * Helper class for managing a JMS {@link javax.jms.ConnectionFactory}, in particular
039: * for obtaining transactional JMS resources for a given ConnectionFactory.
040: *
041: * <p>Mainly for internal use within the framework. Used by
042: * {@link org.springframework.jms.core.JmsTemplate} as well as
043: * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}.
044: *
045: * @author Juergen Hoeller
046: * @since 2.0
047: * @see SmartConnectionFactory
048: */
049: public abstract class ConnectionFactoryUtils {
050:
051: private static final Log logger = LogFactory
052: .getLog(ConnectionFactoryUtils.class);
053:
054: /**
055: * Release the given Connection, stopping it (if necessary) and eventually closing it.
056: * <p>Checks {@link SmartConnectionFactory#shouldStop}, if available.
057: * This is essentially a more sophisticated version of
058: * {@link org.springframework.jms.support.JmsUtils#closeConnection}.
059: * @param con the Connection to release
060: * (if this is <code>null</code>, the call will be ignored)
061: * @param cf the ConnectionFactory that the Connection was obtained from
062: * (may be <code>null</code>)
063: * @param started whether the Connection might have been started by the application
064: * @see SmartConnectionFactory#shouldStop
065: * @see org.springframework.jms.support.JmsUtils#closeConnection
066: */
067: public static void releaseConnection(Connection con,
068: ConnectionFactory cf, boolean started) {
069: if (con == null) {
070: return;
071: }
072: if (started && cf instanceof SmartConnectionFactory
073: && ((SmartConnectionFactory) cf).shouldStop(con)) {
074: try {
075: con.stop();
076: } catch (Throwable ex) {
077: logger
078: .debug(
079: "Could not stop JMS Connection before closing it",
080: ex);
081: }
082: }
083: try {
084: con.close();
085: } catch (Throwable ex) {
086: logger.debug("Could not close JMS Connection", ex);
087: }
088: }
089:
090: /**
091: * Determine whether the given JMS Session is transactional, that is,
092: * bound to the current thread by Spring's transaction facilities.
093: * @param session the JMS Session to check
094: * @param cf the JMS ConnectionFactory that the Session originated from
095: * @return whether the Session is transactional
096: */
097: public static boolean isSessionTransactional(Session session,
098: ConnectionFactory cf) {
099: if (session == null || cf == null) {
100: return false;
101: }
102: JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
103: .getResource(cf);
104: return (resourceHolder != null && resourceHolder
105: .containsSession(session));
106: }
107:
108: /**
109: * Obtain a JMS Session that is synchronized with the current transaction, if any.
110: * @param cf the ConnectionFactory to obtain a Session for
111: * @param existingCon the existing JMS Connection to obtain a Session for
112: * (may be <code>null</code>)
113: * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
114: * that is synchronized with a Spring-managed transaction (where the main transaction
115: * might be a JDBC-based one for a specific DataSource, for example), with the JMS
116: * transaction committing right after the main transaction. If not allowed, the given
117: * ConnectionFactory needs to handle transaction enlistment underneath the covers.
118: * @return the transactional Session, or <code>null</code> if none found
119: * @throws JMSException in case of JMS failure
120: */
121: public static Session getTransactionalSession(
122: final ConnectionFactory cf, final Connection existingCon,
123: final boolean synchedLocalTransactionAllowed)
124: throws JMSException {
125:
126: return doGetTransactionalSession(cf, new ResourceFactory() {
127: public Session getSession(JmsResourceHolder holder) {
128: return holder.getSession(Session.class, existingCon);
129: }
130:
131: public Connection getConnection(JmsResourceHolder holder) {
132: return (existingCon != null ? existingCon : holder
133: .getConnection());
134: }
135:
136: public Connection createConnection() throws JMSException {
137: return cf.createConnection();
138: }
139:
140: public Session createSession(Connection con)
141: throws JMSException {
142: return con.createSession(
143: synchedLocalTransactionAllowed,
144: Session.AUTO_ACKNOWLEDGE);
145: }
146:
147: public boolean isSynchedLocalTransactionAllowed() {
148: return synchedLocalTransactionAllowed;
149: }
150: });
151: }
152:
153: /**
154: * Obtain a JMS QueueSession that is synchronized with the current transaction, if any.
155: * <p>Mainly intended for use with the JMS 1.0.2 API.
156: * @param cf the ConnectionFactory to obtain a Session for
157: * @param existingCon the existing JMS Connection to obtain a Session for
158: * (may be <code>null</code>)
159: * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
160: * that is synchronized with a Spring-managed transaction (where the main transaction
161: * might be a JDBC-based one for a specific DataSource, for example), with the JMS
162: * transaction committing right after the main transaction. If not allowed, the given
163: * ConnectionFactory needs to handle transaction enlistment underneath the covers.
164: * @return the transactional Session, or <code>null</code> if none found
165: * @throws JMSException in case of JMS failure
166: */
167: public static QueueSession getTransactionalQueueSession(
168: final QueueConnectionFactory cf,
169: final QueueConnection existingCon,
170: final boolean synchedLocalTransactionAllowed)
171: throws JMSException {
172:
173: return (QueueSession) doGetTransactionalSession(cf,
174: new ResourceFactory() {
175: public Session getSession(JmsResourceHolder holder) {
176: return holder.getSession(QueueSession.class,
177: existingCon);
178: }
179:
180: public Connection getConnection(
181: JmsResourceHolder holder) {
182: return (existingCon != null ? existingCon
183: : holder
184: .getConnection(QueueConnection.class));
185: }
186:
187: public Connection createConnection()
188: throws JMSException {
189: return cf.createQueueConnection();
190: }
191:
192: public Session createSession(Connection con)
193: throws JMSException {
194: return ((QueueConnection) con)
195: .createQueueSession(
196: synchedLocalTransactionAllowed,
197: Session.AUTO_ACKNOWLEDGE);
198: }
199:
200: public boolean isSynchedLocalTransactionAllowed() {
201: return synchedLocalTransactionAllowed;
202: }
203: });
204: }
205:
206: /**
207: * Obtain a JMS TopicSession that is synchronized with the current transaction, if any.
208: * <p>Mainly intended for use with the JMS 1.0.2 API.
209: * @param cf the ConnectionFactory to obtain a Session for
210: * @param existingCon the existing JMS Connection to obtain a Session for
211: * (may be <code>null</code>)
212: * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
213: * that is synchronized with a Spring-managed transaction (where the main transaction
214: * might be a JDBC-based one for a specific DataSource, for example), with the JMS
215: * transaction committing right after the main transaction. If not allowed, the given
216: * ConnectionFactory needs to handle transaction enlistment underneath the covers.
217: * @return the transactional Session, or <code>null</code> if none found
218: * @throws JMSException in case of JMS failure
219: */
220: public static TopicSession getTransactionalTopicSession(
221: final TopicConnectionFactory cf,
222: final TopicConnection existingCon,
223: final boolean synchedLocalTransactionAllowed)
224: throws JMSException {
225:
226: return (TopicSession) doGetTransactionalSession(cf,
227: new ResourceFactory() {
228: public Session getSession(JmsResourceHolder holder) {
229: return holder.getSession(TopicSession.class,
230: existingCon);
231: }
232:
233: public Connection getConnection(
234: JmsResourceHolder holder) {
235: return (existingCon != null ? existingCon
236: : holder
237: .getConnection(TopicConnection.class));
238: }
239:
240: public Connection createConnection()
241: throws JMSException {
242: return cf.createTopicConnection();
243: }
244:
245: public Session createSession(Connection con)
246: throws JMSException {
247: return ((TopicConnection) con)
248: .createTopicSession(
249: synchedLocalTransactionAllowed,
250: Session.AUTO_ACKNOWLEDGE);
251: }
252:
253: public boolean isSynchedLocalTransactionAllowed() {
254: return synchedLocalTransactionAllowed;
255: }
256: });
257: }
258:
259: /**
260: * Obtain a JMS Session that is synchronized with the current transaction, if any.
261: * @param connectionFactory the JMS ConnectionFactory to bind for
262: * (used as TransactionSynchronizationManager key)
263: * @param resourceFactory the ResourceFactory to use for extracting or creating
264: * JMS resources
265: * @return the transactional Session, or <code>null</code> if none found
266: * @throws JMSException in case of JMS failure
267: */
268: public static Session doGetTransactionalSession(
269: ConnectionFactory connectionFactory,
270: ResourceFactory resourceFactory) throws JMSException {
271:
272: Assert.notNull(connectionFactory,
273: "ConnectionFactory must not be null");
274: Assert.notNull(resourceFactory,
275: "ResourceFactory must not be null");
276:
277: JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
278: .getResource(connectionFactory);
279: if (resourceHolder != null) {
280: Session session = resourceFactory
281: .getSession(resourceHolder);
282: if (session != null || resourceHolder.isFrozen()) {
283: return session;
284: }
285: }
286: if (!TransactionSynchronizationManager
287: .isSynchronizationActive()) {
288: return null;
289: }
290: JmsResourceHolder resourceHolderToUse = resourceHolder;
291: if (resourceHolderToUse == null) {
292: resourceHolderToUse = new JmsResourceHolder(
293: connectionFactory);
294: }
295: Connection con = resourceFactory
296: .getConnection(resourceHolderToUse);
297: Session session = null;
298: try {
299: boolean isExistingCon = (con != null);
300: if (!isExistingCon) {
301: con = resourceFactory.createConnection();
302: resourceHolderToUse.addConnection(con);
303: }
304: session = resourceFactory.createSession(con);
305: resourceHolderToUse.addSession(session, con);
306: if (!isExistingCon) {
307: con.start();
308: }
309: } catch (JMSException ex) {
310: if (session != null) {
311: try {
312: session.close();
313: } catch (Throwable ex2) {
314: // ignore
315: }
316: }
317: if (con != null) {
318: try {
319: con.close();
320: } catch (Throwable ex2) {
321: // ignore
322: }
323: }
324: throw ex;
325: }
326: if (resourceHolderToUse != resourceHolder) {
327: TransactionSynchronizationManager
328: .registerSynchronization(new JmsResourceSynchronization(
329: connectionFactory, resourceHolderToUse,
330: resourceFactory
331: .isSynchedLocalTransactionAllowed()));
332: resourceHolderToUse.setSynchronizedWithTransaction(true);
333: TransactionSynchronizationManager.bindResource(
334: connectionFactory, resourceHolderToUse);
335: }
336: return session;
337: }
338:
339: /**
340: * Callback interface for resource creation.
341: * Serving as argument for the <code>doGetTransactionalSession</code> method.
342: */
343: public interface ResourceFactory {
344:
345: /**
346: * Fetch an appropriate Session from the given JmsResourceHolder.
347: * @param holder the JmsResourceHolder
348: * @return an appropriate Session fetched from the holder,
349: * or <code>null</code> if none found
350: */
351: Session getSession(JmsResourceHolder holder);
352:
353: /**
354: * Fetch an appropriate Connection from the given JmsResourceHolder.
355: * @param holder the JmsResourceHolder
356: * @return an appropriate Connection fetched from the holder,
357: * or <code>null</code> if none found
358: */
359: Connection getConnection(JmsResourceHolder holder);
360:
361: /**
362: * Create a new JMS Connection for registration with a JmsResourceHolder.
363: * @return the new JMS Connection
364: * @throws JMSException if thrown by JMS API methods
365: */
366: Connection createConnection() throws JMSException;
367:
368: /**
369: * Create a new JMS Session for registration with a JmsResourceHolder.
370: * @param con the JMS Connection to create a Session for
371: * @return the new JMS Session
372: * @throws JMSException if thrown by JMS API methods
373: */
374: Session createSession(Connection con) throws JMSException;
375:
376: /**
377: * Return whether to allow for a local JMS transaction that is synchronized with
378: * a Spring-managed transaction (where the main transaction might be a JDBC-based
379: * one for a specific DataSource, for example), with the JMS transaction
380: * committing right after the main transaction.
381: * @return whether to allow for synchronizing a local JMS transaction
382: */
383: boolean isSynchedLocalTransactionAllowed();
384: }
385:
386: /**
387: * Callback for resource cleanup at the end of a non-native JMS transaction
388: * (e.g. when participating in a JtaTransactionManager transaction).
389: * @see org.springframework.transaction.jta.JtaTransactionManager
390: */
391: private static class JmsResourceSynchronization extends
392: TransactionSynchronizationAdapter {
393:
394: private final Object resourceKey;
395:
396: private final JmsResourceHolder resourceHolder;
397:
398: private final boolean transacted;
399:
400: private boolean holderActive = true;
401:
402: public JmsResourceSynchronization(Object resourceKey,
403: JmsResourceHolder resourceHolder, boolean transacted) {
404: this .resourceKey = resourceKey;
405: this .resourceHolder = resourceHolder;
406: this .transacted = transacted;
407: }
408:
409: public void suspend() {
410: if (this .holderActive) {
411: TransactionSynchronizationManager
412: .unbindResource(this .resourceKey);
413: }
414: }
415:
416: public void resume() {
417: if (this .holderActive) {
418: TransactionSynchronizationManager.bindResource(
419: this .resourceKey, this .resourceHolder);
420: }
421: }
422:
423: public void beforeCompletion() {
424: TransactionSynchronizationManager
425: .unbindResource(this .resourceKey);
426: this .holderActive = false;
427: if (!this .transacted) {
428: this .resourceHolder.closeAll();
429: }
430: }
431:
432: public void afterCommit() {
433: if (this .transacted) {
434: try {
435: this .resourceHolder.commitAll();
436: } catch (JMSException ex) {
437: throw new SynchedLocalTransactionFailedException(
438: "Local JMS transaction failed to commit",
439: ex);
440: }
441: }
442: }
443:
444: public void afterCompletion(int status) {
445: if (this.transacted) {
446: this.resourceHolder.closeAll();
447: }
448: }
449: }
450:
451: }
|