001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.connectionmanager;
023:
024: import java.util.ArrayList;
025:
026: import javax.transaction.RollbackException;
027: import javax.transaction.Synchronization;
028: import javax.transaction.SystemException;
029: import javax.transaction.Transaction;
030: import javax.transaction.TransactionManager;
031:
032: import org.jboss.logging.Logger;
033: import org.jboss.tm.TransactionLocal;
034: import org.jboss.util.NestedRuntimeException;
035:
036: /**
037: * Organizes transaction synchronization done by JCA.<p>
038: *
039: * This class exists to make sure all Tx synchronizations
040: * are invoked before the cached connection manager closes any
041: * closed connections.
042: *
043: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
044: * @version $Revision: 57189 $
045: */
046: public class TransactionSynchronizer implements Synchronization {
047: /** The logger */
048: private static final Logger log = Logger
049: .getLogger(TransactionSynchronizer.class);
050:
051: /** The transaction synchronizations */
052: protected static TransactionLocal txSynchs;
053:
054: /** The transaction */
055: protected Transaction tx;
056:
057: /** The enlisting thread */
058: protected Thread enlistingThread;
059:
060: /** Unenlisted */
061: protected ArrayList unenlisted;
062:
063: /** Enlisted */
064: protected ArrayList enlisted;
065:
066: /** The cached connection manager synchronization */
067: protected Synchronization ccmSynch;
068:
069: /** Initialization */
070: public static void setTransactionManager(TransactionManager tm) {
071: txSynchs = new TransactionLocal(tm);
072: }
073:
074: /**
075: * Create a new transaction synchronizer
076: *
077: * @param tx the transaction to synchronize with
078: */
079: private TransactionSynchronizer(Transaction tx) {
080: this .tx = tx;
081: }
082:
083: /**
084: * Add a new Tx synchronization that has not been enlisted
085: *
086: * @param synch the synchronization
087: */
088: synchronized void addUnenlisted(Synchronization synch) {
089: if (unenlisted == null)
090: unenlisted = new ArrayList();
091: unenlisted.add(synch);
092: }
093:
094: /**
095: * Get the unenlisted synchronizations
096: * and say we are enlisting if some are returned.
097: *
098: * @return the unenlisted synchronizations
099: */
100: synchronized ArrayList getUnenlisted() {
101: Thread currentThread = Thread.currentThread();
102: while (enlistingThread != null
103: && enlistingThread != currentThread) {
104: boolean interrupted = false;
105: try {
106: wait();
107: } catch (InterruptedException e) {
108: interrupted = true;
109: }
110: if (interrupted)
111: currentThread.interrupt();
112: }
113: ArrayList result = unenlisted;
114: unenlisted = null;
115: if (result != null)
116: enlistingThread = currentThread;
117: return result;
118: }
119:
120: /**
121: * The synchronization is now enlisted
122: *
123: * @param synch the synchronization
124: */
125: synchronized void addEnlisted(Synchronization synch) {
126: if (enlisted == null)
127: enlisted = new ArrayList();
128: enlisted.add(synch);
129: }
130:
131: /**
132: * Remove an enlisted synchronization
133: *
134: * @param synch the synchronization
135: * @return true when the synchronization was enlisted
136: */
137: synchronized boolean removeEnlisted(Synchronization synch) {
138: return enlisted.remove(synch);
139: }
140:
141: /**
142: * This thread has finished enlisting
143: */
144: synchronized void enlisted() {
145: Thread currentThread = Thread.currentThread();
146: if (enlistingThread == null || enlistingThread != currentThread) {
147: log.warn("Thread " + currentThread
148: + " not the enlisting thread " + enlistingThread,
149: new Exception("STACKTRACE"));
150: return;
151: }
152: enlistingThread = null;
153: notifyAll();
154: }
155:
156: /**
157: * Get a registered transaction synchronizer.
158: *
159: * @param tx the transaction
160: * @return the registered transaction synchronizer for this transaction
161: * @throws RolledbackException if the transaction is already rolled back
162: * @throws SystemException for an error in the tranaction manager
163: */
164: static TransactionSynchronizer getRegisteredSynchronizer(
165: Transaction tx) throws RollbackException, SystemException {
166: TransactionSynchronizer result = (TransactionSynchronizer) txSynchs
167: .get(tx);
168: if (result == null) {
169: result = new TransactionSynchronizer(tx);
170: tx.registerSynchronization(result);
171: txSynchs.set(tx, result);
172: }
173: return result;
174: }
175:
176: /**
177: * Check whether we have a CCM synchronization
178: *
179: * @param tx the transaction
180: */
181: static Synchronization getCCMSynchronization(Transaction tx) {
182: TransactionSynchronizer ts = (TransactionSynchronizer) txSynchs
183: .get(tx);
184: if (ts != null)
185: return ts.ccmSynch;
186: else
187: return null;
188: }
189:
190: /**
191: * Register a new CCM synchronization
192: *
193: * @param tx the transaction
194: * @param synch the synchronization
195: * @throws RolledbackException if the transaction is already rolled back
196: * @throws SystemException for an error in the tranaction manager
197: */
198: static void registerCCMSynchronization(Transaction tx,
199: Synchronization synch) throws RollbackException,
200: SystemException {
201: TransactionSynchronizer ts = getRegisteredSynchronizer(tx);
202: ts.ccmSynch = synch;
203: }
204:
205: /**
206: * Lock for the given transaction
207: *
208: * @param tx the transaction
209: */
210: static void lock(Transaction tx) {
211: try {
212: txSynchs.lock(tx);
213: } catch (InterruptedException e) {
214: throw new NestedRuntimeException(
215: "Unable to get synchronization", e);
216: }
217: }
218:
219: /**
220: * Unlock for the given transaction
221: *
222: * @param tx the transaction
223: */
224: static void unlock(Transaction tx) {
225: txSynchs.unlock(tx);
226: }
227:
228: public void beforeCompletion() {
229: if (enlisted != null) {
230: int i = 0;
231: while (i < enlisted.size()) {
232: Synchronization synch = (Synchronization) enlisted
233: .get(i);
234: invokeBefore(synch);
235: ++i;
236: }
237: }
238:
239: if (ccmSynch != null)
240: invokeBefore(ccmSynch);
241: }
242:
243: public void afterCompletion(int status) {
244: if (enlisted != null) {
245: int i = 0;
246: while (i < enlisted.size()) {
247: Synchronization synch = (Synchronization) enlisted
248: .get(i);
249: invokeAfter(synch, status);
250: ++i;
251: }
252: }
253:
254: if (ccmSynch != null)
255: invokeAfter(ccmSynch, status);
256: }
257:
258: /**
259: * Invoke a beforeCompletion
260: *
261: * @param synch the synchronization
262: */
263: protected void invokeBefore(Synchronization synch) {
264: try {
265: synch.beforeCompletion();
266: } catch (Throwable t) {
267: log.warn("Transaction " + tx
268: + " error in before completion " + synch, t);
269: }
270: }
271:
272: /**
273: * Invoke an afterCompletion
274: *
275: * @param synch the synchronization
276: * @param status the status of the transaction
277: */
278: protected void invokeAfter(Synchronization synch, int status) {
279: try {
280: synch.afterCompletion(status);
281: } catch (Throwable t) {
282: log.warn("Transaction " + tx
283: + " error in after completion " + synch, t);
284: }
285: }
286: }
|