001: // $Id: PerfTest.java,v 1.9 2006/04/23 12:52:54 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.PullPushAdapter;
007: import org.jgroups.util.Util;
008:
009: import java.io.IOException;
010: import java.io.ObjectInput;
011: import java.io.ObjectOutput;
012: import java.util.HashMap;
013: import java.util.Iterator;
014: import java.util.Map;
015: import java.util.Vector;
016:
017: /**
018: * Test which multicasts n messages to all members. Measures the time until all members have received
019: * all messages from all senders. Start a number of members (e.g. 4). Wait until all of them are up and
020: * have joined the group. Then press 's' for all senders to start multicasting messages. When you see all
021: * *--* DONE messages for all senders, press 'a' to see the total stats.
022: * @author Bela Ban
023: */
024: public class PerfTest implements MessageListener, MembershipListener {
025:
026: /** HashMap<Address, Entry>. Stores received multicasts. Keyed by sender */
027: HashMap data = new HashMap();
028:
029: /** Keeps track of membership */
030: Vector mbrs = new Vector();
031:
032: /** Channel properties */
033: String props = null;
034:
035: /** Sits on top of the channel */
036: PullPushAdapter adapter = null;
037:
038: /** My channel for sending and receiving messages */
039: JChannel ch = null;
040:
041: /** Am I a sender as well ? */
042: boolean sender = true;
043:
044: /** Sleep time between bursts in milliseconds. 0 means no sleep */
045: long sleep_time = 10;
046:
047: /** Use busy sleeping ? (see #Util.sleep(long,boolean) for details) */
048: boolean busy_sleep = false;
049:
050: /** Number of bursts. Total number of messages is <tt>num_bursts * num_msgs_per_burst</tt> */
051: int num_bursts = 100;
052:
053: /** Number of messages per burst. After a burst we sleep for <tt>sleep_time</tt> msecs */
054: int num_msgs_per_burst = 10;
055:
056: /** Size of a message in bytes */
057: int msg_size = 10000;
058:
059: /** The buffer to be sent (will be <tt>msg_size</tt> bytes) */
060: byte[] buf = null;
061:
062: /** Number of messages sent by us */
063: long sent_msgs = 0;
064:
065: final static String HDRNAME = "PerfHeaderName";
066:
/**
 * Stores the test settings and allocates the payload buffer. No channel is
 * created here; that happens in start().
 *
 * @param props              properties passed to the JChannel constructor
 * @param num_bursts         number of bursts to send
 * @param num_msgs_per_burst number of messages in each burst
 * @param msg_size           payload size in bytes (also the length of the send buffer)
 * @param sleep_time         pause after each burst, in milliseconds
 * @param sender             whether this member sends data messages
 */
public PerfTest(String props, int num_bursts,
        int num_msgs_per_burst, int msg_size, long sleep_time,
        boolean sender) {
    this.props = props;
    this.sender = sender;
    this.sleep_time = sleep_time;
    this.num_bursts = num_bursts;
    this.num_msgs_per_burst = num_msgs_per_burst;
    this.msg_size = msg_size;
    buf = new byte[msg_size];
}
078:
079: public void start() throws Exception {
080: try {
081: ch = new JChannel(props);
082: ch.connect("PerfTest-Group");
083: adapter = new PullPushAdapter(ch, this , this );
084: mainLoop();
085: } finally {
086: if (ch != null)
087: ch.close();
088: }
089: }
090:
091: void mainLoop() throws Exception {
092: boolean looping = true;
093: int choice;
094: while (looping) {
095: choice = choice();
096: switch (choice) {
097: case 'q':
098: case 'x':
099: looping = false;
100: break;
101: case 's':
102: MyHeader hdr = new MyHeader(MyHeader.START, num_bursts
103: * num_msgs_per_burst);
104: Message start_msg = new Message(null);
105: start_msg.putHeader(HDRNAME, hdr);
106: adapter.send(start_msg);
107: break;
108: case 'c':
109: Message clear_msg = new Message();
110: clear_msg.putHeader(HDRNAME, new MyHeader(
111: MyHeader.CLEAR, 0));
112: adapter.send(clear_msg);
113: break;
114: case 't':
115: printStats();
116: break;
117: case 'p':
118: printParams();
119: break;
120: case 'v':
121: System.out.println("-- view: " + ch.getView());
122: break;
123: case 'a':
124: printStatsForAllSenders();
125: break;
126: }
127: }
128: }
129:
130: private void printStatsForAllSenders() {
131: long start_time = 0, stop_time = 0, total_time;
132: Entry entry;
133: int num_msgs = 0, num_senders = 0;
134:
135: for (Iterator it = data.values().iterator(); it.hasNext();) {
136: entry = (Entry) it.next();
137: if (entry.num_received > 0) {
138: num_msgs += entry.num_received;
139: num_senders++;
140:
141: // get the earliest start time
142: if (start_time == 0)
143: start_time = entry.start;
144: else {
145: start_time = Math.min(start_time, entry.start);
146: }
147:
148: // get the latest stop time
149: if (stop_time == 0) {
150: stop_time = entry.stop;
151: } else {
152: stop_time = Math.max(stop_time, entry.stop);
153: }
154: }
155: }
156:
157: total_time = stop_time - start_time;
158:
159: StringBuffer sb = new StringBuffer();
160: sb.append("total number of messages sent by me: ").append(
161: sent_msgs).append('\n');
162: sb.append("total number of messages received: ").append(
163: num_msgs).append('\n');
164: sb.append("total number of senders: ").append(num_senders)
165: .append('\n');
166: sb.append("total time: ").append(total_time).append(" ms\n");
167: sb.append("msgs/sec: ").append(
168: (double) num_msgs / (total_time / 1000.0)).append('\n');
169: sb.append("throughput (kb/sec): ").append(
170: (num_msgs * msg_size / 1000.0) / (total_time / 1000.0))
171: .append('\n');
172: System.out.println(sb.toString());
173: }
174:
175: private void printParams() {
176: System.out.println("num_bursts: " + num_bursts + '\n'
177: + "num_msgs_per_burst: " + num_msgs_per_burst + '\n'
178: + "msg_size: " + msg_size + '\n' + "sleep_time: "
179: + sleep_time + '\n' + "sender: " + sender);
180: }
181:
182: private void printStats() {
183: for (Iterator it = data.entrySet().iterator(); it.hasNext();) {
184: Map.Entry entry = (Map.Entry) it.next();
185: System.out.println("stats for " + entry.getKey() + "");
186: System.out
187: .println(((Entry) entry.getValue()).printStats() + '\n');
188: }
189: }
190:
191: void sendMessages() {
192: MyHeader hdr;
193: Message msg;
194: int seqno = 0;
195: long start, stop;
196:
197: if (sender == false) {
198: System.out
199: .println("-- I'm not a sender; will not send messages");
200: return;
201: } else {
202: System.out.println("-- sending " + num_bursts
203: * num_msgs_per_burst + " msgs");
204: }
205:
206: sent_msgs = 0;
207:
208: try {
209: start = System.currentTimeMillis();
210: for (int i = 0; i < num_bursts; i++) {
211: for (int j = 0; j < num_msgs_per_burst; j++) {
212: hdr = new MyHeader(MyHeader.DATA, seqno++);
213: msg = new Message(null, null, buf);
214: msg.putHeader(HDRNAME, hdr);
215: adapter.send(msg);
216: sent_msgs++;
217: if (sent_msgs % 100 == 0)
218: System.out.println("++ sent " + sent_msgs);
219: }
220: Util.sleep(sleep_time);
221: }
222: stop = System.currentTimeMillis();
223: System.out.println("-- sent " + num_bursts
224: * num_msgs_per_burst + " msgs (in "
225: + (stop - start) + " ms)");
226: // System.out.flush();
227: // Util.sleep(1000);
228: // System.exit(1);
229: } catch (Throwable t) {
230: t.printStackTrace();
231: }
232: }
233:
234: int choice() throws Exception {
235: System.out
236: .println("s=send, c=clear, t=print stats, p=print parameters v=view, "
237: + "a=times for all messages, q=quit\nChoice: ");
238: System.out.flush();
239: System.in.skip(System.in.available());
240: int c = System.in.read();
241: System.out.flush();
242: return c;
243: }
244:
245: public void receive(Message msg) {
246: Address sender = msg.getSrc();
247: MyHeader hdr = (MyHeader) msg.removeHeader(HDRNAME);
248: if (hdr == null) {
249: System.err.println("-- error: header was null");
250: return;
251: }
252: switch (hdr.type) {
253: case MyHeader.START:
254: updateTimestamp();
255:
256: new Thread() {
257: public void run() {
258: // needs to be done in a separate thread; otherwise we cannot receive
259: // data messages until we have sent all messages (sendMessages() returned).
260: sendMessages();
261: }
262: }.start();
263:
264: break;
265: case MyHeader.DATA:
266: Entry entry = (Entry) data.get(sender);
267: if (entry == null) {
268: System.err.println("-- received a message from "
269: + sender + ", who is not in the list");
270: } else {
271: entry.add(hdr.seqno);
272: if ((hdr.seqno) % 100 == 0)
273: System.out.println("-- received " + sender + ':'
274: + hdr.seqno);
275: if (entry.getNumReceived() >= num_bursts
276: * num_msgs_per_burst) {
277: if (entry.done())
278: System.out.println("*--* " + sender + " DONE");
279: }
280: }
281: break;
282: case MyHeader.DONE:
283:
284: break;
285: case MyHeader.CLEAR:
286: clear();
287: break;
288: default:
289: break;
290: }
291: }
292:
293: private void updateTimestamp() {
294: for (Iterator it = data.values().iterator(); it.hasNext();) {
295: Entry entry = (Entry) it.next();
296: entry.start = System.currentTimeMillis();
297: }
298: }
299:
300: void clear() {
301: System.out.println("-- clearing the data");
302: data.clear();
303: for (int i = 0; i < mbrs.size(); i++)
304: data.put(mbrs.elementAt(i), new Entry(num_bursts
305: * num_msgs_per_burst));
306: }
307:
308: public byte[] getState() {
309: return null;
310: }
311:
312: public void setState(byte[] state) {
313: ;
314: }
315:
316: public void viewAccepted(View new_view) {
317: System.out.println("-- new view: " + new_view.getMembers());
318: mbrs.clear();
319: mbrs.addAll(new_view.getMembers());
320: clear();
321: }
322:
323: public void suspect(Address suspected_mbr) {
324: ;
325: }
326:
327: public void block() {
328: ;
329: }
330:
331: public static void main(String[] args) {
332: String props = null;
333: int num_bursts = 100;
334: int num_msgs_per_burst = 10;
335: long sleep_time = 10;
336: int msg_size = 10000; // in bytes
337: boolean sender = true;
338:
339: PerfTest t;
340:
341: for (int i = 0; i < args.length; i++) {
342: if ("-props".equals(args[i])) {
343: props = args[++i];
344: continue;
345: }
346: if ("-num_bursts".equals(args[i])) {
347: num_bursts = Integer.parseInt(args[++i]);
348: continue;
349: }
350: if ("-num_msgs_per_burst".equals(args[i])) {
351: num_msgs_per_burst = Integer.parseInt(args[++i]);
352: continue;
353: }
354: if ("-sleep_time".equals(args[i])) {
355: sleep_time = Long.parseLong(args[++i]);
356: continue;
357: }
358: if ("-msg_size".equals(args[i])) {
359: msg_size = Integer.parseInt(args[++i]);
360: continue;
361: }
362: if ("-sender".equals(args[i])) {
363: sender = Boolean.valueOf(args[++i]).booleanValue();
364: continue;
365: }
366: help();
367: return;
368: }
369: try {
370: t = new PerfTest(props, num_bursts, num_msgs_per_burst,
371: msg_size, sleep_time, sender);
372: t.start();
373: } catch (Throwable ex) {
374: ex.printStackTrace();
375: }
376: }
377:
378: static void help() {
379: System.out
380: .println("PerfTest [-help] [-props <properties>] [-num_bursts <num>] "
381: + "[-num_msgs_per_burst <num>] [-sleep_time <number of msecs>] "
382: + "[-msg_size <bytes>] [-sender <true/false>]");
383: }
384:
385: public static class MyHeader extends Header {
386: public static final int DATA = 1;
387: public static final int START = 2;
388: public static final int CLEAR = 3;
389: public static final int DONE = 4;
390:
391: int type = 0;
392: int seqno = -1;
393:
394: public MyHeader() {
395:
396: }
397:
398: public MyHeader(int type, int seqno) {
399: this .type = type;
400: this .seqno = seqno;
401: }
402:
403: public long size() {
404: return 16;
405: }
406:
407: public String toString() {
408: StringBuffer sb = new StringBuffer();
409: switch (type) {
410: case DATA:
411: sb.append("DATA (seqno=").append(seqno).append(')');
412: break;
413: case START:
414: sb.append("START");
415: break;
416: case CLEAR:
417: sb.append("CLEAR");
418: break;
419: default:
420: sb.append("<n/a>");
421: break;
422: }
423: return sb.toString();
424: }
425:
426: public void writeExternal(ObjectOutput out) throws IOException {
427: out.writeInt(type);
428: out.writeInt(seqno);
429: }
430:
431: public void readExternal(ObjectInput in) throws IOException,
432: ClassNotFoundException {
433: type = in.readInt();
434: seqno = in.readInt();
435: }
436:
437: }
438:
439: class Entry {
440: long start = 0, stop = 0;
441: int num_received = 0;
442: int[] seqnos = null;
443:
444: Entry(int num) {
445: seqnos = new int[num];
446: for (int i = 0; i < seqnos.length; i++)
447: seqnos[i] = -1;
448: start = System.currentTimeMillis();
449: }
450:
451: void add(int seqno) {
452: if (seqnos != null)
453: seqnos[seqno] = seqno;
454: num_received++;
455: if (num_received >= seqnos.length) {
456: if (done())
457: stop = System.currentTimeMillis();
458: }
459: }
460:
461: boolean done() {
462: if (seqnos == null)
463: return false;
464: for (int i = 0; i < seqnos.length; i++)
465: if (seqnos[i] < 0)
466: return false;
467: return true;
468: }
469:
470: int getNumReceived() {
471: return num_received;
472: }
473:
474: int getRealReceived() {
475: int num = 0;
476: if (seqnos == null)
477: return 0;
478: for (int i = 0; i < seqnos.length; i++) {
479: if (seqnos[i] > -1)
480: num++;
481: }
482: return num;
483: }
484:
485: String printStats() {
486: StringBuffer sb = new StringBuffer();
487: sb.append("done=").append(done()).append('\n');
488: sb.append("number of messages received: ").append(
489: getRealReceived()).append('\n');
490: sb.append("total time: ").append(stop - start).append(
491: " ms\n");
492: sb.append("msgs/sec: ").append(
493: (double) getRealReceived()
494: / ((stop - start) / 1000.0)).append('\n');
495: sb.append("throughput (kb/sec): ").append(
496: (getRealReceived() * msg_size / 1000.0)
497: / ((stop - start) / 1000.0)).append('\n');
498: return sb.toString();
499: }
500: }
501:
502: }
|