001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.delivery;
006:
007: import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
008: import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
009:
010: import com.tc.properties.TCPropertiesImpl;
011: import com.tc.util.Assert;
012: import com.tc.util.DebugUtil;
013:
014: /**
015: *
016: */
017: public class ReceiveStateMachine extends AbstractStateMachine {
018: private final State MESSAGE_WAIT_STATE = new MessageWaitState();
019:
020: private final SynchronizedLong received = new SynchronizedLong(-1);
021: private final SynchronizedInt delayedAcks = new SynchronizedInt(0);
022: private final int maxDelayedAcks;
023: private final OOOProtocolMessageDelivery delivery;
024: private StateMachineRunner runner;
025:
026: private String debugId = "UNKNOWN";
027:
028: private static final boolean debug = false;
029:
030: public ReceiveStateMachine(OOOProtocolMessageDelivery delivery) {
031: // set MaxDelayedAcks from tc.properties if exist. 0 to disable ack delay.
032: maxDelayedAcks = TCPropertiesImpl.getProperties().getInt(
033: "l2.nha.ooo.maxDelayedAcks", 16);
034: this .delivery = delivery;
035: }
036:
037: public void execute(OOOProtocolMessage msg) {
038: getCurrentState().execute(msg);
039: }
040:
041: protected State initialState() {
042: return MESSAGE_WAIT_STATE;
043: }
044:
045: private int getRunnerEventLength() {
046: return ((runner != null) ? runner.getEventsCount() : 0);
047: }
048:
049: private class MessageWaitState extends AbstractState {
050:
051: public MessageWaitState() {
052: super ("MESSAGE_WAIT_STATE");
053: }
054:
055: public void execute(OOOProtocolMessage msg) {
056: if (msg.isSend()) {
057: handleSendMessage(msg);
058: } else {
059: // these message should be handled at higher level
060: Assert.inv(msg.isAck() || msg.isGoodbye());
061: Assert.inv(false);
062: }
063: }
064:
065: private void handleSendMessage(OOOProtocolMessage msg) {
066: final long r = msg.getSent();
067: final long curRecv = received.get();
068: if (r <= curRecv) {
069: // we already got message
070: debugLog("Received dup msg " + r);
071: sendAck(curRecv);
072: delayedAcks.set(0);
073: return;
074: } else if (r > (curRecv + 1)) {
075: // message missed, resend ack, receive to resend message.
076: debugLog("Received out of order msg " + r);
077: sendAck(curRecv);
078: delayedAcks.set(0);
079: return;
080: } else {
081: Assert.inv(r == (curRecv + 1));
082: putMessage(msg);
083: ackIfNeeded(received.increment());
084: }
085: }
086: }
087:
088: private void putMessage(OOOProtocolMessage msg) {
089: this .delivery.receiveMessage(msg);
090: }
091:
092: private void ackIfNeeded(long next) {
093: if ((delayedAcks.get() < maxDelayedAcks)
094: && (getRunnerEventLength() > 0)) {
095: delayedAcks.increment();
096: } else {
097: /*
098: * saw IllegalStateException by AbstractTCNetworkMessage.checkSealed
099: * when message sent to non-established transport by MessageTransportBase.send.
100: * reset delayedAcks only ack can be sent.
101: */
102: if (sendAck(next)) {
103: delayedAcks.set(0);
104: } else {
105: debugLog("Failed to send ack:" + next);
106: }
107: }
108: }
109:
110: private boolean sendAck(long seq) {
111: OOOProtocolMessage opm = delivery.createAckMessage(seq);
112: Assert.inv(opm.getSessionId() > -1);
113: return (delivery.sendMessage(opm));
114: }
115:
116: public void reset() {
117: received.set(-1);
118: delayedAcks.set(0);
119: }
120:
121: private void debugLog(String msg) {
122: if (debug) {
123: DebugUtil.trace("Receiver-" + debugId + "-"
124: + delivery.getConnectionId() + " -> " + msg);
125: }
126: }
127:
128: public void setDebugId(String debugId) {
129: this .debugId = debugId;
130: }
131:
132: public SynchronizedLong getReceived() {
133: return received;
134: }
135:
136: public void setRunner(StateMachineRunner receive) {
137: this.runner = receive;
138: }
139: }
|