001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.openjpa.kernel;
020:
021: import java.io.ObjectStreamException;
022: import java.lang.reflect.InvocationTargetException;
023: import java.util.ArrayList;
024: import java.util.Collection;
025: import java.util.Collections;
026: import java.util.HashMap;
027: import java.util.Iterator;
028: import java.util.LinkedList;
029: import java.util.List;
030: import java.util.Map;
031: import java.util.Properties;
032: import javax.transaction.Status;
033: import javax.transaction.Synchronization;
034: import javax.transaction.Transaction;
035: import javax.transaction.TransactionManager;
036:
037: import org.apache.commons.lang.StringUtils;
038: import org.apache.openjpa.conf.OpenJPAConfiguration;
039: import org.apache.openjpa.conf.OpenJPAVersion;
040: import org.apache.openjpa.datacache.DataCacheStoreManager;
041: import org.apache.openjpa.ee.ManagedRuntime;
042: import org.apache.openjpa.enhance.PCRegistry;
043: import org.apache.openjpa.enhance.PersistenceCapable;
044: import org.apache.openjpa.event.BrokerFactoryEvent;
045: import org.apache.openjpa.event.RemoteCommitEventManager;
046: import org.apache.openjpa.lib.conf.Configuration;
047: import org.apache.openjpa.lib.conf.Configurations;
048: import org.apache.openjpa.lib.log.Log;
049: import org.apache.openjpa.lib.util.J2DoPrivHelper;
050: import org.apache.openjpa.lib.util.JavaVersions;
051: import org.apache.openjpa.lib.util.Localizer;
052: import org.apache.openjpa.lib.util.concurrent.ConcurrentHashMap;
053: import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashSet;
054: import org.apache.openjpa.lib.util.concurrent.ReentrantLock;
055: import org.apache.openjpa.meta.MetaDataRepository;
056: import org.apache.openjpa.util.GeneralException;
057: import org.apache.openjpa.util.InternalException;
058: import org.apache.openjpa.util.InvalidStateException;
059: import org.apache.openjpa.util.OpenJPAException;
060: import org.apache.openjpa.util.UserException;
061:
062: /**
063: * Abstract implementation of the {@link BrokerFactory}
064: * that must be subclassed for a specific runtime.
065: *
066: * @author Abe White
067: */
068: public abstract class AbstractBrokerFactory implements BrokerFactory {
069:
070: private static final Localizer _loc = Localizer
071: .forPackage(AbstractBrokerFactory.class);
072:
073: // static mapping of configurations to pooled broker factories
074: private static final Map _pool = Collections
075: .synchronizedMap(new HashMap());
076:
077: // configuration
078: private final OpenJPAConfiguration _conf;
079: private transient boolean _readOnly = false;
080: private transient boolean _closed = false;
081: private transient RuntimeException _closedException = null;
082: private Map _userObjects = null;
083:
084: // internal lock: spec forbids synchronization on this object
085: private final ReentrantLock _lock = new ReentrantLock();
086:
087: // maps global transactions to associated brokers
088: private transient ConcurrentHashMap _transactional = new ConcurrentHashMap();
089:
090: // weak-ref tracking of open brokers
091: private transient Collection _brokers = new ConcurrentReferenceHashSet(
092: ConcurrentReferenceHashSet.WEAK);
093:
094: // cache the class names loaded from the persistent classes property so
095: // that we can re-load them for each new broker
096: private transient Collection _pcClassNames = null;
097: private transient Collection _pcClassLoaders = null;
098: private transient boolean _persistentTypesLoaded = false;
099:
100: // lifecycle listeners to pass to each broker
101: private transient Map _lifecycleListeners = null;
102:
103: // transaction listeners to pass to each broker
104: private transient List _transactionListeners = null;
105:
106: // key under which this instance can be stored in the broker pool
107: // and later identified
108: private Object _poolKey;
109:
110: /**
111: * Return an internal factory pool key for the given configuration.
112: *
113: * @since 1.1.0
114: */
115: protected static Object toPoolKey(Map map) {
116: Object key = Configurations.getProperty("Id", map);
117: return (key != null) ? key : map;
118: }
119:
120: /**
121: * Register <code>factory</code> in the pool under <code>key</code>.
122: *
123: * @since 1.1.0
124: */
125: protected static void pool(Object key, AbstractBrokerFactory factory) {
126: synchronized (_pool) {
127: _pool.put(key, factory);
128: factory.setPoolKey(key);
129: factory.makeReadOnly();
130: }
131: }
132:
133: /**
134: * Return the pooled factory matching the given key, or null
135: * if none. The key must be of the form created by {@link #getPoolKey}.
136: */
137: public static AbstractBrokerFactory getPooledFactoryForKey(
138: Object key) {
139: return (AbstractBrokerFactory) _pool.get(key);
140: }
141:
/**
 * Create a factory for the given configuration, which must be
 * supplied at construction time.
 */
protected AbstractBrokerFactory(OpenJPAConfiguration config) {
    _conf = config;

    // called only for its side effect: creates the class loader set
    getPcClassLoaders();
}
149:
150: /**
151: * Return the configuration for this factory.
152: */
153: public OpenJPAConfiguration getConfiguration() {
154: return _conf;
155: }
156:
157: public Broker newBroker() {
158: return newBroker(_conf.getConnectionUserName(), _conf
159: .getConnectionPassword());
160: }
161:
162: public Broker newBroker(String user, String pass) {
163: return newBroker(user, pass, _conf.isTransactionModeManaged(),
164: _conf.getConnectionRetainModeConstant());
165: }
166:
167: public Broker newBroker(boolean managed, int connRetainMode) {
168: return newBroker(_conf.getConnectionUserName(), _conf
169: .getConnectionPassword(), managed, connRetainMode);
170: }
171:
172: public Broker newBroker(String user, String pass, boolean managed,
173: int connRetainMode) {
174: return newBroker(user, pass, managed, connRetainMode, true);
175: }
176:
177: public Broker newBroker(String user, String pass, boolean managed,
178: int connRetainMode, boolean findExisting) {
179: try {
180: assertOpen();
181: makeReadOnly();
182:
183: BrokerImpl broker = null;
184: if (findExisting)
185: broker = findBroker(user, pass, managed);
186: if (broker == null) {
187: broker = newBrokerImpl(user, pass);
188: initializeBroker(managed, connRetainMode, broker, false);
189: }
190: return broker;
191: } catch (OpenJPAException ke) {
192: throw ke;
193: } catch (RuntimeException re) {
194: throw new GeneralException(re);
195: }
196: }
197:
198: void initializeBroker(boolean managed, int connRetainMode,
199: BrokerImpl broker, boolean fromDeserialization) {
200: assertOpen();
201: makeReadOnly();
202:
203: // decorate the store manager for data caching and custom
204: // result object providers; always make sure it's a delegating
205: // store manager, because it's easier for users to deal with
206: // that way
207: StoreManager sm = newStoreManager();
208: DelegatingStoreManager dsm = null;
209: if (_conf.getDataCacheManagerInstance().getSystemDataCache() != null)
210: dsm = new DataCacheStoreManager(sm);
211: dsm = new ROPStoreManager((dsm == null) ? sm : dsm);
212:
213: broker.initialize(this , dsm, managed, connRetainMode,
214: fromDeserialization);
215: if (!fromDeserialization)
216: addListeners(broker);
217:
218: // if we're using remote events, register the event manager so
219: // that it can broadcast commit notifications from the broker
220: RemoteCommitEventManager remote = _conf
221: .getRemoteCommitEventManager();
222: if (remote.areRemoteEventsEnabled())
223: broker.addTransactionListener(remote);
224:
225: loadPersistentTypes(broker.getClassLoader());
226: _brokers.add(broker);
227: _conf.setReadOnly(Configuration.INIT_STATE_FROZEN);
228: }
229:
230: /**
231: * Add factory-registered lifecycle listeners to the broker.
232: */
233: protected void addListeners(BrokerImpl broker) {
234: if (_lifecycleListeners != null
235: && !_lifecycleListeners.isEmpty()) {
236: Map.Entry entry;
237: for (Iterator itr = _lifecycleListeners.entrySet()
238: .iterator(); itr.hasNext();) {
239: entry = (Map.Entry) itr.next();
240: broker.addLifecycleListener(entry.getKey(),
241: (Class[]) entry.getValue());
242: }
243: }
244:
245: if (_transactionListeners != null
246: && !_transactionListeners.isEmpty()) {
247: for (Iterator itr = _transactionListeners.iterator(); itr
248: .hasNext();) {
249: broker.addTransactionListener(itr.next());
250: }
251: }
252: }
253:
254: /**
255: * Load the configured persistent classes list. Performed automatically
256: * whenever a broker is created.
257: */
258: private void loadPersistentTypes(ClassLoader envLoader) {
259: // if we've loaded the persistent types and the class name list
260: // is empty, then we can simply return. Note that there is a
261: // potential threading scenario in which _persistentTypesLoaded is
262: // false when read, but the work to populate _pcClassNames has
263: // already been done. This is ok; _pcClassNames can tolerate
264: // concurrent access, so the worst case is that the list is
265: // persistent type data is processed multiple times, which this
266: // algorithm takes into account.
267: if (_persistentTypesLoaded && _pcClassNames.isEmpty())
268: return;
269:
270: // cache persistent type names if not already
271: ClassLoader loader = _conf.getClassResolverInstance()
272: .getClassLoader(getClass(), envLoader);
273: Collection toRedefine = new ArrayList();
274: if (!_persistentTypesLoaded) {
275: Collection clss = _conf.getMetaDataRepositoryInstance()
276: .loadPersistentTypes(false, loader);
277: if (clss.isEmpty())
278: _pcClassNames = Collections.EMPTY_SET;
279: else {
280: Collection c = new ArrayList(clss.size());
281: for (Iterator itr = clss.iterator(); itr.hasNext();) {
282: Class cls = (Class) itr.next();
283: c.add(cls.getName());
284: if (needsSub(cls))
285: toRedefine.add(cls);
286: }
287: getPcClassLoaders().add(loader);
288: _pcClassNames = c;
289: }
290: _persistentTypesLoaded = true;
291: } else {
292: // reload with this loader
293: if (getPcClassLoaders().add(loader)) {
294: for (Iterator itr = _pcClassNames.iterator(); itr
295: .hasNext();) {
296: try {
297: Class cls = Class.forName((String) itr.next(),
298: true, loader);
299: if (needsSub(cls))
300: toRedefine.add(cls);
301: } catch (Throwable t) {
302: _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME)
303: .warn(null, t);
304: }
305: }
306: }
307: }
308:
309: if (JavaVersions.VERSION >= 5) {
310: try {
311: // This is Java 5 / 6 code. There might be a more elegant
312: // way to bootstrap this into the system, but reflection
313: // will get things working for now. We could potentially
314: // do this by creating a new BrokerFactoryEvent type for
315: // Broker creation, at which point we have an appropriate
316: // classloader to use.
317: Class cls = Class
318: .forName("org.apache.openjpa.enhance.ManagedClassSubclasser");
319: cls.getMethod(
320: "prepareUnenhancedClasses",
321: new Class[] { OpenJPAConfiguration.class,
322: Collection.class, ClassLoader.class })
323: .invoke(
324: null,
325: new Object[] { _conf, toRedefine,
326: envLoader });
327: } catch (NoSuchMethodException e) {
328: // should never happen in a properly-built installation
329: throw new InternalException(e);
330: } catch (IllegalAccessException e) {
331: // should never happen in a properly-built installation
332: throw new InternalException(e);
333: } catch (InvocationTargetException e) {
334: Throwable cause = e.getCause();
335: if (cause instanceof OpenJPAException)
336: throw (OpenJPAException) cause;
337: else
338: throw new InternalException(cause);
339: } catch (ClassNotFoundException e) {
340: // should never happen in a properly-built installation
341: throw new InternalException(e);
342: }
343: }
344: }
345:
346: private boolean needsSub(Class cls) {
347: return !cls.isInterface()
348: && !PersistenceCapable.class.isAssignableFrom(cls);
349: }
350:
351: public void addLifecycleListener(Object listener, Class[] classes) {
352: lock();
353: try {
354: assertOpen();
355: if (_lifecycleListeners == null)
356: _lifecycleListeners = new HashMap(7);
357: _lifecycleListeners.put(listener, classes);
358: } finally {
359: unlock();
360: }
361: }
362:
363: public void removeLifecycleListener(Object listener) {
364: lock();
365: try {
366: assertOpen();
367: if (_lifecycleListeners != null)
368: _lifecycleListeners.remove(listener);
369: } finally {
370: unlock();
371: }
372: }
373:
374: public void addTransactionListener(Object listener) {
375: lock();
376: try {
377: assertOpen();
378: if (_transactionListeners == null)
379: _transactionListeners = new LinkedList();
380: _transactionListeners.add(listener);
381: } finally {
382: unlock();
383: }
384: }
385:
386: public void removeTransactionListener(Object listener) {
387: lock();
388: try {
389: assertOpen();
390: if (_transactionListeners != null)
391: _transactionListeners.remove(listener);
392: } finally {
393: unlock();
394: }
395: }
396:
397: /**
398: * Returns true if this broker factory is closed.
399: */
400: public boolean isClosed() {
401: return _closed;
402: }
403:
404: public void close() {
405: lock();
406: try {
407: assertOpen();
408: assertNoActiveTransaction();
409:
410: // remove from factory pool
411: synchronized (_pool) {
412: if (_pool.get(_poolKey) == this )
413: _pool.remove(_poolKey);
414: }
415:
416: // close all brokers
417: Broker broker;
418: for (Iterator itr = _brokers.iterator(); itr.hasNext();) {
419: broker = (Broker) itr.next();
420: // Check for null because _brokers contains weak references
421: if ((broker != null) && (!broker.isClosed()))
422: broker.close();
423: }
424:
425: if (_conf.metaDataRepositoryAvailable()) {
426: // remove metadata repository from listener list
427: PCRegistry.removeRegisterClassListener(_conf
428: .getMetaDataRepositoryInstance());
429: }
430:
431: _conf.close();
432: _closed = true;
433: Log log = _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
434: if (log.isTraceEnabled())
435: _closedException = new IllegalStateException();
436: } finally {
437: unlock();
438: }
439: }
440:
441: /**
442: * Subclasses should override this method to add a <code>Platform</code>
443: * property listing the runtime platform, such as:
444: * <code>OpenJPA JDBC Edition: Oracle Database</code>
445: */
446: public Properties getProperties() {
447: // required props are VendorName and VersionNumber
448: Properties props = new Properties();
449: props.setProperty("VendorName", OpenJPAVersion.VENDOR_NAME);
450: props.setProperty("VersionNumber",
451: OpenJPAVersion.VERSION_NUMBER);
452: props.setProperty("VersionId", OpenJPAVersion.VERSION_ID);
453: return props;
454: }
455:
456: public Object getUserObject(Object key) {
457: lock();
458: try {
459: assertOpen();
460: return (_userObjects == null) ? null : _userObjects
461: .get(key);
462: } finally {
463: unlock();
464: }
465: }
466:
467: public Object putUserObject(Object key, Object val) {
468: lock();
469: try {
470: assertOpen();
471: if (val == null)
472: return (_userObjects == null) ? null : _userObjects
473: .remove(key);
474:
475: if (_userObjects == null)
476: _userObjects = new HashMap();
477: return _userObjects.put(key, val);
478: } finally {
479: unlock();
480: }
481: }
482:
483: public void lock() {
484: _lock.lock();
485: }
486:
487: public void unlock() {
488: _lock.unlock();
489: }
490:
491: /**
492: * Replaces the factory with this JVMs pooled version if it exists. Also
493: * freezes the factory.
494: */
495: protected Object readResolve() throws ObjectStreamException {
496: AbstractBrokerFactory factory = getPooledFactoryForKey(_poolKey);
497: if (factory != null)
498: return factory;
499:
500: // reset these transient fields to empty values
501: _transactional = new ConcurrentHashMap();
502: _brokers = new ConcurrentReferenceHashSet(
503: ConcurrentReferenceHashSet.WEAK);
504:
505: makeReadOnly();
506: return this ;
507: }
508:
509: ////////////////////////
510: // Methods for Override
511: ////////////////////////
512:
/**
 * Return a new StoreManager for this runtime. Note that the instance
 * returned here is wrapped by <code>initializeBroker</code> (always in
 * a <code>ROPStoreManager</code>, and additionally in a
 * <code>DataCacheStoreManager</code> when a system data cache is
 * available) before it is handed to the broker being initialized.
 */
