001: // $Id: RpcDispatcherStressTest.java,v 1.8 2005/05/30 16:15:12 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.GroupRequest;
007: import org.jgroups.blocks.RpcDispatcher;
008: import org.jgroups.util.RspList;
009: import org.jgroups.util.Util;
010:
011: /**
012: * Example for RpcDispatcher (see also MessageDispatcher). Multiple threads will invoke print() on
013: * all members and wait indefinitely for all responses (excluding of course crashed members). Run this
014: * on 2 nodes for an extended period of time to see whether GroupRequest.doExecute() hangs.
015: * @author Bela Ban
016: */
017: public class RpcDispatcherStressTest implements MembershipListener {
018: Channel channel;
019: RpcDispatcher disp;
020: RspList rsp_list;
021: Publisher[] threads = null;
022: int[] results;
023:
024: public int print(int number) throws Exception {
025: return number * 2;
026: }
027:
028: public void start(String props, int num_threads, long interval,
029: boolean discard_local) throws Exception {
030: channel = new JChannel(props);
031: if (discard_local)
032: channel.setOpt(Channel.LOCAL, Boolean.FALSE);
033: disp = new RpcDispatcher(channel, null, this , this );
034: channel.connect("RpcDispatcherStressTestGroup");
035:
036: threads = new Publisher[num_threads];
037: results = new int[num_threads];
038: for (int i = 0; i < threads.length; i++) {
039: threads[i] = new Publisher(i, interval);
040: results[i] = 0;
041: }
042:
043: System.out.println("-- Created " + threads.length
044: + " threads. Press enter to start them "
045: + "('-' for sent message, '+' for received message)");
046: System.out.println("-- Press enter to stop the threads");
047:
048: System.out.flush();
049: System.in.read();
050: System.in.skip(System.in.available());
051:
052: for (int i = 0; i < threads.length; i++)
053: threads[i].start();
054:
055: System.out.flush();
056: System.in.read();
057: System.in.skip(System.in.available());
058:
059: for (int i = 0; i < threads.length; i++) {
060: threads[i].stopThread();
061: threads[i].join(2000);
062: }
063:
064: System.out.println("\n");
065: for (int i = 0; i < threads.length; i++) {
066: System.out
067: .println("-- thread #" + i
068: + ": called remote method " + results[i]
069: + " times");
070: }
071:
072: System.out.println("Closing channel");
073: channel.close();
074: System.out.println("Closing channel: -- done");
075:
076: System.out.println("Stopping dispatcher");
077: disp.stop();
078: System.out.println("Stopping dispatcher: -- done");
079: }
080:
081: /* --------------------------------- MembershipListener interface ---------------------------------- */
082:
083: public void viewAccepted(View new_view) {
084: System.out.println("-- new view: " + new_view);
085: }
086:
087: public void suspect(Address suspected_mbr) {
088: System.out.println("-- suspected " + suspected_mbr);
089: }
090:
091: public void block() {
092: ;
093: }
094:
095: /* ------------------------------ End of MembershipListener interface -------------------------------- */
096:
097: class Publisher extends Thread {
098: int rank = 0;
099: boolean running = true;
100: int num_calls = 0;
101: long interval = 1000;
102:
103: Publisher(int rank, long interval) {
104: super ();
105: setDaemon(true);
106: this .rank = rank;
107: this .interval = interval;
108: }
109:
110: public void stopThread() {
111: running = false;
112: }
113:
114: public void run() {
115: while (running) {
116: System.out.print(rank + "- ");
117: disp.callRemoteMethods(null, "print",
118: new Object[] { new Integer(num_calls) },
119: new Class[] { int.class },
120: GroupRequest.GET_ALL, 0);
121: num_calls++;
122: System.out.print(rank + "+ ");
123: Util.sleep(interval);
124: }
125: results[rank] = num_calls;
126: }
127: }
128:
129: public static void main(String[] args) {
130: String props;
131: int num_threads = 1;
132: long interval = 1000;
133: boolean discard_local = false;
134:
135: props = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;"
136: + "ucast_recv_buf_size=16000;ucast_send_buf_size=16000;"
137: + "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"
138: + "PING(timeout=2000;num_initial_members=3):"
139: + "MERGE2(min_interval=5000;max_interval=10000):"
140: + "FD_SOCK:"
141: + "VERIFY_SUSPECT(timeout=1500):"
142: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=1000,1500,2000,3000;max_xmit_size=8192):"
143: + "UNICAST(timeout=1000,1500,2000,3000):"
144: + "pbcast.STABLE(desired_avg_gossip=10000):"
145: + "FRAG(frag_size=8192;down_thread=false;up_thread=false):"
146: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true):"
147: + "pbcast.STATE_TRANSFER";
148:
149: try {
150: for (int i = 0; i < args.length; i++) {
151: if ("-num_threads".equals(args[i])) {
152: num_threads = Integer.parseInt(args[++i]);
153: continue;
154: }
155: if ("-interval".equals(args[i])) {
156: interval = Long.parseLong(args[++i]);
157: continue;
158: }
159: if ("-props".equals(args[i])) {
160: props = args[++i];
161: continue;
162: }
163: if ("-discard_local".equals(args[i])) {
164: discard_local = true;
165: continue;
166: }
167: help();
168: return;
169: }
170:
171: new RpcDispatcherStressTest().start(props, num_threads,
172: interval, discard_local);
173: } catch (Exception e) {
174: System.err.println(e);
175: }
176: }
177:
178: static void help() {
179: System.out
180: .println("RpcDispatcherStressTest [-help] [-interval <msecs>] "
181: + "[-num_threads <number>] [-props <stack properties>] [-discard_local]");
182: }
183: }
|