001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.delivery;
006:
007: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
008:
009: import com.tc.async.api.Sink;
010: import com.tc.bytes.TCByteBuffer;
011: import com.tc.exception.TCRuntimeException;
012: import com.tc.logging.TCLogger;
013: import com.tc.logging.TCLogging;
014: import com.tc.net.MaxConnectionsExceededException;
015: import com.tc.net.TCSocketAddress;
016: import com.tc.net.core.TCConnection;
017: import com.tc.net.protocol.NetworkLayer;
018: import com.tc.net.protocol.NetworkStackID;
019: import com.tc.net.protocol.TCNetworkMessage;
020: import com.tc.net.protocol.TCProtocolException;
021: import com.tc.net.protocol.tcm.MessageChannelInternal;
022: import com.tc.net.protocol.transport.AbstractMessageTransport;
023: import com.tc.net.protocol.transport.ConnectionID;
024: import com.tc.net.protocol.transport.MessageTransport;
025: import com.tc.net.protocol.transport.WireProtocolMessage;
026: import com.tc.util.Assert;
027: import com.tc.util.DebugUtil;
028: import com.tc.util.TCTimeoutException;
029:
030: import java.io.IOException;
031: import java.net.UnknownHostException;
032: import java.util.Random;
033:
034: /**
035: * NetworkLayer implementation for once and only once message delivery protocol.
036: */
037: public class OnceAndOnlyOnceProtocolNetworkLayerImpl extends
038: AbstractMessageTransport implements
039: OnceAndOnlyOnceProtocolNetworkLayer, OOOProtocolMessageDelivery {
040: private static final TCLogger logger = TCLogging
041: .getLogger(OnceAndOnlyOnceProtocolNetworkLayerImpl.class);
042: private final OOOProtocolMessageFactory messageFactory;
043: private final OOOProtocolMessageParser messageParser;
044: boolean wasConnected = false;
045: private MessageChannelInternal receiveLayer;
046: private MessageTransport sendLayer;
047: private GuaranteedDeliveryProtocol delivery;
048: private final SynchronizedBoolean reconnectMode = new SynchronizedBoolean(
049: false);
050: private final SynchronizedBoolean handshakeMode = new SynchronizedBoolean(
051: false);
052: private final SynchronizedBoolean channelConnected = new SynchronizedBoolean(
053: false);
054: private boolean isClosed = false;
055: private final boolean isClient;
056: private final String debugId;
057: private short sessionId = -1;
058: private static final boolean debug = false;
059:
060: public OnceAndOnlyOnceProtocolNetworkLayerImpl(
061: OOOProtocolMessageFactory messageFactory,
062: OOOProtocolMessageParser messageParser, Sink workSink,
063: boolean isClient) {
064: super (logger);
065: this .messageFactory = messageFactory;
066: this .messageParser = messageParser;
067: this .isClient = isClient;
068: this .delivery = new GuaranteedDeliveryProtocol(this , workSink,
069: isClient);
070: this .delivery.start();
071: this .delivery.pause();
072: this .sessionId = (this .isClient) ? -1 : newRandomSessionId();
073: this .debugId = (this .isClient) ? "CLIENT" : "SERVER";
074: }
075:
076: /*********************************************************************************************************************
077: * Network layer interface...
078: */
079:
080: public void setSendLayer(NetworkLayer layer) {
081: if (!(layer instanceof MessageTransport)) {
082: throw new IllegalArgumentException(
083: "Error: send layer must be MessageTransport!");
084: }
085: this .setSendLayer((MessageTransport) layer);
086: }
087:
088: public void setSendLayer(MessageTransport transport) {
089: this .sendLayer = transport;
090: }
091:
092: public void setReceiveLayer(NetworkLayer layer) {
093: if (!(layer instanceof MessageChannelInternal)) {
094: throw new IllegalArgumentException(
095: "Error: receive layer must be MessageChannelInternal, was "
096: + layer.getClass().getName());
097: }
098: this .receiveLayer = (MessageChannelInternal) layer;
099: }
100:
101: public void send(TCNetworkMessage message) {
102: delivery.send(message);
103: }
104:
105: public void receive(TCByteBuffer[] msgData) {
106: OOOProtocolMessage msg = createProtocolMessage(msgData);
107: debugLog("receive -> " + msg.getHeader().toString());
108: if (msg.isSend() || msg.isAck()) {
109: Assert.inv(!handshakeMode.get());
110: Assert.inv(channelConnected.get());
111: if (sessionId != msg.getSessionId())
112: return; // drop bad message
113: delivery.receive(msg);
114: } else if (msg.isHandshake()) {
115: Assert.inv(!isClient);
116: debugLog("Got Handshake message...");
117: if (msg.getSessionId() == -1) {
118: debugLog("A brand new client is trying to connect - reply OK");
119: OOOProtocolMessage reply = createHandshakeReplyOkMessage(delivery
120: .getReceiver().getReceived().get());
121: sendMessage(reply);
122: delivery.resume();
123: delivery.receive(createHandshakeReplyOkMessage(-1));
124: handshakeMode.set(false);
125: if (!channelConnected.get()) {
126: channelConnected.set(true);
127: receiveLayer.notifyTransportConnected(this );
128: }
129: reconnectMode.set(false);
130: } else if (msg.getSessionId() == getSessionId()) {
131: debugLog("A same-session client is trying to connect - reply OK");
132: OOOProtocolMessage reply = createHandshakeReplyOkMessage(delivery
133: .getReceiver().getReceived().get());
134: sendMessage(reply);
135: handshakeMode.set(false);
136: delivery.resume();
137: // tell local sender the ackseq of client
138: delivery.receive(createHandshakeReplyOkMessage(msg
139: .getAckSequence()));
140: if (!channelConnected.get()) {
141: channelConnected.set(true);
142: receiveLayer.notifyTransportConnected(this );
143: }
144: reconnectMode.set(false);
145: } else {
146: debugLog("A DIFF-session client is trying to connect - reply FAIL");
147: OOOProtocolMessage reply = createHandshakeReplyFailMessage(delivery
148: .getReceiver().getReceived().get());
149: sendMessage(reply);
150: handshakeMode.set(false);
151: if (channelConnected.get())
152: receiveLayer.notifyTransportDisconnected(this );
153: channelConnected.set(false);
154: resetStack();
155: delivery.resume();
156: delivery.receive(reply);
157: if (!channelConnected.get()) {
158: channelConnected.set(true);
159: receiveLayer.notifyTransportConnected(this );
160: }
161: reconnectMode.set(false);
162: }
163: } else if (msg.isHandshakeReplyOk()) {
164: Assert.inv(isClient);
165: Assert.inv(handshakeMode.get());
166: debugLog("Got reply OK");
167: // current session is still ok:
168: // 1. might have to resend some messages
169: // 2. no need to signal to Higher Level
170: handshakeMode.set(false);
171: sessionId = msg.getSessionId();
172: delivery.resume();
173: delivery.receive(msg);
174: if (!channelConnected.get()) {
175: channelConnected.set(true);
176: receiveLayer.notifyTransportConnected(this );
177: }
178: reconnectMode.set(false);
179: } else if (msg.isHandshakeReplyFail()) {
180: debugLog("Received handshake fail reply");
181: Assert.inv(isClient);
182: Assert.inv(handshakeMode.get());
183: // we did not synch'ed the existing session.
184: // 1. clear OOO state (drop messages, clear counters, etc)
185: // 2. set the new session
186: // 3. signal Higher Lever to re-synch
187: if (channelConnected.get())
188: receiveLayer.notifyTransportDisconnected(this );
189: channelConnected.set(false);
190: resetStack();
191: sessionId = msg.getSessionId();
192: handshakeMode.set(false);
193: delivery.resume();
194: delivery.receive(msg);
195: if (!channelConnected.get()) {
196: channelConnected.set(true);
197: receiveLayer.notifyTransportConnected(this );
198: }
199: } else if (msg.isGoodbye()) {
200: debugLog("Got GoodBye message - shutting down");
201: isClosed = true;
202: sendLayer.close();
203: receiveLayer.close();
204: delivery.pause();
205: } else {
206: Assert.inv(false);
207: }
208: }
209:
210: private void debugLog(String msg) {
211: if (debug) {
212: DebugUtil.trace("OOOLayer-" + debugId + "-"
213: + sendLayer.getConnectionId() + " -> " + msg);
214: }
215: }
216:
217: public boolean isConnected() {
218: return (channelConnected.get() && !delivery.isPaused());
219: }
220:
221: public NetworkStackID open() throws TCTimeoutException,
222: UnknownHostException, IOException,
223: MaxConnectionsExceededException {
224: Assert.assertNotNull(sendLayer);
225: return sendLayer.open();
226: }
227:
228: public void close() {
229: Assert.assertNotNull(sendLayer);
230: // send goobye message with session-id on it
231: OOOProtocolMessage opm = messageFactory
232: .createNewGoodbyeMessage(getSessionId());
233: sendLayer.send(opm);
234: sendLayer.close();
235: }
236:
237: /*********************************************************************************************************************
238: * Transport listener interface...
239: */
240:
241: public void notifyTransportConnected(MessageTransport transport) {
242: handshakeMode.set(true);
243: if (isClient) {
244: OOOProtocolMessage handshake = createHandshakeMessage(delivery
245: .getReceiver().getReceived().get());
246: debugLog("Sending Handshake message...");
247: sendMessage(handshake);
248: } else {
249: // resue for missing transportDisconnected events
250: if (!delivery.isPaused()) {
251: notifyTransportDisconnected(null);
252: }
253: }
254: reconnectMode.set(false);
255: }
256:
257: public void notifyTransportDisconnected(MessageTransport transport) {
258: final boolean restoreConnectionMode = reconnectMode.get();
259: debugLog("Transport Disconnected - pausing delivery, restoreConnection = "
260: + restoreConnectionMode);
261: this .delivery.pause();
262: if (!restoreConnectionMode) {
263: if (channelConnected.get())
264: receiveLayer.notifyTransportDisconnected(this );
265: channelConnected.set(false);
266: }
267: }
268:
269: public void start() {
270: //
271: }
272:
273: public void pause() {
274: this .delivery.pause();
275: }
276:
277: public void resume() {
278: this .delivery.resume();
279: }
280:
281: public void notifyTransportConnectAttempt(MessageTransport transport) {
282: if (!reconnectMode.get()) {
283: receiveLayer.notifyTransportConnectAttempt(this );
284: }
285: }
286:
287: public void notifyTransportClosed(MessageTransport transport) {
288: // XXX: do we do anything here? We've probably done everything we need to do when close() was called.
289: debugLog("Transport Closed - notifying higher layer");
290: receiveLayer.notifyTransportClosed(this );
291: channelConnected.set(false);
292: }
293:
294: /*********************************************************************************************************************
295: * Protocol Message Delivery interface
296: */
297:
298: public OOOProtocolMessage createHandshakeMessage(long ack) {
299: OOOProtocolMessage rv = this .messageFactory
300: .createNewHandshakeMessage(getSessionId(), ack);
301: return rv;
302: }
303:
304: public OOOProtocolMessage createHandshakeReplyOkMessage(long ack) {
305: // FIXME: need to use correct ack
306: OOOProtocolMessage rv = this .messageFactory
307: .createNewHandshakeReplyOkMessage(getSessionId(), ack);
308: return rv;
309: }
310:
311: public OOOProtocolMessage createHandshakeReplyFailMessage(long ack) {
312: // FIXME: need to use correct ack
313: OOOProtocolMessage rv = this .messageFactory
314: .createNewHandshakeReplyFailMessage(getSessionId(), ack);
315: return rv;
316: }
317:
318: private short getSessionId() {
319: return sessionId;
320: }
321:
322: public OOOProtocolMessage createAckMessage(long ack) {
323: return (this .messageFactory.createNewAckMessage(getSessionId(),
324: ack));
325: }
326:
327: public boolean sendMessage(OOOProtocolMessage msg) {
328: // this method doesn't do anything at the moment, but it is a good spot to plug in things you might want to do
329: // every message flowing down from the layer (like logging for example)
330: if (this .sendLayer.isConnected()) {
331: this .sendLayer.send(msg);
332: return (true);
333: } else {
334: return (false);
335: }
336: }
337:
338: public void receiveMessage(OOOProtocolMessage msg) {
339: Assert.assertNotNull("Receive layer is null.",
340: this .receiveLayer);
341: Assert.assertNotNull("Attempt to null msg", msg);
342: Assert.eval(msg.isSend());
343:
344: this .receiveLayer.receive(msg.getPayload());
345: }
346:
347: public OOOProtocolMessage createProtocolMessage(long sequence,
348: final TCNetworkMessage msg) {
349: OOOProtocolMessage rv = messageFactory.createNewSendMessage(
350: getSessionId(), sequence, msg);
351: final Runnable callback = msg.getSentCallback();
352: if (callback != null) {
353: rv.setSentCallback(new Runnable() {
354: public void run() {
355: callback.run();
356: }
357: });
358: }
359:
360: return rv;
361: }
362:
363: private OOOProtocolMessage createProtocolMessage(
364: TCByteBuffer[] msgData) {
365: try {
366: return messageParser.parseMessage(msgData);
367: } catch (TCProtocolException e) {
368: // XXX: this isn't the right thing to do here
369: throw new TCRuntimeException(e);
370: }
371: }
372:
373: public void attachNewConnection(TCConnection connection) {
374: throw new AssertionError("Must not call!");
375: }
376:
377: public void setAllowConnectionReplace(boolean allow) {
378: throw new AssertionError("Must not call!");
379: }
380:
381: public ConnectionID getConnectionId() {
382: return sendLayer != null ? sendLayer.getConnectionId() : null;
383: }
384:
385: public TCSocketAddress getLocalAddress() {
386: return sendLayer.getLocalAddress();
387: }
388:
389: public TCSocketAddress getRemoteAddress() {
390: return sendLayer.getRemoteAddress();
391: }
392:
393: public void receiveTransportMessage(WireProtocolMessage message) {
394: throw new AssertionError("Must not call!");
395: }
396:
397: public void sendToConnection(TCNetworkMessage message) {
398: throw new AssertionError("Must not call!");
399: }
400:
401: public void startRestoringConnection() {
402: debugLog("Switched to restoreConnection mode");
403: reconnectMode.set(true);
404: }
405:
406: public void connectionRestoreFailed() {
407: debugLog("RestoreConnectionFailed - resetting stack");
408: if (channelConnected.get()) {
409: receiveLayer.notifyTransportDisconnected(this );
410: channelConnected.set(false);
411: }
412: reconnectMode.set(false);
413: delivery.pause();
414: delivery.reset();
415: sessionId = newRandomSessionId();
416: }
417:
418: private void resetStack() {
419: // we need to reset because we are talking to a new stack on the other side
420: reconnectMode.set(false);
421: delivery.pause();
422: delivery.reset();
423: }
424:
425: public boolean isClosed() {
426: return isClosed;
427: }
428:
429: private short newRandomSessionId() {
430: // generate a random session id
431: Random r = new Random();
432: r.setSeed(System.currentTimeMillis());
433: return ((short) r.nextInt(Short.MAX_VALUE));
434: }
435:
436: }
|