001: package org.jgroups.tests;
002:
003: import org.jgroups.Channel;
004: import org.jgroups.JChannel;
005: import org.jgroups.Message;
006:
007: /**
008: * Tests the time to multicast a message to everyone and then receive the responses
009: * @author Bela Ban
010: * @version $Id: SynchronousMessageSpeedTest.java,v 1.1 2005/07/22 10:30:21 belaban Exp $
011: */
012: public class SynchronousMessageSpeedTest {
013: Channel channel;
014: String props = null;
015: boolean server = false; // role is client by default
016: int num = 1000, received = 0;
017: static final long TIMEOUT = 10000;
018:
019: public SynchronousMessageSpeedTest(String props, boolean server,
020: int num) {
021: this .props = props;
022: this .server = server;
023: this .num = num;
024: }
025:
026: public void start() throws Exception {
027: Object obj;
028: Message msg;
029: channel = new JChannel(props);
030: channel.setOpt(Channel.LOCAL, Boolean.FALSE); // do *not* receive my own messages
031: channel.connect("MessageDispatcherSpeedTestGroup");
032:
033: try {
034: while (channel.getNumMessages() > 0)
035: channel.receive(10); // clear the input queue
036:
037: if (server) {
038: System.out
039: .println("-- Started as server. Press ctrl-c to kill");
040: int i = 0;
041: while (true) {
042: obj = channel.receive(0);
043: if (obj instanceof Message) {
044: msg = (Message) obj;
045: Message rsp = new Message(msg.getSrc(), null,
046: null);
047: if (++received % 1000 == 0)
048: System.out.println("-- received "
049: + received);
050: channel.send(rsp);
051: }
052: }
053: } else {
054: sendMessages(num);
055: }
056: } catch (Throwable t) {
057: t.printStackTrace(System.err);
058: } finally {
059: channel.close();
060: }
061: }
062:
063: void sendMessages(int num) throws Exception {
064: long start, stop;
065: int show = num / 10;
066: Object obj;
067:
068: if (show <= 0)
069: show = 1;
070: start = System.currentTimeMillis();
071:
072: System.out.println("-- sending " + num + " messages");
073: for (int i = 1; i <= num; i++) {
074: channel.send(new Message());
075: if (i % show == 0)
076: System.out.println("-- sent " + i);
077:
078: while (true) {
079: obj = channel.receive(0);
080: if (obj instanceof Message) {
081: received++;
082: if (received % show == 0)
083: System.out.println("-- received response: "
084: + received);
085: break;
086: }
087: }
088: }
089: stop = System.currentTimeMillis();
090: printStats(stop - start, num);
091: }
092:
093: void printStats(long total_time, int num) {
094: double throughput = ((double) num)
095: / ((double) total_time / 1000.0);
096: System.out.println("time for " + num + " remote calls was "
097: + total_time + ", avg=" + (total_time / (double) num)
098: + "ms/invocation, " + (long) throughput + " calls/sec");
099: }
100:
101: public static void main(String[] args) {
102: String props = null;
103: boolean server = false;
104: int num = 1000;
105: SynchronousMessageSpeedTest test;
106:
107: for (int i = 0; i < args.length; i++) {
108: if ("-props".equals(args[i])) {
109: props = args[++i];
110: continue;
111: }
112: if ("-server".equals(args[i])) {
113: server = true;
114: continue;
115: }
116: if ("-num".equals(args[i])) {
117: num = Integer.parseInt(args[++i]);
118: continue;
119: }
120: help();
121: return;
122: }
123:
124: try {
125: test = new SynchronousMessageSpeedTest(props, server, num);
126: test.start();
127: } catch (Exception e) {
128: System.err.println(e);
129: }
130: }
131:
132: static void help() {
133: System.out
134: .println("RpcDispatcherSpeedTest [-help] [-props <props>] [-server] [-num <number of calls>]");
135: }
136:
137: }
|