001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.groups;
006:
007: import com.tc.lang.TCThreadGroup;
008: import com.tc.lang.ThrowableHandler;
009: import com.tc.logging.TCLogger;
010: import com.tc.logging.TCLogging;
011: import com.tc.properties.TCPropertiesImpl;
012: import com.tc.test.TCTestCase;
013: import com.tc.util.PortChooser;
014: import com.tc.util.concurrent.NoExceptionLinkedQueue;
015: import com.tc.util.concurrent.ThreadUtil;
016: import com.tc.util.runtime.ThreadDump;
017:
018: import java.io.IOException;
019: import java.io.ObjectInput;
020: import java.io.ObjectOutput;
021: import java.util.Random;
022:
023: public class TribesGroupManagerTest extends TCTestCase {
024:
025: private static final TCLogger logger = TCLogging
026: .getLogger(TribesGroupManager.class);
027: private static short portnum = 0;
028:
/**
 * Creates the test case and, at construction time, switches the multicast service over to a random port.
 */
public TribesGroupManagerTest() {
  super();
  // Tests must not share a multicast port with other machines on the network.
  this.useRandomMcastPort();
}
033:
034: /*
035: * Choose a random mcast port number to avoid conflict with other LAN machines. Must be called before joinMcast.
036: */
037: public void useRandomMcastPort() {
038: if (portnum == 0) {
039: // generate a random port number
040: Random r = new Random();
041: r.setSeed(System.currentTimeMillis());
042: portnum = (short) (r.nextInt(Short.MAX_VALUE - 1025) + 1024);
043: }
044:
045: TCPropertiesImpl.setProperty("l2.nha.tribes.mcast.mcastPort",
046: String.valueOf(portnum));
047: logger.info("McastService uses random mcast port: " + portnum);
048: }
049:
050: // public void testTribesTimeoutOnCrash() throws Exception {
051: // PortChooser pc = new PortChooser();
052: // final int p1 = pc.chooseRandomPort();
053: // final int p2 = pc.chooseRandomPort();
054: // final Node[] allNodes = new Node[] { new Node("localhost", p1), new Node("localhost", p2) };
055: //
056: // TribesGroupManager gm1 = new TribesGroupManager();
057: // MyGroupEventListener gel1 = new MyGroupEventListener();
058: // MyListener l1 = new MyListener();
059: // gm1.registerForMessages(TestMessage.class, l1);
060: // gm1.registerForGroupEvents(gel1);
061: // NodeID n1 = gm1.joinStatic(allNodes[0], allNodes);
062: //
063: // TribesGroupManager gm2 = new TribesGroupManager();
064: // MyListener l2 = new MyListener();
065: // MyGroupEventListener gel2 = new MyGroupEventListener();
066: // gm2.registerForMessages(TestMessage.class, l2);
067: // gm2.registerForGroupEvents(gel2);
068: // NodeID n2 = gm2.joinStatic(allNodes[1], allNodes);
069: // assertNotEquals(n1, n2);
070: //
071: // // setup throwable ThreadGroup to catch AssertError from threads.
072: // TCThreadGroup threadGroup = new TCThreadGroup(new ThrowableHandler(logger), "StateManagerTestGroup");
073: // ThreadUtil.reallySleep(1000);
074: //
075: // SenderThread sender = new SenderThread(threadGroup, "Node-0", gm1, Integer.MAX_VALUE, n2);
076: // Thread receiver = new ReceiverThread(threadGroup, "Node-1", l2, Integer.MAX_VALUE);
077: //
078: // System.err.println("*** Starting sending and receiving messages....");
079: // sender.start();
080: // receiver.start();
081: //
082: // ThreadUtil.reallySleep(5000);
083: // System.err.println("*** " + new Date() + " Stopping GM2 ....");
084: // gm2.stop();
085: //
086: // System.err.println("*** " + new Date() + " Waiting for sender to fail ....");
087: // ThreadUtil.reallySleep(60000);
088: //
089: // System.err.println("*** " + new Date() + " Stopping GM1 ....");
090: // gm1.stop();
091: //
092: // System.err.println("*** Test complete ....");
093: // }
094:
095: public void testIfTribesGroupManagerLoads() throws Exception {
096: GroupManager gm = GroupManagerFactory.createGroupManager();
097: assertNotNull(gm);
098: assertEquals(TribesGroupManager.class.getName(), gm.getClass()
099: .getName());
100: }
101:
102: public void testGroupEventsMcast() throws Exception {
103: TribesGroupManager gm1 = new TribesGroupManager();
104: MyGroupEventListener gel1 = new MyGroupEventListener();
105: MyListener l1 = new MyListener();
106: gm1.registerForMessages(TestMessage.class, l1);
107: gm1.registerForGroupEvents(gel1);
108: NodeID n1 = gm1.joinMcast();
109:
110: TribesGroupManager gm2 = new TribesGroupManager();
111: MyGroupEventListener gel2 = new MyGroupEventListener();
112: MyListener l2 = new MyListener();
113: gm2.registerForMessages(TestMessage.class, l2);
114: gm2.registerForGroupEvents(gel2);
115: NodeID n2 = gm2.joinMcast();
116:
117: assertTrue(checkGroupEvent("MCAST", n2, gel1, true));
118: assertTrue(checkGroupEvent("MCAST", n1, gel2, true));
119:
120: gm1.stop();
121: assertTrue(checkGroupEvent("MCAST", n1, gel2, false));
122: gm2.stop();
123: }
124:
125: public void testGroupEventsStatic() throws Exception {
126: PortChooser pc = new PortChooser();
127: final int p1 = pc.chooseRandomPort();
128: final int p2 = pc.chooseRandomPort();
129: final Node[] allNodes = new Node[] { new Node("localhost", p1),
130: new Node("localhost", p2) };
131:
132: TribesGroupManager gm1 = new TribesGroupManager();
133: MyGroupEventListener gel1 = new MyGroupEventListener();
134: MyListener l1 = new MyListener();
135: gm1.registerForMessages(TestMessage.class, l1);
136: gm1.registerForGroupEvents(gel1);
137: NodeID n1 = gm1.joinStatic(allNodes[0], allNodes);
138:
139: TribesGroupManager gm2 = new TribesGroupManager();
140: MyListener l2 = new MyListener();
141: MyGroupEventListener gel2 = new MyGroupEventListener();
142: gm2.registerForMessages(TestMessage.class, l2);
143: gm2.registerForGroupEvents(gel2);
144: NodeID n2 = gm2.joinStatic(allNodes[1], allNodes);
145: assertNotEquals(n1, n2);
146:
147: assertTrue(checkGroupEvent("STATIC", n2, gel1, true));
148: assertTrue(checkGroupEvent("STATIC", n1, gel2, true));
149:
150: gm1.stop();
151: assertTrue(checkGroupEvent("STATIC", n1, gel2, false));
152: gm2.stop();
153: }
154:
155: public void testZapNode() throws Exception {
156: PortChooser pc = new PortChooser();
157: final int p1 = pc.chooseRandomPort();
158: final int p2 = pc.chooseRandomPort();
159: final Node[] allNodes = new Node[] { new Node("localhost", p1),
160: new Node("localhost", p2) };
161:
162: TribesGroupManager gm1 = new TribesGroupManager();
163: MyGroupEventListener g1 = new MyGroupEventListener();
164: gm1.registerForGroupEvents(g1);
165: MyListener l1 = new MyListener();
166: gm1.registerForMessages(TestMessage.class, l1);
167: MyZapNodeRequestProcessor z1 = new MyZapNodeRequestProcessor();
168: gm1.setZapNodeRequestProcessor(z1);
169: NodeID n1 = gm1.joinStatic(allNodes[0], allNodes);
170:
171: TribesGroupManager gm2 = new TribesGroupManager();
172: MyListener l2 = new MyListener();
173: gm2.registerForMessages(TestMessage.class, l2);
174: MyZapNodeRequestProcessor z2 = new MyZapNodeRequestProcessor();
175: gm2.setZapNodeRequestProcessor(z2);
176: NodeID n2 = gm2.joinStatic(allNodes[1], allNodes);
177:
178: checkSendingReceivingMessages(gm1, l1, gm2, l2);
179:
180: System.err.println("ZAPPING NODE : " + n2);
181: gm1.zapNode(g1.getLastNodeJoined(), 01,
182: "test : Zap the other node " + n2 + " from " + n1);
183:
184: Object r1 = z1.outgoing.take();
185: Object r2 = z2.incoming.take();
186: assertEquals(r1, r2);
187:
188: r1 = z1.outgoing.poll(500);
189: assertNull(r1);
190: r2 = z2.incoming.poll(500);
191: assertNull(r2);
192:
193: gm1.stop();
194: gm2.stop();
195: }
196:
197: public void testIfNodeIDsAreReferenceEqual() throws Exception {
198: PortChooser pc = new PortChooser();
199: final int p1 = pc.chooseRandomPort();
200: final int p2 = pc.chooseRandomPort();
201: final Node[] allNodes = new Node[] { new Node("localhost", p1),
202: new Node("localhost", p2) };
203:
204: TribesGroupManager gm1 = new TribesGroupManager();
205: MyGroupEventListener g1 = new MyGroupEventListener();
206: gm1.registerForGroupEvents(g1);
207: MyListener l1 = new MyListener();
208: gm1.registerForMessages(TestMessage.class, l1);
209: MyZapNodeRequestProcessor z1 = new MyZapNodeRequestProcessor();
210: gm1.setZapNodeRequestProcessor(z1);
211: NodeID n1 = gm1.joinStatic(allNodes[0], allNodes);
212:
213: TribesGroupManager gm2 = new TribesGroupManager();
214: MyGroupEventListener g2 = new MyGroupEventListener();
215: gm2.registerForGroupEvents(g2);
216: MyListener l2 = new MyListener();
217: gm2.registerForMessages(TestMessage.class, l2);
218: MyZapNodeRequestProcessor z2 = new MyZapNodeRequestProcessor();
219: gm2.setZapNodeRequestProcessor(z2);
220: NodeID n2 = gm2.joinStatic(allNodes[1], allNodes);
221:
222: ThreadUtil.reallySleep(5000);
223:
224: NodeID n1sn2 = g1.getLastNodeJoined();
225: System.err.println("N1'sN2 = " + n1sn2);
226:
227: NodeID n2sn1 = g2.getLastNodeJoined();
228: System.err.println("N2'sN1 = " + n2sn1);
229:
230: TestMessage m1 = new TestMessage("Message 1");
231: gm1.sendAll(m1);
232:
233: TestMessage m2 = (TestMessage) l2.take();
234: System.err.println(m2);
235: assertTrue(n2sn1 == m2.messageFrom());
236:
237: System.err.println("Trying to ZAP NODE : " + n1);
238: // now trying to zap with different instance which is not reference equal
239: gm2.zapNode(n1, 01,
240: "test : Should be ignored : Zap the other node " + n1
241: + " from " + n2);
242:
243: Object r2 = z2.outgoing.poll(3000);
244: assertNull(r2);
245: Object r1 = z1.incoming.poll(2000);
246: assertNull(r1);
247:
248: gm2.zapNode(n2sn1, 01, "test : Zap the other node " + n1
249: + " from " + n2);
250:
251: r2 = z2.outgoing.take();
252: r1 = z1.incoming.take();
253: assertEquals(r1, r2);
254:
255: gm1.stop();
256: ThreadUtil.reallySleep(3000);
257: assertTrue(n2sn1 == g2.getLastNodeLeft());
258:
259: gm2.stop();
260: }
261:
262: public void testSendingReceivingMessagesMcast() throws Exception {
263: TribesGroupManager gm1 = new TribesGroupManager();
264: MyListener l1 = new MyListener();
265: gm1.registerForMessages(TestMessage.class, l1);
266: NodeID n1 = gm1.joinMcast();
267:
268: TribesGroupManager gm2 = new TribesGroupManager();
269: MyListener l2 = new MyListener();
270: gm2.registerForMessages(TestMessage.class, l2);
271: NodeID n2 = gm2.joinMcast();
272: assertNotEquals(n1, n2);
273: checkSendingReceivingMessages(gm1, l1, gm2, l2);
274: gm1.stop();
275: gm2.stop();
276: }
277:
278: public void testSendingReceivingMessagesStatic() throws Exception {
279: PortChooser pc = new PortChooser();
280: final int p1 = pc.chooseRandomPort();
281: final int p2 = pc.chooseRandomPort();
282: final Node[] allNodes = new Node[] { new Node("localhost", p1),
283: new Node("localhost", p2) };
284:
285: TribesGroupManager gm1 = new TribesGroupManager();
286: MyListener l1 = new MyListener();
287: gm1.registerForMessages(TestMessage.class, l1);
288: NodeID n1 = gm1.joinStatic(allNodes[0], allNodes);
289:
290: TribesGroupManager gm2 = new TribesGroupManager();
291: MyListener l2 = new MyListener();
292: gm2.registerForMessages(TestMessage.class, l2);
293: NodeID n2 = gm2.joinStatic(allNodes[1], allNodes);
294: assertNotEquals(n1, n2);
295: checkSendingReceivingMessages(gm1, l1, gm2, l2);
296: gm1.stop();
297: gm2.stop();
298: }
299:
300: private void checkSendingReceivingMessages(TribesGroupManager gm1,
301: MyListener l1, TribesGroupManager gm2, MyListener l2)
302: throws GroupException {
303: ThreadUtil.reallySleep(5 * 1000);
304:
305: TestMessage m1 = new TestMessage("Hello there");
306: gm1.sendAll(m1);
307:
308: TestMessage m2 = (TestMessage) l2.take();
309: System.err.println(m2);
310:
311: assertEquals(m1, m2);
312:
313: TestMessage m3 = new TestMessage("Hello back");
314: gm2.sendAll(m3);
315:
316: TestMessage m4 = (TestMessage) l1.take();
317: System.err.println(m4);
318:
319: assertEquals(m3, m4);
320: }
321:
322: public void testMessagesOrderingStatic() throws Exception {
323: PortChooser pc = new PortChooser();
324: final int p1 = pc.chooseRandomPort();
325: final int p2 = pc.chooseRandomPort();
326: final Node[] allNodes = new Node[] { new Node("localhost", p1),
327: new Node("localhost", p2) };
328:
329: System.err.println("Testing message Ordering - Static");
330:
331: TribesGroupManager gm1 = new TribesGroupManager();
332: NodeID n1 = gm1.joinStatic(allNodes[0], allNodes);
333: TribesGroupManager gm2 = new TribesGroupManager();
334: NodeID n2 = gm2.joinStatic(allNodes[1], allNodes);
335:
336: assertNotEquals(n1, n2);
337:
338: try {
339: checkMessagesOrdering(gm1, gm2);
340: } catch (Exception e) {
341: System.out.println("***** message order check failed: "
342: + e.getStackTrace());
343: ThreadDump.dumpThreadsMany(3, 500);
344: throw e;
345: }
346:
347: // checkMessagesOrdering(gm1, gm2);
348:
349: gm1.stop();
350: gm2.stop();
351: }
352:
353: public void testMessagesOrderingMcast() throws Exception {
354: System.err.println("Testing message Ordering - Mcast");
355: TribesGroupManager gm1 = new TribesGroupManager();
356: NodeID n1 = gm1.joinMcast();
357: TribesGroupManager gm2 = new TribesGroupManager();
358: NodeID n2 = gm2.joinMcast();
359:
360: assertNotEquals(n1, n2);
361:
362: checkMessagesOrdering(gm1, gm2);
363:
364: gm1.stop();
365: gm2.stop();
366: }
367:
368: private void checkMessagesOrdering(final TribesGroupManager mgr1,
369: final TribesGroupManager mgr2) throws GroupException {
370:
371: final Integer upbound = new Integer(50);
372: final MyListener myl1 = new MyListener();
373: final MyListener myl2 = new MyListener();
374: mgr1.registerForMessages(TestMessage.class, myl1);
375: mgr2.registerForMessages(TestMessage.class, myl2);
376:
377: // setup throwable ThreadGroup to catch AssertError from threads.
378: TCThreadGroup threadGroup = new TCThreadGroup(
379: new ThrowableHandler(logger), "StateManagerTestGroup");
380: ThreadUtil.reallySleep(1000);
381:
382: Thread t1 = new SenderThread(threadGroup, "Node-0", mgr1,
383: upbound);
384: Thread t2 = new SenderThread(threadGroup, "Node-1", mgr2,
385: upbound);
386: Thread vt1 = new ReceiverThread(threadGroup, "Node-0", myl1,
387: upbound);
388: Thread vt2 = new ReceiverThread(threadGroup, "Node-1", myl2,
389: upbound);
390:
391: System.err.println("*** Start sending ordered messages....");
392: t1.start();
393: t2.start();
394: vt1.start();
395: vt2.start();
396:
397: try {
398: t1.join();
399: t2.join();
400: vt1.join();
401: vt2.join();
402: } catch (InterruptedException x) {
403: throw new GroupException("Join interrupted:" + x);
404: }
405: System.err.println("*** Done with messages ordering test");
406:
407: }
408:
409: private boolean checkGroupEvent(String msg, NodeID n1,
410: MyGroupEventListener gel2, boolean checkNodeJoined) {
411: final String event = (checkNodeJoined) ? "NodeJoined"
412: : "NodeLeft";
413: for (int i = 0; i < 10; i++) {
414: NodeID actual = null;
415: if (checkNodeJoined)
416: actual = gel2.getLastNodeJoined();
417: else
418: actual = gel2.getLastNodeLeft();
419: System.err.println("\n### [" + msg + "] attempt # " + i
420: + " -> actual" + event + "=" + actual);
421: if (actual == null) {
422: ThreadUtil.reallySleep(1 * 500);
423: } else {
424: assertTrue(n1.equals(actual)
425: || ((NodeIDImpl) n1).getName().equals(
426: ((NodeIDImpl) actual).getName()));
427: System.err.println("\n### [" + msg + "] it took "
428: + (i * 500) + " millis to get " + event
429: + " event");
430: return true;
431: }
432: }
433: return false;
434: }
435:
436: private static final class SenderThread extends Thread {
437: TribesGroupManager mgr;
438: Integer upbound;
439: Integer index = new Integer(0);
440: NodeID node;
441:
442: public SenderThread(ThreadGroup group, String name,
443: TribesGroupManager mgr, Integer upbound) {
444: this (group, name, mgr, upbound, NodeIDImpl.NULL_ID);
445: }
446:
447: public SenderThread(ThreadGroup group, String name,
448: TribesGroupManager mgr, Integer upbound, NodeID node) {
449: super (group, name);
450: this .mgr = mgr;
451: this .upbound = upbound;
452: this .node = node;
453: }
454:
455: public void run() {
456: while (index <= upbound) {
457: TestMessage msg = new TestMessage(index.toString());
458: if (index % 10 == 0)
459: System.err.println("*** " + getName() + " sends "
460: + index);
461: try {
462: if (node.isNull()) {
463: mgr.sendAll(msg);
464: } else {
465: mgr.sendTo(node, msg);
466: }
467: } catch (Exception x) {
468: System.err.println("Got exception : " + getName()
469: + " " + x.getMessage());
470: x.printStackTrace();
471: throw new RuntimeException(
472: "sendAll GroupException:" + x);
473: }
474: // ThreadUtil.reallySleep(100);
475: ++index;
476: }
477: }
478: }
479:
480: private static final class ReceiverThread extends Thread {
481: MyListener l;
482: Integer upbound;
483: Integer index = new Integer(0);
484:
485: public ReceiverThread(ThreadGroup group, String name,
486: MyListener l, Integer upbound) {
487: super (group, name);
488: this .l = l;
489: this .upbound = upbound;
490: }
491:
492: public void run() {
493: while (index <= upbound) {
494: TestMessage msg = (TestMessage) l.take();
495: if (index % 10 == 0)
496: System.err.println("*** " + getName()
497: + " receives " + msg);
498: assertEquals(new TestMessage(index.toString()), msg);
499: index++;
500: }
501: }
502:
503: }
504:
505: private static final class MyGroupEventListener implements
506: GroupEventsListener {
507:
508: private NodeID lastNodeJoined;
509: private NodeID lastNodeLeft;
510:
511: public void nodeJoined(NodeID nodeID) {
512: System.err.println("\n### nodeJoined -> " + nodeID);
513: lastNodeJoined = nodeID;
514: }
515:
516: public void nodeLeft(NodeID nodeID) {
517: System.err.println("\n### nodeLeft -> " + nodeID);
518: lastNodeLeft = nodeID;
519: }
520:
521: public NodeID getLastNodeJoined() {
522: return lastNodeJoined;
523: }
524:
525: public NodeID getLastNodeLeft() {
526: return lastNodeLeft;
527: }
528:
529: public void reset() {
530: lastNodeJoined = lastNodeLeft = null;
531: }
532: }
533:
534: private static final class MyListener implements
535: GroupMessageListener {
536:
537: NoExceptionLinkedQueue queue = new NoExceptionLinkedQueue();
538:
539: public void messageReceived(NodeID fromNode, GroupMessage msg) {
540: queue.put(msg);
541: }
542:
543: public GroupMessage take() {
544: return (GroupMessage) queue.take();
545: }
546:
547: }
548:
549: private static final class TestMessage extends AbstractGroupMessage {
550:
551: // to make serialization sane
552: public TestMessage() {
553: super (0);
554: }
555:
556: public TestMessage(String message) {
557: super (0);
558: this .msg = message;
559: }
560:
561: String msg;
562:
563: @Override
564: protected void basicReadExternal(int msgType, ObjectInput in)
565: throws IOException {
566: msg = in.readUTF();
567:
568: }
569:
570: @Override
571: protected void basicWriteExternal(int msgType, ObjectOutput out)
572: throws IOException {
573: out.writeUTF(msg);
574:
575: }
576:
577: public int hashCode() {
578: return msg.hashCode();
579: }
580:
581: public boolean equals(Object o) {
582: if (o instanceof TestMessage) {
583: TestMessage other = (TestMessage) o;
584: return this .msg.equals(other.msg);
585: }
586: return false;
587: }
588:
589: public String toString() {
590: return "TestMessage [ " + msg + "]";
591: }
592: }
593:
594: private static final class MyZapNodeRequestProcessor implements
595: ZapNodeRequestProcessor {
596:
597: public NoExceptionLinkedQueue outgoing = new NoExceptionLinkedQueue();
598: public NoExceptionLinkedQueue incoming = new NoExceptionLinkedQueue();
599:
600: public boolean acceptOutgoingZapNodeRequest(NodeID nodeID,
601: int type, String reason) {
602: outgoing.put(reason);
603: return true;
604: }
605:
606: public void incomingZapNodeRequest(NodeID nodeID,
607: int zapNodeType, String reason, long[] weights) {
608: incoming.put(reason);
609: }
610:
611: public long[] getCurrentNodeWeights() {
612: return new long[0];
613: }
614:
615: }
616: }
|