001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.tx;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009: import com.tc.net.groups.NodeID;
010: import com.tc.object.gtx.GlobalTransactionID;
011: import com.tc.object.tx.ServerTransactionID;
012: import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
013: import com.tc.util.Assert;
014: import com.tc.util.State;
015:
016: import java.util.ArrayList;
017: import java.util.Collection;
018: import java.util.Iterator;
019: import java.util.LinkedHashMap;
020: import java.util.LinkedList;
021: import java.util.List;
022: import java.util.ListIterator;
023: import java.util.Map;
024: import java.util.Set;
025:
026: /**
027: * The whole purpose of this class is to reorder "resent" transactions so that they are broadcased in the exact same
028: * order as before (the server crash)
029: */
030: public class ResentTransactionSequencer implements
031: ServerTransactionListener {
032:
033: private static final TCLogger logger = TCLogging
034: .getLogger(ResentTransactionSequencer.class);
035:
036: private static final State PASS_THRU = new State("PASS_THRU");
037: private static final State ADD_RESENT = new State("ADD_RESENT");
038: private static final State INCOMING_RESENT = new State(
039: "INCOMING_RESENT");
040:
041: private final TransactionalObjectManager txnObjectManager;
042: private final ServerTransactionManager transactionManager;
043: private final ServerGlobalTransactionManager gtxm;
044: private final List resentTxns = new LinkedList();
045: private final Map pendingTxns = new LinkedHashMap();
046: private State state = PASS_THRU;
047:
048: public ResentTransactionSequencer(
049: ServerTransactionManager transactionManager,
050: ServerGlobalTransactionManager gtxm,
051: TransactionalObjectManager txnObjectManager) {
052: this .transactionManager = transactionManager;
053: this .gtxm = gtxm;
054: this .txnObjectManager = txnObjectManager;
055: }
056:
057: public synchronized void addTransactions(Collection txns) {
058: State lstate = state;
059: if (lstate == PASS_THRU) {
060: txnObjectManager.addTransactions(txns);
061: } else if (lstate == INCOMING_RESENT) {
062: addToPending(txns);
063: processResent();
064: } else {
065: throw new AssertionError("Illegal State : " + state
066: + " resentTxns : " + resentTxns);
067: }
068: }
069:
070: private synchronized void processResent() {
071: ArrayList txns2Process = new ArrayList();
072: for (Iterator i = resentTxns.iterator(); i.hasNext();) {
073: TransactionDesc desc = (TransactionDesc) i.next();
074: ServerTransaction txn = (ServerTransaction) pendingTxns
075: .remove(desc.getServerTransactionID());
076: if (txn != null) {
077: txns2Process.add(txn);
078: i.remove();
079: } else {
080: break;
081: }
082: }
083: if (!txns2Process.isEmpty()) {
084: txnObjectManager.addTransactions(txns2Process);
085: }
086: moveToPassThruIfPossible();
087: }
088:
089: private synchronized void addToPending(Collection txns) {
090: for (Iterator i = txns.iterator(); i.hasNext();) {
091: ServerTransaction txn = (ServerTransaction) i.next();
092: pendingTxns.put(txn.getServerTransactionID(), txn);
093: }
094: }
095:
096: public synchronized void goToActiveMode() {
097: this .transactionManager.addTransactionListener(this );
098: this .state = ADD_RESENT;
099: }
100:
101: public synchronized void transactionManagerStarted(Set cids) {
102: this .state = INCOMING_RESENT;
103: removeAllExceptFrom(cids);
104: moveToPassThruIfPossible();
105: }
106:
107: private void removeAllExceptFrom(Set cids) {
108: for (Iterator i = resentTxns.iterator(); i.hasNext();) {
109: TransactionDesc desc = (TransactionDesc) i.next();
110: if (!cids.contains(desc.getServerTransactionID()
111: .getSourceID())) {
112: logger.warn("Removing " + desc
113: + " because not in startup set " + cids);
114: i.remove();
115: }
116: }
117: }
118:
119: private void moveToPassThruIfPossible() {
120: if (resentTxns.isEmpty()) {
121: this .state = PASS_THRU;
122: clearPending();
123: logger
124: .info("Unregistering ResentTransactionSequencer since no more resent Transactions : "
125: + resentTxns.size());
126: this .transactionManager.removeTransactionListener(this );
127: }
128: }
129:
130: private void clearPending() {
131: txnObjectManager.addTransactions(pendingTxns.values());
132: pendingTxns.clear();
133: }
134:
135: public synchronized void addResentServerTransactionIDs(
136: Collection stxIDs) {
137: Assert.assertEquals(ADD_RESENT, state);
138: for (Iterator i = stxIDs.iterator(); i.hasNext();) {
139: ServerTransactionID stxID = (ServerTransactionID) i.next();
140: GlobalTransactionID gid = gtxm
141: .getGlobalTransactionID(stxID);
142: if (!gid.isNull()) {
143: addOrdered(stxID, gid);
144: }
145: }
146: assertGidsInOrder();
147: logger.info("Resent Txns = " + resentTxns);
148: }
149:
150: private void assertGidsInOrder() {
151: long last = Long.MIN_VALUE;
152: for (Iterator i = resentTxns.iterator(); i.hasNext();) {
153: TransactionDesc desc = (TransactionDesc) i.next();
154: long current = desc.getGlobalTransactionID().toLong();
155: if (current < last) {
156: throw new AssertionError(
157: "Resent TransactionSequence Ordering error : "
158: + resentTxns);
159: }
160: last = current;
161: }
162: }
163:
164: private void addOrdered(ServerTransactionID stxID,
165: GlobalTransactionID gid) {
166: TransactionDesc toAdd = new TransactionDesc(stxID, gid);
167: ListIterator i;
168: // Going from the reverse means less iterations
169: for (i = resentTxns.listIterator(resentTxns.size()); i
170: .hasPrevious();) {
171: TransactionDesc desc = (TransactionDesc) i.previous();
172: if (desc.getGlobalTransactionID().lessThan(
173: toAdd.getGlobalTransactionID())) {
174: i.next(); // move to the right position
175: break;
176: }
177: }
178: i.add(toAdd);
179: }
180:
181: public synchronized void clearAllTransactionsFor(NodeID deadNode) {
182: for (Iterator i = resentTxns.iterator(); i.hasNext();) {
183: TransactionDesc desc = (TransactionDesc) i.next();
184: if (desc.getServerTransactionID().getSourceID().equals(
185: deadNode)) {
186: logger.warn("Removing " + desc + " because " + deadNode
187: + " is dead");
188: i.remove();
189: }
190: }
191: moveToPassThruIfPossible();
192: }
193:
194: public void incomingTransactions(NodeID source, Set serverTxnIDs) {
195: return;
196: }
197:
198: public void transactionApplied(ServerTransactionID stxID) {
199: return;
200: }
201:
202: public void transactionCompleted(ServerTransactionID stxID) {
203: return;
204: }
205:
206: private final static class TransactionDesc {
207:
208: private final ServerTransactionID stxID;
209: private final GlobalTransactionID gid;
210:
211: public TransactionDesc(ServerTransactionID stxID,
212: GlobalTransactionID gid) {
213: this .stxID = stxID;
214: this .gid = gid;
215: }
216:
217: public GlobalTransactionID getGlobalTransactionID() {
218: return gid;
219: }
220:
221: public ServerTransactionID getServerTransactionID() {
222: return stxID;
223: }
224:
225: public String toString() {
226: return "TxnDesc [" + gid + " , " + stxID + "]";
227: }
228:
229: }
230:
231: }
|