01: /*
02: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
03: * notice. All rights reserved.
04: */
05: package com.tc.net.protocol.delivery;
06:
07: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
08:
09: import com.tc.net.protocol.TCNetworkMessage;
10: import com.tc.net.protocol.tcm.NullMessageMonitor;
11: import com.tc.net.protocol.tcm.msgs.PingMessage;
12: import com.tc.objectserver.api.TestSink;
13:
14: import junit.framework.TestCase;
15:
16: /**
17: *
18: */
19: public class GuaranteedDeliveryProtocolTest extends TestCase {
20: public void tests() throws Exception {
21: LinkedQueue receiveQueue = new LinkedQueue();
22: TestProtocolMessageDelivery delivery = new TestProtocolMessageDelivery(
23: receiveQueue);
24: TestSink workSink = new TestSink();
25: final short sessionId = 124;
26:
27: GuaranteedDeliveryProtocol gdp = new GuaranteedDeliveryProtocol(
28: delivery, workSink, true);
29: gdp.start();
30: gdp.resume();
31:
32: // hand shake state
33: // send AckRequest to receiver
34: TestProtocolMessage msg = new TestProtocolMessage();
35: msg.isHandshake = true;
36: msg.setSessionId(sessionId);
37: gdp.receive(msg);
38: runWorkSink(workSink);
39: // reply ack=-1 from receiver
40: msg = new TestProtocolMessage(null, 0, -1);
41: msg.isAck = true;
42: msg.setSessionId(sessionId);
43: gdp.receive(msg);
44: runWorkSink(workSink);
45:
46: TCNetworkMessage tcMessage = new PingMessage(
47: new NullMessageMonitor());
48: assertTrue(workSink.size() == 0);
49: gdp.send(tcMessage);
50: assertTrue(workSink.size() == 1);
51: runWorkSink(workSink);
52: assertTrue(delivery.created);
53: assertTrue(delivery.tcMessage == tcMessage);
54: TestProtocolMessage pm = (TestProtocolMessage) delivery.msg;
55: pm.setSessionId(sessionId);
56: delivery.clear();
57: pm.isSend = true;
58: gdp.receive(pm);
59: assertTrue(workSink.size() == 1);
60: runWorkSink(workSink);
61:
62: assertTrue(receiveQueue.take() == tcMessage);
63: assertTrue(delivery.sentAck);
64: assertTrue(delivery.ackCount == 0);
65:
66: delivery.clear();
67: TestProtocolMessage ackMessage = new TestProtocolMessage();
68: ackMessage.setSessionId(sessionId);
69: ackMessage.ack = 0;
70: ackMessage.isAck = true;
71: gdp.receive(ackMessage);
72: assertTrue(workSink.size() == 1);
73: runWorkSink(workSink);
74: delivery.clear();
75:
76: gdp.send(tcMessage);
77: gdp.send(tcMessage);
78: assertTrue(workSink.size() == 1);
79: runWorkSink(workSink);
80: assertTrue(workSink.size() == 1);
81: }
82:
83: private void runWorkSink(TestSink sink) {
84: StateMachineRunner smr = (StateMachineRunner) sink
85: .getInternalQueue().remove(0);
86: smr.run();
87: }
88: }
|