0001: /*
0002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
0003: * notice. All rights reserved.
0004: */
0005: package com.tc.object;
0006:
0007: import com.tc.exception.TCClassNotFoundException;
0008: import com.tc.exception.TCNonPortableObjectError;
0009: import com.tc.exception.TCRuntimeException;
0010: import com.tc.logging.ChannelIDLogger;
0011: import com.tc.logging.CustomerLogging;
0012: import com.tc.logging.TCLogger;
0013: import com.tc.logging.TCLogging;
0014: import com.tc.net.protocol.tcm.ChannelIDProvider;
0015: import com.tc.object.appevent.ApplicationEvent;
0016: import com.tc.object.appevent.ApplicationEventContext;
0017: import com.tc.object.appevent.NonPortableEventContext;
0018: import com.tc.object.appevent.NonPortableEventContextFactory;
0019: import com.tc.object.appevent.NonPortableFieldSetContext;
0020: import com.tc.object.appevent.NonPortableObjectEvent;
0021: import com.tc.object.appevent.NonPortableRootContext;
0022: import com.tc.object.bytecode.Manageable;
0023: import com.tc.object.bytecode.ManagerUtil;
0024: import com.tc.object.bytecode.TransparentAccess;
0025: import com.tc.object.cache.CacheStats;
0026: import com.tc.object.cache.Evictable;
0027: import com.tc.object.cache.EvictionPolicy;
0028: import com.tc.object.config.DSOClientConfigHelper;
0029: import com.tc.object.dna.api.DNA;
0030: import com.tc.object.idprovider.api.ObjectIDProvider;
0031: import com.tc.object.loaders.ClassProvider;
0032: import com.tc.object.loaders.Namespace;
0033: import com.tc.object.logging.RuntimeLogger;
0034: import com.tc.object.msg.JMXMessage;
0035: import com.tc.object.net.DSOClientMessageChannel;
0036: import com.tc.object.tx.ClientTransactionManager;
0037: import com.tc.object.tx.optimistic.OptimisticTransactionManager;
0038: import com.tc.object.tx.optimistic.TCObjectClone;
0039: import com.tc.object.util.IdentityWeakHashMap;
0040: import com.tc.object.walker.ObjectGraphWalker;
0041: import com.tc.text.ConsoleNonPortableReasonFormatter;
0042: import com.tc.text.ConsoleParagraphFormatter;
0043: import com.tc.text.NonPortableReasonFormatter;
0044: import com.tc.text.ParagraphFormatter;
0045: import com.tc.text.StringFormatter;
0046: import com.tc.util.Assert;
0047: import com.tc.util.NonPortableReason;
0048: import com.tc.util.State;
0049: import com.tc.util.Util;
0050: import com.tc.util.concurrent.StoppableThread;
0051:
0052: import java.io.PrintWriter;
0053: import java.io.StringWriter;
0054: import java.lang.ref.ReferenceQueue;
0055: import java.lang.ref.WeakReference;
0056: import java.lang.reflect.Array;
0057: import java.lang.reflect.InvocationHandler;
0058: import java.lang.reflect.InvocationTargetException;
0059: import java.lang.reflect.Method;
0060: import java.lang.reflect.Proxy;
0061: import java.util.ArrayList;
0062: import java.util.Collection;
0063: import java.util.Collections;
0064: import java.util.HashMap;
0065: import java.util.HashSet;
0066: import java.util.IdentityHashMap;
0067: import java.util.Iterator;
0068: import java.util.List;
0069: import java.util.Map;
0070: import java.util.Set;
0071:
0072: public class ClientObjectManagerImpl implements ClientObjectManager,
0073: PortableObjectProvider, Evictable {
0074:
0075: private static final State PAUSED = new State("PAUSED");
0076: private static final State STARTING = new State("STARTING");
0077: private static final State RUNNING = new State("RUNNING");
0078:
0079: private static final LiteralValues literals = new LiteralValues();
0080: private static final TCLogger staticLogger = TCLogging
0081: .getLogger(ClientObjectManager.class);
0082:
0083: private static final long POLL_TIME = 1000;
0084: private static final long STOP_WAIT = POLL_TIME * 3;
0085:
0086: private static final int NO_DEPTH = 0;
0087:
0088: private static final int COMMIT_SIZE = 100;
0089:
0090: private State state = RUNNING;
0091: private final Object shutdownLock = new Object();
0092: private final Map roots = new HashMap();
0093: private final Map idToManaged = new HashMap();
0094: private final Map pojoToManaged = new IdentityWeakHashMap();
0095: private final ClassProvider classProvider;
0096: private final RemoteObjectManager remoteObjectManager;
0097: private final EvictionPolicy cache;
0098: private final Traverser traverser;
0099: private final Traverser shareObjectsTraverser;
0100: private final TraverseTest traverseTest;
0101: private final DSOClientConfigHelper clientConfiguration;
0102: private final TCClassFactory clazzFactory;
0103: private final Set objectLookupsInProgress = new HashSet();
0104: private final Set rootLookupsInProgress = new HashSet();
0105: private final ObjectIDProvider idProvider;
0106: private final ReferenceQueue referenceQueue = new ReferenceQueue();
0107: private final TCObjectFactory factory;
0108:
0109: private ClientTransactionManager txManager;
0110:
0111: private StoppableThread reaper = null;
0112: private final TCLogger logger;
0113: private final RuntimeLogger runtimeLogger;
0114: private final NonPortableEventContextFactory appEventContextFactory;
0115: private final ThreadLocal localCreationInProgress = new ThreadLocal();
0116: private final Set pendingCreateTCObjects = new HashSet();
0117: private final Portability portability;
0118: private final DSOClientMessageChannel channel;
0119:
0120: private final boolean sendErrors = System
0121: .getProperty("project.name") != null;
0122:
0123: public ClientObjectManagerImpl(
0124: RemoteObjectManager remoteObjectManager,
0125: DSOClientConfigHelper clientConfiguration,
0126: ObjectIDProvider idProvider, EvictionPolicy cache,
0127: RuntimeLogger runtimeLogger, ChannelIDProvider provider,
0128: ClassProvider classProvider, TCClassFactory classFactory,
0129: TCObjectFactory objectFactory, Portability portability,
0130: DSOClientMessageChannel channel) {
0131: this .remoteObjectManager = remoteObjectManager;
0132: this .cache = cache;
0133: this .clientConfiguration = clientConfiguration;
0134: this .idProvider = idProvider;
0135: this .runtimeLogger = runtimeLogger;
0136: this .portability = portability;
0137: this .channel = channel;
0138: this .logger = new ChannelIDLogger(provider, TCLogging
0139: .getLogger(ClientObjectManager.class));
0140: this .classProvider = classProvider;
0141: this .traverseTest = new NewObjectTraverseTest();
0142: this .traverser = new Traverser(new AddManagedObjectAction(),
0143: this );
0144: this .shareObjectsTraverser = new Traverser(
0145: new SharedObjectsAction(), this );
0146: this .clazzFactory = classFactory;
0147: this .factory = objectFactory;
0148: this .factory.setObjectManager(this );
0149: this .appEventContextFactory = new NonPortableEventContextFactory(
0150: provider);
0151:
0152: if (logger.isDebugEnabled()) {
0153: logger.debug("Starting up ClientObjectManager:"
0154: + System.identityHashCode(this ) + ". Cache SIZE = "
0155: + cache.getCacheCapacity());
0156: }
0157: startReaper();
0158: }
0159:
0160: public Class getClassFor(String className, String loaderDesc)
0161: throws ClassNotFoundException {
0162: return classProvider.getClassFor(className, loaderDesc);
0163: }
0164:
0165: public synchronized void pause() {
0166: assertNotPaused("Attempt to pause while PAUSED");
0167: state = PAUSED;
0168: notifyAll();
0169: }
0170:
0171: public synchronized void starting() {
0172: assertPaused("Attempt to start while not PAUSED");
0173: state = STARTING;
0174: notifyAll();
0175: }
0176:
0177: public synchronized void unpause() {
0178: assertStarting("Attempt to unpause while not STARTING");
0179: state = RUNNING;
0180: notifyAll();
0181: }
0182:
0183: public Object createParentCopyInstanceIfNecessary(Map visited,
0184: Map cloned, Object v) {
0185: TCClass tcc = getOrCreateClass(v.getClass());
0186: Object parent = null;
0187: if (tcc.isNonStaticInner()) {
0188: TransparentAccess access = (TransparentAccess) v;
0189: Map m = new HashMap();
0190: access.__tc_getallfields(m);
0191: Object p = m.get(tcc.getParentFieldName());
0192: parent = visited.containsKey(p) ? visited.get(p)
0193: : createNewCopyInstance(p, null);
0194: visited.put(p, parent);
0195: cloned.put(p, parent);
0196: }
0197: return parent;
0198: }
0199:
0200: private void waitUntilRunning() {
0201: boolean isInterrupted = false;
0202:
0203: while (state != RUNNING) {
0204: try {
0205: wait();
0206: } catch (InterruptedException e) {
0207: isInterrupted = true;
0208: }
0209: }
0210: Util.selfInterruptIfNeeded(isInterrupted);
0211: }
0212:
0213: private void assertPaused(Object message) {
0214: if (state != PAUSED)
0215: throw new AssertionError(message + ": " + state);
0216: }
0217:
0218: private void assertStarting(Object message) {
0219: if (state != STARTING)
0220: throw new AssertionError(message + ": " + state);
0221: }
0222:
0223: private void assertNotPaused(Object message) {
0224: if (state == PAUSED)
0225: throw new AssertionError(message + ": " + state);
0226: }
0227:
0228: public TraversedReferences getPortableObjects(Class clazz,
0229: Object start, TraversedReferences addTo) {
0230: TCClass tcc = clazzFactory.getOrCreate(clazz, this );
0231: return tcc.getPortableObjects(start, addTo);
0232: }
0233:
0234: public void setTransactionManager(ClientTransactionManager txManager) {
0235: this .txManager = txManager;
0236: }
0237:
0238: public ClientTransactionManager getTransactionManager() {
0239: return txManager;
0240: }
0241:
0242: /**
0243: * Deep connected copy used to create stable views on collections of objects. While inefficient this should do that
0244: * job. It is important that this method be called holding a distributed lock in order to prevent an unstable view. It
0245: * also must be called in an optimistic transaction I'll probably move this out of the client object manager at some
0246: * point but we'll see.
0247: */
0248: public Object deepCopy(Object source,
0249: OptimisticTransactionManager optimisticTxManager) {
0250: IdentityHashMap cloned = new IdentityHashMap();
0251: IdentityHashMap visited = new IdentityHashMap();
0252:
0253: Object parent = this .createParentCopyInstanceIfNecessary(
0254: visited, cloned, source);
0255: Object copy = createNewCopyInstance(source, parent);
0256:
0257: Assert.eval(copy != null);
0258:
0259: visited.put(source, copy);
0260: optimisticTxManager.addClonesToTransaction(visited);
0261:
0262: cloneAndUpdate(optimisticTxManager, cloned, visited, source,
0263: copy);
0264: while (!cloned.isEmpty()) {
0265: Object original = cloned.keySet().iterator().next(); // ick
0266: Object clone = cloned.get(original);
0267: cloned.remove(original);
0268: cloneAndUpdate(optimisticTxManager, cloned, visited,
0269: original, clone);
0270: }
0271:
0272: return copy;
0273: }
0274:
0275: /**
0276: * While holding the resolve lock to protect against the cleaner create a new copy of the original that is connected
0277: * to the copy and has any references replaced with either an existing clone or a new clone where needed. New clones
0278: * created in the connected copy are returned so that they can be properly updated from their originals. The reason
0279: * for this strategy is to avoid recurrsion (and stack over flows)
0280: */
0281: private void cloneAndUpdate(
0282: OptimisticTransactionManager optimisticTxManager,
0283: IdentityHashMap cloned, IdentityHashMap visited,
0284: Object original, Object clone) {
0285: TCClass tcc;
0286: TCObject tco;
0287: tcc = this .getOrCreateClass(original.getClass());
0288: tco = this .lookupExistingOrNull(original);
0289: synchronized (tco.getResolveLock()) {
0290: tco.resolveAllReferences();
0291: Map c = tcc.connectedCopy(original, clone, visited,
0292: optimisticTxManager);
0293: optimisticTxManager.addClonesToTransaction(c);
0294: cloned.putAll(c);
0295: }
0296: }
0297:
0298: private TCObject create(Object pojo, NonPortableEventContext context) {
0299: addToManagedFromRoot(pojo, context);
0300: return basicLookup(pojo);
0301: }
0302:
0303: private TCObject share(Object pojo, NonPortableEventContext context) {
0304: addToSharedFromRoot(pojo, context);
0305: return basicLookup(pojo);
0306: }
0307:
0308: public ReferenceQueue getReferenceQueue() {
0309: return referenceQueue;
0310: }
0311:
0312: public void shutdown() {
0313: synchronized (shutdownLock) {
0314: if (reaper != null) {
0315: try {
0316: stopThread(reaper);
0317: } finally {
0318: reaper = null;
0319: }
0320: }
0321: }
0322: }
0323:
0324: private static void stopThread(StoppableThread thread) {
0325: try {
0326: thread.stopAndWait(STOP_WAIT);
0327: } finally {
0328: if (thread.isAlive()) {
0329: staticLogger.warn(thread.getName() + " is still alive");
0330: }
0331: }
0332: }
0333:
0334: public TCObject lookupOrCreate(Object pojo) {
0335: if (pojo == null)
0336: return TCObjectFactory.NULL_TC_OBJECT;
0337: return lookupOrCreateIfNecesary(pojo,
0338: this .appEventContextFactory
0339: .createNonPortableEventContext(pojo));
0340: }
0341:
0342: private TCObject lookupOrCreate(Object pojo,
0343: NonPortableEventContext context) {
0344: if (pojo == null)
0345: return TCObjectFactory.NULL_TC_OBJECT;
0346: return lookupOrCreateIfNecesary(pojo, context);
0347: }
0348:
0349: public TCObject lookupOrShare(Object pojo) {
0350: if (pojo == null)
0351: return TCObjectFactory.NULL_TC_OBJECT;
0352: return lookupOrShareIfNecesary(pojo,
0353: this .appEventContextFactory
0354: .createNonPortableEventContext(pojo));
0355: }
0356:
0357: private TCObject lookupOrShareIfNecesary(Object pojo,
0358: NonPortableEventContext context) {
0359: Assert.assertNotNull(pojo);
0360: TCObject obj = basicLookup(pojo);
0361: if (obj == null || obj.isNew()) {
0362: obj = share(pojo, context);
0363: }
0364: return obj;
0365: }
0366:
0367: private TCObject lookupOrCreateIfNecesary(Object pojo,
0368: NonPortableEventContext context) {
0369: Assert.assertNotNull(pojo);
0370: TCObject obj = basicLookup(pojo);
0371: if (obj == null || obj.isNew()) {
0372: executePreCreateMethod(pojo);
0373: obj = create(pojo, context);
0374: }
0375: return obj;
0376: }
0377:
0378: private void executePreCreateMethod(Object pojo) {
0379: String onLookupMethodName = clientConfiguration
0380: .getPreCreateMethodIfDefined(pojo.getClass().getName());
0381: if (onLookupMethodName != null) {
0382: executeMethod(pojo, onLookupMethodName,
0383: "preCreate method (" + onLookupMethodName
0384: + ") failed on object of "
0385: + pojo.getClass());
0386: }
0387: }
0388:
0389: /**
0390: * This method is created for situations in which a method needs to be taken place when an object moved from
0391: * non-shared to shared. The method could be an instrumented method. For instance, for ConcurrentHashMap, we need to
0392: * re-hash the objects already in the map because the hashing algorithm is different when a ConcurrentHashMap is
0393: * shared. The rehash method is an instrumented method. This should be executed only once.
0394: */
0395: private void executePostCreateMethod(Object pojo) {
0396: String onLookupMethodName = clientConfiguration
0397: .getPostCreateMethodIfDefined(pojo.getClass().getName());
0398: if (onLookupMethodName != null) {
0399: executeMethod(pojo, onLookupMethodName,
0400: "postCreate method (" + onLookupMethodName
0401: + ") failed on object of "
0402: + pojo.getClass());
0403: }
0404: }
0405:
0406: private void executeMethod(Object pojo, String onLookupMethodName,
0407: String loggingMessage) {
0408: // This method used to use beanshell, but I changed it to reflection to hopefully avoid a deadlock -- CDV-130
0409:
0410: try {
0411: Method m = pojo.getClass().getDeclaredMethod(
0412: onLookupMethodName, new Class[] {});
0413: m.setAccessible(true);
0414: m.invoke(pojo, new Object[] {});
0415: } catch (Throwable t) {
0416: if (t instanceof InvocationTargetException) {
0417: t = t.getCause();
0418: }
0419: logger.warn(loggingMessage, t);
0420: if (!(t instanceof RuntimeException)) {
0421: t = new RuntimeException(t);
0422: }
0423: throw (RuntimeException) t;
0424: }
0425: }
0426:
0427: private TCObject lookupExistingLiteralRootOrNull(String rootName) {
0428: ObjectID rootID = (ObjectID) roots.get(rootName);
0429: return basicLookupByID(rootID);
0430: }
0431:
0432: public TCObject lookupExistingOrNull(Object pojo) {
0433: return basicLookup(pojo);
0434: }
0435:
0436: public synchronized ObjectID lookupExistingObjectID(Object pojo) {
0437: TCObject obj = basicLookup(pojo);
0438: if (obj == null) {
0439: throw new AssertionError("Missing object ID for:" + pojo);
0440: }
0441: return obj.getObjectID();
0442: }
0443:
0444: public void markReferenced(TCObject tcobj) {
0445: cache.markReferenced(tcobj);
0446: }
0447:
0448: public Object lookupObjectNoDepth(ObjectID id)
0449: throws ClassNotFoundException {
0450: return lookupObject(id, null, true);
0451: }
0452:
0453: public Object lookupObject(ObjectID objectID)
0454: throws ClassNotFoundException {
0455: return lookupObject(objectID, null, false);
0456: }
0457:
0458: public Object lookupObject(ObjectID id, ObjectID parentContext)
0459: throws ClassNotFoundException {
0460: return lookupObject(id, parentContext, false);
0461: }
0462:
0463: private Object lookupObject(ObjectID objectID,
0464: ObjectID parentContext, boolean noDepth)
0465: throws ClassNotFoundException {
0466: if (objectID.isNull())
0467: return null;
0468: Object o = null;
0469: while (o == null) {
0470: final TCObject tco = lookup(objectID, parentContext,
0471: noDepth);
0472: if (tco == null)
0473: throw new AssertionError("TCObject was null for "
0474: + objectID);// continue;
0475:
0476: o = tco.getPeerObject();
0477: if (o == null) {
0478: reap(objectID);
0479: }
0480: }
0481: return o;
0482: }
0483:
0484: private void reap(ObjectID objectID) {
0485: synchronized (this ) {
0486: if (!basicHasLocal(objectID)) {
0487: if (logger.isDebugEnabled())
0488: logger
0489: .debug(System.identityHashCode(this )
0490: + " Entry removed before reaper got the chance: "
0491: + objectID);
0492: } else {
0493: TCObjectImpl tcobj = (TCObjectImpl) basicLookupByID(objectID);
0494: if (tcobj.isNull()) {
0495: idToManaged.remove(objectID);
0496: cache.remove(tcobj);
0497: remoteObjectManager.removed(objectID);
0498: }
0499: }
0500: }
0501: }
0502:
0503: public boolean isManaged(Object pojo) {
0504: return pojo != null
0505: && !literals.isLiteral(pojo.getClass().getName())
0506: && lookupExistingOrNull(pojo) != null;
0507: }
0508:
0509: public boolean isCreationInProgress() {
0510: Map m = (Map) localCreationInProgress.get();
0511: return (m != null) && (m.size() > 0);
0512: }
0513:
0514: // Dealing with the case where a map contains a map. The faulting will deadlock without this stuff
0515: private TCObject getCreationInProgress(ObjectID id) {
0516: Map m = (Map) localCreationInProgress.get();
0517: if (m == null)
0518: return null;
0519: return (TCObject) m.get(id);
0520: }
0521:
0522: private void setCreationInProgress(ObjectID id, Object obj) {
0523: Map m = (Map) localCreationInProgress.get();
0524: if (m == null) {
0525: m = new HashMap();
0526: localCreationInProgress.set(m);
0527: }
0528: m.put(id, obj);
0529: txManager.disableTransactionLogging(); // We dont want to log changes to transaction until we hydrate the new
0530: // object.
0531: }
0532:
0533: private void removeCreationInProgress(ObjectID id) {
0534: Map m = (Map) localCreationInProgress.get();
0535: Assert.assertNotNull(m);
0536: m.remove(id);
0537: txManager.enableTransactionLogging();
0538: }
0539:
0540: // Done
0541:
0542: public TCObject lookup(ObjectID id) throws ClassNotFoundException {
0543: return lookup(id, null, false);
0544: }
0545:
0546: private TCObject lookup(ObjectID id, ObjectID parentContext,
0547: boolean noDepth) throws ClassNotFoundException {
0548: TCObject obj = null;
0549: boolean retrieveNeeded = false;
0550: boolean isInterrupted = false;
0551:
0552: synchronized (this ) {
0553: while (obj == null) {
0554: obj = basicLookupByID(id);
0555: if (obj != null)
0556: return obj;
0557: obj = getCreationInProgress(id);
0558: if (obj != null)
0559: return obj;
0560: if (!objectLookupInProgress(id)) {
0561: retrieveNeeded = true;
0562: markObjectLookupInProgress(id);
0563: break;
0564: } else {
0565: try {
0566: wait();
0567: } catch (InterruptedException ie) {
0568: isInterrupted = true;
0569: }
0570: }
0571: }
0572: }
0573: Util.selfInterruptIfNeeded(isInterrupted);
0574:
0575: if (retrieveNeeded) {
0576: try {
0577: DNA dna = noDepth ? remoteObjectManager.retrieve(id,
0578: NO_DEPTH)
0579: : (parentContext == null ? remoteObjectManager
0580: .retrieve(id) : remoteObjectManager
0581: .retrieveWithParentContext(id,
0582: parentContext));
0583: // obj = factory.getNewInstance(id, classProvider.getClassFor(dna.getTypeName(), dna
0584: // .getDefiningLoaderDescription()));
0585: obj = factory.getNewInstance(id, classProvider
0586: .getClassFor(Namespace
0587: .parseClassNameIfNecessary(dna
0588: .getTypeName()), dna
0589: .getDefiningLoaderDescription()));
0590: setCreationInProgress(id, obj);
0591: Assert.assertFalse(dna.isDelta());
0592: obj.hydrate(dna, false);
0593: } catch (ClassNotFoundException e) {
0594: logger.warn("Exception: ", e);
0595: throw e;
0596: } finally {
0597: if (obj != null)
0598: removeCreationInProgress(id);
0599: }
0600: basicAddLocal(obj);
0601: }
0602: return obj;
0603:
0604: }
0605:
0606: public synchronized TCObject lookupIfLocal(ObjectID id) {
0607: return basicLookupByID(id);
0608: }
0609:
0610: public synchronized Collection getAllObjectIDsAndClear(Collection c) {
0611: assertStarting("Called when not in STARTING state !");
0612: for (Iterator i = idToManaged.keySet().iterator(); i.hasNext();) {
0613: c.add(i.next());
0614: }
0615: remoteObjectManager.clear();
0616: return c;
0617: }
0618:
0619: public Object lookupRoot(String rootName) {
0620: try {
0621: return lookupRootOptionallyCreateOrReplace(rootName, null,
0622: false, true, false);
0623: } catch (ClassNotFoundException e) {
0624: throw new TCClassNotFoundException(e);
0625: }
0626: }
0627:
0628: /**
0629: * Check to see if the root is already in existence on the server. If it is then get it if not then create it.
0630: */
0631: public Object lookupOrCreateRoot(String rootName, Object root) {
0632: try {
0633: return lookupOrCreateRoot(rootName, root, true, false);
0634: } catch (ClassNotFoundException e) {
0635: throw new TCClassNotFoundException(e);
0636: }
0637: }
0638:
0639: /**
0640: * This method must be called within a DSO synchronized context. Currently, this is called in a setter method of a
0641: * replaceable root.
0642: */
0643: public Object createOrReplaceRoot(String rootName, Object root) {
0644: Object existingRoot = lookupRoot(rootName);
0645: if (existingRoot == null) {
0646: return lookupOrCreateRoot(rootName, root, false);
0647: } else if (isLiteralPojo(root)) {
0648: TCObject tcObject = lookupExistingLiteralRootOrNull(rootName);
0649: tcObject.literalValueChanged(root, existingRoot);
0650: return root;
0651: } else {
0652: return lookupOrCreateRoot(rootName, root, false);
0653: }
0654: }
0655:
0656: public Object lookupOrCreateRootNoDepth(String rootName, Object root) {
0657: try {
0658: return lookupOrCreateRoot(rootName, root, true, true);
0659: } catch (ClassNotFoundException e) {
0660: throw new TCClassNotFoundException(e);
0661: }
0662: }
0663:
0664: public Object lookupOrCreateRoot(String rootName, Object root,
0665: boolean dsoFinal) {
0666: try {
0667: return lookupOrCreateRoot(rootName, root, dsoFinal, false);
0668: } catch (ClassNotFoundException e) {
0669: throw new TCClassNotFoundException(e);
0670: }
0671: }
0672:
0673: private boolean isLiteralPojo(Object pojo) {
0674: return !(pojo instanceof Class)
0675: && literals.isLiteralInstance(pojo);
0676: }
0677:
0678: private Object lookupOrCreateRoot(String rootName, Object root,
0679: boolean dsoFinal, boolean noDepth)
0680: throws ClassNotFoundException {
0681: if (root != null) {
0682: // this will throw an exception if root is not portable
0683: this
0684: .checkPortabilityOfRoot(root, rootName, root
0685: .getClass());
0686: }
0687:
0688: return lookupRootOptionallyCreateOrReplace(rootName, root,
0689: true, dsoFinal, noDepth);
0690: }
0691:
0692: private void checkPortabilityOfTraversedReference(
0693: TraversedReference reference, Class referringClass,
0694: NonPortableEventContext context) {
0695: NonPortableReason reason = checkPortabilityOf(reference
0696: .getValue());
0697: if (reason != null) {
0698: reason.addDetail("Referring class", referringClass
0699: .getName());
0700: if (!reference.isAnonymous()) {
0701: String fullyQualifiedFieldname = reference
0702: .getFullyQualifiedReferenceName();
0703: reason
0704: .setUltimateNonPortableFieldName(fullyQualifiedFieldname);
0705: }
0706: dumpObjectHierarchy(context.getPojo(), context);
0707: if (sendErrors) {
0708: storeObjectHierarchy(context.getPojo(), context);
0709: }
0710: throwNonPortableException(
0711: context.getPojo(),
0712: reason,
0713: context,
0714: "Attempt to share an instance of a non-portable class referenced by a portable class.");
0715: }
0716: }
0717:
0718: private void checkPortabilityOfRoot(Object root, String rootName,
0719: Class rootType) throws TCNonPortableObjectError {
0720: NonPortableReason reason = checkPortabilityOf(root);
0721: if (reason != null) {
0722: NonPortableRootContext context = this .appEventContextFactory
0723: .createNonPortableRootContext(rootName, root);
0724: dumpObjectHierarchy(root, context);
0725: if (sendErrors) {
0726: storeObjectHierarchy(root, context);
0727: }
0728: throwNonPortableException(
0729: root,
0730: reason,
0731: context,
0732: "Attempt to share an instance of a non-portable class by assigning it to a root.");
0733: }
0734: }
0735:
0736: public void checkPortabilityOfField(Object fieldValue,
0737: String fieldName, Object pojo)
0738: throws TCNonPortableObjectError {
0739: NonPortableReason reason = checkPortabilityOf(fieldValue);
0740: if (reason != null) {
0741: NonPortableFieldSetContext context = this .appEventContextFactory
0742: .createNonPortableFieldSetContext(pojo, fieldName,
0743: fieldValue);
0744: dumpObjectHierarchy(fieldValue, context);
0745: if (sendErrors) {
0746: storeObjectHierarchy(pojo, context);
0747: }
0748: throwNonPortableException(
0749: pojo,
0750: reason,
0751: context,
0752: "Attempt to set the field of a shared object to an instance of a non-portable class.");
0753: }
0754: }
0755:
/**
 * This is used by the senders of ApplicationEvents to provide a version of a
 * logically-managed pojo in the state it would have been in had the ApplicationEvent not
 * occurred.
 * <p>
 * A new instance of the pojo's class is created through its no-argument constructor, filled
 * from the original when it is a Map or Collection, and the first public method whose name
 * and parameters match is invoked on it. Null arguments match any reference-typed
 * parameter, and boxed arguments match the corresponding primitive parameter.
 *
 * @param pojo       the logically-managed object to copy
 * @param methodName the method name, optionally followed by a "(...)" signature suffix
 * @param params     the arguments of the logical operation; null is treated as no arguments
 * @return the copy with the operation applied, or {@code pojo} itself if anything failed
 *         (the failure is logged)
 */
public Object cloneAndInvokeLogicalOperation(Object pojo, String methodName, Object[] params) {
    try {
        Class type = pojo.getClass();
        Object copy = type.newInstance();
        if (copy instanceof Map) {
            ((Map) copy).putAll((Map) pojo);
        } else if (copy instanceof Collection) {
            ((Collection) copy).addAll((Collection) pojo);
        }

        // Strip the signature suffix only when one is present.
        int paren = methodName.indexOf('(');
        String simpleName = (paren < 0) ? methodName : methodName.substring(0, paren);
        Object[] args = (params == null) ? new Object[0] : params;

        Method[] methods = type.getMethods();
        for (int i = 0; i < methods.length; i++) {
            Method candidate = methods[i];
            if (candidate.getName().equals(simpleName)
                    && logicalArgumentsMatch(candidate.getParameterTypes(), args)) {
                candidate.invoke(copy, args);
                break;
            }
        }
        pojo = copy;
    } catch (Exception e) {
        logger.error("Unable to clone logical object", e);
    }
    return pojo;
}

/** Tells whether {@code args} can be passed reflectively to a method taking {@code paramTypes}. */
private static boolean logicalArgumentsMatch(Class[] paramTypes, Object[] args) {
    if (paramTypes.length != args.length) {
        return false;
    }
    for (int i = 0; i < paramTypes.length; i++) {
        Class expected = paramTypes[i];
        Object arg = args[i];
        if (arg == null) {
            // null is acceptable for any reference type, never for a primitive.
            if (expected.isPrimitive()) {
                return false;
            }
        } else if (!boxedTypeOf(expected).isAssignableFrom(arg.getClass())) {
            return false;
        }
    }
    return true;
}

/** Maps a primitive type to its wrapper class; any other type is returned unchanged. */
private static Class boxedTypeOf(Class type) {
    if (!type.isPrimitive()) {
        return type;
    }
    if (type == Integer.TYPE) return Integer.class;
    if (type == Long.TYPE) return Long.class;
    if (type == Boolean.TYPE) return Boolean.class;
    if (type == Double.TYPE) return Double.class;
    if (type == Float.TYPE) return Float.class;
    if (type == Character.TYPE) return Character.class;
    if (type == Short.TYPE) return Short.class;
    if (type == Byte.TYPE) return Byte.class;
    return type;
}
0797:
0798: public void checkPortabilityOfLogicalAction(Object[] params,
0799: int index, String methodName, Object pojo)
0800: throws TCNonPortableObjectError {
0801: Object param = params[index];
0802: NonPortableReason reason = checkPortabilityOf(param);
0803: if (reason != null) {
0804: NonPortableEventContext context = this .appEventContextFactory
0805: .createNonPortableLogicalInvokeContext(pojo,
0806: methodName, params, index);
0807: dumpObjectHierarchy(params[index], context);
0808: if (sendErrors) {
0809: storeObjectHierarchy(cloneAndInvokeLogicalOperation(
0810: pojo, methodName, params), context);
0811: }
0812: throwNonPortableException(
0813: pojo,
0814: reason,
0815: context,
0816: "Attempt to share an instance of a non-portable class by"
0817: + " passing it as an argument to a method of a logically-managed class.");
0818: }
0819: }
0820:
0821: private void throwNonPortableException(Object obj,
0822: NonPortableReason reason, NonPortableEventContext context,
0823: String message) throws TCNonPortableObjectError {
0824: // XXX: The message should probably be part of the context
0825: reason.setMessage(message);
0826: context.addDetailsTo(reason);
0827:
0828: // Send this event to L2
0829: JMXMessage jmxMsg = channel.getJMXMessage();
0830: jmxMsg
0831: .setJMXObject(new NonPortableObjectEvent(context,
0832: reason));
0833: jmxMsg.send();
0834:
0835: StringWriter formattedReason = new StringWriter();
0836: PrintWriter out = new PrintWriter(formattedReason);
0837: StringFormatter sf = new StringFormatter();
0838:
0839: ParagraphFormatter pf = new ConsoleParagraphFormatter(80, sf);
0840: NonPortableReasonFormatter reasonFormatter = new ConsoleNonPortableReasonFormatter(
0841: out, ": ", sf, pf);
0842: reason.accept(reasonFormatter);
0843: reasonFormatter.flush();
0844:
0845: throw new TCNonPortableObjectError(formattedReason.getBuffer()
0846: .toString());
0847: }
0848:
0849: private NonPortableReason checkPortabilityOf(Object obj) {
0850: if (!isPortableInstance(obj)) {
0851: return portability.getNonPortableReason(obj.getClass());
0852: }
0853: return null;
0854: }
0855:
0856: private boolean rootLookupInProgress(String rootName) {
0857: return rootLookupsInProgress.contains(rootName);
0858: }
0859:
0860: private void markRootLookupInProgress(String rootName) {
0861: boolean wasAdded = rootLookupsInProgress.add(rootName);
0862: if (!wasAdded)
0863: throw new AssertionError(
0864: "Attempt to mark a root lookup that is already in progress.");
0865: }
0866:
0867: private void markRootLookupNotInProgress(String rootName) {
0868: boolean removed = rootLookupsInProgress.remove(rootName);
0869: if (!removed)
0870: throw new AssertionError(
0871: "Attempt to unmark a root lookup that wasn't in progress.");
0872: }
0873:
0874: public synchronized void replaceRootIDIfNecessary(String rootName,
0875: ObjectID newRootID) {
0876: waitUntilRunning();
0877:
0878: ObjectID oldRootID = (ObjectID) roots.get(rootName);
0879: if (oldRootID == null || oldRootID.equals(newRootID)) {
0880: return;
0881: }
0882:
0883: roots.put(rootName, newRootID);
0884: }
0885:
0886: private Object lookupRootOptionallyCreateOrReplace(String rootName,
0887: Object rootPojo, boolean create, boolean dsoFinal,
0888: boolean noDepth) throws ClassNotFoundException {
0889: boolean replaceRootIfExistWhenCreate = !dsoFinal && create;
0890:
0891: ObjectID rootID = null;
0892:
0893: boolean retrieveNeeded = false;
0894: boolean isNew = false;
0895: boolean lookupInProgress = false;
0896: boolean isInterrupted = false;
0897:
0898: synchronized (this ) {
0899: while (true) {
0900: if (!replaceRootIfExistWhenCreate) {
0901: rootID = (ObjectID) roots.get(rootName);
0902: if (rootID != null) {
0903: break;
0904: }
0905: } else {
0906: rootID = ObjectID.NULL_ID;
0907: }
0908: if (!rootLookupInProgress(rootName)) {
0909: lookupInProgress = true;
0910: markRootLookupInProgress(rootName);
0911: break;
0912: } else {
0913: try {
0914: wait();
0915: } catch (InterruptedException e) {
0916: e.printStackTrace();
0917: isInterrupted = true;
0918: }
0919: }
0920: }
0921: }
0922: Util.selfInterruptIfNeeded(isInterrupted);
0923:
0924: retrieveNeeded = lookupInProgress
0925: && !replaceRootIfExistWhenCreate;
0926:
0927: isNew = retrieveNeeded || (rootID.isNull() && create);
0928:
0929: if (retrieveNeeded) {
0930: rootID = remoteObjectManager.retrieveRootID(rootName);
0931: }
0932:
0933: if (rootID.isNull() && create) {
0934: Assert.assertNotNull(rootPojo);
0935: // TODO:: Optimize this, do lazy instantiation
0936: TCObject root = null;
0937: if (isLiteralPojo(rootPojo)) {
0938: root = basicCreateIfNecessary(rootPojo);
0939: } else {
0940: root = lookupOrCreate(rootPojo,
0941: this .appEventContextFactory
0942: .createNonPortableRootContext(rootName,
0943: rootPojo));
0944: }
0945: rootID = root.getObjectID();
0946: txManager.createRoot(rootName, rootID);
0947: }
0948:
0949: synchronized (this ) {
0950: if (isNew && !rootID.isNull())
0951: roots.put(rootName, rootID);
0952: if (lookupInProgress) {
0953: markRootLookupNotInProgress(rootName);
0954: notifyAll();
0955: }
0956: }
0957:
0958: return lookupObject(rootID, null, noDepth);
0959: }
0960:
0961: private TCObject basicLookupByID(ObjectID id) {
0962: return (TCObject) idToManaged.get(id);
0963: }
0964:
0965: private boolean basicHasLocal(ObjectID id) {
0966: return basicLookupByID(id) != null;
0967: }
0968:
0969: private TCObject basicLookup(Object obj) {
0970: TCObject tcobj;
0971: if (obj instanceof Manageable) {
0972: tcobj = ((Manageable) obj).__tc_managed();
0973: } else {
0974: synchronized (pojoToManaged) {
0975: tcobj = (TCObject) pojoToManaged.get(obj);
0976: }
0977: }
0978: return tcobj;
0979: }
0980:
0981: private void basicAddLocal(TCObject obj) {
0982: synchronized (this ) {
0983: Assert.eval(!(obj instanceof TCObjectClone));
0984: if (basicHasLocal(obj.getObjectID())) {
0985: throw Assert
0986: .failure("Attempt to add an object that already exists: "
0987: + obj);
0988: }
0989: idToManaged.put(obj.getObjectID(), obj);
0990:
0991: Object pojo = obj.getPeerObject();
0992:
0993: if (pojo != null) {
0994: if (pojo.getClass().isArray()) {
0995: ManagerUtil.register(pojo, obj);
0996: }
0997:
0998: synchronized (pojoToManaged) {
0999: if (pojo instanceof Manageable) {
1000: Manageable m = (Manageable) pojo;
1001: if (m.__tc_managed() == null) {
1002: m.__tc_managed(obj);
1003: } else {
1004: Assert.assertTrue(m.__tc_managed() == obj);
1005: }
1006: } else {
1007: if (!isLiteralPojo(pojo)) {
1008: pojoToManaged.put(obj.getPeerObject(), obj);
1009: }
1010: }
1011: }
1012: }
1013: cache.add(obj);
1014: markObjectLookupNotInProgress(obj.getObjectID());
1015: notifyAll();
1016: }
1017: }
1018:
1019: private void addToManagedFromRoot(Object root,
1020: NonPortableEventContext context) {
1021: traverser.traverse(root, traverseTest, context);
1022: }
1023:
1024: private void dumpObjectHierarchy(Object root,
1025: NonPortableEventContext context) {
1026: // the catch is not in the called method so that when/if there is an OOME, the logging might have a chance of
1027: // actually working (as opposed to just throwing another OOME)
1028: try {
1029: dumpObjectHierarchy0(root, context);
1030: } catch (Throwable t) {
1031: logger.error(
1032: "error walking non-portable object instance of type "
1033: + root.getClass().getName(), t);
1034: }
1035: }
1036:
1037: private void dumpObjectHierarchy0(Object root,
1038: NonPortableEventContext context) {
1039: if (runtimeLogger.nonPortableDump()) {
1040: NonPortableWalkVisitor visitor = new NonPortableWalkVisitor(
1041: CustomerLogging.getDSORuntimeLogger(), this ,
1042: this .clientConfiguration, root);
1043: ObjectGraphWalker walker = new ObjectGraphWalker(root,
1044: visitor, visitor);
1045: walker.walk();
1046: }
1047: }
1048:
1049: public void sendApplicationEvent(Object pojo, ApplicationEvent event) {
1050: JMXMessage jmxMsg = channel.getJMXMessage();
1051: storeObjectHierarchy(pojo, event.getApplicationEventContext());
1052: jmxMsg.setJMXObject(event);
1053: jmxMsg.send();
1054: }
1055:
1056: public void storeObjectHierarchy(Object root,
1057: ApplicationEventContext context) {
1058: try {
1059: WalkVisitor wv = new WalkVisitor(this ,
1060: this .clientConfiguration, root, context);
1061: ObjectGraphWalker walker = new ObjectGraphWalker(context
1062: .getPojo(), wv, wv);
1063: walker.walk();
1064: context.setTreeModel(wv.getTreeModel());
1065: } catch (Throwable t) {
1066: t.printStackTrace();
1067: }
1068: }
1069:
1070: private void addToSharedFromRoot(Object root,
1071: NonPortableEventContext context) {
1072: shareObjectsTraverser.traverse(root, traverseTest, context);
1073: }
1074:
1075: private class AddManagedObjectAction implements TraversalAction {
1076: public void visit(List objects) {
1077: List tcObjects = basicCreateIfNecessary(objects);
1078: for (Iterator i = tcObjects.iterator(); i.hasNext();) {
1079: txManager.createObject((TCObject) i.next());
1080: }
1081: }
1082: }
1083:
1084: private class SharedObjectsAction implements TraversalAction {
1085: public void visit(List objects) {
1086: basicShareObjectsIfNecessary(objects);
1087: }
1088: }
1089:
1090: private class NewObjectTraverseTest implements TraverseTest {
1091:
1092: public boolean shouldTraverse(Object object) {
1093: // literals should be skipped -- without this check, literal members (field values, array element values, in
1094: // collection, etc) of newly shared instances would get TCObjects and ObjectIDs assigned to them.
1095: if (literals.isLiteralInstance(object)) {
1096: return false;
1097: }
1098:
1099: TCObject tco = basicLookup(object);
1100: if (tco == null) {
1101: return true;
1102: }
1103: return tco.isNew();
1104: }
1105:
1106: public void checkPortability(TraversedReference reference,
1107: Class referringClass, NonPortableEventContext context)
1108: throws TCNonPortableObjectError {
1109: ClientObjectManagerImpl.this
1110: .checkPortabilityOfTraversedReference(reference,
1111: referringClass, context);
1112: }
1113: }
1114:
1115: private TCObject basicCreateIfNecessary(Object pojo) {
1116: TCObject obj = null;
1117:
1118: if ((obj = basicLookup(pojo)) == null) {
1119: obj = factory.getNewInstance(nextObjectID(), pojo, pojo
1120: .getClass());
1121: obj.setIsNew();
1122: txManager.createObject(obj);
1123: basicAddLocal(obj);
1124: executePostCreateMethod(pojo);
1125: }
1126: return obj;
1127: }
1128:
1129: private synchronized List basicCreateIfNecessary(List pojos) {
1130: waitUntilRunning();
1131: List tcObjects = new ArrayList(pojos.size());
1132: for (Iterator i = pojos.iterator(); i.hasNext();) {
1133: tcObjects.add(basicCreateIfNecessary(i.next()));
1134: }
1135: return tcObjects;
1136: }
1137:
1138: private TCObject basicShareObjectIfNecessary(Object pojo) {
1139: TCObject obj = null;
1140:
1141: if ((obj = basicLookup(pojo)) == null) {
1142: obj = factory.getNewInstance(nextObjectID(), pojo, pojo
1143: .getClass());
1144: obj.setIsNew();
1145: pendingCreateTCObjects.add(obj);
1146: basicAddLocal(obj);
1147: }
1148: return obj;
1149: }
1150:
1151: private synchronized List basicShareObjectsIfNecessary(List pojos) {
1152: waitUntilRunning();
1153: List tcObjects = new ArrayList(pojos.size());
1154: for (Iterator i = pojos.iterator(); i.hasNext();) {
1155: tcObjects.add(basicShareObjectIfNecessary(i.next()));
1156: }
1157: return tcObjects;
1158: }
1159:
1160: public synchronized void addPendingCreateObjectsToTransaction() {
1161: for (Iterator i = pendingCreateTCObjects.iterator(); i
1162: .hasNext();) {
1163: TCObject tcObject = (TCObject) i.next();
1164: txManager.createObject(tcObject);
1165: }
1166: pendingCreateTCObjects.clear();
1167: }
1168:
1169: public synchronized boolean hasPendingCreateObjects() {
1170: return !pendingCreateTCObjects.isEmpty();
1171: }
1172:
1173: private ObjectID nextObjectID() {
1174: return idProvider.next();
1175: }
1176:
1177: private boolean objectLookupInProgress(ObjectID id) {
1178: return objectLookupsInProgress.contains(id);
1179: }
1180:
1181: private void markObjectLookupInProgress(ObjectID id) {
1182: objectLookupsInProgress.add(id);
1183: }
1184:
1185: private void markObjectLookupNotInProgress(ObjectID id) {
1186: objectLookupsInProgress.remove(id);
1187: }
1188:
1189: public WeakReference createNewPeer(TCClass clazz, DNA dna) {
1190: if (clazz.isUseNonDefaultConstructor()) {
1191: try {
1192: return new WeakObjectReference(dna.getObjectID(),
1193: factory.getNewPeerObject(clazz, dna),
1194: referenceQueue);
1195: } catch (Exception e) {
1196: throw new TCRuntimeException(e);
1197: }
1198: } else {
1199: return createNewPeer(clazz, dna.getArraySize(), dna
1200: .getObjectID(), dna.getParentObjectID());
1201: }
1202: }
1203:
1204: /**
1205: * Deep Clone support
1206: */
1207: public Object createNewCopyInstance(Object source, Object parent) {
1208: Assert.eval(!isLiteralPojo(source));
1209:
1210: TCClass clazz = this .getOrCreateClass(source.getClass());
1211:
1212: try {
1213: if (clazz.isProxyClass()) {
1214: InvocationHandler srcHandler = Proxy
1215: .getInvocationHandler(source);
1216: Class peerClass = clazz.getPeerClass();
1217: return Proxy.newProxyInstance(peerClass
1218: .getClassLoader(), peerClass.getInterfaces(),
1219: srcHandler);
1220: } else if (clazz.isIndexed()) {
1221: int size = Array.getLength(source);
1222: return factory.getNewArrayInstance(clazz, size);
1223: } else if (clazz.isNonStaticInner()) {
1224: Assert.eval(parent != null);
1225: return factory.getNewPeerObject(clazz, parent);
1226: } else {
1227: Assert.eval(parent == null);
1228: Object o = factory.getNewPeerObject(clazz);
1229: return o;
1230: }
1231: } catch (Exception e) {
1232: throw new TCRuntimeException(e);
1233: }
1234: }
1235:
1236: public WeakReference createNewPeer(TCClass clazz, int size,
1237: ObjectID id, ObjectID parentID) {
1238: try {
1239: if (clazz.isIndexed()) {
1240: Object array = factory.getNewArrayInstance(clazz, size);
1241: return new WeakObjectReference(id, array,
1242: referenceQueue);
1243: } else if (parentID.isNull()) {
1244: return new WeakObjectReference(id, factory
1245: .getNewPeerObject(clazz), referenceQueue);
1246: } else {
1247: return new WeakObjectReference(id,
1248: factory.getNewPeerObject(clazz,
1249: lookupObject(parentID)), referenceQueue);
1250: }
1251: } catch (Exception e) {
1252: throw new TCRuntimeException(e);
1253: }
1254: }
1255:
1256: public TCClass getOrCreateClass(Class clazz) {
1257: return clazzFactory.getOrCreate(clazz, this );
1258: }
1259:
1260: public boolean isPortableClass(Class clazz) {
1261: return portability.isPortableClass(clazz);
1262: }
1263:
1264: public boolean isPortableInstance(Object obj) {
1265: return portability.isPortableInstance(obj);
1266: }
1267:
1268: private void startReaper() {
1269: reaper = new StoppableThread("Reaper") {
1270: public void run() {
1271: while (true) {
1272: try {
1273: if (isStopRequested()) {
1274: return;
1275: }
1276:
1277: WeakObjectReference wor = (WeakObjectReference) referenceQueue
1278: .remove(POLL_TIME);
1279:
1280: if (wor != null) {
1281: ObjectID objectID = wor.getObjectID();
1282: reap(objectID);
1283: }
1284: } catch (InterruptedException e) {
1285: return;
1286: }
1287: }
1288: }
1289: };
1290: reaper.setDaemon(true);
1291: reaper.start();
1292: }
1293:
1294: // XXX::: Cache eviction doesnt clear it from the cache. it happens in reap().
1295: public void evictCache(CacheStats stat) {
1296: int size = idToManaged_size();
1297: int toEvict = stat.getObjectCountToEvict(size);
1298: if (toEvict <= 0)
1299: return;
1300: // Cache is full
1301: boolean debug = logger.isDebugEnabled();
1302: int totalReferencesCleared = 0;
1303: int toClear = toEvict;
1304: while (toEvict > 0 && toClear > 0) {
1305: int maxCount = Math.min(COMMIT_SIZE, toClear);
1306: Collection removalCandidates = cache
1307: .getRemovalCandidates(maxCount);
1308: if (removalCandidates.isEmpty())
1309: break; // couldnt find any more
1310: for (Iterator i = removalCandidates.iterator(); i.hasNext()
1311: && toClear > 0;) {
1312: TCObject removed = (TCObject) i.next();
1313: if (removed != null) {
1314: Object pr = removed.getPeerObject();
1315: if (pr != null) {
1316: int cleared = removed.clearReferences(toClear);
1317: totalReferencesCleared += cleared;
1318: if (debug) {
1319: logger.debug("Clearing:"
1320: + removed.getObjectID() + " class:"
1321: + pr.getClass()
1322: + " Total cleared = "
1323: + totalReferencesCleared);
1324: }
1325: toClear -= cleared;
1326: }
1327: }
1328: }
1329: toEvict -= removalCandidates.size();
1330: }
1331: // TODO:: Send the correct set of targetObjects2GC
1332: stat.objectEvicted(totalReferencesCleared, idToManaged_size(),
1333: Collections.EMPTY_LIST);
1334: }
1335:
1336: // XXX:: Not synchronizing to improve performance, should be called only during cache eviction
1337: private int idToManaged_size() {
1338: return idToManaged.size();
1339: }
1340:
1341: }
|