001: // $Id: GroupRequestPull.java,v 1.11 2006/08/28 06:51:54 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.GroupRequest;
007: import org.jgroups.blocks.PullPushAdapter;
008: import org.jgroups.util.RspList;
009: import org.jgroups.util.Util;
010:
011: import java.io.IOException;
012: import java.io.ObjectInput;
013: import java.io.ObjectOutput;
014: import java.util.Vector;
015:
016: /**
017: *
018: * @author Bela Ban
019: */
020: public class GroupRequestPull implements MessageListener,
021: MembershipListener, Transport {
022: PullPushAdapter adapter = null;
023: Channel ch = null;
024: String props = null;
025: GroupRequest group_req = null;
026: static final String HDRNAME = "GroupRequestPullHeader";
027: Vector members = new Vector();
028:
029: GroupRequestPull(String props) {
030: this .props = props;
031: }
032:
033: void start() throws Throwable {
034: ch = new JChannel(props);
035: ch.connect("GroupRequestPull-Group");
036: adapter = new PullPushAdapter(ch, this , this );
037: loop();
038: adapter.stop();
039: ch.close();
040: }
041:
042: void loop() throws Throwable {
043: boolean looping = true;
044: int c;
045:
046: while (looping) {
047: System.out.println("Members are "
048: + ch.getView().getMembers()
049: + "\n<enter to send a new group request>");
050: System.out.flush();
051: c = System.in.read();
052: if (c == 'q')
053: looping = false;
054: System.in.skip(System.in.available());
055: sendGroupRequest();
056: }
057: }
058:
059: void sendGroupRequest() throws Throwable {
060: Message msg = new Message();
061: RspList lst;
062:
063: msg.putHeader(HDRNAME, new MyHeader(MyHeader.REQUEST));
064: group_req = new GroupRequest(msg, this , // as Transport
065: members, // all current members
066: GroupRequest.GET_ALL);
067:
068: group_req.execute();
069: lst = group_req.getResults();
070: System.out.println("-- received " + lst.size() + " results:");
071: for (int i = 0; i < lst.size(); i++) {
072: System.out.println(lst.elementAt(i));
073: }
074: System.out.println();
075: }
076:
077: /* --------------------------- Interface MessageListener -------------------------- */
078:
079: public void receive(Message msg) {
080: MyHeader hdr = (MyHeader) msg.removeHeader(HDRNAME);
081: Message rsp;
082:
083: if (hdr == null) {
084: System.err
085: .println("GroupRequestPull.receive(): header for "
086: + HDRNAME + " was null");
087: return;
088: }
089: if (hdr.type == MyHeader.RESPONSE) {
090: if (group_req != null) {
091: Address sender = msg.getSrc();
092: Object retval = null;
093: try {
094: retval = Util.objectFromByteBuffer(msg.getBuffer());
095: } catch (Exception e) {
096: e.printStackTrace();
097: }
098: group_req.receiveResponse(retval, sender);
099: }
100: } else if (hdr.type == MyHeader.REQUEST) {
101: // System.out.println("-- received REQUEST from " + msg.getSrc());
102: rsp = new Message(msg.getSrc());
103: rsp.putHeader(HDRNAME, new MyHeader(MyHeader.RESPONSE));
104: rsp.setObject("Hello from member " + ch.getLocalAddress());
105: try {
106: adapter.send(rsp);
107: } catch (Exception ex) {
108: System.err
109: .println("GroupRequestPull.receive(): failure sending response: "
110: + ex);
111: }
112: } else {
113: System.err
114: .println("GroupRequestPull.receive(): header type of "
115: + hdr.type + " not known");
116: }
117: }
118:
119: public byte[] getState() {
120: return null;
121: }
122:
123: public void setState(byte[] state) {
124: ;
125: }
126:
127: /* ----------------------- End of Interface MessageListener ------------------------ */
128:
129: /* --------------------------- Interface MembershipListener -------------------------- */
130:
131: public void viewAccepted(View new_view) {
132: System.out.println("** viewAccepted(): " + new_view);
133: if (new_view != null && new_view.getMembers().size() > 0) {
134: members.removeAllElements();
135: members.addAll(new_view.getMembers());
136: }
137: if (group_req != null)
138: group_req.viewChange(new_view);
139: }
140:
141: public void suspect(Address suspected_mbr) {
142: System.out.println("** suspect(): " + suspected_mbr);
143: if (group_req != null)
144: group_req.suspect(suspected_mbr);
145: }
146:
147: public void block() {
148:
149: }
150:
151: /* ----------------------- End of Interface MembershipListener ----------------------- */
152:
153: /* --------------------------- Interface Transport ------------------------------------ */
154: /** Used by GroupRequest to send messages */
155: public void send(Message msg) throws Exception {
156: if (adapter == null) {
157: System.err
158: .println("GroupRequestPull.send(): adapter is null, cannot send message");
159: } else
160: adapter.send(msg);
161: }
162:
163: public Object receive(long timeout) throws Exception {
164: return null;
165: }
166:
167: /* ------------------------ End of Interface Transport -------------------------------- */
168:
169: public static void main(String[] args) {
170: String props = null;
171:
172: for (int i = 0; i < args.length; i++) {
173: if ("-props".equals(args[i])) {
174: props = args[++i];
175: continue;
176: }
177: help();
178: return;
179: }
180:
181: try {
182: new GroupRequestPull(props).start();
183: } catch (Throwable ex) {
184: ex.printStackTrace();
185: }
186: }
187:
188: static void help() {
189: System.out
190: .println("GroupRequestPull [-help] [-props <properties>]");
191: }
192:
193: public static class MyHeader extends Header {
194: public static final int REQUEST = 1;
195: public static final int RESPONSE = 2;
196:
197: int type = 0;
198:
199: public MyHeader() {
200:
201: }
202:
203: public MyHeader(int type) {
204: this .type = type;
205: }
206:
207: public void writeExternal(ObjectOutput out) throws IOException {
208: out.writeInt(type);
209: }
210:
211: public void readExternal(ObjectInput in) throws IOException,
212: ClassNotFoundException {
213: type = in.readInt();
214: }
215: }
216:
217: }
|