0001: /*
0002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
0003: * notice. All rights reserved.
0004: */
0005: package com.tc.objectserver.impl;
0006:
0007: import com.tc.async.api.Sink;
0008: import com.tc.exception.TCRuntimeException;
0009: import com.tc.logging.TCLogger;
0010: import com.tc.logging.TCLogging;
0011: import com.tc.net.groups.ClientID;
0012: import com.tc.net.groups.NodeID;
0013: import com.tc.object.ObjectID;
0014: import com.tc.object.cache.CacheStats;
0015: import com.tc.object.cache.Evictable;
0016: import com.tc.object.cache.EvictionPolicy;
0017: import com.tc.objectserver.api.GCStats;
0018: import com.tc.objectserver.api.NoSuchObjectException;
0019: import com.tc.objectserver.api.ObjectManager;
0020: import com.tc.objectserver.api.ObjectManagerEventListener;
0021: import com.tc.objectserver.api.ObjectManagerLookupResults;
0022: import com.tc.objectserver.api.ObjectManagerMBean;
0023: import com.tc.objectserver.api.ObjectManagerStatsListener;
0024: import com.tc.objectserver.api.ShutdownError;
0025: import com.tc.objectserver.context.ManagedObjectFaultingContext;
0026: import com.tc.objectserver.context.ManagedObjectFlushingContext;
0027: import com.tc.objectserver.context.ObjectManagerResultsContext;
0028: import com.tc.objectserver.core.api.GarbageCollector;
0029: import com.tc.objectserver.core.api.ManagedObject;
0030: import com.tc.objectserver.core.impl.NullGarbageCollector;
0031: import com.tc.objectserver.l1.api.ClientStateManager;
0032: import com.tc.objectserver.managedobject.ManagedObjectChangeListener;
0033: import com.tc.objectserver.managedobject.ManagedObjectImpl;
0034: import com.tc.objectserver.managedobject.ManagedObjectTraverser;
0035: import com.tc.objectserver.mgmt.ManagedObjectFacade;
0036: import com.tc.objectserver.persistence.api.ManagedObjectStore;
0037: import com.tc.objectserver.persistence.api.PersistenceTransaction;
0038: import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
0039: import com.tc.objectserver.tx.NullTransactionalObjectManager;
0040: import com.tc.objectserver.tx.TransactionalObjectManager;
0041: import com.tc.objectserver.tx.TransactionalObjectManagerImpl;
0042: import com.tc.properties.TCPropertiesImpl;
0043: import com.tc.text.PrettyPrinter;
0044: import com.tc.util.Assert;
0045: import com.tc.util.Counter;
0046: import com.tc.util.ObjectIDSet2;
0047: import com.tc.util.concurrent.StoppableThread;
0048:
0049: import java.io.PrintWriter;
0050: import java.util.ArrayList;
0051: import java.util.Collection;
0052: import java.util.Collections;
0053: import java.util.HashMap;
0054: import java.util.HashSet;
0055: import java.util.Iterator;
0056: import java.util.List;
0057: import java.util.Map;
0058: import java.util.Set;
0059:
0060: /**
0061: * Manages access to all the Managed objects in the system.
0062: */
0063: public class ObjectManagerImpl implements ObjectManager,
0064: ManagedObjectChangeListener, ObjectManagerMBean, Evictable {
0065:
0066: private static final TCLogger logger = TCLogging
0067: .getLogger(ObjectManager.class);
0068:
0069: private static final int MAX_COMMIT_SIZE = TCPropertiesImpl
0070: .getProperties().getInt(
0071: "l2.objectmanager.maxObjectsToCommit");
0072: // XXX:: Should go to property file
0073: private static final int INITIAL_SET_SIZE = 16;
0074: private static final float LOAD_FACTOR = 0.75f;
0075: private static final int MAX_LOOKUP_OBJECTS_COUNT = 5000;
0076: private static final long REMOVE_THRESHOLD = 300;
0077:
0078: private final ManagedObjectStore objectStore;
0079: private final Map references;
0080: private final EvictionPolicy evictionPolicy;
0081: private final Counter flushCount = new Counter();
0082: private final PendingList pending = new PendingList();
0083:
0084: private GarbageCollector collector = new NullGarbageCollector();
0085: private int checkedOutCount = 0;
0086:
0087: private volatile boolean inShutdown = false;
0088:
0089: private ClientStateManager stateManager;
0090: private final ObjectManagerConfig config;
0091: private final ThreadGroup gcThreadGroup;
0092: private ObjectManagerStatsListener stats = new NullObjectManagerStatsListener();
0093: private final PersistenceTransactionProvider persistenceTransactionProvider;
0094: private final Sink faultSink;
0095: private final Sink flushSink;
0096: private TransactionalObjectManager txnObjectMgr = new NullTransactionalObjectManager();
0097:
0098: public ObjectManagerImpl(
0099: ObjectManagerConfig config,
0100: ThreadGroup gcThreadGroup,
0101: ClientStateManager stateManager,
0102: ManagedObjectStore objectStore,
0103: EvictionPolicy cache,
0104: PersistenceTransactionProvider persistenceTransactionProvider,
0105: Sink faultSink, Sink flushSink) {
0106: this .faultSink = faultSink;
0107: this .flushSink = flushSink;
0108: Assert.assertNotNull(objectStore);
0109: this .config = config;
0110: this .gcThreadGroup = gcThreadGroup;
0111: this .stateManager = stateManager;
0112: this .objectStore = objectStore;
0113: this .evictionPolicy = cache;
0114: this .persistenceTransactionProvider = persistenceTransactionProvider;
0115: this .references = new HashMap(10000);
0116: }
0117:
0118: public void setTransactionalObjectManager(
0119: TransactionalObjectManagerImpl txnObjectManager) {
0120: this .txnObjectMgr = txnObjectManager;
0121: }
0122:
0123: public void setStatsListener(
0124: ObjectManagerStatsListener statsListener) {
0125: this .stats = statsListener;
0126: }
0127:
0128: public void start() {
0129: collector.start();
0130: }
0131:
0132: public synchronized void stop() {
0133: this .inShutdown = true;
0134:
0135: collector.stop();
0136:
0137: // flush the cache to stable persistence.
0138: Set toFlush = new HashSet();
0139: for (Iterator i = references.values().iterator(); i.hasNext();) {
0140: ManagedObject obj = ((ManagedObjectReference) i.next())
0141: .getObject();
0142: if (!obj.isNew())
0143: toFlush.add(obj);
0144: }
0145: PersistenceTransaction tx = newTransaction();
0146: flushAll(tx, toFlush);
0147: tx.commit();
0148: }
0149:
0150: public synchronized PrettyPrinter prettyPrint(PrettyPrinter out) {
0151: out.println(getClass().getName());
0152: out.indent().print("roots: ").println(getRoots());
0153: out.indent().print("collector: ").visit(collector).println();
0154: out.indent().print("references: ").visit(references).println();
0155:
0156: out.indent().println("checkedOutCount: " + checkedOutCount);
0157: out.indent().print("pending: ").visit(pending).println();
0158:
0159: out.indent().print("objectStore: ").duplicateAndIndent().visit(
0160: objectStore).println();
0161: out.indent().print("stateManager: ").duplicateAndIndent()
0162: .visit(stateManager).println();
0163: return out;
0164: }
0165:
0166: public void addListener(ObjectManagerEventListener listener) {
0167: if (listener == null) {
0168: throw new NullPointerException(
0169: "cannot add a null event listener");
0170: }
0171: collector.addListener(listener);
0172: }
0173:
0174: public GCStats[] getGarbageCollectorStats() {
0175: return collector.getGarbageCollectorStats();
0176: }
0177:
0178: public ObjectID lookupRootID(String name) {
0179: syncAssertNotInShutdown();
0180: return objectStore.getRootID(name);
0181: }
0182:
0183: public boolean lookupObjectsAndSubObjectsFor(NodeID nodeID,
0184: ObjectManagerResultsContext responseContext,
0185: int maxReachableObjects) {
0186: // maxReachableObjects is at least 1 so that addReachableObjectsIfNecessary does the right thing
0187: return lookupObjectsForOptionallyCreate(nodeID,
0188: responseContext, maxReachableObjects <= 0 ? 1
0189: : maxReachableObjects);
0190: }
0191:
0192: public boolean lookupObjectsFor(NodeID nodeID,
0193: ObjectManagerResultsContext responseContext) {
0194: return lookupObjectsForOptionallyCreate(nodeID,
0195: responseContext, -1);
0196: }
0197:
0198: private synchronized boolean lookupObjectsForOptionallyCreate(
0199: NodeID nodeID, ObjectManagerResultsContext responseContext,
0200: int maxReachableObjects) {
0201: syncAssertNotInShutdown();
0202:
0203: if (collector.isPausingOrPaused()) {
0204: makePending(nodeID, new ObjectManagerLookupContext(
0205: responseContext, false), maxReachableObjects);
0206: return false;
0207: }
0208: return basicLookupObjectsFor(nodeID,
0209: new ObjectManagerLookupContext(responseContext, false),
0210: maxReachableObjects);
0211: }
0212:
0213: public Iterator getRoots() {
0214: syncAssertNotInShutdown();
0215: return objectStore.getRoots().iterator();
0216: }
0217:
0218: public Iterator getRootNames() {
0219: syncAssertNotInShutdown();
0220: return objectStore.getRootNames().iterator();
0221: }
0222:
0223: /**
0224: * For management use only (see interface documentation)
0225: */
0226: public ManagedObjectFacade lookupFacade(ObjectID id, int limit)
0227: throws NoSuchObjectException {
0228: final ManagedObject object = lookup(id, true);
0229: if (object == null) {
0230: throw new NoSuchObjectException(id);
0231: }
0232:
0233: try {
0234: return object.createFacade(limit);
0235: } finally {
0236: releaseReadOnly(object);
0237: }
0238: }
0239:
0240: private ManagedObject lookup(ObjectID id, boolean missingOk) {
0241: syncAssertNotInShutdown();
0242:
0243: WaitForLookupContext waitContext = new WaitForLookupContext(id,
0244: missingOk);
0245: ObjectManagerLookupContext context = new ObjectManagerLookupContext(
0246: waitContext, true);
0247: basicLookupObjectsFor(ClientID.NULL_ID, context, -1);
0248:
0249: ManagedObject mo = waitContext.getLookedUpObject();
0250: if (mo == null) {
0251: Assert.assertTrue(missingOk);
0252: }
0253: return mo;
0254: }
0255:
0256: public ManagedObject getObjectByID(ObjectID id) {
0257: return lookup(id, false);
0258: }
0259:
0260: private void markReferenced(ManagedObjectReference reference) {
0261: if (reference.isReferenced()) {
0262: throw new AssertionError(
0263: "Attempt to mark an already referenced object: "
0264: + reference);
0265: }
0266: reference.markReference();
0267: checkedOutCount++;
0268: }
0269:
0270: private void unmarkReferenced(ManagedObjectReference reference) {
0271: if (!reference.isReferenced()) {
0272: throw new AssertionError(
0273: "Attempt to unmark an unreferenced object: "
0274: + reference);
0275: }
0276: reference.unmarkReference();
0277: checkedOutCount--;
0278: }
0279:
0280: /**
0281: * Retrieves materialized references.
0282: */
0283: private ManagedObjectReference getReference(ObjectID id) {
0284: return (ManagedObjectReference) references.get(id);
0285: }
0286:
0287: /**
0288: * Retrieves materialized references-- if not materialized, will initiate a request to materialize them from the
0289: * object store.
0290: */
0291: private ManagedObjectReference getOrLookupReference(
0292: ObjectManagerLookupContext context, ObjectID id) {
0293: ManagedObjectReference rv = getReference(id);
0294:
0295: if (rv == null) {
0296: // Request Faulting in a different stage and give back a "Referenced" proxy
0297: ManagedObjectFaultingContext mofc = new ManagedObjectFaultingContext(
0298: id, context.removeOnRelease());
0299: faultSink.add(mofc);
0300:
0301: // don't account for a cache "miss" unless this was a real request
0302: // originating from a client
0303: stats.cacheMiss();
0304: rv = addNewReference(new FaultingManagedObjectReference(id));
0305: } else if (rv instanceof FaultingManagedObjectReference) {
0306: // Check to see if the retrieve was complete and the Object is missing
0307: FaultingManagedObjectReference fmr = (FaultingManagedObjectReference) rv;
0308: if (!fmr.isFaultingInProgress()) {
0309: references.remove(id);
0310: logger.warn("Request for non-exisitent object : " + id
0311: + " context = " + context);
0312: context.missingObject(id);
0313: return null;
0314: }
0315: if (context.isNewRequest())
0316: stats.cacheMiss();
0317: } else {
0318: if (context.isNewRequest())
0319: stats.cacheHit();
0320: if (!context.removeOnRelease()) {
0321: if (rv.isRemoveOnRelease()) {
0322: // This Object is faulted in by GC or Management interface with removeOnRelease = true, but before they got a
0323: // chance to grab it, a regular request for object is received. Take corrective action.
0324: rv.setRemoveOnRelease(false);
0325: evictionPolicy.add(rv);
0326: } else {
0327: evictionPolicy.markReferenced(rv);
0328: }
0329: }
0330: }
0331: return rv;
0332: }
0333:
0334: public synchronized void addFaultedObject(ObjectID oid,
0335: ManagedObject mo, boolean removeOnRelease) {
0336: FaultingManagedObjectReference fmor;
0337: if (mo == null) {
0338: ManagedObjectReference mor = (ManagedObjectReference) references
0339: .get(oid);
0340: if (mor == null
0341: || !(mor instanceof FaultingManagedObjectReference)
0342: || !oid.equals(mor.getObjectID())) {
0343: // Format
0344: throw new AssertionError(
0345: "ManagedObjectReference is not what was expected : "
0346: + mor + " oid : " + oid);
0347: }
0348: fmor = (FaultingManagedObjectReference) mor;
0349: fmor.faultingFailed();
0350: } else {
0351: Assert.assertEquals(oid, mo.getID());
0352: ManagedObjectReference mor = (ManagedObjectReference) references
0353: .remove(oid);
0354: if (mor == null
0355: || !(mor instanceof FaultingManagedObjectReference)
0356: || !oid.equals(mor.getObjectID())) {
0357: // Format
0358: throw new AssertionError(
0359: "ManagedObjectReference is not what was expected : "
0360: + mor + " oid : " + oid);
0361: }
0362: fmor = (FaultingManagedObjectReference) mor;
0363: addNewReference(mo, removeOnRelease);
0364: }
0365: makeUnBlocked(oid);
0366: postRelease();
0367: }
0368:
0369: private ManagedObjectReference addNewReference(ManagedObject obj,
0370: boolean isRemoveOnRelease) throws AssertionError {
0371: ManagedObjectReference newReference = obj.getReference();
0372: newReference.setRemoveOnRelease(isRemoveOnRelease);
0373:
0374: return addNewReference(newReference);
0375: }
0376:
0377: private ManagedObjectReference addNewReference(
0378: ManagedObjectReference newReference) {
0379: Assert.assertNull(references.put(newReference.getObjectID(),
0380: newReference));
0381: Assert.assertTrue(newReference.getNext() == null
0382: && newReference.getPrevious() == null);
0383:
0384: if (!newReference.isRemoveOnRelease()) {
0385: evictionPolicy.add(newReference);
0386: }
0387: return newReference;
0388: }
0389:
0390: private synchronized void reapCache(Collection removalCandidates,
0391: Collection toFlush, Collection removedObjects) {
0392: while (collector.isPausingOrPaused()) {
0393: try {
0394: this .wait();
0395: } catch (InterruptedException e) {
0396: logger.error(e);
0397: }
0398: }
0399: for (Iterator i = removalCandidates.iterator(); i.hasNext();) {
0400: ManagedObjectReference removalCandidate = (ManagedObjectReference) i
0401: .next();
0402: // It is possible that before the cache evictor has a chance to mark the reference, the GC could come and remove
0403: // the reference, hence we check in references map again
0404: if (removalCandidate != null
0405: && !removalCandidate.isReferenced()
0406: && !removalCandidate.isNew()
0407: && references.containsKey(removalCandidate
0408: .getObjectID())) {
0409: evictionPolicy.remove(removalCandidate);
0410: if (removalCandidate.getObject().isDirty()) {
0411: Assert.assertFalse(config.paranoid());
0412: markReferenced(removalCandidate);
0413: toFlush.add(removalCandidate.getObject());
0414: } else {
0415: // paranoid mode or the object is not dirty - just remove from reference
0416: removedObjects.add(references
0417: .remove(removalCandidate.getObjectID()));
0418: }
0419: }
0420: }
0421: }
0422:
0423: private void evicted(Collection managedObjects) {
0424: synchronized (this ) {
0425: checkedOutCount -= managedObjects.size();
0426: for (Iterator i = managedObjects.iterator(); i.hasNext();) {
0427: ManagedObject mo = (ManagedObject) i.next();
0428: ObjectID oid = mo.getID();
0429: Object o = references.remove(oid);
0430: if (o == null) {
0431: logger
0432: .warn("Object ID : "
0433: + mo.getID()
0434: + " was mapped to null but should have been mapped to a reference of "
0435: + mo);
0436: } else {
0437: ManagedObjectReference ref = (ManagedObjectReference) o;
0438: if (isBlocked(oid)) {
0439: ref.unmarkReference();
0440: addNewReference(mo, ref.isRemoveOnRelease());
0441: makeUnBlocked(oid);
0442: i.remove();
0443: }
0444: }
0445: }
0446: postRelease();
0447: }
0448:
0449: }
0450:
0451: private synchronized boolean basicLookupObjectsFor(NodeID nodeID,
0452: ObjectManagerLookupContext context, int maxReachableObjects) {
0453: Set objects = createNewSet();
0454:
0455: final Set newObjectIDs = context.getNewObjectIDs();
0456: boolean available = true;
0457: Set ids = context.getLookupIDs();
0458: for (Iterator i = ids.iterator(); i.hasNext();) {
0459: ObjectID id = (ObjectID) i.next();
0460: if (newObjectIDs.contains(id))
0461: continue;
0462: // We don't check available flag before doing calling getOrLookupReference() for two reasons.
0463: // 1) To get the right hit/miss count and
0464: // 2) to Fault objects that are not available
0465: ManagedObjectReference reference = getOrLookupReference(
0466: context, id);
0467: if (reference == null) {
0468: continue;
0469: } else if (available && reference.isReferenced()) {
0470: available = false;
0471: // Setting only the first referenced object to process Pending. If objects are being faulted in, then this
0472: // will ensure that we don't run processPending multiple times unnecessarily.
0473: addBlocked(nodeID, context, maxReachableObjects, id);
0474: }
0475:
0476: objects.add(reference);
0477: }
0478:
0479: if (available) {
0480: createNewObjectsAndAddTo(objects, newObjectIDs);
0481: Set processLater = addReachableObjectsIfNecessary(nodeID,
0482: maxReachableObjects, objects);
0483: ObjectManagerLookupResults results = new ObjectManagerLookupResultsImpl(
0484: processObjectsRequest(objects), processLater);
0485: context.setResults(results);
0486: } else {
0487: context.makeOldRequest();
0488: }
0489: return available;
0490: }
0491:
0492: private void createNewObjectsAndAddTo(Set objects, Set newObjectIDs) {
0493: for (Iterator i = newObjectIDs.iterator(); i.hasNext();) {
0494: ObjectID oid = (ObjectID) i.next();
0495: ManagedObject mo = new ManagedObjectImpl(oid);
0496: createObject(mo);
0497: objects.add(mo.getReference());
0498: }
0499: }
0500:
0501: private Set addReachableObjectsIfNecessary(NodeID nodeID,
0502: int maxReachableObjects, Set objects) {
0503: if (maxReachableObjects <= 0) {
0504: return Collections.EMPTY_SET;
0505: }
0506: ManagedObjectTraverser traverser = new ManagedObjectTraverser(
0507: maxReachableObjects);
0508: Set lookedUpObjects = objects;
0509: do {
0510: traverser.traverse(lookedUpObjects);
0511: lookedUpObjects = new HashSet();
0512: Set lookupObjectIDs = traverser.getObjectsToLookup();
0513: if (lookupObjectIDs.isEmpty())
0514: break;
0515: stateManager.removeReferencedFrom(nodeID, lookupObjectIDs);
0516: for (Iterator j = lookupObjectIDs.iterator(); j.hasNext();) {
0517: ObjectID id = (ObjectID) j.next();
0518: ManagedObjectReference newRef = getReference(id);
0519: // Note : Objects are looked up only if it is in the memory and not referenced
0520: if (newRef != null && !newRef.isReferenced()) {
0521: if (objects.add(newRef)) {
0522: lookedUpObjects.add(newRef);
0523: }
0524: }
0525: }
0526: } while (objects.size() < MAX_LOOKUP_OBJECTS_COUNT);
0527: return traverser.getPendingObjectsToLookup(lookedUpObjects);
0528: }
0529:
0530: // TODO:: Multiple read only checkouts, now that there are more than 1 thread faulting objects to the
0531: // client
0532: public void releaseReadOnly(ManagedObject object) {
0533: if (config.paranoid() && object.isDirty()) {
0534: throw new AssertionError(
0535: "Object is dirty after a read-only checkout"
0536: + object);
0537: }
0538: synchronized (this ) {
0539: basicRelease(object);
0540: postRelease();
0541: }
0542: }
0543:
0544: public void release(PersistenceTransaction persistenceTransaction,
0545: ManagedObject object) {
0546: if (config.paranoid())
0547: flush(persistenceTransaction, object);
0548: synchronized (this ) {
0549: basicRelease(object);
0550: postRelease();
0551: }
0552:
0553: }
0554:
0555: public synchronized void releaseAll(Collection objects) {
0556: for (Iterator i = objects.iterator(); i.hasNext();) {
0557: ManagedObject mo = (ManagedObject) i.next();
0558: if (config.paranoid() && !mo.isNew() && mo.isDirty()) {
0559: // It is possible to release new just created objects before it has a chance to get applied because of a recall
0560: // due to a GC. Check out ObjectManagerTest.testRecallNewObjects()
0561: throw new AssertionError(
0562: "ObjectManager.releaseAll() called on dirty old objects : "
0563: + mo + " total objects size : "
0564: + objects.size());
0565: }
0566: basicRelease(mo);
0567: }
0568: postRelease();
0569: }
0570:
0571: public void releaseAll(
0572: PersistenceTransaction persistenceTransaction,
0573: Collection managedObjects) {
0574: if (config.paranoid())
0575: flushAll(persistenceTransaction, managedObjects);
0576: synchronized (this ) {
0577: for (Iterator i = managedObjects.iterator(); i.hasNext();) {
0578: basicRelease((ManagedObject) i.next());
0579: }
0580: postRelease();
0581: }
0582: }
0583:
0584: private void removeAllObjectsByID(Set toDelete) {
0585: for (Iterator i = toDelete.iterator(); i.hasNext();) {
0586: ObjectID id = (ObjectID) i.next();
0587: ManagedObjectReference ref = (ManagedObjectReference) references
0588: .remove(id);
0589: if (ref != null) {
0590: Assert.assertFalse(ref.isNew());
0591: while (ref != null && ref.isReferenced()) {
0592: // This is possible if the cache manager is evicting this *unreachable* object or somehow the admin console is
0593: // looking up this object.
0594: logger
0595: .warn("Reference : "
0596: + ref
0597: + " was referenced. So waiting to remove !");
0598: // reconcile
0599: references.put(id, ref);
0600: try {
0601: wait();
0602: } catch (InterruptedException e) {
0603: throw new AssertionError(e);
0604: }
0605: ref = (ManagedObjectReference) references
0606: .remove(id);
0607: }
0608: if (ref != null)
0609: evictionPolicy.remove(ref);
0610: }
0611: }
0612: }
0613:
0614: public synchronized int getCheckedOutCount() {
0615: return checkedOutCount;
0616: }
0617:
0618: public Set getRootIDs() {
0619: return objectStore.getRoots();
0620: }
0621:
0622: public Map getRootNamesToIDsMap() {
0623: return objectStore.getRootNamesToIDsMap();
0624: }
0625:
0626: public ObjectIDSet2 getAllObjectIDs() {
0627: return objectStore.getAllObjectIDs();
0628: }
0629:
0630: private void postRelease() {
0631: if (collector.isPausingOrPaused()) {
0632: checkAndNotifyGC();
0633: } else if (pending.size() > 0) {
0634: processPendingLookups();
0635: }
0636: notifyAll();
0637: }
0638:
0639: private void basicRelease(ManagedObject object) {
0640: ManagedObjectReference mor = object.getReference();
0641: removeReferenceIfNecessary(mor);
0642: unmarkReferenced(mor);
0643: makeUnBlocked(object.getID());
0644: }
0645:
0646: private void removeReferenceIfNecessary(ManagedObjectReference mor) {
0647: if (mor.isRemoveOnRelease()) {
0648: if (mor.getObject().isDirty()) {
0649: logger.error(mor + " is DIRTY");
0650: throw new AssertionError(mor + " is DIRTY");
0651: }
0652: Object removed = references.remove(mor.getObjectID());
0653: Assert.assertNotNull(removed);
0654: }
0655: }
0656:
0657: private void checkAndNotifyGC() {
0658: if (checkedOutCount == 0) {
0659: logger.info("Notifying GC : pending = " + pending.size()
0660: + " checkedOutCount = " + checkedOutCount);
0661: collector.notifyReadyToGC();
0662: }
0663: }
0664:
0665: public synchronized void waitUntilReadyToGC() {
0666: checkAndNotifyGC();
0667: txnObjectMgr.recallAllCheckedoutObject();
0668: while (!collector.isPaused()) {
0669: try {
0670: this .wait(10000);
0671: } catch (InterruptedException e) {
0672: throw new AssertionError(e);
0673: }
0674: }
0675: }
0676:
0677: public void notifyGCComplete(Set toDelete) {
0678: synchronized (this ) {
0679: collector.notifyGCDeleteStarted();
0680: removeAllObjectsByID(toDelete);
0681: // Process pending, since we disabled process pending while GC pause was initiate.
0682: processPendingLookups();
0683: notifyAll();
0684: }
0685:
0686: if (toDelete.size() <= config.getDeleteBatchSize()) {
0687: removeFromStore(toDelete);
0688: } else {
0689: Set split = new HashSet();
0690: for (Iterator i = toDelete.iterator(); i.hasNext();) {
0691: split.add(i.next());
0692: if (split.size() >= config.getDeleteBatchSize()) {
0693: removeFromStore(split);
0694: split = new HashSet();
0695: }
0696: }
0697: if (split.size() > 0) {
0698: removeFromStore(split);
0699: }
0700: }
0701: collector.notifyGCComplete();
0702: }
0703:
0704: private void removeFromStore(Set toDelete) {
0705: long start = System.currentTimeMillis();
0706:
0707: PersistenceTransaction tx = newTransaction();
0708: objectStore.removeAllObjectsByIDNow(tx, toDelete);
0709: tx.commit();
0710:
0711: long elapsed = System.currentTimeMillis() - start;
0712: if (elapsed > REMOVE_THRESHOLD) {
0713: logger.info("Removed " + toDelete.size() + " objects in "
0714: + elapsed + "ms.");
0715: }
0716: }
0717:
0718: private void flush(PersistenceTransaction persistenceTransaction,
0719: ManagedObject managedObject) {
0720: objectStore.commitObject(persistenceTransaction, managedObject);
0721: }
0722:
0723: private void flushAll(
0724: PersistenceTransaction persistenceTransaction,
0725: Collection managedObjects) {
0726: objectStore.commitAllObjects(persistenceTransaction,
0727: managedObjects);
0728: }
0729:
0730: public void dump() {
0731: PrintWriter pw = new PrintWriter(System.err);
0732: new PrettyPrinter(pw).visit(this );
0733: pw.flush();
0734: }
0735:
0736: // This method is for tests only
0737: public synchronized boolean isReferenced(ObjectID id) {
0738: ManagedObjectReference reference = getReference(id);
0739: return reference != null && reference.isReferenced();
0740: }
0741:
0742: // This method is public for testing purpose
0743: public synchronized void createObject(ManagedObject object) {
0744: syncAssertNotInShutdown();
0745: Assert.eval(object.getID().toLong() != -1);
0746: objectStore.addNewObject(object);
0747: addNewReference(object, false);
0748: stats.newObjectCreated();
0749: }
0750:
0751: public void createRoot(String rootName, ObjectID id) {
0752: syncAssertNotInShutdown();
0753: PersistenceTransaction tx = newTransaction();
0754: objectStore.addNewRoot(tx, rootName, id);
0755: tx.commit();
0756: stats.newObjectCreated();
0757: // This change needs to be notified so that new roots are not missedout
0758: changed(null, null, id);
0759: }
0760:
0761: private PersistenceTransaction newTransaction() {
0762: return this .persistenceTransactionProvider.newTransaction();
0763: }
0764:
0765: public GarbageCollector getGarbageCollector() {
0766: return this .collector;
0767: }
0768:
0769: public void setGarbageCollector(final GarbageCollector newCollector) {
0770: syncAssertNotInShutdown();
0771: if (this .collector != null) {
0772: this .collector.stop();
0773: }
0774: this .collector = newCollector;
0775:
0776: if (!config.doGC() || config.gcThreadSleepTime() < 0)
0777: return;
0778:
0779: final Object stopLock = new Object();
0780:
0781: StoppableThread st = new StoppableThread(this .gcThreadGroup,
0782: "GC") {
0783: public void requestStop() {
0784: super .requestStop();
0785:
0786: synchronized (stopLock) {
0787: stopLock.notifyAll();
0788: }
0789: }
0790:
0791: public void run() {
0792: final long gcSleepTime = config.gcThreadSleepTime();
0793:
0794: while (true) {
0795: try {
0796: if (isStopRequested()) {
0797: return;
0798: }
0799: synchronized (stopLock) {
0800: stopLock.wait(gcSleepTime);
0801: }
0802: if (isStopRequested()) {
0803: return;
0804: }
0805: newCollector.gc();
0806: } catch (InterruptedException ie) {
0807: throw new TCRuntimeException(ie);
0808: }
0809: }
0810: }
0811:
0812: };
0813: st.setDaemon(true);
0814: newCollector.setState(st);
0815: }
0816:
0817: private Map processObjectsRequest(Collection objects) {
0818: Map results = new HashMap();
0819: for (Iterator i = objects.iterator(); i.hasNext();) {
0820: ManagedObjectReference mor = (ManagedObjectReference) i
0821: .next();
0822: Assert.assertNotNull(mor);
0823: if (!mor.isReferenced()) {
0824: markReferenced(mor);
0825: }
0826: if (mor.getObject() == null) {
0827: logger.error("Object is NULL for " + mor);
0828: throw new AssertionError("ManagedObject is null.");
0829: }
0830: results.put(mor.getObjectID(), mor.getObject());
0831: }
0832: return results;
0833: }
0834:
0835: private void processPendingLookups() {
0836: List lp = pending.getAndResetPendingRequests();
0837: for (Iterator i = lp.iterator(); i.hasNext();) {
0838: Pending p = (Pending) i.next();
0839: basicLookupObjectsFor(p.getNodeID(), p.getRequestContext(),
0840: p.getMaxReachableObjects());
0841: }
0842: }
0843:
0844: private void addBlocked(NodeID nodeID,
0845: ObjectManagerLookupContext context,
0846: int maxReachableObjects, ObjectID blockedOid) {
0847: pending.makeBlocked(blockedOid, new Pending(nodeID, context,
0848: maxReachableObjects));
0849:
0850: if (context.getProcessedCount() % 500 == 499) {
0851: logger.warn("Reached " + context.getProcessedCount()
0852: + " Pending size : " + pending.size()
0853: + " : basic look up for : " + context
0854: + " maxReachable depth : " + maxReachableObjects);
0855: }
0856: }
0857:
0858: private void makeUnBlocked(ObjectID id) {
0859: pending.makeUnBlocked(id);
0860: }
0861:
0862: private boolean isBlocked(ObjectID id) {
0863: return pending.isBlocked(id);
0864: }
0865:
0866: private void makePending(NodeID nodeID,
0867: ObjectManagerLookupContext context, int maxReachableObjects) {
0868: pending.addPending(new Pending(nodeID, context,
0869: maxReachableObjects));
0870: }
0871:
0872: private void syncAssertNotInShutdown() {
0873: assertNotInShutdown();
0874: }
0875:
0876: private void assertNotInShutdown() {
0877: if (this .inShutdown)
0878: throw new ShutdownError();
0879: }
0880:
0881: public void evictCache(CacheStats stat) {
0882: int size = references_size();
0883: int toEvict = stat.getObjectCountToEvict(size);
0884: if (toEvict <= 0)
0885: return;
0886:
0887: // This could be a costly call, so call just once
0888: Collection removalCandidates = evictionPolicy
0889: .getRemovalCandidates(toEvict);
0890:
0891: HashSet toFlush = new HashSet();
0892: ArrayList removed = new ArrayList();
0893: reapCache(removalCandidates, toFlush, removed);
0894:
0895: int evicted = (toFlush.size() + removed.size());
0896: // Let GC work for us
0897: removed = null;
0898: removalCandidates = null;
0899:
0900: if (!toFlush.isEmpty()) {
0901: initateFlushRequest(toFlush);
0902: toFlush = null; // make GC work
0903: waitUntilFlushComplete();
0904: }
0905:
0906: // TODO:: Send the right objects to the cache manager
0907: stat.objectEvicted(evicted, references_size(),
0908: Collections.EMPTY_LIST);
0909: }
0910:
0911: private void waitUntilFlushComplete() {
0912: flushCount.waitUntil(0);
0913: }
0914:
0915: private void initateFlushRequest(Collection toFlush) {
0916: flushCount.increment(toFlush.size());
0917: for (Iterator i = toFlush.iterator(); i.hasNext();) {
0918: int count = 0;
0919: ManagedObjectFlushingContext mofc = new ManagedObjectFlushingContext();
0920: while (count < MAX_COMMIT_SIZE && i.hasNext()) {
0921: mofc.addObjectToFlush(i.next());
0922: count++;
0923: // i.remove();
0924: }
0925: flushSink.add(mofc);
0926: }
0927: }
0928:
0929: public void flushAndEvict(List objects2Flush) {
0930: PersistenceTransaction tx = newTransaction();
0931: int size = objects2Flush.size();
0932: flushAll(tx, objects2Flush);
0933: tx.commit();
0934: evicted(objects2Flush);
0935: flushCount.decrement(size);
0936: }
0937:
0938: // XXX:: This is not synchronized and might not give us the right number. Performance over accuracy. This is to be
0939: // used only in evictCache method.
0940: private int references_size() {
0941: return references.size();
0942: }
0943:
0944: private static class ObjectManagerLookupContext implements
0945: ObjectManagerResultsContext {
0946:
0947: private final ObjectManagerResultsContext responseContext;
0948: private final boolean removeOnRelease;
0949: private int processedCount = 0;
0950:
0951: public ObjectManagerLookupContext(
0952: ObjectManagerResultsContext responseContext,
0953: boolean removeOnRelease) {
0954: this .responseContext = responseContext;
0955: this .removeOnRelease = removeOnRelease;
0956: }
0957:
0958: public void makeOldRequest() {
0959: processedCount++;
0960: }
0961:
0962: public int getProcessedCount() {
0963: return processedCount;
0964: }
0965:
0966: public boolean isNewRequest() {
0967: return processedCount == 0;
0968: }
0969:
0970: public boolean removeOnRelease() {
0971: return removeOnRelease;
0972: }
0973:
0974: public Set getLookupIDs() {
0975: return responseContext.getLookupIDs();
0976: }
0977:
0978: public Set getNewObjectIDs() {
0979: return responseContext.getNewObjectIDs();
0980: }
0981:
0982: public void setResults(ObjectManagerLookupResults results) {
0983: responseContext.setResults(results);
0984: }
0985:
0986: public void missingObject(ObjectID oid) {
0987: responseContext.missingObject(oid);
0988: }
0989:
0990: public String toString() {
0991: return "ObjectManagerLookupContext : [ processed count = "
0992: + processedCount + ", responseContext = "
0993: + responseContext + "] ";
0994: }
0995: }
0996:
0997: private static class WaitForLookupContext implements
0998: ObjectManagerResultsContext {
0999:
1000: private final ObjectID lookupID;
1001: private final boolean missingOk;
1002: private final Set lookupIDs = new HashSet();
1003: private boolean resultSet = false;
1004: private ManagedObject result;
1005:
1006: public WaitForLookupContext(ObjectID id, boolean missingOk) {
1007: this .lookupID = id;
1008: this .missingOk = missingOk;
1009: lookupIDs.add(id);
1010: }
1011:
1012: public synchronized ManagedObject getLookedUpObject() {
1013: while (!resultSet) {
1014: try {
1015: wait();
1016: } catch (InterruptedException e) {
1017: throw new AssertionError(e);
1018: }
1019: }
1020: return result;
1021: }
1022:
1023: public Set getLookupIDs() {
1024: return lookupIDs;
1025: }
1026:
1027: public Set getNewObjectIDs() {
1028: return Collections.EMPTY_SET;
1029: }
1030:
1031: public synchronized void setResults(
1032: ObjectManagerLookupResults results) {
1033: resultSet = true;
1034: Map objects = results.getObjects();
1035: Assert.assertTrue(objects.size() == 0
1036: || objects.size() == 1);
1037: if (objects.size() == 1) {
1038: result = (ManagedObject) objects.get(lookupID);
1039: Assert.assertNotNull(result);
1040: }
1041: notifyAll();
1042: }
1043:
1044: public void missingObject(ObjectID oid) {
1045: if (!missingOk) {
1046: throw new AssertionError(
1047: "Lookup of non-exisiting object : " + oid + " "
1048: + this );
1049: }
1050: }
1051:
1052: public String toString() {
1053: return "WaitForLookupContext [ " + lookupID
1054: + ", missingOK = " + missingOk + "]";
1055: }
1056:
1057: }
1058:
1059: private static class Pending {
1060: private final ObjectManagerLookupContext context;
1061: private final NodeID groupingKey;
1062: private final int maxReachableObjects;
1063:
1064: public Pending(NodeID nodeID,
1065: ObjectManagerLookupContext context,
1066: int maxReachableObjects) {
1067: this .groupingKey = nodeID;
1068: this .context = context;
1069: this .maxReachableObjects = maxReachableObjects;
1070: }
1071:
1072: public String toString() {
1073: return "ObjectManagerImpl.Pending[groupingKey="
1074: + groupingKey + "]";
1075:
1076: }
1077:
1078: public NodeID getNodeID() {
1079: return this .groupingKey;
1080: }
1081:
1082: public ObjectManagerLookupContext getRequestContext() {
1083: return context;
1084: }
1085:
1086: public int getMaxReachableObjects() {
1087: return maxReachableObjects;
1088: }
1089:
1090: }
1091:
1092: private static class PendingList {
1093: List pending = new ArrayList();
1094: Map blocked = new HashMap();
1095:
1096: public void makeBlocked(ObjectID blockedOid, Pending pd) {
1097: ArrayList blockedRequests = (ArrayList) blocked
1098: .get(blockedOid);
1099: if (blockedRequests == null) {
1100: blockedRequests = new ArrayList(1);
1101: blocked.put(blockedOid, blockedRequests);
1102: }
1103: blockedRequests.add(pd);
1104: }
1105:
1106: public boolean isBlocked(ObjectID id) {
1107: return blocked.containsKey(id);
1108: }
1109:
1110: public void makeUnBlocked(ObjectID id) {
1111: ArrayList blockedRequests = (ArrayList) blocked.remove(id);
1112: if (blockedRequests != null) {
1113: pending.addAll(blockedRequests);
1114: }
1115: }
1116:
1117: public List getAndResetPendingRequests() {
1118: List rv = pending;
1119: pending = new ArrayList();
1120: return rv;
1121: }
1122:
1123: public void addPending(Pending pd) {
1124: pending.add(pd);
1125: }
1126:
1127: public int size() {
1128: return pending.size();
1129: }
1130: }
1131:
1132: /*********************************************************************************************************************
1133: * ManagedObjectChangeListener interface
1134: */
1135: public void changed(ObjectID changedObject, ObjectID oldReference,
1136: ObjectID newReference) {
1137: collector.changed(changedObject, oldReference, newReference);
1138: }
1139:
1140: private static Set createNewSet() {
1141: return new HashSet(INITIAL_SET_SIZE, LOAD_FACTOR);
1142: }
1143: }
|