001: package org.jgroups.tests.perf;
002:
003: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
004: import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
005: import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008: import org.jgroups.Version;
009: import org.jgroups.util.Util;
010:
011: import java.io.BufferedReader;
012: import java.io.FileReader;
013: import java.io.FileWriter;
014: import java.io.IOException;
015: import java.text.NumberFormat;
016: import java.util.*;
017:
018: /** You start the test by running this class.
019: * @author Bela Ban (belaban@yahoo.com)
020:
021: */
022: public class Test implements Receiver {
// ---- configuration ----

/** Value of the "props" setting, read in start() */
String props;

/** Effective settings: entries of the config file, overridden by the properties passed to start() */
Properties config;

/** Value of the "sender" setting; if true this instance advertises itself as a sender */
boolean sender;

/** Transport used to send and receive messages; created and started in start() */
Transport transport;

/** Local address as reported by the transport */
Object local_addr;

// ---- membership ----

/** Per-sender statistics. Keys are sender addresses, values are MemberInfo instances */
Map senders = new ConcurrentReaderHashMap(10);

/** Addresses of the members discovered so far; also the monitor used during discovery */
final ArrayList members = new ArrayList();

// ---- receive statistics ----

/** Set when the first data message is received */
long start;

/** Set when the last expected data message is received */
long stop;

int num_members;
int num_senders;

/** Total number of data messages expected: num_senders * num_msgs */
long num_msgs_expected;

/** Number of data messages received from everyone */
long num_msgs_received;

/** Number of payload bytes received from everyone */
long num_bytes_received;

Log log = LogFactory.getLog(getClass());

/** True once num_msgs_expected data messages have been received */
boolean all_received;

/** True once the final results have been received and dumped */
boolean final_results_received;

// ---- result collection ----

/** Results reported by the members. Keys are member addresses, values are MemberInfo instances */
Map results = new HashMap();

private ResultsPublisher publisher = new ResultsPublisher();

/** Members whose results have not been received yet */
List heard_from = new ArrayList();

// ---- reporting ----

/** If true, the transport statistics are appended to every statistics line */
boolean dump_transport_stats;

/** Log every n msgs received */
long log_interval = 1000;

/** Counts data messages received from known senders; starts at 1 */
long counter = 1;

long msg_size = 1000;
boolean jmx;

/** Number of ms to wait at the receiver to simulate processing of the received message (0 == don't wait) */
long processing_delay;

/** Results file, or null if no file was requested */
FileWriter output;

/** Sends discovery responses and final results on a separate thread */
QueuedExecutor response_sender = new QueuedExecutor();

/** Formats rates: no grouping separators, at most 2 fraction digits */
static NumberFormat f = NumberFormat.getNumberInstance();

static {
    f.setGroupingUsed(false);
    f.setMaximumFractionDigits(2);
}
083:
084: public void start(Properties c, boolean verbose, boolean jmx,
085: String output) throws Exception {
086: String config_file = "config.txt";
087: BufferedReader fileReader;
088: String line;
089: String key, val;
090: StringTokenizer st;
091: Properties tmp = new Properties();
092:
093: if (output != null)
094: this .output = new FileWriter(output, false);
095:
096: response_sender.setThreadFactory(new ThreadFactory() {
097: public Thread newThread(Runnable runnable) {
098: return new Thread(runnable, "Test.ResponseSender");
099: }
100: });
101:
102: config_file = c.getProperty("config");
103: fileReader = new BufferedReader(new FileReader(config_file));
104: while ((line = fileReader.readLine()) != null) {
105: if (line.startsWith("#"))
106: continue;
107: line = line.trim();
108: if (line.length() == 0)
109: continue;
110: st = new StringTokenizer(line, "=", false);
111: key = st.nextToken().toLowerCase();
112: val = st.nextToken();
113: tmp.put(key, val);
114: }
115: fileReader.close();
116:
117: // 'tmp' now contains all properties from the file, now we need to override the ones
118: // passed to us by 'c'
119: tmp.putAll(c);
120: this .config = tmp;
121:
122: StringBuffer sb = new StringBuffer();
123: sb
124: .append("\n\n----------------------- TEST -----------------------\n");
125: sb.append("Date: ").append(new Date()).append('\n');
126: sb.append("Run by: ").append(System.getProperty("user.name"))
127: .append("\n\n");
128: if (verbose)
129: sb.append("Properties: ").append(printProperties()).append(
130: "\n-------------------------\n\n");
131:
132: for (Iterator it = this .config.entrySet().iterator(); it
133: .hasNext();) {
134: Map.Entry entry = (Map.Entry) it.next();
135: sb.append(entry.getKey()).append(":\t").append(
136: entry.getValue()).append('\n');
137: }
138: sb.append("JGroups version: ").append(Version.description)
139: .append('\n');
140: System.out.println("Configuration is: " + sb);
141:
142: output(sb.toString());
143:
144: props = this .config.getProperty("props");
145: num_members = Integer.parseInt(this .config
146: .getProperty("num_members"));
147: num_senders = Integer.parseInt(this .config
148: .getProperty("num_senders"));
149: long num_msgs = Long.parseLong(this .config
150: .getProperty("num_msgs"));
151: this .num_msgs_expected = num_senders * num_msgs;
152: sender = Boolean.valueOf(this .config.getProperty("sender"))
153: .booleanValue();
154: msg_size = Long.parseLong(this .config.getProperty("msg_size"));
155: String tmp2 = this .config.getProperty("dump_transport_stats",
156: "false");
157: if (Boolean.valueOf(tmp2).booleanValue())
158: this .dump_transport_stats = true;
159: tmp2 = this .config.getProperty("log_interval");
160: if (tmp2 != null)
161: log_interval = Long.parseLong(tmp2);
162:
163: sb = new StringBuffer();
164: sb.append("\n##### msgs_received");
165: sb.append(", current time (in ms)");
166: sb.append(", msgs/sec");
167: sb.append(", throughput/sec [KB]");
168: sb.append(", free_mem [KB] ");
169: sb.append(", total_mem [KB] ");
170: output(sb.toString());
171:
172: if (jmx) {
173: this .config.setProperty("jmx", "true");
174: }
175: this .jmx = new Boolean(this .config.getProperty("jmx"))
176: .booleanValue();
177:
178: String tmp3 = this .config.getProperty("processing_delay");
179: if (tmp3 != null)
180: this .processing_delay = Long.parseLong(tmp3);
181:
182: String transport_name = this .config.getProperty("transport");
183: transport = (Transport) Util.loadClass(transport_name,
184: this .getClass()).newInstance();
185: transport.create(this .config);
186: transport.setReceiver(this );
187: transport.start();
188: local_addr = transport.getLocalAddress();
189: }
190:
191: private void output(String msg) {
192: // if(log.isInfoEnabled())
193: // log.info(msg);
194: if (this .output != null) {
195: try {
196: this .output.write(msg + "\n");
197: this .output.flush();
198: } catch (IOException e) {
199: }
200: }
201: }
202:
203: private String printProperties() {
204: StringBuffer sb = new StringBuffer();
205: Properties p = System.getProperties();
206: for (Iterator it = p.entrySet().iterator(); it.hasNext();) {
207: Map.Entry entry = (Map.Entry) it.next();
208: sb.append(entry.getKey()).append(": ").append(
209: entry.getValue()).append('\n');
210: }
211: return sb.toString();
212: }
213:
214: public void stop() {
215: if (transport != null) {
216: transport.stop();
217: transport.destroy();
218: }
219: if (response_sender != null) {
220: response_sender.shutdownNow();
221: }
222: if (this .output != null) {
223: try {
224: this .output.close();
225: } catch (IOException e) {
226: }
227: }
228: }
229:
230: public void receive(Object sender, byte[] payload) {
231: if (payload == null || payload.length == 0) {
232: System.err.println("payload is incorrect (sender=" + sender
233: + "): " + payload);
234: return;
235: }
236:
237: try {
238: int type = payload[0];
239: if (type == 1) { // DATA
240: int len = payload.length - 1;
241: handleData(sender, len);
242: return;
243: }
244:
245: byte[] tmp = new byte[payload.length - 1];
246: System.arraycopy(payload, 1, tmp, 0, tmp.length);
247: Data d = (Data) Util.streamableFromByteBuffer(Data.class,
248: tmp);
249:
250: switch (d.getType()) {
251: case Data.DISCOVERY_REQ:
252: // System.out.println("-- received discovery request");
253: sendDiscoveryResponse();
254: break;
255: case Data.DISCOVERY_RSP:
256: // System.out.println("-- received discovery response from " + sender);
257: synchronized (this .members) {
258: if (!this .members.contains(sender)) {
259: this .members.add(sender);
260: System.out.println("-- " + sender + " joined");
261: if (d.sender) {
262: synchronized (this .members) {
263: if (!this .senders.containsKey(sender)) {
264: this .senders.put(sender,
265: new MemberInfo(d.num_msgs));
266: }
267: }
268: }
269: this .members.notifyAll();
270: }
271: }
272: break;
273:
274: case Data.FINAL_RESULTS:
275: publisher.stop();
276: if (!final_results_received) {
277: dumpResults(d.results);
278: final_results_received = true;
279: }
280: synchronized (this ) {
281: this .notifyAll();
282: }
283: break;
284:
285: case Data.RESULTS:
286: results.put(sender, d.result);
287: heard_from.remove(sender);
288: if (heard_from.size() == 0) {
289: for (int i = 0; i < 3; i++) {
290: sendFinalResults();
291: Util.sleep(100);
292: }
293: }
294: break;
295:
296: default:
297: log.error("received invalid data type: " + payload[0]);
298: break;
299: }
300: } catch (Exception e) {
301: e.printStackTrace();
302: }
303: }
304:
305: private void handleData(Object sender, int num_bytes) {
306: if (all_received)
307: return;
308: if (start == 0) {
309: start = System.currentTimeMillis();
310: }
311:
312: num_msgs_received++;
313: num_bytes_received += num_bytes;
314:
315: if (num_msgs_received >= num_msgs_expected) {
316: if (stop == 0)
317: stop = System.currentTimeMillis();
318: all_received = true;
319: }
320:
321: if (num_msgs_received % log_interval == 0)
322: System.out.println(new StringBuffer("-- received ").append(
323: num_msgs_received).append(" messages"));
324:
325: if (counter % log_interval == 0) {
326: output(dumpStats(counter));
327: }
328:
329: MemberInfo info = (MemberInfo) this .senders.get(sender);
330: if (info != null) {
331: if (info.start == 0)
332: info.start = System.currentTimeMillis();
333: info.num_msgs_received++;
334: counter++;
335: info.total_bytes_received += num_bytes;
336: if (info.num_msgs_received >= info.num_msgs_expected) {
337: info.done = true;
338: if (info.stop == 0)
339: info.stop = System.currentTimeMillis();
340: } else {
341: if (processing_delay > 0)
342: Util.sleep(processing_delay);
343: }
344: } else {
345: log.error("-- sender " + sender
346: + " not found in senders hashmap");
347: }
348:
349: if (all_received) {
350: if (!this .sender)
351: dumpSenders();
352: publisher.start();
353: }
354: }
355:
356: private void sendResults() throws Exception {
357: Data d = new Data(Data.RESULTS);
358: byte[] buf;
359: MemberInfo info = new MemberInfo(num_msgs_expected);
360: info.done = true;
361: info.num_msgs_received = num_msgs_received;
362: info.start = start;
363: info.stop = stop;
364: info.total_bytes_received = this .num_bytes_received;
365: d.result = info;
366: buf = generatePayload(d, null);
367: transport.send(null, buf);
368: }
369:
370: private void sendFinalResults() throws Exception {
371: Data d = new Data(Data.FINAL_RESULTS);
372: d.results = new ConcurrentReaderHashMap(this .results);
373: final byte[] buf = generatePayload(d, null);
374: // transport.send(null, buf);
375:
376: response_sender.execute(new Runnable() {
377: public void run() {
378: try {
379: transport.send(null, buf);
380: } catch (Exception e) {
381: log.error("failed sending discovery response", e);
382: }
383: }
384: });
385: }
386:
387: boolean allReceived() {
388: return all_received;
389: }
390:
391: boolean receivedFinalResults() {
392: return final_results_received;
393: }
394:
395: void sendMessages(long interval, int nanos, boolean busy_sleep)
396: throws Exception {
397: long total_msgs = 0;
398: int msgSize = Integer.parseInt(config.getProperty("msg_size"));
399: int num_msgs = Integer.parseInt(config.getProperty("num_msgs"));
400: // int logInterval=Integer.parseInt(config.getProperty("log_interval"));
401: byte[] buf = new byte[msgSize];
402: for (int k = 0; k < msgSize; k++)
403: buf[k] = '.';
404: Data d = new Data(Data.DATA);
405: byte[] payload = generatePayload(d, buf);
406: System.out.println("-- sending " + num_msgs + " "
407: + Util.printBytes(msgSize) + " messages");
408: for (int i = 0; i < num_msgs; i++) {
409: transport.send(null, payload);
410: total_msgs++;
411: if (total_msgs % log_interval == 0) {
412: System.out.println("++ sent " + total_msgs);
413: }
414: if (interval > 0 || nanos > 0) {
415: if (busy_sleep)
416: Util.sleep(interval, busy_sleep);
417: else
418: Util.sleep(interval, nanos);
419: }
420: }
421: }
422:
423: byte[] generatePayload(Data d, byte[] buf) throws Exception {
424: byte[] tmp = buf != null ? buf : Util.streamableToByteBuffer(d);
425: byte[] payload = new byte[tmp.length + 1];
426: payload[0] = intToByte(d.getType());
427: System.arraycopy(tmp, 0, payload, 1, tmp.length);
428: return payload;
429: }
430:
431: private byte intToByte(int type) {
432: switch (type) {
433: case Data.DATA:
434: return 1;
435: case Data.DISCOVERY_REQ:
436: return 2;
437: case Data.DISCOVERY_RSP:
438: return 3;
439: case Data.RESULTS:
440: return 4;
441: case Data.FINAL_RESULTS:
442: return 5;
443: default:
444: return 0;
445: }
446: }
447:
448: private void dumpResults(Map final_results) {
449: Object member;
450: Map.Entry entry;
451: MemberInfo val;
452: double combined_msgs_sec, tmp = 0;
453: long combined_tp;
454: StringBuffer sb = new StringBuffer();
455: sb.append("\n-- results:\n");
456:
457: for (Iterator it = final_results.entrySet().iterator(); it
458: .hasNext();) {
459: entry = (Map.Entry) it.next();
460: member = entry.getKey();
461: val = (MemberInfo) entry.getValue();
462: tmp += val.getMessageSec();
463: sb.append("\n").append(member);
464: if (member.equals(local_addr))
465: sb.append(" (myself)");
466: sb.append(":\n");
467: sb.append(val);
468: sb.append('\n');
469: }
470: combined_msgs_sec = tmp / final_results.size();
471: combined_tp = (long) combined_msgs_sec * msg_size;
472:
473: sb.append("\ncombined: ").append(f.format(combined_msgs_sec))
474: .append(
475: " msgs/sec averaged over all receivers (throughput="
476: + Util.printBytes(combined_tp)
477: + "/sec)\n");
478: System.out.println(sb.toString());
479: output(sb.toString());
480: }
481:
482: private void dumpSenders() {
483: StringBuffer sb = new StringBuffer();
484: dump(this .senders, sb);
485: System.out.println(sb.toString());
486: }
487:
488: private void dump(Map map, StringBuffer sb) {
489: Map.Entry entry;
490: Object mySender;
491: MemberInfo mi;
492: MemberInfo combined = new MemberInfo(0);
493: combined.start = Long.MAX_VALUE;
494: combined.stop = Long.MIN_VALUE;
495:
496: sb.append("\n-- local results:\n");
497: for (Iterator it2 = map.entrySet().iterator(); it2.hasNext();) {
498: entry = (Map.Entry) it2.next();
499: mySender = entry.getKey();
500: mi = (MemberInfo) entry.getValue();
501: combined.start = Math.min(combined.start, mi.start);
502: combined.stop = Math.max(combined.stop, mi.stop);
503: combined.num_msgs_expected += mi.num_msgs_expected;
504: combined.num_msgs_received += mi.num_msgs_received;
505: combined.total_bytes_received += mi.total_bytes_received;
506: sb.append("sender: ").append(mySender).append(": ").append(
507: mi).append('\n');
508: }
509: }
510:
511: private String dumpStats(long received_msgs) {
512: double msgs_sec, throughput_sec;
513: long current;
514:
515: StringBuffer sb = new StringBuffer();
516: sb.append(received_msgs).append(' ');
517:
518: current = System.currentTimeMillis();
519: sb.append(current).append(' ');
520:
521: msgs_sec = received_msgs / ((current - start) / 1000.0);
522: throughput_sec = msgs_sec * msg_size;
523:
524: sb.append(f.format(msgs_sec)).append(' ').append(
525: f.format(throughput_sec)).append(' ');
526:
527: sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(
528: ' ');
529:
530: sb.append(Runtime.getRuntime().totalMemory() / 1000.0);
531:
532: if (dump_transport_stats) {
533: Map stats = transport.dumpStats();
534: if (stats != null) {
535: print(stats, sb);
536: }
537: }
538: return sb.toString();
539: }
540:
541: public String dumpTransportStats() {
542: Map stats = transport.dumpStats();
543: StringBuffer sb = new StringBuffer(128);
544: if (stats != null) {
545: Map.Entry entry;
546: String key;
547: Map value;
548: for (Iterator it = stats.entrySet().iterator(); it
549: .hasNext();) {
550: entry = (Map.Entry) it.next();
551: key = (String) entry.getKey();
552: value = (Map) entry.getValue();
553: sb.append("\n").append(key).append(":\n");
554: for (Iterator it2 = value.entrySet().iterator(); it2
555: .hasNext();) {
556: sb.append(it2.next()).append("\n");
557: }
558: }
559: }
560: return sb.toString();
561: }
562:
563: private void print(Map stats, StringBuffer sb) {
564: sb.append("\nTransport stats:\n\n");
565: Map.Entry entry;
566: Object key, val;
567: for (Iterator it = stats.entrySet().iterator(); it.hasNext();) {
568: entry = (Map.Entry) it.next();
569: key = entry.getKey();
570: val = entry.getValue();
571: sb.append(key).append(": ").append(val).append("\n");
572: }
573: }
574:
575: void runDiscoveryPhase() throws Exception {
576: sendDiscoveryRequest();
577: sendDiscoveryResponse();
578:
579: synchronized (this .members) {
580: System.out.println("-- waiting for " + num_members
581: + " members to join");
582: while (this .members.size() < num_members) {
583: this .members.wait(2000);
584: sendDiscoveryRequest();
585: sendDiscoveryResponse();
586: }
587:
588: heard_from.addAll(members);
589: System.out.println("-- members: " + this .members.size());
590: }
591: }
592:
593: void sendDiscoveryRequest() throws Exception {
594: Data d = new Data(Data.DISCOVERY_REQ);
595: // System.out.println("-- sending discovery request");
596: transport.send(null, generatePayload(d, null));
597: }
598:
599: void sendDiscoveryResponse() throws Exception {
600: final Data d2 = new Data(Data.DISCOVERY_RSP);
601: if (sender) {
602: d2.sender = true;
603: d2.num_msgs = Long
604: .parseLong(config.getProperty("num_msgs"));
605: }
606:
607: response_sender.execute(new Runnable() {
608: public void run() {
609: try {
610: transport.send(null, generatePayload(d2, null));
611: } catch (Exception e) {
612: log.error("failed sending discovery response", e);
613: }
614: }
615: });
616:
617: }
618:
619: public static void main(String[] args) {
620: Properties config = new Properties();
621: boolean sender = false, verbose = false, jmx = false, dump_stats = false; // dumps at end of run
622: Test t = null;
623: String output = null;
624: long interval = 0;
625: int interval_nanos = 0;
626: boolean busy_sleep = false;
627:
628: for (int i = 0; i < args.length; i++) {
629: if ("-sender".equals(args[i])) {
630: config.put("sender", "true");
631: sender = true;
632: continue;
633: }
634: if ("-receiver".equals(args[i])) {
635: config.put("sender", "false");
636: sender = false;
637: continue;
638: }
639: if ("-config".equals(args[i])) {
640: String config_file = args[++i];
641: config.put("config", config_file);
642: continue;
643: }
644: if ("-props".equals(args[i])) {
645: String props = args[++i];
646: config.put("props", props);
647: continue;
648: }
649: if ("-verbose".equals(args[i])) {
650: verbose = true;
651: continue;
652: }
653: if ("-jmx".equals(args[i])) {
654: jmx = true;
655: continue;
656: }
657: if ("-dump_stats".equals(args[i])) {
658: dump_stats = true;
659: continue;
660: }
661: if ("-interval".equals(args[i])) {
662: interval = Long.parseLong(args[++i]);
663: continue;
664: }
665: if ("-nanos".equals(args[i])) {
666: interval_nanos = Integer.parseInt(args[++i]);
667: continue;
668: }
669: if ("-busy_sleep".equals(args[i])) {
670: busy_sleep = true;
671: continue;
672: }
673: if ("-f".equals(args[i])) {
674: output = args[++i];
675: continue;
676: }
677: help();
678: return;
679: }
680:
681: try {
682:
683: /*int prio=Thread.currentThread().getPriority();
684: System.out.println("current thread: " + Thread.currentThread() + ", prio: " + prio);
685:
686: Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
687: prio=Thread.currentThread().getPriority();
688: System.out.println("current thread: " + Thread.currentThread() + ", prio: " + prio);*/
689:
690: t = new Test();
691: t.start(config, verbose, jmx, output);
692: t.runDiscoveryPhase();
693: if (sender) {
694: t.sendMessages(interval, interval_nanos, busy_sleep);
695: }
696: synchronized (t) {
697: while (t.receivedFinalResults() == false) {
698: t.wait(2000);
699: }
700: }
701: if (dump_stats) {
702: String stats = t.dumpTransportStats();
703: System.out.println("\nTransport statistics:\n" + stats);
704: }
705: if (t.jmx) {
706: System.out.println("jmx=true: not terminating");
707: if (t != null) {
708: t.stop();
709: t = null;
710: }
711: while (true) {
712: Util.sleep(60000);
713: }
714: }
715: } catch (Exception e) {
716: e.printStackTrace();
717: } finally {
718: if (t != null) {
719: t.stop();
720: }
721: }
722: }
723:
724: static void help() {
725: System.out
726: .println("Test [-help] ([-sender] | [-receiver]) "
727: + "[-config <config file>] "
728: + "[-props <stack config>] [-verbose] [-jmx] "
729: + "[-dump_stats] [-f <filename>] [-interval <ms between sends>] "
730: + "[-nanos <additional nanos to sleep in interval>] [-busy_sleep (cancels out -nanos)]");
731: }
732:
733: private class ResultsPublisher implements Runnable {
734: final long interval = 1000;
735: boolean running = true;
736: Thread t;
737:
738: void start() {
739: if (t == null) {
740: t = new Thread(this , "ResultsPublisher");
741: t.setDaemon(true);
742: t.start();
743: }
744: }
745:
746: void stop() {
747: if (t != null && t.isAlive()) {
748: Thread tmp = t;
749: t = null;
750: tmp.interrupt();
751: }
752: }
753:
754: public void run() {
755: try {
756: while (t != null) {
757: sendResults();
758: Util.sleep(interval);
759: }
760: } catch (Exception e) {
761: e.printStackTrace();
762: }
763: }
764: }
765:
766: }
|