001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.delivery;
006:
007: import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
008: import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
009: import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
010:
011: import com.tc.logging.TCLogger;
012: import com.tc.logging.TCLogging;
013: import com.tc.net.protocol.TCNetworkMessage;
014: import com.tc.properties.TCPropertiesImpl;
015: import com.tc.util.Assert;
016: import com.tc.util.DebugUtil;
017:
018: import java.util.LinkedList;
019: import java.util.ListIterator;
020:
021: /**
022: *
023: */
024: public class SendStateMachine extends AbstractStateMachine {
025: private static final int MAX_SEND_QUEUE_SIZE = 1000;
026: private final State ACK_WAIT_STATE = new AckWaitState();
027: private final State HANDSHAKE_WAIT_STATE = new HandshakeWaitState();
028: private final State MESSAGE_WAIT_STATE = new MessageWaitState();
029: private final SynchronizedLong sent = new SynchronizedLong(-1);
030: private final SynchronizedLong acked = new SynchronizedLong(-1);
031: private final OOOProtocolMessageDelivery delivery;
032: private BoundedLinkedQueue sendQueue;
033: private final LinkedList outstandingMsgs = new LinkedList();
034: private final SynchronizedInt outstandingCnt = new SynchronizedInt(
035: 0);
036: private final int sendWindow;
037: private final boolean isClient;
038: private final String debugId;
039: private static final boolean debug = false;
040: private static final TCLogger logger = TCLogging
041: .getLogger(SendStateMachine.class);
042:
043: // changed by tc.properties
044:
045: public SendStateMachine(OOOProtocolMessageDelivery delivery,
046: boolean isClient) {
047: super ();
048:
049: // set sendWindow from tc.properties if exist. 0 to disable window send.
050: sendWindow = TCPropertiesImpl.getProperties().getInt(
051: "l2.nha.ooo.sendWindow", 32);
052: this .delivery = delivery;
053: this .sendQueue = new BoundedLinkedQueue(MAX_SEND_QUEUE_SIZE);
054: this .isClient = isClient;
055: this .debugId = (this .isClient) ? "CLIENT" : "SERVER";
056: }
057:
058: protected void basicResume() {
059: switchToState(HANDSHAKE_WAIT_STATE);
060: }
061:
062: protected State initialState() {
063: Assert.eval(MESSAGE_WAIT_STATE != null);
064: return MESSAGE_WAIT_STATE;
065: }
066:
067: public void execute(OOOProtocolMessage msg) {
068: Assert.eval(isStarted());
069: getCurrentState().execute(msg);
070: }
071:
072: protected void switchToState(State state) {
073: debugLog("switching to " + state);
074: super .switchToState(state);
075: }
076:
077: private class MessageWaitState extends AbstractState {
078:
079: public MessageWaitState() {
080: super ("MESSAGE_WAIT_STATE");
081: }
082:
083: public void enter() {
084: execute(null);
085: }
086:
087: public void execute(OOOProtocolMessage protocolMessage) {
088: if (!sendQueue.isEmpty()) {
089: if ((sendWindow == 0)
090: || (outstandingCnt.get() < sendWindow)) {
091: delivery.sendMessage(createProtocolMessage(sent
092: .increment()));
093: }
094: switchToState(ACK_WAIT_STATE);
095: }
096: }
097: }
098:
099: private class HandshakeWaitState extends AbstractState {
100:
101: public HandshakeWaitState() {
102: super ("HANDSHAKE_WAIT_STATE");
103: }
104:
105: public void execute(OOOProtocolMessage msg) {
106: if (msg == null)
107: return;
108: // drop all msgs until handshake reply.
109: // Happens when short network disruptions and both L1 & L2 still keep states.
110: if (!msg.isHandshakeReplyOk()
111: && !msg.isHandshakeReplyFail()) {
112: logger.warn("Due to handshake drops stale message:"
113: + msg);
114: return;
115: }
116:
117: if (msg.isHandshakeReplyFail()) {
118: switchToState(MESSAGE_WAIT_STATE);
119: return;
120: }
121:
122: long ackedSeq = msg.getAckSequence();
123:
124: if (ackedSeq == -1) {
125: debugLog("The other side restarted [switching to MSG_WAIT_STATE]");
126: switchToState(MESSAGE_WAIT_STATE);
127: return;
128: }
129: if (ackedSeq < acked.get()) {
130: // this shall not, old ack
131: Assert.failure("Received bad ack: " + ackedSeq
132: + " expected >= " + acked.get());
133: } else {
134: while (ackedSeq > acked.get()) {
135: acked.increment();
136: removeMessage();
137: }
138: // resend outstanding which is not acked
139: if (outstandingCnt.get() > 0) {
140: // resend those not acked
141: resendOutstandings();
142: switchToState(ACK_WAIT_STATE);
143: } else {
144: // all acked, we're good here
145: switchToState(MESSAGE_WAIT_STATE);
146: }
147: }
148: }
149: }
150:
151: private class AckWaitState extends AbstractState {
152:
153: public AckWaitState() {
154: super ("ACK_WAIT_STATE");
155: }
156:
157: public void enter() {
158: sendMoreIfAvailable();
159: }
160:
161: public void execute(OOOProtocolMessage protocolMessage) {
162: if (protocolMessage == null || protocolMessage.isSend())
163: return;
164:
165: long ackedSeq = protocolMessage.getAckSequence();
166: Assert.eval(ackedSeq >= acked.get());
167:
168: while (ackedSeq > acked.get()) {
169: acked.increment();
170: removeMessage();
171: }
172:
173: // try pump more
174: sendMoreIfAvailable();
175:
176: if (outstandingCnt.get() == 0) {
177: switchToState(MESSAGE_WAIT_STATE);
178: }
179:
180: // ???: is this check properly synchronized?
181: Assert.eval(acked.get() <= sent.get());
182: }
183:
184: public void sendMoreIfAvailable() {
185: while ((outstandingCnt.get() < sendWindow)
186: && !sendQueue.isEmpty()) {
187: delivery.sendMessage(createProtocolMessage(sent
188: .increment()));
189: }
190: }
191: }
192:
193: private OOOProtocolMessage createProtocolMessage(long count) {
194: final OOOProtocolMessage opm = delivery.createProtocolMessage(
195: count, dequeue(sendQueue));
196: Assert.eval(opm != null);
197: outstandingCnt.increment();
198: outstandingMsgs.add(opm);
199: return (opm);
200: }
201:
202: private void resendOutstandings() {
203: ListIterator it = outstandingMsgs.listIterator(0);
204: while (it.hasNext()) {
205: OOOProtocolMessage msg = (OOOProtocolMessage) it.next();
206: delivery.sendMessage(msg);
207: }
208: }
209:
210: private void removeMessage() {
211: OOOProtocolMessage msg = (OOOProtocolMessage) outstandingMsgs
212: .removeFirst();
213: msg.reallyDoRecycleOnWrite();
214: outstandingCnt.decrement();
215: Assert.eval(outstandingCnt.get() >= 0);
216: }
217:
218: public void reset() {
219:
220: sent.set(-1);
221: acked.set(-1);
222:
223: // purge out outstanding sends
224: outstandingCnt.set(0);
225: outstandingMsgs.clear();
226:
227: BoundedLinkedQueue tmpQ = sendQueue;
228: sendQueue = new BoundedLinkedQueue(MAX_SEND_QUEUE_SIZE);
229: synchronized (tmpQ) {
230: while (!tmpQ.isEmpty()) {
231: dequeue(tmpQ);
232: }
233: }
234: }
235:
236: private static TCNetworkMessage dequeue(BoundedLinkedQueue q) {
237: try {
238: return (TCNetworkMessage) q.take();
239: } catch (InterruptedException e) {
240: throw new AssertionError(e);
241: }
242: }
243:
244: public void put(TCNetworkMessage message)
245: throws InterruptedException {
246: sendQueue.put(message);
247: }
248:
249: private void debugLog(String msg) {
250: if (debug) {
251: DebugUtil.trace("SENDER-" + debugId + "-"
252: + delivery.getConnectionId() + " -> " + msg);
253: }
254: }
255:
256: }
|