001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.tcm;
006:
007: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
008:
009: import com.tc.logging.TCLogger;
010: import com.tc.logging.TCLogging;
011: import com.tc.net.TCSocketAddress;
012: import com.tc.net.core.ConnectionAddressProvider;
013: import com.tc.net.core.ConnectionInfo;
014: import com.tc.net.protocol.NetworkStackHarnessFactory;
015: import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
016: import com.tc.net.protocol.tcm.msgs.PingMessage;
017: import com.tc.net.protocol.transport.DefaultConnectionIdFactory;
018: import com.tc.net.protocol.transport.NullConnectionPolicy;
019: import com.tc.net.protocol.transport.WireProtocolMessage;
020: import com.tc.net.protocol.transport.WireProtocolMessageSink;
021: import com.tc.object.session.NullSessionManager;
022: import com.tc.test.TCTestCase;
023: import com.tc.util.SequenceGenerator;
024: import com.tc.util.TCTimeoutException;
025: import com.tc.util.concurrent.ThreadUtil;
026:
027: import gnu.trove.TLongHashSet;
028:
029: import java.io.IOException;
030: import java.net.ConnectException;
031: import java.net.UnknownHostException;
032: import java.text.SimpleDateFormat;
033: import java.util.Date;
034: import java.util.HashSet;
035: import java.util.LinkedList;
036: import java.util.List;
037: import java.util.Random;
038:
039: /**
040: * This is a test case for MessageChannel. XXX: This test could use some work. It's not very coherent and uses sleeps.
041: * --Orion 12/19/2005
042: */
043: public class MessageChannelTest extends TCTestCase {
044: static final int ITERATIONS = 100;
045: static final int WAIT_PERIOD = 100;
046: static final int WAIT = ITERATIONS * WAIT_PERIOD;
047: static final int MESSAGE_COUNT = 250;
048:
049: TCLogger logger = TCLogging.getLogger(getClass());
050: NetworkListener lsnr;
051: CommunicationsManager clientComms;
052: CommunicationsManager serverComms;
053: ClientMessageChannel clientChannel;
054: SequenceGenerator sequence;
055: MessageSendAndReceiveWatcher clientWatcher;
056: MessageSendAndReceiveWatcher serverWatcher;
057: SynchronizedRef error = new SynchronizedRef(null);
058: SequenceGenerator sq = new SequenceGenerator();
059:
060: private int port = 0;
061:
062: // public MessageChannelTest() {
063: // disableAllUntil("2006-02-15");
064: // }
065:
066: protected void setUp(int maxReconnectTries) throws Exception {
067: setUp(maxReconnectTries, false);
068: }
069:
070: protected void setUp(int maxReconnectTries,
071: boolean allowConnectionReplace) throws Exception {
072: setUp(maxReconnectTries, allowConnectionReplace, false);
073: }
074:
075: protected void setUp(int maxReconnectTries,
076: boolean allowConnectionReplace, boolean dumbSink)
077: throws Exception {
078: setUp(maxReconnectTries, new PlainNetworkStackHarnessFactory(
079: allowConnectionReplace),
080: new PlainNetworkStackHarnessFactory(
081: allowConnectionReplace), dumbSink);
082: }
083:
084: protected void setUp(int maxReconnectTries,
085: NetworkStackHarnessFactory clientStackHarnessFactory,
086: NetworkStackHarnessFactory serverStackHarnessFactory,
087: boolean dumbServerSink) throws Exception {
088: super .setUp();
089:
090: clientWatcher = new MessageSendAndReceiveWatcher();
091: serverWatcher = new MessageSendAndReceiveWatcher();
092:
093: this .sequence = new SequenceGenerator();
094:
095: MessageMonitor mm = new NullMessageMonitor();
096: clientComms = new CommunicationsManagerImpl(mm,
097: clientStackHarnessFactory, new NullConnectionPolicy());
098: serverComms = new CommunicationsManagerImpl(mm,
099: serverStackHarnessFactory, new NullConnectionPolicy());
100:
101: initListener(clientWatcher, serverWatcher, dumbServerSink);
102: this .clientChannel = createClientMessageChannel(maxReconnectTries);
103: this .setUpClientReceiveSink();
104: }
105:
106: private void initListener(
107: final MessageSendAndReceiveWatcher myClientSenderWatcher,
108: final MessageSendAndReceiveWatcher myServerSenderWatcher,
109: boolean dumbServerSink) throws IOException,
110: TCTimeoutException {
111: if (lsnr != null) {
112: lsnr.stop(WAIT);
113: }
114:
115: if (dumbServerSink) {
116: lsnr = serverComms.createListener(new NullSessionManager(),
117: new TCSocketAddress(port), false,
118: new DefaultConnectionIdFactory(),
119: new WireProtocolMessageSink() {
120:
121: public void putMessage(
122: WireProtocolMessage message) {
123: // Thanks for the message.
124: // But i don't give you back anything
125: // as i am Dumb.
126: }
127: }
128:
129: );
130: } else {
131: lsnr = serverComms.createListener(new NullSessionManager(),
132: new TCSocketAddress(port), false,
133: new DefaultConnectionIdFactory());
134: }
135:
136: lsnr.addClassMapping(TCMessageType.PING_MESSAGE,
137: PingMessage.class);
138: lsnr.routeMessageType(TCMessageType.PING_MESSAGE,
139: new TCMessageSink() {
140: public void putMessage(TCMessage message)
141: throws UnsupportedMessageTypeException {
142: // System.out.println(message);
143:
144: PingMessage ping = (PingMessage) message;
145: try {
146: message.hydrate();
147: } catch (Exception e) {
148: setError(e);
149: }
150: myClientSenderWatcher.addMessageReceived(ping);
151:
152: PingMessage pong = ping.createResponse();
153: pong.send();
154: myServerSenderWatcher.addMessageSent(pong);
155: }
156: });
157: lsnr.start(new HashSet());
158: this .port = lsnr.getBindPort();
159: }
160:
161: protected void tearDown() throws Exception {
162: super .tearDown();
163:
164: final Throwable lastError = (Throwable) error.get();
165: if (lastError != null) {
166: throw new Exception(lastError);
167: }
168:
169: if (lsnr != null)
170: lsnr.stop(WAIT);
171: if (this .clientChannel != null)
172: this .clientChannel.close();
173: if (clientComms != null)
174: clientComms.shutdown();
175: if (serverComms != null)
176: serverComms.shutdown();
177: }
178:
179: public void testAttachments() throws Exception {
180: setUp(10);
181: String key = "key";
182: MessageChannel channel = createClientMessageChannel(10);
183: assertNull(channel.getAttachment(key));
184: assertNull(channel.removeAttachment(key));
185:
186: Object attachment = new Object();
187: Object attachment2 = new Object();
188: channel.addAttachment(key, attachment, false);
189: assertSame(attachment, channel.getAttachment(key));
190: channel.addAttachment(key, attachment, false);
191: assertSame(attachment, channel.getAttachment(key));
192:
193: channel.addAttachment(key, attachment2, true);
194: assertSame(attachment2, channel.getAttachment(key));
195:
196: Object removed = channel.removeAttachment(key);
197: assertSame(attachment2, removed);
198:
199: removed = channel.removeAttachment(key);
200: assertNull(removed);
201: assertNull(channel.getAttachment(key));
202: }
203:
204: public void testOpenRaceWithAutoReconnect() throws Exception {
205: setUp(-1, false, true);
206:
207: Thread t = new Thread() {
208: public void run() {
209: ThreadUtil.reallySleep(WAIT / 2);
210: serverComms.getConnectionManager().shutdown();
211: System.err.println("closed connections on server side");
212: }
213: };
214:
215: t.start();
216:
217: try {
218: clientChannel.open();
219: fail();
220: } catch (TCTimeoutException e) {
221: // expected;
222: System.err
223: .println("Expected: got timeout exception for first open() : "
224: + e);
225: }
226:
227: try {
228: clientChannel.open();
229: fail();
230: } catch (ConnectException e) {
231: // expected
232: System.err.println("Expected: Connection Error: " + e);
233: }
234: }
235:
236: public void testAutomaticReconnect() throws Exception {
237: setUp(10, true);
238: assertEquals(0, clientChannel.getConnectCount());
239: assertEquals(0, clientChannel.getConnectAttemptCount());
240: clientChannel.open();
241: assertEquals(1, clientChannel.getConnectCount());
242: assertEquals(1, clientChannel.getConnectAttemptCount());
243:
244: final int closeCount = new Random().nextInt(MESSAGE_COUNT);
245:
246: for (int i = 0; i < MESSAGE_COUNT; i++) {
247: if (i == closeCount) {
248: waitForArrivalOrFail(clientWatcher, i);
249: waitForArrivalOrFail(serverWatcher, i);
250: clientComms.getConnectionManager().closeAllConnections(
251: WAIT);
252: if (!waitUntilReconnected()) {
253: fail("Didn't reconnect");
254: }
255: }
256: createAndSendMessage();
257: }
258: assertTrue(clientChannel.getConnectAttemptCount() > 1);
259: assertTrue(clientChannel.getConnectCount() > 1);
260:
261: waitForMessages(MESSAGE_COUNT);
262: }
263:
264: private void waitForMessages(int count) throws InterruptedException {
265: waitForArrivalOrFail(clientWatcher, count);
266: waitForArrivalOrFail(serverWatcher, count);
267:
268: String msg = "expected: " + count + ", client sent: "
269: + clientWatcher.sent() + ", client received: "
270: + clientWatcher.received() + ", server sent: "
271: + serverWatcher.sent() + ", server received: "
272: + serverWatcher.received();
273:
274: assertEquals(msg, count, clientWatcher.sent());
275: assertEquals(msg, count, clientWatcher.received());
276: assertEquals(msg, count, serverWatcher.sent());
277: assertEquals(msg, count, serverWatcher.received());
278: }
279:
280: public void testManualReconnectAfterFailure() throws Exception {
281: setUp(0);
282:
283: lsnr.stop(WAIT);
284: serverComms.getConnectionManager().closeAllConnections(WAIT);
285: clientComms.getConnectionManager().closeAllConnections(WAIT);
286:
287: for (int i = 0; i < 10; i++) {
288: try {
289: clientChannel.open();
290: fail("Should have thrown an exception");
291: } catch (TCTimeoutException e) {
292: // expected
293: } catch (UnknownHostException e) {
294: fail(e.getMessage());
295: } catch (IOException e) {
296: // expected
297: }
298:
299: assertFalse(clientChannel.isConnected());
300: }
301:
302: initListener(this .clientWatcher, this .serverWatcher, false);
303: clientChannel.open();
304: assertTrue(clientChannel.isConnected());
305: }
306:
307: public void testSendAfterDisconnect() throws Exception {
308: setUp(0);
309: clientChannel.open();
310:
311: createAndSendMessage();
312: waitForArrivalOrFail(clientWatcher, 1);
313: waitForArrivalOrFail(serverWatcher, 1);
314:
315: sendMessagesWhileDisconnected(MESSAGE_COUNT, 25);
316:
317: // don't explicitly need to do this, but if we wait, it's possible an error will happen on another thread
318: ThreadUtil.reallySleep(5000);
319: }
320:
321: public void testZeroMaxRetriesDoesntAutoreconnect()
322: throws Exception {
323: setUp(0);
324: assertEquals(0, clientChannel.getConnectAttemptCount());
325: assertEquals(0, clientChannel.getConnectCount());
326:
327: clientChannel.open();
328: assertEquals(1, clientChannel.getConnectAttemptCount());
329: assertEquals(1, clientChannel.getConnectCount());
330: clientComms.getConnectionManager().closeAllConnections(WAIT);
331: ThreadUtil.reallySleep(5000);
332: assertEquals(1, clientChannel.getConnectAttemptCount());
333: assertEquals(1, clientChannel.getConnectCount());
334: }
335:
336: public void testNegativeMaxRetriesAlwaysReconnects()
337: throws Exception {
338: setUp(-1);
339:
340: assertEquals(0, clientChannel.getConnectCount());
341: assertEquals(0, clientChannel.getConnectAttemptCount());
342:
343: clientChannel.open();
344:
345: assertEquals(1, clientChannel.getConnectCount());
346: assertEquals(1, clientChannel.getConnectAttemptCount());
347:
348: lsnr.stop(WAIT);
349: assertEquals(0, serverComms.getAllListeners().length);
350:
351: clientComms.getConnectionManager().closeAllConnections(5000);
352: int count = clientChannel.getConnectAttemptCount();
353: ThreadUtil.reallySleep(WAIT * 4);
354: assertTrue(clientChannel.getConnectAttemptCount() + " vs "
355: + count, clientChannel.getConnectAttemptCount() > count);
356: assertEquals(1, clientChannel.getConnectCount());
357: }
358:
359: // public void testSendBeforeOpen() throws Exception {
360: // setUp(0);
361: // PingMessage ping = createMessage();
362: // assertTrue(clientChannel.getStatus().isClosed());
363: // try {
364: // ping.send();
365: // fail("Should have thrown an assertion error");
366: // } catch (TCAssertionError e) {
367: // // expected
368: // }
369: // }
370: //
371: // public void testSendAfterClose() throws Exception {
372: // setUp(0);
373: // clientChannel.open();
374: // assertTrue(clientChannel.getStatus().isOpen());
375: //
376: // PingMessage ping = createMessage();
377: // clientChannel.close();
378: // assertTrue(clientChannel.isClosed());
379: //
380: // try {
381: // // send should fail
382: // ping.send();
383: // fail("should have thrown an exception");
384: // } catch (TCAssertionError err) {
385: // // expected
386: // }
387: // }
388:
389: public void testGetStatus() throws Exception {
390: setUp(0);
391: clientChannel.open();
392: assertTrue(clientChannel.isOpen());
393: clientChannel.close();
394: assertTrue(clientChannel.isClosed());
395: }
396:
397: public void testSend() throws Exception {
398: setUp(0);
399: clientChannel.open();
400: int count = 100;
401: List messages = new LinkedList();
402: for (int i = 0; i < count; i++) {
403: messages.add(createAndSendMessage());
404: }
405: waitForMessages(count);
406:
407: }
408:
409: public void testSocketInfo() throws Exception {
410: setUp(0);
411:
412: assertNull(clientChannel.getRemoteAddress());
413: assertNull(clientChannel.getLocalAddress());
414:
415: clientChannel.open();
416: createAndSendMessage();
417: waitForMessages(1);
418:
419: TCSocketAddress clientRemote = clientChannel.getRemoteAddress();
420: TCSocketAddress clientLocal = clientChannel.getLocalAddress();
421:
422: MessageChannelInternal[] serverChannels = lsnr
423: .getChannelManager().getChannels();
424: assertEquals(1, serverChannels.length);
425: MessageChannelInternal serverChannel = serverChannels[0];
426:
427: TCSocketAddress serverRemote = serverChannel.getRemoteAddress();
428: TCSocketAddress serverLocal = serverChannel.getLocalAddress();
429:
430: assertEquals(clientRemote, serverLocal);
431: assertEquals(clientLocal, serverRemote);
432: }
433:
434: private PingMessage createAndSendMessage() {
435: PingMessage ping = createMessage();
436: clientWatcher.addMessageSent(ping);
437: ping.send();
438: return ping;
439: }
440:
441: private static void waitForArrivalOrFail(
442: MessageSendAndReceiveWatcher watcher, int count)
443: throws InterruptedException {
444: int i = 0;
445: while (!watcher.allReceived() || (watcher.sent() < count)
446: || (watcher.received() < count)) {
447: if (i == ITERATIONS) {
448: fail((watcher.sent() - watcher.received())
449: + " messages of " + watcher.sent()
450: + " messages total failed to arrive in "
451: + ITERATIONS + " iterations of " + WAIT_PERIOD
452: + " ms. waiting.");
453: }
454:
455: Thread.sleep(WAIT_PERIOD);
456: i++;
457: }
458: }
459:
460: private ClientMessageChannel createClientMessageChannel(
461: int maxReconnectTries) {
462: ClientMessageChannel ch = clientComms.createClientChannel(
463: new NullSessionManager(), maxReconnectTries,
464: TCSocketAddress.LOOPBACK_IP, lsnr.getBindPort(), WAIT,
465: new ConnectionAddressProvider(
466: new ConnectionInfo[] { new ConnectionInfo(
467: "localhost", lsnr.getBindPort()) }));
468: ch.addClassMapping(TCMessageType.PING_MESSAGE,
469: PingMessage.class);
470: return ch;
471: }
472:
473: private PingMessage createMessage() {
474: PingMessage ping = (PingMessage) clientChannel
475: .createMessage(TCMessageType.PING_MESSAGE);
476: ping.initialize(sq);
477: return ping;
478: }
479:
480: private void sendMessagesWhileDisconnected(int count, int afterCount)
481: throws InterruptedException {
482: Random rnd = new Random();
483: final int closeCount = rnd.nextInt(count);
484: final boolean serverClose = rnd.nextBoolean();
485:
486: Thread thread = null;
487:
488: for (int i = 0; i < count; i++) {
489: if (i == closeCount) {
490: // close down the connection in a seperate thread to increase the timing randomness
491: thread = new Thread("Connection closer thread") {
492: public void run() {
493: try {
494: if (serverClose) {
495: logger
496: .info("Initiating close on the SERVER side...");
497: serverComms.getConnectionManager()
498: .asynchCloseAllConnections();
499: } else {
500: logger
501: .info("Initiating close on the CLIENT side...");
502: clientComms.getConnectionManager()
503: .asynchCloseAllConnections();
504: }
505: } catch (Throwable t) {
506: setError(t);
507: }
508: }
509: };
510: Thread.sleep(rnd.nextInt(25) + 10);
511: thread.setDaemon(true);
512: thread.start();
513: }
514:
515: createAndSendMessage();
516: }
517:
518: thread.join(WAIT);
519: assertFalse(thread.isAlive());
520:
521: // make sure we send messages after the connection has actually closed for good measure
522: for (int i = 0; i < afterCount; i++) {
523: createAndSendMessage();
524: }
525: }
526:
527: private boolean waitUntilReconnected() {
528: final long start = System.currentTimeMillis();
529: while (System.currentTimeMillis() - start < WAIT) {
530: if (clientChannel.isConnected())
531: return true;
532: try {
533: Thread.sleep(WAIT_PERIOD);
534: } catch (InterruptedException e) {
535: e.printStackTrace();
536: }
537: }
538: return false;
539: }
540:
541: private void setUpClientReceiveSink() {
542: final MessageSendAndReceiveWatcher myServerSenderWatcher = this .serverWatcher;
543: clientChannel.routeMessageType(TCMessageType.PING_MESSAGE,
544: new TCMessageSink() {
545: public void putMessage(TCMessage message)
546: throws UnsupportedMessageTypeException {
547: try {
548: PingMessage ping = (PingMessage) message;
549: ping.hydrate();
550: // System.out.println("CLIENT RECEIVE: " + ping.getSequence());
551: } catch (Exception e) {
552: setError(e);
553: }
554: PingMessage ping = (PingMessage) message;
555: myServerSenderWatcher.addMessageReceived(ping);
556: }
557: });
558: }
559:
560: private void setError(Throwable t) {
561: synchronized (System.err) {
562: System.err.println(new SimpleDateFormat(
563: "yyyy-MM-dd HH:mm:ss,S").format(new Date())
564: + ": Exception Thrown in thread ["
565: + Thread.currentThread().getName() + "]");
566: t.printStackTrace(System.err);
567: }
568: error.set(t);
569: }
570:
571: public class MessageSendAndReceiveWatcher {
572:
573: private TLongHashSet sentSequences = new TLongHashSet();
574: private TLongHashSet receivedSequences = new TLongHashSet();
575:
576: public synchronized void addMessageSent(PingMessage sent) {
577: sentSequences.add(sent.getSequence());
578: }
579:
580: public synchronized void addMessageReceived(PingMessage received) {
581: receivedSequences.add(received.getSequence());
582: }
583:
584: public int sent() {
585: return sentSequences.size();
586: }
587:
588: public int received() {
589: return receivedSequences.size();
590: }
591:
592: public synchronized boolean allReceived() {
593: return receivedSequences.containsAll(sentSequences
594: .toArray());
595: }
596: }
597: }
|