001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.objectserver;
006:
007: import com.tc.async.api.Sink;
008: import com.tc.l2.context.SyncObjectsRequest;
009: import com.tc.l2.ha.L2HAZapNodeRequestProcessor;
010: import com.tc.l2.msg.GCResultMessage;
011: import com.tc.l2.msg.GCResultMessageFactory;
012: import com.tc.l2.msg.ObjectListSyncMessage;
013: import com.tc.l2.msg.ObjectListSyncMessageFactory;
014: import com.tc.l2.msg.ObjectSyncCompleteMessage;
015: import com.tc.l2.msg.ObjectSyncCompleteMessageFactory;
016: import com.tc.l2.state.StateManager;
017: import com.tc.logging.TCLogger;
018: import com.tc.logging.TCLogging;
019: import com.tc.net.groups.GroupException;
020: import com.tc.net.groups.GroupManager;
021: import com.tc.net.groups.GroupMessage;
022: import com.tc.net.groups.GroupMessageListener;
023: import com.tc.net.groups.GroupResponse;
024: import com.tc.net.groups.NodeID;
025: import com.tc.objectserver.api.GCStats;
026: import com.tc.objectserver.api.ObjectManager;
027: import com.tc.objectserver.api.ObjectManagerEventListener;
028: import com.tc.objectserver.tx.ServerTransactionManager;
029: import com.tc.objectserver.tx.TxnsInSystemCompletionLister;
030: import com.tc.util.Assert;
031: import com.tc.util.sequence.SequenceGenerator;
032: import com.tc.util.sequence.SequenceGenerator.SequenceGeneratorException;
033:
034: import java.util.HashMap;
035: import java.util.Iterator;
036: import java.util.LinkedHashMap;
037: import java.util.Map;
038: import java.util.Set;
039: import java.util.Map.Entry;
040:
041: public class ReplicatedObjectManagerImpl implements
042: ReplicatedObjectManager, GroupMessageListener,
043: L2ObjectStateListener {
044:
045: private static final TCLogger logger = TCLogging
046: .getLogger(ReplicatedObjectManagerImpl.class);
047:
048: private final ObjectManager objectManager;
049: private final GroupManager groupManager;
050: private final StateManager stateManager;
051: private final L2ObjectStateManager l2ObjectStateManager;
052: private final ReplicatedTransactionManager rTxnManager;
053: private final ServerTransactionManager transactionManager;
054: private final Sink objectsSyncRequestSink;
055: private final SequenceGenerator sequenceGenerator;
056: private final GCMonitor gcMonitor;
057:
058: public ReplicatedObjectManagerImpl(GroupManager groupManager,
059: StateManager stateManager,
060: L2ObjectStateManager l2ObjectStateManager,
061: ReplicatedTransactionManager txnManager,
062: ObjectManager objectManager,
063: ServerTransactionManager transactionManager,
064: Sink objectsSyncRequestSink,
065: SequenceGenerator sequenceGenerator) {
066: this .groupManager = groupManager;
067: this .stateManager = stateManager;
068: this .rTxnManager = txnManager;
069: this .objectManager = objectManager;
070: this .transactionManager = transactionManager;
071: this .objectsSyncRequestSink = objectsSyncRequestSink;
072: this .l2ObjectStateManager = l2ObjectStateManager;
073: this .sequenceGenerator = sequenceGenerator;
074: this .gcMonitor = new GCMonitor();
075: this .objectManager.getGarbageCollector().addListener(gcMonitor);
076: l2ObjectStateManager.registerForL2ObjectStateChangeEvents(this );
077: this .groupManager.registerForMessages(
078: ObjectListSyncMessage.class, this );
079: }
080:
081: /**
082: * This method is used to sync up all ObjectIDs from the remote ObjectManagers. It is synchronous and after when it
083: * returns nobody is allowed to join the cluster with exisiting objects.
084: */
085: public void sync() {
086: try {
087: GroupResponse gr = groupManager
088: .sendAllAndWaitForResponse(ObjectListSyncMessageFactory
089: .createObjectListSyncRequestMessage());
090: Map nodeID2ObjectIDs = new LinkedHashMap();
091: for (Iterator i = gr.getResponses().iterator(); i.hasNext();) {
092: ObjectListSyncMessage msg = (ObjectListSyncMessage) i
093: .next();
094: if (msg.getType() == ObjectListSyncMessage.RESPONSE) {
095: nodeID2ObjectIDs.put(msg.messageFrom(), msg
096: .getObjectIDs());
097: } else {
098: logger
099: .error("Received wrong response for ObjectListSyncMessage Request from "
100: + msg.messageFrom()
101: + " : msg : "
102: + msg);
103: groupManager
104: .zapNode(
105: msg.messageFrom(),
106: L2HAZapNodeRequestProcessor.PROGRAM_ERROR,
107: "Recd wrong response from : "
108: + msg.messageFrom()
109: + " for ObjectListSyncMessage Request"
110: + L2HAZapNodeRequestProcessor
111: .getErrorString(new Throwable()));
112: }
113: }
114: if (!nodeID2ObjectIDs.isEmpty()) {
115: gcMonitor
116: .disableAndAdd2L2StateManager(nodeID2ObjectIDs);
117: }
118: } catch (GroupException e) {
119: logger.error(e);
120: throw new AssertionError(e);
121: }
122: }
123:
124: // Query current state of the other L2
125: public void query(NodeID nodeID) throws GroupException {
126: groupManager.sendTo(nodeID, ObjectListSyncMessageFactory
127: .createObjectListSyncRequestMessage());
128: }
129:
130: public void clear(NodeID nodeID) {
131: l2ObjectStateManager.removeL2(nodeID);
132: gcMonitor.clear(nodeID);
133: }
134:
135: public void messageReceived(NodeID fromNode, GroupMessage msg) {
136: if (msg instanceof ObjectListSyncMessage) {
137: ObjectListSyncMessage clusterMsg = (ObjectListSyncMessage) msg;
138: handleClusterObjectMessage(fromNode, clusterMsg);
139: } else {
140: throw new AssertionError(
141: "ReplicatedObjectManagerImpl : Received wrong message type :"
142: + msg.getClass().getName() + " : " + msg);
143:
144: }
145: }
146:
147: public void handleGCResult(GCResultMessage gcMsg) {
148: Set gcedOids = gcMsg.getGCedObjectIDs();
149: if (stateManager.isActiveCoordinator()) {
150: logger.warn("Received GC Result from "
151: + gcMsg.messageFrom()
152: + " While this node is ACTIVE. Ignoring result : "
153: + gcedOids.size());
154: return;
155: }
156: objectManager.notifyGCComplete(gcedOids);
157: logger
158: .info("Removed "
159: + gcedOids.size()
160: + "objects from passive ObjectManager from last GC from Active");
161: }
162:
163: private void handleClusterObjectMessage(NodeID nodeID,
164: ObjectListSyncMessage clusterMsg) {
165: try {
166: switch (clusterMsg.getType()) {
167: case ObjectListSyncMessage.REQUEST:
168: handleObjectListRequest(nodeID, clusterMsg);
169: break;
170: case ObjectListSyncMessage.RESPONSE:
171: handleObjectListResponse(nodeID, clusterMsg);
172: break;
173:
174: default:
175: throw new AssertionError(
176: "This message shouldn't have been routed here : "
177: + clusterMsg);
178: }
179: } catch (GroupException e) {
180: logger.error("Error handling message : " + clusterMsg, e);
181: throw new AssertionError(e);
182: }
183: }
184:
185: private void handleObjectListResponse(NodeID nodeID,
186: ObjectListSyncMessage clusterMsg) {
187: Assert.assertTrue(stateManager.isActiveCoordinator());
188: Set oids = clusterMsg.getObjectIDs();
189: if (!oids.isEmpty()) {
190: String error = "Nodes joining the cluster after startup shouldnt have any Objects. "
191: + nodeID
192: + " contains "
193: + oids.size()
194: + " Objects !!!";
195: logger.error(error + " Forcing node to Quit !!");
196: groupManager
197: .zapNode(
198: nodeID,
199: L2HAZapNodeRequestProcessor.NODE_JOINED_WITH_DIRTY_DB,
200: error
201: + L2HAZapNodeRequestProcessor
202: .getErrorString(new Throwable()));
203: } else {
204: gcMonitor.add2L2StateManagerWhenGCDisabled(nodeID, oids);
205: }
206: }
207:
208: private boolean add2L2StateManager(NodeID nodeID, Set oids) {
209: return l2ObjectStateManager.addL2(nodeID, oids);
210: }
211:
212: public void missingObjectsFor(NodeID nodeID, int missingObjects) {
213: if (missingObjects == 0) {
214: stateManager.moveNodeToPassiveStandby(nodeID);
215: gcMonitor.syncCompleteFor(nodeID);
216: } else {
217: objectsSyncRequestSink.add(new SyncObjectsRequest(nodeID));
218: }
219: }
220:
221: public void objectSyncCompleteFor(NodeID nodeID) {
222: try {
223: gcMonitor.syncCompleteFor(nodeID);
224: ObjectSyncCompleteMessage msg = ObjectSyncCompleteMessageFactory
225: .createObjectSyncCompleteMessageFor(nodeID,
226: sequenceGenerator.getNextSequence(nodeID));
227: groupManager.sendTo(nodeID, msg);
228: } catch (GroupException e) {
229: logger.error(
230: "Error Sending Object Sync complete message to : "
231: + nodeID, e);
232: groupManager.zapNode(nodeID,
233: L2HAZapNodeRequestProcessor.COMMUNICATION_ERROR,
234: "Error sending Object Sync complete message "
235: + L2HAZapNodeRequestProcessor
236: .getErrorString(e));
237: } catch (SequenceGeneratorException e) {
238: logger.error(
239: "Error Sending Object Sync complete message to : "
240: + nodeID, e);
241: }
242: }
243:
244: /**
245: * ACTIVE queries PASSIVES for the list of known object ids and this response is the one that opens up the
246: * transactions from ACTIVE to PASSIVE. So the replicated transaction manager is initialized here.
247: */
248: private void handleObjectListRequest(NodeID nodeID,
249: ObjectListSyncMessage clusterMsg) throws GroupException {
250: if (!stateManager.isActiveCoordinator()) {
251: Set knownIDs = objectManager.getAllObjectIDs();
252: rTxnManager.init(knownIDs);
253: logger
254: .info("Send response to Active's query : known id lists = "
255: + knownIDs.size());
256: groupManager.sendTo(nodeID, ObjectListSyncMessageFactory
257: .createObjectListSyncResponseMessage(clusterMsg,
258: knownIDs));
259: } else {
260: logger
261: .error("Recd. ObjectListRequest when in ACTIVE state from "
262: + nodeID + ". Zapping node ...");
263: groupManager
264: .sendTo(
265: nodeID,
266: ObjectListSyncMessageFactory
267: .createObjectListSyncFailedResponseMessage(clusterMsg));
268: // Now ZAP the node
269: groupManager.zapNode(nodeID,
270: L2HAZapNodeRequestProcessor.SPLIT_BRAIN,
271: "Recd ObjectListRequest from : "
272: + nodeID
273: + " while in ACTIVE-COORDINATOR state"
274: + L2HAZapNodeRequestProcessor
275: .getErrorString(new Throwable()));
276: }
277: }
278:
279: public boolean relayTransactions() {
280: return l2ObjectStateManager.getL2Count() > 0;
281: }
282:
283: private static final Object ADDED = new Object();
284:
285: private final class GCMonitor implements ObjectManagerEventListener {
286:
287: boolean disabled = false;
288: Map syncingPassives = new HashMap();
289:
290: public void garbageCollectionComplete(GCStats stats, Set deleted) {
291: Map toAdd = null;
292: notifyGCResultToPassives(deleted);
293: synchronized (this ) {
294: if (syncingPassives.isEmpty())
295: return;
296: toAdd = new LinkedHashMap();
297: for (Iterator i = syncingPassives.entrySet().iterator(); i
298: .hasNext();) {
299: Entry e = (Entry) i.next();
300: if (e.getValue() != ADDED) {
301: NodeID nodeID = (NodeID) e.getKey();
302: logger
303: .info("GC Completed : Starting scheduled passive sync for "
304: + nodeID);
305: disableGCIfNecessary();
306: // Shouldn't happen as this is in GC call back after GC completion
307: assertGCDisabled();
308: toAdd.put(nodeID, e.getValue());
309: e.setValue(ADDED);
310: }
311: }
312: }
313: add2L2StateManager(toAdd);
314: }
315:
316: private void notifyGCResultToPassives(Set deleted) {
317: if (deleted.isEmpty())
318: return;
319: final GCResultMessage msg = GCResultMessageFactory
320: .createGCResultMessage(deleted);
321: transactionManager
322: .callBackOnTxnsInSystemCompletion(new TxnsInSystemCompletionLister() {
323: public void onCompletion() {
324: try {
325: groupManager.sendAll(msg);
326: } catch (GroupException e) {
327: logger.error(
328: "Error sending gc results : ",
329: e);
330: }
331: }
332: });
333: }
334:
335: private void add2L2StateManager(Map toAdd) {
336: for (Iterator i = toAdd.entrySet().iterator(); i.hasNext();) {
337: Entry e = (Entry) i.next();
338: NodeID nodeID = (NodeID) e.getKey();
339: if (!ReplicatedObjectManagerImpl.this
340: .add2L2StateManager(nodeID, (Set) e.getValue())) {
341: logger
342: .warn(nodeID
343: + " is already added to L2StateManager, clearing our internal data structures.");
344: syncCompleteFor(nodeID);
345: }
346: }
347: }
348:
349: private void disableGCIfNecessary() {
350: if (!disabled) {
351: disabled = objectManager.getGarbageCollector()
352: .disableGC();
353: }
354: }
355:
356: private void assertGCDisabled() {
357: if (!disabled) {
358: throw new AssertionError("Cant disable GC");
359: }
360: }
361:
362: public void add2L2StateManagerWhenGCDisabled(NodeID nodeID,
363: Set oids) {
364: boolean toAdd = false;
365: synchronized (this ) {
366: disableGCIfNecessary();
367: if (syncingPassives.containsKey(nodeID)) {
368: logger
369: .warn("Not adding "
370: + nodeID
371: + " since it is already present in syncingPassives : "
372: + syncingPassives.keySet());
373: return;
374: }
375: if (disabled) {
376: syncingPassives.put(nodeID, ADDED);
377: toAdd = true;
378: } else {
379: logger
380: .info("Couldnt disable GC, probably because GC is currently running. So scheduling passive sync up for later after GC completion");
381: syncingPassives.put(nodeID, oids);
382: }
383: }
384: if (toAdd) {
385: if (!ReplicatedObjectManagerImpl.this
386: .add2L2StateManager(nodeID, oids)) {
387: logger
388: .warn(nodeID
389: + " is already added to L2StateManager, clearing our internal data structures.");
390: syncCompleteFor(nodeID);
391: }
392: }
393: }
394:
395: public synchronized void clear(NodeID nodeID) {
396: Object val = syncingPassives.remove(nodeID);
397: if (val != null) {
398: enableGCIfNecessary();
399: }
400: }
401:
402: private void enableGCIfNecessary() {
403: if (syncingPassives.isEmpty() && disabled) {
404: logger
405: .info("Reenabling GC as all passive are synced up");
406: objectManager.getGarbageCollector().enableGC();
407: disabled = false;
408: }
409: }
410:
411: public synchronized void syncCompleteFor(NodeID nodeID) {
412: Object val = syncingPassives.remove(nodeID);
413: // val could be null if the node disconnects before fully synching up.
414: Assert.assertTrue(val == ADDED || val == null);
415: if (val != null) {
416: Assert.assertTrue(disabled);
417: enableGCIfNecessary();
418: }
419: }
420:
421: public synchronized void disableAndAdd2L2StateManager(
422: Map nodeID2ObjectIDs) {
423: synchronized (this ) {
424: if (nodeID2ObjectIDs.size() > 0 && !disabled) {
425: logger.info("Disabling GC since "
426: + nodeID2ObjectIDs.size() + " passives ["
427: + nodeID2ObjectIDs.keySet()
428: + "] needs to sync up");
429: disableGCIfNecessary();
430: // Shouldnt happen as GC should be running yet. We havent started yet.
431: assertGCDisabled();
432: }
433: for (Iterator i = nodeID2ObjectIDs.entrySet()
434: .iterator(); i.hasNext();) {
435: Entry e = (Entry) i.next();
436: NodeID nodeID = (NodeID) e.getKey();
437: if (!syncingPassives.containsKey(nodeID)) {
438: syncingPassives.put(nodeID, ADDED);
439: } else {
440: logger
441: .info("Removing "
442: + e
443: + " from the list to add to L2ObjectStateManager since its present in syncingPassives : "
444: + syncingPassives.keySet());
445: i.remove();
446: }
447: }
448: }
449: add2L2StateManager(nodeID2ObjectIDs);
450: }
451:
452: }
453: }
|