001: // $Id: SendAndReceiveTest.java,v 1.4 2004/07/05 14:15:04 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import junit.framework.TestCase;
006: import org.jgroups.JChannel;
007: import org.jgroups.Message;
008:
009: /**
010: * Tests sending and receiving of messages within the same VM. Sends N messages
011: * and expects reception of N messages within a given time. Fails otherwise.
012: * @author Bela Ban
013: */
014: public class SendAndReceiveTest extends TestCase {
015: JChannel channel;
016: final int NUM_MSGS = 1000;
017: final long TIMEOUT = 30000;
018:
019: String props1 = "UDP(loopback=true;mcast_addr=228.8.8.8;mcast_port=27000;ip_ttl=1;"
020: + "mcast_send_buf_size=64000;mcast_recv_buf_size=64000):"
021: +
022: //"PIGGYBACK(max_wait_time=100;max_size=32000):" +
023: "PING(timeout=2000;num_initial_members=3):"
024: + "MERGE2(min_interval=5000;max_interval=10000):"
025: + "FD_SOCK:"
026: + "VERIFY_SUSPECT(timeout=1500):"
027: + "pbcast.NAKACK(max_xmit_size=8096;gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
028: + "UNICAST(timeout=600,1200,2400,4800):"
029: + "pbcast.STABLE(desired_avg_gossip=20000):"
030: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):"
031: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
032: + "shun=false;print_local_addr=true)";
033:
034: String props2 = "UDP(loopback=false;mcast_addr=228.8.8.8;mcast_port=27000;ip_ttl=1;"
035: + "mcast_send_buf_size=64000;mcast_recv_buf_size=64000):"
036: +
037: //"PIGGYBACK(max_wait_time=100;max_size=32000):" +
038: "PING(timeout=2000;num_initial_members=3):"
039: + "MERGE2(min_interval=5000;max_interval=10000):"
040: + "FD_SOCK:"
041: + "VERIFY_SUSPECT(timeout=1500):"
042: + "pbcast.NAKACK(max_xmit_size=8096;gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
043: + "UNICAST(timeout=600,1200,2400,4800):"
044: + "pbcast.STABLE(desired_avg_gossip=20000):"
045: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):"
046: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
047: + "shun=false;print_local_addr=true)";
048:
049: String props3 = "LOOPBACK:"
050: + "PING(timeout=2000;num_initial_members=3):"
051: + "MERGE2(min_interval=5000;max_interval=10000):"
052: + "FD_SOCK:"
053: + "VERIFY_SUSPECT(timeout=1500):"
054: + "pbcast.NAKACK(max_xmit_size=8096;gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
055: + "UNICAST(timeout=600,1200,2400,4800):"
056: + "pbcast.STABLE(desired_avg_gossip=20000):"
057: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):"
058: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
059: + "shun=false;print_local_addr=true)";
060:
061: public SendAndReceiveTest(String n) {
062: super (n);
063:
064: }
065:
066: public void setUp(String props) {
067: try {
068: channel = new JChannel(props);
069: channel.connect("test1");
070: } catch (Throwable t) {
071: t.printStackTrace(System.err);
072: fail("channel could not be created");
073: }
074: }
075:
076: public void tearDown() {
077: if (channel != null) {
078: channel.close();
079: channel = null;
080: }
081: }
082:
083: /**
084: * Sends NUM messages and expects NUM messages to be received. If
085: * NUM messages have not been received after 20 seconds, the test failed.
086: */
087: public void testSendAndReceiveWithDefaultUDP_Loopback() {
088: setUp(props1);
089: sendMessages(NUM_MSGS);
090: int received_msgs = receiveMessages(NUM_MSGS, TIMEOUT);
091: assertTrue(received_msgs >= NUM_MSGS);
092: }
093:
094: public void testSendAndReceiveWithDefaultUDP_NoLoopback() {
095: setUp(props2);
096: sendMessages(NUM_MSGS);
097: int received_msgs = receiveMessages(NUM_MSGS, TIMEOUT);
098: assertTrue(received_msgs >= NUM_MSGS);
099: }
100:
101: public void testSendAndReceiveWithLoopback() {
102: setUp(props3);
103: sendMessages(NUM_MSGS);
104: int received_msgs = receiveMessages(NUM_MSGS, TIMEOUT);
105: assertTrue(received_msgs >= NUM_MSGS);
106: }
107:
108: private void sendMessages(int num) {
109: Message msg;
110: for (int i = 0; i < num; i++) {
111: try {
112: msg = new Message();
113: channel.send(msg);
114: System.out.print(i + " ");
115: } catch (Throwable t) {
116: fail("could not send message #" + i);
117: }
118: }
119: }
120:
121: /**
122: * Receive at least <tt>num</tt> messages. Total time should not exceed <tt>timeout</tt>
123: * @param num
124: * @param timeout Must be > 0
125: * @return
126: */
127: private int receiveMessages(int num, long timeout) {
128: int received = 0;
129: Object msg;
130:
131: if (timeout <= 0)
132: timeout = 5000;
133:
134: long start = System.currentTimeMillis(), current, wait_time;
135: while (true) {
136: current = System.currentTimeMillis();
137: wait_time = timeout - (current - start);
138: if (wait_time <= 0)
139: break;
140: try {
141: msg = channel.receive(wait_time);
142: if (msg instanceof Message) {
143: received++;
144: System.out.print("+" + received + ' ');
145: }
146: if (received >= num)
147: break;
148: } catch (Throwable t) {
149: fail("failed receiving message");
150: }
151: }
152: return received;
153: }
154:
155: public static void main(String[] args) {
156: String[] testCaseName = { SendAndReceiveTest.class.getName() };
157: junit.textui.TestRunner.main(testCaseName);
158: } //public static void main(String[] args)
159:
160: }
|