001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.gtx;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009: import com.tc.net.groups.NodeID;
010: import com.tc.object.lockmanager.api.LockFlushCallback;
011: import com.tc.object.lockmanager.api.LockID;
012: import com.tc.object.tx.RemoteTransactionManager;
013: import com.tc.object.tx.ServerTransactionID;
014: import com.tc.object.tx.TransactionID;
015:
016: import java.util.Collection;
017: import java.util.HashSet;
018: import java.util.Iterator;
019: import java.util.Map;
020: import java.util.Set;
021: import java.util.SortedMap;
022: import java.util.TreeMap;
023: import java.util.Map.Entry;
024:
025: public class ClientGlobalTransactionManagerImpl implements
026: ClientGlobalTransactionManager {
027:
028: private static final TCLogger logger = TCLogging
029: .getLogger(ClientGlobalTransactionManagerImpl.class);
030:
031: private static final int ALLOWED_LWM_DELTA = 100;
032: private final Set applied = new HashSet();
033: private final SortedMap globalTransactionIDs = new TreeMap();
034:
035: private GlobalTransactionID lowWatermark = GlobalTransactionID.NULL_ID;
036: private final RemoteTransactionManager remoteTransactionManager;
037: private int ignoredCount = 0;
038:
039: public ClientGlobalTransactionManagerImpl(
040: RemoteTransactionManager remoteTransactionManager) {
041: this .remoteTransactionManager = remoteTransactionManager;
042: }
043:
044: // For testing
045: public synchronized int size() {
046: return applied.size();
047: }
048:
049: // For testing
050: public int getAllowedLowWaterMarkDelta() {
051: return ALLOWED_LWM_DELTA;
052: }
053:
054: public synchronized boolean startApply(NodeID committerID,
055: TransactionID transactionID, GlobalTransactionID gtxID) {
056: if (gtxID.lessThan(getLowGlobalTransactionIDWatermark())) {
057: // formatting
058: throw new UnknownTransactionError(
059: "Attempt to apply a transaction lower than the low watermark: gtxID = "
060: + gtxID + ", low watermark = "
061: + getLowGlobalTransactionIDWatermark());
062: }
063: ServerTransactionID serverTransactionID = new ServerTransactionID(
064: committerID, transactionID);
065: globalTransactionIDs.put(gtxID, serverTransactionID);
066: return applied.add(serverTransactionID);
067: }
068:
069: public synchronized GlobalTransactionID getLowGlobalTransactionIDWatermark() {
070: return lowWatermark;
071: }
072:
073: public synchronized void setLowWatermark(
074: GlobalTransactionID lowWatermark) {
075: if (this .lowWatermark.toLong() > lowWatermark.toLong()) {
076: // XXX::This case is possible when the server crashes (both in diskbased and lanbased) Eventually the server will
077: // catch up
078: logger
079: .warn("Low water mark lower than exisiting one : mine : "
080: + this .lowWatermark
081: + " server sent : "
082: + lowWatermark);
083: return;
084: }
085: if (this .lowWatermark.toLong() + ALLOWED_LWM_DELTA > lowWatermark
086: .toLong()) {
087: if (ignoredCount++ > ALLOWED_LWM_DELTA * 100) {
088: logger.warn("Current Low water Mark = "
089: + this .lowWatermark + " Server sent "
090: + lowWatermark);
091: logger
092: .warn("Server didnt send a Low water mark higher than ALLOWED_LWM_DELTA for "
093: + ignoredCount
094: + " times. applied.size() = "
095: + applied.size() + " Resetting count.");
096: ignoredCount = 0;
097: }
098: return;
099: }
100: this .ignoredCount = 0;
101: this .lowWatermark = lowWatermark;
102: Map toDelete = globalTransactionIDs.headMap(lowWatermark);
103: for (Iterator i = toDelete.entrySet().iterator(); i.hasNext();) {
104: Entry e = (Entry) i.next();
105: ServerTransactionID stxID = (ServerTransactionID) e
106: .getValue();
107: i.remove();
108: applied.remove(stxID);
109: }
110: }
111:
112: public void flush(LockID lockID) {
113: remoteTransactionManager.flush(lockID);
114: }
115:
116: public boolean isTransactionsForLockFlushed(LockID lockID,
117: LockFlushCallback callback) {
118: return remoteTransactionManager.isTransactionsForLockFlushed(
119: lockID, callback);
120: }
121:
122: public void unpause() {
123: remoteTransactionManager.unpause();
124: }
125:
126: public void pause() {
127: remoteTransactionManager.pause();
128: }
129:
130: public void resendOutstanding() {
131: remoteTransactionManager.resendOutstanding();
132: }
133:
134: public Collection getTransactionSequenceIDs() {
135: return remoteTransactionManager.getTransactionSequenceIDs();
136: }
137:
138: public Collection getResentTransactionIDs() {
139: return remoteTransactionManager.getResentTransactionIDs();
140: }
141:
142: public void starting() {
143: remoteTransactionManager.starting();
144: }
145:
146: public void resendOutstandingAndUnpause() {
147: remoteTransactionManager.resendOutstandingAndUnpause();
148: }
149:
150: }
|