001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.transport;
006:
007: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
008: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
009:
010: import com.tc.exception.ImplementMe;
011: import com.tc.net.TCSocketAddress;
012: import com.tc.net.core.ConnectionAddressProvider;
013: import com.tc.net.core.ConnectionInfo;
014: import com.tc.net.core.MockConnectionManager;
015: import com.tc.net.core.MockTCConnection;
016: import com.tc.net.core.event.TCConnectionEvent;
017: import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
018: import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
019: import com.tc.net.protocol.tcm.MessageChannel;
020: import com.tc.net.protocol.tcm.NetworkListener;
021: import com.tc.net.protocol.tcm.NullMessageMonitor;
022: import com.tc.object.session.NullSessionManager;
023: import com.tc.test.TCTestCase;
024: import com.tc.util.TCAssertionError;
025:
026: import java.util.Collections;
027:
028: /**
029: * Test case for MessageTransportImpl
030: */
031: public class MessageTransportTest extends TCTestCase {
032: private SynchronizedRef clientErrorRef;
033: private SynchronizedRef serverErrorRef;
034: private MessageChannel channel;
035: private ClientHandshakeMessageResponder clientResponder;
036: private ServerHandshakeMessageResponder serverResponder;
037: private LinkedQueue clientResponderReceivedQueue;
038: private LinkedQueue clientResponderSentQueue;
039: private LinkedQueue serverResponderReceivedQueue;
040: private LinkedQueue serverResponderSentQueue;
041: private TransportEventMonitor clientEventMonitor;
042: private TransportEventMonitor serverEventMonitor;
043: private ClientMessageTransport clientTransport;
044: private ServerMessageTransport serverTransport;
045: private ConnectionID connectionId;
046: private MockTCConnection clientConnection;
047: private MockConnectionManager connManager;
048: private CommunicationsManagerImpl commsManager;
049: private NetworkListener lsnr;
050: private TransportHandshakeMessageFactory transportHandshakeMessageFactory;
051: private MockTCConnection serverConnection;
052:
053: public void setUp() throws Exception {
054: this .clientResponderReceivedQueue = new LinkedQueue();
055: this .clientResponderSentQueue = new LinkedQueue();
056: this .serverResponderReceivedQueue = new LinkedQueue();
057: this .serverResponderSentQueue = new LinkedQueue();
058: this .clientErrorRef = new SynchronizedRef(null);
059: this .serverErrorRef = new SynchronizedRef(null);
060: DefaultConnectionIdFactory connectionIDProvider = new DefaultConnectionIdFactory();
061: this .connectionId = connectionIDProvider.nextConnectionId();
062:
063: this .transportHandshakeMessageFactory = new TransportHandshakeMessageFactoryImpl();
064: serverConnection = new MockTCConnection();
065: serverConnection.isConnected(true);
066:
067: clientConnection = new MockTCConnection();
068: clientConnection.remoteAddress = new TCSocketAddress(
069: "localhost", 0);
070: clientConnection.localAddress = new TCSocketAddress(
071: "localhost", 0);
072: connManager = new MockConnectionManager();
073: connManager.setConnection(clientConnection);
074: commsManager = new CommunicationsManagerImpl(
075: new NullMessageMonitor(),
076: new PlainNetworkStackHarnessFactory(), connManager,
077: new NullConnectionPolicy());
078: lsnr = commsManager.createListener(new NullSessionManager(),
079: new TCSocketAddress(0), true,
080: new DefaultConnectionIdFactory());
081: lsnr.start(Collections.EMPTY_SET);
082: }
083:
084: public void tearDown() throws Exception {
085: super .tearDown();
086: assertTrue(this .clientErrorRef.get() == null);
087: assertTrue(this .serverErrorRef.get() == null);
088: if (channel != null)
089: channel.close();
090: lsnr.stop(5000);
091: commsManager.shutdown();
092: }
093:
094: public void testDoubleOpen() throws Exception {
095:
096: createClientTransport(0);
097: createServerTransport();
098:
099: assertFalse(clientTransport.wasOpened());
100:
101: // make sure that when you open two different transports, the mock
102: // connection manager
103: // records two create connection requests.
104:
105: clientTransport.open();
106: createClientTransport(0);
107: clientTransport.open();
108: assertEquals(2, connManager.getCreateConnectionCallCount());
109:
110: // now open the same transport again and make sure it throws an assertion
111: // error.
112: try {
113: clientTransport.open();
114: fail("Should have thrown an assertion error.");
115: } catch (TCAssertionError e) {
116: // expected.
117: }
118: assertEquals(2, connManager.getCreateConnectionCallCount());
119: }
120:
121: public void testDoubleClose() throws Exception {
122: createClientTransport(0);
123:
124: clientTransport.open();
125: clientTransport.close();
126: assertEquals(1, clientConnection.getCloseCallCount());
127:
128: try {
129: clientTransport.close();
130: } catch (TCAssertionError e) {
131: fail("Should not have thrown an AssertionError");
132: }
133: }
134:
135: public void testClientTransportEvents() throws Exception {
136: createClientTransport(1);
137:
138: // add an extra event monitor to make sure that, if there are multiple
139: // listeners,
140: // they all get the event.
141: TransportEventMonitor extraMonitor = new TransportEventMonitor();
142: this .clientTransport.addTransportListener(extraMonitor);
143:
144: clientTransport.open();
145: assertTrue(clientEventMonitor.waitForConnect(1000));
146: assertTrue(extraMonitor.waitForConnect(1000));
147:
148: TCConnectionEvent event = new TCConnectionEvent(
149: clientConnection);
150:
151: clientTransport.closeEvent(event);
152: assertTrue(clientEventMonitor.waitForDisconnect(1000));
153: assertTrue(extraMonitor.waitForDisconnect(1000));
154: assertTrue(clientEventMonitor.waitForConnectAttempt(1000));
155: assertTrue(extraMonitor.waitForConnectAttempt(1000));
156:
157: clientTransport.close();
158: assertTrue(clientEventMonitor.waitForClose(1000));
159: assertTrue(extraMonitor.waitForClose(1000));
160: }
161:
162: public void testServerTransportEvents() throws Exception {
163: createServerTransport();
164: assertFalse(serverEventMonitor.waitForConnect(500));
165:
166: // add an extra event monitor to make sur ethat, if there are multiple
167: // listeners,
168: // they all get the event.
169: TransportEventMonitor extraMonitor = new TransportEventMonitor();
170: serverTransport.addTransportListener(extraMonitor);
171:
172: TCConnectionEvent event = new TCConnectionEvent(
173: serverConnection);
174:
175: serverTransport.closeEvent(event);
176: assertTrue(serverEventMonitor.waitForDisconnect(1000));
177: assertTrue(extraMonitor.waitForDisconnect(1000));
178:
179: assertFalse(serverEventMonitor.waitForConnectAttempt(1000));
180: assertFalse(extraMonitor.waitForConnectAttempt(1000));
181:
182: serverTransport.close();
183: assertTrue(serverEventMonitor.waitForClose(1000));
184: assertTrue(extraMonitor.waitForClose(1000));
185: }
186:
187: public void testCloseBeforeOpen() throws Exception {
188: createClientTransport(0);
189: clientTransport.close();
190: }
191:
192: public void testWasOpened() throws Exception {
193: createClientTransport(0);
194: assertFalse(clientTransport.wasOpened());
195:
196: clientTransport.open();
197: assertTrue(clientTransport.wasOpened());
198:
199: clientTransport.close();
200: assertTrue(clientTransport.wasOpened());
201: }
202:
203: public void testOpenCloseAndIsConnected() throws Exception {
204: createClientTransport(0);
205: assertFalse(clientTransport.isConnected());
206:
207: clientTransport.open();
208: assertTrue(clientTransport.isConnected());
209:
210: clientTransport.close();
211: assertTrue(clientEventMonitor.waitForClose(1000));
212: assertFalse(clientTransport.isOpen.get());
213:
214: createServerTransport();
215: TransportHandshakeMessage ack = this .transportHandshakeMessageFactory
216: .createAck(connectionId, this .serverTransport
217: .getConnection());
218: this .serverTransport.receiveTransportMessage(ack);
219: assertTrue(serverEventMonitor.waitForConnect(1000));
220: assertTrue(serverTransport.isConnected());
221: }
222:
223: private void createServerTransport() throws Exception {
224: this .serverTransport = new ServerMessageTransport(
225: this .connectionId, this .serverConnection,
226: createHandshakeErrorHandler(),
227: this .transportHandshakeMessageFactory);
228:
229: this .serverResponder = new ServerHandshakeMessageResponder(
230: this .serverResponderSentQueue,
231: this .serverResponderReceivedQueue,
232: this .transportHandshakeMessageFactory,
233: this .connectionId, serverTransport, this .serverErrorRef);
234: this .serverConnection.setMessageSink(this .serverResponder);
235: this .serverEventMonitor = new TransportEventMonitor();
236: this .serverTransport
237: .addTransportListener(this .serverEventMonitor);
238: }
239:
240: private void createClientTransport(int maxReconnectTries)
241: throws Exception {
242: final ConnectionInfo connInfo = new ConnectionInfo(
243: TCSocketAddress.LOOPBACK_IP, 0);
244: final ClientConnectionEstablisher cce = new ClientConnectionEstablisher(
245: connManager, new ConnectionAddressProvider(
246: new ConnectionInfo[] { connInfo }),
247: maxReconnectTries, 0);
248:
249: this .clientTransport = new ClientMessageTransport(cce,
250: createHandshakeErrorHandler(),
251: this .transportHandshakeMessageFactory,
252: new WireProtocolAdaptorFactoryImpl());
253: this .clientResponder = new ClientHandshakeMessageResponder(
254: this .clientResponderSentQueue,
255: this .clientResponderReceivedQueue,
256: this .transportHandshakeMessageFactory,
257: this .connectionId, this .clientTransport,
258: this .clientErrorRef);
259: this .clientConnection.setMessageSink(this .clientResponder);
260: this .clientEventMonitor = new TransportEventMonitor();
261: this .clientTransport
262: .addTransportListener(this .clientEventMonitor);
263: }
264:
265: private TransportHandshakeErrorHandler createHandshakeErrorHandler() {
266: return new TransportHandshakeErrorHandler() {
267:
268: public void handleHandshakeError(
269: TransportHandshakeErrorContext e) {
270: new ImplementMe(e.toString()).printStackTrace();
271: }
272:
273: public void handleHandshakeError(
274: TransportHandshakeErrorContext e,
275: TransportHandshakeMessage m) {
276: throw new ImplementMe();
277:
278: }
279:
280: };
281: }
282:
283: }
|