001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.net.groups.ClientID;
009: import com.tc.object.dna.api.DNA;
010: import com.tc.object.msg.RequestManagedObjectMessage;
011: import com.tc.object.msg.RequestManagedObjectMessageFactory;
012: import com.tc.object.msg.RequestRootMessage;
013: import com.tc.object.msg.RequestRootMessageFactory;
014: import com.tc.object.session.SessionID;
015: import com.tc.object.session.SessionManager;
016: import com.tc.properties.TCPropertiesImpl;
017: import com.tc.util.Assert;
018: import com.tc.util.State;
019: import com.tc.util.Util;
020:
021: import gnu.trove.THashMap;
022:
023: import java.util.Collection;
024: import java.util.Collections;
025: import java.util.Date;
026: import java.util.HashMap;
027: import java.util.HashSet;
028: import java.util.Iterator;
029: import java.util.LinkedHashMap;
030: import java.util.Map;
031: import java.util.Set;
032: import java.util.Map.Entry;
033:
034: /**
035: * This class is a kludge but I think it will do the trick for now. It is responsible for any communications to the
036: * server for object retrieval and removal
037: */
038: public class RemoteObjectManagerImpl implements RemoteObjectManager {
039:
040: private static final State PAUSED = new State("PAUSED");
041: private static final State STARTING = new State("STARTING");
042: private static final State RUNNING = new State("RUNNING");
043:
044: private final LinkedHashMap rootRequests = new LinkedHashMap();
045: private final Map dnaRequests = new THashMap();
046: private final Map outstandingObjectRequests = new THashMap();
047: private final Map outstandingRootRequests = new THashMap();
048: private final Set missingObjectIDs = new HashSet();
049: private long objectRequestIDCounter = 0;
050: private final ObjectRequestMonitor requestMonitor;
051: private final ClientIDProvider cip;
052: private final RequestRootMessageFactory rrmFactory;
053: private final RequestManagedObjectMessageFactory rmomFactory;
054: private final DNALRU lruDNA = new DNALRU();
055: private final static int MAX_LRU = TCPropertiesImpl.getProperties()
056: .getInt("l1.objectmanager.remote.maxDNALRUSize");
057: private final static boolean ENABLE_LOGGING = TCPropertiesImpl
058: .getProperties().getBoolean(
059: "l1.objectmanager.remote.logging.enabled");
060: private final int defaultDepth;
061: private State state = RUNNING;
062: private Set removeObjects = new HashSet(256);
063: private final SessionManager sessionManager;
064: private final TCLogger logger;
065: private static final int REMOVE_OBJECTS_THRESHOLD = 10000;
066: private long hit = 0;
067: private long miss = 0;
068:
069: public RemoteObjectManagerImpl(TCLogger logger,
070: ClientIDProvider cip, RequestRootMessageFactory rrmFactory,
071: RequestManagedObjectMessageFactory rmomFactory,
072: ObjectRequestMonitor requestMonitor, int defaultDepth,
073: SessionManager sessionManager) {
074: this .logger = logger;
075: this .cip = cip;
076: this .rrmFactory = rrmFactory;
077: this .rmomFactory = rmomFactory;
078: this .requestMonitor = requestMonitor;
079: this .defaultDepth = defaultDepth;
080: this .sessionManager = sessionManager;
081: }
082:
083: public synchronized void pause() {
084: assertNotPaused("Attempt to pause while PAUSED");
085: state = PAUSED;
086: notifyAll();
087: }
088:
089: public synchronized void starting() {
090: assertPaused("Attempt to start while not PAUSED");
091: state = STARTING;
092: notifyAll();
093: }
094:
095: public synchronized void unpause() {
096: assertStarting("Attempt to unpause while not STARTING");
097: state = RUNNING;
098: notifyAll();
099: }
100:
101: public synchronized void clear() {
102: if (state != STARTING)
103: throw new AssertionError(
104: "Attempt to clear while not STARTING: " + state);
105: lruDNA.clear();
106: for (Iterator i = dnaRequests.entrySet().iterator(); i
107: .hasNext();) {
108: Entry e = (Entry) i.next();
109: if (e.getValue() != null) {
110: i.remove();
111: }
112: }
113: removeObjects.clear();
114: }
115:
116: private void waitUntilRunning() {
117: boolean isInterrupted = false;
118: while (state != RUNNING) {
119: try {
120: wait();
121: } catch (InterruptedException e) {
122: isInterrupted = true;
123: }
124: }
125: Util.selfInterruptIfNeeded(isInterrupted);
126: }
127:
128: private void assertPaused(Object message) {
129: if (state != PAUSED)
130: throw new AssertionError(message + ": " + state);
131: }
132:
133: private void assertStarting(Object message) {
134: if (state != STARTING)
135: throw new AssertionError(message + ": " + state);
136: }
137:
138: private void assertNotPaused(Object message) {
139: if (state == PAUSED)
140: throw new AssertionError(message + ": " + state);
141: }
142:
143: public synchronized void requestOutstanding() {
144: assertStarting("Attempt to request outstanding object requests while not STARTING");
145: for (Iterator i = outstandingObjectRequests.values().iterator(); i
146: .hasNext();) {
147: RequestManagedObjectMessage rmom = createRequestManagedObjectMessage(
148: (ObjectRequestContext) i.next(),
149: Collections.EMPTY_SET);
150: rmom.send();
151: }
152: for (Iterator i = outstandingRootRequests.values().iterator(); i
153: .hasNext();) {
154: RequestRootMessage rrm = createRootMessage((String) i
155: .next());
156: rrm.send();
157: }
158: }
159:
160: public DNA retrieve(ObjectID id) {
161: return basicRetrieve(id, defaultDepth, ObjectID.NULL_ID);
162: }
163:
164: public DNA retrieveWithParentContext(ObjectID id,
165: ObjectID parentContext) {
166: return basicRetrieve(id, defaultDepth, parentContext);
167: }
168:
169: public DNA retrieve(ObjectID id, int depth) {
170: return basicRetrieve(id, depth, ObjectID.NULL_ID);
171: }
172:
173: public synchronized DNA basicRetrieve(ObjectID id, int depth,
174: ObjectID parentContext) {
175: boolean isInterrupted = false;
176:
177: ObjectRequestContext ctxt = new ObjectRequestContextImpl(
178: this .cip.getClientID(), new ObjectRequestID(
179: objectRequestIDCounter++), id, depth,
180: parentContext);
181: boolean inMemory = true;
182: while (!dnaRequests.containsKey(id)
183: || dnaRequests.get(id) == null
184: || missingObjectIDs.contains(id)) {
185: waitUntilRunning();
186: if (missingObjectIDs.contains(id)) {
187: throw new AssertionError(
188: "Requested Object is missing : " + id
189: + " Missing Oids = " + missingObjectIDs);
190: } else if (!dnaRequests.containsKey(id)) {
191: inMemory = false;
192: sendRequest(ctxt);
193: } else if (!outstandingObjectRequests.containsKey(id)) {
194: outstandingObjectRequests.put(id, ctxt);
195: }
196:
197: if (dnaRequests.get(id) == null) {
198: try {
199: wait();
200: } catch (InterruptedException e) {
201: isInterrupted = true;
202: }
203: }
204: }
205: Util.selfInterruptIfNeeded(isInterrupted);
206: lruDNA.remove(id);
207: if (inMemory) {
208: hit++;
209: } else {
210: miss++;
211: }
212: if (ENABLE_LOGGING && ((hit + miss) % 1000 == 0)) {
213: logger.info("Cache Hit : Miss ratio = " + hit + " : "
214: + miss);
215: }
216: return (DNA) dnaRequests.remove(id);
217: }
218:
219: private void sendRequest(ObjectRequestContext ctxt) {
220: RequestManagedObjectMessage rmom = createRequestManagedObjectMessage(
221: ctxt, removeObjects);
222: removeObjects = new HashSet(256);
223: ObjectID id = null;
224: for (Iterator i = ctxt.getObjectIDs().iterator(); i.hasNext();) {
225: id = (ObjectID) i.next();
226: dnaRequests.put(id, null);
227: }
228: // XXX:: This is a little weird that we add only the last ObjectID to the outstandingObjectRequests map
229: // when we add all the list of ObjectIDs to dnaRequests. This is done so that we only send the request once
230: // on resend. Since the only way we request for more than one ObjectID in 1 message is when someone initiate
231: // non-blocking lookups. So if we loose those requests on restart it is still ok.
232: this .outstandingObjectRequests.put(id, ctxt);
233: rmom.send();
234: requestMonitor.notifyObjectRequest(ctxt);
235: }
236:
237: private RequestManagedObjectMessage createRequestManagedObjectMessage(
238: ObjectRequestContext ctxt, Set removed) {
239: RequestManagedObjectMessage rmom = rmomFactory
240: .newRequestManagedObjectMessage();
241: Set requestedObjectIDs = ctxt.getObjectIDs();
242: rmom.initialize(ctxt, requestedObjectIDs, removed);
243: return rmom;
244: }
245:
246: public synchronized ObjectID retrieveRootID(String name) {
247:
248: if (!rootRequests.containsKey(name)) {
249: RequestRootMessage rrm = createRootMessage(name);
250: rootRequests.put(name, ObjectID.NULL_ID);
251: outstandingRootRequests.put(name, name);
252: rrm.send();
253: }
254:
255: boolean isInterrupted = false;
256: while (ObjectID.NULL_ID.equals(rootRequests.get(name))) {
257: waitUntilRunning();
258: try {
259: if (ObjectID.NULL_ID.equals(rootRequests.get(name))) {
260: wait();
261: }
262: } catch (InterruptedException e) {
263: isInterrupted = true;
264: }
265: }
266: Util.selfInterruptIfNeeded(isInterrupted);
267:
268: return (ObjectID) (rootRequests.containsKey(name) ? rootRequests
269: .get(name)
270: : ObjectID.NULL_ID);
271: }
272:
273: private RequestRootMessage createRootMessage(String name) {
274: RequestRootMessage rrm = rrmFactory.newRequestRootMessage();
275: rrm.initialize(name);
276: return rrm;
277: }
278:
279: public synchronized void addRoot(String name, ObjectID id) {
280: waitUntilRunning();
281: if (id.isNull()) {
282: rootRequests.remove(name);
283: } else {
284: rootRequests.put(name, id);
285: }
286: Object rootName = outstandingRootRequests.remove(name);
287: if (rootName == null) {
288: // This is possible in some restart scenario
289: logger
290: .warn("A root was added that was not found in the outstanding requests. root name = "
291: + name + " " + id);
292: }
293: notifyAll();
294: }
295:
296: public synchronized void addAllObjects(SessionID sessionID,
297: long batchID, Collection dnas) {
298: waitUntilRunning();
299: if (!sessionManager.isCurrentSession(sessionID)) {
300: logger.warn("Ignoring DNA added from a different session: "
301: + sessionID + ", " + sessionManager);
302: return;
303: }
304: lruDNA.clearUnrequestedDNA();
305: lruDNA.add(batchID, dnas);
306: for (Iterator i = dnas.iterator(); i.hasNext();) {
307: DNA dna = (DNA) i.next();
308: // The server should not send us any objects that the server thinks we still have.
309: if (removeObjects.contains(dna.getObjectID())) {
310: // formatting
311: throw new AssertionError(
312: "Server sent us an object that is present in the removed set - "
313: + dna.getObjectID()
314: + " , removed set = " + removeObjects);
315: }
316: basicAddObject(dna);
317: }
318: notifyAll();
319: }
320:
321: public synchronized void objectsNotFoundFor(SessionID sessionID,
322: long batchID, Set missingOIDs) {
323: waitUntilRunning();
324: if (!sessionManager.isCurrentSession(sessionID)) {
325: logger.warn("Ignoring Missing Object IDs " + missingOIDs
326: + " from a different session: " + sessionID + ", "
327: + sessionManager);
328: return;
329: }
330: logger.warn("Received Missing Object IDs from server : "
331: + missingOIDs);
332: missingObjectIDs.addAll(missingOIDs);
333: notifyAll();
334: }
335:
336: // Used only for testing
337: synchronized void addObject(DNA dna) {
338: if (!removeObjects.contains(dna.getObjectID()))
339: basicAddObject(dna);
340: notifyAll();
341: }
342:
343: // Used only for testing
344: synchronized int getDNACacheSize() {
345: return lruDNA.size();
346: }
347:
348: private void basicAddObject(DNA dna) {
349: dnaRequests.put(dna.getObjectID(), dna);
350: outstandingObjectRequests.remove(dna.getObjectID());
351: }
352:
353: public synchronized void removed(ObjectID id) {
354: dnaRequests.remove(id);
355: removeObjects.add(id);
356: if (removeObjects.size() >= REMOVE_OBJECTS_THRESHOLD) {
357: ObjectRequestContext ctxt = new ObjectRequestContextImpl(
358: this .cip.getClientID(), new ObjectRequestID(
359: objectRequestIDCounter++),
360: Collections.EMPTY_SET, -1);
361: RequestManagedObjectMessage rmom = createRequestManagedObjectMessage(
362: ctxt, removeObjects);
363: removeObjects = new HashSet(256);
364: rmom.send();
365: }
366: }
367:
368: public class ObjectRequestContextImpl implements
369: ObjectRequestContext {
370:
371: private final long timestamp;
372:
373: private final Set objectIDs;
374:
375: private final ObjectRequestID requestID;
376:
377: private final ClientID clientID;
378:
379: private final int depth;
380:
381: private ObjectRequestContextImpl(ClientID clientID,
382: ObjectRequestID requestID, ObjectID objectID,
383: int depth, ObjectID parentContext) {
384: this (clientID, requestID, new HashSet(), depth);
385: this .objectIDs.add(objectID);
386: // XXX:: This is a hack for now. This parent context could be exposed to the L2 to make it more elegant.
387: if (!parentContext.isNull()) {
388: this .objectIDs.add(parentContext);
389: }
390: }
391:
392: private ObjectRequestContextImpl(ClientID clientID,
393: ObjectRequestID requestID, Set objectIDs, int depth) {
394: this .timestamp = System.currentTimeMillis();
395: this .clientID = clientID;
396: this .requestID = requestID;
397: this .objectIDs = objectIDs;
398: this .depth = depth;
399: }
400:
401: public ClientID getClientID() {
402: return this .clientID;
403: }
404:
405: public ObjectRequestID getRequestID() {
406: return this .requestID;
407: }
408:
409: public Set getObjectIDs() {
410: return this .objectIDs;
411: }
412:
413: public int getRequestDepth() {
414: return this .depth;
415: }
416:
417: public String toString() {
418: return getClass().getName() + "[" + new Date(timestamp)
419: + ", requestID =" + requestID + ", objectIDs ="
420: + objectIDs + ", depth = " + depth + "]";
421: }
422: }
423:
424: private class DNALRU {
425: // TODO:: These two data structure can be merged to one with into a LinkedHashMap with some marker object to
426: // identify buckets
427: private LinkedHashMap dnas = new LinkedHashMap();
428: private HashMap oids2BatchID = new HashMap();
429:
430: public synchronized int size() {
431: return dnas.size();
432: }
433:
434: public synchronized void clear() {
435: dnas.clear();
436: oids2BatchID.clear();
437: }
438:
439: public synchronized void add(long batchID, Collection objs) {
440: Long key = new Long(batchID);
441: Map m = (Map) dnas.get(key);
442: if (m == null) {
443: m = new THashMap(objs.size() * 2, 0.8f);
444: dnas.put(key, m);
445: }
446: for (Iterator i = objs.iterator(); i.hasNext();) {
447: DNA dna = (DNA) i.next();
448: m.put(dna.getObjectID(), dna);
449: oids2BatchID.put(dna.getObjectID(), key);
450: }
451: }
452:
453: public synchronized void remove(ObjectID id) {
454: Long batchID = (Long) oids2BatchID.remove(id);
455: if (batchID != null) {
456: Map m = (Map) dnas.get(batchID);
457: Object dna = m.remove(id);
458: Assert.assertNotNull(dna);
459: if (m.isEmpty()) {
460: dnas.remove(batchID);
461: }
462: }
463: }
464:
465: public synchronized void clearUnrequestedDNA() {
466: if (dnas.size() > MAX_LRU) {
467: Iterator dnaMapIterator = dnas.values().iterator();
468: Map dnaMap = (Map) dnaMapIterator.next();
469: int removedDNACount = dnaMap.size();
470: for (Iterator i = dnaMap.keySet().iterator(); i
471: .hasNext();) {
472: ObjectID id = (ObjectID) i.next();
473: if (!outstandingObjectRequests.containsKey(id)) {
474: // only include this ID in the removed set if this DNA has never left the request map.
475: // If it has left the map, this client is actually be referencing this object
476: if (dnaRequests.containsKey(id)) {
477: removed(id);
478: }
479: }
480: oids2BatchID.remove(id);
481: }
482: dnaMapIterator.remove();
483: if (ENABLE_LOGGING) {
484: logger.info("DNA LRU remove 1 map containing "
485: + removedDNACount + " DNAs");
486: }
487: }
488: }
489: }
490: }
|