001: // $Id: NotificationBusDemo.java,v 1.7 2006/05/25 12:10:19 belaban Exp $
002:
003: package org.jgroups.demos;
004:
005: import org.jgroups.Address;
006: import org.jgroups.blocks.NotificationBus;
007: import org.apache.commons.logging.Log;
008: import org.apache.commons.logging.LogFactory;
009:
010: import java.io.BufferedReader;
011: import java.io.InputStreamReader;
012: import java.io.Serializable;
013: import java.util.Vector;
014:
015: /**
016: * Demoes the NotificationBus (without caching). Start a number of members and type in messages. All members will
017: * receive the messages. View changes will also be displayed (e.g. member joined, left).
018: * @author Bela Ban
019: */
020: public class NotificationBusDemo implements NotificationBus.Consumer {
021: NotificationBus bus = null;
022: BufferedReader in = null;
023: String line;
024: final long timeout = 0;
025: final Vector cache = null;
026: Log log = LogFactory.getLog(getClass());
027:
028: public void start(String bus_name, String props) {
029: try {
030:
031: bus = new NotificationBus(bus_name, props);
032: bus.setConsumer(this );
033: bus.start();
034: //System.out.println("Getting the cache from coordinator:");
035: //cache=(Vector)bus.getCacheFromCoordinator(3000, 3);
036: //if(cache == null) cache=new Vector();
037: //System.out.println("cache is " + cache);
038:
039: in = new BufferedReader(new InputStreamReader(System.in));
040: while (true) {
041: try {
042: System.out.print("> ");
043: System.out.flush();
044: line = in.readLine();
045: if (line.startsWith("quit")
046: || line.startsWith("exit")) {
047: bus.stop();
048: bus = null;
049: break;
050: }
051: bus.sendNotification(line);
052: } catch (Exception e) {
053: log.error(e);
054: }
055: }
056: } catch (Exception ex) {
057: log.error(ex);
058: } finally {
059: if (bus != null)
060: bus.stop();
061: }
062: }
063:
064: public void handleNotification(Serializable n) {
065: System.out.println("** Received notification: " + n);
066: //if(cache != null)
067: // cache.addElement(n);
068: //System.out.println("cache is " + cache);
069: }
070:
071: public Serializable getCache() {
072: // return cache;
073: return null;
074: }
075:
076: public void memberJoined(Address mbr) {
077: System.out.println("** Member joined: " + mbr);
078: }
079:
080: public void memberLeft(Address mbr) {
081: System.out.println("** Member left: " + mbr);
082: }
083:
084: public static void main(String[] args) {
085: String name = "BusDemo";
086: String props = "UDP(mcast_addr=224.0.0.35;mcast_port=45566;ip_ttl=32;"
087: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
088: + "PING(timeout=2000;num_initial_members=3):"
089: + "MERGE2(min_interval=5000;max_interval=10000):"
090: + "FD_SOCK:"
091: + "VERIFY_SUSPECT(timeout=1500):"
092: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
093: + "UNICAST(timeout=5000):"
094: + "pbcast.STABLE(desired_avg_gossip=20000):"
095: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):"
096: +
097: // "CAUSAL:" +
098: "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
099: + "shun=false;print_local_addr=true)";
100:
101: for (int i = 0; i < args.length; i++) {
102: if ("-bus_name".equals(args[i])) {
103: name = args[++i];
104: continue;
105: }
106: if ("-props".equals(args[i])) {
107: props = args[++i];
108: continue;
109: }
110: System.out
111: .println("NotificationBusDemo [-help] [-bus_name <name>] "
112: + "[-props <properties>]");
113: return;
114: }
115: System.out
116: .println("Starting NotificationBus with name " + name);
117: new NotificationBusDemo().start(name, props);
118: }
119:
120: }
|