001: // $Id: MessageDispatcherTestAsync.java,v 1.11 2006/08/28 06:51:54 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.MessageDispatcher;
007: import org.jgroups.blocks.RequestHandler;
008: import org.jgroups.blocks.RspCollector;
009: import org.jgroups.debug.Debugger;
010: import org.jgroups.util.RspList;
011: import org.jgroups.util.Util;
012:
013: import java.io.IOException;
014:
015: /**
016: * Asynchronous example for MessageDispatcher; message is mcast to all members, responses are received
017: * asynchronously by calling RspCollector.receiveResponse(). Message is periodically broadcast to all
018: * members; handle() method is invoked whenever a message is received.
019: *
020: * @author Bela Ban
021: */
022: public class MessageDispatcherTestAsync implements RequestHandler {
023: Channel channel;
024: MessageDispatcher disp;
025: RspList rsp_list;
026: MyCollector coll = new MyCollector();
027: Debugger debugger = null;
028: boolean debug = false;
029: boolean cummulative = false;
030: boolean done_submitted = true;
031: static final int NUM = 10;
032:
033: String props = "UDP(loopback=true;mcast_addr=224.0.0.35;mcast_port=45566;ip_ttl=32;"
034: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
035: + "PING(timeout=2000;num_initial_members=3):"
036: + "MERGE2(min_interval=10000;max_interval=20000):"
037: + "FD_SOCK:"
038: + "VERIFY_SUSPECT(timeout=1500):"
039: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
040: + "UNICAST(timeout=5000):"
041: + "pbcast.STABLE(desired_avg_gossip=20000):"
042: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):"
043: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
044: + "shun=false;print_local_addr=true)";
045:
046: static class MyCollector implements RspCollector {
047:
048: public void receiveResponse(Object retval, Address sender) {
049: System.out.println("** received response " + retval
050: + " [sender=" + sender + ']');
051: }
052:
053: public void suspect(Address mbr) {
054: System.out.println("** suspected member " + mbr);
055: }
056:
057: public void viewChange(View new_view) {
058: System.out.println("** received new view " + new_view);
059: }
060: }
061:
062: public MessageDispatcherTestAsync(boolean debug, boolean cummulative) {
063: this .debug = debug;
064: this .cummulative = cummulative;
065: }
066:
067: public void start() throws Exception {
068: channel = new JChannel(props);
069: if (debug) {
070: debugger = new Debugger((JChannel) channel, cummulative);
071: debugger.start();
072: }
073: //channel.setOpt(Channel.LOCAL, Boolean.FALSE);
074: disp = new MessageDispatcher(channel, null, null, this );
075: channel.connect("MessageDispatcherTestAsyncGroup");
076: }
077:
078: public void mcast(int num) throws IOException {
079: if (!done_submitted) {
080: System.err
081: .println("Must submit 'done' (press 'd') before mcasting new message");
082: return;
083: }
084: for (int i = 0; i < num; i++) {
085: Util.sleep(100);
086: System.out.println("Casting message #" + i);
087: disp.castMessage(null, i, new Message(null, null,
088: "Number #" + i), coll);
089: }
090: done_submitted = false;
091: }
092:
093: public void disconnect() {
094: System.out.println("** Disconnecting channel");
095: channel.disconnect();
096: System.out.println("** Disconnecting channel -- done");
097:
098: System.out.println("** Closing channel");
099: channel.close();
100: System.out.println("** Closing channel -- done");
101:
102: System.out.println("** disp.stop()");
103: disp.stop();
104: System.out.println("** disp.stop() -- done");
105: }
106:
107: public void done() {
108: for (int i = 0; i < NUM; i++)
109: disp.done(i);
110: done_submitted = true;
111: }
112:
113: public Object handle(Message msg) {
114: Object tmp = msg.getObject();
115: System.out.println("** handle(" + tmp + ')');
116: return tmp + ": success";
117: }
118:
119: public static void main(String[] args) {
120: int c;
121: MessageDispatcherTestAsync test = null;
122: boolean debug = false, cummulative = false;
123:
124: for (int i = 0; i < args.length; i++) {
125: if ("-help".equals(args[i])) {
126: help();
127: return;
128: }
129: if ("-debug".equals(args[i])) {
130: debug = true;
131: continue;
132: }
133: if ("-cummulative".equals(args[i])) {
134: cummulative = true;
135: }
136: }
137:
138: try {
139: test = new MessageDispatcherTestAsync(debug, cummulative);
140: test.start();
141: while (true) {
142: System.out.println("[m=mcast " + NUM + " msgs x=exit]");
143: c = System.in.read();
144: switch (c) {
145: case 'x':
146: test.disconnect();
147: System.exit(0);
148: return;
149: case 'm':
150: test.mcast(NUM);
151: break;
152: case 'd':
153: test.done();
154: break;
155: default:
156: break;
157: }
158:
159: }
160: } catch (Exception e) {
161: System.err.println(e);
162: }
163: }
164:
165: static void help() {
166: System.out
167: .println("MessageDispatcherTestAsync [-debug] [-cummulative]");
168: }
169:
170: }
|