protected abstract StoreManager newStoreManager();
519:
520: /**
521: * Find a pooled broker, or return null if none. If using
522: * managed transactions, looks for a transactional broker;
523: * otherwise returns null by default. This method will be called before
524: * {@link #newStoreManager} so that factory subclasses implementing
525: * pooling can return a matching manager before a new {@link StoreManager}
526: * is created.
527: */
528: protected BrokerImpl findBroker(String user, String pass,
529: boolean managed) {
530: if (managed)
531: return findTransactionalBroker(user, pass);
532: return null;
533: }
534:
535: /**
536: * Return a broker configured with the proper settings.
537: * By default, this method constructs a new
538: * BrokerImpl of the class set for this factory.
539: */
540: protected BrokerImpl newBrokerImpl(String user, String pass) {
541: BrokerImpl broker = _conf.newBrokerInstance(user, pass);
542: if (broker == null)
543: throw new UserException(_loc.get("no-broker-class", _conf
544: .getBrokerImpl()));
545:
546: return broker;
547: }
548:
/**
 * Setup transient state used by this factory based on the
 * current configuration, which will subsequently be locked down. This
 * method is invoked from {@link #makeReadOnly} the first time the
 * factory is frozen, which happens before the first broker is created,
 * and again when the factory is deserialized into a JVM
 * that has no pooled factory for this data store. The default
 * implementation does nothing.
 */
