001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.persistence.impl;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009: import com.tc.net.groups.NodeID;
010: import com.tc.object.gtx.GlobalTransactionID;
011: import com.tc.object.tx.ServerTransactionID;
012: import com.tc.objectserver.gtx.GlobalTransactionDescriptor;
013: import com.tc.objectserver.gtx.ServerTransactionIDBookKeeper;
014: import com.tc.objectserver.persistence.api.PersistenceTransaction;
015: import com.tc.objectserver.persistence.api.TransactionPersistor;
016: import com.tc.objectserver.persistence.api.TransactionStore;
017: import com.tc.util.sequence.Sequence;
018:
019: import java.util.ArrayList;
020: import java.util.Collection;
021: import java.util.Collections;
022: import java.util.Iterator;
023: import java.util.List;
024: import java.util.Map;
025: import java.util.Set;
026: import java.util.SortedMap;
027: import java.util.TreeMap;
028:
029: public class TransactionStoreImpl implements TransactionStore {
030:
031: private static final TCLogger logger = TCLogging
032: .getLogger(TransactionStoreImpl.class);
033:
034: private final ServerTransactionIDBookKeeper sids = new ServerTransactionIDBookKeeper();
035: private final SortedMap ids = Collections
036: .synchronizedSortedMap(new TreeMap());
037: private final TransactionPersistor persistor;
038: private final Sequence globalIDSequence;
039:
040: public TransactionStoreImpl(TransactionPersistor persistor,
041: Sequence globalIDSequence) {
042: this .persistor = persistor;
043: this .globalIDSequence = globalIDSequence;
044: // We don't want to hit the DB (globalIDsequence) until all the stages are started.
045: for (Iterator i = this .persistor
046: .loadAllGlobalTransactionDescriptors().iterator(); i
047: .hasNext();) {
048: GlobalTransactionDescriptor gtx = (GlobalTransactionDescriptor) i
049: .next();
050: basicAdd(gtx);
051: gtx.commitComplete();
052: }
053: }
054:
055: public void commitAllTransactionDescriptor(
056: PersistenceTransaction transaction, Collection stxIDs) {
057: for (Iterator i = stxIDs.iterator(); i.hasNext();) {
058: ServerTransactionID stxnID = (ServerTransactionID) i.next();
059: GlobalTransactionDescriptor gtx = this .sids.get(stxnID);
060: if (stxnID.isServerGeneratedTransaction()) {
061: // XXX:: Since server Generated Transactions dont get completed acks, we dont persist these
062: this .sids.remove(stxnID);
063: this .ids.remove(gtx.getGlobalTransactionID());
064: } else {
065: persistor.saveGlobalTransactionDescriptor(transaction,
066: gtx);
067: gtx.commitComplete();
068: }
069: }
070: }
071:
072: // used only in tests
073: public void commitTransactionDescriptor(
074: PersistenceTransaction transaction,
075: ServerTransactionID stxID) {
076: ArrayList stxIDs = new ArrayList(1);
077: stxIDs.add(stxID);
078: commitAllTransactionDescriptor(transaction, stxIDs);
079: }
080:
081: public GlobalTransactionDescriptor getTransactionDescriptor(
082: ServerTransactionID serverTransactionID) {
083: return this .sids.get(serverTransactionID);
084: }
085:
086: public GlobalTransactionDescriptor getOrCreateTransactionDescriptor(
087: ServerTransactionID serverTransactionID) {
088: GlobalTransactionDescriptor rv = sids.get(serverTransactionID);
089: if (rv == null) {
090: rv = new GlobalTransactionDescriptor(serverTransactionID,
091: getNextGlobalTransactionID());
092: basicAdd(rv);
093: }
094: return rv;
095: }
096:
097: private GlobalTransactionID getNextGlobalTransactionID() {
098: return new GlobalTransactionID(this .globalIDSequence.next());
099: }
100:
101: private void basicAdd(GlobalTransactionDescriptor gtx) {
102: basicAdd(gtx, false);
103: }
104:
105: private void basicAdd(GlobalTransactionDescriptor gtx,
106: boolean allowRemapping) {
107: ServerTransactionID sid = gtx.getServerTransactionID();
108: GlobalTransactionID gid = gtx.getGlobalTransactionID();
109: GlobalTransactionDescriptor prevDesc = this .sids.add(sid, gtx);
110: if (prevDesc != null) {
111: if (allowRemapping) {
112: // This can happen in the 3'rd Passive when the active crashes and the 2'nd passive takes over. Some
113: // transactions that arrived to the 3'rd passive might not have arrived at the 2'nd passive and when the 2'nd
114: // becomes active, it might assign a new GID to the same transaction previsously known to the 3'rd with a
115: // different GID. It needs to reconsile. It is ok to just remove it from ids since even if it was commited, when
116: // completeTxns arrive, it will be removed from the DB. If the 3'rd passive crashes before that, when it come
117: // back up the DB is wiped out.
118: ids.remove(prevDesc.getGlobalTransactionID());
119: gtx.saveStateFrom(prevDesc);
120: logger.warn("Remapped new desc " + gtx
121: + " for the same SID. old = " + prevDesc);
122: } else {
123: throw new AssertionError(
124: "Adding new mapping for old txn IDs : " + gtx
125: + " Prev desc = " + prevDesc);
126: }
127: }
128: if (!gid.isNull()) {
129: ids.put(gid, gtx);
130: }
131: }
132:
133: public GlobalTransactionID getLeastGlobalTransactionID() {
134: synchronized (ids) {
135: return (GlobalTransactionID) ((ids.isEmpty()) ? GlobalTransactionID.NULL_ID
136: : ids.firstKey());
137: }
138: }
139:
140: /**
141: * This method clears the server transaction ids less than the low water mark, for that particular node.
142: */
143: public void clearCommitedTransactionsBelowLowWaterMark(
144: PersistenceTransaction tx, ServerTransactionID stxIDs) {
145: Collection gidDescs = sids
146: .clearCommitedSidsBelowLowWaterMark(stxIDs);
147: removeGlobalTransacionDescs(gidDescs, tx);
148: }
149:
150: public void shutdownNode(PersistenceTransaction tx, NodeID nid) {
151: Collection gidDescs = sids.removeAll(nid);
152: logger.info("shutdownClient() : Removing txns from DB : "
153: + gidDescs.size());
154: removeGlobalTransacionDescs(gidDescs, tx);
155: }
156:
157: /**
158: * Global Tranasaction descriptors should have been deleted from sids datastructure, before this call.
159: */
160: private void removeGlobalTransacionDescs(Collection gidDescs,
161: PersistenceTransaction tx) {
162: for (Iterator i = gidDescs.iterator(); i.hasNext();) {
163: GlobalTransactionDescriptor gd = (GlobalTransactionDescriptor) i
164: .next();
165: ids.remove(gd.getGlobalTransactionID());
166: }
167: if (!gidDescs.isEmpty()) {
168: persistor.deleteAllGlobalTransactionDescriptors(tx,
169: gidDescs);
170: }
171: }
172:
173: public void shutdownAllClientsExcept(PersistenceTransaction tx,
174: Set cids) {
175: Collection gidDescs = sids.removeAllExcept(cids);
176: logger
177: .info("shutdownAllClientsExcept() : Removing txns from DB : "
178: + gidDescs.size());
179: removeGlobalTransacionDescs(gidDescs, tx);
180: }
181:
182: // Used in Passive server
183: public void createGlobalTransactionDescIfNeeded(
184: ServerTransactionID stxnID,
185: GlobalTransactionID globalTransactionID) {
186: GlobalTransactionDescriptor rv = new GlobalTransactionDescriptor(
187: stxnID, globalTransactionID);
188: basicAdd(rv, true);
189: }
190:
191: // Used in Passive server
192: public void clearCommitedTransactionsBelowLowWaterMark(
193: PersistenceTransaction tx, GlobalTransactionID lowWaterMark) {
194: List toRemove = new ArrayList(100);
195: synchronized (ids) {
196: Map lowerThanLWM = ids.headMap(lowWaterMark);
197: for (Iterator i = lowerThanLWM.values().iterator(); i
198: .hasNext();) {
199: GlobalTransactionDescriptor gd = (GlobalTransactionDescriptor) i
200: .next();
201: if (gd.complete()) {
202: i.remove();
203: sids.remove(gd.getServerTransactionID());
204: toRemove.add(gd);
205: }
206: }
207: }
208: if (!toRemove.isEmpty()) {
209: persistor.deleteAllGlobalTransactionDescriptors(tx,
210: toRemove);
211: }
212: }
213: }
|