001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.ha;
006:
007: import com.tc.l2.state.StateManager;
008: import com.tc.logging.TCLogger;
009: import com.tc.logging.TCLogging;
010: import com.tc.net.protocol.transport.ConnectionID;
011: import com.tc.net.protocol.transport.ConnectionIDFactory;
012: import com.tc.objectserver.gtx.GlobalTransactionIDSequenceProvider;
013: import com.tc.objectserver.persistence.api.PersistentMapStore;
014: import com.tc.text.Banner;
015: import com.tc.util.Assert;
016: import com.tc.util.State;
017: import com.tc.util.sequence.ObjectIDSequence;
018:
019: import java.util.HashSet;
020: import java.util.Set;
021:
022: public class ClusterState {
023:
024: private static final TCLogger logger = TCLogging
025: .getLogger(ClusterState.class);
026:
027: private static final String L2_STATE_KEY = "CLUSTER_STATE::L2_STATE_KEY";
028: private static final String CLUSTER_ID_KEY = "CLUSTER_STATE::CLUSTER_ID_KEY";
029:
030: private final PersistentMapStore clusterStateStore;
031: private final ObjectIDSequence oidSequence;
032: private final ConnectionIDFactory connectionIdFactory;
033: private final GlobalTransactionIDSequenceProvider gidSequenceProvider;
034:
035: private final Set connections = new HashSet();
036: private long nextAvailObjectID = -1;
037: private long nextAvailChannelID = -1;
038: private long nextAvailGlobalTxnID = -1;
039: private State currentState;
040: private String clusterID;
041:
042: public ClusterState(PersistentMapStore clusterStateStore,
043: ObjectIDSequence oidSequence,
044: ConnectionIDFactory connectionIdFactory,
045: GlobalTransactionIDSequenceProvider gidSequenceProvider) {
046: this .clusterStateStore = clusterStateStore;
047: this .oidSequence = oidSequence;
048: this .connectionIdFactory = connectionIdFactory;
049: this .gidSequenceProvider = gidSequenceProvider;
050: this .clusterID = clusterStateStore.get(CLUSTER_ID_KEY);
051: validateStartupState(clusterStateStore.get(L2_STATE_KEY));
052: }
053:
054: private void validateStartupState(String stateStr) {
055: if (stateStr != null) {
056: State stateB4Crash = new State(stateStr);
057: if (!StateManager.ACTIVE_COORDINATOR.equals(stateB4Crash)) {
058: /*
059: * The server is running in persisitent mode and this instance of the server was not the ACTIVE server before
060: * the crash. Force user to clean up the DB so that this server can resync state from the ACTIVE.
061: */
062: throw new AssertionError(
063: Banner
064: .makeBanner(
065: "This server is running with persistence turned on and was stopped in "
066: + stateStr
067: + " state. Only the "
068: + StateManager.ACTIVE_COORDINATOR
069: .getName()
070: + " server is allowed "
071: + " to be restarted without cleaning up the data directory with persistence turned on.\n\n"
072: + "Please clean up the data directory and make sure that the "
073: + StateManager.ACTIVE_COORDINATOR
074: .getName()
075: + " is up and running before starting this server. It is important that the "
076: + StateManager.ACTIVE_COORDINATOR
077: .getName()
078: + " is up and running before starting this server else you might end up losing data",
079: "ERROR"));
080: }
081: }
082: }
083:
084: public void setNextAvailableObjectID(long nextAvailOID) {
085: if (nextAvailOID < nextAvailObjectID) {
086: // Could happen when two actives fight it out. Dont want to assert, let the state manager fight it out.
087: logger
088: .error("Trying to set Next Available ObjectID to a lesser value : known = "
089: + nextAvailObjectID
090: + " new value = "
091: + nextAvailOID + " IGNORING");
092: return;
093: }
094: this .nextAvailObjectID = nextAvailOID;
095: }
096:
097: public long getNextAvailableObjectID() {
098: return nextAvailObjectID;
099: }
100:
101: public long getNextAvailableChannelID() {
102: return nextAvailChannelID;
103: }
104:
105: public long getNextAvailableGlobalTxnID() {
106: return nextAvailGlobalTxnID;
107: }
108:
109: public void setNextAvailableGlobalTransactionID(long nextAvailGID) {
110: if (nextAvailGID < nextAvailGlobalTxnID) {
111: // Could happen when two actives fight it out. Dont want to assert, let the state manager fight it out.
112: logger
113: .error("Trying to set Next Available Global Txn ID to a lesser value : known = "
114: + nextAvailGlobalTxnID
115: + " new value = "
116: + nextAvailGID + " IGNORING");
117: return;
118: }
119: this .nextAvailGlobalTxnID = nextAvailGID;
120: }
121:
122: public void setNextAvailableChannelID(long nextAvailableCID) {
123: if (nextAvailableCID < nextAvailChannelID) {
124: // Could happen when two actives fight it out. Dont want to assert, let the state manager fight it out.
125: logger
126: .error("Trying to set Next Available ChannelID to a lesser value : known = "
127: + nextAvailChannelID
128: + " new value = "
129: + nextAvailableCID + " IGNORING");
130: return;
131: }
132: this .nextAvailChannelID = nextAvailableCID;
133: }
134:
135: public void syncInternal() {
136: syncConnectionIDs();
137: syncOIDSequence();
138: syncGIDSequence();
139: }
140:
141: private void syncConnectionIDs() {
142: Assert.assertNotNull(clusterID);
143: connectionIdFactory.init(clusterID, nextAvailChannelID,
144: connections);
145: }
146:
147: public String getClusterID() {
148: return clusterID;
149: }
150:
151: public void setClusterID(String uid) {
152: if (clusterID != null && !clusterID.equals(uid)) {
153: logger.error("Cluster ID doesnt match !! Mine : "
154: + clusterID + " Active sent clusterID as : " + uid);
155: throw new AssertionError("ClusterIDs dont match : "
156: + clusterID + " <-> " + uid);
157: }
158: clusterID = uid;
159: synchClusterIDToDB();
160: }
161:
162: private void synchClusterIDToDB() {
163: clusterStateStore.put(CLUSTER_ID_KEY, clusterID);
164: }
165:
166: private void syncOIDSequence() {
167: long nextOID = getNextAvailableObjectID();
168: if (nextOID != -1) {
169: logger.info("Setting the Next Available OID to " + nextOID);
170: this .oidSequence.setNextAvailableObjectID(nextOID);
171: }
172: }
173:
174: private void syncGIDSequence() {
175: long nextGID = getNextAvailableGlobalTxnID();
176: if (nextGID != -1) {
177: logger
178: .info("Setting the Next Available Global Transaction ID to "
179: + nextGID);
180: this .gidSequenceProvider.setNextAvailableGID(nextGID);
181: }
182: }
183:
184: public void setCurrentState(State state) {
185: this .currentState = state;
186: syncCurrentStateToDB();
187: }
188:
189: private void syncCurrentStateToDB() {
190: clusterStateStore.put(L2_STATE_KEY, currentState.getName());
191: }
192:
193: public void addNewConnection(ConnectionID connID) {
194: if (connID.getChannelID() >= nextAvailChannelID) {
195: nextAvailChannelID = connID.getChannelID() + 1;
196: }
197: connections.add(connID);
198: }
199:
200: public void removeConnection(ConnectionID connectionID) {
201: boolean removed = connections.remove(connectionID);
202: if (!removed) {
203: logger.warn("Connection ID not found : " + connectionID
204: + " Current Connections count : "
205: + connections.size());
206: }
207: }
208:
209: public Set getAllConnections() {
210: return new HashSet(connections);
211: }
212: }
|