001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.net.protocol.tcm.ChannelIDProvider;
012: import com.tc.object.ClientConfigurationContext;
013: import com.tc.object.dmi.DmiDescriptor;
014: import com.tc.object.event.DmiEventContext;
015: import com.tc.object.event.DmiManager;
016: import com.tc.object.gtx.ClientGlobalTransactionManager;
017: import com.tc.object.gtx.GlobalTransactionID;
018: import com.tc.object.lockmanager.api.ClientLockManager;
019: import com.tc.object.lockmanager.api.LockContext;
020: import com.tc.object.msg.AcknowledgeTransactionMessage;
021: import com.tc.object.msg.AcknowledgeTransactionMessageFactory;
022: import com.tc.object.msg.BroadcastTransactionMessageImpl;
023: import com.tc.object.session.SessionManager;
024: import com.tc.object.tx.ClientTransactionManager;
025: import com.tc.util.Assert;
026: import com.tcclient.object.DistributedMethodCall;
027:
028: import java.util.Collection;
029: import java.util.Iterator;
030: import java.util.LinkedList;
031: import java.util.List;
032:
033: /**
034: * @author steve
035: */
036: public class ReceiveTransactionHandler extends AbstractEventHandler {
037: private ClientTransactionManager txManager;
038: private ClientLockManager lockManager;
039: private final SessionManager sessionManager;
040: private final ClientGlobalTransactionManager gtxManager;
041: private final AcknowledgeTransactionMessageFactory atmFactory;
042: private final ChannelIDProvider cidProvider;
043: private final Sink dmiSink;
044: private final DmiManager dmiManager;
045:
046: public ReceiveTransactionHandler(ChannelIDProvider provider,
047: AcknowledgeTransactionMessageFactory atmFactory,
048: ClientGlobalTransactionManager gtxManager,
049: SessionManager sessionManager, Sink dmiSink,
050: DmiManager dmiManager) {
051: this .cidProvider = provider;
052: this .atmFactory = atmFactory;
053: this .gtxManager = gtxManager;
054: this .sessionManager = sessionManager;
055: this .dmiSink = dmiSink;
056: this .dmiManager = dmiManager;
057: }
058:
059: public void handleEvent(EventContext context) {
060: final BroadcastTransactionMessageImpl btm = (BroadcastTransactionMessageImpl) context;
061:
062: if (false)
063: System.err
064: .println(cidProvider.getChannelID()
065: + ": ReceiveTransactionHandler: committer="
066: + btm.getCommitterID() + ", "
067: + btm.getTransactionID()
068: + btm.getGlobalTransactionID()
069: + ", notified: "
070: + btm.addNotifiesTo(new LinkedList())
071: + ", lookup ObjectIDs: "
072: + btm.getLookupObjectIDs());
073:
074: Assert.eval(btm.getLockIDs().size() > 0);
075: GlobalTransactionID lowWaterMark = btm
076: .getLowGlobalTransactionIDWatermark();
077: if (!lowWaterMark.isNull()) {
078: gtxManager.setLowWatermark(lowWaterMark);
079: }
080: if (gtxManager.startApply(btm.getCommitterID(), btm
081: .getTransactionID(), btm.getGlobalTransactionID())) {
082: Collection changes = btm.getObjectChanges();
083: if (changes.size() > 0
084: || btm.getLookupObjectIDs().size() > 0
085: || btm.getNewRoots().size() > 0) {
086:
087: if (false)
088: System.err.println(cidProvider.getChannelID()
089: + " Applying - committer="
090: + btm.getCommitterID() + " , "
091: + btm.getTransactionID() + " , "
092: + btm.getGlobalTransactionID());
093:
094: txManager.apply(btm.getTransactionType(), btm
095: .getLockIDs(), changes, btm
096: .getLookupObjectIDs(), btm.getNewRoots());
097: }
098: }
099:
100: Collection notifies = btm.addNotifiesTo(new LinkedList());
101: for (Iterator i = notifies.iterator(); i.hasNext();) {
102: LockContext lc = (LockContext) i.next();
103: lockManager.notified(lc.getLockID(), lc.getThreadID());
104: }
105:
106: List dmis = btm.getDmiDescriptors();
107: for (Iterator i = dmis.iterator(); i.hasNext();) {
108: DmiDescriptor dd = (DmiDescriptor) i.next();
109:
110: // NOTE: This prepare call must happen before handing off the DMI to the stage, and more
111: // importantly before sending ACK below
112: DistributedMethodCall dmc = dmiManager.extract(dd);
113: if (dmc != null) {
114: dmiSink.add(new DmiEventContext(dmc));
115: }
116: }
117:
118: // XXX:: This is a potential race condition here 'coz after we decide to send an ACK
119: // and before we actually send it, the server may go down and come back up !
120: if (sessionManager.isCurrentSession(btm.getLocalSessionID())) {
121: AcknowledgeTransactionMessage ack = atmFactory
122: .newAcknowledgeTransactionMessage();
123: ack
124: .initialize(btm.getCommitterID(), btm
125: .getTransactionID());
126: ack.send();
127: }
128: btm.recycle();
129: }
130:
131: public void initialize(ConfigurationContext context) {
132: super .initialize(context);
133: ClientConfigurationContext ccc = (ClientConfigurationContext) context;
134: this.txManager = ccc.getTransactionManager();
135: this.lockManager = ccc.getLockManager();
136: }
137:
138: }
|