001: // $Id: SpeedTest.java,v 1.18 2005/05/30 16:15:12 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.Channel;
006: import org.jgroups.JChannel;
007: import org.jgroups.Message;
008: import org.jgroups.conf.ConfiguratorFactory;
009: import org.jgroups.conf.ProtocolStackConfigurator;
010: import org.jgroups.debug.Debugger;
011: import org.jgroups.util.ExposedByteArrayOutputStream;
012: import org.jgroups.util.Util;
013:
014: import java.io.ByteArrayInputStream;
015: import java.io.DataInputStream;
016: import java.io.DataOutputStream;
017: import java.io.IOException;
018: import java.net.DatagramPacket;
019: import java.net.DatagramSocket;
020: import java.net.InetAddress;
021: import java.net.MulticastSocket;
022:
023: /**
024: * Test time taken for multicasting n local messages (messages sent to self). Uses simple MulticastSocket.
025: * Note that packets might get dropped if Util.sleep(1) is commented out (on certain systems this has
026: * to be increased even further). If running with -jg option and Util.sleep() is commented out, there will
027: * probably be packet loss, which will be repaired (by means of retransmission) by JGroups.
028: * @author Bela Ban
029: * @version $Id: SpeedTest.java,v 1.18 2005/05/30 16:15:12 belaban Exp $
030: */
031: public class SpeedTest {
032: static long start, stop;
033: private static final String LOOPBACK = "LOOPBACK(down_thread=false;up_thread=false)";
034:
035: public static void main(String[] args) {
036: DatagramSocket sock = null;
037: Receiver receiver;
038: int num_msgs = 1000, num_sent = 0, group_port = 7500, num_yields = 0;
039: DatagramPacket packet;
040: InetAddress group_addr = null;
041: int[][] matrix;
042: boolean jg = false; // use JGroups channel instead of UDP MulticastSocket
043: JChannel channel = null;
044: String group_name = "SpeedTest-Group";
045: Message send_msg;
046: boolean debug = false, cummulative = false, busy_sleep = false, yield = false, loopback = false;
047: Debugger debugger = null;
048: long sleep_time = 1; // sleep in msecs between msg sends
049: ExposedByteArrayOutputStream output = new ExposedByteArrayOutputStream(
050: 64);
051: String props;
052:
053: props = "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;"
054: + "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;"
055: + "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):"
056: + "PING(timeout=2000;num_initial_members=3):"
057: + "MERGE2(min_interval=5000;max_interval=10000):"
058: + "FD_SOCK:"
059: + "VERIFY_SUSPECT(timeout=1500):"
060: + "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):"
061: + "UNICAST(timeout=1200):"
062: + "pbcast.STABLE(desired_avg_gossip=10000):"
063: + "FRAG(frag_size=8192;down_thread=false;up_thread=false):"
064: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
065: + "shun=false;print_local_addr=true):"
066: + "pbcast.STATE_TRANSFER";
067: // "PERF(details=true)";
068:
069: for (int i = 0; i < args.length; i++) {
070: if ("-help".equals(args[i])) {
071: help();
072: return;
073: }
074: if ("-jg".equals(args[i])) {
075: jg = true;
076: continue;
077: }
078: if ("-loopback".equals(args[i])) {
079: loopback = true;
080: continue;
081: }
082: if ("-props".equals(args[i])) {
083: props = args[++i];
084: continue;
085: }
086: if ("-debug".equals(args[i])) {
087: debug = true;
088: continue;
089: }
090: if ("-cummulative".equals(args[i])) {
091: cummulative = true;
092: continue;
093: }
094: if ("-busy_sleep".equals(args[i])) {
095: busy_sleep = true;
096: continue;
097: }
098: if ("-yield".equals(args[i])) {
099: yield = true;
100: num_yields++;
101: continue;
102: }
103: if ("-sleep".equals(args[i])) {
104: sleep_time = Long.parseLong(args[++i]);
105: continue;
106: }
107: if ("-num_msgs".equals(args[i])) {
108: num_msgs = Integer.parseInt(args[++i]);
109: continue;
110: }
111: help();
112: return;
113: }
114:
115: System.out.println("jg = " + jg + "\nloopback = "
116: + loopback + "\ndebug = " + debug + "\nsleep = "
117: + sleep_time + "\nbusy_sleep=" + busy_sleep
118: + "\nyield=" + yield + "\nnum_yields=" + num_yields
119: + "\nnum_msgs = " + num_msgs + '\n');
120:
121: try {
122: matrix = new int[num_msgs][2];
123: for (int i = 0; i < num_msgs; i++) {
124: for (int j = 0; j < matrix[i].length; j++)
125: matrix[i][j] = 0;
126: }
127:
128: if (jg) {
129: if (loopback) {
130: ProtocolStackConfigurator conf = ConfiguratorFactory
131: .getStackConfigurator(props);
132: String tmp = conf.getProtocolStackString();
133: int index = tmp.indexOf(':');
134: props = LOOPBACK + tmp.substring(index);
135: }
136: channel = new JChannel(props);
137: // System.out.println("props:\n" + channel.getProperties());
138: channel.connect(group_name);
139: if (debug) {
140: debugger = new Debugger(channel, cummulative);
141: debugger.start();
142: }
143: } else {
144: group_addr = InetAddress.getByName("224.0.0.36");
145: sock = new DatagramSocket();
146: }
147:
148: if (debug) {
149: System.out.println("Press key to start");
150: System.in.read();
151: }
152: receiver = new Receiver(group_addr, group_port, channel,
153: matrix, jg);
154: receiver.start();
155:
156: byte[] buf;
157: DataOutputStream out;
158:
159: start = System.currentTimeMillis();
160: for (int i = 0; i < num_msgs; i++) {
161: // buf=Util.objectToByteBuffer(new Integer(i));
162: output.reset();
163: out = new DataOutputStream(output);
164: out.writeInt(i);
165: out.flush();
166: buf = output.getRawBuffer();
167: out.close();
168:
169: if (jg) {
170: send_msg = new Message(null, null, buf, 0,
171: buf.length);
172: channel.send(send_msg);
173: } else {
174: packet = new DatagramPacket(buf, buf.length,
175: group_addr, group_port);
176: sock.send(packet);
177: }
178: num_sent++;
179: if (num_sent % 1000 == 0)
180: System.out.println("-- sent " + num_sent);
181:
182: matrix[i][0] = 1;
183: if (yield) {
184: for (int k = 0; k < num_yields; k++) {
185: Thread.yield();
186: }
187: } else {
188: if (sleep_time > 0) {
189: sleep(sleep_time, busy_sleep);
190: }
191: }
192: }
193: while (true) {
194: System.in.read();
195: printMatrix(matrix);
196: }
197: } catch (Exception ex) {
198: ex.printStackTrace();
199: System.exit(-1);
200: }
201: }
202:
203: /**
204: * On most UNIX systems, the minimum sleep time is 10-20ms. Even if we specify sleep(1), the thread will
205: * sleep for at least 10-20ms. On Windows, sleep() seems to be implemented as a busy sleep, that is the
206: * thread never relinquishes control and therefore the sleep(x) is exactly x ms long.
207: */
208: static void sleep(long msecs, boolean busy_sleep) {
209: if (!busy_sleep) {
210: Util.sleep(msecs);
211: return;
212: }
213:
214: long start = System.currentTimeMillis();
215: long stop = start + msecs;
216:
217: while (stop > start) {
218: start = System.currentTimeMillis();
219: }
220: }
221:
222: static void printMatrix(int[][] m) {
223: int tmp = 0;
224: System.out.print("not sent: ");
225: for (int i = 0; i < m.length; i++) {
226: if (m[i][0] == 0) {
227: System.out.print(i + " ");
228: tmp++;
229: }
230: }
231: System.out.println("\ntotal not sent: " + tmp);
232:
233: tmp = 0;
234: System.out.print("not received: ");
235: for (int i = 0; i < m.length; i++) {
236: if (m[i][1] == 0) {
237: System.out.print(i + " ");
238: tmp++;
239: }
240: }
241: System.out.println("\ntotal not received: " + tmp);
242: System.out.println("Press CTRL-C to kill this test");
243: }
244:
245: static void help() {
246: System.out
247: .println("SpeedTest [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] "
248: + "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>] [-debug] [-cummulative]");
249: System.out
250: .println("Options -props -debug and -cummulative are only valid if -jg is used");
251: }
252:
253: static class Receiver implements Runnable {
254: Thread t = null;
255: byte[] buf = new byte[1024];
256: MulticastSocket sock;
257: Channel channel;
258: int num_msgs = 1000;
259: int[][] matrix = null;
260: boolean jg = false;
261:
262: Receiver(InetAddress group_addr, int group_port,
263: Channel channel, int[][] matrix, boolean jg)
264: throws IOException {
265: this .channel = channel;
266: this .matrix = matrix;
267: this .jg = jg;
268: num_msgs = matrix.length;
269: if (group_addr != null) {
270: sock = new MulticastSocket(group_port);
271: sock.joinGroup(group_addr);
272: }
273: }
274:
275: public void start() {
276: if (t == null) {
277: t = new Thread(this , "receiver thread");
278: t.start();
279: }
280: }
281:
282: public void run() {
283: int num_received = 0;
284: int number;
285: DatagramPacket packet;
286: Object obj;
287: long total_time;
288: double msgs_per_sec = 0;
289: DataInputStream in;
290:
291: packet = new DatagramPacket(buf, buf.length);
292: while (num_received <= num_msgs) {
293: try {
294: if (jg) {
295: obj = channel.receive(0);
296: if (obj instanceof Message) {
297: // msg=(Message)obj;
298: } else {
299: System.out.println("received non-msg: "
300: + obj.getClass());
301: continue;
302: }
303: } else {
304: sock.receive(packet);
305: }
306:
307: // number=((Integer)Util.objectFromByteBuffer(msg_data)).intValue();
308: in = new DataInputStream(new ByteArrayInputStream(
309: buf));
310: number = in.readInt();
311: matrix[number][1] = 1;
312: // System.out.println("-- received " + number);
313: num_received++;
314: if (num_received % 1000 == 0)
315: System.out.println("received " + num_received
316: + " packets");
317: if (num_received >= num_msgs)
318: break;
319: } catch (Exception ex) {
320: System.err.println(ex);
321: }
322: }
323: stop = System.currentTimeMillis();
324: total_time = stop - start;
325: msgs_per_sec = (num_received / (total_time / 1000.0));
326: System.out.println("\n** Sending and receiving "
327: + num_received + " took " + total_time + " msecs ("
328: + msgs_per_sec + " msgs/sec) **");
329: System.exit(1);
330: }
331: }
332:
333: }
|