001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.connectionmanager;
023:
024: import java.util.ArrayList;
025: import java.util.Collection;
026: import java.util.HashSet;
027: import java.util.Iterator;
028: import java.util.Set;
029:
030: import javax.management.ObjectName;
031: import javax.naming.InitialContext;
032: import javax.resource.ResourceException;
033: import javax.resource.spi.ConnectionEvent;
034: import javax.resource.spi.ConnectionRequestInfo;
035: import javax.resource.spi.LocalTransaction;
036: import javax.resource.spi.ManagedConnection;
037: import javax.security.auth.Subject;
038: import javax.transaction.RollbackException;
039: import javax.transaction.Status;
040: import javax.transaction.Synchronization;
041: import javax.transaction.SystemException;
042: import javax.transaction.Transaction;
043: import javax.transaction.TransactionManager;
044: import javax.transaction.xa.XAException;
045: import javax.transaction.xa.XAResource;
046: import javax.transaction.xa.Xid;
047:
048: import org.jboss.logging.Logger;
049: import org.jboss.resource.JBossResourceException;
050: import org.jboss.resource.connectionmanager.xa.JcaXAResourceWrapper;
051: import org.jboss.resource.connectionmanager.xa.JcaXAResourceWrapperFactory;
052: import org.jboss.tm.LastResource;
053: import org.jboss.tm.TransactionTimeoutConfiguration;
054: import org.jboss.tm.TxUtils;
055: import org.jboss.util.NestedRuntimeException;
056:
057: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
058:
059: /**
060: * The TxConnectionManager is a JBoss ConnectionManager
061: * implementation for jca adapters implementing LocalTransaction and XAResource support.
062: *
063: * It implements a ConnectionEventListener that implements XAResource to
064: * manage transactions through the Transaction Manager. To assure that all
065: * work in a local transaction occurs over the same ManagedConnection, it
066: * includes a xid to ManagedConnection map. When a Connection is requested
067: * or a transaction started with a connection handle in use, it checks to
068: * see if a ManagedConnection already exists enrolled in the global
069: * transaction and uses it if found. Otherwise a free ManagedConnection
070: * has its LocalTransaction started and is used. From the
071: * BaseConnectionManager2, it includes functionality to obtain managed
072: * connections from
073: * a ManagedConnectionPool mbean, find the Subject from a SubjectSecurityDomain,
074: * and interact with the CachedConnectionManager for connections held over
075: * transaction and method boundaries. Important mbean references are to a
076: * ManagedConnectionPool supplier (typically a JBossManagedConnectionPool), and a
077: * RARDeployment representing the ManagedConnectionFactory.
078: *
079: * This connection manager has to perform the following operations:
080: *
081: * 1. When an application component requests a new ConnectionHandle,
082: * it must find a ManagedConnection, and make sure a
083: * ConnectionEventListener is registered. It must inform the
084: * CachedConnectionManager that a connection handle has been given
085: * out. It needs to count the number of handles for each
086: * ManagedConnection. If there is a current transaction, it must
087: * enlist the ManagedConnection's LocalTransaction in the transaction
088: * using the ConnectionEventListeners XAResource XAResource implementation.
089: * Entry point: ConnectionManager.allocateConnection.
090: * written.
091: *
092: * 2. When a ConnectionClosed event is received from the
093: * ConnectionEventListener, it must reduce the handle count. If
094: * the handle count is zero, the XAResource should be delisted from
095: * the Transaction, if any. The CachedConnectionManager must be
096: * notified that the connection is closed.
097: * Entry point: ConnectionEventListener.ConnectionClosed.
098: * written
099: *
100: *3. When a transaction begun notification is received from the
101: * UserTransaction (via the CachedConnectionManager, all
102: * managedConnections associated with the current object must be
103: * enlisted in the transaction.
104: * Entry point: (from
105: * CachedConnectionManager)
106: * ConnectionCacheListener.transactionStarted(Transaction,
107: * Collection). The collection is of ConnectionRecord objects.
108: * written.
109: *
110: * 5. When an "entering object" notification is received from the
111: * CachedConnectionInterceptor, all the connections for the current
112: * object must be associated with a ManagedConnection. if there is a
113: * Transaction, the XAResource must be enlisted with it.
114: * Entry point: ConnectionCacheListener.reconnect(Collection conns) The Collection
115: * is of ConnectionRecord objects.
116: * written.
117: *
118: * 6. When a "leaving object" notification is received from the
119: * CachedConnectionInterceptor, all the managedConnections for the
120: * current object must have their XAResources delisted from the
121: * current Transaction, if any, and cleanup called on each
122: * ManagedConnection.
123: * Entry point: ConnectionCacheListener.disconnect(Collection conns).
124: * written.
125: *
126: * @author <a href="mailto:d_jencks@users.sourceforge.net">David Jencks</a>
127: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
128: * @version $Revision: 59870 $
129: */
130: public class TxConnectionManager extends BaseConnectionManager2
131: implements TxConnectionManagerMBean {
132: private static final Throwable FAILED_TO_ENLIST = new Throwable(
133: "Unabled to enlist resource, see the previous warnings.");
134:
135: private ObjectName transactionManagerService;
136:
137: //use the object name, please
138: private String tmName;
139:
140: private TransactionManager tm;
141:
142: private boolean trackConnectionByTx = false;
143:
144: private boolean localTransactions;
145:
146: private int xaResourceTimeout = 0;
147:
148: private Boolean isSameRMOverrideValue;
149:
150: protected static void rethrowAsSystemException(String context,
151: Transaction tx, Throwable t) throws SystemException {
152: if (t instanceof SystemException)
153: throw (SystemException) t;
154: if (t instanceof RuntimeException)
155: throw (RuntimeException) t;
156: if (t instanceof Error)
157: throw (Error) t;
158: if (t instanceof RollbackException)
159: throw new IllegalStateException(context + " tx=" + tx
160: + " marked for rollback.");
161: throw new NestedRuntimeException(context + " tx=" + tx
162: + " got unexpected error ", t);
163: }
164:
165: /**
166: * Default managed TxConnectionManager constructor for mbean instances.
167: */
168: public TxConnectionManager() {
169: }
170:
171: /**
172: * Creates a new <code>TxConnectionManager</code> instance.
173: * for TESTING ONLY!!! not a managed constructor!!
174: *
175: * @param ccm a <code>CachedConnectionManager</code> value
176: * @param poolingStrategy a <code>ManagedConnectionPool</code> value
177: * @param tm a <code>TransactionManager</code> value
178: */
179: public TxConnectionManager(final CachedConnectionManager ccm,
180: final ManagedConnectionPool poolingStrategy,
181: final TransactionManager tm) {
182: super (ccm, poolingStrategy);
183: this .tm = tm;
184: }
185:
186: public ObjectName getTransactionManagerService() {
187: return transactionManagerService;
188: }
189:
190: public void setTransactionManagerService(
191: ObjectName transactionManagerService) {
192: this .transactionManagerService = transactionManagerService;
193: }
194:
195: /**
196: * @deprecated
197: */
198: public void setTransactionManager(final String tmName) {
199: this .tmName = tmName;
200: }
201:
202: /**
203: * @deprecated
204: */
205: public String getTransactionManager() {
206: return this .tmName;
207: }
208:
209: public TransactionManager getTransactionManagerInstance() {
210: return tm;
211: }
212:
213: public void setTransactionManagerInstance(TransactionManager tm) {
214: this .tm = tm;
215: }
216:
217: public boolean isTrackConnectionByTx() {
218: return trackConnectionByTx;
219: }
220:
221: public void setTrackConnectionByTx(boolean trackConnectionByTx) {
222: this .trackConnectionByTx = trackConnectionByTx;
223: }
224:
225: public boolean isLocalTransactions() {
226: return localTransactions;
227: }
228:
229: public void setLocalTransactions(boolean localTransactions) {
230: this .localTransactions = localTransactions;
231: }
232:
233: public int getXAResourceTransactionTimeout() {
234: return xaResourceTimeout;
235: }
236:
237: public void setXAResourceTransactionTimeout(int timeout) {
238: this .xaResourceTimeout = timeout;
239: }
240:
241: public Boolean getIsSameRMOverrideValue() {
242: return isSameRMOverrideValue;
243: }
244:
245: public void setIsSameRMOverrideValue(Boolean isSameRMOverride) {
246: this .isSameRMOverrideValue = isSameRMOverride;
247: }
248:
249: public long getTimeLeftBeforeTransactionTimeout(
250: boolean errorRollback) throws RollbackException {
251: if (tm == null)
252: throw new IllegalStateException("No transaction manager: "
253: + ccmName);
254: if (tm instanceof TransactionTimeoutConfiguration)
255: return ((TransactionTimeoutConfiguration) tm)
256: .getTimeLeftBeforeTransactionTimeout(errorRollback);
257: return -1;
258: }
259:
260: protected void startService() throws Exception {
261: if (transactionManagerService != null)
262: tm = (TransactionManager) getServer().getAttribute(
263: transactionManagerService, "TransactionManager");
264: else {
265: log
266: .warn("----------------------------------------------------------");
267: log
268: .warn("----------------------------------------------------------");
269: log
270: .warn("Please change your datasource setup to use <depends optional-attribute-name\"TransactionManagerService\">jboss:service=TransactionManager</depends>");
271: log
272: .warn("instead of <attribute name=\"TransactionManager\">java:/TransactionManager</attribute>");
273: log.warn("Better still, use a *-ds.xml file");
274: log
275: .warn("----------------------------------------------------------");
276: log
277: .warn("----------------------------------------------------------");
278: tm = (TransactionManager) new InitialContext()
279: .lookup(tmName);
280: }
281:
282: super .startService();
283: }
284:
285: protected void stopService() throws Exception {
286: this .tm = null;
287: super .stopService();
288: }
289:
290: public ConnectionListener getManagedConnection(Subject subject,
291: ConnectionRequestInfo cri) throws ResourceException {
292: Transaction trackByTransaction = null;
293: try {
294: Transaction tx = tm.getTransaction();
295: if (tx != null && TxUtils.isActive(tx) == false)
296: throw new ResourceException(
297: "Transaction is not active: tx=" + tx);
298: if (trackConnectionByTx)
299: trackByTransaction = tx;
300: } catch (Throwable t) {
301: JBossResourceException.rethrowAsResourceException(
302: "Error checking for a transaction.", t);
303: }
304:
305: if (trace)
306: log
307: .trace("getManagedConnection trackByTx="
308: + trackConnectionByTx + " tx="
309: + trackByTransaction);
310: return super .getManagedConnection(trackByTransaction, subject,
311: cri);
312: }
313:
314: public void transactionStarted(Collection crs)
315: throws SystemException {
316: Set cls = new HashSet();
317: for (Iterator i = crs.iterator(); i.hasNext();) {
318: ConnectionRecord cr = (ConnectionRecord) i.next();
319: ConnectionListener cl = cr.cl;
320: if (!cls.contains(cl)) {
321: cls.add(cl);
322: cl.enlist();
323: }
324: }
325: }
326:
327: protected void managedConnectionReconnected(ConnectionListener cl)
328: throws ResourceException {
329: try {
330: cl.enlist();
331: } catch (Throwable t) {
332: if (trace)
333: log.trace(
334: "Could not enlist in transaction on entering meta-aware object! "
335: + cl, t);
336: throw new JBossResourceException(
337: "Could not enlist in transaction on entering meta-aware object!",
338: t);
339: }
340: }
341:
342: protected void managedConnectionDisconnected(ConnectionListener cl)
343: throws ResourceException {
344: Throwable throwable = null;
345: try {
346: cl.delist();
347: } catch (Throwable t) {
348: throwable = t;
349: }
350:
351: //if there are no more handles and tx is complete, we can return to pool.
352: boolean isFree = cl.isManagedConnectionFree();
353: if (trace)
354: log.trace("Disconnected isManagedConnectionFree=" + isFree
355: + " cl=" + cl);
356: if (isFree)
357: returnManagedConnection(cl, false);
358:
359: // Rethrow the error
360: if (throwable != null)
361: JBossResourceException
362: .rethrowAsResourceException(
363: "Could not delist resource, probably a transaction rollback? ",
364: throwable);
365: }
366:
367: public ConnectionListener createConnectionListener(
368: ManagedConnection mc, Object context)
369: throws ResourceException {
370: XAResource xaResource = null;
371:
372: if (localTransactions) {
373: xaResource = new LocalXAResource(log);
374: if (xaResourceTimeout != 0)
375: log
376: .debug("XAResource transaction timeout cannot be set for local transactions: "
377: + getJndiName());
378: } else {
379: xaResource = JcaXAResourceWrapperFactory
380: .getResourceWrapper(mc.getXAResource(),
381: isSameRMOverrideValue);
382:
383: if (xaResourceTimeout != 0) {
384: try {
385: if (xaResource
386: .setTransactionTimeout(xaResourceTimeout) == false)
387: log
388: .debug("XAResource does not support transaction timeout configuration: "
389: + getJndiName());
390: } catch (XAException e) {
391: throw new JBossResourceException(
392: "Unable to set XAResource transaction timeout: "
393: + getJndiName(), e);
394: }
395: }
396: }
397:
398: ConnectionListener cli = new TxConnectionEventListener(mc,
399: poolingStrategy, context, log, xaResource);
400: mc.addConnectionEventListener(cli);
401: return cli;
402: }
403:
404: public boolean isTransactional() {
405: return TxUtils.isCompleted(tm) == false;
406: }
407:
408: // implementation of javax.resource.spi.ConnectionEventListener interface
409: //there is one of these for each ManagedConnection instance. It lives as long as the ManagedConnection.
410: protected class TxConnectionEventListener extends
411: BaseConnectionManager2.BaseConnectionEventListener {
412: /** Use our own logger to prevent MNFE caused by compiler bug with nested classes. */
413: protected Logger log;
414:
415: protected TransactionSynchronization transactionSynchronization;
416:
417: private final XAResource xaResource;
418:
419: /** Whether there is a local transaction */
420: private SynchronizedBoolean localTransaction = new SynchronizedBoolean(
421: false);
422:
423: public TxConnectionEventListener(final ManagedConnection mc,
424: final ManagedConnectionPool mcp, final Object context,
425: Logger log, final XAResource xaResource)
426: throws ResourceException {
427: super (mc, mcp, context, log);
428: this .log = log;
429: this .xaResource = xaResource;
430: if (xaResource instanceof LocalXAResource)
431: ((LocalXAResource) xaResource)
432: .setConnectionListener(this );
433: }
434:
435: public void enlist() throws SystemException {
436: // This method is a bit convulted, but it has to be such because
437: // there is a race condition in the transaction manager where it
438: // unlocks during the enlist of the XAResource. It does this
439: // to avoid distributed deadlocks and to ensure the transaction
440: // timeout can fail a badly behaving resource during the enlist.
441: //
442: // When two threads in the same transaction are trying to enlist
443: // connections they could be from the same resource manager
444: // or even the same connection when tracking the connection by transaction.
445: //
446: // For the same connection, we only want to do the real enlist once.
447: // For two connections from the same resource manager we don't
448: // want the join before the initial start request.
449: //
450: // The solution is to build up a list of unenlisted resources
451: // in the TransactionSynchronizer and then choose one of the
452: // threads that is contending in the transaction to enlist them
453: // in order. The actual order doesn't really matter as it is the
454: // transaction manager that calculates the enlist flags and determines
455: // whether the XAResource was already enlisted.
456: //
457: // Once there are no unenlisted resources the threads are released
458: // to return the result of the enlistments.
459: //
460: // In practice, a thread just takes a snapshot to try to avoid one
461: // thread having to do all the work. If it did not do them all
462: // the next waiting thread will do the next snapshot until there
463: // there is either no snapshot or no waiting threads.
464: //
465: // A downside to this design is a thread could have its resource enlisted by
466: // an earlier thread while it enlists some later thread's resource.
467: // Since they are all a part of the same transaction, this is probably
468: // not a real issue.
469:
470: // No transaction associated with the thread
471: int status = tm.getStatus();
472: if (status == Status.STATUS_NO_TRANSACTION) {
473: if (transactionSynchronization != null
474: && transactionSynchronization.currentTx != null) {
475: String error = "Attempt to use connection outside a transaction when already a tx!";
476: if (trace)
477: log.trace(error + " " + this );
478: throw new IllegalStateException(error);
479: }
480: if (trace)
481: log.trace("No transaction, no need to enlist: "
482: + this );
483: return;
484: }
485:
486: // Inactive transaction
487: Transaction threadTx = tm.getTransaction();
488: if (threadTx == null || status != Status.STATUS_ACTIVE) {
489: String error = "Transaction " + threadTx
490: + " is not active "
491: + TxUtils.getStatusAsString(status);
492: if (trace)
493: log.trace(error + " cl=" + this );
494: throw new IllegalStateException(error);
495: }
496:
497: if (trace)
498: log.trace("Pre-enlist: " + this + " threadTx="
499: + threadTx);
500:
501: // Our synchronization
502: TransactionSynchronization ourSynchronization = null;
503:
504: // Serializes enlistment when two different threads are enlisting
505: // different connections in the same transaction concurrently
506: TransactionSynchronizer synchronizer = null;
507:
508: TransactionSynchronizer.lock(threadTx);
509: try {
510: // Interleaving should have an unenlisted transaction
511: // TODO: We should be able to do some sharing shouldn't we?
512: if (isTrackByTx() == false
513: && transactionSynchronization != null) {
514: String error = "Can't enlist - already a tx!";
515: if (trace)
516: log.trace(error + " " + this );
517: throw new IllegalStateException(error);
518: }
519:
520: // Check for different transaction
521: if (transactionSynchronization != null
522: && transactionSynchronization.currentTx
523: .equals(threadTx) == false) {
524: String error = "Trying to change transaction "
525: + threadTx + " in enlist!";
526: if (trace)
527: log.trace(error + " " + this );
528: throw new IllegalStateException(error);
529: }
530:
531: // Get the synchronizer
532: try {
533: if (trace)
534: log.trace("Get synchronizer " + this
535: + " threadTx=" + threadTx);
536: synchronizer = TransactionSynchronizer
537: .getRegisteredSynchronizer(threadTx);
538: } catch (Throwable t) {
539: setTrackByTx(false);
540: rethrowAsSystemException(
541: "Cannot register synchronization",
542: threadTx, t);
543: }
544:
545: // First time through, create a transaction synchronization
546: if (transactionSynchronization == null) {
547: TransactionSynchronization synchronization = new TransactionSynchronization(
548: threadTx, isTrackByTx());
549: synchronizer.addUnenlisted(synchronization);
550: transactionSynchronization = synchronization;
551: }
552: ourSynchronization = transactionSynchronization;
553: } finally {
554: TransactionSynchronizer.unlock(threadTx);
555: }
556:
557: // Perform the enlistment(s)
558: ArrayList unenlisted = synchronizer.getUnenlisted();
559: if (unenlisted != null) {
560: try {
561: for (int i = 0; i < unenlisted.size(); ++i) {
562: TransactionSynchronization sync = (TransactionSynchronization) unenlisted
563: .get(i);
564: if (sync.enlist())
565: synchronizer.addEnlisted(sync);
566: }
567: } finally {
568: synchronizer.enlisted();
569: }
570: }
571:
572: // What was the result of our enlistment?
573: if (trace)
574: log.trace("Check enlisted " + this + " threadTx="
575: + threadTx);
576: ourSynchronization.checkEnlisted();
577: }
578:
579: public void delist() throws ResourceException {
580: if (trace)
581: log.trace("delisting " + this );
582:
583: try {
584: if (isTrackByTx() == false
585: && transactionSynchronization != null) {
586: Transaction tx = transactionSynchronization.currentTx;
587: TransactionSynchronization synchronization = transactionSynchronization;
588: transactionSynchronization = null;
589: if (TxUtils.isUncommitted(tx)) {
590: TransactionSynchronizer synchronizer = TransactionSynchronizer
591: .getRegisteredSynchronizer(tx);
592: if (synchronization.enlisted)
593: synchronizer
594: .removeEnlisted(synchronization);
595: if (tx.delistResource(getXAResource(),
596: XAResource.TMSUSPEND) == false)
597: throw new ResourceException(
598: "Failure to delist resource: "
599: + this );
600: }
601: }
602: } catch (Throwable t) {
603: JBossResourceException.rethrowAsResourceException(
604: "Error in delist!", t);
605: }
606: }
607:
608: //local will return this, xa will return one from mc.
609: protected XAResource getXAResource() {
610: return xaResource;
611: }
612:
613: public void connectionClosed(ConnectionEvent ce) {
614: if (trace)
615: log.trace("connectionClosed called mc="
616: + this .getManagedConnection());
617: if (this .getManagedConnection() != (ManagedConnection) ce
618: .getSource())
619: throw new IllegalArgumentException(
620: "ConnectionClosed event received from wrong ManagedConnection! Expected: "
621: + this .getManagedConnection()
622: + ", actual: " + ce.getSource());
623: try {
624: getCcm().unregisterConnection(TxConnectionManager.this ,
625: ce.getConnectionHandle());
626: } catch (Throwable t) {
627: log.info("throwable from unregister connection", t);
628: }
629:
630: try {
631: unregisterAssociation(this , ce.getConnectionHandle());
632: boolean isFree = isManagedConnectionFree();
633: if (trace)
634: log.trace("isManagedConnectionFree=" + isFree
635: + " mc=" + this .getManagedConnection());
636: //no more handles
637: if (isFree) {
638: delist();
639: returnManagedConnection(this , false);
640: }
641: } catch (Throwable t) {
642: log.error("Error while closing connection handle!", t);
643: returnManagedConnection(this , true);
644: }
645: }
646:
647: public void localTransactionStarted(ConnectionEvent ce) {
648: localTransaction.set(true);
649: }
650:
651: public void localTransactionCommitted(ConnectionEvent ce) {
652: localTransaction.set(false);
653: }
654:
655: public void localTransactionRolledback(ConnectionEvent ce) {
656: localTransaction.set(false);
657: }
658:
659: public void tidyup() throws ResourceException {
660: // We have a hanging transaction
661: if (localTransaction.get()) {
662: LocalTransaction local = null;
663: ManagedConnection mc = getManagedConnection();
664: try {
665: local = mc.getLocalTransaction();
666: } catch (Throwable t) {
667: JBossResourceException.rethrowAsResourceException(
668: "Unfinished local transaction - error getting local transaction from "
669: + this , t);
670: }
671: if (local == null)
672: throw new ResourceException(
673: "Unfinished local transaction but managed connection does not provide a local transaction. "
674: + this );
675: else {
676: local.rollback();
677: log
678: .debug("Unfinished local transaction was rolled back."
679: + this );
680: }
681: }
682: }
683:
684: public void connectionErrorOccurred(ConnectionEvent ce) {
685: transactionSynchronization = null;
686: super .connectionErrorOccurred(ce);
687: }
688:
689: //Important method!!
690: public boolean isManagedConnectionFree() {
691: if (isTrackByTx() && transactionSynchronization != null)
692: return false;
693: return super .isManagedConnectionFree();
694: }
695:
696: private class TransactionSynchronization implements
697: Synchronization {
698: /** Transaction */
699: private Transaction currentTx;
700:
701: /** This is the status when we were registered */
702: private boolean wasTrackByTx;
703:
704: /** Whether we are enlisted */
705: private boolean enlisted = false;
706:
707: /** Any error during enlistment */
708: private Throwable enlistError;
709:
710: /**
711: * Create a new TransactionSynchronization.
712: *
713: * @param trackByTx whether this is track by connection
714: */
715: public TransactionSynchronization(Transaction tx,
716: boolean trackByTx) {
717: this .currentTx = tx;
718: wasTrackByTx = trackByTx;
719: }
720:
721: /**
722: * Get the result of the enlistment
723: *
724: * @throws SystemExeption for any error
725: */
726: public void checkEnlisted() throws SystemException {
727: if (enlistError != null) {
728: String error = "Error enlisting resource in transaction="
729: + currentTx;
730: if (trace)
731: log.trace(error + " "
732: + TxConnectionEventListener.this );
733:
734: // Wrap the error to give a reasonable stacktrace since the resource
735: // could have been enlisted by a different thread
736: if (enlistError == FAILED_TO_ENLIST)
737: throw new SystemException(FAILED_TO_ENLIST
738: + " tx=" + currentTx);
739: else {
740: SystemException e = new SystemException(error);
741: e.initCause(enlistError);
742: throw e;
743: }
744: }
745: if (enlisted == false) {
746: String error = "Resource is not enlisted in transaction="
747: + currentTx;
748: if (trace)
749: log.trace(error + " "
750: + TxConnectionEventListener.this );
751: throw new IllegalStateException(
752: "Resource was not enlisted.");
753: }
754: }
755:
756: /**
757: * Enlist the resource
758: *
759: * @return true when enlisted, false otherwise
760: */
761: public boolean enlist() {
762: if (trace)
763: log.trace("Enlisting resource "
764: + TxConnectionEventListener.this );
765: try {
766: XAResource resource = getXAResource();
767: if (false == currentTx.enlistResource(resource))
768: enlistError = FAILED_TO_ENLIST;
769: } catch (Throwable t) {
770: enlistError = t;
771: }
772:
773: synchronized (this ) {
774: if (enlistError != null) {
775: if (trace)
776: log.trace("Failed to enlist resource "
777: + TxConnectionEventListener.this ,
778: enlistError);
779: setTrackByTx(false);
780: transactionSynchronization = null;
781: return false;
782: }
783:
784: if (trace)
785: log.trace("Enlisted resource "
786: + TxConnectionEventListener.this );
787: enlisted = true;
788: return true;
789: }
790: }
791:
792: public void beforeCompletion() {
793: }
794:
795: public void afterCompletion(int status) {
796: // The connection got destroyed during the transaction
797: if (getState() == DESTROYED)
798: return;
799:
800: // Are we still in the original transaction?
801: if (this .equals(transactionSynchronization) == false) {
802: // If we are interleaving transactions we have nothing to do
803: if (wasTrackByTx == false)
804: return;
805: else {
806: // There is something wrong with the pooling
807: String message = "afterCompletion called with wrong tx! Expected: "
808: + this
809: + ", actual: "
810: + transactionSynchronization;
811: IllegalStateException e = new IllegalStateException(
812: message);
813: log
814: .error(
815: "There is something wrong with the pooling?",
816: e);
817: }
818: }
819: // "Delist"
820: transactionSynchronization = null;
821: // This is where we close when doing track by transaction
822: if (wasTrackByTx) {
823: setTrackByTx(false);
824: if (isManagedConnectionFree())
825: returnManagedConnection(
826: TxConnectionEventListener.this , false);
827: }
828: }
829:
830: public String toString() {
831: StringBuffer buffer = new StringBuffer();
832: buffer.append("TxSync").append(
833: System.identityHashCode(this ));
834: buffer.append("{tx=").append(currentTx);
835: buffer.append(" wasTrackByTx=").append(wasTrackByTx);
836: buffer.append(" enlisted=").append(enlisted);
837: buffer.append("}");
838: return buffer.toString();
839: }
840: }
841:
842: // For debugging
843: protected void toString(StringBuffer buffer) {
844: buffer.append(" xaResource=").append(xaResource);
845: buffer.append(" txSync=")
846: .append(transactionSynchronization);
847: }
848: }
849:
850: private class LocalXAResource implements XAResource, LastResource {
851: protected Logger log;
852:
853: private ConnectionListener cl;
854:
855: /**
856: * <code>warned</code> is set after one warning about a local participant
857: * in a multi-branch jta transaction is logged.
858: *
859: */
860: private boolean warned = false;
861:
862: private Xid currentXid;
863:
864: public LocalXAResource(final Logger log) {
865: this .log = log;
866: }
867:
868: void setConnectionListener(ConnectionListener cl) {
869: this .cl = cl;
870: }
871:
872: // implementation of javax.transaction.xa.XAResource interface
873:
874: public void start(Xid xid, int flags) throws XAException {
875: if (trace)
876: log.trace("start, xid: " + xid + ", flags: " + flags);
877: if (currentXid != null && flags == XAResource.TMNOFLAGS)
878: throw new JBossLocalXAException(
879: "Trying to start a new tx when old is not complete! old: "
880: + currentXid + ", new " + xid
881: + ", flags " + flags,
882: XAException.XAER_PROTO);
883: if (currentXid == null && flags != XAResource.TMNOFLAGS)
884: throw new JBossLocalXAException(
885: "Trying to start a new tx with wrong flags! new "
886: + xid + ", flags " + flags,
887: XAException.XAER_PROTO);
888: if (currentXid == null) {
889: try {
890: cl.getManagedConnection().getLocalTransaction()
891: .begin();
892: } catch (ResourceException re) {
893: throw new JBossLocalXAException(
894: "Error trying to start local tx: ",
895: XAException.XAER_RMERR, re);
896: } catch (Throwable t) {
897: throw new JBossLocalXAException(
898: "Throwable trying to start local transaction!",
899: XAException.XAER_RMERR, t);
900: }
901:
902: currentXid = xid;
903: }
904: }
905:
906: public void end(Xid xid, int flags) throws XAException {
907: if (trace)
908: log.trace("end on xid: " + xid + " called with flags "
909: + flags);
910: }
911:
912: public void commit(Xid xid, boolean onePhase)
913: throws XAException {
914: if (xid.equals(currentXid) == false)
915: throw new JBossLocalXAException(
916: "wrong xid in commit: expected: " + currentXid
917: + ", got: " + xid,
918: XAException.XAER_PROTO);
919: currentXid = null;
920: try {
921: cl.getManagedConnection().getLocalTransaction()
922: .commit();
923: } catch (ResourceException re) {
924: returnManagedConnection(cl, true);
925: if (trace)
926: log.trace("commit problem: ", re);
927: throw new JBossLocalXAException(
928: "could not commit local tx",
929: XAException.XA_RBROLLBACK, re);
930: }
931: }
932:
933: public void forget(Xid xid) throws XAException {
934: throw new JBossLocalXAException(
935: "forget not supported in local tx",
936: XAException.XAER_RMERR);
937: }
938:
939: public int getTransactionTimeout() throws XAException {
940: // TODO: implement this javax.transaction.xa.XAResource method
941: return 0;
942: }
943:
944: public boolean isSameRM(XAResource xaResource)
945: throws XAException {
946: return xaResource == this ;
947: }
948:
949: public int prepare(Xid xid) throws XAException {
950: if (!warned)
951: log
952: .warn("Prepare called on a local tx. Use of local transactions on a jta transaction with more than one branch may result in inconsistent data in some cases of failure.");
953: warned = true;
954: return XAResource.XA_OK;
955: }
956:
957: public Xid[] recover(int flag) throws XAException {
958: throw new JBossLocalXAException(
959: "no recover with local-tx only resource managers",
960: XAException.XAER_RMERR);
961: }
962:
963: public void rollback(Xid xid) throws XAException {
964: if (xid.equals(currentXid) == false)
965: throw new JBossLocalXAException(
966: "wrong xid in rollback: expected: "
967: + currentXid + ", got: " + xid,
968: XAException.XAER_PROTO);
969: currentXid = null;
970: try {
971: cl.getManagedConnection().getLocalTransaction()
972: .rollback();
973: } catch (ResourceException re) {
974: returnManagedConnection(cl, true);
975: if (trace)
976: log.trace("rollback problem: ", re);
977: throw new JBossLocalXAException(
978: "could not rollback local tx",
979: XAException.XAER_RMERR, re);
980: }
981: }
982:
983: public boolean setTransactionTimeout(int seconds)
984: throws XAException {
985: // TODO: implement this javax.transaction.xa.XAResource method
986: return false;
987: }
988: }
989: }
|