001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.tx;
006:
007: import gnu.trove.TLinkable;
008: import gnu.trove.TLinkedList;
009:
010: import java.util.Collection;
011: import java.util.HashSet;
012: import java.util.Iterator;
013: import java.util.LinkedList;
014: import java.util.List;
015: import java.util.Set;
016: import java.util.SortedMap;
017: import java.util.TreeMap;
018:
019: public class TransactionBatchAccounting {
020:
021: private final SortedMap batchesByTransaction = new TreeMap();
022: private final TLinkedList batches = new TLinkedList();
023: private boolean stopped = false;
024: private TransactionID highWaterMark = TransactionID.NULL_ID;
025:
026: public Object dump() {
027: return this .toString();
028: }
029:
030: public String toString() {
031: return "TransactionBatchAccounting[batchesByTransaction="
032: + batchesByTransaction + "]";
033: }
034:
035: public synchronized void addBatch(TxnBatchID batchID,
036: Collection transactionIDs) {
037: if (stopped || transactionIDs.size() == 0)
038: return;
039: BatchDescriptor desc = new BatchDescriptor(batchID,
040: transactionIDs);
041: batches.add(desc);
042: for (Iterator i = transactionIDs.iterator(); i.hasNext();) {
043: TransactionID txID = (TransactionID) i.next();
044: Object removed = batchesByTransaction.put(txID, desc);
045: if (removed != null) {
046: throw new AssertionError(
047: "TransactionID is already accounted for: "
048: + txID + "=>" + removed);
049: }
050: if (highWaterMark.toLong() < txID.toLong()) {
051: highWaterMark = txID;
052: }
053: }
054: }
055:
056: public synchronized TxnBatchID getBatchByTransactionID(
057: TransactionID txID) {
058: BatchDescriptor desc = (BatchDescriptor) batchesByTransaction
059: .get(txID);
060: return desc == null ? TxnBatchID.NULL_BATCH_ID : desc.batchID;
061: }
062:
063: /**
064: * Adds all incomplete transaction batch ids to the given collection in the order they were added. An incomplete
065: * transaction batch is a batch for which not all its constituent transactions have been ACKed.
066: *
067: * @param c The collection to add all incomplete batch ids to
068: * @return The input collection
069: */
070: public synchronized List addIncompleteBatchIDsTo(List c) {
071: for (Iterator i = batches.iterator(); i.hasNext();) {
072: BatchDescriptor desc = (BatchDescriptor) i.next();
073: c.add(desc.batchID);
074: }
075: return c;
076: }
077:
078: public synchronized TxnBatchID getMinIncompleteBatchID() {
079: return batches.isEmpty() ? TxnBatchID.NULL_BATCH_ID
080: : ((BatchDescriptor) batches.getFirst()).batchID;
081: }
082:
083: public synchronized TxnBatchID acknowledge(TransactionID txID) {
084: if (stopped)
085: return TxnBatchID.NULL_BATCH_ID;
086: final TxnBatchID completed;
087: final BatchDescriptor desc = (BatchDescriptor) batchesByTransaction
088: .remove(txID);
089: if (desc == null)
090: throw new AssertionError("Batch not found for " + txID);
091: if (desc.acknowledge(txID) == 0) {
092: // completedGlobalTransactionIDs.addAll(desc.globalTransactionIDs);
093: batches.remove(desc);
094: completed = desc.batchID;
095: } else {
096: completed = TxnBatchID.NULL_BATCH_ID;
097: }
098: if (batches.size() == 0 && batchesByTransaction.size() > 0) {
099: throw new AssertionError(
100: "Batches list and batchesByTransaction map aren't zero at the same time");
101: }
102: return completed;
103: }
104:
105: public synchronized TransactionID getLowWaterMark() {
106: if (batchesByTransaction.isEmpty()) {
107: if (highWaterMark == TransactionID.NULL_ID) {
108: return TransactionID.NULL_ID;
109: } else {
110: // Low water mark should be set to the next valid lowwatermark, so that transactions are cleared correctly in
111: // the server
112: return highWaterMark.next();
113: }
114: } else {
115: return (TransactionID) batchesByTransaction.firstKey();
116: }
117: }
118:
119: public synchronized void clear() {
120: batches.clear();
121: batchesByTransaction.clear();
122: }
123:
124: private static final class BatchDescriptor implements TLinkable {
125: private final TxnBatchID batchID;
126: private final Set transactionIDs = new HashSet();
127:
128: private TLinkable next;
129: private TLinkable previous;
130:
131: public BatchDescriptor(TxnBatchID batchID, Collection txIDs) {
132: this .batchID = batchID;
133: transactionIDs.addAll(txIDs);
134: }
135:
136: public String toString() {
137: return "BatchDescriptor[" + batchID + ", transactionIDs="
138: + transactionIDs + "]";
139: }
140:
141: public int acknowledge(TransactionID txID) {
142: transactionIDs.remove(txID);
143: return transactionIDs.size();
144: }
145:
146: public TLinkable getNext() {
147: return next;
148: }
149:
150: public TLinkable getPrevious() {
151: return previous;
152: }
153:
154: public void setNext(TLinkable linkable) {
155: next = linkable;
156: }
157:
158: public void setPrevious(TLinkable linkable) {
159: previous = linkable;
160: }
161: }
162:
163: /**
164: * This is used for testing.
165: */
166: public synchronized List addIncompleteTransactionIDsTo(
167: LinkedList list) {
168: list.addAll(batchesByTransaction.keySet());
169: return list;
170: }
171:
172: /**
173: * Ignores all further modifier calls. This is used for testing to stop shutdown hooks from hanging.
174: */
175: public synchronized void stop() {
176: this .stopped = true;
177: }
178:
179: }
|