001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.ha;
006:
007: import com.tc.async.api.Sink;
008: import com.tc.async.api.StageManager;
009: import com.tc.async.impl.OrderedSink;
010: import com.tc.config.schema.NewHaConfig;
011: import com.tc.l2.api.L2Coordinator;
012: import com.tc.l2.api.ReplicatedClusterStateManager;
013: import com.tc.l2.context.StateChangedEvent;
014: import com.tc.l2.ha.WeightGeneratorFactory.WeightGenerator;
015: import com.tc.l2.handler.GCResultHandler;
016: import com.tc.l2.handler.GroupEventsDispatchHandler;
017: import com.tc.l2.handler.L2ObjectSyncDehydrateHandler;
018: import com.tc.l2.handler.L2ObjectSyncHandler;
019: import com.tc.l2.handler.L2ObjectSyncRequestHandler;
020: import com.tc.l2.handler.L2ObjectSyncSendHandler;
021: import com.tc.l2.handler.L2StateChangeHandler;
022: import com.tc.l2.handler.L2StateMessageHandler;
023: import com.tc.l2.handler.ServerTransactionAckHandler;
024: import com.tc.l2.handler.TransactionRelayHandler;
025: import com.tc.l2.handler.GroupEventsDispatchHandler.GroupEventsDispatcher;
026: import com.tc.l2.msg.GCResultMessage;
027: import com.tc.l2.msg.L2StateMessage;
028: import com.tc.l2.msg.ObjectSyncCompleteMessage;
029: import com.tc.l2.msg.ObjectSyncMessage;
030: import com.tc.l2.msg.RelayedCommitTransactionMessage;
031: import com.tc.l2.msg.ServerTxnAckMessage;
032: import com.tc.l2.objectserver.L2ObjectStateManager;
033: import com.tc.l2.objectserver.L2ObjectStateManagerImpl;
034: import com.tc.l2.objectserver.ReplicatedObjectManager;
035: import com.tc.l2.objectserver.ReplicatedObjectManagerImpl;
036: import com.tc.l2.objectserver.ReplicatedTransactionManager;
037: import com.tc.l2.objectserver.ReplicatedTransactionManagerImpl;
038: import com.tc.l2.state.StateChangeListener;
039: import com.tc.l2.state.StateManager;
040: import com.tc.l2.state.StateManagerConfigImpl;
041: import com.tc.l2.state.StateManagerImpl;
042: import com.tc.logging.TCLogger;
043: import com.tc.logging.TCLogging;
044: import com.tc.net.groups.GroupEventsListener;
045: import com.tc.net.groups.GroupException;
046: import com.tc.net.groups.GroupManager;
047: import com.tc.net.groups.GroupManagerFactory;
048: import com.tc.net.groups.Node;
049: import com.tc.net.groups.NodeID;
050: import com.tc.object.net.DSOChannelManager;
051: import com.tc.objectserver.api.ObjectManager;
052: import com.tc.objectserver.core.api.ServerConfigurationContext;
053: import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
054: import com.tc.objectserver.impl.DistributedObjectServer;
055: import com.tc.objectserver.persistence.api.PersistentMapStore;
056: import com.tc.objectserver.tx.ServerTransactionManager;
057: import com.tc.util.sequence.SequenceGenerator;
058: import com.tc.util.sequence.SequenceGenerator.SequenceGeneratorException;
059: import com.tc.util.sequence.SequenceGenerator.SequenceGeneratorListener;
060:
061: import java.io.IOException;
062:
063: public class L2HACoordinator implements L2Coordinator,
064: StateChangeListener, GroupEventsListener,
065: SequenceGeneratorListener {
066:
067: private static final TCLogger logger = TCLogging
068: .getLogger(L2HACoordinator.class);
069:
070: private final TCLogger consoleLogger;
071: private final DistributedObjectServer server;
072:
073: private GroupManager groupManager;
074: private StateManager stateManager;
075: private ReplicatedObjectManager rObjectManager;
076: private ReplicatedTransactionManager rTxnManager;
077: private L2ObjectStateManager l2ObjectStateManager;
078: private ReplicatedClusterStateManager rClusterStateMgr;
079:
080: private ClusterState clusterState;
081: private SequenceGenerator sequenceGenerator;
082:
083: private NewHaConfig haConfig;
084:
085: public L2HACoordinator(TCLogger consoleLogger,
086: DistributedObjectServer server, StageManager stageManager,
087: PersistentMapStore clusterStateStore,
088: ObjectManager objectManager,
089: ServerTransactionManager transactionManager,
090: ServerGlobalTransactionManager gtxm,
091: DSOChannelManager channelManager, NewHaConfig haConfig) {
092: this .consoleLogger = consoleLogger;
093: this .server = server;
094: this .haConfig = haConfig;
095:
096: init(stageManager, clusterStateStore, objectManager,
097: transactionManager, gtxm, channelManager);
098: }
099:
100: private void init(StageManager stageManager,
101: PersistentMapStore clusterStateStore,
102: ObjectManager objectManager,
103: ServerTransactionManager transactionManager,
104: ServerGlobalTransactionManager gtxm,
105: DSOChannelManager channelManager) {
106: try {
107: basicInit(stageManager, clusterStateStore, objectManager,
108: transactionManager, gtxm, channelManager);
109: } catch (GroupException e) {
110: logger.error(e);
111: throw new AssertionError(e);
112: }
113: }
114:
115: private void basicInit(StageManager stageManager,
116: PersistentMapStore clusterStateStore,
117: ObjectManager objectManager,
118: ServerTransactionManager transactionManager,
119: ServerGlobalTransactionManager gtxm,
120: DSOChannelManager channelManager) throws GroupException {
121:
122: this .clusterState = new ClusterState(clusterStateStore, server
123: .getManagedObjectStore(), server
124: .getConnectionIdFactory(), gtxm
125: .getGlobalTransactionIDSequenceProvider());
126:
127: final Sink stateChangeSink = stageManager.createStage(
128: ServerConfigurationContext.L2_STATE_CHANGE_STAGE,
129: new L2StateChangeHandler(), 1, Integer.MAX_VALUE)
130: .getSink();
131: this .groupManager = GroupManagerFactory.createGroupManager();
132:
133: this .stateManager = new StateManagerImpl(consoleLogger,
134: groupManager, stateChangeSink,
135: new StateManagerConfigImpl(haConfig),
136: createWeightGeneratorFactoryForStateManager(gtxm));
137: this .stateManager.registerForStateChangeEvents(this );
138:
139: this .l2ObjectStateManager = new L2ObjectStateManagerImpl(
140: objectManager, transactionManager);
141: this .sequenceGenerator = new SequenceGenerator(this );
142:
143: L2HAZapNodeRequestProcessor zapProcessor = new L2HAZapNodeRequestProcessor(
144: consoleLogger,
145: stateManager,
146: groupManager,
147: createWeightGeneratorFactoryForZapNodeProcessor(channelManager));
148: this .groupManager.setZapNodeRequestProcessor(zapProcessor);
149:
150: final Sink objectsSyncRequestSink = stageManager.createStage(
151: ServerConfigurationContext.OBJECTS_SYNC_REQUEST_STAGE,
152: new L2ObjectSyncRequestHandler(
153: this .l2ObjectStateManager), 1,
154: Integer.MAX_VALUE).getSink();
155: final Sink objectsSyncSink = stageManager.createStage(
156: ServerConfigurationContext.OBJECTS_SYNC_STAGE,
157: new L2ObjectSyncHandler(), 1, Integer.MAX_VALUE)
158: .getSink();
159: stageManager
160: .createStage(
161: ServerConfigurationContext.OBJECTS_SYNC_DEHYDRATE_STAGE,
162: new L2ObjectSyncDehydrateHandler(
163: this .sequenceGenerator), 1,
164: Integer.MAX_VALUE);
165: stageManager.createStage(
166: ServerConfigurationContext.OBJECTS_SYNC_SEND_STAGE,
167: new L2ObjectSyncSendHandler(this .l2ObjectStateManager),
168: 1, Integer.MAX_VALUE);
169: stageManager.createStage(
170: ServerConfigurationContext.TRANSACTION_RELAY_STAGE,
171: new TransactionRelayHandler(this .l2ObjectStateManager,
172: this .sequenceGenerator, gtxm), 1,
173: Integer.MAX_VALUE);
174: final Sink ackProcessingStage = stageManager
175: .createStage(
176: ServerConfigurationContext.SERVER_TRANSACTION_ACK_PROCESSING_STAGE,
177: new ServerTransactionAckHandler(), 1,
178: Integer.MAX_VALUE).getSink();
179: final Sink stateMessageStage = stageManager
180: .createStage(
181: ServerConfigurationContext.L2_STATE_MESSAGE_HANDLER_STAGE,
182: new L2StateMessageHandler(), 1,
183: Integer.MAX_VALUE).getSink();
184: final Sink gcResultStage = stageManager.createStage(
185: ServerConfigurationContext.GC_RESULT_PROCESSING_STAGE,
186: new GCResultHandler(), 1, Integer.MAX_VALUE).getSink();
187:
188: this .rClusterStateMgr = new ReplicatedClusterStateManagerImpl(
189: groupManager,
190: stateManager,
191: clusterState,
192: server.getConnectionIdFactory(),
193: stageManager
194: .getStage(
195: ServerConfigurationContext.CHANNEL_LIFE_CYCLE_STAGE)
196: .getSink());
197:
198: OrderedSink orderedObjectsSyncSink = new OrderedSink(logger,
199: objectsSyncSink);
200: this .rTxnManager = new ReplicatedTransactionManagerImpl(
201: groupManager, orderedObjectsSyncSink,
202: transactionManager, gtxm);
203:
204: this .rObjectManager = new ReplicatedObjectManagerImpl(
205: groupManager, stateManager, l2ObjectStateManager,
206: rTxnManager, objectManager, transactionManager,
207: objectsSyncRequestSink, sequenceGenerator);
208:
209: this .groupManager.routeMessages(ObjectSyncMessage.class,
210: orderedObjectsSyncSink);
211: this .groupManager
212: .routeMessages(ObjectSyncCompleteMessage.class,
213: orderedObjectsSyncSink);
214: this .groupManager.routeMessages(
215: RelayedCommitTransactionMessage.class,
216: orderedObjectsSyncSink);
217: this .groupManager.routeMessages(ServerTxnAckMessage.class,
218: ackProcessingStage);
219: this .groupManager.routeMessages(L2StateMessage.class,
220: stateMessageStage);
221: this .groupManager.routeMessages(GCResultMessage.class,
222: gcResultStage);
223:
224: final Sink groupEventsSink = stageManager.createStage(
225: ServerConfigurationContext.GROUP_EVENTS_DISPATCH_STAGE,
226: new GroupEventsDispatchHandler(this ), 1,
227: Integer.MAX_VALUE).getSink();
228: GroupEventsDispatcher dispatcher = new GroupEventsDispatcher(
229: groupEventsSink);
230: groupManager.registerForGroupEvents(dispatcher);
231: }
232:
233: private WeightGeneratorFactory createWeightGeneratorFactoryForZapNodeProcessor(
234: final DSOChannelManager channelManager) {
235: WeightGeneratorFactory wgf = new WeightGeneratorFactory();
236: wgf.add(new WeightGenerator() {
237: public long getWeight() {
238: // return number of connected clients
239: return channelManager.getAllActiveClientIDs().size();
240: }
241: });
242: return wgf;
243: }
244:
245: private WeightGeneratorFactory createWeightGeneratorFactoryForStateManager(
246: ServerGlobalTransactionManager gtxm) {
247: WeightGeneratorFactory wgf = new WeightGeneratorFactory();
248: // TODO::FIXME :: this is probably not the right thing to do since a runnign active might have current gid < curreng
249: // gid in a just turned active because of how things are wired.
250: //
251: // final Sequence gidSequence = gtxm.getGlobalTransactionIDSequence();
252: // wgf.add(new WeightGenerator() {
253: // public long getWeight() {
254: // return gidSequence.current();
255: // }
256: // });
257: wgf.add(WeightGeneratorFactory.RANDOM_WEIGHT_GENERATOR);
258: wgf.add(WeightGeneratorFactory.RANDOM_WEIGHT_GENERATOR);
259: return wgf;
260: }
261:
262: public void start(final Node this Node, final Node[] allNodes) {
263: NodeID myNodeId;
264: try {
265: myNodeId = groupManager.join(this Node, allNodes);
266: } catch (GroupException e) {
267: logger.error("Caught Exception :", e);
268: throw new AssertionError(e);
269: }
270: logger.info("This L2 Node ID = " + myNodeId);
271: stateManager.startElection();
272: }
273:
274: public StateManager getStateManager() {
275: return stateManager;
276: }
277:
278: public ReplicatedClusterStateManager getReplicatedClusterStateManager() {
279: return rClusterStateMgr;
280: }
281:
282: public ReplicatedObjectManager getReplicatedObjectManager() {
283: return rObjectManager;
284: }
285:
286: public ReplicatedTransactionManager getReplicatedTransactionManager() {
287: return rTxnManager;
288: }
289:
290: public GroupManager getGroupManager() {
291: return groupManager;
292: }
293:
294: public void l2StateChanged(StateChangedEvent sce) {
295: clusterState.setCurrentState(sce.getCurrentState());
296: rTxnManager.l2StateChanged(sce);
297: if (sce.movedToActive()) {
298: rClusterStateMgr.goActiveAndSyncState();
299: rObjectManager.sync();
300: try {
301: server.startActiveMode();
302: } catch (IOException e) {
303: throw new AssertionError(e);
304: }
305: } else {
306: // TODO:// handle
307: logger.info("Recd. " + sce + " ! Ignoring for now !!!!");
308: }
309: }
310:
311: public void nodeJoined(NodeID nodeID) {
312: log(nodeID + " joined the cluster");
313: if (stateManager.isActiveCoordinator()) {
314: try {
315: stateManager.publishActiveState(nodeID);
316: rClusterStateMgr.publishClusterState(nodeID);
317: rObjectManager.query(nodeID);
318: } catch (GroupException ge) {
319: logger.error(
320: "Error publishing states to the newly joined node : "
321: + nodeID + " Zapping it : ", ge);
322: groupManager
323: .zapNode(
324: nodeID,
325: L2HAZapNodeRequestProcessor.COMMUNICATION_ERROR,
326: "Error publishing states to "
327: + nodeID
328: + L2HAZapNodeRequestProcessor
329: .getErrorString(ge));
330: }
331: }
332: }
333:
334: private void log(String message) {
335: logger.info(message);
336: consoleLogger.info(message);
337: }
338:
339: private void warn(String message) {
340: logger.warn(message);
341: consoleLogger.warn(message);
342: }
343:
344: public void nodeLeft(NodeID nodeID) {
345: warn(nodeID + " left the cluster");
346: if (stateManager.isActiveCoordinator()) {
347: rObjectManager.clear(nodeID);
348: rClusterStateMgr.fireNodeLeftEvent(nodeID);
349: } else {
350: stateManager.startElectionIfNecessary(nodeID);
351: }
352: this .sequenceGenerator.clearSequenceFor(nodeID);
353: }
354:
355: public void sequenceCreatedFor(Object key)
356: throws SequenceGeneratorException {
357: NodeID nodeID = (NodeID) key;
358: try {
359: rTxnManager.publishResetRequest(nodeID);
360: } catch (GroupException ge) {
361: logger.error(
362: "Error publishing reset counter request node : "
363: + nodeID + " Zapping it : ", ge);
364: groupManager.zapNode(nodeID,
365: L2HAZapNodeRequestProcessor.COMMUNICATION_ERROR,
366: "Error publishing reset counter for "
367: + nodeID
368: + L2HAZapNodeRequestProcessor
369: .getErrorString(ge));
370: throw new SequenceGeneratorException(ge);
371: }
372: }
373:
374: public void sequenceDestroyedFor(Object key) {
375: // NOP
376: }
377: }
|