001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.l2.api.L2Coordinator;
012: import com.tc.l2.context.ManagedObjectSyncContext;
013: import com.tc.l2.context.SyncObjectsRequest;
014: import com.tc.l2.ha.L2HAZapNodeRequestProcessor;
015: import com.tc.l2.msg.ObjectSyncMessage;
016: import com.tc.l2.msg.ObjectSyncMessageFactory;
017: import com.tc.l2.msg.ServerTxnAckMessage;
018: import com.tc.l2.objectserver.L2ObjectStateManager;
019: import com.tc.logging.TCLogger;
020: import com.tc.logging.TCLogging;
021: import com.tc.net.groups.GroupException;
022: import com.tc.net.groups.GroupManager;
023: import com.tc.objectserver.core.api.ServerConfigurationContext;
024:
025: public class L2ObjectSyncSendHandler extends AbstractEventHandler {
026:
027: private static final TCLogger logger = TCLogging
028: .getLogger(L2ObjectSyncSendHandler.class);
029:
030: private final L2ObjectStateManager objectStateManager;
031: private GroupManager groupManager;
032:
033: private Sink syncRequestSink;
034:
035: public L2ObjectSyncSendHandler(
036: L2ObjectStateManager objectStateManager) {
037: this .objectStateManager = objectStateManager;
038: }
039:
040: public void handleEvent(EventContext context) {
041: if (context instanceof ManagedObjectSyncContext) {
042: ManagedObjectSyncContext mosc = (ManagedObjectSyncContext) context;
043: if (sendObjects(mosc)) {
044: if (mosc.hasMore()) {
045: syncRequestSink.add(new SyncObjectsRequest(mosc
046: .getNodeID()));
047: }
048: }
049: } else if (context instanceof ServerTxnAckMessage) {
050: ServerTxnAckMessage txnMsg = (ServerTxnAckMessage) context;
051: sendAcks(txnMsg);
052: } else {
053: throw new AssertionError("Unknown context type : "
054: + context.getClass().getName() + " : " + context);
055: }
056: }
057:
058: private void sendAcks(ServerTxnAckMessage ackMsg) {
059: try {
060: this .groupManager.sendTo(ackMsg.getDestinationID(), ackMsg);
061: } catch (GroupException e) {
062: String error = "ERROR sending ACKS: Caught exception while sending message to ACTIVE";
063: logger.error(error, e);
064: // try Zapping the active server so that a split brain war is initiated, at least we won't hold the whole cluster
065: // down.
066: groupManager
067: .zapNode(
068: ackMsg.getDestinationID(),
069: L2HAZapNodeRequestProcessor.COMMUNICATION_TO_ACTIVE_ERROR,
070: error
071: + L2HAZapNodeRequestProcessor
072: .getErrorString(e));
073: }
074: }
075:
076: private boolean sendObjects(ManagedObjectSyncContext mosc) {
077: ObjectSyncMessage msg = ObjectSyncMessageFactory
078: .createObjectSyncMessageFrom(mosc);
079: try {
080: this .groupManager.sendTo(mosc.getNodeID(), msg);
081: logger.info("Sent " + mosc.getDNACount() + " objects to "
082: + mosc.getNodeID() + " roots = "
083: + mosc.getRootsMap().size());
084: objectStateManager.close(mosc);
085: return true;
086: } catch (GroupException e) {
087: logger.error("Removing " + mosc.getNodeID()
088: + " from group because of Exception :", e);
089: groupManager.zapNode(mosc.getNodeID(),
090: L2HAZapNodeRequestProcessor.COMMUNICATION_ERROR,
091: "Error sending objects."
092: + L2HAZapNodeRequestProcessor
093: .getErrorString(e));
094: return false;
095: }
096: }
097:
098: public void initialize(ConfigurationContext context) {
099: super .initialize(context);
100: ServerConfigurationContext oscc = (ServerConfigurationContext) context;
101: L2Coordinator l2Coordinator = oscc.getL2Coordinator();
102: this.groupManager = l2Coordinator.getGroupManager();
103: this.syncRequestSink = oscc.getStage(
104: ServerConfigurationContext.OBJECTS_SYNC_REQUEST_STAGE)
105: .getSink();
106: }
107:
108: }
|