001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.gtx;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009: import com.tc.net.groups.NodeID;
010: import com.tc.object.gtx.GlobalTransactionID;
011: import com.tc.object.gtx.GlobalTransactionManager;
012: import com.tc.object.tx.ServerTransactionID;
013: import com.tc.objectserver.tx.ServerTransactionListener;
014: import com.tc.objectserver.tx.ServerTransactionManager;
015: import com.tc.util.State;
016:
017: import java.util.Collection;
018: import java.util.HashSet;
019: import java.util.Iterator;
020: import java.util.Set;
021:
022: public class GlobalTransactionIDLowWaterMarkProvider implements
023: GlobalTransactionManager, ServerTransactionListener {
024:
025: private static final TCLogger logger = TCLogging
026: .getLogger(GlobalTransactionIDLowWaterMarkProvider.class);
027: private static final State INITIAL = new State("INITAL");
028: private static final State STARTED = new State("STARTED");
029:
030: private final ServerTransactionManager transactionManager;
031: private final GlobalTransactionManager gtxm;
032:
033: private volatile GlobalTransactionManager lwmProvider;
034: private State state = INITIAL;
035:
036: private final Set resentTxns = new HashSet();
037:
038: private final GlobalTransactionManager NULL_GLOBAL_TXN_MGR = new GlobalTransactionManager() {
039: public GlobalTransactionID getLowGlobalTransactionIDWatermark() {
040: return GlobalTransactionID.NULL_ID;
041: }
042: };
043:
044: public GlobalTransactionIDLowWaterMarkProvider(
045: ServerTransactionManager transactionManager,
046: GlobalTransactionManager gtxm) {
047: this .transactionManager = transactionManager;
048: this .gtxm = gtxm;
049: this .lwmProvider = NULL_GLOBAL_TXN_MGR;
050: }
051:
052: public void goToActiveMode() {
053: transactionManager.addTransactionListener(this );
054: }
055:
056: public GlobalTransactionID getLowGlobalTransactionIDWatermark() {
057: return lwmProvider.getLowGlobalTransactionIDWatermark();
058: }
059:
060: public synchronized void addResentServerTransactionIDs(
061: Collection stxIDs) {
062: resentTxns.addAll(stxIDs);
063: }
064:
065: public synchronized void clearAllTransactionsFor(NodeID deadNode) {
066: for (Iterator i = resentTxns.iterator(); i.hasNext();) {
067: ServerTransactionID sid = (ServerTransactionID) i.next();
068: if (sid.getSourceID().equals(deadNode)) {
069: i.remove();
070: }
071: }
072: switchLWMProviderIfReady();
073: }
074:
075: private void switchLWMProviderIfReady() {
076: if (resentTxns.isEmpty() && state == STARTED) {
077: logger
078: .info("Switching GlobalTransactionID Low Water mark provider since all resent transactions are applied");
079: this .lwmProvider = gtxm;
080: this .transactionManager.removeTransactionListener(this );
081: }
082: }
083:
084: public void incomingTransactions(NodeID source, Set serverTxnIDs) {
085: // NOP
086: }
087:
088: public void transactionApplied(ServerTransactionID stxID) {
089: // NOP
090: }
091:
092: public synchronized void transactionCompleted(
093: ServerTransactionID stxID) {
094: resentTxns.remove(stxID);
095: switchLWMProviderIfReady();
096: }
097:
098: public synchronized void transactionManagerStarted(Set cids) {
099: state = STARTED;
100: removeAllExceptFrom(cids);
101: switchLWMProviderIfReady();
102: }
103:
104: private void removeAllExceptFrom(Set cids) {
105: for (Iterator i = resentTxns.iterator(); i.hasNext();) {
106: ServerTransactionID sid = (ServerTransactionID) i.next();
107: if (!cids.contains(sid.getSourceID())) {
108: i.remove();
109: }
110: }
111: }
112: }
|