/*
 * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */
package com.tc.l2.ha;

import com.tc.async.api.Sink;
import com.tc.exception.TCRuntimeException;
import com.tc.l2.api.ReplicatedClusterStateManager;
import com.tc.l2.msg.ClusterStateMessage;
import com.tc.l2.msg.ClusterStateMessageFactory;
import com.tc.l2.state.StateManager;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.groups.ClientID;
import com.tc.net.groups.GroupException;
import com.tc.net.groups.GroupManager;
import com.tc.net.groups.GroupMessage;
import com.tc.net.groups.GroupMessageListener;
import com.tc.net.groups.GroupResponse;
import com.tc.net.groups.NodeID;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.net.protocol.transport.ConnectionID;
import com.tc.net.protocol.transport.ConnectionIDFactory;
import com.tc.net.protocol.transport.ConnectionIDFactoryListener;
import com.tc.objectserver.context.NodeStateEventContext;
import com.tc.util.Assert;
import com.tc.util.UUID;

import java.util.Iterator;

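// Replicates cluster-wide state (cluster ID, client connection IDs, next available object and global
// transaction IDs) from the active L2 to the passive servers in the group, and applies incoming state
// updates when this node is not the active coordinator.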
public class ReplicatedClusterStateManagerImpl implements ReplicatedClusterStateManager, GroupMessageListener,
    ConnectionIDFactoryListener {

  private static final TCLogger logger = TCLogging.getLogger(ReplicatedClusterStateManagerImpl.class);

  private final GroupManager groupManager;
  private final ClusterState state;
  private final StateManager stateManager;
  private final Sink         channelLifeCycleSink;

  private boolean            isActive = false;

  public ReplicatedClusterStateManagerImpl(GroupManager groupManager, StateManager stateManager,
                                           ClusterState clusterState, ConnectionIDFactory factory,
                                           Sink channelLifeCycleSink) {
    this.groupManager = groupManager;
    this.stateManager = stateManager;
    state = clusterState;
    this.channelLifeCycleSink = channelLifeCycleSink;
    groupManager.registerForMessages(ClusterStateMessage.class, this);
    factory.registerForConnectionIDEvents(this);
  }
057:
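  // Called when this L2 becomes the active coordinator: ensures the cluster has a clusterID, persists
  // the state locally and pushes a full copy to all passives before marking this manager active.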
  public synchronized void goActiveAndSyncState() {
    generateClusterIDIfNeeded();

    // Sync state to internal DB
    state.syncInternal();

    // Sync state to external passive servers
    publishToAll(ClusterStateMessageFactory.createClusterStateMessage(state));

    isActive = true;
    notifyAll();
  }

  private void generateClusterIDIfNeeded() {
    if (state.getClusterID() == null) {
      // This is the first time an L2 goes active in the cluster of L2s. Generate a new clusterID. This will stick.
      state.setClusterID(UUID.getUUID().toString());
    }
  }
078:
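  // Pushes the complete cluster state to a single node (for example, a newly joined passive) and blocks
  // until that node acknowledges it. Waits until this manager has gone active first.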
  public synchronized void publishClusterState(NodeID nodeID) throws GroupException {
    waitUntilActive();
    ClusterStateMessage msg = (ClusterStateMessage) groupManager
        .sendToAndWaitForResponse(nodeID, ClusterStateMessageFactory.createClusterStateMessage(state));
    validateResponse(nodeID, msg);
  }

  private void waitUntilActive() {
    while (!isActive) {
      logger.info("Waiting since ReplicatedClusterStateManager hasn't gone ACTIVE yet ...");
      try {
        wait(3000);
      } catch (InterruptedException e) {
        throw new TCRuntimeException(e);
      }
    }
  }
100:
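  // Checks the acknowledgement from a node: anything other than OPERATION_SUCCESS gets the node zapped,
  // flagged as split-brain when the node itself reported a split-brain failure.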
  private void validateResponse(NodeID nodeID, ClusterStateMessage msg) {
    if (msg == null || msg.getType() != ClusterStateMessage.OPERATION_SUCCESS) {
      logger.error("Received wrong response from : " + nodeID + " : msg = " + msg
                   + " while publishing Cluster State: Killing the node");
      groupManager.zapNode(nodeID,
                           (msg != null && msg.getType() == ClusterStateMessage.OPERATION_FAILED_SPLIT_BRAIN
                               ? L2HAZapNodeRequestProcessor.SPLIT_BRAIN
                               : L2HAZapNodeRequestProcessor.PROGRAM_ERROR),
                           "Received wrong response from : " + nodeID + " while publishing Cluster State"
                               + L2HAZapNodeRequestProcessor.getErrorString(new Throwable()));
    }
  }
123:
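  // Each publish below updates the local ClusterState and then broadcasts the new value to every other
  // node in the group, blocking until all of them acknowledge (see publishToAll).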
  // TODO:: Sync only once in a while to the passives
  public synchronized void publishNextAvailableObjectID(long minID) {
    state.setNextAvailableObjectID(minID);
    publishToAll(ClusterStateMessageFactory.createNextAvailableObjectIDMessage(state));
  }

  // TODO:: Sync only once in a while to the passives
  public synchronized void publishNextAvailableGlobalTransactionID(long minID) {
    state.setNextAvailableGlobalTransactionID(minID);
    publishToAll(ClusterStateMessageFactory.createNextAvailableGlobalTransactionIDMessage(state));
  }
137:
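  // ConnectionIDFactoryListener callbacks, fired only on the active coordinator when a client connection
  // is created or destroyed; the change is recorded in ClusterState and replicated to the passives.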
  public synchronized void connectionIDCreated(ConnectionID connectionID) {
    Assert.assertTrue(stateManager.isActiveCoordinator());
    state.addNewConnection(connectionID);
    publishToAll(ClusterStateMessageFactory.createNewConnectionCreatedMessage(connectionID));
  }

  public synchronized void connectionIDDestroyed(ConnectionID connectionID) {
    Assert.assertTrue(stateManager.isActiveCoordinator());
    state.removeConnection(connectionID);
    publishToAll(ClusterStateMessageFactory.createConnectionDestroyedMessage(connectionID));
  }
153:
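  // Broadcasts a message to every other node in the group and validates each response; a group
  // communication failure here is treated as fatal.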
  private void publishToAll(GroupMessage message) {
    try {
      GroupResponse gr = groupManager.sendAllAndWaitForResponse(message);
      for (Iterator i = gr.getResponses().iterator(); i.hasNext();) {
        ClusterStateMessage msg = (ClusterStateMessage) i.next();
        validateResponse(msg.messageFrom(), msg);
      }
    } catch (GroupException e) {
      // TODO:: Is this extreme ?
      throw new AssertionError(e);
    }
  }
168:
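  // GroupMessageListener callback: only ClusterStateMessages are expected here, since that is the only
  // message type this class registers for in the constructor.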
  public void messageReceived(NodeID fromNode, GroupMessage msg) {
    if (msg instanceof ClusterStateMessage) {
      ClusterStateMessage clusterMsg = (ClusterStateMessage) msg;
      handleClusterStateMessage(fromNode, clusterMsg);
    } else {
      throw new AssertionError("ReplicatedClusterStateManagerImpl : Received wrong message type : "
                               + msg.getClass().getName() + " : " + msg);
    }
  }
179:
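  // A ClusterStateMessage should only arrive from the active. If this node is itself the active
  // coordinator, two actives exist (split-brain): reply with an NG response and zap the sender.
  // Otherwise apply the received state locally and acknowledge.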
  private void handleClusterStateMessage(NodeID fromNode, ClusterStateMessage msg) {
    if (stateManager.isActiveCoordinator()) {
      logger.warn("Received ClusterStateMessage from " + fromNode
                  + " while I am the cluster co-ordinator. This is bad. Sending NG response.");
      sendNGSplitBrainResponse(fromNode, msg);
      groupManager.zapNode(fromNode, L2HAZapNodeRequestProcessor.SPLIT_BRAIN,
                           "Received ClusterStateMessage from : " + fromNode + " while in ACTIVE-COORDINATOR state"
                               + L2HAZapNodeRequestProcessor.getErrorString(new Throwable()));
    } else {
      msg.initState(state);
      sendChannelLifeCycleEventsIfNecessary(msg);
      sendOKResponse(fromNode, msg);
    }
  }
201:
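  // On a passive, connection create/destroy updates are also turned into node-state events so the
  // channel life cycle handler can keep its internal bookkeeping in step with the active.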
  private void sendChannelLifeCycleEventsIfNecessary(ClusterStateMessage msg) {
    if (msg.getType() == ClusterStateMessage.NEW_CONNECTION_CREATED) {
      // Not really needed, but just in case
      channelLifeCycleSink.add(new NodeStateEventContext(NodeStateEventContext.CREATE,
                                                         new ClientID(new ChannelID(msg.getConnectionID()
                                                             .getChannelID()))));
    } else if (msg.getType() == ClusterStateMessage.CONNECTION_DESTROYED) {
      // This is needed to clean up some data structures internally.
      // NOTE :: It is OK to add this event context directly to the channel life cycle handler (and not wrap it in an
      // InBandMoveToNextSink as is done on the active) because there are no stages before the transactions are added
      // to the server transaction manager.
      // XXX::FIXME:: The above statement is true only when this event is fixed to be fired from the active after all
      // txns are acked in the active.
      channelLifeCycleSink.add(new NodeStateEventContext(NodeStateEventContext.REMOVE,
                                                         new ClientID(new ChannelID(msg.getConnectionID()
                                                             .getChannelID()))));
    }
  }
223:
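  // Acknowledgement helpers: send an OK or a split-brain NG response back to the node that sent the
  // ClusterStateMessage. Failures to send are logged and otherwise ignored.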
  private void sendOKResponse(NodeID fromNode, ClusterStateMessage msg) {
    try {
      groupManager.sendTo(fromNode, ClusterStateMessageFactory.createOKResponse(msg));
    } catch (GroupException e) {
      logger.error("Error handling message : " + msg, e);
    }
  }

  private void sendNGSplitBrainResponse(NodeID fromNode, ClusterStateMessage msg) {
    try {
      groupManager.sendTo(fromNode, ClusterStateMessageFactory.createNGSplitBrainResponse(msg));
    } catch (GroupException e) {
      logger.error("Error handling message : " + msg, e);
    }
  }
242:
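  // Invoked when a node leaves the group; queues a REMOVE node-state event so internal data structures
  // for that node get cleaned up.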
  public void fireNodeLeftEvent(NodeID nodeID) {
    // This is needed to clean up some data structures internally.
    channelLifeCycleSink.add(new NodeStateEventContext(NodeStateEventContext.REMOVE, nodeID));
  }
}