001: // $Id: CausalDemo.java,v 1.6 2005/05/30 16:14:40 belaban Exp $
002: package org.jgroups.demos;
003:
004: import org.jgroups.*;
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007:
008: import java.io.Serializable;
009: import java.util.Random;
010: import java.util.Vector;
011:
012: /**
013: * Simple causal demo where each member bcast a consecutive letter from the
014: * alphabet and picks the next member to transmit the next letter. Start a
015: * few instances of CausalDemo and pass a paramter "-start" to a CausalDemo
016: * that initiates transmission of a letter A. All participanting members should
017: * have correct alphabet. DISCARD layer has been added to simulate lost messages,
018: * thus forcing delaying of delivery of a certain alphabet letter until the causally
019: * prior one has been received. Remove CAUSAL from the stack and witness how FIFO
020: * alone doesn't provide this guarantee.
021: *
022: * @author Vladimir Blagojevic
023: */
024: public class CausalDemo implements Runnable {
025: private Channel channel;
026: private final Vector alphabet = new Vector();
027: private boolean starter = false;
028: private int doneCount = 0;
029: private Log log = LogFactory.getLog(getClass());
030:
031: private final String props = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;"
032: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
033: + "PING(timeout=2000;num_initial_members=5):"
034: + "DISCARD(up=0.05;excludeitself=true):"
035: + "FD_SOCK:"
036: + "VERIFY_SUSPECT(timeout=1500):"
037: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800,9600):"
038: + "UNICAST(timeout=5000):"
039: + "pbcast.STABLE(desired_avg_gossip=2000):"
040: + "FRAG(frag_size=4096;down_thread=false;up_thread=false):"
041: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
042: + "shun=false;print_local_addr=true):CAUSAL";
043:
044: public CausalDemo(boolean start) {
045: starter = start;
046: }
047:
048: public String getNext(String c) {
049: char letter = c.charAt(0);
050: return new String(new char[] { ++letter });
051: }
052:
053: public void listAlphabet() {
054: System.out.println(alphabet);
055: }
056:
057: public void run() {
058: Object obj;
059: Message msg;
060: Random r = new Random();
061:
062: try {
063: channel = new JChannel(props);
064: channel.connect("CausalGroup");
065: System.out.println("View:" + channel.getView());
066: if (starter)
067: channel.send(new Message(null, null, new CausalMessage(
068: "A", (Address) channel.getView().getMembers()
069: .get(0))));
070:
071: } catch (Exception e) {
072: System.out.println("Could not conec to channel");
073: }
074:
075: try {
076: Runtime.getRuntime().addShutdownHook(
077: new Thread("Shutdown cleanup thread") {
078: public void run() {
079:
080: listAlphabet();
081: channel.disconnect();
082: channel.close();
083: }
084: });
085: } catch (Exception e) {
086: System.out.println("Exception while shutting down" + e);
087: }
088:
089: while (true) {
090: try {
091: CausalMessage cm = null;
092: obj = channel.receive(0); // no timeout
093: if (obj instanceof Message) {
094: msg = (Message) obj;
095: cm = (CausalMessage) msg.getObject();
096: Vector members = channel.getView().getMembers();
097: String receivedLetter = cm.message;
098:
099: if ("Z".equals(receivedLetter)) {
100: channel.send(new Message(null, null,
101: new CausalMessage("done", null)));
102: }
103: if ("done".equals(receivedLetter)) {
104: if (++doneCount >= members.size()) {
105: System.exit(0);
106: }
107: continue;
108: }
109:
110: alphabet.add(receivedLetter);
111: listAlphabet();
112:
113: //am I chosen to transmit next letter?
114: if (cm.member.equals(channel.getLocalAddress())) {
115: int nextTarget = r.nextInt(members.size());
116:
117: //chose someone other than yourself
118: while (nextTarget == members.indexOf(channel
119: .getLocalAddress())) {
120: nextTarget = r.nextInt(members.size());
121: }
122: Address next = (Address) members
123: .get(nextTarget);
124: String nextChar = getNext(receivedLetter);
125: if (nextChar.compareTo("Z") < 1) {
126: System.out.println("Sending " + nextChar);
127: channel.send(new Message(null, null,
128: new CausalMessage(nextChar, next)));
129: }
130: }
131: }
132: } catch (ChannelNotConnectedException conn) {
133: break;
134: } catch (Exception e) {
135: log.error(e);
136: }
137: }
138:
139: }
140:
141: public static void main(String args[]) {
142: CausalDemo test = null;
143: boolean start = false;
144:
145: for (int i = 0; i < args.length; i++) {
146: if ("-help".equals(args[i])) {
147: System.out.println("CausalDemo [-help] [-start]");
148: return;
149: }
150: if ("-start".equals(args[i])) {
151: start = true;
152: continue;
153: }
154: }
155:
156: //if parameter start is passed , start the demo
157: test = new CausalDemo(start);
158: try {
159: new Thread(test).start();
160: } catch (Exception e) {
161: System.err.println(e);
162: }
163:
164: }
165:
166: }
167:
168: class CausalMessage implements Serializable {
169: public final String message;
170: public final Address member;
171:
172: public CausalMessage(String message, Address member) {
173: this .message = message;
174: this .member = member;
175: }
176:
177: public String toString() {
178: return "CausalMessage[" + message + '=' + message + "member="
179: + member + ']';
180: }
181:
182: }
|