001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.state;
007: import com.tc.l2.ha.WeightGeneratorFactory;
008: import com.tc.l2.msg.L2StateMessage;
009: import com.tc.l2.msg.L2StateMessageFactory;
010: import com.tc.logging.TCLogger;
011: import com.tc.logging.TCLogging;
012: import com.tc.net.groups.GroupException;
013: import com.tc.net.groups.GroupManager;
014: import com.tc.net.groups.GroupMessage;
015: import com.tc.net.groups.GroupResponse;
016: import com.tc.net.groups.NodeID;
017: import com.tc.net.groups.NodeIDImpl;
018: import com.tc.util.Assert;
019: import com.tc.util.State;
021: import java.util.HashMap;
022: import java.util.Iterator;
023: import java.util.Map;
025: public class ElectionManagerImpl implements ElectionManager {
027: private static final TCLogger logger = TCLogging
028: .getLogger(ElectionManagerImpl.class);
030: private static final State INIT = new State("Initial-State");
031: private static final State ELECTION_COMPLETE = new State(
032: "Election-Complete");
033: private static final State ELECTION_IN_PROGRESS = new State(
034: "Election-In-Progress");
036: private final GroupManager groupManager;
037: private final Map votes = new HashMap();
039: private State state = INIT;
041: // XXX::NOTE:: These variables are not reset until next election
042: private Enrollment myVote = null;
043: private Enrollment winner;
045: private final long electionTime;
047: public ElectionManagerImpl(GroupManager groupManager,
048: StateManagerConfig stateManagerConfig) {
049: this .groupManager = groupManager;
050: electionTime = stateManagerConfig.getElectionTimeInSecs() * 1000;
051: }
053: public synchronized boolean handleStartElectionRequest(
054: L2StateMessage msg) {
055: Assert.assertEquals(L2StateMessage.START_ELECTION, msg
056: .getType());
057: if (state == ELECTION_IN_PROGRESS
058: && (myVote.isANewCandidate() || !msg.getEnrollment()
059: .isANewCandidate())) {
060: // Another node is also joining in the election process, Cast its vote and notify my vote
061: // Note : WE dont want to do this for new candidates when we are not new.
062: Enrollment vote = msg.getEnrollment();
063: Enrollment old = (Enrollment) votes.put(vote.getNodeID(),
064: vote);
065: boolean sendResponse = msg.inResponseTo().isNull();
066: if (old != null && !vote.equals(old)) {
067: logger
068: .warn("Received duplicate vote : Replacing with new one : "
069: + vote + " old one : " + old);
070: sendResponse = true;
071: }
072: if (sendResponse) {
073: // This is either not a response to this node initiating election or a duplicate vote. Either case notify this
074: // nodes vote
075: GroupMessage response = createElectionStartedMessage(
076: msg, myVote);
077: logger.info("Casted vote from " + msg
078: + " My Response : " + response);
079: try {
080: groupManager.sendTo(msg.messageFrom(), response);
081: } catch (GroupException e) {
082: logger.error("Error sending Votes to : "
083: + msg.messageFrom(), e);
084: }
085: } else {
086: logger.info("Casted vote from " + msg);
087: }
088: return true;
089: } else {
090: logger.info("Ignoring Start Election Request : " + msg
091: + " My state = " + state);
092: return false;
093: }
094: }
096: public synchronized void handleElectionAbort(L2StateMessage msg) {
097: Assert.assertEquals(L2StateMessage.ABORT_ELECTION, msg
098: .getType());
099: if (state == ELECTION_IN_PROGRESS) {
100: // An existing ACTIVE Node has forced election to quit
101: Assert.assertNotNull(myVote);
102: basicAbort(msg);
103: } else {
104: logger.warn("Ignoring Abort Election Request : " + msg
105: + " My state = " + state);
106: }
107: }
109: public synchronized void handleElectionResultMessage(
110: L2StateMessage msg) {
111: Assert.assertEquals(L2StateMessage.ELECTION_RESULT, msg
112: .getType());
113: if (state == ELECTION_COMPLETE
114: && !this .winner.equals(msg.getEnrollment())) {
115: // conflict
116: GroupMessage resultConflict = L2StateMessageFactory
117: .createResultConflictMessage(msg, this .winner);
118: logger
119: .warn("WARNING :: Election result conflict : Winner local = "
120: + this .winner
121: + " : remote winner = "
122: + msg.getEnrollment());
123: try {
124: groupManager.sendTo(msg.messageFrom(), resultConflict);
125: } catch (GroupException e) {
126: logger
127: .error("Error sending Election result conflict message : "
128: + resultConflict);
129: }
130: } else {
131: // Agree to the result, abort the election if necessary
132: if (state == ELECTION_IN_PROGRESS) {
133: basicAbort(msg);
134: }
135: GroupMessage resultAgreed = L2StateMessageFactory
136: .createResultAgreedMessage(msg, msg.getEnrollment());
137: logger.info("Agreed with Election Result from "
138: + msg.messageFrom() + " : " + resultAgreed);
139: try {
140: groupManager.sendTo(msg.messageFrom(), resultAgreed);
141: } catch (GroupException e) {
142: logger
143: .error("Error sending Election result agreed message : "
144: + resultAgreed);
145: }
146: }
147: }
149: private void basicAbort(L2StateMessage msg) {
150: reset(msg.getEnrollment());
151: logger.info("Aborted Election : Winner is : " + this .winner);
152: }
154: /**
155: * This method is called by the winner of the election to announce to the world
156: */
157: public synchronized void declareWinner(NodeID myNodeId) {
158: Assert.assertEquals(winner.getNodeID(), myNodeId);
159: GroupMessage msg = createElectionWonMessage(this .winner);
160: try {
161: this .groupManager.sendAll(msg);
162: } catch (GroupException e) {
163: logger.error("Error declaring results : ", e);
164: }
165: logger.info("Declared as Winner: Winner is : " + this .winner);
166: reset(winner);
167: }
169: public synchronized void reset(Enrollment winningEnrollment) {
170: this .winner = winningEnrollment;
171: this .state = INIT;
172: this .votes.clear();
173: this .myVote = null;
174: notifyAll();
175: }
177: public NodeID runElection(NodeID myNodeId, boolean isNew,
178: WeightGeneratorFactory weightsFactory) {
179: NodeID winnerID = NodeIDImpl.NULL_ID;
180: int count = 0;
181: while (winnerID.isNull()) {
182: if (count++ > 0) {
183: logger.info("Requesting Re-election !!! count = "
184: + count);
185: }
186: try {
187: winnerID = doElection(myNodeId, isNew, weightsFactory);
188: } catch (GroupException e1) {
189: logger.error("Error during election : ", e1);
190: reset(null);
191: }
192: }
193: return winnerID;
194: }
196: private synchronized void electionStarted(Enrollment e) {
197: if (this .state == ELECTION_IN_PROGRESS) {
198: throw new AssertionError("Election Already in Progress");
199: }
200: this .state = ELECTION_IN_PROGRESS;
201: this .myVote = e;
202: this .winner = null;
203: this .votes.clear();
204: this .votes.put(e.getNodeID(), e); // Cast my vote
205: logger.info("Election Started : " + e);
206: }
208: private NodeID doElection(NodeID myNodeId, boolean isNew,
209: WeightGeneratorFactory weightsFactory)
210: throws GroupException {
212: // Step 1: publish to cluster NodeID, weight and election start
213: Enrollment e = EnrollmentFactory.createEnrollment(myNodeId,
214: isNew, weightsFactory);
215: electionStarted(e);
217: GroupMessage msg = createElectionStartedMessage(e);
218: groupManager.sendAll(msg);
220: // Step 2: Wait for election completion
221: waitTillElectionComplete();
223: // Step 3: Compute Winner
224: Enrollment lWinner = computeResult();
225: if (lWinner != e) {
226: logger.info("Election lost : Winner is : " + lWinner);
227: Assert.assertNotNull(lWinner);
228: return lWinner.getNodeID();
229: }
230: // Step 4 : local host won the election, so notify world for acceptance
231: msg = createElectionResultMessage(e);
232: GroupResponse responses = groupManager
233: .sendAllAndWaitForResponse(msg);
234: for (Iterator i = responses.getResponses().iterator(); i
235: .hasNext();) {
236: L2StateMessage response = (L2StateMessage) i.next();
237: Assert.assertEquals(msg.getMessageID(), response
238: .inResponseTo());
239: if (response.getType() == L2StateMessage.RESULT_AGREED) {
240: Assert.assertEquals(e, response.getEnrollment());
241: } else if (response.getType() == L2StateMessage.RESULT_CONFLICT) {
242: logger.info("Result Conflict: Local Result : " + e
243: + " From : " + response.messageFrom()
244: + " Result : " + response.getEnrollment());
245: return NodeIDImpl.NULL_ID;
246: } else {
247: throw new AssertionError(
248: "Node : "
249: + response.messageFrom()
250: + " responded neither with RESULT_AGREED or RESULT_CONFLICT :"
251: + response);
252: }
253: }
255: // Step 5 : result agreed - I am the winner
256: return myNodeId;
257: }
259: private synchronized Enrollment computeResult() {
260: if (state == ELECTION_IN_PROGRESS) {
262: logger.info("Election Complete : " + votes.values() + " : "
263: + state);
264: winner = countVotes();
265: }
266: return winner;
267: }
269: private Enrollment countVotes() {
270: Enrollment computedWinner = null;
271: for (Iterator i = votes.values().iterator(); i.hasNext();) {
272: Enrollment e = (Enrollment) i.next();
273: if (computedWinner == null) {
274: computedWinner = e;
275: } else if (e.wins(computedWinner)) {
276: computedWinner = e;
277: }
278: }
279: Assert.assertNotNull(computedWinner);
280: return computedWinner;
281: }
283: private synchronized void waitTillElectionComplete() {
284: long start = System.currentTimeMillis();
285: long diff = electionTime;
286: while (state == ELECTION_IN_PROGRESS && diff > 0) {
287: try {
288: wait(diff);
289: } catch (InterruptedException e) {
290: logger.error("Interrupted during election : ", e);
291: break;
292: }
293: diff = diff - (System.currentTimeMillis() - start);
294: }
295: }
297: private GroupMessage createElectionStartedMessage(Enrollment e) {
298: return L2StateMessageFactory.createElectionStartedMessage(e);
299: }
301: private GroupMessage createElectionWonMessage(Enrollment e) {
302: return L2StateMessageFactory.createElectionWonMessage(e);
303: }
305: private GroupMessage createElectionResultMessage(Enrollment e) {
306: return L2StateMessageFactory.createElectionResultMessage(e);
307: }
309: private GroupMessage createElectionStartedMessage(
310: L2StateMessage msg, Enrollment e) {
311: return L2StateMessageFactory.createElectionStartedMessage(msg,
312: e);
313: }
315: }