001: /*
002: * CoadunationLib: The coaduntion implementation library.
003: * Copyright (C) 2006 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * TransactionBeanCache.java
020: */
021:
022: // java package
023: package com.rift.coad.lib.bean;
024:
025: // java imports
026: import java.util.ArrayList;
027: import java.util.Date;
028: import java.util.Iterator;
029: import java.util.Map;
030: import java.util.HashMap;
031: import java.util.List;
032: import java.util.LinkedHashMap;
033: import java.util.LinkedHashSet;
034: import java.util.Set;
035: import java.util.concurrent.ConcurrentHashMap;
036: import javax.rmi.PortableRemoteObject;
037: import javax.transaction.xa.XAException;
038: import javax.transaction.xa.XAResource;
039: import javax.transaction.xa.Xid;
040:
041: // logging import
042: import org.apache.log4j.Logger;
043:
044: // coadunation imports
045: import com.rift.coad.lib.cache.Cache;
046: import com.rift.coad.lib.cache.CacheEntry;
047: import com.rift.coad.lib.configuration.ConfigurationFactory;
048: import com.rift.coad.lib.configuration.Configuration;
049: import com.rift.coad.lib.thread.ThreadStateMonitor;
050: import com.rift.coad.util.lock.LockRef;
051: import com.rift.coad.util.lock.ObjectLockFactory;
052: import com.rift.coad.util.transaction.TransactionManager;
053:
054: /**
055: * This object is responsible for managing the transaction bean cache.
056: *
057: * @author Brett Chaldecott
058: */
059: public class TransactionBeanCache implements Cache, XAResource {
060:
/**
 * An immutable record of one modification made to the cache: the key that
 * was touched, the value involved and a code for the kind of change.
 */
public class ChangeEntry {
    // the three parts of the record; assigned once in the constructor
    private final Object key;
    private final Object value;
    private final byte changeType;

    /**
     * Builds a change record.
     *
     * @param key The key the change applies to.
     * @param value The value associated with the change.
     * @param changeType The code describing the kind of change.
     */
    public ChangeEntry(Object key, Object value, byte changeType) {
        this.key = key;
        this.value = value;
        this.changeType = changeType;
    }

    /**
     * Returns the key the change applies to.
     *
     * @return The key supplied at construction.
     */
    public Object getKey() {
        return this.key;
    }

    /**
     * Returns the value associated with the change.
     *
     * @return The value supplied at construction.
     */
    public Object getValue() {
        return this.value;
    }

    /**
     * Returns the code describing the kind of change.
     *
     * @return The change type supplied at construction.
     */
    public byte getChangeType() {
        return this.changeType;
    }
}
110:
111: /**
112: * The object that represents a change on this cache object.
113: */
114: public class Changes {
115: // the class private member variables
116: private Xid transactionId = null;
117: private List locks = new ArrayList();
118: private List changesEntries = new ArrayList();
119:
120: /**
121: * The constructor of the changes object.
122: *
123: * @param transactionId The id of the current transaction
124: */
125: public Changes(Xid transactionId) {
126: this .transactionId = transactionId;
127: }
128:
129: /**
130: * This method will add a lock to the list of locks.
131: *
132: * @exception MessageServiceException
133: */
134: public void addLock(LockRef lock) throws BeanException {
135: locks.add(lock);
136: }
137:
138: /**
139: * This method adds a new entry to the entries list.
140: *
141: * @param key The key to add to the list.
142: * @param value The value to add to the list
143: * @exception BeanException
144: */
145: public void addEntry(Object key, Object value)
146: throws BeanException {
147: changesEntries.add(new ChangeEntry(key, value, ADD));
148: }
149:
150: /**
151: * This method adds a new remove entry to the list.
152: *
153: * @param key The key to add to the list.
154: * @param value The object that is getting removed.
155: * @exception BeanException
156: */
157: public void addRemoveEntry(Object key, Object value)
158: throws BeanException {
159: changesEntries.add(new ChangeEntry(key, value, REMOVE));
160: }
161:
162: /**
163: * This method returns the list of added entries
164: *
165: * @return The list of queues.
166: */
167: public List getChangeEntries() {
168: return changesEntries;
169: }
170:
171: /**
172: * This method returns the list of locks.
173: *
174: * @return The list of locks.
175: */
176: public List getLocks() {
177: return locks;
178: }
179: }
180:
181: // class constants
182: private final static String CACHE_EXPIRY_TIME = "bean_cache_expiry";
183: private final static long CACHE_EXPIRY_TIME_DEFAULT = 30 * 60 * 1000;
184: private final static byte ADD = 1;
185: private final static byte UPDATE = 2;
186: private final static byte REMOVE = 3;
187:
188: // the logger reference
189: protected Logger log = Logger.getLogger(TransactionBeanCache.class
190: .getName());
191:
192: // the cache entries
193: private long defaultCacheExpiryTime = 0;
194: private ThreadStateMonitor status = new ThreadStateMonitor();
195: private ThreadLocal currentTransaction = new ThreadLocal();
196: private Map keyLockMap = new HashMap();
197: private Map baseCacheEntries = new ConcurrentHashMap();
198: private Map transactionChanges = new ConcurrentHashMap();
199:
200: /**
201: * Creates a new instance of TransactionBeanCache
202: *
203: * @exception BeanException
204: */
205: public TransactionBeanCache() throws BeanException {
206: try {
207: Configuration config = ConfigurationFactory.getInstance()
208: .getConfig(BeanCache.class);
209: defaultCacheExpiryTime = config.getLong(CACHE_EXPIRY_TIME,
210: CACHE_EXPIRY_TIME_DEFAULT);
211: } catch (Exception ex) {
212: log.error(
213: "Failed to start the TransactionBeanCache object "
214: + "because : " + ex.getMessage(), ex);
215: throw new BeanException(
216: "Failed to start the TransactionBeanCache object "
217: + "because : " + ex.getMessage(), ex);
218: }
219: }
220:
221: /**
222: * This method is called to perform garbage collection on the cache entries.
223: */
224: public void garbageCollect() {
225: // copy the entries map
226: Map entries = new HashMap();
227: entries.putAll(this .baseCacheEntries);
228:
229: // loop through the entires and remove the expired ones
230: Date expiryDate = new Date();
231: for (Iterator iter = entries.keySet().iterator(); iter
232: .hasNext();) {
233: Object cacheKey = iter.next();
234: LockRef lockRef = null;
235: try {
236: lockRef = getLockRef(cacheKey);
237: } catch (Exception ex) {
238: log.error("Failed to aquire lock on [" + cacheKey
239: + "] because :" + ex.getMessage(), ex);
240: continue;
241: }
242: try {
243: BeanCacheEntry beanCacheEntry = (BeanCacheEntry) entries
244: .get(cacheKey);
245: if (beanCacheEntry.isExpired(expiryDate)) {
246: if (beanCacheEntry.getCacheEntry() != null) {
247: try {
248: PortableRemoteObject
249: .unexportObject((java.rmi.Remote) beanCacheEntry
250: .getCacheEntry());
251: this .baseCacheEntries.remove(cacheKey);
252: beanCacheEntry.cacheRelease();
253: } catch (java.rmi.NoSuchObjectException ex) {
254: log.warn("The object was never exported : "
255: + ex.getMessage(), ex);
256: // remove from cache
257: synchronized (entries) {
258: this .baseCacheEntries.remove(cacheKey);
259: }
260: beanCacheEntry.cacheRelease();
261: } catch (Exception ex) {
262: log.error(
263: "Failed to un-export this object : "
264: + ex.getMessage(), ex);
265: }
266: } else {
267: // if this object has not cache entry as in no rmi tie
268: // class than just remove it.
269: this .baseCacheEntries.remove(cacheKey);
270: beanCacheEntry.cacheRelease();
271: }
272: }
273: } finally {
274: try {
275: lockRef.release();
276: } catch (Exception ex) {
277: log.error("Failed to release the lock on ["
278: + cacheKey + "] because :"
279: + ex.getMessage(), ex);
280: }
281: }
282: }
283: }
284:
285: /**
286: * This method is called to forcibly remove everything from the cache.
287: */
288: public void clear() {
289: LockRef lockRef = null;
290: try {
291: lockRef = ObjectLockFactory.getInstance().acquireReadLock(
292: this );
293: // copy the entries map
294: status.terminate(false);
295: Map entries = new HashMap();
296: entries.putAll(this .baseCacheEntries);
297: this .baseCacheEntries.clear();
298:
299: // loop through the entires and remove the expired ones
300: for (Iterator iter = entries.keySet().iterator(); iter
301: .hasNext();) {
302: Object cacheKey = iter.next();
303: BeanCacheEntry beanCacheEntry = (BeanCacheEntry) entries
304: .get(cacheKey);
305: if (beanCacheEntry.getCacheEntry() != null) {
306: try {
307: PortableRemoteObject
308: .unexportObject((java.rmi.Remote) beanCacheEntry
309: .getCacheEntry());
310: } catch (java.rmi.NoSuchObjectException ex) {
311: log.warn("The cache object was not bound : "
312: + ex.getMessage(), ex);
313: } catch (Exception ex) {
314: log.error(
315: "Failed to un-export the cached object : "
316: + ex.getMessage(), ex);
317: }
318: }
319: beanCacheEntry.cacheRelease();
320: }
321:
322: } catch (Exception ex) {
323: log.error("Failed to clear the bean cache : "
324: + ex.getMessage(), ex);
325: } finally {
326: try {
327: lockRef.release();
328: } catch (Exception ex) {
329: log.error("Failed to release the write lock : "
330: + ex.getMessage(), ex);
331: }
332: }
333: }
334:
335: /**
336: * This mehtod returns true if the cache contains the checked entry.
337: *
338: * @return TRUE if the cache contains the checked entry.
339: * @param cacheEntry The entry to perform the check for.
340: */
341: public boolean contains(Object cacheKey) {
342: try {
343: TransactionManager.getInstance().bindResource(this , false);
344: getLock(cacheKey);
345: return baseCacheEntries.containsKey(cacheKey);
346: } catch (Exception ex) {
347: log.error("Failed to retrieve the cache entries : "
348: + ex.getMessage(), ex);
349: return false;
350: }
351: }
352:
353: /**
354: * This method returns the bean cache entry.
355: *
356: * @return The reference to the bean cache object.
357: * @param key The key to retrieve.
358: * @exception BeanException
359: */
360: public BeanCacheEntry getCacheEntry(Object cacheKey)
361: throws BeanException {
362: try {
363: TransactionManager.getInstance().bindResource(this , false);
364: getLock(cacheKey);
365: BeanCacheEntry beanCacheEntry = (BeanCacheEntry) baseCacheEntries
366: .get(cacheKey);
367: if (beanCacheEntry != null) {
368: beanCacheEntry.touch();
369: }
370: return beanCacheEntry;
371: } catch (Exception ex) {
372: throw new BeanException(
373: "Failed to retrieve the cache entries : "
374: + ex.getMessage(), ex);
375: }
376: }
377:
378: /**
379: * This method adds the entry to the cache.
380: *
381: * @param cacheKey The key to identify this entry by.
382: * @param wrappedObject The object wrapped by the cache entry.
383: * @param entry An entry in the cache
384: */
385: public void addCacheEntry(long timeout, Object cacheKey,
386: Object wrappedObject, CacheEntry entry)
387: throws BeanException {
388: try {
389: TransactionManager.getInstance().bindResource(this , false);
390: getLock(cacheKey);
391: if (baseCacheEntries.containsKey(cacheKey)) {
392: throw new BeanException(
393: "Entry is already in the cache.");
394: }
395: long cacheTimeout = timeout;
396: if (timeout == -1) {
397: cacheTimeout = defaultCacheExpiryTime;
398: }
399: BeanCacheEntry beanCacheEntry = new BeanCacheEntry(
400: cacheTimeout, cacheKey, wrappedObject, entry);
401: baseCacheEntries.put(cacheKey, beanCacheEntry);
402: Changes changes = (Changes) transactionChanges
403: .get(currentTransaction.get());
404: changes.addEntry(cacheKey, beanCacheEntry);
405: } catch (BeanException ex) {
406: throw ex;
407: } catch (Exception ex) {
408: throw new BeanException("Failed to add a cache entrie : "
409: + ex.getMessage(), ex);
410: }
411: }
412:
413: /**
414: * This method adds a new entry to the cache.
415: *
416: * @param cacheKey The key to identify this entry by.
417: * @param wrappedObject The object wrapped by the proxy.
418: * @param proxy The proxy to add.
419: * @param handle The handler for the bean proxy object.
420: */
421: public void addCacheEntry(long timeout, Object cacheKey,
422: Object wrappedObject, Object proxy, CacheEntry handle)
423: throws BeanException {
424: try {
425: TransactionManager.getInstance().bindResource(this , false);
426: getLock(cacheKey);
427: if (baseCacheEntries.containsKey(cacheKey)) {
428: throw new BeanException(
429: "Entry is already in the cache.");
430: }
431: long cacheTimeout = timeout;
432: if (timeout == -1) {
433: cacheTimeout = defaultCacheExpiryTime;
434: }
435: BeanCacheEntry beanCacheEntry = new BeanCacheEntry(
436: cacheTimeout, cacheKey, wrappedObject, proxy,
437: handle);
438:
439: baseCacheEntries.put(cacheKey, beanCacheEntry);
440: Changes changes = (Changes) transactionChanges
441: .get(currentTransaction.get());
442: changes.addEntry(cacheKey, beanCacheEntry);
443: } catch (BeanException ex) {
444: throw ex;
445: } catch (Exception ex) {
446: throw new BeanException("Failed to add a cache entrie : "
447: + ex.getMessage(), ex);
448: }
449: }
450:
451: /**
452: * This method removes the entry from the cache based on the key passed in.
453: *
454: * @param cacheKey The key in the cache to remove.
455: */
456: public void removeCacheEntry(Object cacheKey) throws BeanException {
457: try {
458: TransactionManager.getInstance().bindResource(this , false);
459: getLock(cacheKey);
460: Object entry = baseCacheEntries.get(cacheKey);
461: baseCacheEntries.remove(cacheKey);
462: Changes changes = (Changes) transactionChanges
463: .get(currentTransaction.get());
464: changes.addRemoveEntry(cacheKey, entry);
465: } catch (Exception ex) {
466: throw new BeanException(
467: "Failed to remove a cache entrie : "
468: + ex.getMessage(), ex);
469: }
470: }
471:
472: /**
473: * This method is called to commit the specified transaction.
474: *
475: * @param xid The id of the transaction to commit.
476: * @param onePhase If true a one phase commit should be used.
477: * @exception XAException
478: */
479: public synchronized void commit(Xid xid, boolean b)
480: throws XAException {
481: try {
482: if (this .status.isTerminated()) {
483: log
484: .error("Commit called on terminated cache, ignoring.");
485: return;
486: }
487: Changes changes = (Changes) transactionChanges.get(xid);
488: for (Iterator iter = changes.getLocks().iterator(); iter
489: .hasNext();) {
490: LockRef lockRef = (LockRef) iter.next();
491: lockRef.release();
492: }
493: transactionChanges.remove(xid);
494: } catch (Exception ex) {
495: log.error("Failed to commit the changes : "
496: + ex.getMessage(), ex);
497: throw new XAException("Failed to commit the changes : "
498: + ex.getMessage());
499: }
500: }
501:
502: /**
503: * The resource manager has dissociated this object from the transaction.
504: *
505: * @param xid The id of the transaction that is getting ended.
506: * @param flags The flags associated with this operation.
507: * @exception XAException
508: */
509: public void end(Xid xid, int i) throws XAException {
510: }
511:
512: /**
513: * The transaction has been completed and must be forgotten.
514: *
515: * @param xid The id of the transaction to forget.
516: * @exception XAException
517: */
518: public void forget(Xid xid) throws XAException {
519: try {
520: if (this .status.isTerminated()) {
521: log
522: .error("Commit called on terminated cache, ignoring.");
523: return;
524: }
525: Changes changes = (Changes) transactionChanges.get(xid);
526: for (Iterator iter = changes.getLocks().iterator(); iter
527: .hasNext();) {
528: LockRef lockRef = (LockRef) iter.next();
529: lockRef.release();
530: }
531: transactionChanges.remove(xid);
532: } catch (Exception ex) {
533: log.error("Failed to forget the changes : "
534: + ex.getMessage(), ex);
535: throw new XAException("Failed to forget the changes : "
536: + ex.getMessage());
537: }
538: }
539:
540: /**
541: * This method returns the transaction timeout for this object.
542: *
543: * @return The int containing the transaction timeout.
544: * @exception XAException
545: */
546: public int getTransactionTimeout() throws XAException {
547: return -1;
548: }
549:
550: /**
551: * This method returns true if this object is the resource manager getting
552: * queried.
553: *
554: * @return TRUE if this is the resource manager, FALSE if not.
555: * @param xaResource The resource to perform the check against.
556: * @exception XAException
557: */
558: public boolean isSameRM(XAResource xAResource) throws XAException {
559: return this == xAResource;
560: }
561:
562: /**
563: * This is called before a transaction is committed.
564: *
565: * @return The results of the transaction.
566: * @param xid The id of the transaction to check against.
567: * @exception XAException
568: */
569: public int prepare(Xid xid) throws XAException {
570: return XAResource.XA_OK;
571: }
572:
573: /**
574: * This method returns the list of transaction branches for this resource
575: * manager.
576: *
577: * @return The list of resource branches.
578: * @param flags The flags
579: * @exception XAException
580: */
581: public Xid[] recover(int i) throws XAException {
582: return null;
583: }
584:
585: /**
586: * This method is called to roll back the specified transaction.
587: *
588: * @param xid The id of the transaction to roll back.
589: * @exception XAException
590: */
591: public void rollback(Xid xid) throws XAException {
592: try {
593: if (this .status.isTerminated()) {
594: log
595: .error("Commit called on terminated cache, ignoring.");
596: return;
597: }
598: Changes changes = (Changes) transactionChanges.get(xid);
599: List changeEntries = changes.getChangeEntries();
600: for (int index = 0; index < changeEntries.size(); index++) {
601: ChangeEntry changeEntry = (ChangeEntry) changeEntries
602: .get(changeEntries.size() - (index + 1));
603: if (changeEntry.getChangeType() == ADD) {
604: BeanCacheEntry beanCacheEntry = (BeanCacheEntry) changeEntry
605: .getValue();
606: if (beanCacheEntry.getCacheEntry() != null) {
607: try {
608: PortableRemoteObject
609: .unexportObject((java.rmi.Remote) beanCacheEntry
610: .getCacheEntry());
611: beanCacheEntry.cacheRelease();
612: } catch (java.rmi.NoSuchObjectException ex) {
613: log.warn("The object was never exported : "
614: + ex.getMessage(), ex);
615: beanCacheEntry.cacheRelease();
616: } catch (Exception ex) {
617: log.error(
618: "Failed to un-export this object : "
619: + ex.getMessage(), ex);
620: beanCacheEntry.cacheRelease();
621: }
622: } else {
623: // if this object has not cache entry as in no rmi tie
624: // class than just remove it.
625: beanCacheEntry.cacheRelease();
626: }
627: baseCacheEntries.remove(changeEntry.getKey());
628:
629: } else if (changeEntry.getChangeType() == REMOVE) {
630: baseCacheEntries.put(changeEntry.getKey(),
631: changeEntry.getValue());
632: }
633: }
634:
635: for (Iterator iter = changes.getLocks().iterator(); iter
636: .hasNext();) {
637: LockRef lockRef = (LockRef) iter.next();
638: lockRef.release();
639: }
640: transactionChanges.remove(xid);
641: } catch (Exception ex) {
642: log.error("Failed to rollback the changes : "
643: + ex.getMessage(), ex);
644: throw new XAException("Failed to rollback the changes : "
645: + ex.getMessage());
646: }
647: }
648:
649: /**
650: * This method sets the transaction timeout for this resource manager.
651: *
652: * @return TRUE if the transaction timeout can be set successfully.
653: * @param transactionTimeout The new transaction timeout value.
654: * @exception XAException
655: */
656: public boolean setTransactionTimeout(int i) throws XAException {
657: return true;
658: }
659:
660: /**
661: * This method is called to start a transaction on a resource manager.
662: *
663: * @param xid The id of the new transaction.
664: * @param flags The flags associated with the transaction.
665: * @exception XAException
666: */
667: public void start(Xid xid, int i) throws XAException {
668: try {
669: checkStatus();
670: if (!transactionChanges.containsKey(xid)) {
671: transactionChanges.put(xid, new Changes(xid));
672: }
673: currentTransaction.set(xid);
674: } catch (Exception ex) {
675: log.error("Cannot start a transaction because : "
676: + ex.getMessage(), ex);
677: throw new XAException(
678: "Cannot start a transaction because : "
679: + ex.getMessage());
680: }
681: }
682:
683: /**
684: * This method checks the bean cache status.
685: */
686: private void checkStatus() throws BeanException {
687: if (status.isTerminated()) {
688: throw new BeanException("Bean cache has been terminated.");
689: }
690: }
691:
692: /**
693: * This method returns the named lock
694: *
695: * @return The reference to the lock.
696: * @param The name of the queue that must be locked.
697: * @exception MessageServiceException
698: */
699: private void getLock(Object name) throws BeanException {
700: try {
701: Object key = null;
702: synchronized (keyLockMap) {
703: if (keyLockMap.containsKey(name)) {
704: key = keyLockMap.get(name);
705: } else {
706: key = name.toString();
707: keyLockMap.put(name, key);
708: }
709: }
710: LockRef lockRef = ObjectLockFactory.getInstance()
711: .acquireWriteLock(key, currentTransaction.get());
712: Changes changes = (Changes) transactionChanges
713: .get(currentTransaction.get());
714: changes.addLock(lockRef);
715: } catch (Exception ex) {
716: log.error(
717: "Failed to retrieve a lock on the bean cache entry : "
718: + ex.getMessage(), ex);
719: throw new BeanException(
720: "Failed to retrieve a lock on the bean cache entry : "
721: + ex.getMessage(), ex);
722: }
723: }
724:
725: /**
726: * This method returns the named lock
727: *
728: * @return The reference to the lock.
729: * @param The name of the queue that must be locked.
730: * @exception MessageServiceException
731: */
732: private LockRef getLockRef(Object name) throws BeanException {
733: try {
734: Object key = null;
735: synchronized (keyLockMap) {
736: if (keyLockMap.containsKey(name)) {
737: key = keyLockMap.get(name);
738: } else {
739: key = name.toString();
740: keyLockMap.put(name, key);
741: }
742: }
743: return ObjectLockFactory.getInstance()
744: .acquireWriteLock(key);
745: } catch (Exception ex) {
746: log.error(
747: "Failed to retrieve a lock on the bean cache entry : "
748: + ex.getMessage(), ex);
749: throw new BeanException(
750: "Failed to retrieve a lock on the bean cache entry : "
751: + ex.getMessage(), ex);
752: }
753: }
754: }
|