001: package org.jgroups.tests;
002:
003: import org.jgroups.Global;
004: import org.jgroups.ReceiverAdapter;
005: import org.jgroups.stack.IpAddress;
006: import org.jgroups.util.Util;
007:
008: import java.io.IOException;
009: import java.net.*;
010: import java.nio.ByteBuffer;
011:
012: /**
013: * Class that measure RTT between a client and server using multicast sockets
014: * @author Bela Ban
015: * @version $Id: RoundTripMulticast.java,v 1.2 2006/08/08 06:28:19 belaban Exp $
016: */
017: public class RoundTripMulticast extends ReceiverAdapter {
018: MulticastSocket mcast_recv_sock; // to receive mcast traffic
019: MulticastSocket mcast_send_sock; // to send mcast traffic
020: DatagramSocket ucast_sock; // to receive and send unicast traffic
021: InetAddress bind_addr, mcast_addr;
022: int mcast_port = 7500;
023: int num = 1000;
024: int msg_size = 10;
025: boolean server = false;
026: final byte[] RSP_BUF = new byte[] { 1 }; // 1=response
027: int num_responses = 0;
028: final Object mutex = new Object();
029: IpAddress local_addr;
030:
031: interface Receiver {
032: void receive(byte[] buffer, int offset, int length,
033: InetAddress sender, int sender_port);
034: }
035:
036: private void start(boolean server, int num, int msg_size,
037: InetAddress bind_addr, InetAddress mcast_addr,
038: int mcast_port) throws Exception {
039: this .server = server;
040: this .num = num;
041: this .msg_size = msg_size;
042: this .bind_addr = bind_addr;
043: this .mcast_addr = mcast_addr;
044: this .mcast_port = mcast_port;
045:
046: // SocketAddress saddr=new InetSocketAddress(bind_addr, mcast_port);
047: mcast_send_sock = new MulticastSocket(mcast_port);
048: mcast_send_sock.setTimeToLive(2);
049: mcast_send_sock.setInterface(bind_addr);
050: SocketAddress group = new InetSocketAddress(mcast_addr,
051: mcast_port);
052: mcast_send_sock.joinGroup(group, null);
053:
054: mcast_recv_sock = new MulticastSocket(mcast_port);
055: mcast_recv_sock.setTimeToLive(2);
056: mcast_recv_sock.setInterface(bind_addr);
057: mcast_recv_sock.joinGroup(group, null);
058:
059: ucast_sock = new DatagramSocket(0, bind_addr);
060: ucast_sock.setTrafficClass(16); // 0x10
061: local_addr = new IpAddress(ucast_sock.getLocalAddress(),
062: ucast_sock.getLocalPort());
063:
064: if (server) {
065: Receiver r = new Receiver() {
066: public void receive(byte[] buf, int offset, int length,
067: InetAddress sender, int sender_port) {
068: ByteBuffer buffer = ByteBuffer.wrap(buf, offset,
069: length);
070: byte r = buffer.get();
071: // System.out.println("received " + (r == 0? "request" : "response"));
072: short len = buffer.getShort();
073: byte[] tmp = new byte[len];
074: buffer.get(tmp, 0, len);
075: try {
076: IpAddress real_sender = (IpAddress) Util
077: .streamableFromByteBuffer(
078: IpAddress.class, tmp);
079: DatagramPacket packet = new DatagramPacket(
080: RSP_BUF, 0, RSP_BUF.length, real_sender
081: .getIpAddress(), real_sender
082: .getPort());
083: ucast_sock.send(packet); // send the response via DatagramSocket
084: } catch (Exception e) {
085: e.printStackTrace();
086: }
087: }
088: };
089: ReceiverThread rt = new ReceiverThread(r, mcast_recv_sock);
090: rt.start();
091:
092: System.out.println("server started (ctrl-c to kill)");
093: while (true) {
094: Util.sleep(60000);
095: }
096: } else {
097: System.out.println("sending " + num + " requests");
098: sendRequests();
099: }
100:
101: mcast_recv_sock.close();
102: mcast_send_sock.close();
103: ucast_sock.close();
104: }
105:
106: private void sendRequests() throws Exception {
107: byte[] marshalled_addr = Util
108: .streamableToByteBuffer(local_addr);
109: int length = Global.BYTE_SIZE + // request or response byte
110: Global.SHORT_SIZE + // length of marshalled IpAddress
111: marshalled_addr.length + msg_size;
112: long start, stop, total;
113: double requests_per_sec;
114: double ms_per_req;
115: int print = num / 10;
116: int count = 0;
117:
118: num_responses = 0;
119:
120: ByteBuffer buffer = ByteBuffer.allocate(length);
121: buffer.put((byte) 0); // request
122: buffer.putShort((short) marshalled_addr.length);
123: buffer.put(marshalled_addr, 0, marshalled_addr.length);
124: byte[] payload = new byte[msg_size];
125: buffer.put(payload, 0, payload.length);
126: byte[] array = buffer.array();
127:
128: ReceiverThread mcast_receiver = new ReceiverThread(
129: new Receiver() {
130: public void receive(byte[] buffer, int offset,
131: int length, InetAddress sender,
132: int sender_port) {
133: // System.out.println("mcast from " + sender + ":" + sender_port + " was discarded");
134: }
135: }, mcast_recv_sock);
136: mcast_receiver.start();
137:
138: ReceiverThread ucast_receiver = new ReceiverThread(
139: new Receiver() {
140: public void receive(byte[] buffer, int offset,
141: int length, InetAddress sender,
142: int sender_port) {
143: synchronized (mutex) {
144: num_responses++;
145: mutex.notify();
146: }
147: }
148: }, ucast_sock);
149: ucast_receiver.start();
150:
151: start = System.currentTimeMillis();
152: for (int i = 0; i < num; i++) {
153: DatagramPacket packet = new DatagramPacket(array, 0,
154: array.length, mcast_addr, mcast_port);
155: try {
156: mcast_send_sock.send(packet);
157: synchronized (mutex) {
158: while (num_responses != count + 1) {
159: mutex.wait(1000);
160: }
161: count = num_responses;
162: if (num_responses >= num) {
163: System.out.println("received all responses ("
164: + num_responses + ")");
165: break;
166: }
167: }
168: if (num_responses % print == 0) {
169: System.out.println("- received " + num_responses);
170: }
171: } catch (Exception e) {
172: e.printStackTrace();
173: }
174: }
175: stop = System.currentTimeMillis();
176:
177: /*start=System.currentTimeMillis();
178: for(int i=0; i < num; i++) {
179: DatagramPacket packet=new DatagramPacket(array, 0, array.length, mcast_addr, mcast_port);
180: try {
181: mcast_send_sock.send(packet);
182:
183: if(num_responses % print == 0) {
184: System.out.println("- received " + num_responses);
185: }
186: synchronized(mutex) {
187: if(num_responses >= num) {
188: System.out.println("received all responses (" + num_responses + ")");
189: break;
190: }
191: else
192: mutex.wait();
193: }
194: }
195: catch(Exception e) {
196: e.printStackTrace();
197: }
198: }
199: stop=System.currentTimeMillis();*/
200: total = stop - start;
201: requests_per_sec = num / (total / 1000.0);
202: ms_per_req = total / (double) num;
203: System.out.println("Took " + total + "ms for " + num
204: + " requests: " + requests_per_sec + " requests/sec, "
205: + ms_per_req + " ms/request");
206: }
207:
208: static class ReceiverThread implements Runnable {
209: Receiver receiver;
210: Thread thread;
211: DatagramSocket sock;
212: byte[] buf = new byte[65000];
213: DatagramPacket packet;
214:
215: public ReceiverThread(Receiver r, DatagramSocket sock) {
216: this .receiver = r;
217: this .sock = sock;
218: }
219:
220: public final void start() {
221: thread = new Thread(this );
222: thread.start();
223: }
224:
225: public void stop() {
226: thread = null;
227: sock.close();
228: }
229:
230: public void run() {
231: while (thread != null
232: && thread.equals(Thread.currentThread())) {
233: packet = new DatagramPacket(buf, 0, buf.length);
234: try {
235: sock.receive(packet);
236: if (receiver != null) {
237: receiver.receive(packet.getData(), packet
238: .getOffset(), packet.getLength(),
239: packet.getAddress(), packet.getPort());
240: }
241: } catch (IOException e) {
242: break;
243: }
244: }
245: }
246: }
247:
248: public static void main(String[] args) throws Exception {
249: boolean server = false;
250: int num = 100;
251: int msg_size = 10; // 10 bytes
252: InetAddress bind_addr = null, mcast_addr = null;
253: int mcast_port = 7500;
254:
255: for (int i = 0; i < args.length; i++) {
256: if (args[i].equals("-num")) {
257: num = Integer.parseInt(args[++i]);
258: continue;
259: }
260: if (args[i].equals("-server")) {
261: server = true;
262: continue;
263: }
264: if (args[i].equals("-size")) {
265: msg_size = Integer.parseInt(args[++i]);
266: continue;
267: }
268: if (args[i].equals("-bind_addr")) {
269: bind_addr = InetAddress.getByName(args[++i]);
270: continue;
271: }
272: if (args[i].equals("-mcast_addr")) {
273: mcast_addr = InetAddress.getByName(args[++i]);
274: continue;
275: }
276: if (args[i].equals("-mcast_port")) {
277: mcast_port = Integer.parseInt(args[++i]);
278: continue;
279: }
280: RoundTripMulticast.help();
281: return;
282: }
283:
284: if (bind_addr == null)
285: bind_addr = InetAddress.getLocalHost();
286: if (mcast_addr == null)
287: mcast_addr = InetAddress.getByName("225.5.5.5");
288: new RoundTripMulticast().start(server, num, msg_size,
289: bind_addr, mcast_addr, mcast_port);
290: }
291:
292: private static void help() {
293: System.out
294: .println("RoundTrip [-server] [-num <number of messages>] "
295: + "[-size <size of each message (in bytes)>] [-bind_addr <bind address>] "
296: + "[-mcast_addr <mcast addr>] [-mcast_port <mcast port>]");
297: }
298: }
|