protected void setup() {
    // no transient state to set up by default
}
558:
559: /////////////
560: // Utilities
561: /////////////
562:
563: /**
564: * Find a managed runtime broker associated with the
565: * current transaction, or returns null if none.
566: */
567: protected BrokerImpl findTransactionalBroker(String user,
568: String pass) {
569: Transaction trans;
570: ManagedRuntime mr = _conf.getManagedRuntimeInstance();
571: Object txKey;
572: try {
573: trans = mr.getTransactionManager().getTransaction();
574: txKey = mr.getTransactionKey();
575:
576: if (trans == null
577: || trans.getStatus() == Status.STATUS_NO_TRANSACTION
578: || trans.getStatus() == Status.STATUS_UNKNOWN)
579: return null;
580: } catch (OpenJPAException ke) {
581: throw ke;
582: } catch (Exception e) {
583: throw new GeneralException(e);
584: }
585:
586: Collection brokers = (Collection) _transactional.get(txKey);
587: if (brokers != null) {
588: // we don't need to synchronize on brokers since one JTA transaction
589: // can never be active on multiple concurrent threads.
590: BrokerImpl broker;
591: for (Iterator itr = brokers.iterator(); itr.hasNext();) {
592: broker = (BrokerImpl) itr.next();
593: if (StringUtils.equals(broker.getConnectionUserName(),
594: user)
595: && StringUtils.equals(broker
596: .getConnectionPassword(), pass))
597: return broker;
598: }
599: }
600: return null;
601: }
602:
603: /**
604: * Configures the given broker with the current factory option settings.
605: */
606: protected void configureBroker(BrokerImpl broker) {
607: broker.setOptimistic(_conf.getOptimistic());
608: broker.setNontransactionalRead(_conf.getNontransactionalRead());
609: broker.setNontransactionalWrite(_conf
610: .getNontransactionalWrite());
611: broker.setRetainState(_conf.getRetainState());
612: broker.setRestoreState(_conf.getRestoreStateConstant());
613: broker.setAutoClear(_conf.getAutoClearConstant());
614: broker.setIgnoreChanges(_conf.getIgnoreChanges());
615: broker.setMultithreaded(_conf.getMultithreaded());
616: broker.setAutoDetach(_conf.getAutoDetachConstant());
617: broker.setDetachState(_conf.getDetachStateInstance()
618: .getDetachState());
619: }
620:
621: /**
622: * Freezes the configuration of this factory.
623: */
624: public void makeReadOnly() {
625: if (_readOnly)
626: return;
627:
628: lock();
629: try {
630: // check again
631: if (_readOnly)
632: return;
633: _readOnly = true;
634:
635: Log log = _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
636: if (log.isInfoEnabled())
637: log.info(getFactoryInitializationBanner());
638: if (log.isTraceEnabled()) {
639: Map props = _conf.toProperties(true);
640: String lineSep = J2DoPrivHelper.getLineSeparator();
641: StringBuffer buf = new StringBuffer();
642: Map.Entry entry;
643: for (Iterator itr = props.entrySet().iterator(); itr
644: .hasNext();) {
645: entry = (Map.Entry) itr.next();
646: buf.append(entry.getKey()).append(": ").append(
647: entry.getValue());
648: if (itr.hasNext())
649: buf.append(lineSep);
650: }
651: log.trace(_loc
652: .get("factory-properties", buf.toString()));
653: }
654:
655: // setup transient state
656: setup();
657:
658: // register the metdata repository to auto-load persistent types
659: // and make sure types are enhanced
660: MetaDataRepository repos = _conf
661: .getMetaDataRepositoryInstance();
662: repos.setValidate(repos.VALIDATE_RUNTIME, true);
663: repos.setResolve(repos.MODE_MAPPING_INIT, true);
664: PCRegistry.addRegisterClassListener(repos);
665:
666: // freeze underlying configuration and eagerly initialize to
667: // avoid synchronization
668: _conf.setReadOnly(Configuration.INIT_STATE_FREEZING);
669: _conf.instantiateAll();
670:
671: // fire an event for all the broker factory listeners
672: // registered on the configuration.
673: _conf.getBrokerFactoryEventManager().fireEvent(
674: new BrokerFactoryEvent(this ,
675: BrokerFactoryEvent.BROKER_FACTORY_CREATED));
676: } finally {
677: unlock();
678: }
679: }
680:
681: /**
682: * Return an object to be written to the log when this broker factory
683: * initializes. This happens after the configuration is fully loaded.
684: */
685: protected Object getFactoryInitializationBanner() {
686: return _loc.get("factory-init", OpenJPAVersion.VERSION_NUMBER);
687: }
688:
689: /**
690: * Throw an exception if the factory is closed. The exact message and
691: * content of the exception varies whether TRACE is enabled or not.
692: */
693: private void assertOpen() {
694: if (_closed) {
695: if (_closedException == null) // TRACE not enabled
696: throw new InvalidStateException(_loc
697: .get("closed-factory-notrace"));
698: else
699: throw new InvalidStateException(_loc
700: .get("closed-factory"))
701: .setCause(_closedException);
702: }
703: }
704:
705: ////////////////////
706: // Broker utilities
707: ////////////////////
708:
709: /**
710: * Throws a {@link UserException} if a transaction is active. The thrown
711: * exception will contain all the Brokers with active transactions as
712: * failed objects in the nested exceptions.
713: */
714: private void assertNoActiveTransaction() {
715: Collection excs;
716: if (_transactional.isEmpty())
717: return;
718:
719: excs = new ArrayList(_transactional.size());
720: for (Iterator trans = _transactional.values().iterator(); trans
721: .hasNext();) {
722: Collection brokers = (Collection) trans.next();
723: for (Iterator itr = brokers.iterator(); itr.hasNext();) {
724: excs.add(new InvalidStateException(_loc.get("active"))
725: .setFailedObject(itr.next()));
726: }
727: }
728:
729: if (!excs.isEmpty())
730: throw new InvalidStateException(_loc.get("nested-exceps"))
731: .setNestedThrowables((Throwable[]) excs
732: .toArray(new Throwable[excs.size()]));
733: }
734:
735: /**
736: * Synchronize the given broker with a managed transaction,
737: * optionally starting one if none is in progress.
738: *
739: * @return true if synched with transaction, false otherwise
740: */
741: boolean syncWithManagedTransaction(BrokerImpl broker, boolean begin) {
742: Transaction trans;
743: try {
744: ManagedRuntime mr = broker.getManagedRuntime();
745: TransactionManager tm = mr.getTransactionManager();
746: trans = tm.getTransaction();
747: if (trans != null
748: && (trans.getStatus() == Status.STATUS_NO_TRANSACTION || trans
749: .getStatus() == Status.STATUS_UNKNOWN))
750: trans = null;
751:
752: if (trans == null && begin) {
753: tm.begin();
754: trans = tm.getTransaction();
755: } else if (trans == null)
756: return false;
757:
758: // synch broker and trans
759: trans.registerSynchronization(broker);
760:
761: // we don't need to synchronize on brokers or guard against multiple
762: // threads using the same trans since one JTA transaction can never
763: // be active on multiple concurrent threads.
764: Object txKey = mr.getTransactionKey();
765: Collection brokers = (Collection) _transactional.get(txKey);
766:
767: if (brokers == null) {
768: brokers = new ArrayList(2);
769: _transactional.put(txKey, brokers);
770: trans
771: .registerSynchronization(new RemoveTransactionSync(
772: txKey));
773: }
774: brokers.add(broker);
775:
776: return true;
777: } catch (OpenJPAException ke) {
778: throw ke;
779: } catch (Exception e) {
780: throw new GeneralException(e);
781: }
782: }
783:
784: /**
785: * Returns a set of all the open brokers associated with this factory. The
786: * returned set is unmodifiable, and may contain null references.
787: */
788: public Collection getOpenBrokers() {
789: return Collections.unmodifiableCollection(_brokers);
790: }
791:
792: /**
793: * @return a key that can be used to obtain this broker factory from the
794: * pool at a later time.
795: *
796: * @since 1.1.0
797: */
798: public Object getPoolKey() {
799: return _poolKey;
800: }
801:
802: /**
803: * Set a key that can be used to obtain this broker factory from the
804: * pool at a later time.
805: *
806: * @since 1.1.0
807: */
808: void setPoolKey(Object key) {
809: _poolKey = key;
810: }
811:
812: /**
813: * Simple synchronization listener to remove completed transactions
814: * from our cache.
815: */
816: private class RemoveTransactionSync implements Synchronization {
817:
818: private final Object _trans;
819:
820: public RemoveTransactionSync(Object trans) {
821: _trans = trans;
822: }
823:
824: public void beforeCompletion() {
825: }
826:
827: public void afterCompletion(int status) {
828: _transactional.remove(_trans);
829: }
830: }
831:
832: /**
833: * Method insures that deserialized EMF has this reference re-instantiated
834: */
835: private Collection getPcClassLoaders() {
836: if (_pcClassLoaders == null)
837: _pcClassLoaders = new ConcurrentReferenceHashSet(
838: ConcurrentReferenceHashSet.WEAK);
839:
840: return _pcClassLoaders;
841: }
842: }
|