001: package org.jgroups.tests;
002:
003: import org.jgroups.*;
004: import org.jgroups.blocks.GroupRequest;
005: import org.jgroups.blocks.RpcDispatcher;
006:
007: /**
008: * Tests the "NAKACK retransmit message lost" problem. Start 2 members, then a third one, and you should never see
009: * the problem with messages that cannot be retransmitted because they were already garbage-collected.
010: * @author Bela Ban Apr 4, 2004
011: * @version $Id: NAKACKTest2.java,v 1.4 2004/07/05 14:15:22 belaban Exp $
012: */
013: public class NAKACKTest2 {
014: Channel ch;
015: Address local_addr;
016: Receiver receiver;
017: RpcDispatcher disp;
018:
019: class Receiver extends Thread {
020: public void run() {
021: Object obj;
022: Message msg;
023: boolean running = true;
024: while (running) {
025: try {
026: obj = ch.receive(0);
027: if (obj instanceof Message) {
028: msg = (Message) obj;
029: System.out.println(msg.getSrc() + "::"
030: + msg.getObject());
031: } else
032: System.out.println("received " + obj);
033: } catch (ChannelNotConnectedException e) {
034: running = false;
035: } catch (ChannelClosedException e) {
036: running = false;
037: } catch (TimeoutException e) {
038: ;
039: }
040: }
041: }
042: }
043:
044: public void receive(Address sender, Long i) {
045: System.out.println(sender + "::" + i);
046: }
047:
048: void start(String props, boolean use_rpc) throws Exception {
049: long i = 0;
050: Message msg;
051: ch = new JChannel(props);
052: if (use_rpc)
053: disp = new RpcDispatcher(ch, null, null, this );
054: ch.connect("NAKACKTest");
055: local_addr = ch.getLocalAddress();
056: if (use_rpc == false) {
057: receiver = new Receiver();
058: receiver.start();
059: }
060: while (true) {
061: // for(int j=0; j < 10000; j++) {
062: if (use_rpc) {
063: disp.callRemoteMethods(null, "receive", new Object[] {
064: local_addr, new Long(i++) }, new Class[] {
065: Address.class, Long.class },
066: GroupRequest.GET_ALL, 10000);
067: } else {
068: msg = new Message(null, null, new Long(i++));
069: ch.send(msg);
070: }
071: //Util.sleep(1);
072: }
073:
074: // Util.sleep(3600000);
075: }
076:
077: public static void main(String[] args) {
078: String props = null;
079: boolean use_rpc = false;
080:
081: for (int i = 0; i < args.length; i++) {
082: if ("-props".equals(args[i])) {
083: props = args[++i];
084: continue;
085: }
086: if ("-use_rpc".equals(args[i])) {
087: use_rpc = true;
088: continue;
089: }
090: help();
091: return;
092: }
093:
094: try {
095: new NAKACKTest2().start(props, use_rpc);
096: } catch (Exception e) {
097: e.printStackTrace();
098: }
099: }
100:
101: private static void help() {
102: System.out
103: .println("NAKACKTest [-help] [-props properties] [-use_rpc]");
104: }
105: }
|