001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.connection;
018:
019: import javax.jms.Connection;
020: import javax.jms.ConnectionFactory;
021: import javax.jms.JMSException;
022: import javax.jms.QueueConnection;
023: import javax.jms.QueueConnectionFactory;
024: import javax.jms.QueueSession;
025: import javax.jms.Session;
026: import javax.jms.TopicConnection;
027: import javax.jms.TopicConnectionFactory;
028: import javax.jms.TopicSession;
029:
030: import org.apache.commons.logging.Log;
031: import org.apache.commons.logging.LogFactory;
032:
033: import org.springframework.transaction.support.TransactionSynchronizationAdapter;
034: import org.springframework.transaction.support.TransactionSynchronizationManager;
035: import org.springframework.util.Assert;
036:
037: /**
038: * Helper class for managing a JMS {@link javax.jms.ConnectionFactory}, in particular
039: * for obtaining transactional JMS resources for a given ConnectionFactory.
040: *
041: * <p>Mainly for internal use within the framework. Used by
042: * {@link org.springframework.jms.core.JmsTemplate} as well as
043: * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}.
044: *
045: * @author Juergen Hoeller
046: * @since 2.0
047: * @see SmartConnectionFactory
048: */
049: public abstract class ConnectionFactoryUtils {
050:
051: private static final Log logger = LogFactory
052: .getLog(ConnectionFactoryUtils.class);
053:
054: /**
055: * Release the given Connection, stopping it (if necessary) and eventually closing it.
056: * <p>Checks {@link SmartConnectionFactory#shouldStop}, if available.
057: * This is essentially a more sophisticated version of
058: * {@link org.springframework.jms.support.JmsUtils#closeConnection}.
059: * @param con the Connection to release
060: * (if this is <code>null</code>, the call will be ignored)
061: * @param cf the ConnectionFactory that the Connection was obtained from
062: * (may be <code>null</code>)
063: * @param started whether the Connection might have been started by the application
064: * @see SmartConnectionFactory#shouldStop
065: * @see org.springframework.jms.support.JmsUtils#closeConnection
066: */
067: public static void releaseConnection(Connection con,
068: ConnectionFactory cf, boolean started) {
069: if (con == null) {
070: return;
071: }
072: if (started && cf instanceof SmartConnectionFactory
073: && ((SmartConnectionFactory) cf).shouldStop(con)) {
074: try {
075: con.stop();
076: } catch (Throwable ex) {
077: logger
078: .debug(
079: "Could not stop JMS Connection before closing it",
080: ex);
081: }
082: }
083: try {
084: con.close();
085: } catch (Throwable ex) {
086: logger.debug("Could not close JMS Connection", ex);
087: }
088: }
089:
090: /**
091: * Determine whether the given JMS Session is transactional, that is,
092: * bound to the current thread by Spring's transaction facilities.
093: * @param session the JMS Session to check
094: * @param cf the JMS ConnectionFactory that the Session originated from
095: * @return whether the Session is transactional
096: */
097: public static boolean isSessionTransactional(Session session,
098: ConnectionFactory cf) {
099: if (session == null || cf == null) {
100: return false;
101: }
102: JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
103: .getResource(cf);
104: return (resourceHolder != null && resourceHolder
105: .containsSession(session));
106: }
107:
108: /**
109: * Obtain a JMS Session that is synchronized with the current transaction, if any.
110: * @param cf the ConnectionFactory to obtain a Session for
111: * @param existingCon the existing JMS Connection to obtain a Session for
112: * (may be <code>null</code>)
113: * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
114: * that is synchronized with a Spring-managed transaction (where the main transaction
115: * might be a JDBC-based one for a specific DataSource, for example), with the JMS
116: * transaction committing right after the main transaction. If not allowed, the given
117: * ConnectionFactory needs to handle transaction enlistment underneath the covers.
118: * @return the transactional Session, or <code>null</code> if none found
119: * @throws JMSException in case of JMS failure
120: */
121: public static Session getTransactionalSession(
122: final ConnectionFactory cf, final Connection existingCon,
123: final boolean synchedLocalTransactionAllowed)
124: throws JMSException {
125:
126: return doGetTransactionalSession(cf, new ResourceFactory() {
127: public Session getSession(JmsResourceHolder holder) {
128: return holder.getSession(Session.class, existingCon);
129: }
130:
131: public Connection getConnection(JmsResourceHolder holder) {
132: return (existingCon != null ? existingCon : holder
133: .getConnection());
134: }
135:
136: public Connection createConnection() throws JMSException {
137: return cf.createConnection();
138: }
139:
140: public Session createSession(Connection con)
141: throws JMSException {
142: return con.createSession(
143: synchedLocalTransactionAllowed,
144: Session.AUTO_ACKNOWLEDGE);
145: }
146:
147: public boolean isSynchedLocalTransactionAllowed() {
148: return synchedLocalTransactionAllowed;
149: }
150: }, true);
151: }
152:
153: /**
154: * Obtain a JMS QueueSession that is synchronized with the current transaction, if any.
155: * <p>Mainly intended for use with the JMS 1.0.2 API.
156: * @param cf the ConnectionFactory to obtain a Session for
157: * @param existingCon the existing JMS Connection to obtain a Session for
158: * (may be <code>null</code>)
159: * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
160: * that is synchronized with a Spring-managed transaction (where the main transaction
161: * might be a JDBC-based one for a specific DataSource, for example), with the JMS
162: * transaction committing right after the main transaction. If not allowed, the given
163: * ConnectionFactory needs to handle transaction enlistment underneath the covers.
164: * @return the transactional Session, or <code>null</code> if none found
165: * @throws JMSException in case of JMS failure
166: */
167: public static QueueSession getTransactionalQueueSession(
168: final QueueConnectionFactory cf,
169: final QueueConnection existingCon,
170: final boolean synchedLocalTransactionAllowed)
171: throws JMSException {
172:
173: return (QueueSession) doGetTransactionalSession(cf,
174: new ResourceFactory() {
175: public Session getSession(JmsResourceHolder holder) {
176: return holder.getSession(QueueSession.class,
177: existingCon);
178: }
179:
180: public Connection getConnection(
181: JmsResourceHolder holder) {
182: return (existingCon != null ? existingCon
183: : holder
184: .getConnection(QueueConnection.class));
185: }
186:
187: public Connection createConnection()
188: throws JMSException {
189: return cf.createQueueConnection();
190: }
191:
192: public Session createSession(Connection con)
193: throws JMSException {
194: return ((QueueConnection) con)
195: .createQueueSession(
196: synchedLocalTransactionAllowed,
197: Session.AUTO_ACKNOWLEDGE);
198: }
199:
200: public boolean isSynchedLocalTransactionAllowed() {
201: return synchedLocalTransactionAllowed;
202: }
203: }, true);
204: }
205:
206: /**
207: * Obtain a JMS TopicSession that is synchronized with the current transaction, if any.
208: * <p>Mainly intended for use with the JMS 1.0.2 API.
209: * @param cf the ConnectionFactory to obtain a Session for
210: * @param existingCon the existing JMS Connection to obtain a Session for
211: * (may be <code>null</code>)
212: * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
213: * that is synchronized with a Spring-managed transaction (where the main transaction
214: * might be a JDBC-based one for a specific DataSource, for example), with the JMS
215: * transaction committing right after the main transaction. If not allowed, the given
216: * ConnectionFactory needs to handle transaction enlistment underneath the covers.
217: * @return the transactional Session, or <code>null</code> if none found
218: * @throws JMSException in case of JMS failure
219: */
220: public static TopicSession getTransactionalTopicSession(
221: final TopicConnectionFactory cf,
222: final TopicConnection existingCon,
223: final boolean synchedLocalTransactionAllowed)
224: throws JMSException {
225:
226: return (TopicSession) doGetTransactionalSession(cf,
227: new ResourceFactory() {
228: public Session getSession(JmsResourceHolder holder) {
229: return holder.getSession(TopicSession.class,
230: existingCon);
231: }
232:
233: public Connection getConnection(
234: JmsResourceHolder holder) {
235: return (existingCon != null ? existingCon
236: : holder
237: .getConnection(TopicConnection.class));
238: }
239:
240: public Connection createConnection()
241: throws JMSException {
242: return cf.createTopicConnection();
243: }
244:
245: public Session createSession(Connection con)
246: throws JMSException {
247: return ((TopicConnection) con)
248: .createTopicSession(
249: synchedLocalTransactionAllowed,
250: Session.AUTO_ACKNOWLEDGE);
251: }
252:
253: public boolean isSynchedLocalTransactionAllowed() {
254: return synchedLocalTransactionAllowed;
255: }
256: }, true);
257: }
258:
259: /**
260: * Obtain a JMS Session that is synchronized with the current transaction, if any.
261: * <p>This <code>doGetTransactionalSession</code> variant always starts the underlying
262: * JMS Connection, assuming that the Session will be used for receiving messages.
263: * @param connectionFactory the JMS ConnectionFactory to bind for
264: * (used as TransactionSynchronizationManager key)
265: * @param resourceFactory the ResourceFactory to use for extracting or creating
266: * JMS resources
267: * @return the transactional Session, or <code>null</code> if none found
268: * @throws JMSException in case of JMS failure
269: * @see #doGetTransactionalSession(javax.jms.ConnectionFactory, ResourceFactory, boolean)
270: */
271: public static Session doGetTransactionalSession(
272: ConnectionFactory connectionFactory,
273: ResourceFactory resourceFactory) throws JMSException {
274:
275: return doGetTransactionalSession(connectionFactory,
276: resourceFactory, true);
277: }
278:
279: /**
280: * Obtain a JMS Session that is synchronized with the current transaction, if any.
281: * @param connectionFactory the JMS ConnectionFactory to bind for
282: * (used as TransactionSynchronizationManager key)
283: * @param resourceFactory the ResourceFactory to use for extracting or creating
284: * JMS resources
285: * @param startConnection whether the underlying JMS Connection approach should be
286: * started in order to allow for receiving messages. Note that a reused Connection
287: * may already have been started before, even if this flag is <code>false</code>.
288: * @return the transactional Session, or <code>null</code> if none found
289: * @throws JMSException in case of JMS failure
290: */
291: public static Session doGetTransactionalSession(
292: ConnectionFactory connectionFactory,
293: ResourceFactory resourceFactory, boolean startConnection)
294: throws JMSException {
295:
296: Assert.notNull(connectionFactory,
297: "ConnectionFactory must not be null");
298: Assert.notNull(resourceFactory,
299: "ResourceFactory must not be null");
300:
301: JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
302: .getResource(connectionFactory);
303: if (resourceHolder != null) {
304: Session session = resourceFactory
305: .getSession(resourceHolder);
306: if (session != null) {
307: if (startConnection) {
308: Connection con = resourceFactory
309: .getConnection(resourceHolder);
310: if (con != null) {
311: con.start();
312: }
313: }
314: return session;
315: }
316: if (resourceHolder.isFrozen()) {
317: return null;
318: }
319: }
320: if (!TransactionSynchronizationManager
321: .isSynchronizationActive()) {
322: return null;
323: }
324: JmsResourceHolder resourceHolderToUse = resourceHolder;
325: if (resourceHolderToUse == null) {
326: resourceHolderToUse = new JmsResourceHolder(
327: connectionFactory);
328: }
329: Connection con = resourceFactory
330: .getConnection(resourceHolderToUse);
331: Session session = null;
332: try {
333: boolean isExistingCon = (con != null);
334: if (!isExistingCon) {
335: con = resourceFactory.createConnection();
336: resourceHolderToUse.addConnection(con);
337: }
338: session = resourceFactory.createSession(con);
339: resourceHolderToUse.addSession(session, con);
340: if (startConnection) {
341: con.start();
342: }
343: } catch (JMSException ex) {
344: if (session != null) {
345: try {
346: session.close();
347: } catch (Throwable ex2) {
348: // ignore
349: }
350: }
351: if (con != null) {
352: try {
353: con.close();
354: } catch (Throwable ex2) {
355: // ignore
356: }
357: }
358: throw ex;
359: }
360: if (resourceHolderToUse != resourceHolder) {
361: TransactionSynchronizationManager
362: .registerSynchronization(new JmsResourceSynchronization(
363: connectionFactory, resourceHolderToUse,
364: resourceFactory
365: .isSynchedLocalTransactionAllowed()));
366: resourceHolderToUse.setSynchronizedWithTransaction(true);
367: TransactionSynchronizationManager.bindResource(
368: connectionFactory, resourceHolderToUse);
369: }
370: return session;
371: }
372:
373: /**
374: * Callback interface for resource creation.
375: * Serving as argument for the <code>doGetTransactionalSession</code> method.
376: */
377: public interface ResourceFactory {
378:
379: /**
380: * Fetch an appropriate Session from the given JmsResourceHolder.
381: * @param holder the JmsResourceHolder
382: * @return an appropriate Session fetched from the holder,
383: * or <code>null</code> if none found
384: */
385: Session getSession(JmsResourceHolder holder);
386:
387: /**
388: * Fetch an appropriate Connection from the given JmsResourceHolder.
389: * @param holder the JmsResourceHolder
390: * @return an appropriate Connection fetched from the holder,
391: * or <code>null</code> if none found
392: */
393: Connection getConnection(JmsResourceHolder holder);
394:
395: /**
396: * Create a new JMS Connection for registration with a JmsResourceHolder.
397: * @return the new JMS Connection
398: * @throws JMSException if thrown by JMS API methods
399: */
400: Connection createConnection() throws JMSException;
401:
402: /**
403: * Create a new JMS Session for registration with a JmsResourceHolder.
404: * @param con the JMS Connection to create a Session for
405: * @return the new JMS Session
406: * @throws JMSException if thrown by JMS API methods
407: */
408: Session createSession(Connection con) throws JMSException;
409:
410: /**
411: * Return whether to allow for a local JMS transaction that is synchronized with
412: * a Spring-managed transaction (where the main transaction might be a JDBC-based
413: * one for a specific DataSource, for example), with the JMS transaction
414: * committing right after the main transaction.
415: * @return whether to allow for synchronizing a local JMS transaction
416: */
417: boolean isSynchedLocalTransactionAllowed();
418: }
419:
420: /**
421: * Callback for resource cleanup at the end of a non-native JMS transaction
422: * (e.g. when participating in a JtaTransactionManager transaction).
423: * @see org.springframework.transaction.jta.JtaTransactionManager
424: */
425: private static class JmsResourceSynchronization extends
426: TransactionSynchronizationAdapter {
427:
428: private final Object resourceKey;
429:
430: private final JmsResourceHolder resourceHolder;
431:
432: private final boolean transacted;
433:
434: private boolean holderActive = true;
435:
436: public JmsResourceSynchronization(Object resourceKey,
437: JmsResourceHolder resourceHolder, boolean transacted) {
438: this .resourceKey = resourceKey;
439: this .resourceHolder = resourceHolder;
440: this .transacted = transacted;
441: }
442:
443: public void suspend() {
444: if (this .holderActive) {
445: TransactionSynchronizationManager
446: .unbindResource(this .resourceKey);
447: }
448: }
449:
450: public void resume() {
451: if (this .holderActive) {
452: TransactionSynchronizationManager.bindResource(
453: this .resourceKey, this .resourceHolder);
454: }
455: }
456:
457: public void beforeCompletion() {
458: TransactionSynchronizationManager
459: .unbindResource(this .resourceKey);
460: this .holderActive = false;
461: if (!this .transacted) {
462: this .resourceHolder.closeAll();
463: }
464: }
465:
466: public void afterCommit() {
467: if (this .transacted) {
468: try {
469: this .resourceHolder.commitAll();
470: } catch (JMSException ex) {
471: throw new SynchedLocalTransactionFailedException(
472: "Local JMS transaction failed to commit",
473: ex);
474: }
475: }
476: }
477:
478: public void afterCompletion(int status) {
479: if (this.transacted) {
480: this.resourceHolder.closeAll();
481: }
482: }
483: }
484:
485: }
|