001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.tx;
006:
007: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
008:
009: import com.tc.logging.TCLogger;
010: import com.tc.logging.TCLogging;
011: import com.tc.object.ObjectID;
012: import com.tc.object.tx.ServerTransactionID;
013: import com.tc.objectserver.api.ObjectManager;
014: import com.tc.objectserver.api.ObjectManagerLookupResults;
015: import com.tc.objectserver.context.ApplyTransactionContext;
016: import com.tc.objectserver.context.CommitTransactionContext;
017: import com.tc.objectserver.context.ObjectManagerResultsContext;
018: import com.tc.objectserver.context.RecallObjectsContext;
019: import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
020: import com.tc.properties.TCPropertiesImpl;
021: import com.tc.text.PrettyPrintable;
022: import com.tc.text.PrettyPrinter;
023: import com.tc.util.Assert;
024:
025: import java.io.PrintWriter;
026: import java.util.ArrayList;
027: import java.util.Collection;
028: import java.util.Collections;
029: import java.util.HashMap;
030: import java.util.HashSet;
031: import java.util.IdentityHashMap;
032: import java.util.Iterator;
033: import java.util.LinkedHashMap;
034: import java.util.List;
035: import java.util.Map;
036: import java.util.Set;
037: import java.util.Map.Entry;
038:
/**
 * This class keeps track of locally checked-out objects for applies and maintains the object-to-transaction-ID
 * mapping in the server. It wraps calls going to the object manager from the lookup, apply and commit stages.
 */
