001: // $Id: NakReceiverWindowStressTest.java,v 1.8 2005/05/30 16:15:11 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Message;
007: import org.jgroups.stack.IpAddress;
008: import org.jgroups.stack.NakReceiverWindow;
009: import org.jgroups.stack.Retransmitter;
010: import org.jgroups.util.Util;
011:
012: import java.io.IOException;
013:
014: /**
015: * Adds a large number of messages (with gaps to simulate message loss) into NakReceiverWindow. Receives messages
016: * on another thread. When the receiver thread has received all messages it prints out the time taken and terminates.
017: *
018: * @author Bela Ban
019: */
020: public class NakReceiverWindowStressTest implements
021: Retransmitter.RetransmitCommand {
022: NakReceiverWindow win = null;
023: Address sender = null;
024: int num_msgs = 1000, prev_value = 0;
025: double discard_prob = 0.0; // discard 0% of all insertions
026: long start, stop;
027: boolean trace = false;
028: boolean debug = false;
029:
030: public NakReceiverWindowStressTest(int num_msgs,
031: double discard_prob, boolean trace) {
032: this .num_msgs = num_msgs;
033: this .discard_prob = discard_prob;
034: this .trace = trace;
035: }
036:
037: public void retransmit(long first_seqno, long last_seqno,
038: Address sender) {
039: for (long i = first_seqno; i <= last_seqno; i++) {
040: if (debug)
041: out("-- xmit: " + i);
042: Message m = new Message(null, sender, new Long(i));
043: win.add(i, m);
044: }
045: }
046:
047: public void start() throws IOException {
048: System.out.println("num_msgs=" + num_msgs + "\ndiscard_prob="
049: + discard_prob);
050:
051: sender = new IpAddress("localhost", 5555);
052: win = new NakReceiverWindow(sender, this , 1);
053: start = System.currentTimeMillis();
054: sendMessages(num_msgs);
055: }
056:
057: void sendMessages(int num_msgs) {
058: Message msg;
059:
060: for (long i = 1; i <= num_msgs; i++) {
061: if (discard_prob > 0 && Util.tossWeightedCoin(discard_prob)
062: && i <= num_msgs) {
063: if (debug)
064: out("-- discarding " + i);
065: } else {
066: if (debug)
067: out("-- adding " + i);
068: win.add(i, new Message(null, null, new Long(i)));
069: if (trace && i % 1000 == 0)
070: System.out.println("-- added " + i);
071: while ((msg = win.remove()) != null)
072: processMessage(msg);
073: }
074: }
075: while (true) {
076: while ((msg = win.remove()) != null)
077: processMessage(msg);
078: }
079: }
080:
081: void processMessage(Message msg) {
082: long i;
083:
084: i = ((Long) msg.getObject()).longValue();
085: if (prev_value + 1 != i) {
086: System.err.println("** processMessage(): removed seqno ("
087: + i + ") is not 1 greater than "
088: + "previous value (" + prev_value + ')');
089: System.exit(0);
090: }
091: prev_value++;
092: if (trace && i % 1000 == 0)
093: System.out.println("Removed " + i);
094: if (i == num_msgs) {
095: stop = System.currentTimeMillis();
096: long total = stop - start;
097: double msgs_per_sec = num_msgs / (total / 1000.0);
098: double msgs_per_ms = num_msgs / (double) total;
099: System.out.println("Inserting and removing " + num_msgs
100: + " messages into NakReceiverWindow took " + total
101: + "ms");
102: System.out.println("Msgs/sec: " + msgs_per_sec
103: + ", msgs/ms: " + msgs_per_ms);
104: System.out.println("<enter> to terminate");
105: try {
106: System.in.read();
107: } catch (Exception ex) {
108: System.err.println(ex);
109: }
110: System.exit(0);
111: }
112: }
113:
114: void out(String msg) {
115: System.out.println(msg);
116: }
117:
118: public static void main(String[] args) {
119: NakReceiverWindowStressTest test;
120: int num_msgs = 1000;
121: double discard_prob = 0.0;
122: boolean trace = false;
123:
124: for (int i = 0; i < args.length; i++) {
125: if ("-help".equals(args[i])) {
126: help();
127: return;
128: }
129: if ("-num_msgs".equals(args[i])) {
130: num_msgs = Integer.parseInt(args[++i]);
131: continue;
132: }
133: if ("-discard".equals(args[i])) {
134: discard_prob = Double.parseDouble(args[++i]);
135: continue;
136: }
137: if ("-trace".equals(args[i])) {
138: trace = true;
139: continue;
140: }
141: }
142:
143: test = new NakReceiverWindowStressTest(num_msgs, discard_prob,
144: trace);
145: try {
146: test.start();
147: } catch (IOException e) {
148: e.printStackTrace();
149: }
150: }
151:
152: static void help() {
153: System.out
154: .println("NakReceiverWindowStressTest [-help] [-num_msgs <number>] [-discard <probability>] "
155: + "[-trace]");
156: }
157:
158: }
|