01: // $Id: MessageDispatcherTest.java,v 1.14 2005/08/29 12:19:11 belaban Exp $
02:
03: package org.jgroups.tests;
04:
05: import org.jgroups.Channel;
06: import org.jgroups.JChannel;
07: import org.jgroups.Message;
08: import org.jgroups.blocks.GroupRequest;
09: import org.jgroups.blocks.MessageDispatcher;
10: import org.jgroups.blocks.RequestHandler;
11: import org.jgroups.util.RspList;
12: import org.jgroups.util.Util;
13:
14: /**
15: * Example for MessageDispatcher (see also RpcDispatcherTest). Message is periodically broadcast to all
16: * members; handle() method is invoked whenever a message is received.
17: */
18: public class MessageDispatcherTest implements RequestHandler {
19: Channel channel;
20: MessageDispatcher disp;
21: RspList rsp_list;
22: String props = null;
23:
24: public void start() throws Exception {
25: channel = new JChannel(props);
26: //channel.setOpt(Channel.LOCAL, Boolean.FALSE);
27: // disp=new MessageDispatcher(channel, null, null, this);
28: disp = new MessageDispatcher(channel, null, null, this , false, // deadlock detection is disabled
29: true); // concurrent processing is enabled
30: channel.connect("MessageDispatcherTestGroup");
31:
32: // for(int i=0; i < 10; i++) {
33: // //Util.sleep(1000);
34: // System.out.println("Casting message #" +i);
35: // rsp_list=disp.castMessage(null,
36: // new Message(null, null, new String("Number #" +i).getBytes()),
37: // GroupRequest.GET_ALL, 0);
38: // System.out.println("Responses:\n" + rsp_list);
39: // }
40:
41: MyThread t1 = new MyThread("one"), t2 = new MyThread("two");
42: t1.start();
43: t2.start();
44: t1.join();
45: t2.join();
46:
47: System.out.println("** Disconnecting channel");
48: channel.disconnect();
49: System.out.println("** Disconnecting channel -- done");
50:
51: System.out.println("** Closing channel");
52: channel.close();
53: System.out.println("** Closing channel -- done");
54:
55: System.out.println("** disp.stop()");
56: disp.stop();
57: System.out.println("** disp.stop() -- done");
58:
59: //Util.printThreads();
60: //Util.sleep(2000);
61: //Util.printThreads();
62: }
63:
64: class MyThread extends Thread {
65: public MyThread(String name) {
66: setName(name);
67: }
68:
69: public void run() {
70: for (int i = 0; i < 10; i++) {
71: System.out.println('[' + getName()
72: + "] casting message #" + i);
73: rsp_list = disp.castMessage(null, new Message(null,
74: null, '[' + getName() + "] number #" + i),
75: GroupRequest.GET_ALL, 0);
76: System.out.println('[' + getName() + "] responses:\n"
77: + rsp_list);
78: }
79: }
80: }
81:
82: public Object handle(Message msg) {
83: System.out.println("handle(): " + msg.getObject());
84: Util.sleepRandom(1000);
85: return "Success !";
86: }
87:
88: public static void main(String[] args) {
89:
90: try {
91: new MessageDispatcherTest().start();
92: } catch (Exception e) {
93: System.err.println(e);
94: }
95: }
96: }
|