043: public class TransactionalObjectManagerImpl implements
044: TransactionalObjectManager, PrettyPrintable {
045:
046: private static final TCLogger logger = TCLogging
047: .getLogger(TransactionalObjectManagerImpl.class);
048: private static final int MAX_COMMIT_SIZE = TCPropertiesImpl
049: .getProperties().getInt(
050: "l2.objectmanager.maxObjectsToCommit");
051: private final ObjectManager objectManager;
052: private final TransactionSequencer sequencer;
053: private final ServerGlobalTransactionManager gtxm;
054:
055: /*
056: * This map contains ObjectIDs to TxnObjectGrouping that contains these objects
057: */
058: private final Map checkedOutObjects = new HashMap();
059: private final Map applyPendingTxns = new HashMap();
060: private final LinkedHashMap commitPendingTxns = new LinkedHashMap();
061:
062: private final Set pendingObjectRequest = new HashSet();
063: private final PendingList pendingTxnList = new PendingList();
064: private final LinkedQueue processedPendingLookups = new LinkedQueue();
065: private final LinkedQueue processedApplys = new LinkedQueue();
066:
067: private final TransactionalStageCoordinator txnStageCoordinator;
068:
/**
 * Creates a transactional object manager that stores the given collaborators for later use.
 *
 * @param objectManager       object manager used for object lookups and releases
 * @param sequencer           sequencer that supplies the transactions to process
 * @param gtxm                global transaction manager consulted before a transaction is applied
 * @param txnStageCoordinator coordinator used to trigger the lookup, apply, commit and recall stages
 */
public TransactionalObjectManagerImpl(ObjectManager objectManager,
        TransactionSequencer sequencer,
        ServerGlobalTransactionManager gtxm,
        TransactionalStageCoordinator txnStageCoordinator) {
    this.txnStageCoordinator = txnStageCoordinator;
    this.gtxm = gtxm;
    this.sequencer = sequencer;
    this.objectManager = objectManager;
}
078:
079: // ProcessTransactionHandler Method
080: public void addTransactions(Collection txns) {
081: sequencer.addTransactions(txns);
082: txnStageCoordinator.initiateLookup();
083: }
084:
085: // LookupHandler Method
086: public void lookupObjectsForTransactions() {
087: processPendingIfNecessary();
088: while (true) {
089: ServerTransaction txn = sequencer.getNextTxnToProcess();
090: if (txn == null)
091: break;
092: ServerTransactionID stxID = txn.getServerTransactionID();
093: if (gtxm.initiateApply(stxID)) {
094: lookupObjectsForApplyAndAddToSink(txn, true);
095: } else {
096: // These txns are already applied, hence just sending it to the next stage.
097: txnStageCoordinator
098: .addToApplyStage(new ApplyTransactionContext(
099: txn));
100: }
101: }
102: }
103:
104: private synchronized void processPendingIfNecessary() {
105: if (addProcessedPendingLookups()) {
106: processPendingTransactions();
107: }
108: }
109:
110: public synchronized void lookupObjectsForApplyAndAddToSink(
111: ServerTransaction txn, boolean newTxn) {
112: Collection oids = txn.getObjectIDs();
113: // log("lookupObjectsForApplyAndAddToSink(): START : " + txn.getServerTransactionID() + " : " + oids);
114: Set newRequests = new HashSet();
115: boolean makePending = false;
116: for (Iterator i = oids.iterator(); i.hasNext();) {
117: ObjectID oid = (ObjectID) i.next();
118: TxnObjectGrouping tog;
119: if (pendingObjectRequest.contains(oid)) {
120: makePending = true;
121: } else if ((tog = (TxnObjectGrouping) checkedOutObjects
122: .get(oid)) == null) {
123: // 1) Object is not already checked out or
124: newRequests.add(oid);
125: } else if (tog.limitReached()) {
126: // 2) the object is available, but we dont use it to prevent huge commits, large txn acks etc
127: newRequests.add(oid);
128: // log(shortDescription());
129: // log("Limit Reached. " + oid + " - " + tog.shortDescription());
130: }
131: }
132: // TODO:: make cache and stats right
133: LookupContext lookupContext = null;
134: if (!newRequests.isEmpty()) {
135: lookupContext = new LookupContext(newRequests,
136: (newTxn ? txn.getNewObjectIDs()
137: : Collections.EMPTY_SET), txn
138: .getServerTransactionID());
139: if (objectManager.lookupObjectsFor(txn.getSourceID(),
140: lookupContext)) {
141: addLookedupObjects(lookupContext);
142: } else {
143: // New request went pending in object manager
144: // log("lookupObjectsForApplyAndAddToSink(): New Request went pending : " + newRequests);
145: makePending = true;
146: pendingObjectRequest.addAll(newRequests);
147: }
148: }
149: if (makePending) {
150: // log("lookupObjectsForApplyAndAddToSink(): Make Pending : " + txn.getServerTransactionID());
151: makePending(txn);
152: if (lookupContext != null)
153: lookupContext.makePending();
154: } else {
155: ServerTransactionID txnID = txn.getServerTransactionID();
156: TxnObjectGrouping newGrouping = new TxnObjectGrouping(
157: txnID, txn.getNewRoots());
158: mergeTransactionGroupings(oids, newGrouping);
159: applyPendingTxns.put(txnID, newGrouping);
160: txnStageCoordinator
161: .addToApplyStage(new ApplyTransactionContext(txn,
162: getRequiredObjectsMap(oids, newGrouping
163: .getObjects())));
164: makeUnpending(txn);
165: // log("lookupObjectsForApplyAndAddToSink(): Success: " + txn.getServerTransactionID());
166: }
167: }
168:
169: public String shortDescription() {
170: return "TxnObjectManager : checked Out count = "
171: + checkedOutObjects.size() + " apply pending txn = "
172: + applyPendingTxns.size() + " commit pending = "
173: + commitPendingTxns.size() + " pending txns = "
174: + pendingTxnList.size() + " pending object requests = "
175: + pendingObjectRequest.size();
176: }
177:
178: private Map getRequiredObjectsMap(Collection oids, Map objects) {
179: HashMap map = new HashMap(oids.size());
180: for (Iterator i = oids.iterator(); i.hasNext();) {
181: Object oid = i.next();
182: Object mo = objects.get(oid);
183: if (mo == null) {
184: dump();
185: log("NULL !! " + oid + " not found ! " + oids);
186: log("Map contains " + objects);
187: throw new AssertionError("Object is NULL !! : " + oid);
188: }
189: map.put(oid, mo);
190: }
191: return map;
192: }
193:
194: private void log(String message) {
195: logger.info(message);
196: }
197:
// This method is optimized to perform large merges fast, hence the code flow might not
// look natural.
200: private void mergeTransactionGroupings(Collection oids,
201: TxnObjectGrouping newGrouping) {
202: long start = System.currentTimeMillis();
203: for (Iterator i = oids.iterator(); i.hasNext();) {
204: ObjectID oid = (ObjectID) i.next();
205: TxnObjectGrouping oldGrouping = (TxnObjectGrouping) checkedOutObjects
206: .get(oid);
207: if (oldGrouping == null) {
208: throw new AssertionError(
209: "Transaction Grouping for lookedup objects is Null !! "
210: + oid);
211: } else if (oldGrouping != newGrouping
212: && oldGrouping.isActive()) {
213: ServerTransactionID oldTxnId = oldGrouping
214: .getServerTransactionID();
215: // This merge has a sideeffect of setting all reference contained in oldGrouping to null.
216: newGrouping.merge(oldGrouping);
217: commitPendingTxns.remove(oldTxnId);
218: }
219: }
220: for (Iterator j = newGrouping.getObjects().keySet().iterator(); j
221: .hasNext();) {
222: checkedOutObjects.put(j.next(), newGrouping);
223: }
224: for (Iterator j = newGrouping.getApplyPendingTxnsIterator(); j
225: .hasNext();) {
226: ServerTransactionID oldTxnId = (ServerTransactionID) j
227: .next();
228: if (applyPendingTxns.containsKey(oldTxnId)) {
229: applyPendingTxns.put(oldTxnId, newGrouping);
230: }
231: }
232: long timeTaken = System.currentTimeMillis() - start;
233: if (timeTaken > 500) {
234: log("Merged " + oids.size() + " object into "
235: + newGrouping.shortDescription() + " in "
236: + timeTaken + " ms");
237: }
238: }
239:
240: private synchronized void addLookedupObjects(LookupContext context) {
241: Map lookedUpObjects = context.getLookedUpObjects();
242: if (lookedUpObjects == null || lookedUpObjects.isEmpty()) {
243: throw new AssertionError("Lookedup object is null : "
244: + lookedUpObjects + " context = " + context);
245: }
246: TxnObjectGrouping tg = new TxnObjectGrouping(lookedUpObjects);
247: for (Iterator i = lookedUpObjects.keySet().iterator(); i
248: .hasNext();) {
249: Object oid = i.next();
250: pendingObjectRequest.remove(oid);
251: checkedOutObjects.put(oid, tg);
252: }
253: }
254:
255: private void makePending(ServerTransaction txn) {
256: if (pendingTxnList.add(txn)) {
257: sequencer.makePending(txn);
258: }
259: }
260:
261: private void makeUnpending(ServerTransaction txn) {
262: if (pendingTxnList.remove(txn)) {
263: sequencer.makeUnpending(txn);
264: }
265: }
266:
267: private boolean addProcessedPendingLookups() {
268: LookupContext c;
269: boolean processedPending = false;
270: try {
271: while ((c = (LookupContext) processedPendingLookups.poll(0)) != null) {
272: addLookedupObjects(c);
273: processedPending = true;
274: }
275: } catch (InterruptedException e) {
276: throw new AssertionError(e);
277: }
278: return processedPending;
279: }
280:
281: private void addProcessedPending(LookupContext context) {
282: try {
283: processedPendingLookups.put(context);
284: } catch (InterruptedException e) {
285: throw new AssertionError(e);
286: }
287: txnStageCoordinator.initiateLookup();
288: }
289:
290: private void processPendingTransactions() {
291: List copy = pendingTxnList.copy();
292: for (Iterator i = copy.iterator(); i.hasNext();) {
293: ServerTransaction txn = (ServerTransaction) i.next();
294: lookupObjectsForApplyAndAddToSink(txn, false);
295: }
296: }
297:
298: // ApplyTransaction stage method
299: public boolean applyTransactionComplete(ServerTransactionID stxnID) {
300: try {
301: processedApplys.put(stxnID);
302: } catch (InterruptedException e) {
303: throw new AssertionError(e);
304: }
305: txnStageCoordinator.initiateApplyComplete();
306: return true;
307: }
308:
309: // Apply Complete stage method
310: public void processApplyComplete() {
311: try {
312: ServerTransactionID txnID;
313: ArrayList txnIDs = new ArrayList();
314: while ((txnID = (ServerTransactionID) processedApplys
315: .poll(0)) != null) {
316: txnIDs.add(txnID);
317: }
318: if (txnIDs.size() > 0) {
319: processApplyTxnComplete(txnIDs);
320: }
321: } catch (InterruptedException e) {
322: throw new AssertionError(e);
323: }
324: }
325:
326: private synchronized void processApplyTxnComplete(ArrayList txnIDs) {
327: for (Iterator i = txnIDs.iterator(); i.hasNext();) {
328: ServerTransactionID stxnID = (ServerTransactionID) i.next();
329: processApplyTxnComplete(stxnID);
330: }
331: }
332:
333: private void processApplyTxnComplete(ServerTransactionID stxnID) {
334: TxnObjectGrouping grouping = (TxnObjectGrouping) applyPendingTxns
335: .remove(stxnID);
336: Assert.assertNotNull(grouping);
337: if (grouping.applyComplete(stxnID)) {
338: // Since verifying against all txns is costly, only the prime one (the one that created this grouping) is verfied
339: // against
340: ServerTransactionID pTxnID = grouping
341: .getServerTransactionID();
342: Assert.assertNull(applyPendingTxns.get(pTxnID));
343: Object old = commitPendingTxns.put(pTxnID, grouping);
344: Assert.assertNull(old);
345: txnStageCoordinator.initiateCommit();
346: }
347: }
348:
349: // Commit Transaction stage method
350: public synchronized void commitTransactionsComplete(
351: CommitTransactionContext ctc) {
352:
353: if (commitPendingTxns.isEmpty())
354: return;
355:
356: Map newRoots = new HashMap();
357: Map objects = new HashMap();
358: Collection txnIDs = new ArrayList();
359: for (Iterator i = commitPendingTxns.values().iterator(); i
360: .hasNext();) {
361: TxnObjectGrouping tog = (TxnObjectGrouping) i.next();
362: newRoots.putAll(tog.getNewRoots());
363: txnIDs.addAll(tog.getTxnIDs());
364: objects.putAll(tog.getObjects());
365: i.remove();
366: if (objects.size() > MAX_COMMIT_SIZE) {
367: break;
368: }
369: }
370:
371: ctc.initialize(txnIDs, objects.values(), newRoots);
372:
373: for (Iterator j = objects.keySet().iterator(); j.hasNext();) {
374: Object old = checkedOutObjects.remove(j.next());
375: Assert.assertNotNull(old);
376: }
377:
378: if (!commitPendingTxns.isEmpty()) {
379: // More commits needed
380: txnStageCoordinator.initiateCommit();
381: }
382: }
383:
384: // recall from ObjectManager on GC start
385: public void recallAllCheckedoutObject() {
386: txnStageCoordinator.initiateRecallAll();
387: }
388:
389: // Recall Stage method
390: public synchronized void recallCheckedoutObject(
391: RecallObjectsContext roc) {
392: processPendingIfNecessary();
393: if (roc.recallAll()) {
394: IdentityHashMap recalled = new IdentityHashMap();
395: HashMap recalledObjects = new HashMap();
396: for (Iterator i = checkedOutObjects.entrySet().iterator(); i
397: .hasNext();) {
398: Entry e = (Entry) i.next();
399: TxnObjectGrouping tog = (TxnObjectGrouping) e
400: .getValue();
401: if (tog.getServerTransactionID().isNull()) {
402: i.remove();
403: if (!recalled.containsKey(tog)) {
404: recalled.put(tog, null);
405: recalledObjects.putAll(tog.getObjects());
406: }
407: }
408: }
409: if (!recalledObjects.isEmpty()) {
410: logger.info("Recalling " + recalledObjects.size()
411: + " Objects to ObjectManager");
412: objectManager.releaseAll(recalledObjects.values());
413: }
414: }
415: }
416:
417: public void dump() {
418: PrintWriter pw = new PrintWriter(System.err);
419: new PrettyPrinter(pw).visit(this );
420: pw.flush();
421: }
422:
423: public synchronized PrettyPrinter prettyPrint(PrettyPrinter out) {
424: out.println(getClass().getName());
425: out.indent().print("checkedOutObjects: ").visit(
426: checkedOutObjects).println();
427: out.indent().print("applyPendingTxns: ")
428: .visit(applyPendingTxns).println();
429: out.indent().print("commitPendingTxns: ").visit(
430: commitPendingTxns).println();
431: out.indent().print("pendingTxnList: ").visit(pendingTxnList)
432: .println();
433: out.indent().print("pendingObjectRequest: ").visit(
434: pendingObjectRequest).println();
435: return out;
436: }
437:
438: private class LookupContext implements ObjectManagerResultsContext {
439:
440: private boolean pending = false;
441: private boolean resultsSet = false;
442: private Map lookedUpObjects;
443: private final Set oids;
444: private final Set newOids;
445: private final ServerTransactionID transactionID;
446:
447: public LookupContext(Set oids, Set newOids,
448: ServerTransactionID transactionID) {
449: this .oids = oids;
450: this .newOids = newOids;
451: this .transactionID = transactionID;
452: }
453:
454: public synchronized void makePending() {
455: pending = true;
456: if (resultsSet) {
457: TransactionalObjectManagerImpl.this
458: .addProcessedPending(this );
459: }
460: }
461:
462: public synchronized void setResults(
463: ObjectManagerLookupResults results) {
464: lookedUpObjects = results.getObjects();
465: resultsSet = true;
466: if (pending) {
467: TransactionalObjectManagerImpl.this
468: .addProcessedPending(this );
469: }
470: }
471:
472: public synchronized Map getLookedUpObjects() {
473: return lookedUpObjects;
474: }
475:
476: public String toString() {
477: return "LookupContext [ txnID = "
478: + transactionID
479: + ", oids = "
480: + oids
481: + "] = { pending = "
482: + pending
483: + ", lookedupObjects = "
484: + (lookedUpObjects == null ? "null"
485: : lookedUpObjects.keySet().toString())
486: + "}";
487: }
488:
489: public Set getLookupIDs() {
490: return oids;
491: }
492:
493: public Set getNewObjectIDs() {
494: return newOids;
495: }
496:
497: public void missingObject(ObjectID oid) {
498: throw new AssertionError(
499: "Lookup for non-exisistent Object : " + oid
500: + " lookup context is : " + this );
501: }
502:
503: }
504:
505: private static final class PendingList {
506: LinkedHashMap pending = new LinkedHashMap();
507:
508: public boolean add(ServerTransaction txn) {
509: ServerTransactionID sTxID = txn.getServerTransactionID();
510: // Doing two lookups to avoid reordering
511: if (pending.containsKey(sTxID)) {
512: return false;
513: } else {
514: pending.put(sTxID, txn);
515: return true;
516: }
517: }
518:
519: public List copy() {
520: return new ArrayList(pending.values());
521: }
522:
523: public boolean remove(ServerTransaction txn) {
524: return (pending.remove(txn.getServerTransactionID()) != null);
525: }
526:
527: public String toString() {
528: return "PendingList : pending Txns = " + pending;
529: }
530:
531: public int size() {
532: return pending.size();
533: }
534: }
535: }
|