001: package org.jgroups.tests;
002:
003: import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
004: import junit.framework.Test;
005: import junit.framework.TestCase;
006: import junit.framework.TestSuite;
007: import org.jgroups.Address;
008: import org.jgroups.JChannel;
009: import org.jgroups.View;
010:
011: /**
012: * Tests concurrent leaves of all members of a channel
013: * @author Bela Ban
014: * @version $Id: DisconnectStressTest.java,v 1.2 2006/05/04 12:28:45 belaban Exp $
015: */
016: public class DisconnectStressTest extends TestCase {
017: static CyclicBarrier all_disconnected = null;
018: static CyclicBarrier start_disconnecting = null;
019: static final int NUM = 30;
020: static final long TIMEOUT = 50000;
021: static final MyThread[] threads = new MyThread[NUM];
022: static String groupname = "ConcurrentTestDemo";
023:
024: static String props = "UDP(mcast_addr=228.8.8.9;mcast_port=7788;ip_ttl=1;"
025: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
026: + "PING(timeout=3000;num_initial_members=3):"
027: + "MERGE2(min_interval=3000;max_interval=5000):"
028: + "FD_SOCK:"
029: + "VERIFY_SUSPECT(timeout=1500):"
030: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
031: + "UNICAST(timeout=300,600,1200,2400):"
032: + "pbcast.STABLE(desired_avg_gossip=5000):"
033: + "FRAG(frag_size=4096;down_thread=false;up_thread=false):"
034: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
035: + "shun=false;print_local_addr=false;view_ack_collection_timeout=5000;"
036: + "digest_timeout=0;merge_timeout=30000;handle_concurrent_startup=true)";
037:
038: public DisconnectStressTest(String name) {
039: super (name);
040: }
041:
042: static void log(String msg) {
043: System.out.println("-- [" + Thread.currentThread().getName()
044: + "] " + msg);
045: }
046:
047: public void testConcurrentStartupAndMerging() throws Exception {
048: all_disconnected = new CyclicBarrier(NUM + 1);
049: start_disconnecting = new CyclicBarrier(NUM + 1);
050:
051: for (int i = 0; i < threads.length; i++) {
052: threads[i] = new MyThread(i);
053: synchronized (threads[i]) {
054: threads[i].start();
055: threads[i].wait(20000);
056: }
057: }
058:
059: log("DISCONNECTING");
060: start_disconnecting.barrier(); // causes all channels to disconnect
061:
062: all_disconnected.barrier(); // notification when all threads have disconnected
063: }
064:
065: public static class MyThread extends Thread {
066: int index = -1;
067: long total_connect_time = 0, total_disconnect_time = 0;
068: private JChannel ch = null;
069: private Address my_addr = null;
070:
071: public MyThread(int i) {
072: super ("thread #" + i);
073: index = i;
074: }
075:
076: public void closeChannel() {
077: if (ch != null) {
078: ch.close();
079: }
080: }
081:
082: public int numMembers() {
083: return ch.getView().size();
084: }
085:
086: public void run() {
087: View view;
088:
089: try {
090: ch = new JChannel(props);
091: log("connecting to channel");
092: long start = System.currentTimeMillis(), stop;
093: ch.connect(groupname);
094: stop = System.currentTimeMillis();
095: synchronized (this ) {
096: this .notify();
097: }
098: total_connect_time = stop - start;
099: view = ch.getView();
100: my_addr = ch.getLocalAddress();
101: log(my_addr + " connected in " + total_connect_time
102: + " msecs (" + view.getMembers().size()
103: + " members). VID=" + ch.getView());
104:
105: start_disconnecting.barrier();
106:
107: start = System.currentTimeMillis();
108: ch.disconnect();
109: stop = System.currentTimeMillis();
110:
111: log(my_addr + " disconnected in " + (stop - start)
112: + " msecs");
113: all_disconnected.barrier();
114: } catch (Exception e) {
115: e.printStackTrace();
116: }
117: }
118:
119: }
120:
121: public static Test suite() {
122: return new TestSuite(DisconnectStressTest.class);
123: }
124:
125: public static void main(String[] args) {
126: String[] testCaseName = { DisconnectStressTest.class.getName() };
127: junit.textui.TestRunner.main(testCaseName);
128: }
129:
130: }
|