001: // $Id: PullPushTestMux.java,v 1.7 2005/07/12 07:13:42 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.Channel;
006: import org.jgroups.JChannel;
007: import org.jgroups.Message;
008: import org.jgroups.MessageListener;
009: import org.jgroups.blocks.PullPushAdapter;
010:
011: /**
012: * Uses PullPush building block to send/receive messages. Reception is passive, e.g. the receiver's
013: * receive() method is invoked whenever a message is received. The receiver has to register a callback method
014: * when creating the channel. Uses multiple MessageListeners
015: *
016: * @author Bela Ban
017: */
018: public class PullPushTestMux implements MessageListener {
019: private Channel channel;
020: private PullPushAdapter adapter;
021: MyListener[] listeners = null;
022:
023: public PullPushTestMux() {
024: ;
025: }
026:
027: public void receive(Message msg) {
028: System.out.println("Main receiver: received msg: " + msg);
029: }
030:
031: public byte[] getState() { // only called if channel option GET_STATE_EVENTS is set to true
032: return null;
033: }
034:
035: public void setState(byte[] state) {
036:
037: }
038:
039: public void start() throws Exception {
040: int c;
041:
042: channel = new JChannel();
043: channel.connect("PullPushTestMux");
044: adapter = new PullPushAdapter(channel);
045: adapter.setListener(this );
046:
047: listeners = new MyListener[10];
048: for (int i = 0; i < listeners.length; i++) {
049: listeners[i] = new MyListener(i, adapter);
050: }
051:
052: while ((c = choice()) != 'q') {
053: c -= 48;
054: if (c < 0 || c > 9) {
055: System.err.println("Choose between 0 and 9");
056: continue;
057: }
058: if (c == 0)
059: adapter.send(new Message(null, null,
060: "Message from default message listener"));
061: else
062: listeners[c].sendMessage();
063: }
064:
065: channel.close();
066: System.exit(0);
067: }
068:
069: int choice() {
070: int c;
071: System.out
072: .println("\n[q]uit [0]: send message on default channel [1-9] send message on other channels:");
073: System.out.flush();
074: try {
075: c = System.in.read();
076: } catch (Exception ex) {
077: return -1;
078: } finally {
079: try {
080: System.in.skip(System.in.available());
081: } catch (Exception ex) {
082: }
083: }
084: return c;
085: }
086:
087: public static void main(String args[]) {
088: PullPushTestMux t = new PullPushTestMux();
089: try {
090: t.start();
091: } catch (Exception e) {
092: System.err.println(e);
093: }
094: }
095:
096: public class MyListener implements MessageListener {
097: Integer id = null;
098: PullPushAdapter ad = null;
099:
100: MyListener(int id, PullPushAdapter ad) {
101: this .id = new Integer(id);
102: this .ad = ad;
103: ad.registerListener(this .id, this );
104: }
105:
106: public void receive(Message msg) {
107: System.out.println("MyListener #" + id
108: + ": received message from " + msg.getSrc() + ": "
109: + msg.getObject());
110: }
111:
112: public byte[] getState() {
113: return null;
114: }
115:
116: public void setState(byte[] state) {
117: ;
118: }
119:
120: void sendMessage() throws Exception {
121: Message msg = new Message(null, null, "Message from " + id);
122: ad.send(id, msg);
123: }
124: }
125:
126: }
|