001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.transport;
006:
007: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
008: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
009:
010: import com.tc.exception.ImplementMe;
011: import com.tc.net.TCSocketAddress;
012: import com.tc.net.core.ConnectionAddressProvider;
013: import com.tc.net.core.ConnectionInfo;
014: import com.tc.net.core.MockConnectionManager;
015: import com.tc.net.core.MockTCConnection;
016: import com.tc.net.core.event.TCConnectionEvent;
017: import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
018: import com.tc.net.protocol.tcm.CommunicationsManager;
019: import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
020: import com.tc.net.protocol.tcm.NetworkListener;
021: import com.tc.net.protocol.tcm.NullMessageMonitor;
022: import com.tc.object.session.NullSessionManager;
023: import com.tc.test.TCTestCase;
024:
025: import java.util.Collections;
026: import java.util.List;
027:
/**
 * Tests for ClientMessageTransport. Checklist (presumably x = covered, o = still to do):
 * x normal connect and handshake
 * o reconnect and handshake
 */
031: public class ClientMessageTransportTest extends TCTestCase {
032: private ConnectionID connectionId;
033: private ClientMessageTransport transport;
034: private MockConnectionManager connectionManager;
035: private MockTCConnection connection;
036: private TransportHandshakeMessageFactory transportMessageFactory;
037: private TransportHandshakeErrorHandler handshakeErrorHandler;
038: private int maxRetries = 10;
039:
040: public void setUp() {
041: DefaultConnectionIdFactory connectionIDProvider = new DefaultConnectionIdFactory();
042: this .connectionId = connectionIDProvider.nextConnectionId();
043: this .connectionManager = new MockConnectionManager();
044: this .connection = new MockTCConnection();
045: this .connectionManager.setConnection(connection);
046: this .transportMessageFactory = new TransportHandshakeMessageFactoryImpl();
047: handshakeErrorHandler = new TransportHandshakeErrorHandler() {
048:
049: public void handleHandshakeError(
050: TransportHandshakeErrorContext e) {
051: throw new ImplementMe();
052:
053: }
054:
055: public void handleHandshakeError(
056: TransportHandshakeErrorContext e,
057: TransportHandshakeMessage m) {
058: throw new ImplementMe();
059:
060: }
061:
062: };
063: final ConnectionInfo connectionInfo = new ConnectionInfo("", 0);
064: ClientConnectionEstablisher cce = new ClientConnectionEstablisher(
065: connectionManager, new ConnectionAddressProvider(
066: new ConnectionInfo[] { connectionInfo }),
067: maxRetries, 5000);
068: transport = new ClientMessageTransport(cce,
069: handshakeErrorHandler, this .transportMessageFactory,
070: new WireProtocolAdaptorFactoryImpl());
071: }
072:
073: public void testRoundRobinReconnect() throws Exception {
074: SynchronizedRef errorRef = new SynchronizedRef(null);
075: ClientHandshakeMessageResponder tester = new ClientHandshakeMessageResponder(
076: new LinkedQueue(), new LinkedQueue(),
077: this .transportMessageFactory, this .connectionId,
078: this .transport, errorRef);
079: this .connection.setMessageSink(tester);
080:
081: transport.open();
082: while (!connection.connectCalls.isEmpty()) {
083: connection.connectCalls.take();
084: }
085:
086: connection.fail = true;
087: transport.closeEvent(new TCConnectionEvent(connection));
088:
089: // FIXME 2005-12-14 -- We should restore this test.
090: // assertNull(connection.connectCalls.poll(3000));
091:
092: }
093:
094: public void testConnectAndHandshake() throws Exception {
095: SynchronizedRef errorRef = new SynchronizedRef(null);
096: ClientHandshakeMessageResponder tester = new ClientHandshakeMessageResponder(
097: new LinkedQueue(), new LinkedQueue(),
098: this .transportMessageFactory, this .connectionId,
099: this .transport, errorRef);
100:
101: this .connection.setMessageSink(tester);
102:
103: transport.open();
104:
105: assertTrue(errorRef.get() == null);
106:
107: List sentMessages = connection.getSentMessages();
108:
109: assertEquals(2, sentMessages.size());
110: assertEquals(this .connectionId, transport.getConnectionId());
111: Thread.sleep(1000);
112: assertTrue(tester.waitForAckToBeReceived(3000));
113: }
114:
115: /**
116: * Test interaction with a real network listener.
117: */
118: public void testConnectAndHandshakeActuallyConnected()
119: throws Exception {
120: CommunicationsManager commsMgr = new CommunicationsManagerImpl(
121: new NullMessageMonitor(),
122: new PlainNetworkStackHarnessFactory(),
123: new NullConnectionPolicy());
124: NetworkListener listener = commsMgr.createListener(
125: new NullSessionManager(), new TCSocketAddress(0), true,
126: new DefaultConnectionIdFactory());
127: listener.start(Collections.EMPTY_SET);
128: int port = listener.getBindPort();
129:
130: final ConnectionInfo connInfo = new ConnectionInfo(
131: TCSocketAddress.LOOPBACK_IP, port);
132: ClientConnectionEstablisher cce = new ClientConnectionEstablisher(
133: commsMgr.getConnectionManager(),
134: new ConnectionAddressProvider(
135: new ConnectionInfo[] { connInfo }), 0, 1000);
136: transport = new ClientMessageTransport(cce,
137: this .handshakeErrorHandler,
138: this .transportMessageFactory,
139: new WireProtocolAdaptorFactoryImpl());
140: transport.open();
141: assertTrue(transport.isConnected());
142: listener.stop(5000);
143:
144: }
145: }
|