001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.tx;
006:
007: import com.tc.net.groups.NodeID;
008: import com.tc.object.tx.ServerTransactionID;
009: import com.tc.object.tx.TransactionID;
010: import com.tc.util.Assert;
011:
012: import java.util.Collections;
013: import java.util.HashMap;
014: import java.util.HashSet;
015: import java.util.Iterator;
016: import java.util.Map;
017: import java.util.Set;
018: import java.util.Map.Entry;
019:
020: /**
021: * An account of the state of a given transaction. Keeps track of the initiating client, the state of the transaction
022: * (applied, committed, etc), clients the transaction has been broadcast to, and clients that have ACKed the
023: * transaction.
024: */
025: public class TransactionAccountImpl implements TransactionAccount {
026: final NodeID sourceID;
027: private final Map waitees = Collections
028: .synchronizedMap(new HashMap());
029: private boolean dead = false;
030: private CallBackOnComplete callBack;
031:
032: public TransactionAccountImpl(NodeID source) {
033: this .sourceID = source;
034: }
035:
036: public String toString() {
037: return "TransactionAccount[" + sourceID + ": waitees="
038: + waitees + "]\n";
039: }
040:
041: public NodeID getNodeID() {
042: return sourceID;
043: }
044:
045: public void incommingTransactions(Set txnIDs) {
046: Assert.assertFalse(dead);
047: for (Iterator i = txnIDs.iterator(); i.hasNext();) {
048: ServerTransactionID stxnID = (ServerTransactionID) i.next();
049: createRecord(stxnID.getClientTransactionID());
050: }
051: }
052:
053: private void createRecord(TransactionID txnID) {
054: Object old = waitees.put(txnID, new TransactionRecord());
055: Assert.assertNull(old);
056: }
057:
058: /*
059: * returns true if completed, false if not completed or if the client has sent a duplicate ACK.
060: */
061: public boolean removeWaitee(NodeID waitee, TransactionID requestID) {
062: TransactionRecord transactionRecord = getRecord(requestID);
063:
064: if (transactionRecord == null)
065: return false;
066: synchronized (transactionRecord) {
067: transactionRecord.remove(waitee);
068: return checkCompletedAndRemove(requestID, transactionRecord);
069: }
070: }
071:
072: public void addWaitee(NodeID waitee, TransactionID requestID) {
073: TransactionRecord record = getRecord(requestID);
074: synchronized (record) {
075: boolean added = record.addWaitee(waitee);
076: Assert.eval(added);
077: }
078: }
079:
080: private TransactionRecord getRecord(TransactionID requestID) {
081: return (TransactionRecord) waitees.get(requestID);
082: }
083:
084: public boolean skipApplyAndCommit(TransactionID requestID) {
085: TransactionRecord transactionRecord = getRecord(requestID);
086: synchronized (transactionRecord) {
087: transactionRecord.applyAndCommitSkipped();
088: return checkCompletedAndRemove(requestID, transactionRecord);
089: }
090: }
091:
092: public boolean applyCommitted(TransactionID requestID) {
093: TransactionRecord transactionRecord = getRecord(requestID);
094: synchronized (transactionRecord) {
095: transactionRecord.applyCommitted();
096: return checkCompletedAndRemove(requestID, transactionRecord);
097: }
098: }
099:
100: public boolean broadcastCompleted(TransactionID requestID) {
101: TransactionRecord transactionRecord = getRecord(requestID);
102: synchronized (transactionRecord) {
103: transactionRecord.broadcastCompleted();
104: return checkCompletedAndRemove(requestID, transactionRecord);
105: }
106: }
107:
108: public boolean relayTransactionComplete(TransactionID requestID) {
109: TransactionRecord transactionRecord = getRecord(requestID);
110: synchronized (transactionRecord) {
111: transactionRecord.relayTransactionComplete();
112: return checkCompletedAndRemove(requestID, transactionRecord);
113: }
114: }
115:
116: public boolean hasWaitees(TransactionID requestID) {
117: TransactionRecord record = getRecord(requestID);
118: if (record == null)
119: return false;
120: synchronized (record) {
121: return !record.isEmpty();
122: }
123: }
124:
125: public Set requestersWaitingFor(NodeID waitee) {
126: Set requesters = new HashSet();
127: synchronized (waitees) {
128: for (Iterator i = waitees.entrySet().iterator(); i
129: .hasNext();) {
130: Entry e = (Entry) i.next();
131: TransactionRecord record = (TransactionRecord) e
132: .getValue();
133: if (record.contains(waitee)) {
134: TransactionID requester = (TransactionID) e
135: .getKey();
136: requesters.add(requester);
137: }
138: }
139: }
140: return requesters;
141: }
142:
143: private boolean checkCompletedAndRemove(TransactionID requestID,
144: TransactionRecord record) {
145: synchronized (waitees) {
146: if (record.isComplete()) {
147: waitees.remove(requestID);
148: invokeCallBackOnCompleteIfNecessary();
149: return !dead;
150: }
151: return false;
152: }
153: }
154:
155: public void addAllPendingServerTransactionIDsTo(HashSet txnIDs) {
156: synchronized (waitees) {
157: for (Iterator i = waitees.keySet().iterator(); i.hasNext();) {
158: TransactionID txnID = (TransactionID) i.next();
159: txnIDs.add(new ServerTransactionID(sourceID, txnID));
160: }
161: }
162: }
163:
164: public void nodeDead(CallBackOnComplete cb) {
165: synchronized (waitees) {
166: this .callBack = cb;
167: this .dead = true;
168: invokeCallBackOnCompleteIfNecessary();
169: }
170: }
171:
172: private void invokeCallBackOnCompleteIfNecessary() {
173: if (dead && waitees.isEmpty()) {
174: callBack.onComplete(sourceID);
175: }
176: }
177:
178: }
|