001: // $Id: TUNNELDeadLockTest.java,v 1.10 2006/10/11 14:32:38 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import junit.framework.Test;
006: import junit.framework.TestCase;
007: import junit.framework.TestSuite;
008: import org.jgroups.JChannel;
009: import org.jgroups.Message;
010: import org.jgroups.TimeoutException;
011: import org.jgroups.tests.stack.Utilities;
012: import org.jgroups.util.Promise;
013:
014: /**
015: * Test designed to make sure the TUNNEL doesn't lock the client and the GossipRouter
016: * under heavy load.
017: *
018: * @author Ovidiu Feodorov <ovidiu@feodorov.com>
019: * @version $Revision: 1.10 $
020: * @see TUNNELDeadLockTest#testStress
021: */
022: public class TUNNELDeadLockTest extends TestCase {
023: private JChannel channel;
024: private Promise promise;
025: private int receivedCnt;
026:
027: // the total number of the messages pumped down the channel
028: private int msgCount = 20000;
029: // the message payload size (in bytes);
030: private int payloadSize = 32;
031: // the time (in ms) the main thread waits for all the messages to arrive,
032: // before declaring the test failed.
033: private int mainTimeout = 60000;
034:
035: int routerPort = -1;
036:
037: public TUNNELDeadLockTest(String name) {
038: super (name);
039: }
040:
041: public void setUp() throws Exception {
042: super .setUp();
043: promise = new Promise();
044: routerPort = Utilities.startGossipRouter("127.0.0.1");
045: }
046:
047: public void tearDown() throws Exception {
048:
049: super .tearDown();
050:
051: // I prefer to close down the channel inside the test itself, for the
052: // reason that the channel might be brought in an uncloseable state by
053: // the test.
054:
055: // TO_DO: no elegant way to stop the Router threads and clean-up
056: // resources. Use the Router administrative interface, when
057: // available.
058:
059: channel = null;
060: promise.reset();
061: promise = null;
062: Utilities.stopGossipRouter();
063: }
064:
065: private String getTUNNELProps(int routerPort) {
066: String props;
067:
068: props = "TUNNEL(router_host=127.0.0.1;router_port="
069: + routerPort
070: + "):"
071: + "PING(timeout=3000;gossip_refresh=10000;num_initial_members=3;"
072: + "gossip_host=127.0.0.1;gossip_port="
073: + routerPort
074: + "):"
075: + "FD_SOCK:"
076: + "pbcast.NAKACK(gc_lag=100;retransmit_timeout=600,1200,2400,4800):"
077: + "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"
078: + "pbcast.GMS(print_local_addr=true;join_timeout=5000;join_retry_timeout=2000;shun=true)";
079: return props;
080: }
081:
082: /**
083: * Pushes messages down the channel as fast as possible. Sometimes this
084: * manages to bring the channel and the Router into deadlock. On the
085: * machine I run it usually happens after 700 - 1000 messages and I
086: * suspect that this number it is related to the socket buffer size.
087: * (the comments are written when I didn't solve the bug yet). <br>
088: * <p/>
089: * The number of messages sent can be controlled with msgCount.
090: * The time (in ms) the main threads wait for the all messages to come can
091: * be controlled with mainTimeout. If this time passes and the test
092: * doesn't see all the messages, it declares itself failed.
093: */
094: public void testStress() throws Exception {
095: String props = getTUNNELProps(routerPort);
096: channel = new JChannel(props);
097: channel.connect("agroup");
098:
099: // receiver thread
100: new Thread(new Runnable() {
101: public void run() {
102: try {
103: while (true) {
104: if (channel == null)
105: return;
106: Object o = channel.receive(10000);
107: if (o instanceof Message) {
108: receivedCnt++;
109: if (receivedCnt % 2000 == 0)
110: System.out.println("-- received "
111: + receivedCnt);
112: if (receivedCnt == msgCount) {
113: // let the main thread know I got all msgs
114: promise.setResult(new Object());
115: return;
116: }
117: }
118: }
119: } catch (TimeoutException e) {
120: System.err
121: .println("Timeout receiving from the channel. "
122: + receivedCnt
123: + " msgs received so far.");
124: } catch (Exception e) {
125: System.err.println("Error receiving data");
126: e.printStackTrace();
127: }
128: }
129: }).start();
130:
131: // stress send messages - the sender thread
132: new Thread(new Runnable() {
133: public void run() {
134: try {
135: for (int i = 0; i < msgCount; i++) {
136: channel.send(null, null, new byte[payloadSize]);
137: if (i % 2000 == 0)
138: System.out.println("-- sent " + i);
139: }
140: } catch (Exception e) {
141: System.err.println("Error sending data over ...");
142: e.printStackTrace();
143: }
144: }
145: }).start();
146:
147: // wait for all the messages to come; if I don't see all of them in
148: // mainTimeout ms, I fail the test
149:
150: Object result = promise.getResult(mainTimeout);
151: if (result == null) {
152: String msg = "The channel has failed to send/receive "
153: + msgCount
154: + " messages "
155: + "possibly because of the channel deadlock or too short "
156: + "timeout (currently " + mainTimeout + " ms). "
157: + receivedCnt + " messages received so far.";
158: fail(msg);
159: }
160:
161: // don't close it in tearDown() because it hangs forever for a failed
162: // test.
163: channel.close();
164: }
165:
166: public static Test suite() {
167: return new TestSuite(TUNNELDeadLockTest.class);
168: }
169:
170: public static void main(String[] args) {
171: junit.textui.TestRunner.run(suite());
172: System.exit(0);
173: }
174:
175: }
|