001: // $Id: ConnectStressTest.java,v 1.16 2006/05/19 11:23:27 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import EDU.oswego.cs.dl.util.concurrent.BrokenBarrierException;
006: import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
007: import junit.framework.TestCase;
008: import junit.framework.Test;
009: import junit.framework.TestSuite;
010: import org.jgroups.*;
011: import org.jgroups.util.Util;
012:
013: import java.util.Vector;
014:
015: /**
016: * Creates 1 channel, then creates NUM channels, all try to join the same channel concurrently.
017: * @author Bela Ban Nov 20 2003
018: * @version $Id: ConnectStressTest.java,v 1.16 2006/05/19 11:23:27 belaban Exp $
019: */
020: public class ConnectStressTest extends TestCase {
021: static CyclicBarrier start_connecting = null;
022: static CyclicBarrier connected = null;
023: static CyclicBarrier received_all_views = null;
024: static CyclicBarrier start_disconnecting = null;
025: static CyclicBarrier disconnected = null;
026: static final int NUM = 30;
027: static final MyThread[] threads = new MyThread[NUM];
028: static JChannel channel = null;
029: static String groupname = "ConcurrentTestDemo";
030:
031: static String props = "UDP(mcast_addr=228.8.8.9;mcast_port=7788;ip_ttl=1;"
032: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
033: + "PING(timeout=3000;num_initial_members=10):"
034: + "MERGE2(min_interval=3000;max_interval=5000):"
035: + "FD_SOCK:"
036: + "VERIFY_SUSPECT(timeout=1500):"
037: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
038: + "UNICAST(timeout=300,600,1200,2400):"
039: + "pbcast.STABLE(desired_avg_gossip=5000):"
040: + "FRAG(frag_size=4096;down_thread=false;up_thread=false):"
041: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
042: + "shun=false;print_local_addr=false;view_ack_collection_timeout=5000;"
043: + "digest_timeout=0;merge_timeout=30000)";
044:
045: public ConnectStressTest(String name) {
046: super (name);
047:
048: }
049:
050: static void log(String msg) {
051: System.out.println("-- [" + Thread.currentThread().getName()
052: + "] " + msg);
053: }
054:
055: public void testConcurrentJoins() throws Exception {
056: start_connecting = new CyclicBarrier(NUM + 1);
057: connected = new CyclicBarrier(NUM + 1);
058: received_all_views = new CyclicBarrier(NUM + 1);
059: start_disconnecting = new CyclicBarrier(NUM + 1);
060: disconnected = new CyclicBarrier(NUM + 1);
061:
062: long start, stop;
063:
064: // create main channel - will be coordinator for JOIN requests
065: channel = new JChannel(props);
066: channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
067: start = System.currentTimeMillis();
068: channel.connect(groupname);
069: stop = System.currentTimeMillis();
070: log(channel.getLocalAddress() + " connected in "
071: + (stop - start) + " msecs ("
072: + channel.getView().getMembers().size()
073: + " members). VID=" + channel.getView().getVid());
074: assertEquals(1, channel.getView().getMembers().size());
075:
076: for (int i = 0; i < threads.length; i++) {
077: threads[i] = new MyThread(i);
078: threads[i].start();
079: }
080:
081: // signal the threads to start connecting to their channels
082: start_connecting.barrier();
083: start = System.currentTimeMillis();
084:
085: try {
086: connected.barrier();
087: stop = System.currentTimeMillis();
088: System.out.println("-- took " + (stop - start)
089: + " msecs for all " + NUM + " threads to connect");
090:
091: received_all_views.barrier();
092: stop = System.currentTimeMillis();
093: System.out.println("-- took " + (stop - start)
094: + " msecs for all " + NUM
095: + " threads to see all views");
096:
097: int num_members = -1;
098: for (int i = 0; i < 10; i++) {
099: View v = channel.getView();
100: num_members = v.getMembers().size();
101: System.out.println("*--* number of members connected: "
102: + num_members + ", (expected: " + (NUM + 1)
103: + "), v=" + v);
104: if (num_members == NUM + 1)
105: break;
106: Util.sleep(500);
107: }
108: assertEquals((NUM + 1), num_members);
109: } catch (Exception ex) {
110: fail(ex.toString());
111: }
112: }
113:
114: public void testConcurrentLeaves() throws Exception {
115: start_disconnecting.barrier();
116: long start, stop;
117: start = System.currentTimeMillis();
118:
119: disconnected.barrier();
120: stop = System.currentTimeMillis();
121: System.out.println("-- took " + (stop - start) + " msecs for "
122: + NUM + " threads to disconnect");
123:
124: int num_members = 0;
125: for (int i = 0; i < 10; i++) {
126: View v = channel.getView();
127: Vector mbrs = v != null ? v.getMembers() : null;
128: if (mbrs != null) {
129: num_members = mbrs.size();
130: System.out.println("*--* number of members connected: "
131: + num_members + ", (expected: 1), view=" + v);
132: if (num_members <= 1)
133: break;
134: }
135: Util.sleep(3000);
136: }
137: assertEquals(1, num_members);
138: log("closing all channels");
139: for (int i = 0; i < threads.length; i++) {
140: MyThread t = threads[i];
141: t.closeChannel();
142: }
143: channel.close();
144: }
145:
146: public static class MyThread extends Thread {
147: int index = -1;
148: long total_connect_time = 0, total_disconnect_time = 0;
149: private JChannel ch = null;
150: private Address my_addr = null;
151:
152: public MyThread(int i) {
153: super ("thread #" + i);
154: index = i;
155: }
156:
157: public void closeChannel() {
158: if (ch != null) {
159: ch.close();
160: }
161: }
162:
163: public void run() {
164: View view;
165:
166: try {
167: ch = new JChannel(props);
168:
169: start_connecting.barrier();
170:
171: long start = System.currentTimeMillis(), stop;
172: ch.connect(groupname);
173: stop = System.currentTimeMillis();
174: total_connect_time = stop - start;
175: view = ch.getView();
176: my_addr = ch.getLocalAddress();
177: log(my_addr + " connected in " + total_connect_time
178: + " msecs (" + view.getMembers().size()
179: + " members). VID=" + view.getVid());
180:
181: connected.barrier();
182:
183: int num_members = 0;
184: while (true) {
185: View v = ch.getView();
186: Vector mbrs = v != null ? v.getMembers() : null;
187: if (mbrs == null) {
188: System.err.println("mbrs is null, v=" + v);
189: } else {
190: num_members = mbrs.size();
191: log("num_members=" + num_members);
192: if (num_members == NUM + 1) // all threads (NUM) plus the first channel (1)
193: break;
194: }
195: Util.sleep(2000);
196: }
197: log("reached " + num_members + " members");
198: received_all_views.barrier();
199:
200: start_disconnecting.barrier();
201: start = System.currentTimeMillis();
202: ch.disconnect();
203: stop = System.currentTimeMillis();
204:
205: log(my_addr + " disconnected in " + (stop - start)
206: + " msecs");
207: disconnected.barrier();
208: } catch (BrokenBarrierException e) {
209: e.printStackTrace();
210: } catch (ChannelException e) {
211: e.printStackTrace();
212: } catch (InterruptedException e) {
213: e.printStackTrace();
214: }
215: }
216:
217: }
218:
219: public static Test suite() {
220: TestSuite s = new TestSuite();
221: // we're adding the tests manually, because they need to be run in *this exact order*
222: s.addTest(new ConnectStressTest("testConcurrentJoins"));
223: s.addTest(new ConnectStressTest("testConcurrentLeaves"));
224: return s;
225: }
226:
227: public static void main(String[] args) {
228: String[] testCaseName = { ConnectStressTest.class.getName() };
229: junit.textui.TestRunner.main(testCaseName);
230: }
231:
232: }
|