001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.l2.msg.ObjectSyncCompleteMessage;
012: import com.tc.l2.msg.ObjectSyncMessage;
013: import com.tc.l2.msg.RelayedCommitTransactionMessage;
014: import com.tc.l2.msg.ServerTxnAckMessage;
015: import com.tc.l2.msg.ServerTxnAckMessageFactory;
016: import com.tc.l2.objectserver.ReplicatedTransactionManager;
017: import com.tc.l2.objectserver.ServerTransactionFactory;
018: import com.tc.l2.state.StateManager;
019: import com.tc.logging.TCLogger;
020: import com.tc.logging.TCLogging;
021: import com.tc.object.gtx.GlobalTransactionID;
022: import com.tc.objectserver.core.api.ServerConfigurationContext;
023: import com.tc.objectserver.tx.ServerTransaction;
024: import com.tc.objectserver.tx.TransactionBatchReader;
025: import com.tc.objectserver.tx.TransactionBatchReaderFactory;
026:
027: import java.io.IOException;
028: import java.util.LinkedHashMap;
029: import java.util.Map;
030: import java.util.Set;
031:
032: public class L2ObjectSyncHandler extends AbstractEventHandler {
033:
034: private static final TCLogger logger = TCLogging
035: .getLogger(L2ObjectSyncHandler.class);
036:
037: private TransactionBatchReaderFactory batchReaderFactory;
038:
039: private Sink sendSink;
040: private ReplicatedTransactionManager rTxnManager;
041: private StateManager stateManager;
042:
043: public void handleEvent(EventContext context) {
044: if (context instanceof ObjectSyncMessage) {
045: ObjectSyncMessage syncMsg = (ObjectSyncMessage) context;
046: doSyncObjectsResponse(syncMsg);
047: } else if (context instanceof RelayedCommitTransactionMessage) {
048: RelayedCommitTransactionMessage commitMessage = (RelayedCommitTransactionMessage) context;
049: Set serverTxnIDs = processCommitTransactionMessage(commitMessage);
050: processTransactionLowWaterMark(commitMessage
051: .getLowGlobalTransactionIDWatermark());
052: ackTransactions(commitMessage, serverTxnIDs);
053: } else if (context instanceof ObjectSyncCompleteMessage) {
054: ObjectSyncCompleteMessage msg = (ObjectSyncCompleteMessage) context;
055: logger.info("Received ObjectSyncComplete Msg from : "
056: + msg.messageFrom() + " msg : " + msg);
057: // Now this node can move to Passive StandBy
058: stateManager.moveToPassiveStandbyState();
059: } else {
060: throw new AssertionError("Unknown context type : "
061: + context.getClass().getName() + " : " + context);
062: }
063: }
064:
065: private void processTransactionLowWaterMark(
066: GlobalTransactionID lowGlobalTransactionIDWatermark) {
067: // TODO:: This processing could be handled by another stage thread.
068: rTxnManager
069: .clearTransactionsBelowLowWaterMark(lowGlobalTransactionIDWatermark);
070: }
071:
072: // TODO:: Implement throttling between active/passive
073: private void ackTransactions(
074: RelayedCommitTransactionMessage commitMessage,
075: Set serverTxnIDs) {
076: ServerTxnAckMessage msg = ServerTxnAckMessageFactory
077: .createServerTxnAckMessage(commitMessage, serverTxnIDs);
078: sendSink.add(msg);
079: }
080:
081: // TODO::Recycle msg after use. NOTE:: If you are implementing recycling, checkout ReplicatedTransactionManager's
082: // PASSIVE-UNINITIALIZED pruned changes code. Messgaes may have to live longer than Txn acks.
083: private Set processCommitTransactionMessage(
084: RelayedCommitTransactionMessage commitMessage) {
085: try {
086: final TransactionBatchReader reader = batchReaderFactory
087: .newTransactionBatchReader(commitMessage);
088: ServerTransaction txn;
089: // XXX:: Order has to be maintained.
090: Map txns = new LinkedHashMap(reader.getNumTxns());
091: while ((txn = reader.getNextTransaction()) != null) {
092: txns.put(txn.getServerTransactionID(), txn);
093: }
094: rTxnManager.addCommitedTransactions(reader.getNodeID(),
095: txns.keySet(), txns.values());
096: return txns.keySet();
097: } catch (IOException e) {
098: throw new AssertionError(e);
099: }
100: }
101:
102: private void doSyncObjectsResponse(ObjectSyncMessage syncMsg) {
103: ServerTransaction txn = ServerTransactionFactory
104: .createTxnFrom(syncMsg);
105: rTxnManager.addObjectSyncTransaction(txn);
106: }
107:
108: public void initialize(ConfigurationContext context) {
109: super .initialize(context);
110: ServerConfigurationContext oscc = (ServerConfigurationContext) context;
111: this.batchReaderFactory = oscc
112: .getTransactionBatchReaderFactory();
113: this.rTxnManager = oscc.getL2Coordinator()
114: .getReplicatedTransactionManager();
115: this.stateManager = oscc.getL2Coordinator().getStateManager();
116: this.sendSink = oscc.getStage(
117: ServerConfigurationContext.OBJECTS_SYNC_SEND_STAGE)
118: .getSink();
119: }
120:
121: }
|