001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.tx;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009: import com.tc.net.groups.NodeID;
010: import com.tc.object.tx.TransactionID;
011: import com.tc.util.Assert;
012:
013: import java.util.HashMap;
014: import java.util.Map;
015:
016: public class TransactionBatchManagerImpl implements
017: TransactionBatchManager {
018:
019: private static final TCLogger logger = TCLogging
020: .getLogger(TransactionBatchManagerImpl.class);
021:
022: private final Map map = new HashMap();
023:
024: public synchronized void defineBatch(NodeID nid, int numTxns) {
025: BatchStats batchStats = getOrCreateStats(nid);
026: batchStats.defineBatch(numTxns);
027: }
028:
029: private BatchStats getOrCreateStats(NodeID nid) {
030: BatchStats bs = (BatchStats) map.get(nid);
031: if (bs == null) {
032: bs = new BatchStats(nid);
033: map.put(nid, bs);
034: }
035: return bs;
036: }
037:
038: public synchronized boolean batchComponentComplete(NodeID nid,
039: TransactionID txnID) {
040: BatchStats bs = (BatchStats) map.get(nid);
041: Assert.assertNotNull(bs);
042: return bs.batchComplete(txnID);
043: }
044:
045: public synchronized void shutdownNode(NodeID nodeID) {
046: BatchStats bs = (BatchStats) map.get(nodeID);
047: if (bs != null) {
048: bs.shutdownNode();
049: }
050: }
051:
052: private void cleanUp(NodeID nodeID) {
053: map.remove(nodeID);
054: }
055:
056: public class BatchStats {
057: private final NodeID nodeID;
058:
059: private int batchCount;
060: private int txnCount;
061: private float avg;
062:
063: private boolean killed = false;
064:
065: public BatchStats(NodeID nid) {
066: this .nodeID = nid;
067: }
068:
069: public void defineBatch(int numTxns) {
070: long adjustedTotal = (long) (batchCount * avg) + numTxns;
071: txnCount += numTxns;
072: batchCount++;
073: avg = adjustedTotal / batchCount;
074: if (false)
075: log_stats();
076: }
077:
078: private void log_stats() {
079: logger.info(nodeID + " : Batch Stats : batch count = "
080: + batchCount + " txnCount = " + txnCount
081: + " avg = " + avg);
082: }
083:
084: private void log_stats(float thresh) {
085: logger.info(nodeID + " : Batch Stats : batch count = "
086: + batchCount + " txnCount = " + txnCount
087: + " avg = " + avg + " threshold = " + thresh);
088: }
089:
090: public boolean batchComplete(TransactionID txnID) {
091: txnCount--;
092: if (killed) {
093: // return true only when all txns are acked. Note new batches may still be in network read queue
094: if (txnCount == 0) {
095: cleanUp(nodeID);
096: return true;
097: } else {
098: return false;
099: }
100: }
101: float threshold = (avg * (batchCount - 1));
102:
103: if (false)
104: log_stats(threshold);
105:
106: if (txnCount <= threshold) {
107: batchCount--;
108: return true;
109: } else {
110: return false;
111: }
112: }
113:
114: public void shutdownNode() {
115: this .killed = true;
116: if (txnCount == 0) {
117: cleanUp(nodeID);
118: }
119: }
120: }
121: }
|