001: // $Id: DiscardTest.java,v 1.9 2006/09/22 12:30:45 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import junit.framework.Test;
006: import junit.framework.TestCase;
007: import junit.framework.TestSuite;
008: import org.jgroups.*;
009: import org.jgroups.util.Promise;
010: import org.jgroups.util.Util;
011:
012: /**
013: * Tests the NAKACK (retransmission) and STABLE (garbage collection) protocols
014: * by discarding 10% of all network-bound messages
015: * @author Bela Ban
016: * @version $Id: DiscardTest.java,v 1.9 2006/09/22 12:30:45 belaban Exp $
017: */
018: public class DiscardTest extends TestCase {
019: JChannel ch1, ch2;
020:
021: final String discard_props = "discard.xml"; // located in JGroups/conf, needs to be in the classpath
022: final String fast_props = "udp.xml"; // located in JGroups/conf, needs to be in the classpath
023: final long NUM_MSGS = 10000;
024: final int MSG_SIZE = 1000;
025: private static final String GROUP = "DiscardTestGroup";
026: final Promise ch1_all_received = new Promise();
027: final Promise ch2_all_received = new Promise();
028:
029: public DiscardTest(String name) {
030: super (name);
031: }
032:
033: protected void setUp() throws Exception {
034: super .setUp();
035: ch1_all_received.reset();
036: ch2_all_received.reset();
037: }
038:
039: public void testDiscardProperties() throws Exception {
040: _testLosslessReception(discard_props);
041: }
042:
043: public void testFastProperties() throws Exception {
044: _testLosslessReception(fast_props);
045: }
046:
047: public void _testLosslessReception(String props) throws Exception {
048: Address ch1_addr, ch2_addr;
049: long start, stop;
050:
051: System.setProperty("bind.address", "127.0.0.1");
052:
053: ch1 = new JChannel(props);
054: ch1.setReceiver(new MyReceiver(ch1_all_received, NUM_MSGS,
055: "ch1"));
056: ch2 = new JChannel(props);
057: ch2.setReceiver(new MyReceiver(ch2_all_received, NUM_MSGS,
058: "ch2"));
059:
060: ch1.connect(GROUP);
061: ch1_addr = ch1.getLocalAddress();
062: ch2.connect(GROUP);
063: ch2_addr = ch2.getLocalAddress();
064:
065: Util.sleep(2000);
066: View v = ch2.getView();
067: System.out.println("**** ch2's view: " + v);
068: assertEquals(2, v.size());
069: assertTrue(v.getMembers().contains(ch1_addr));
070: assertTrue(v.getMembers().contains(ch2_addr));
071:
072: System.out.println("sending " + NUM_MSGS
073: + " 1K messages to all members (including myself)");
074: start = System.currentTimeMillis();
075: for (int i = 0; i < NUM_MSGS; i++) {
076: Message msg = createMessage(MSG_SIZE);
077: ch1.send(msg);
078: if (i % 1000 == 0)
079: System.out.println("-- sent " + i + " messages");
080: }
081:
082: System.out.println("-- waiting for ch1 and ch2 to receive "
083: + NUM_MSGS + " messages");
084: Long num_msgs;
085: num_msgs = (Long) ch1_all_received.getResult();
086: System.out.println("-- received " + num_msgs
087: + " messages on ch1");
088:
089: num_msgs = (Long) ch2_all_received.getResult();
090: stop = System.currentTimeMillis();
091: System.out.println("-- received " + num_msgs
092: + " messages on ch2");
093:
094: long diff = stop - start;
095: double msgs_sec = NUM_MSGS / (diff / 1000.0);
096: System.out.println("== Sent and received " + NUM_MSGS + " in "
097: + diff + "ms, " + msgs_sec + " msgs/sec");
098:
099: ch2.close();
100: ch1.close();
101: }
102:
103: class MyReceiver extends ReceiverAdapter {
104: final Promise p;
105: final long num_msgs_expected;
106: long num_msgs = 0;
107: String channel_name;
108: boolean operational = true;
109:
110: public MyReceiver(final Promise p,
111: final long num_msgs_expected, String channel_name) {
112: this .p = p;
113: this .num_msgs_expected = num_msgs_expected;
114: this .channel_name = channel_name;
115: }
116:
117: public void receive(Message msg) {
118: if (!operational)
119: return;
120: num_msgs++;
121:
122: if (num_msgs > 0 && num_msgs % 1000 == 0)
123: System.out.println("-- received " + num_msgs + " on "
124: + channel_name);
125:
126: if (num_msgs >= num_msgs_expected) {
127: System.out.println("SUCCESS: received all "
128: + num_msgs_expected + " messages on "
129: + channel_name);
130: operational = false;
131: p.setResult(new Long(num_msgs));
132: }
133: }
134:
135: public void viewAccepted(View new_view) {
136: System.out.println("-- view (" + channel_name + "): "
137: + new_view);
138: }
139: }
140:
141: private Message createMessage(int size) {
142: byte[] buf = new byte[size];
143: for (int i = 0; i < buf.length; i++)
144: buf[i] = (byte) 'x';
145: return new Message(null, null, buf);
146: }
147:
148: public static Test suite() {
149: return new TestSuite(DiscardTest.class);
150: }
151:
152: public static void main(String[] args) {
153: junit.textui.TestRunner.run(suite());
154: }
155:
156: }
|