001: // $Id: UnicastTest2.java,v 1.7 2005/05/30 16:15:12 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006:
007: import java.io.Serializable;
008: import java.util.Enumeration;
009: import java.util.Hashtable;
010: import java.util.Vector;
011:
/**
 * Demos the UNICAST protocol. As soon as we have 2 members in a group, a destination member is randomly
 * chosen (not self!). Then, as long as that destination member is a member of the group, we send NUM_MSGS
 * unicast messages to it. The receiver checks that it receives messages in order (monotonically increasing).<p>
 * The sample protocol stack below has a DISCARD protocol in it, which randomly discards
 * both unicast and multicast messages (in the example below, down messages are discarded with a probability
 * of 10%, i.e. 1 out of 10 messages is discarded).<p>
 * If you want to see the informational messages for DISCARD and UNICAST, you have to enable them in trace, e.g.
 * by adding the following statements to your jgroups.properties file (in your home directory):
 * <pre>
 * trace1=DISCARD DEBUG STDOUT
 * trace2=UNICAST DEBUG STDOUT
 * </pre>
 * @author Bela Ban
 */
027: public class UnicastTest2 implements Runnable {
028: Channel channel;
029: String groupname = "UnicastTest2Group";
030: String props = "UDP:PING:FD:DISCARD(down=0.1):NAKACK(retransmit_timeout=1000):"
031: + "UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE";
032: Thread writer = null;
033: Vector mbrs = new Vector();
034: Hashtable senders = new Hashtable();
035: boolean running = true;
036: final int NUM_MSGS = 100;
037:
038: public void start() throws Exception {
039: Object obj;
040: Message msg;
041: View view;
042: Vector tmp;
043: UnicastTest2Info info, myinfo;
044: Object sender;
045:
046: channel = new JChannel(props);
047: channel.connect(groupname);
048: System.out.println("[ready]");
049:
050: while (true) {
051: try {
052: obj = channel.receive(0);
053: if (obj instanceof View) {
054: view = (View) obj;
055: tmp = view.getMembers();
056: mbrs.removeAllElements();
057: for (int i = 0; i < tmp.size(); i++)
058: mbrs.addElement(tmp.elementAt(i));
059:
060: for (Enumeration e = senders.keys(); e
061: .hasMoreElements();) {
062: sender = e.nextElement();
063: if (!mbrs.contains(sender)) {
064: mbrs.removeElement(sender);
065: }
066: }
067:
068: if (mbrs.size() > 1) {
069: if (writer == null) {
070: writer = new Thread(this , "WriterThread");
071: writer.start();
072: }
073: } else {
074: if (writer != null) {
075: running = false;
076: writer.interrupt();
077: }
078: writer = null;
079: }
080: } else if (obj instanceof Message) {
081: msg = (Message) obj;
082: info = (UnicastTest2Info) msg.getObject();
083: System.out.println("Received msg: " + info);
084:
085: myinfo = (UnicastTest2Info) senders
086: .get(info.sender);
087: if (myinfo == null) { // first msg
088: if (info.msgno == 1) {
089: // must be 1
090: senders.put(info.sender, info);
091: } else {
092: // error
093: System.err
094: .println("UnicastTest2.start(): first seqno must be 1");
095: }
096:
097: } else {
098: if (info.msgno - 1 != myinfo.msgno) {
099: System.err
100: .println("UnicastTest2.start(): received msg "
101: + info.sender
102: + ':'
103: + info.msgno
104: + ", but last received was "
105: + myinfo.sender
106: + ':'
107: + myinfo.msgno);
108: } else {
109: System.out
110: .println("UnicastTest2.start(): OK received "
111: + info.sender
112: + ':'
113: + info.msgno
114: + ", prev seqno="
115: + myinfo.sender
116: + ':'
117: + myinfo.msgno);
118: myinfo.msgno++;
119: }
120: }
121:
122: } else
123: ;
124: } catch (ChannelClosedException closed) {
125: System.err.println("Channel closed");
126: break;
127: } catch (ChannelNotConnectedException not_conn) {
128: System.err.println("Channel not connected");
129: break;
130: } catch (Exception e) {
131: System.err.println(e);
132: }
133: }
134:
135: }
136:
137: Address selectTarget() {
138: Vector tmp = new Vector();
139: Address ret;
140: int t;
141:
142: if (mbrs == null || mbrs.size() < 2)
143: return null;
144:
145: for (int i = 0; i < mbrs.size(); i++) {
146: if (!(mbrs.elementAt(i).equals(channel.getLocalAddress())))
147: tmp.addElement(mbrs.elementAt(i));
148: }
149: t = (int) ((Math.random() * 100));
150: ret = (Address) tmp.elementAt(t % tmp.size());
151: return ret;
152: }
153:
154: public void run() {
155: Address target = selectTarget();
156: UnicastTest2Info info = null;
157: int msgno = 1;
158:
159: if (target == null)
160: return;
161:
162: while (running && msgno <= NUM_MSGS) {
163: try {
164: info = new UnicastTest2Info(msgno++, channel
165: .getLocalAddress());
166: System.out.println("Sending message #" + (msgno - 1)
167: + " to " + target);
168: channel.send(new Message(target, null, info));
169: Thread.sleep(500);
170: } catch (ChannelClosedException closed) {
171: System.err.println(closed);
172: break;
173: } catch (ChannelNotConnectedException not_conn) {
174: System.err.println(not_conn);
175: } catch (Exception e) {
176: System.err.println(e);
177: }
178: }
179: System.out
180: .println("UnicastTest2Info.run(): writer thread terminated");
181: }
182:
183: public static void main(String[] args) {
184: try {
185:
186: new UnicastTest2().start();
187: } catch (Exception e) {
188: System.err.println(e);
189: }
190: }
191:
192: private static class UnicastTest2Info implements Serializable {
193: int msgno = 0;
194: Object sender = null;
195:
196: public UnicastTest2Info() {
197:
198: }
199:
200: public UnicastTest2Info(int msgno, Object sender) {
201: this .msgno = msgno;
202: this .sender = sender;
203: }
204:
205: public String toString() {
206: return "#" + msgno + " (sender=" + sender + ')';
207: }
208: }
209:
210: }
|