001: package org.jgroups.tests.stack;
002:
003: import junit.framework.Test;
004: import junit.framework.TestCase;
005: import junit.framework.TestSuite;
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008: import org.jgroups.Address;
009: import org.jgroups.Message;
010: import org.jgroups.stack.RouterStub;
011: import org.jgroups.util.Promise;
012:
013: import java.util.List;
014: import java.util.Random;
015:
016: /**
017: * Tests routing protocol primitives with the new GossipRouter. Since 2.2.1,
018: * the GossipRouter is supposed to answer Gossip requests too.
019: * <p/>
020: * Note: Disable DEBUG logging before this test, otherwise the stress tests
021: * may timeout.
022: *
023: * @author Ovidiu Feodorov <ovidiuf@users.sourceforge.net>
024: * @author Bela Ban
025: * @version $Id: RouterStubTest.java,v 1.3 2006/10/23 16:16:20 belaban Exp $
026: * @since 2.2.1
027: */
028: public class RouterStubTest extends TestCase {
029: RouterStub stub, stub2;
030:
031: private static final Log log = LogFactory
032: .getLog(RouterStubTest.class);
033: private static final String groupName = "TESTGROUP";
034:
035: private int routerPort = -1;
036: private Random random = new Random();
037:
038: public RouterStubTest(String name) {
039: super (name);
040: }
041:
042: public void setUp() throws Exception {
043: super .setUp();
044: routerPort = Utilities.startGossipRouter("127.0.0.1");
045: stub = new RouterStub("127.0.0.1", routerPort);
046: }
047:
048: public void tearDown() throws Exception {
049: super .tearDown();
050: stub.disconnect();
051: if (stub2 != null)
052: stub2.disconnect();
053: Utilities.stopGossipRouter();
054: }
055:
056: /**
057: * Sends a GossipRouter.GET request to a router with an empty routing table.
058: */
059: public void testEmptyGET() throws Exception {
060: log.info("running testEmptyGET");
061: List mbrs = stub.get("nosuchgroup");
062: assertNotNull(mbrs);
063: assertEquals(0, mbrs.size());
064: }
065:
066: /**
067: * Sends a GossipRouter.CONNECT request followed by a GossipRouter.GET for the
068: * group just registered.
069: */
070: public void test_CONNECT_GET() throws Exception {
071: log.info("running test_CONNECT_GET");
072: stub.connect(groupName);
073: Address localAddr = stub.getLocalAddress();
074: System.out.println("-- my address is " + localAddr);
075: assertNotNull(localAddr);
076: List groupList = stub.get(groupName);
077: assertEquals(1, groupList.size());
078: assertEquals(localAddr, groupList.remove(0));
079: }
080:
081: /**
082: * Sends a GossipRouter.CONNECT request followed by a series of simple routing requests (to all
083: * members of the group, to itself, to an inexistent member).
084: */
085: public void test_CONNECT_Route_To_Self() throws Exception {
086: log.info("running test_CONNECT_Route_To_Self");
087: Message msg;
088:
089: stub.connect(groupName);
090: Address localAddr = stub.getLocalAddress();
091:
092: // registration is complete
093: String payload = "THIS IS A MESSAGE PAYLOAD "
094: + random.nextLong();
095:
096: // send a simple routing request to all members (null dest address)
097: msg = new Message(null, localAddr, payload);
098: stub.send(msg, groupName);
099:
100: Message rsp = stub.receive();
101: assertEquals(localAddr, rsp.getSrc());
102: assertEquals(payload, rsp.getObject());
103:
104: // send a simple routing request to itself
105: msg = new Message(localAddr, localAddr, payload);
106: stub.send(msg, groupName);
107: rsp = stub.receive();
108: assertEquals(localAddr, rsp.getSrc());
109: assertEquals(payload, rsp.getObject());
110: }
111:
112: public void test_CONNECT_Route_To_All() throws Exception {
113: log.info("running test_CONNECT_Route_To_All");
114: Message msg, msgCopy;
115:
116: stub2 = new RouterStub("127.0.0.1", routerPort);
117:
118: stub.connect(groupName); // register the first member
119: Address addr = stub.getLocalAddress();
120:
121: stub2.connect(groupName); // register the second member
122: addr = stub2.getLocalAddress();
123:
124: String payload = "THIS IS A MESSAGE PAYLOAD "
125: + random.nextLong();
126:
127: // the first member sends a simple routing request to all members (null dest address)
128: msg = new Message(null, addr, payload);
129: stub.send(msg, groupName);
130:
131: // only the second member should receive the routing request, the router won't send a
132: // message to the originator
133:
134: // the second member reads the message
135: msgCopy = stub2.receive();
136: assertEquals(addr, msgCopy.getSrc());
137: assertNull(msgCopy.getDest());
138: assertEquals(msg.getObject(), msgCopy.getObject());
139: stub2.disconnect();
140: }
141:
142: public void test_CONNECT_Route_To_Other() throws Exception {
143: log.info("running test_CONNECT_Route_To_Other");
144: Message msg, msgCopy;
145:
146: stub.connect(groupName);
147: Address localAddrOne = stub.getLocalAddress();
148:
149: stub2 = new RouterStub("127.0.0.1", routerPort);
150:
151: stub2.connect(groupName);
152: Address localAddrTwo = stub2.getLocalAddress();
153: String payload = "THIS IS A MESSAGE PAYLOAD "
154: + random.nextLong();
155:
156: // first member send a simple routing request to the second member
157: msg = new Message(localAddrTwo, localAddrOne, payload);
158: stub.send(msg, groupName);
159:
160: // the second member reads the message
161: msgCopy = stub2.receive();
162: assertEquals(localAddrOne, msgCopy.getSrc());
163: assertEquals(localAddrTwo, msgCopy.getDest());
164: assertEquals(msg.getObject(), msgCopy.getObject());
165: stub2.disconnect();
166: }
167:
168: /**
169: * Sends a GossipRouter.CONNECT request followed by a series of stress routing
170: * requests to all members of the group.
171: */
172: public void test_CONNECT_RouteStressAll() throws Exception {
173: log
174: .info("running test_CONNECT_RouteStressAll, this may take a while .... ");
175:
176: stub.connect(groupName);
177: final Address localAddrOne = stub.getLocalAddress();
178:
179: stub2 = new RouterStub("127.0.0.1", routerPort);
180: stub2.connect(groupName);
181:
182: // send a series of stress routing requests to all members
183: final int count = 20000; // total number of messages to be sent
184: int timeout = 50; // nr of secs to wait for all messages to arrive
185:
186: final boolean[] received = new boolean[count];
187: for (int i = 0; i < count; i++) {
188: received[i] = false;
189: }
190: final Promise waitingArea = new Promise();
191: long start = System.currentTimeMillis();
192:
193: new Thread(new Runnable() {
194: public void run() {
195: for (int i = 0; i < count; i++) {
196: Message msg = new Message(null, localAddrOne,
197: new Integer(i));
198: try {
199: stub.send(msg, groupName);
200: if (i % 2000 == 0)
201: System.out.println("--sent " + i);
202: } catch (Exception e) {
203: waitingArea.setResult(e);
204: }
205: }
206: }
207: }, "Sending Thread").start();
208:
209: new Thread(new Runnable() {
210: public void run() {
211: int cnt = 0;
212: while (cnt < count) {
213: try {
214: Message msg = stub2.receive();
215: int index = ((Integer) msg.getObject())
216: .intValue();
217: received[index] = true;
218: cnt++;
219: if (cnt % 2000 == 0)
220: System.out.println("-- [stub2] received "
221: + cnt);
222: } catch (Exception e) {
223: waitingArea.setResult(e);
224: }
225: }
226: waitingArea.setResult(Boolean.TRUE);
227: }
228: }, "Receiving Thread stub2").start();
229:
230: new Thread(new Runnable() {
231: public void run() {
232: int cnt = 0;
233: while (cnt < count) {
234: try {
235: Message msg = stub.receive();
236: int index = ((Integer) msg.getObject())
237: .intValue();
238: received[index] = true;
239: cnt++;
240: if (cnt % 2000 == 0)
241: System.out.println("-- [stub] received "
242: + cnt);
243: } catch (Exception e) {
244: waitingArea.setResult(e);
245: }
246: }
247: waitingArea.setResult(Boolean.TRUE);
248: }
249: }, "Receiving Thread stub").start();
250:
251: // wait here the stress threads to finish
252: Object result = waitingArea.getResult((long) timeout * 1000);
253: long stop = System.currentTimeMillis();
254: stub2.disconnect();
255:
256: int messok = 0;
257: for (int i = 0; i < count; i++) {
258: if (received[i]) {
259: messok++;
260: }
261: }
262:
263: if (result == null) {
264: fail("Timeout while waiting for all messages to be received. "
265: + messok
266: + " messages out of "
267: + count
268: + " received so far.");
269: }
270: if (result instanceof Exception) {
271: throw (Exception) result;
272: }
273:
274: // make sure all messages have been received
275: for (int i = 0; i < count; i++) {
276: if (!received[i]) {
277: fail("At least message " + i + " NOT RECEIVED");
278: }
279: }
280: System.out.println("STRESS TEST OK, " + count + " messages, "
281: + 1000 * count / (stop - start) + " messages/sec");
282: }
283:
284: public static Test suite() {
285: return new TestSuite(RouterStubTest.class);
286: }
287:
288: public static void main(String[] args) {
289: junit.textui.TestRunner.run(suite());
290: System.exit(0);
291: }
292:
293: }
|