001: // $Id: FragTestUnicast.java,v 1.7 2005/05/30 16:15:11 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.util.Util;
007:
008: import java.util.Vector;
009:
010: /**
011: * Tests the fragmentation protocol (FRAG) for large unicast messages. The first member needs to be started as
012: * receiver. The second member needs to be started as sender (-sender). The latter then starts sending unicast
013: * messages to the receiver, which will be fragmented. If the relation between message size and fragmentation
014: * size results in many fragments to be sent, then the receive and send buffer sizes in UDP have to be
015: * adjusted accordingly, otherwise packets will be dropped and retransmitted.
016: * @author Bela Ban April 19 2001
017: */
018: public class FragTestUnicast {
019: int mode = 0; // 0=receiver, 1=sender
020: Channel channel;
021: String props;
022: int i = 1;
023: Message msg;
024: Object obj;
025: int MSG_SIZE; // bytes
026: String groupname = "FragTestUnicastGroup";
027: char sendingChar;
028: int num_msgs = 10;
029: long timeout = 3000;
030: int frag_size = 20000;
031: Address local_addr;
032:
033: public FragTestUnicast(char character, int mode, int msg_size,
034: int num_msgs, long timeout, int frag_size) {
035: this .sendingChar = character;
036: this .mode = mode;
037: this .MSG_SIZE = msg_size;
038: this .num_msgs = num_msgs;
039: this .timeout = timeout;
040: this .frag_size = frag_size;
041:
042: props = "UDP(mcast_addr=224.10.20.3;mcast_port=45566;ip_ttl=32;"
043: + "ucast_send_buf_size="
044: + (frag_size * 2)
045: + ";ucast_recv_buf_size="
046: + (frag_size * 4)
047: + "):"
048: + "PING(timeout=3000;num_initial_members=2):"
049: + "FD(timeout=5000):"
050: + "VERIFY_SUSPECT(timeout=1500):"
051: + "pbcast.NAKACK(gc_lag=20;retransmit_timeout=2000):"
052: + "UNICAST(timeout=4000):"
053: + "pbcast.STABLE(desired_avg_gossip=10000):"
054: + "FRAG(frag_size="
055: + frag_size
056: + "):"
057: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
058: + "shun=false;print_local_addr=true)";
059: }
060:
061: public void start() throws Exception {
062: Vector mbrs;
063: Object obj;
064: View view;
065: Address receiver = null;
066:
067: channel = new JChannel(props);
068: // if(mode == 1) channel.setOpt(Channel.LOCAL, Boolean.FALSE);
069: channel.connect(groupname);
070: local_addr = channel.getLocalAddress();
071:
072: if (mode == 1) {
073:
074: // waiting for receiver
075: mbrs = channel.getView().getMembers();
076: if (mbrs.size() < 2) {
077: // loop until a new member has joined
078: System.out
079: .println("Waiting for receiver to join group");
080: while (true) {
081: obj = channel.receive(0);
082: if (obj instanceof View) {
083: view = (View) obj;
084: mbrs = view.getMembers();
085: if (mbrs.size() >= 2)
086: break;
087: }
088: }
089: }
090: for (int i = 0; i < mbrs.size(); i++) {
091: if (mbrs.elementAt(i).equals(local_addr))
092: continue;
093: else {
094: receiver = (Address) mbrs.elementAt(i);
095: break;
096: }
097: }
098: System.out
099: .println("Receiver joined group: I'm starting to send messages to "
100: + receiver);
101:
102: for (int j = 0; j < num_msgs; j++) {
103: msg = createBigMessage(MSG_SIZE);
104: msg.setDest(receiver);
105: System.out.println("Sending msg (" + MSG_SIZE
106: + " bytes) to " + receiver);
107: channel.send(msg);
108: System.out.println("Done Sending msg (" + MSG_SIZE
109: + " bytes)");
110: Util.sleep(timeout);
111: }
112: System.out.println("Press [return] to exit");
113: System.in.read();
114: } else {
115: System.out.println("Waiting for messages:");
116:
117: while (true) {
118: try {
119: obj = channel.receive(0);
120: if (obj instanceof Message) {
121: System.out.println("Received message: " + obj);
122: Message tmp = (Message) obj;
123: byte[] buf = tmp.getBuffer();
124: for (int i = 0; i < (10 < MSG_SIZE ? 10
125: : MSG_SIZE); i++) {
126: System.out.print((char) buf[i]);
127: }
128: System.out.println();
129:
130: }
131: } catch (Exception e) {
132: System.err.println(e);
133: }
134: }
135:
136: }
137:
138: channel.close();
139: }
140:
141: Message createBigMessage(int size) {
142: byte[] buf = new byte[size];
143: for (int i = 0; i < buf.length; i++)
144: buf[i] = (byte) sendingChar;
145: return new Message(null, null, buf);
146: }
147:
148: public static void main(String[] args) {
149: char defaultChar = 'A';
150: int default_mode = 0; // receiver
151: int MSG_SIZE = 30000;
152: int num_msgs = 5;
153: long timeout = 1000;
154: int frag_size = 8195;
155:
156: for (int i = 0; i < args.length; i++) {
157: if ("-help".equals(args[i])) {
158: usage();
159: return;
160: }
161: if ("-sender".equals(args[i])) {
162: default_mode = 1;
163: continue;
164: }
165: if ("-size".equals(args[i])) {
166: MSG_SIZE = Integer.parseInt(args[++i]);
167: continue;
168: }
169: if ("-num_msgs".equals(args[i])) {
170: num_msgs = Integer.parseInt(args[++i]);
171: continue;
172: }
173: if ("-frag_size".equals(args[i])) {
174: frag_size = Integer.parseInt(args[++i]);
175: continue;
176: }
177: if ("-timeout".equals(args[i])) {
178: timeout = Long.parseLong(args[++i]);
179: continue;
180: }
181: if ("-char".equals(args[i])) {
182: defaultChar = args[++i].charAt(0);
183: continue;
184: }
185: usage();
186: return;
187: }
188:
189: try {
190: new FragTestUnicast(defaultChar, default_mode, MSG_SIZE,
191: num_msgs, timeout, frag_size).start();
192: } catch (Exception e) {
193: System.err.println(e);
194: }
195: }
196:
197: static void usage() {
198: System.out
199: .println("FragTestUnicast [-sender] [-size <message size (in bytes)>] [-timeout <msecs>]"
200: + " [-num_msgs <number of messages>] [-char <frag character>] "
201: + "[-frag_size <fragmentation size>] [-help]");
202: }
203:
204: }
|