01: /*
02: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
03: * notice. All rights reserved.
04: */
05: package com.tc.net.protocol.delivery;
06:
07: import com.tc.async.api.Sink;
08: import com.tc.net.protocol.TCNetworkMessage;
09: import com.tc.util.Assert;
10:
11: /**
12: * This implements an asynchronous Once and only once protocol. Sent messages go out on the sent queue received messages
13: * come in to the ProtocolMessageDelivery instance.
14: */
15: class GuaranteedDeliveryProtocol {
16: private final StateMachineRunner send;
17: private final StateMachineRunner receive;
18: private final SendStateMachine sender;
19: private final ReceiveStateMachine receiver;
20:
21: public GuaranteedDeliveryProtocol(
22: OOOProtocolMessageDelivery delivery, Sink workSink,
23: boolean isClient) {
24: this .sender = new SendStateMachine(delivery, isClient);
25: this .send = new StateMachineRunner(sender, workSink);
26: this .receiver = new ReceiveStateMachine(delivery);
27: this .receive = new StateMachineRunner(receiver, workSink);
28: receiver.setRunner(receive);
29: }
30:
31: public void send(TCNetworkMessage message) {
32: try {
33: sender.put(message);
34: send.addEvent(new OOOProtocolEvent());
35: } catch (InterruptedException e) {
36: throw new AssertionError(e);
37: }
38: }
39:
40: public void receive(OOOProtocolMessage msg) {
41: if (msg.isSend()) {
42: receive.addEvent(new OOOProtocolEvent(msg));
43: } else if (msg.isAck() || msg.isHandshakeReplyOk()
44: || msg.isHandshakeReplyFail()) {
45: send.addEvent(new OOOProtocolEvent(msg));
46: } else {
47: Assert.inv(false);
48: }
49: }
50:
51: public void start() {
52: send.start();
53: receive.start();
54: }
55:
56: public void pause() {
57: send.pause();
58: receive.pause();
59: }
60:
61: public boolean isPaused() {
62: return (send.isPaused() && receive.isPaused());
63: }
64:
65: public void resume() {
66: send.resume();
67: receive.resume();
68: }
69:
70: public void reset() {
71: send.reset();
72: receive.reset();
73: }
74:
75: public ReceiveStateMachine getReceiver() {
76: return receiver;
77: }
78:
79: public SendStateMachine getSender() {
80: return sender;
81: }
82:
83: public void setDebugId(String debugId) {
84: receiver.setDebugId(debugId);
85: }
86: }
|