001: // $Id: FragTest2.java,v 1.8 2005/05/30 16:15:11 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.Channel;
006: import org.jgroups.JChannel;
007: import org.jgroups.Message;
008: import org.jgroups.util.Util;
009:
010: /**
011: * Tests the fragmentation protocol (FRAG). A large message is broadcast to the group (sender does not
012: * receive its own broadcasts, though). The fragmentation protocol fragments messages bigger than
013: * 8K and reassembles them at the receiver. Messages sent are 100K in size (configurable).
014: */
015: public class FragTest2 {
016: int mode = 0; // 0=receiver, 1=sender
017: Channel channel;
018: String props;
019: int i = 1;
020: Message msg;
021: Object obj;
022: int MSG_SIZE; // bytes
023: String groupname = "FragTest2Group";
024: char sendingChar;
025: int num_msgs = 5;
026: long timeout = 1000;
027: int frag_size = 20000;
028:
029: public FragTest2(char character, int mode, int msg_size,
030: int num_msgs, long timeout, int frag_size) {
031: this .sendingChar = character;
032: this .mode = mode;
033: this .MSG_SIZE = msg_size;
034: this .num_msgs = num_msgs;
035: this .timeout = timeout;
036: this .frag_size = frag_size;
037:
038: props = "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=0;"
039: + "mcast_recv_buf_size="
040: + (frag_size * 4)
041: + ";mcast_send_buf_size="
042: + (frag_size * 2)
043: + "):"
044: + "PING(timeout=3000;num_initial_members=1):"
045: + "FD(timeout=5000):"
046: + "VERIFY_SUSPECT(timeout=1500):"
047: + "pbcast.NAKACK(gc_lag=20;retransmit_timeout=2000):"
048: + "UNICAST(timeout=2000):"
049: + "pbcast.STABLE(desired_avg_gossip=5000):"
050: + "FRAG(frag_size="
051: + frag_size
052: + "):"
053: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
054: + "shun=false;print_local_addr=true)";
055: }
056:
057: public void start() throws Exception {
058:
059: channel = new JChannel(props);
060: if (mode == 1)
061: channel.setOpt(Channel.LOCAL, Boolean.FALSE);
062: channel.connect(groupname);
063:
064: if (mode == 1) {
065: for (int j = 0; j < num_msgs; j++) {
066: msg = createBigMessage(MSG_SIZE);
067: System.out.println("Sending msg (" + MSG_SIZE
068: + " bytes)");
069: channel.send(msg);
070: System.out.println("Done Sending msg (" + MSG_SIZE
071: + " bytes)");
072: Util.sleep(timeout);
073: }
074: System.out.println("Press [return] to exit");
075: System.in.read();
076: } else {
077: System.out.println("Waiting for messages:");
078:
079: while (true) {
080: try {
081: obj = channel.receive(0);
082: if (obj instanceof Message) {
083: System.out.println("Received message: " + obj);
084: Message tmp = (Message) obj;
085: byte[] buf = tmp.getBuffer();
086: for (int i = 0; i < (10 < MSG_SIZE ? 10
087: : MSG_SIZE); i++) {
088: System.out.print((char) buf[i]);
089: }
090: System.out.println();
091:
092: }
093: } catch (Exception e) {
094: System.err.println(e);
095: }
096: }
097:
098: }
099: channel.close();
100: }
101:
102: Message createBigMessage(int size) {
103: byte[] buf = new byte[size];
104: for (int i = 0; i < buf.length; i++)
105: buf[i] = (byte) sendingChar;
106: return new Message(null, null, buf);
107: }
108:
109: public static void main(String[] args) {
110: char defaultChar = 'A';
111: int default_mode = 0; // receiver
112: int MSG_SIZE = 30000;
113: int num_msgs = 10;
114: long timeout = 3000;
115: int frag_size = 20000;
116:
117: for (int i = 0; i < args.length; i++) {
118: if ("-help".equals(args[i])) {
119: usage();
120: return;
121: }
122: if ("-sender".equals(args[i])) {
123: default_mode = 1;
124: continue;
125: }
126: if ("-size".equals(args[i])) {
127: MSG_SIZE = Integer.parseInt(args[++i]);
128: continue;
129: }
130: if ("-num_msgs".equals(args[i])) {
131: num_msgs = Integer.parseInt(args[++i]);
132: continue;
133: }
134: if ("-frag_size".equals(args[i])) {
135: frag_size = Integer.parseInt(args[++i]);
136: continue;
137: }
138: if ("-timeout".equals(args[i])) {
139: timeout = Long.parseLong(args[++i]);
140: continue;
141: }
142: if ("-char".equals(args[i])) {
143: defaultChar = args[++i].charAt(0);
144: continue;
145: }
146: usage();
147: return;
148: }
149:
150: try {
151: new FragTest2(defaultChar, default_mode, MSG_SIZE,
152: num_msgs, timeout, frag_size).start();
153: } catch (Exception e) {
154: System.err.println(e);
155: }
156: }
157:
158: static void usage() {
159: System.out
160: .println("FragTest2 [-sender] [-size <message size (in bytes)>] [-timeout <msecs>]"
161: + " [-num_msgs <number of messages>] [-char <frag character>] "
162: + "[-frag_size <fragmentation size>] [-help]");
163: }
164:
165: }
|