001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.state;
006:
007: import com.tc.async.api.EventContext;
008: import com.tc.async.api.Sink;
009: import com.tc.async.impl.MockSink;
010: import com.tc.l2.context.StateChangedEvent;
011: import com.tc.l2.ha.WeightGeneratorFactory;
012: import com.tc.l2.msg.L2StateMessage;
013: import com.tc.lang.TCThreadGroup;
014: import com.tc.lang.ThrowableHandler;
015: import com.tc.logging.TCLogger;
016: import com.tc.logging.TCLogging;
017: import com.tc.net.groups.AbstractGroupMessage;
018: import com.tc.net.groups.GroupEventsListener;
019: import com.tc.net.groups.GroupManagerFactory;
020: import com.tc.net.groups.GroupMessage;
021: import com.tc.net.groups.GroupMessageListener;
022: import com.tc.net.groups.Node;
023: import com.tc.net.groups.NodeID;
024: import com.tc.net.groups.TribesGroupManager;
025: import com.tc.properties.TCPropertiesImpl;
026: import com.tc.test.TCTestCase;
027: import com.tc.util.PortChooser;
028: import com.tc.util.State;
029: import com.tc.util.concurrent.NoExceptionLinkedQueue;
030: import com.tc.util.concurrent.ThreadUtil;
031:
032: import java.io.IOException;
033: import java.io.ObjectInput;
034: import java.io.ObjectOutput;
035: import java.util.Random;
036:
037: public class StateManagerTest extends TCTestCase {
038:
039: private static final TCLogger logger = TCLogging
040: .getLogger(StateManagerImpl.class);
041: private static short portnum = 0;
042:
/**
 * Creates the test case and publishes the multicast port property via
 * useRandomMcastPort().
 */
public StateManagerTest() {
  // disableAllUntil("2007-05-23");
  this.useRandomMcastPort();
}
047:
048: /*
049: * Choose a random mcast port number to avoid conflict with other LAN machines. Must be called before joinMcast.
050: */
051: public void useRandomMcastPort() {
052: if (portnum == 0) {
053: // generate a random port number
054: Random r = new Random();
055: r.setSeed(System.currentTimeMillis());
056: portnum = (short) (r.nextInt(Short.MAX_VALUE - 1025) + 1024);
057: }
058:
059: TCPropertiesImpl.setProperty("l2.nha.tribes.mcast.mcastPort",
060: String.valueOf(portnum));
061: logger.info("McastService uses random mcast port: " + portnum);
062: }
063:
064: public void testStateManagerTwoServers() throws Exception {
065: // 2 nodes join concurrently
066: TCThreadGroup threadGroup = new TCThreadGroup(
067: new ThrowableHandler(logger), "StateManagerTestGroup");
068: // setup throwable ThreadGroup to catch AssertError from Tribes threads.
069: Thread throwableThread = new Thread(threadGroup,
070: new Runnable() {
071: public void run() {
072: try {
073: nodesConcurrentJoining(2);
074: } catch (Exception e) {
075: throw new RuntimeException(
076: "testStateManagerTwoServers failed!");
077: }
078: }
079: });
080: throwableThread.start();
081: throwableThread.join();
082: }
083:
084: public void testStateManagerThreeServers() throws Exception {
085: // 3 nodes join concurrently
086: TCThreadGroup threadGroup = new TCThreadGroup(
087: new ThrowableHandler(logger), "StateManagerTestGroup");
088: // setup throwable ThreadGroup to catch AssertError from Tribes threads.
089: Thread throwableThread = new Thread(threadGroup,
090: new Runnable() {
091: public void run() {
092: try {
093: nodesConcurrentJoining(3);
094: } catch (Exception e) {
095: throw new RuntimeException(
096: "testStateManagerThreeServers failed!");
097: }
098: }
099: });
100: throwableThread.start();
101: throwableThread.join();
102: }
103:
104: public void testStateManagerSixServers() throws Exception {
105: // 6 nodes join concurrently
106: TCThreadGroup threadGroup = new TCThreadGroup(
107: new ThrowableHandler(logger), "StateManagerTestGroup");
108: // setup throwable ThreadGroup to catch AssertError from Tribes threads.
109: Thread throwableThread = new Thread(threadGroup,
110: new Runnable() {
111: public void run() {
112: try {
113: nodesConcurrentJoining(6);
114: } catch (Exception e) {
115: throw new RuntimeException(
116: "testStateManagerSixServers failed!");
117: }
118: }
119: });
120: throwableThread.start();
121: throwableThread.join();
122: }
123:
124: public void testStateManagerMixJoinAndElect3() throws Exception {
125: // 3 nodes mix join and election
126: TCThreadGroup threadGroup = new TCThreadGroup(
127: new ThrowableHandler(logger), "StateManagerTestGroup");
128: // setup throwable ThreadGroup to catch AssertError from Tribes threads.
129: Thread throwableThread = new Thread(threadGroup,
130: new Runnable() {
131: public void run() {
132: try {
133: nodesMixJoinAndElect(3);
134: } catch (Exception e) {
135: throw new RuntimeException(
136: "testStateManagerMixJoinAndElect3 failed!");
137: }
138: }
139: });
140: throwableThread.start();
141: throwableThread.join();
142: }
143:
144: public void testStateManagerMixJoinAndElect6() throws Exception {
145: // 6 nodes mix join and election
146: TCThreadGroup threadGroup = new TCThreadGroup(
147: new ThrowableHandler(logger), "StateManagerTestGroup");
148: // setup throwable ThreadGroup to catch AssertError from Tribes threads.
149: Thread throwableThread = new Thread(threadGroup,
150: new Runnable() {
151: public void run() {
152: try {
153: nodesMixJoinAndElect(6);
154: } catch (Exception e) {
155: throw new RuntimeException(
156: "testStateManagerMixJoinAndElect6 failed!");
157: }
158: }
159: });
160: throwableThread.start();
161: throwableThread.join();
162: }
163:
164: public void testStateManagerJoinLater3() throws Exception {
165: // first node shall be active and remaining 5 nodes join later
166: TCThreadGroup threadGroup = new TCThreadGroup(
167: new ThrowableHandler(logger), "StateManagerTestGroup");
168: // setup throwable ThreadGroup to catch AssertError from Tribes threads.
169: Thread throwableThread = new Thread(threadGroup,
170: new Runnable() {
171: public void run() {
172: try {
173: nodesJoinLater(3);
174: } catch (Exception e) {
175: throw new RuntimeException(
176: "testStateManagerJoinLater3 failed!");
177: }
178: }
179: });
180: throwableThread.start();
181: throwableThread.join();
182: }
183:
184: public void testStateManagerJoinLater6() throws Exception {
185: // first node shall be active and remaining 5 nodes join later
186: TCThreadGroup threadGroup = new TCThreadGroup(
187: new ThrowableHandler(logger), "StateManagerTestGroup");
188: // setup throwable ThreadGroup to catch AssertError from Tribes threads.
189: Thread throwableThread = new Thread(threadGroup,
190: new Runnable() {
191: public void run() {
192: try {
193: nodesJoinLater(6);
194: } catch (Exception e) {
195: throw new RuntimeException(
196: "testStateManagerJoinLater6 failed!");
197: }
198: }
199: });
200: throwableThread.start();
201: throwableThread.join();
202: }
203:
204: // -----------------------------------------------------------------------
205:
206: private void nodesConcurrentJoining(int nodes) throws Exception {
207: System.out.println("*** Testing " + nodes
208: + " nodes join at same time.");
209:
210: TribesGroupManager[] groupMgr = new TribesGroupManager[nodes];
211: ChangeSink[] sinks = new ChangeSink[nodes];
212: PortChooser pc = new PortChooser();
213: int[] ports = new int[nodes];
214: Node[] allNodes = new Node[nodes];
215: for (int i = 0; i < nodes; ++i) {
216: ports[i] = pc.chooseRandomPort();
217: allNodes[i] = new Node("localhost", ports[i]);
218: }
219:
220: StateManager[] managers = new StateManager[nodes];
221: ElectionThread[] elections = new ElectionThread[nodes];
222: L2StateMessageStage[] msgStage = new L2StateMessageStage[nodes];
223: for (int i = 0; i < nodes; ++i) {
224: managers[i] = createStateManageNode(i, sinks, groupMgr,
225: msgStage);
226: elections[i] = new ElectionThread(managers[i]);
227: }
228:
229: // joining
230: System.out.println("*** Start Joining...");
231: for (int i = 0; i < nodes; ++i) {
232: groupMgr[i].join(allNodes[i], allNodes);
233: }
234:
235: System.out.println("*** Start Election...");
236: // run them concurrently
237: for (int i = 0; i < nodes; ++i) {
238: elections[i].start();
239: }
240: for (int i = 0; i < nodes; ++i) {
241: elections[i].join();
242: }
243:
244: Thread.sleep(1000);
245: // verification
246: int activeCount = 0;
247: for (int i = 0; i < nodes; ++i) {
248: boolean active = managers[i].isActiveCoordinator();
249: if (active)
250: ++activeCount;
251: System.out.println("*** Server[" + i + "] state is "
252: + sinks[i]);
253: }
254: assertEquals("Active coordinator", 1, activeCount);
255:
256: shutdown(groupMgr, msgStage);
257: }
258:
259: private void shutdown(TribesGroupManager[] groupMgr,
260: L2StateMessageStage[] msgStage) {
261: // shut them down
262: shutdown(groupMgr, msgStage, 0, groupMgr.length);
263: }
264:
265: private void shutdown(TribesGroupManager[] groupMgr,
266: L2StateMessageStage[] msgStage, int start, int end) {
267: for (int i = start; i < end; ++i) {
268: try {
269: msgStage[i].requestStop();
270: ThreadUtil.reallySleep(100);
271: groupMgr[i].stop();
272: } catch (Exception ex) {
273: System.out.println("*** Failed to stop Server[" + i
274: + "] " + groupMgr[i] + " " + ex);
275: }
276: }
277: System.out.println("*** shutdown done");
278: }
279:
280: private void nodesMixJoinAndElect(int nodes) throws Exception {
281: System.out.println("*** Testing " + nodes
282: + " nodes mixed join and election at same time.");
283:
284: TribesGroupManager[] groupMgr = new TribesGroupManager[nodes];
285: ChangeSink[] sinks = new ChangeSink[nodes];
286: PortChooser pc = new PortChooser();
287: int[] ports = new int[nodes];
288: Node[] allNodes = new Node[nodes];
289: for (int i = 0; i < nodes; ++i) {
290: ports[i] = pc.chooseRandomPort();
291: allNodes[i] = new Node("localhost", ports[i]);
292: }
293:
294: StateManager[] managers = new StateManager[nodes];
295: ElectionThread[] elections = new ElectionThread[nodes];
296: L2StateMessageStage[] msgStage = new L2StateMessageStage[nodes];
297: for (int i = 0; i < nodes; ++i) {
298: managers[i] = createStateManageNode(i, sinks, groupMgr,
299: msgStage);
300: elections[i] = new ElectionThread(managers[i]);
301: }
302:
303: // Joining and Electing
304: System.out.println("*** Start Joining and Electing...");
305: groupMgr[0].join(allNodes[0], allNodes);
306: for (int i = 0; i < nodes - 1; ++i) {
307: elections[i].start();
308: groupMgr[i + 1].join(allNodes[i + 1], allNodes);
309: }
310: elections[nodes - 1].start();
311:
312: for (int i = 0; i < nodes; ++i) {
313: elections[i].join();
314: }
315:
316: Thread.sleep(1000);
317: // verification
318: int activeCount = 0;
319: for (int i = 0; i < nodes; ++i) {
320: boolean active = managers[i].isActiveCoordinator();
321: if (active)
322: ++activeCount;
323: System.out.println("*** Server[" + i + "] state is "
324: + sinks[i]);
325: }
326: assertEquals("Active coordinator", 1, activeCount);
327:
328: shutdown(groupMgr, msgStage);
329: }
330:
331: private void nodesJoinLater(int nodes) throws Exception {
332: System.out.println("*** Testing " + nodes
333: + " nodes join at later time.");
334:
335: NodeID[] ids = new NodeID[nodes];
336: ChangeSink[] sinks = new ChangeSink[nodes];
337: TribesGroupManager[] groupMgr = new TribesGroupManager[nodes];
338: PortChooser pc = new PortChooser();
339: int[] ports = new int[nodes];
340: Node[] allNodes = new Node[nodes];
341: for (int i = 0; i < nodes; ++i) {
342: ports[i] = pc.chooseRandomPort();
343: allNodes[i] = new Node("localhost", ports[i]);
344: }
345:
346: final StateManager[] managers = new StateManager[nodes];
347: ElectionThread[] elections = new ElectionThread[nodes];
348: L2StateMessageStage[] msgStage = new L2StateMessageStage[nodes];
349: for (int i = 0; i < nodes; ++i) {
350: managers[i] = createStateManageNode(i, sinks, groupMgr,
351: msgStage);
352: elections[i] = new ElectionThread(managers[i]);
353: }
354:
355: // the first node to be the active one
356: System.out
357: .println("*** First node joins to be an active node...");
358: ids[0] = groupMgr[0].join(allNodes[0], allNodes);
359: managers[0].startElection();
360: Thread.sleep(100);
361:
362: // move following join nodes to passive-standby
363: groupMgr[0].registerForGroupEvents(new MyGroupEventListener() {
364: public void nodeJoined(NodeID nodeID) {
365: System.out.println("*** moveNodeToPassiveStandby -> "
366: + nodeID);
367: managers[0].moveNodeToPassiveStandby(nodeID);
368: // managers[0].publishActiveState(nodeID);
369: }
370: });
371:
372: System.out.println("*** Remaining nodes join");
373: for (int i = 1; i < nodes; ++i) {
374: ids[i] = groupMgr[i].join(allNodes[i], allNodes);
375: }
376:
377: Thread.sleep(1000);
378: // verification: first node must be active
379: int activeCount = 0;
380: for (int i = 0; i < nodes; ++i) {
381: boolean active = managers[i].isActiveCoordinator();
382: if (active)
383: ++activeCount;
384: System.out.println("*** Server[" + i + "] state is "
385: + sinks[i]);
386: }
387: assertEquals("Active coordinator", 1, activeCount);
388: assertTrue("Node-0 must be active coordinator", managers[0]
389: .isActiveCoordinator());
390:
391: // check API
392: try {
393: // active is supported not to move itself to passive stand-by
394: managers[0].moveNodeToPassiveStandby(ids[0]);
395: throw new RuntimeException(
396: "moveNodeToPassiveStandy expected to trows an expection");
397: } catch (Exception x) {
398: // expected
399: }
400:
401: System.out.println("*** Stop active and re-elect");
402: // stop active node
403: shutdown(groupMgr, msgStage, 0, 1);
404:
405: ElectionIfNecessaryThread reElectThreads[] = new ElectionIfNecessaryThread[nodes];
406: for (int i = 1; i < nodes; ++i) {
407: reElectThreads[i] = new ElectionIfNecessaryThread(
408: managers[i], ids[0]);
409: }
410: for (int i = 1; i < nodes; ++i) {
411: reElectThreads[i].start();
412: }
413: for (int i = 1; i < nodes; ++i) {
414: reElectThreads[i].join();
415: }
416: Thread.sleep(1000);
417:
418: // verify
419: activeCount = 0;
420: for (int i = 1; i < nodes; ++i) {
421: boolean active = managers[i].isActiveCoordinator();
422: if (active)
423: ++activeCount;
424: System.out.println("*** Server[" + i + "] ("
425: + (active ? "active" : "non-active") + ")state is "
426: + sinks[i]);
427: }
428: assertEquals("Active coordinator", 1, activeCount);
429:
430: // shut them down
431: shutdown(groupMgr, msgStage, 1, nodes);
432: }
433:
434: private StateManager createStateManageNode(int localIndex,
435: ChangeSink[] sinks, TribesGroupManager[] groupMgr,
436: L2StateMessageStage[] messageStage) throws Exception {
437:
438: TribesGroupManager gm = (TribesGroupManager) GroupManagerFactory
439: .createGroupManager();
440: groupMgr[localIndex] = gm;
441: MyGroupEventListener gel = new MyGroupEventListener();
442: MyListener l = new MyListener();
443: gm.registerForMessages(TestMessage.class, l);
444: gm.registerForGroupEvents(gel);
445: sinks[localIndex] = new ChangeSink(localIndex);
446: MyStateManagerConfig config = new MyStateManagerConfig();
447: config.electionTime = 5;
448: StateManager mgr = new StateManagerImpl(logger, gm,
449: sinks[localIndex], config, WeightGeneratorFactory
450: .createDefaultFactory());
451: messageStage[localIndex] = new L2StateMessageStage(mgr);
452: gm.routeMessages(L2StateMessage.class, messageStage[localIndex]
453: .getSink());
454: messageStage[localIndex].start();
455: return (mgr);
456: }
457:
458: private static class L2StateMessageStage extends Thread {
459: private final MockSink sink;
460: private final NoExceptionLinkedQueue processQ = new NoExceptionLinkedQueue();
461: private final StateManager mgr;
462: private volatile boolean stop = false;
463:
464: public L2StateMessageStage(StateManager mgr) {
465: this .mgr = mgr;
466: this .sink = new MockSink() {
467: public void add(EventContext ec) {
468: processQ.put(ec);
469: }
470: };
471: setDaemon(true);
472: setName("L2StateMessageStageThread");
473: }
474:
475: public synchronized void requestStop() {
476: stop = true;
477: }
478:
479: public synchronized boolean isStopped() {
480: return stop;
481: }
482:
483: public Sink getSink() {
484: return sink;
485: }
486:
487: public void run() {
488: while (!isStopped()) {
489: L2StateMessage m = (L2StateMessage) processQ.poll(3000);
490: if (m != null) {
491: mgr.handleClusterStateMessage(m);
492: }
493: }
494: }
495: }
496:
497: private static class ElectionThread extends Thread {
498: private StateManager mgr;
499:
500: public ElectionThread(StateManager mgr) {
501: setMgr(mgr);
502: }
503:
504: public void setMgr(StateManager mgr) {
505: this .mgr = mgr;
506: }
507:
508: public void run() {
509: mgr.startElection();
510: }
511: }
512:
513: private static class MyStateManagerConfig implements
514: StateManagerConfig {
515: public int electionTime;
516:
517: public int getElectionTimeInSecs() {
518: return electionTime;
519: }
520: }
521:
522: private static class ElectionIfNecessaryThread extends Thread {
523: private StateManager mgr;
524: private NodeID disconnectedNode;
525:
526: public ElectionIfNecessaryThread(StateManager mgr,
527: NodeID disconnectedNode) {
528: this .mgr = mgr;
529: this .disconnectedNode = disconnectedNode;
530: }
531:
532: public void run() {
533: mgr.startElectionIfNecessary(disconnectedNode);
534: }
535: }
536:
537: private static class ChangeSink extends MockSink {
538: private int serverIndex;
539: private StateChangedEvent event = null;
540:
541: public ChangeSink(int index) {
542: serverIndex = index;
543: }
544:
545: public void add(EventContext context) {
546: event = (StateChangedEvent) context;
547: System.out.println("*** Server[" + serverIndex + "]: "
548: + event);
549: }
550:
551: public State getState() {
552: if (event == null)
553: return null;
554: return event.getCurrentState();
555: }
556:
557: public String toString() {
558: State st = getState();
559: return ((st != null) ? st.toString() : "<state unknown>");
560: }
561:
562: }
563:
564: private static class MyGroupEventListener implements
565: GroupEventsListener {
566:
567: private NodeID lastNodeJoined;
568: private NodeID lastNodeLeft;
569:
570: public void nodeJoined(NodeID nodeID) {
571: System.err.println("\n### nodeJoined -> " + nodeID);
572: lastNodeJoined = nodeID;
573: }
574:
575: public void nodeLeft(NodeID nodeID) {
576: System.err.println("\n### nodeLeft -> " + nodeID);
577: lastNodeLeft = nodeID;
578: }
579:
580: public NodeID getLastNodeJoined() {
581: return lastNodeJoined;
582: }
583:
584: public NodeID getLastNodeLeft() {
585: return lastNodeLeft;
586: }
587:
588: public void reset() {
589: lastNodeJoined = lastNodeLeft = null;
590: }
591: }
592:
593: private static final class MyListener implements
594: GroupMessageListener {
595:
596: NoExceptionLinkedQueue queue = new NoExceptionLinkedQueue();
597:
598: public void messageReceived(NodeID fromNode, GroupMessage msg) {
599: queue.put(msg);
600: }
601:
602: public GroupMessage take() {
603: return (GroupMessage) queue.take();
604: }
605:
606: }
607:
608: private static final class TestMessage extends AbstractGroupMessage {
609:
610: // to make serialization sane
611: public TestMessage() {
612: super (0);
613: }
614:
615: public TestMessage(String message) {
616: super (0);
617: this .msg = message;
618: }
619:
620: String msg;
621:
622: protected void basicReadExternal(int msgType, ObjectInput in)
623: throws IOException {
624: msg = in.readUTF();
625:
626: }
627:
628: protected void basicWriteExternal(int msgType, ObjectOutput out)
629: throws IOException {
630: out.writeUTF(msg);
631:
632: }
633:
634: public int hashCode() {
635: return msg.hashCode();
636: }
637:
638: public boolean equals(Object o) {
639: if (o instanceof TestMessage) {
640: TestMessage other = (TestMessage) o;
641: return this .msg.equals(other.msg);
642: }
643: return false;
644: }
645:
646: public String toString() {
647: return "TestMessage [ " + msg + "]";
648: }
649: }
650:
651: }
|