001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.l2.context.IncomingTransactionContext;
011: import com.tc.l2.ha.L2HAZapNodeRequestProcessor;
012: import com.tc.l2.msg.RelayedCommitTransactionMessage;
013: import com.tc.l2.msg.RelayedCommitTransactionMessageFactory;
014: import com.tc.l2.objectserver.L2ObjectState;
015: import com.tc.l2.objectserver.L2ObjectStateManager;
016: import com.tc.logging.TCLogger;
017: import com.tc.logging.TCLogging;
018: import com.tc.net.groups.GroupManager;
019: import com.tc.net.groups.NodeID;
020: import com.tc.object.gtx.GlobalTransactionID;
021: import com.tc.object.tx.ServerTransactionID;
022: import com.tc.objectserver.core.api.ServerConfigurationContext;
023: import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
024: import com.tc.objectserver.tx.ServerTransactionManager;
025: import com.tc.util.sequence.SequenceGenerator;
026:
027: import java.util.Collection;
028: import java.util.Iterator;
029:
030: public class TransactionRelayHandler extends AbstractEventHandler {
031: private static final TCLogger logger = TCLogging
032: .getLogger(TransactionRelayHandler.class);
033:
034: private final L2ObjectStateManager l2ObjectStateMgr;
035: private final SequenceGenerator sequenceGenerator;
036:
037: private GroupManager groupManager;
038:
039: private ServerTransactionManager transactionManager;
040:
041: private final ServerGlobalTransactionManager gtxm;
042:
043: public TransactionRelayHandler(
044: L2ObjectStateManager objectStateManager,
045: SequenceGenerator generator,
046: ServerGlobalTransactionManager gtxm) {
047: this .l2ObjectStateMgr = objectStateManager;
048: this .sequenceGenerator = generator;
049: this .gtxm = gtxm;
050: }
051:
052: public void handleEvent(EventContext context) {
053: IncomingTransactionContext ict = (IncomingTransactionContext) context;
054: GlobalTransactionID lowWaterMark = gtxm
055: .getLowGlobalTransactionIDWatermark();
056: Collection states = l2ObjectStateMgr.getL2ObjectStates();
057: for (Iterator i = states.iterator(); i.hasNext();) {
058: L2ObjectState state = (L2ObjectState) i.next();
059: NodeID nodeID = state.getNodeID();
060: sendCommitTransactionMessage(nodeID, ict, lowWaterMark);
061: }
062: transactionManager.transactionsRelayed(ict.getNodeID(), ict
063: .getServerTransactionIDs());
064: }
065:
066: private void sendCommitTransactionMessage(NodeID nodeID,
067: IncomingTransactionContext ict,
068: GlobalTransactionID lowWaterMark) {
069: addWaitForNotification(nodeID, ict);
070: try {
071: RelayedCommitTransactionMessage msg = RelayedCommitTransactionMessageFactory
072: .createRelayedCommitTransactionMessage(ict
073: .getCommitTransactionMessage(), ict
074: .getTxns(), sequenceGenerator
075: .getNextSequence(nodeID), lowWaterMark);
076: this .groupManager.sendTo(nodeID, msg);
077: } catch (Exception e) {
078: reconsileWaitForNotification(nodeID, ict);
079: logger.error("Removing " + nodeID
080: + " from group because of Exception :", e);
081: groupManager.zapNode(nodeID,
082: L2HAZapNodeRequestProcessor.COMMUNICATION_ERROR,
083: "Error relaying commit transaction message"
084: + L2HAZapNodeRequestProcessor
085: .getErrorString(e));
086: }
087: }
088:
089: private void reconsileWaitForNotification(NodeID waitee,
090: IncomingTransactionContext ict) {
091: // TODO::avoid this loop and thus N lookups in transactionManager
092: for (Iterator i = ict.getServerTransactionIDs().iterator(); i
093: .hasNext();) {
094: ServerTransactionID stxnID = (ServerTransactionID) i.next();
095: transactionManager.acknowledgement(ict.getNodeID(), stxnID
096: .getClientTransactionID(), waitee);
097: }
098: }
099:
100: private void addWaitForNotification(NodeID waitee,
101: IncomingTransactionContext ict) {
102: // TODO::avoid this loop and thus N lookups in transactionManager
103: for (Iterator i = ict.getServerTransactionIDs().iterator(); i
104: .hasNext();) {
105: ServerTransactionID stxnID = (ServerTransactionID) i.next();
106: transactionManager.addWaitingForAcknowledgement(ict
107: .getNodeID(), stxnID.getClientTransactionID(),
108: waitee);
109: }
110: }
111:
112: public void initialize(ConfigurationContext context) {
113: super .initialize(context);
114: ServerConfigurationContext oscc = (ServerConfigurationContext) context;
115: this.groupManager = oscc.getL2Coordinator().getGroupManager();
116: this.transactionManager = oscc.getTransactionManager();
117: }
118: }
|