001: // $Id: UnicastTest.java,v 1.8 2005/08/18 09:45:25 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.util.Util;
007:
008: import java.io.*;
009: import java.util.Vector;
010:
011: /**
012: * Tests the UNICAST by sending unicast messages between a sender and a receiver
013: *
014: * @author Bela Ban
015: */
016: public class UnicastTest implements Runnable {
017: UnicastTest test;
018: JChannel channel;
019: final String groupname = "UnicastTest-Group";
020: Thread t = null;
021: long sleep_time = 0;
022: boolean exit_on_end = false, busy_sleep = false;
023:
024: public static class Data implements Externalizable {
025: public Data() {
026: }
027:
028: public void writeExternal(ObjectOutput out) throws IOException {
029: }
030:
031: public void readExternal(ObjectInput in) throws IOException,
032: ClassNotFoundException {
033: }
034: }
035:
036: public static class StartData extends Data {
037: long num_values = 0;
038:
039: public StartData() {
040: super ();
041: }
042:
043: StartData(long num_values) {
044: this .num_values = num_values;
045: }
046:
047: public void writeExternal(ObjectOutput out) throws IOException {
048: out.writeLong(num_values);
049: }
050:
051: public void readExternal(ObjectInput in) throws IOException,
052: ClassNotFoundException {
053: num_values = in.readLong();
054: }
055: }
056:
057: public static class Value extends Data {
058: long value = 0;
059:
060: public Value() {
061: super ();
062: }
063:
064: Value(long value) {
065: this .value = value;
066: }
067:
068: public void writeExternal(ObjectOutput out) throws IOException {
069: out.writeLong(value);
070: }
071:
072: public void readExternal(ObjectInput in) throws IOException,
073: ClassNotFoundException {
074: value = in.readLong();
075: }
076: }
077:
078: public void init(String props, long sleep_time,
079: boolean exit_on_end, boolean busy_sleep) throws Exception {
080: this .sleep_time = sleep_time;
081: this .exit_on_end = exit_on_end;
082: this .busy_sleep = busy_sleep;
083: channel = new JChannel(props);
084: channel.connect(groupname);
085: t = new Thread(this , "UnicastTest - receiver thread");
086: t.start();
087: }
088:
089: public void run() {
090: Data data;
091: Message msg;
092: Object obj;
093: boolean started = false;
094: long start = 0, stop = 0;
095: long current_value = 0, tmp = 0, num_values = 0;
096: long total_msgs = 0, total_time = 0, msgs_per_sec;
097:
098: while (true) {
099: try {
100: obj = channel.receive(0);
101: if (obj instanceof View)
102: System.out.println("** view: " + obj);
103: else if (obj instanceof Message) {
104: msg = (Message) obj;
105: data = (Data) msg.getObject();
106:
107: if (data instanceof StartData) {
108: if (started) {
109: System.err
110: .println("UnicastTest.run(): received START data, but am already processing data");
111: } else {
112: started = true;
113: current_value = 0; // first value to be received
114: tmp = 0;
115: num_values = ((StartData) data).num_values;
116: start = System.currentTimeMillis();
117: }
118: } else if (data instanceof Value) {
119: tmp = ((Value) data).value;
120: if (current_value + 1 != tmp) {
121: System.err.println("-- message received ("
122: + tmp + ") is not 1 greater than "
123: + current_value);
124: } else {
125: current_value++;
126: if (current_value % 1000 == 0)
127: System.out.println("received "
128: + current_value);
129: if (current_value >= num_values) {
130: stop = System.currentTimeMillis();
131: total_time = stop - start;
132: msgs_per_sec = (long) (num_values / (total_time / 1000.0));
133: System.out.println("-- received "
134: + num_values + " messages in "
135: + total_time + " ms ("
136: + msgs_per_sec
137: + " messages/sec)");
138: started = false;
139: if (exit_on_end)
140: System.exit(0);
141: }
142: }
143: }
144: }
145: } catch (ChannelNotConnectedException not_connected) {
146: System.err.println(not_connected);
147: break;
148: } catch (ChannelClosedException closed_ex) {
149: System.err.println(closed_ex);
150: break;
151: } catch (TimeoutException timeout) {
152: System.err.println(timeout);
153: break;
154: } catch (Throwable t) {
155: System.err.println(t);
156: started = false;
157: current_value = 0;
158: tmp = 0;
159: Util.sleep(1000);
160: }
161: }
162: // System.out.println("UnicastTest.run(): receiver thread terminated");
163: }
164:
165: public void eventLoop() throws Exception {
166: int c;
167:
168: while (true) {
169: System.out.print("[1] Send msgs [2] Print view [q] Quit ");
170: System.out.flush();
171: c = System.in.read();
172: switch (c) {
173: case -1:
174: break;
175: case '1':
176: sendMessages();
177: break;
178: case '2':
179: printView();
180: break;
181: case '3':
182: break;
183: case '4':
184: break;
185: case '5':
186: break;
187: case '6':
188: break;
189: case 'q':
190: channel.close();
191: return;
192: default:
193: break;
194: }
195: }
196: }
197:
198: void sendMessages() throws Exception {
199: long num_msgs = getNumberOfMessages();
200: Address receiver = getReceiver();
201: Message msg;
202: Value val = new Value(1);
203:
204: if (receiver == null) {
205: System.err
206: .println("UnicastTest.sendMessages(): receiver is null, cannot send messages");
207: return;
208: }
209:
210: System.out.println("sending " + num_msgs + " messages to "
211: + receiver);
212: msg = new Message(receiver, null, new StartData(num_msgs));
213: channel.send(msg);
214:
215: for (int i = 1; i <= num_msgs; i++) {
216: val = new Value(i);
217: msg = new Message(receiver, null, val);
218: if (i % 1000 == 0)
219: System.out.println("-- sent " + i);
220: channel.send(msg);
221: if (sleep_time > 0)
222: Util.sleep(sleep_time, busy_sleep);
223: }
224: System.out.println("done sending " + num_msgs + " to "
225: + receiver);
226: }
227:
228: void printView() {
229: System.out.println("\n-- view: " + channel.getView() + '\n');
230: try {
231: System.in.skip(System.in.available());
232: } catch (Exception e) {
233: }
234: }
235:
236: long getNumberOfMessages() {
237: BufferedReader reader = null;
238: String tmp = null;
239:
240: try {
241: System.out.print("Number of messages to send: ");
242: System.out.flush();
243: System.in.skip(System.in.available());
244: reader = new BufferedReader(
245: new InputStreamReader(System.in));
246: tmp = reader.readLine().trim();
247: return Long.parseLong(tmp);
248: } catch (Exception e) {
249: System.err.println("UnicastTest.getNumberOfMessages(): "
250: + e);
251: return 0;
252: }
253: }
254:
255: Address getReceiver() {
256: Vector mbrs = null;
257: int index;
258: BufferedReader reader;
259: String tmp;
260:
261: try {
262: mbrs = channel.getView().getMembers();
263: System.out
264: .println("pick receiver from the following members:");
265: for (int i = 0; i < mbrs.size(); i++) {
266: if (mbrs.elementAt(i).equals(channel.getLocalAddress()))
267: System.out.println("[" + i + "]: "
268: + mbrs.elementAt(i) + " (self)");
269: else
270: System.out.println("[" + i + "]: "
271: + mbrs.elementAt(i));
272: }
273: System.out.flush();
274: System.in.skip(System.in.available());
275: reader = new BufferedReader(
276: new InputStreamReader(System.in));
277: tmp = reader.readLine().trim();
278: index = Integer.parseInt(tmp);
279: return (Address) mbrs.elementAt(index); // index out of bounds caught below
280: } catch (Exception e) {
281: System.err.println("UnicastTest.getReceiver(): " + e);
282: return null;
283: }
284: }
285:
286: public static void main(String[] args) {
287: long sleep_time = 0;
288: boolean exit_on_end = false;
289: boolean busy_sleep = false;
290:
291: String udp_props = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;"
292: + "ucast_recv_buf_size=32000;ucast_send_buf_size=64000;"
293: + "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):";
294:
295: String regular_props = "PING(timeout=1000;num_initial_members=2):"
296: + "MERGE2(min_interval=5000;max_interval=10000):"
297: + "FD_SOCK:"
298: + "VERIFY_SUSPECT(timeout=1500):"
299: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):"
300: + "UNICAST(timeout=2000,4000,6000;window_size=100;min_threshold=10;use_gms=false):"
301: + "pbcast.STABLE(desired_avg_gossip=20000):"
302: + "FRAG(frag_size=8192;down_thread=false;up_thread=false):"
303: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
304:
305: String props = udp_props + regular_props;
306: String loopback_props = "LOOPBACK:" + regular_props;
307:
308: for (int i = 0; i < args.length; i++) {
309: if ("-help".equals(args[i])) {
310: help();
311: return;
312: }
313: if ("-props".equals(args[i])) {
314: props = args[++i];
315: continue;
316: }
317: if ("-sleep".equals(args[i])) {
318: sleep_time = Long.parseLong(args[++i]);
319: continue;
320: }
321: if ("-loopback".equals(args[i])) {
322: props = loopback_props;
323: continue;
324: }
325: if ("-exit_on_end".equals(args[i])) {
326: exit_on_end = true;
327: continue;
328: }
329: if ("-busy_sleep".equals(args[i])) {
330: busy_sleep = true;
331: continue;
332: }
333: }
334:
335: try {
336: UnicastTest test = new UnicastTest();
337: test.init(props, sleep_time, exit_on_end, busy_sleep);
338: test.eventLoop();
339: } catch (Exception ex) {
340: System.err.println(ex);
341: }
342: }
343:
344: static void help() {
345: System.out
346: .println("UnicastTest [-help] [-props <props>] [-sleep <time in ms between msg sends] "
347: + "[-loopback] [-exit_on_end] [-busy-sleep]");
348: }
349: }
|