01: /*
02: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
03: */
04: package com.tc.net.protocol.transport;
05:
06: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
07: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
08:
09: import com.tc.net.protocol.NetworkMessageSink;
10: import com.tc.net.protocol.TCNetworkMessage;
11:
12: import junit.framework.Assert;
13:
14: abstract class HandshakeMessageResponderBase implements
15: NetworkMessageSink, HandshakeMessageResponder {
16: protected final ConnectionID assignedConnectionId;
17: private final MessageTransportBase transport;
18: protected final TransportHandshakeMessageFactory messageFactory;
19: protected LinkedQueue sentQueue;
20: protected LinkedQueue receivedQueue;
21: private final SynchronizedRef errorRef;
22:
23: protected HandshakeMessageResponderBase(LinkedQueue sentQueue,
24: LinkedQueue receivedQueue,
25: TransportHandshakeMessageFactory messageFactory,
26: ConnectionID assignedConnectionId,
27: MessageTransportBase transport, SynchronizedRef errorRef) {
28: super ();
29: this .sentQueue = sentQueue;
30: this .receivedQueue = receivedQueue;
31: this .messageFactory = messageFactory;
32: this .assignedConnectionId = assignedConnectionId;
33: this .transport = transport;
34: this .errorRef = errorRef;
35: }
36:
37: public void putMessage(TCNetworkMessage msg) {
38: Assert.assertTrue(msg instanceof TransportHandshakeMessage);
39: TransportHandshakeMessage message = (TransportHandshakeMessage) msg;
40:
41: try {
42: this .receivedQueue.put(message);
43: handleHandshakeMessage(message);
44: } catch (InterruptedException e) {
45: setError(e);
46: }
47: }
48:
49: protected void setError(Exception e) {
50: e.printStackTrace();
51: errorRef.set(e);
52: }
53:
54: protected void sendResponseMessage(
55: final TransportHandshakeMessage responseMessage) {
56: new Thread(new Runnable() {
57: public void run() {
58: try {
59: sentQueue.put(responseMessage);
60: transport.receiveTransportMessage(responseMessage);
61: } catch (Exception e) {
62: setError(e);
63: }
64: }
65: }).start();
66: }
67: }
|