package org.jgroups.tests;

// $Id: SpeedTest_NIO.java,v 1.1 2005/06/23 13:31:10 belaban Exp $

import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.debug.Debugger;
import org.jgroups.util.Util;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.nio.ByteBuffer;

/**
 * Same test as SpeedTest, but using an NIO ByteBuffer instead of serialization. For 10000 messages,
 * this took only about 25% of the time needed by SpeedTest.
 * Measures the time taken to multicast n local messages (messages sent to self), using either a plain
 * MulticastSocket or a JGroups channel (-jg). Note that packets may get dropped if Util.sleep(1) is
 * commented out (on some systems the sleep time has to be increased even further). If running with the
 * -jg option and Util.sleep() commented out, there will probably be packet loss, which JGroups repairs
 * by retransmission. To see the retransmit messages, enable tracing (trace=true) in jgroups.properties
 * and add the following lines:
 * <pre>
 * trace0=NAKACK.retransmit DEBUG STDOUT
 * trace1=UNICAST.retransmit DEBUG STDOUT
 * </pre>
 *
 * @author Bela Ban
 */
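/*
 * Example invocation (assumed; the exact classpath and startup script depend on the local
 * JGroups build):
 *
 *   java -cp jgroups-all.jar org.jgroups.tests.SpeedTest_NIO -jg -num_msgs 10000 -sleep 1
 */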
public class SpeedTest_NIO {
    static long start, stop;

    public static void main(String[] args) {
        MulticastSocket sock = null;
        Receiver receiver = null;
        int num_msgs = 1000;
        byte[] buf;
        DatagramPacket packet;
        InetAddress group_addr = null;
        int[][] matrix;
        boolean jg = false; // use JGroups channel instead of UDP MulticastSocket
        JChannel channel = null;
        String props = null, loopback_props;
        String group_name = "SpeedTest-Group";
        Message send_msg;
        boolean debug = false, cummulative = false;
        Debugger debugger = null;
        long sleep_time = 1; // sleep in msecs between msg sends
        boolean busy_sleep = false;
        boolean yield = false;
        int num_yields = 0;
        boolean loopback = false;

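        // Default protocol stack: plain UDP transport (IP multicast) plus the usual JGroups
        // protocols for discovery, failure detection, reliable delivery and group membership.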
        props = "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;"
                + "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;"
                + "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):"
                + "PING(timeout=2000;num_initial_members=3):"
                + "MERGE2(min_interval=5000;max_interval=10000):"
                + "FD_SOCK:"
                + "VERIFY_SUSPECT(timeout=1500):"
                + "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):"
                + "UNICAST(timeout=1200):"
                + "pbcast.STABLE(desired_avg_gossip=10000):"
                + "FRAG(frag_size=8192;down_thread=false;up_thread=false):"
                // + "PIGGYBACK(max_size=16000;max_wait_time=500):"
                + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
                + "shun=false;print_local_addr=true):"
                + "pbcast.STATE_TRANSFER";
                // + "PERF(details=true)";

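        // Alternative stack used with -loopback: replaces the UDP transport with LOOPBACK, so
        // messages never leave the process; useful for measuring pure stack overhead.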
        loopback_props = "LOOPBACK:"
                + "PING(timeout=2000;num_initial_members=3):"
                + "MERGE2(min_interval=5000;max_interval=10000):"
                + "FD_SOCK:"
                + "VERIFY_SUSPECT(timeout=1500):"
                + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):"
                + "UNICAST(timeout=5000):"
                + "pbcast.STABLE(desired_avg_gossip=20000):"
                + "FRAG(frag_size=16000;down_thread=false;up_thread=false):"
                + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
                + "shun=false;print_local_addr=true):"
                + "pbcast.STATE_TRANSFER";

        for (int i = 0; i < args.length; i++) {
            if ("-help".equals(args[i])) {
                help();
                return;
            }
            if ("-jg".equals(args[i])) {
                jg = true;
                continue;
            }
            if ("-loopback".equals(args[i])) {
                loopback = true;
                props = loopback_props;
                continue;
            }
            if ("-props".equals(args[i])) {
                props = args[++i];
                continue;
            }
            if ("-debug".equals(args[i])) {
                debug = true;
                continue;
            }
            if ("-cummulative".equals(args[i])) {
                cummulative = true;
                continue;
            }
            if ("-busy_sleep".equals(args[i])) {
                busy_sleep = true;
                continue;
            }
            if ("-yield".equals(args[i])) {
                yield = true;
                num_yields++;
                continue;
            }
            if ("-sleep".equals(args[i])) {
                sleep_time = Long.parseLong(args[++i]);
                continue;
            }
            if ("-num_msgs".equals(args[i])) {
                num_msgs = Integer.parseInt(args[++i]);
                continue;
            }
            help();
            return;
        }

        System.out.println("jg = " + jg + "\nloopback = " + loopback
                + "\ndebug = " + debug + "\nsleep = " + sleep_time
                + "\nbusy_sleep=" + busy_sleep + "\nyield=" + yield
                + "\nnum_yields=" + num_yields + "\nnum_msgs = " + num_msgs + '\n');

        try {
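            // matrix[i][0] is set to 1 once message i has been sent, matrix[i][1] once it has
            // been received; printMatrix() reports the gaps.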
            matrix = new int[num_msgs][2]; // int arrays are zero-initialized by the JVM

            if (jg) {
                channel = new JChannel(props);
                channel.connect(group_name);
                if (debug) {
                    debugger = new Debugger(channel, cummulative);
                    debugger.start();
                }
            } else {
                group_addr = InetAddress.getByName("224.0.0.36");
                sock = new MulticastSocket(7777);
                sock.joinGroup(group_addr);
            }

            if (debug) {
                System.out.println("Press key to start");
                System.in.read();
            }
            receiver = new Receiver(sock, channel, matrix, jg);
            receiver.start();

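            // Each message carries only a 4-byte sequence number, written into a reusable
            // ByteBuffer via NIO rather than Java serialization; the buffer's backing array is
            // cloned so every message gets its own payload.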
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.mark();

            start = System.currentTimeMillis();
            for (int i = 0; i < num_msgs; i++) {
                bb.reset();
                bb.putInt(i);
                buf = (byte[]) bb.array().clone();

                if (jg) {
                    send_msg = new Message(null, null, buf);
                    channel.send(send_msg);
                } else {
                    packet = new DatagramPacket(buf, buf.length, group_addr, 7777);
                    sock.send(packet);
                }
                if (i % 1000 == 0)
                    System.out.println("-- sent " + i);

                matrix[i][0] = 1;
                if (yield) {
                    for (int k = 0; k < num_yields; k++) {
                        Thread.yield();
                    }
                } else {
                    if (sleep_time > 0) {
                        sleep(sleep_time, busy_sleep);
                    }
                }
            }
            while (true) {
                System.in.read();
                printMatrix(matrix);
            }
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /**
     * On most UNIX systems, the minimum sleep granularity is 10-20ms: even if we ask for sleep(1),
     * the thread will sleep for at least 10-20ms. On Windows, sleep() appears to be implemented as
     * a busy sleep, i.e. the thread never relinquishes control, so sleep(x) lasts almost exactly x ms.
     */
    static void sleep(long msecs, boolean busy_sleep) {
        if (!busy_sleep) {
            Util.sleep(msecs);
            return;
        }

        // busy-wait until msecs have elapsed (local variable, so the static start/stop
        // timing fields are not touched)
        long target = System.currentTimeMillis() + msecs;
        while (System.currentTimeMillis() < target)
            ;
    }

    static void printMatrix(int[][] m) {
        int tmp = 0;
        System.out.print("not sent: ");
        for (int i = 0; i < m.length; i++) {
            if (m[i][0] == 0) {
                System.out.print(i + " ");
                tmp++;
            }
        }
        System.out.println("\ntotal not sent: " + tmp);

        tmp = 0;
        System.out.print("not received: ");
        for (int i = 0; i < m.length; i++) {
            if (m[i][1] == 0) {
                System.out.print(i + " ");
                tmp++;
            }
        }
        System.out.println("\ntotal not received: " + tmp);
        System.out.println("Press CTRL-C to kill this test");
    }

    static void help() {
        System.out.println("SpeedTest_NIO [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] "
                + "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>] [-debug] [-cummulative]");
        System.out.println("Options -props, -debug and -cummulative are only valid if -jg is used");
    }

    static class Receiver implements Runnable {
        Thread t = null;
        byte[] buf = new byte[1024];
        MulticastSocket sock;
        Channel channel;
        int num_msgs = 1000;
        int[][] matrix = null;
        boolean jg = false;

        Receiver(MulticastSocket sock, Channel channel, int[][] matrix, boolean jg) {
            this.sock = sock;
            this.channel = channel;
            this.matrix = matrix;
            this.jg = jg;
            num_msgs = matrix.length;
        }

        public void start() {
            if (t == null) {
                t = new Thread(this, "receiver thread");
                t.start();
            }
        }

        public void run() {
            int num_received = 0;
            int number;
            DatagramPacket packet;
            Object obj;
            Message msg;
            byte[] msg_data = null;
            long total_time;
            double msgs_per_sec = 0;
            ByteBuffer rb = ByteBuffer.allocate(16);
            rb.mark();

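            // Receive until all num_msgs sequence numbers have been seen, marking each one in
            // the matrix; delivery comes either from the JGroups channel or from the multicast socket.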
            packet = new DatagramPacket(buf, buf.length);
            while (num_received <= num_msgs) {
                try {
                    if (jg) {
                        obj = channel.receive(0);
                        if (obj instanceof Message) {
                            msg = (Message) obj;
                            msg_data = msg.getBuffer();
                        } else {
                            System.out.println("received non-msg: " + obj.getClass());
                            continue;
                        }
                    } else {
                        sock.receive(packet);
                        msg_data = packet.getData();
                    }

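                    // Only the first 4 bytes carry the sequence number; copy just those. In the
                    // plain-UDP case packet.getData() returns the entire 1024-byte receive buffer,
                    // which would overflow the 16-byte ByteBuffer.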
                    rb.rewind();
                    rb.put(msg_data, 0, 4);
                    rb.rewind();
                    number = rb.getInt();

                    matrix[number][1] = 1;
                    // System.out.println("#set " + number);
                    num_received++;
                    if (num_received % 1000 == 0)
                        System.out.println("received " + num_received + " packets");
                    if (num_received >= num_msgs)
                        break;
                } catch (Exception ex) {
                    System.err.println("receiver: " + ex);
                }
            }
            stop = System.currentTimeMillis();
            total_time = stop - start;
            msgs_per_sec = num_received / (total_time / 1000.0);
            System.out.println("\n** Sending and receiving " + num_received + " took "
                    + total_time + " msecs (" + msgs_per_sec + " msgs/sec) **");
            System.exit(0); // terminate the sender and any remaining JGroups threads
        }
    }

}