001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.protocol.tcm;
005:
006: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
007:
008: import com.tc.net.protocol.tcm.msgs.PingMessage;
009: import com.tc.util.concurrent.SetOnceFlag;
010:
011: import java.security.SecureRandom;
012: import java.util.Random;
013:
014: import junit.framework.TestCase;
015:
016: public class TCMessageRouterTest extends TestCase {
017:
018: public void testDefaultRoute() {
019:
020: try {
021: TCMessageRouter router = new TCMessageRouterImpl();
022: router.putMessage(createMessage());
023: fail();
024: } catch (UnsupportedMessageTypeException umte) {
025: // expected
026: }
027:
028: final SynchronizedRef msg = new SynchronizedRef(null);
029: TCMessageRouter router = new TCMessageRouterImpl(
030: new TCMessageSink() {
031: public void putMessage(TCMessage message) {
032: msg.set(message);
033: }
034: });
035: TCMessage message = createMessage();
036: router.putMessage(message);
037: assertSame(message, msg.get());
038:
039: msg.set(null);
040: router.routeMessageType(TCMessageType.PING_MESSAGE,
041: new TCMessageSink() {
042: public void putMessage(TCMessage m) {
043: // ignore it
044: }
045: });
046: router.putMessage(createMessage());
047: assertNull(msg.get());
048: }
049:
050: public void testRouteByType() {
051: final SynchronizedRef defmsg = new SynchronizedRef(null);
052: TCMessageRouter router = new TCMessageRouterImpl(
053: new TCMessageSink() {
054: public void putMessage(TCMessage m) {
055: defmsg.set(m);
056: }
057: });
058:
059: final SynchronizedRef msg = new SynchronizedRef(null);
060: router.routeMessageType(TCMessageType.PING_MESSAGE,
061: new TCMessageSink() {
062: public void putMessage(TCMessage m) {
063: msg.set(m);
064: }
065: });
066: TCMessage message = createMessage();
067: router.putMessage(message);
068: assertSame(message, msg.get());
069: assertNull(defmsg.get());
070:
071: msg.set(null);
072: defmsg.set(null);
073: router.unrouteMessageType(TCMessageType.PING_MESSAGE);
074: router.putMessage(message);
075: assertNull(msg.get());
076: assertSame(message, defmsg.get());
077: }
078:
079: public void testConcurrency() throws Exception {
080: final Random random = new SecureRandom();
081: final SynchronizedRef error = new SynchronizedRef(null);
082: final SetOnceFlag stop = new SetOnceFlag();
083: final TCMessageSink nullSink = new TCMessageSink() {
084: public void putMessage(TCMessage message) {
085: // nada
086: }
087: };
088: final TCMessageRouter router = new TCMessageRouterImpl(nullSink);
089:
090: final Runnable putter = new Runnable() {
091: public void run() {
092: TCMessage msg = createMessage();
093: try {
094: while (true) {
095: for (int i = 0; i < 100; i++) {
096: router.putMessage(msg);
097: }
098: if (stop.isSet()) {
099: return;
100: }
101: }
102: } catch (Throwable t) {
103: setError(t, error);
104: }
105: }
106: };
107:
108: final Runnable changer = new Runnable() {
109: public void run() {
110: try {
111: while (true) {
112: for (int i = 0; i < 100; i++) {
113: if (random.nextBoolean()) {
114: router.routeMessageType(
115: TCMessageType.PING_MESSAGE,
116: nullSink);
117: } else {
118: router
119: .unrouteMessageType(TCMessageType.PING_MESSAGE);
120: }
121: }
122: if (stop.isSet()) {
123: return;
124: }
125: }
126: } catch (Throwable t) {
127: setError(t, error);
128: }
129: }
130: };
131:
132: Thread[] threads = new Thread[10];
133: for (int i = 0; i < 5; i++) {
134: threads[i] = new Thread(putter);
135: threads[5 + i] = new Thread(changer);
136: }
137:
138: for (int i = 0; i < threads.length; i++) {
139: threads[i].setDaemon(true);
140: threads[i].start();
141: }
142:
143: Thread.sleep(5000);
144: stop.set();
145:
146: for (int i = 0; i < threads.length; i++) {
147: threads[i].join(5000);
148: }
149:
150: assertNull(error.get());
151: }
152:
153: private static void setError(Throwable t, SynchronizedRef error) {
154: t.printStackTrace();
155: error.set(t);
156: }
157:
158: private PingMessage createMessage() {
159: PingMessage rv = new PingMessage(new NullMessageMonitor());
160: rv.dehydrate();
161: return rv;
162: }
163:
164: }
|