001: // $Id: RpcDispatcherBlocking.java,v 1.8 2005/05/30 16:15:12 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.GroupRequest;
007: import org.jgroups.blocks.RpcDispatcher;
008: import org.jgroups.util.RspList;
009: import org.jgroups.util.Util;
010:
011: /**
012: * Tests synchronous group RPCs. 2 main test cases:
013: * <ol>
014: * <li>Member crashes during invocation of sync group RPC: start 3 instances (A,
015: * B,C), set the timeout to be 30 seconds. Then invoke a sync group RPC by A
016: * (press 's' in A's window). A,B and C should receive the RPC. Now kill C.
017: * After some time, A's method call should return and show A's and B's reply to
018: * be valid, while showing C's response marked as suspected.
019: * <li>Member joins group during synchronous group RPC: start A and B with
020: * timeout=30000. Invoke a sync group RPC on A. Start C. A and B should
021: * <em>not</em> receive the view change <em>before</em> the group RPC has
022: * returned with A's and B's results. Therefore A and B should <em>not</em> wait
023: * for C's response, which would never be received because C never got the RPC
024: * in the first place. This would block A forever.
025: * </ol>
026: *
027: * @author bela Dec 19, 2002
028: */
029: public class RpcDispatcherBlocking implements MembershipListener {
030: RpcDispatcher disp;
031: Channel channel;
032: long timeout = 30000;
033: String props = null;
034: int i = 0;
035:
036: public RpcDispatcherBlocking(String props, long timeout) {
037: this .props = props;
038: this .timeout = timeout;
039: }
040:
041: public void print(int i) throws Exception {
042: System.out.println("<-- " + i + " [sleeping for " + timeout
043: + " msecs");
044: Util.sleep(timeout);
045: }
046:
047: public void viewAccepted(View new_view) {
048: System.out.println("new view: " + new_view);
049: }
050:
051: /** Called when a member is suspected */
052: public void suspect(Address suspected_mbr) {
053: System.out.println(suspected_mbr + " is suspected");
054: }
055:
056: /** Block sending and receiving of messages until viewAccepted() is called */
057: public void block() {
058:
059: }
060:
061: public void start() throws Exception {
062: int c;
063: RspList rsps;
064:
065: channel = new JChannel(); // default props
066: disp = new RpcDispatcher(channel, null, this , this );
067: channel.connect("rpc-test");
068:
069: while (true) {
070: System.out.println("[x]: exit [s]: send sync group RPC");
071: System.out.flush();
072: c = System.in.read();
073: switch (c) {
074: case 'x':
075: channel.close();
076: disp.stop();
077: return;
078: case 's':
079: rsps = sendGroupRpc();
080: System.out.println("responses:\n" + rsps);
081: break;
082: }
083:
084: System.in.skip(System.in.available());
085: }
086: }
087:
088: RspList sendGroupRpc() throws Exception {
089: return disp.callRemoteMethods(null, "print",
090: new Object[] { new Integer(i++) },
091: new Class[] { int.class }, GroupRequest.GET_ALL, 0);
092: }
093:
094: public static void main(String[] args) {
095: long timeout = 30000;
096: String props = null;
097:
098: for (int i = 0; i < args.length; i++) {
099: if ("-props".equals(args[i])) {
100: props = args[++i];
101: continue;
102: }
103: if ("-timeout".equals(args[i])) {
104: timeout = Long.parseLong(args[++i]);
105: continue;
106: }
107: help();
108: return;
109: }
110:
111: try {
112: new RpcDispatcherBlocking(props, timeout).start();
113: } catch (Exception ex) {
114: System.err.println(ex);
115: }
116: }
117:
118: static void help() {
119: System.out
120: .println("RpcDispatcherBlocking [-help] [-props <properties>] [-timeout <timeout>]");
121: }
122: }
|