001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.async.api.Stage;
012: import com.tc.l2.context.IncomingTransactionContext;
013: import com.tc.l2.objectserver.ReplicatedObjectManager;
014: import com.tc.logging.TCLogger;
015: import com.tc.logging.TCLogging;
016: import com.tc.net.groups.NodeID;
017: import com.tc.net.protocol.tcm.MessageChannel;
018: import com.tc.object.msg.CommitTransactionMessageImpl;
019: import com.tc.object.msg.MessageRecycler;
020: import com.tc.objectserver.core.api.ServerConfigurationContext;
021: import com.tc.objectserver.tx.ServerTransaction;
022: import com.tc.objectserver.tx.ServerTransactionManager;
023: import com.tc.objectserver.tx.TransactionBatchManager;
024: import com.tc.objectserver.tx.TransactionBatchReader;
025: import com.tc.objectserver.tx.TransactionBatchReaderFactory;
026: import com.tc.util.SequenceValidator;
027:
028: import java.util.LinkedHashMap;
029: import java.util.Map;
030:
031: public class ProcessTransactionHandler extends AbstractEventHandler {
032: private static final TCLogger logger = TCLogging
033: .getLogger(ProcessTransactionHandler.class);
034:
035: private TransactionBatchReaderFactory batchReaderFactory;
036: private ReplicatedObjectManager replicatedObjectMgr;
037:
038: private final TransactionBatchManager transactionBatchManager;
039: private final MessageRecycler messageRecycler;
040: private final SequenceValidator sequenceValidator;
041:
042: private Sink txnRelaySink;
043:
044: private ServerTransactionManager transactionManager;
045:
046: public ProcessTransactionHandler(
047: TransactionBatchManager transactionBatchManager,
048: SequenceValidator sequenceValidator,
049: MessageRecycler messageRecycler) {
050: this .transactionBatchManager = transactionBatchManager;
051: this .sequenceValidator = sequenceValidator;
052: this .messageRecycler = messageRecycler;
053: }
054:
055: public void handleEvent(EventContext context) {
056: final CommitTransactionMessageImpl ctm = (CommitTransactionMessageImpl) context;
057: try {
058: final TransactionBatchReader reader = batchReaderFactory
059: .newTransactionBatchReader(ctm);
060: transactionBatchManager.defineBatch(reader.getNodeID(),
061: reader.getNumTxns());
062: ServerTransaction txn;
063:
064: // XXX:: Has to be ordered.
065: Map txns = new LinkedHashMap(reader.getNumTxns());
066: NodeID nodeID = reader.getNodeID();
067: // NOTE::XXX:: GlobalTransactionID id assigned in the process transaction stage. The transaction could be
068: // re-ordered before apply. This is not a problem because for an transaction to be re-ordered, it should not
069: // have any common objects between them. hence if g1 is the first txn and g2 is the second txn, g2 will be applied
070: // before g1, only when g2 has no common objects with g1. If this is not true then we cant assign gid here.
071: while ((txn = reader.getNextTransaction()) != null) {
072: sequenceValidator.setCurrent(nodeID, txn
073: .getClientSequenceID());
074: txns.put(txn.getServerTransactionID(), txn);
075: }
076: messageRecycler.addMessage(ctm, txns.keySet());
077: if (replicatedObjectMgr.relayTransactions()) {
078: transactionManager.incomingTransactions(nodeID, txns
079: .keySet(), txns.values(), true);
080: txnRelaySink.add(new IncomingTransactionContext(nodeID,
081: ctm, txns));
082: } else {
083: transactionManager.incomingTransactions(nodeID, txns
084: .keySet(), txns.values(), false);
085: }
086: } catch (Exception e) {
087: logger.error("Error reading transaction batch. : ", e);
088: MessageChannel c = ctm.getChannel();
089: logger.error("Closing channel " + c.getChannelID()
090: + " due to previous errors !");
091: c.close();
092: }
093: }
094:
095: public void initialize(ConfigurationContext context) {
096: super .initialize(context);
097: ServerConfigurationContext oscc = (ServerConfigurationContext) context;
098: batchReaderFactory = oscc.getTransactionBatchReaderFactory();
099: transactionManager = oscc.getTransactionManager();
100: replicatedObjectMgr = oscc.getL2Coordinator()
101: .getReplicatedObjectManager();
102: Stage relayStage = oscc
103: .getStage(ServerConfigurationContext.TRANSACTION_RELAY_STAGE);
104: if (relayStage != null) {
105: txnRelaySink = relayStage.getSink();
106: }
107: }
108: }
|