001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.state;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
008:
009: import com.tc.async.api.Sink;
010: import com.tc.l2.context.StateChangedEvent;
011: import com.tc.l2.ha.L2HAZapNodeRequestProcessor;
012: import com.tc.l2.ha.WeightGeneratorFactory;
013: import com.tc.l2.msg.L2StateMessage;
014: import com.tc.l2.msg.L2StateMessageFactory;
015: import com.tc.logging.TCLogger;
016: import com.tc.logging.TCLogging;
017: import com.tc.net.groups.GroupException;
018: import com.tc.net.groups.GroupManager;
019: import com.tc.net.groups.GroupMessage;
020: import com.tc.net.groups.NodeID;
021: import com.tc.net.groups.NodeIDImpl;
022: import com.tc.util.Assert;
023: import com.tc.util.State;
024:
025: import java.util.Iterator;
026:
027: public class StateManagerImpl implements StateManager {
028:
029: private static final TCLogger logger = TCLogging
030: .getLogger(StateManagerImpl.class);
031:
032: private final TCLogger consoleLogger;
033: private final GroupManager groupManager;
034: private final ElectionManager electionMgr;
035: private final Sink stateChangeSink;
036: private final WeightGeneratorFactory weightsFactory;
037:
038: private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
039: private final Object electionLock = new Object();
040:
041: private NodeID activeNode = NodeIDImpl.NULL_ID;
042: private volatile State state = START_STATE;
043: private boolean electionInProgress = false;
044:
045: public StateManagerImpl(TCLogger consoleLogger,
046: GroupManager groupManager, Sink stateChangeSink,
047: StateManagerConfig stateManagerConfig,
048: WeightGeneratorFactory weightFactory) {
049: this .consoleLogger = consoleLogger;
050: this .groupManager = groupManager;
051: this .stateChangeSink = stateChangeSink;
052: this .weightsFactory = weightFactory;
053: this .electionMgr = new ElectionManagerImpl(groupManager,
054: stateManagerConfig);
055: }
056:
057: /*
058: * XXX:: If ACTIVE went dead before any passive moved to STANDBY state, then the cluster is hung and there is no going
059: * around it. If ACTIVE in persistent mode, it can come back and recover the cluster
060: */
061: public void startElection() {
062: synchronized (electionLock) {
063: if (electionInProgress)
064: return;
065: electionInProgress = true;
066: }
067: try {
068: if (state == START_STATE) {
069: runElection(true);
070: } else if (state == PASSIVE_STANDBY) {
071: runElection(false);
072: } else {
073: info("Ignoring Election request since not in right state");
074: }
075: } finally {
076: synchronized (electionLock) {
077: electionInProgress = false;
078: }
079: }
080: }
081:
082: private void runElection(boolean isNew) {
083: NodeID myNodeID = getLocalNodeID();
084: NodeID winner = electionMgr.runElection(myNodeID, isNew,
085: weightsFactory);
086: if (winner == myNodeID) {
087: moveToActiveState();
088: } else {
089: electionMgr.reset(null);
090: }
091: }
092:
093: private NodeID getLocalNodeID() {
094: try {
095: return groupManager.getLocalNodeID();
096: } catch (GroupException e) {
097: throw new AssertionError(e);
098: }
099: }
100:
101: public void registerForStateChangeEvents(
102: StateChangeListener listener) {
103: listeners.add(listener);
104: }
105:
106: public void fireStateChangedEvent(StateChangedEvent sce) {
107: for (Iterator i = listeners.iterator(); i.hasNext();) {
108: StateChangeListener listener = (StateChangeListener) i
109: .next();
110: listener.l2StateChanged(sce);
111: }
112: }
113:
114: private synchronized void moveToPassiveState(
115: Enrollment winningEnrollment) {
116: electionMgr.reset(winningEnrollment);
117: if (state == START_STATE) {
118: state = winningEnrollment.isANewCandidate() ? PASSIVE_STANDBY
119: : PASSIVE_UNINTIALIZED;
120: info("Moved to " + state, true);
121: stateChangeSink.add(new StateChangedEvent(START_STATE,
122: state));
123: } else if (state == ACTIVE_COORDINATOR) {
124: // TODO:: Support this later
125: throw new AssertionError("Cant move to "
126: + PASSIVE_UNINTIALIZED + " from "
127: + ACTIVE_COORDINATOR + " at least for now");
128: }
129: }
130:
131: public synchronized void moveToPassiveStandbyState() {
132: if (state == ACTIVE_COORDINATOR) {
133: // TODO:: Support this later
134: throw new AssertionError("Cant move to " + PASSIVE_STANDBY
135: + " from " + ACTIVE_COORDINATOR
136: + " at least for now");
137: } else if (state != PASSIVE_STANDBY) {
138: stateChangeSink.add(new StateChangedEvent(state,
139: PASSIVE_STANDBY));
140: state = PASSIVE_STANDBY;
141: info("Moved to " + state, true);
142: } else {
143: info("Already in " + state);
144: }
145: }
146:
147: private synchronized void moveToActiveState() {
148: if (state == START_STATE || state == PASSIVE_STANDBY) {
149: // TODO :: If state == START_STATE publish cluster ID
150: StateChangedEvent event = new StateChangedEvent(state,
151: ACTIVE_COORDINATOR);
152: state = ACTIVE_COORDINATOR;
153: this .activeNode = getLocalNodeID();
154: info("Becoming " + state, true);
155: electionMgr.declareWinner(this .activeNode);
156: stateChangeSink.add(event);
157: } else {
158: throw new AssertionError("Cant move to "
159: + ACTIVE_COORDINATOR + " from " + state);
160: }
161: }
162:
163: public synchronized NodeID getActiveNodeID() {
164: return activeNode;
165: }
166:
167: public boolean isActiveCoordinator() {
168: return (state == ACTIVE_COORDINATOR);
169: }
170:
171: public void moveNodeToPassiveStandby(NodeID nodeID) {
172: Assert.assertTrue(isActiveCoordinator());
173: logger.info("Requesting node " + nodeID + " to move to "
174: + PASSIVE_STANDBY);
175: GroupMessage msg = L2StateMessageFactory
176: .createMoveToPassiveStandbyMessage(EnrollmentFactory
177: .createTrumpEnrollment(getLocalNodeID(),
178: weightsFactory));
179: try {
180: this .groupManager.sendTo(nodeID, msg);
181: } catch (GroupException e) {
182: logger.error(e);
183: }
184: }
185:
186: public void handleClusterStateMessage(L2StateMessage clusterMsg) {
187: try {
188: switch (clusterMsg.getType()) {
189: case L2StateMessage.START_ELECTION:
190: handleStartElectionRequest(clusterMsg);
191: break;
192: case L2StateMessage.ABORT_ELECTION:
193: handleElectionAbort(clusterMsg);
194: break;
195: case L2StateMessage.ELECTION_RESULT:
196: handleElectionResultMessage(clusterMsg);
197: break;
198: case L2StateMessage.ELECTION_WON:
199: handleElectionWonMessage(clusterMsg);
200: break;
201: case L2StateMessage.MOVE_TO_PASSIVE_STANDBY:
202: handleMoveToPassiveStandbyMessage(clusterMsg);
203: break;
204: default:
205: throw new AssertionError(
206: "This message shouldn't have been routed here : "
207: + clusterMsg);
208: }
209: } catch (GroupException ge) {
210: logger.error(
211: "Zapping Node : Caught Exception while handling Message : "
212: + clusterMsg, ge);
213: groupManager.zapNode(clusterMsg.messageFrom(),
214: L2HAZapNodeRequestProcessor.COMMUNICATION_ERROR,
215: "Error handling Election Message "
216: + L2HAZapNodeRequestProcessor
217: .getErrorString(ge));
218: }
219: }
220:
221: private void handleMoveToPassiveStandbyMessage(
222: L2StateMessage clusterMsg) {
223: moveToPassiveStandbyState();
224: }
225:
226: private synchronized void handleElectionWonMessage(
227: L2StateMessage clusterMsg) {
228: Enrollment winningEnrollment = clusterMsg.getEnrollment();
229: if (state == ACTIVE_COORDINATOR) {
230: // Cant get Election Won from another node : Split brain
231: String error = state + " Received Election Won Msg : "
232: + clusterMsg + ". Possible split brain detected";
233: logger.error(error);
234: groupManager.zapNode(winningEnrollment.getNodeID(),
235: L2HAZapNodeRequestProcessor.SPLIT_BRAIN, error);
236: } else {
237: this .activeNode = winningEnrollment.getNodeID();
238: moveToPassiveState(winningEnrollment);
239: }
240: }
241:
242: private synchronized void handleElectionResultMessage(
243: L2StateMessage msg) throws GroupException {
244: if (activeNode.equals(msg.getEnrollment().getNodeID())) {
245: Assert.assertFalse(NodeIDImpl.NULL_ID.equals(activeNode));
246: // This wouldnt normally happen, but we agree - so ack
247: GroupMessage resultAgreed = L2StateMessageFactory
248: .createResultAgreedMessage(msg, msg.getEnrollment());
249: logger.info("Agreed with Election Result from "
250: + msg.messageFrom() + " : " + resultAgreed);
251: groupManager.sendTo(msg.messageFrom(), resultAgreed);
252: } else if (state == ACTIVE_COORDINATOR
253: || !activeNode.isNull()
254: || (msg.getEnrollment().isANewCandidate() && state != START_STATE)) {
255: // Condition 1 :
256: // Obviously an issue.
257: // Condition 2 :
258: // This shouldn't happen normally, but is possible when there is some weird network error where A sees B,
259: // B sees A/C and C sees B and A is active and C is trying to run election
260: // Force other node to rerun election so that we can abort
261: // Condition 3 :
262: // We dont want new L2s to win an election when there are old L2s in PASSIVE states.
263: GroupMessage resultConflict = L2StateMessageFactory
264: .createResultConflictMessage(msg, EnrollmentFactory
265: .createTrumpEnrollment(getLocalNodeID(),
266: weightsFactory));
267: warn("WARNING :: Active Node = "
268: + activeNode
269: + " , "
270: + state
271: + " received ELECTION_RESULT message from another node : "
272: + msg + " : Forcing re-election " + resultConflict);
273: groupManager.sendTo(msg.messageFrom(), resultConflict);
274: } else {
275: electionMgr.handleElectionResultMessage(msg);
276: }
277: }
278:
279: private void handleElectionAbort(L2StateMessage clusterMsg) {
280: if (state == ACTIVE_COORDINATOR) {
281: // Cant get Abort back to ACTIVE, if so then there is a split brain
282: String error = state
283: + " Received Abort Election Msg : Possible split brain detected ";
284: logger.error(error);
285: groupManager.zapNode(clusterMsg.messageFrom(),
286: L2HAZapNodeRequestProcessor.SPLIT_BRAIN, error);
287: } else {
288: electionMgr.handleElectionAbort(clusterMsg);
289: }
290: }
291:
292: private void handleStartElectionRequest(L2StateMessage msg)
293: throws GroupException {
294: if (state == ACTIVE_COORDINATOR) {
295: // This is either a new L2 joining a cluster or a renegade L2. Force it to abort
296: GroupMessage abortMsg = L2StateMessageFactory
297: .createAbortElectionMessage(msg, EnrollmentFactory
298: .createTrumpEnrollment(getLocalNodeID(),
299: weightsFactory));
300: info("Forcing Abort Election for " + msg + " with "
301: + abortMsg);
302: groupManager.sendTo(msg.messageFrom(), abortMsg);
303: } else if (!electionMgr.handleStartElectionRequest(msg)) {
304: // TODO::FIXME:: Commenting so that stage thread is not held up doind election.
305: // startElectionIfNecessary(NodeID.NULL_ID);
306: logger
307: .warn("Not starting election as it was commented out");
308: }
309: }
310:
311: // notify new node
312: public void publishActiveState(NodeID nodeID) throws GroupException {
313: Assert.assertTrue(isActiveCoordinator());
314: GroupMessage msg = L2StateMessageFactory
315: .createElectionWonMessage(EnrollmentFactory
316: .createTrumpEnrollment(getLocalNodeID(),
317: weightsFactory));
318: groupManager.sendTo(nodeID, msg);
319: }
320:
321: public void startElectionIfNecessary(NodeID disconnectedNode) {
322: Assert.assertFalse(disconnectedNode.equals(getLocalNodeID()));
323: boolean elect = false;
324: synchronized (this ) {
325: if (activeNode.equals(disconnectedNode)) {
326: // ACTIVE Node is gone
327: activeNode = NodeIDImpl.NULL_ID;
328: }
329: if (state != PASSIVE_UNINTIALIZED
330: && state != ACTIVE_COORDINATOR
331: && activeNode.isNull()) {
332: elect = true;
333: }
334: }
335: if (elect) {
336: info("Starting Election to determine cluser wide ACTIVE L2");
337: startElection();
338: }
339: }
340:
341: private void info(String message) {
342: info(message, false);
343: }
344:
345: private void info(String message, boolean console) {
346: logger.info(message);
347: if (console) {
348: consoleLogger.info(message);
349: }
350: }
351:
352: private void warn(String message) {
353: warn(message, false);
354: }
355:
356: private void warn(String message, boolean console) {
357: logger.warn(message);
358: if (console) {
359: consoleLogger.warn(message);
360: }
361: }
362: }
|