001: package org.apache.ojb.broker.core;
002:
003: /* Copyright 2004-2005 The Apache Software Foundation
004: *
005: * Licensed under the Apache License, Version 2.0 (the "License");
006: * you may not use this file except in compliance with the License.
007: * You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: import javax.transaction.RollbackException;
019: import javax.transaction.Status;
020: import javax.transaction.Synchronization;
021: import javax.transaction.SystemException;
022: import javax.transaction.Transaction;
023: import javax.transaction.TransactionManager;
024: import java.lang.reflect.Field;
025: import java.util.ArrayList;
026: import java.util.Collections;
027: import java.util.HashMap;
028: import java.util.Iterator;
029: import java.util.List;
030: import java.util.Map;
031: import java.util.WeakHashMap;
032:
033: import org.apache.commons.pool.KeyedObjectPool;
034: import org.apache.ojb.broker.PBFactoryException;
035: import org.apache.ojb.broker.PBKey;
036: import org.apache.ojb.broker.PersistenceBrokerInternal;
037: import org.apache.ojb.broker.TransactionAbortedException;
038: import org.apache.ojb.broker.TransactionInProgressException;
039: import org.apache.ojb.broker.TransactionNotInProgressException;
040: import org.apache.ojb.broker.accesslayer.ConnectionManagerIF;
041: import org.apache.ojb.broker.transaction.tm.TransactionManagerFactoryException;
042: import org.apache.ojb.broker.transaction.tm.TransactionManagerFactoryFactory;
043: import org.apache.ojb.broker.util.BrokerHelper;
044: import org.apache.ojb.broker.util.logging.Logger;
045: import org.apache.ojb.broker.util.logging.LoggerFactory;
046:
047: /**
048: * Workaround for participate the PB-api in JTA {@link javax.transaction.Transaction transaction} by
049: * implementing the {@link javax.transaction.Synchronization} interface.
050: * <br/>
051: * This may will be deprecated when we implemented a full JCA compliant connector.
052: * <br/>
053: * When a new {@link org.apache.ojb.broker.PersistenceBroker} instance is created in method
054: * {@link #wrapBrokerWithPoolingHandle}
055: * the given PB instance is wrapped with {@link PersistenceBrokerSyncImpl} before it was put to the PB-pool.
056: * When a PB instance was requested class try to lookup the current JTA transaction in
057: * {@link #wrapRequestedBrokerInstance} before the pooled PB instance was wrapped with the PB handle.
058: * If a running tx was found the PB instance was registered with the transaction using the
059: * {@link Synchronization} interface.
060: *
061: * @author <a href="mailto:armin@codeAuLait.de">Armin Waibel</a>
062: * @version $Id: PersistenceBrokerFactorySyncImpl.java,v 1.7.2.8 2005/12/21 22:25:01 tomdz Exp $
063: */
064: public class PersistenceBrokerFactorySyncImpl extends
065: PersistenceBrokerFactoryDefaultImpl {
066: private Logger log = LoggerFactory
067: .getLogger(PersistenceBrokerFactorySyncImpl.class);
068: private TransactionManager txMan;
069: private TxRegistry txRegistry;
070:
071: public PersistenceBrokerFactorySyncImpl() {
072: super ();
073: try {
074: txMan = TransactionManagerFactoryFactory.instance()
075: .getTransactionManager();
076: } catch (TransactionManagerFactoryException e) {
077: throw new PBFactoryException(
078: "Can't instantiate TransactionManager of managed environment",
079: e);
080: }
081: txRegistry = new TxRegistry();
082: }
083:
084: public PersistenceBrokerInternal createPersistenceBroker(PBKey pbKey)
085: throws PBFactoryException {
086: /*
087: try to find a valid PBKey, if given key does not full match
088: */
089: pbKey = BrokerHelper.crossCheckPBKey(pbKey);
090: /*
091: arminw:
092: First try to find a running JTA-tx. If a tx was found we try to find
093: an associated PB instance. This ensures that in a running tx
094: always the same PB instance was used.
095: If no tx was found we lookup a instance from pool.
096: All used PB instances always be wrapped with a "PBHandle"
097: */
098: Transaction tx;
099: try {
100: // search for an active tx
101: tx = searchForValidTx();
102: } catch (SystemException e) {
103: throw new PBFactoryException(
104: "Can't create PB instance, failure while lookup"
105: + " running JTA transaction", e);
106: }
107: PersistenceBrokerSyncImpl obtainedBroker = null;
108: PersistenceBrokerSyncHandle result;
109: if (tx != null) {
110: // try to find a broker instance already used by current tx
111: obtainedBroker = txRegistry.findBroker(tx, pbKey);
112: }
113:
114: if (obtainedBroker == null || obtainedBroker.isClosed()) {
115: // we have to lookup new PB instance with wrapped with handle
116: // method #wrapRequestedBrokerInstance wraps the new instance
117: // with a handle
118: result = (PersistenceBrokerSyncHandle) super
119: .createPersistenceBroker(pbKey);
120: } else {
121: // we found a PB instance that was already in use within the same JTA-tx
122: // so we only return a new handle
123: result = new PersistenceBrokerSyncHandle(obtainedBroker);
124: }
125: return result;
126: }
127:
128: protected PersistenceBrokerInternal wrapBrokerWithPoolingHandle(
129: PersistenceBrokerInternal broker, KeyedObjectPool pool) {
130: // wrap real PB instance with an extended version of pooling PB
131: return new PersistenceBrokerSyncImpl(broker, pool);
132: }
133:
134: protected PersistenceBrokerInternal wrapRequestedBrokerInstance(
135: PersistenceBrokerInternal broker) {
136: // all PB instance should be of this type
137: if (!(broker instanceof PersistenceBrokerSyncImpl)) {
138: throw new PBFactoryException("Expect instance of "
139: + PersistenceBrokerSyncImpl.class + ", found "
140: + broker.getClass());
141: }
142: /*
143: Before we return the PB handle, we jump into the running JTA tx
144: */
145: PersistenceBrokerSyncImpl pb = (PersistenceBrokerSyncImpl) broker;
146: try {
147: // search for an active tx
148: Transaction tx = searchForValidTx();
149: if (tx != null) {
150: txRegistry.register(tx, pb);
151: try {
152: pb.internBegin();
153: } catch (Exception e) {
154: /*
155: if something going wrong with pb-tx, we rollback the
156: whole JTA tx
157: */
158: log
159: .error(
160: "Unexpected exception when start intern pb-tx",
161: e);
162: try {
163: tx.setRollbackOnly();
164: } catch (Throwable ignore) {
165: }
166: throw new PBFactoryException(
167: "Unexpected exception when start intern pb-tx",
168: e);
169: }
170: }
171: } catch (Exception e) {
172: if (e instanceof PBFactoryException) {
173: throw (PBFactoryException) e;
174: } else {
175: throw new PBFactoryException(
176: "Error while try to participate in JTA transaction",
177: e);
178: }
179: }
180: return new PersistenceBrokerSyncHandle(pb);
181: }
182:
183: private Transaction searchForValidTx() throws SystemException {
184: Transaction tx = txMan.getTransaction();
185: if (tx != null) {
186: int status = tx.getStatus();
187: if (status != Status.STATUS_ACTIVE
188: && status != Status.STATUS_NO_TRANSACTION) {
189: throw new PBFactoryException(
190: "Transaction synchronization failed - wrong"
191: + " status of external JTA tx. Expected was an 'active' or 'no transaction'"
192: + ", found status is '"
193: + getStatusFlagAsString(status) + "'");
194: }
195: }
196: return tx;
197: }
198:
199: /**
200: * Returns a string representation of the given
201: * {@link javax.transaction.Status} flag.
202: */
203: private static String getStatusFlagAsString(int status) {
204: String statusName = "no match, unknown status!";
205: try {
206: Field[] fields = Status.class.getDeclaredFields();
207: for (int i = 0; i < fields.length; i++) {
208: if (fields[i].getInt(null) == status) {
209: statusName = fields[i].getName();
210: break;
211: }
212: }
213: } catch (Exception e) {
214: statusName = "no match, unknown status!";
215: }
216: return statusName;
217: }
218:
219: //****************************************************
220: // inner class
221: //****************************************************
222: public static class PersistenceBrokerSyncImpl extends
223: PoolablePersistenceBroker implements Synchronization {
224: private Logger log = LoggerFactory
225: .getLogger(PersistenceBrokerSyncImpl.class);
226: /**
227: * Used to register all handles using this PB instance
228: */
229: private List handleList = new ArrayList();
230:
231: public PersistenceBrokerSyncImpl(
232: PersistenceBrokerInternal broker, KeyedObjectPool pool) {
233: super (broker, pool);
234: }
235:
236: public void beforeCompletion() {
237: if (log.isDebugEnabled())
238: log.debug("beforeCompletion was called, nothing to do");
239: if (handleList.size() > 0) {
240: for (int i = 0; i < handleList.size(); i++) {
241: log
242: .warn("Found unclosed PersistenceBroker handle, will do automatic close. Please make"
243: + " sure that all used PB instances will be closed.");
244: PersistenceBrokerHandle pbh = (PersistenceBrokerHandle) handleList
245: .get(i);
246: pbh.close();
247: }
248: handleList.clear();
249: }
250: ConnectionManagerIF cm = serviceConnectionManager();
251: if (cm.isBatchMode())
252: cm.executeBatch();
253: // close connection immediately when in JTA-tx to avoid bad reports from server con-pool
254: if (cm.isInLocalTransaction()) {
255: // we should not be in a local tx when performing tx completion
256: log
257: .warn("Seems the used PersistenceBroker handle wasn't closed, close the used"
258: + " handle before the transaction completes.");
259: // in managed environments this call will be ignored by
260: // the wrapped connection
261: cm.localCommit();
262: }
263: cm.releaseConnection();
264: }
265:
266: public void afterCompletion(int status) {
267: if (log.isDebugEnabled())
268: log.debug("afterCompletion was called");
269: /*
270: we only commit if tx was successfully committed
271: */
272: try {
273: if (status != Status.STATUS_COMMITTED) {
274: if (status == Status.STATUS_ROLLEDBACK
275: || status == Status.STATUS_ROLLING_BACK) {
276: if (log.isDebugEnabled())
277: log
278: .debug("Aborting PB-tx due to JTA initiated Rollback: "
279: + getStatusFlagAsString(status));
280: } else {
281: log
282: .error("Aborting PB-tx due to inconsistent, and unexpected, status of JTA tx: "
283: + getStatusFlagAsString(status));
284: }
285: internAbort();
286: } else {
287: if (log.isDebugEnabled())
288: log.debug("Commit PB-tx");
289: internCommit();
290: }
291: } finally {
292: // returns the underlying PB instance to pool
293: doRealClose();
294: }
295: }
296:
297: private void internBegin() {
298: setManaged(true);
299: super .beginTransaction();
300: }
301:
302: private void internCommit() {
303: super .commitTransaction();
304: }
305:
306: private void internAbort() {
307: super .abortTransaction();
308: }
309:
310: private void doRealClose() {
311: if (log.isDebugEnabled())
312: log.debug("Now do real close of PB instance");
313: super .close();
314: }
315:
316: public boolean close() {
317: if (!isInTransaction()) {
318: if (log.isDebugEnabled())
319: log
320: .debug("PB close was called, pass the close call to underlying PB instance");
321: /*
322: if we not in JTA-tx, we close PB instance in a "normal" way. The PB.close()
323: should also release the used connection.
324: */
325: doRealClose();
326: } else {
327: // if we in tx and other handles operate on the same PB instance, do
328: // nothing, till all handles are closed.
329: if (handleList.size() > 0) {
330: if (log.isEnabledFor(Logger.INFO))
331: log
332: .info("PB.close(): Active used by "
333: + handleList.size()
334: + " handle objects, will skip close call");
335: } else {
336: /*
337: arminw:
338: if in JTA-tx, we don't really close the underlying PB instance (return PB
339: instance to pool, release used connection). As recently as the JTA was
340: completed we can return PB instance to pool. Thus after tx completion method
341: doRealClose() was called to close (return to pool) underlying PB instance.
342:
343: But to free used resources as soon as possible, we release the used connection
344: immediately. The JTA-tx will handle the connection status in a proper way.
345: */
346: if (log.isDebugEnabled())
347: log
348: .debug("PB close was called, only close the PB handle when in JTA-tx");
349:
350: /*
351: TODO: workaround, in 1.1 use special method do handle this stuff
352: arminw:
353: needed to prevent unclosed connection Statement instances when RsIterator
354: wasn't fully materialized in managed environment, because RsIterator is
355: a PBStateListener and below we close the connection.
356: */
357: PersistenceBrokerImpl pb = ((PersistenceBrokerImpl) getInnermostDelegate());
358: pb.fireBrokerEvent(pb.BEFORE_CLOSE_EVENT);
359:
360: ConnectionManagerIF cm = serviceConnectionManager();
361: if (cm.isInLocalTransaction()) {
362: /*
363: arminw:
364: in managed environment con.commit calls will be ignored because, the JTA
365: transaction manager control the connection status. But to make
366: connectionManager happy we have to complete the "local tx" of the
367: connectionManager before release the connection
368: */
369: cm.localCommit();
370: }
371: cm.releaseConnection();
372: }
373: }
374: return true;
375: }
376:
377: void registerHandle(PersistenceBrokerHandle handle) {
378: handleList.add(handle);
379: }
380:
381: void deregisterHandle(PersistenceBrokerHandle handle) {
382: handleList.remove(handle);
383: }
384:
385: public void beginTransaction()
386: throws TransactionInProgressException,
387: TransactionAbortedException {
388: throw new UnsupportedOperationException(
389: "In managed environments only JTA transaction demarcation allowed");
390: }
391:
392: public void commitTransaction()
393: throws TransactionNotInProgressException,
394: TransactionAbortedException {
395: throw new UnsupportedOperationException(
396: "In managed environments only JTA transaction demarcation allowed");
397: }
398:
399: public void abortTransaction()
400: throws TransactionNotInProgressException {
401: throw new UnsupportedOperationException(
402: "In managed environments only JTA transaction demarcation allowed");
403: }
404: }
405:
406: //****************************************************
407: // inner class
408: //****************************************************
409: /**
410: * This class collects all PB instances requested in the scope of one transaction
411: */
412: class TransactionBox implements Synchronization {
413: Transaction jtaTx;
414: Map syncMap = new HashMap();
415: boolean isLocked = false;
416: boolean isClosed = false;
417:
418: public TransactionBox(Transaction tx) {
419: this .jtaTx = tx;
420: }
421:
422: PersistenceBrokerSyncImpl find(PBKey key) {
423: return (PersistenceBrokerSyncImpl) syncMap.get(key);
424: }
425:
426: void add(PersistenceBrokerSyncImpl syncObj) {
427: if (isLocked) {
428: throw new PBFactoryException(
429: "Can't associate object with JTA transaction, because tx-completion started");
430: }
431: syncMap.put(syncObj.getPBKey(), syncObj);
432: }
433:
434: public void afterCompletion(int status) {
435: boolean failures = false;
436: Synchronization synchronization = null;
437: for (Iterator iterator = syncMap.values().iterator(); iterator
438: .hasNext();) {
439: try {
440: synchronization = (Synchronization) iterator.next();
441: synchronization.afterCompletion(status);
442: } catch (Exception e) {
443: failures = true;
444: log.error(
445: "Unexpected error when perform Synchronization#afterCompletion method"
446: + " call on object "
447: + synchronization, e);
448: }
449: }
450: isClosed = true;
451: // discard association of PB instances and jta-tx
452: txRegistry.removeTxBox(jtaTx);
453: if (failures) {
454: throw new PBFactoryException(
455: "Unexpected error occured while performing"
456: + " Synchronization#afterCompletion method");
457: }
458: }
459:
460: public void beforeCompletion() {
461: boolean failures = false;
462: Synchronization synchronization = null;
463: for (Iterator iterator = syncMap.values().iterator(); iterator
464: .hasNext();) {
465: try {
466: synchronization = (Synchronization) iterator.next();
467: synchronization.beforeCompletion();
468: } catch (Exception e) {
469: failures = true;
470: log.error(
471: "Unexpected error when perform Synchronization#beforeCompletion method"
472: + " call on object "
473: + synchronization, e);
474: }
475: }
476: isLocked = true;
477: if (failures) {
478: throw new PBFactoryException(
479: "Unexpected error occured while performing"
480: + " Synchronization#beforeCompletion method");
481: }
482: }
483: }
484:
485: //****************************************************
486: // inner class
487: //****************************************************
488: /**
489: * Maps all {@link TransactionBox} instances based on {@link Transaction} object identity.
490: *
491: * TODO: Not sure if we should held TransactionBox instances per thread or per transaction object identity.
492: * As far as I know it is possible in JTA that thread A starts a tx and thread B commits the tx, thus I
493: * start with tx identity as key in registry
494: */
495: class TxRegistry {
496: Map txBoxMap;
497:
498: public TxRegistry() {
499: txBoxMap = Collections.synchronizedMap(new WeakHashMap());
500: }
501:
502: void register(Transaction tx,
503: PersistenceBrokerSyncImpl syncObject)
504: throws RollbackException, SystemException {
505: TransactionBox txBox = (TransactionBox) txBoxMap.get(tx);
506: if (txBox == null || txBox.isClosed) {
507: // if environment reuse tx instances we can find closed TransactionBox instances
508: if (txBox != null)
509: txBoxMap.remove(tx);
510: txBox = new TransactionBox(tx);
511: tx.registerSynchronization(txBox);
512: txBoxMap.put(tx, txBox);
513: }
514: txBox.add(syncObject);
515: }
516:
517: PersistenceBrokerSyncImpl findBroker(Transaction tx, PBKey pbKey) {
518: PersistenceBrokerSyncImpl result = null;
519: TransactionBox txBox = (TransactionBox) txBoxMap.get(tx);
520: if (txBox != null) {
521: result = txBox.find(pbKey);
522: }
523: return result;
524: }
525:
526: TransactionBox findTxBox(Transaction tx) {
527: return (TransactionBox) txBoxMap.get(tx);
528: }
529:
530: void removeTxBox(Transaction tx) {
531: txBoxMap.remove(tx);
532: }
533: }
534:
535: //****************************************************
536: // inner class
537: //****************************************************
538: /**
539: * This wrapper was used when a PB instance which was already in use by a
540: * transaction was found.
541: */
542: class PersistenceBrokerSyncHandle extends PersistenceBrokerHandle {
543: /**
544: * Constructor for the handle, set itself in
545: * {@link PersistenceBrokerThreadMapping#setCurrentPersistenceBroker}
546: */
547: public PersistenceBrokerSyncHandle(
548: PersistenceBrokerSyncImpl broker) {
549: super (broker);
550: // we register handle at underlying PB instance
551: broker.registerHandle(this );
552: }
553:
554: public boolean isClosed() {
555: return super .isClosed();
556: }
557:
558: public boolean close() {
559: if (getDelegate() != null) {
560: // deregister from underlying PB instance
561: ((PersistenceBrokerSyncImpl) getDelegate())
562: .deregisterHandle(this);
563: }
564: return super.close();
565: }
566: }
567: }
|