001: // $Id: MergeStressTest.java,v 1.4 2005/12/22 14:27:51 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
006: import junit.framework.Test;
007: import junit.framework.TestCase;
008: import junit.framework.TestSuite;
009: import org.jgroups.*;
010: import org.jgroups.util.Util;
011:
012: /**
013: * Creates NUM channels, all trying to join the same channel concurrently. This will lead to singleton groups
014: * and subsequent merging. To enable merging, GMS.handle_concurrent_startup has to be set to false.
015: * @author Bela Ban
016: * @version $Id: MergeStressTest.java,v 1.4 2005/12/22 14:27:51 belaban Exp $
017: */
018: public class MergeStressTest extends TestCase {
019: static CyclicBarrier start_connecting = null;
020: static CyclicBarrier received_all_views = null;
021: static CyclicBarrier start_disconnecting = null;
022: static CyclicBarrier disconnected = null;
023: static final int NUM = 10;
024: static final long TIMEOUT = 50000;
025: static final MyThread[] threads = new MyThread[NUM];
026: static String groupname = "ConcurrentTestDemo";
027:
028: static String props = "UDP(mcast_addr=228.8.8.9;mcast_port=7788;ip_ttl=1;"
029: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
030: + "PING(timeout=3000;num_initial_members=3):"
031: + "MERGE2(min_interval=3000;max_interval=5000):"
032: + "FD_SOCK:"
033: + "VERIFY_SUSPECT(timeout=1500):"
034: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
035: + "UNICAST(timeout=300,600,1200,2400):"
036: + "pbcast.STABLE(desired_avg_gossip=5000):"
037: + "FRAG(frag_size=4096;down_thread=false;up_thread=false):"
038: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
039: + "shun=false;print_local_addr=false;view_ack_collection_timeout=5000;"
040: + "digest_timeout=0;merge_timeout=30000;handle_concurrent_startup=false)";
041:
042: public MergeStressTest(String name) {
043: super (name);
044: }
045:
046: static void log(String msg) {
047: System.out.println("-- [" + Thread.currentThread().getName()
048: + "] " + msg);
049: }
050:
051: public void testConcurrentStartupAndMerging() throws Exception {
052: start_connecting = new CyclicBarrier(NUM + 1);
053: received_all_views = new CyclicBarrier(NUM + 1);
054: start_disconnecting = new CyclicBarrier(NUM + 1);
055: disconnected = new CyclicBarrier(NUM + 1);
056:
057: long start, stop;
058:
059: for (int i = 0; i < threads.length; i++) {
060: threads[i] = new MyThread(i);
061: threads[i].start();
062: }
063:
064: // signal the threads to start connecting to their channels
065: Util.sleep(1000);
066: start_connecting.barrier();
067: start = System.currentTimeMillis();
068:
069: try {
070: received_all_views.barrier();
071: stop = System.currentTimeMillis();
072: System.out.println("-- took " + (stop - start)
073: + " msecs for all " + NUM
074: + " threads to see all views");
075:
076: int num_members;
077: MyThread t;
078: System.out.print("checking that all views have " + NUM
079: + " members: ");
080: for (int i = 0; i < threads.length; i++) {
081: t = threads[i];
082: num_members = t.numMembers();
083: assertEquals(num_members, NUM);
084: }
085: System.out.println("SUCCESSFUL");
086: } catch (Exception ex) {
087: fail(ex.toString());
088: } finally {
089: start_disconnecting.barrier();
090: disconnected.barrier();
091: }
092: }
093:
094: public static class MyThread extends ReceiverAdapter implements
095: Runnable {
096: int index = -1;
097: long total_connect_time = 0, total_disconnect_time = 0;
098: private JChannel ch = null;
099: private Address my_addr = null;
100: private View current_view;
101: private Thread thread;
102: private int num_members = 0;
103:
104: public MyThread(int i) {
105: thread = new Thread(this , "thread #" + i);
106: index = i;
107: }
108:
109: public void start() {
110: thread.start();
111: }
112:
113: public void closeChannel() {
114: if (ch != null) {
115: ch.close();
116: }
117: }
118:
119: public int numMembers() {
120: return ch.getView().size();
121: }
122:
123: public void viewAccepted(View new_view) {
124: String type = "view";
125: if (new_view instanceof MergeView)
126: type = "merge view";
127: if (current_view == null) {
128: current_view = new_view;
129: log(type + " accepted: " + current_view.getVid()
130: + " :: " + current_view.getMembers());
131: } else {
132: if (!current_view.equals(new_view)) {
133: current_view = new_view;
134: log(type + " accepted: " + current_view.getVid()
135: + " :: " + current_view.getMembers());
136: }
137: }
138:
139: num_members = current_view.getMembers().size();
140: if (num_members == NUM) {
141: synchronized (this ) {
142: this .notifyAll();
143: }
144: }
145: }
146:
147: public void run() {
148: View view;
149:
150: try {
151: start_connecting.barrier();
152: ch = new JChannel(props);
153: ch.setReceiver(this );
154: log("connecting to channel");
155: long start = System.currentTimeMillis(), stop;
156: ch.connect(groupname);
157: stop = System.currentTimeMillis();
158: total_connect_time = stop - start;
159: view = ch.getView();
160: my_addr = ch.getLocalAddress();
161: log(my_addr + " connected in " + total_connect_time
162: + " msecs (" + view.getMembers().size()
163: + " members). VID=" + ch.getView());
164:
165: synchronized (this ) {
166: while (num_members < NUM) {
167: try {
168: this .wait();
169: } catch (InterruptedException e) {
170: }
171: }
172: }
173:
174: log("reached " + num_members + " members");
175: received_all_views.barrier();
176:
177: start_disconnecting.barrier();
178: start = System.currentTimeMillis();
179: ch.shutdown();
180: stop = System.currentTimeMillis();
181:
182: log(my_addr + " shut down in " + (stop - start)
183: + " msecs");
184: disconnected.barrier();
185: } catch (Exception e) {
186: e.printStackTrace();
187: }
188: }
189:
190: }
191:
192: public static Test suite() {
193: TestSuite s = new TestSuite(MergeStressTest.class);
194: return s;
195: }
196:
197: public static void main(String[] args) {
198: String[] testCaseName = { MergeStressTest.class.getName() };
199: junit.textui.TestRunner.main(testCaseName);
200: }
201:
202: }
|