001: // $Id: AckMcastSenderWindowTest.java,v 1.3 2006/05/16 11:14:28 belaban Exp $
002: package org.jgroups.tests;
003:
004: import junit.framework.Test;
005: import junit.framework.TestCase;
006: import junit.framework.TestSuite;
007: import org.jgroups.Address;
008: import org.jgroups.Message;
009: import org.jgroups.stack.AckMcastSenderWindow;
010: import org.jgroups.stack.IpAddress;
011:
012: import java.util.ArrayList;
013: import java.util.Hashtable;
014: import java.util.List;
015: import java.util.Vector;
016: import java.net.UnknownHostException;
017:
018: /**
019: * Test <code>AckMcastSenderWindow</code>
020: * <p>
021: * <code>test1()</code>:<br>
022: * 1. Create two messages {1,2} each with 3 distinct destinations.<br>
023: * 2. Start a thread that acknowledges messages between sleeping
024: * intervals.<br>
025: * 3. When the callback retransmission function is called, check that the
026: * request is for a destination that is still associated with the given
027: * message sequence number.<br>
028: * <p>
029: * Since <code>AckMcastSenderWindow</code> does not export its state, keep
030: * track of seqnos and address lists in a hashtable.
031: */
032: public class AckMcastSenderWindowTest extends TestCase {
033: private class Cmd implements AckMcastSenderWindow.RetransmitCommand {
034: public void retransmit(long seqno, Message msg, Address addr) {
035: _retransmit(seqno, msg, addr);
036: }
037: }
038:
039: private class Acker extends Thread {
040: public void run() {
041: _ackerRun();
042: }
043: }
044:
045: /** A list of destination addresses */
046: private static Address[] _RECVS = { new IpAddress(5000),
047: new IpAddress(5001), new IpAddress(5002) };
048:
049: /** The retransmit command */
050: private AckMcastSenderWindow.RetransmitCommand _cmd;
051: /** The mcast retransmit window */
052: private AckMcastSenderWindow _win;
053: /**
054: * 2-level table
055: * seqNo -> list of destinations
056: */
057: private Hashtable _tbl;
058:
059: /**
060: * Associate the given addess with this sequence number. This is to
061: * reflect the state of the <code>AckMcastSenderWindow</code> as its state
062: * is not exported
063: *
064: * @param seqno the sequence number
065: * @param addr the address to associate with the seqno
066: */
067: private void _put(long seqno, Address addr) {
068: List list;
069:
070: synchronized (_tbl) {
071: if ((list = (List) _tbl.get(new Long(seqno))) == null) {
072: list = new ArrayList();
073: _tbl.put(new Long(seqno), list);
074: }
075: if (!list.contains(addr))
076: list.add(addr);
077: else {
078: if (list.isEmpty())
079: _tbl.remove(new Long(seqno));
080: }
081: } // synchronized(_tbl)
082: }
083:
084: /**
085: * Remove the given address from the list of addresses for this seqno
086: *
087: * @param seqno the sequence number associated with a list of addresses
088: * @param addr the address to remove from the list of addresses mapped
089: * to this seqno
090: */
091: private void _remove(long seqno, Address addr) {
092: List list;
093:
094: synchronized (_tbl) {
095: if ((list = (List) _tbl.get(new Long(seqno))) == null)
096: return;
097: list.remove(addr);
098: if (list.isEmpty())
099: _tbl.remove(new Long(seqno));
100: } // synchronized(_tbl)
101: }
102:
103: /**
104: * @return true if <code>addr</code> is associated with <code>seqno</code>
105: */
106: private boolean _contains(long seqno, Address addr) {
107: List list;
108:
109: synchronized (_tbl) {
110: if ((list = (List) _tbl.get(new Long(seqno))) == null)
111: return (false);
112: return (list.contains(addr));
113: } // synchronized(_tbl)
114: }
115:
116: /**
117: * Thread acknowledging messages
118: */
119: private void _ackerRun() {
120: // Ack {2, _RECVS[2]}
121: _win.ack(2, _RECVS[2]);
122: _remove(2, _RECVS[2]);
123: try {
124: Thread.sleep(1000);
125: } catch (InterruptedException ex) {
126: ex.printStackTrace();
127: }
128:
129: // Ack {1, _RECVS[1]}
130: _win.ack(1, _RECVS[1]);
131: _remove(1, _RECVS[1]);
132: try {
133: Thread.sleep(500);
134: } catch (InterruptedException ex) {
135: ex.printStackTrace();
136: }
137:
138: // Ack {1, _RECVS[0]}
139: // Ack {2, _RECVS[0]}
140: // Ack {2, _RECVS[1]}
141: _win.ack(1, _RECVS[0]);
142: _remove(1, _RECVS[0]);
143: _win.ack(2, _RECVS[0]);
144: _remove(2, _RECVS[0]);
145: _win.ack(2, _RECVS[1]);
146: _remove(2, _RECVS[1]);
147: try {
148: Thread.sleep(500);
149: } catch (InterruptedException ex) {
150: ex.printStackTrace();
151: }
152:
153: // Ack {1, _RECVS[2]}
154: _win.ack(1, _RECVS[2]);
155: _remove(1, _RECVS[2]);
156: }
157:
158: /**
159: * Check if retransmission is expected
160: */
161: private void _retransmit(long seqno, Message msg, Address addr) {
162: if (!_contains(seqno, addr))
163: fail("Acknowledging a non-existent msg, great!");
164: else
165: System.out.println("retransmitting " + seqno);
166: }
167:
168: /**
169: * Add 2 messages to 3 destinations
170: *
171: * Start acknowledging messages while checking the validity of
172: * retransmissions
173: */
174: public void test1() {
175: Vector dests = new Vector();
176: Message msg = new Message();
177: Acker acker = new Acker();
178: long seqno;
179:
180: for (int i = 0; i < _RECVS.length; ++i)
181: dests.add(_RECVS[i]);
182:
183: // seqno/1
184: seqno = 1;
185: for (int i = 0; i < _RECVS.length; ++i)
186: _put(seqno, _RECVS[i]);
187: _win.add(seqno, msg, dests);
188:
189: // seqno/2
190: seqno = 2;
191: for (int i = 0; i < _RECVS.length; ++i)
192: _put(seqno, _RECVS[i]);
193: _win.add(seqno, msg, dests);
194:
195: // start
196: acker.start();
197: try {
198: acker.join();
199: } catch (InterruptedException ex) {
200: ex.printStackTrace();
201: }
202:
203: _win.stop();
204: } // testAck()
205:
206: public void testRemove() throws UnknownHostException {
207: AckMcastSenderWindow mywin = new AckMcastSenderWindow(
208: new MyCommand(), new long[] { 1000, 2000, 3000 });
209: Address sender1 = new IpAddress("127.0.0.1", 10000);
210: Address sender2 = new IpAddress("127.0.0.1", 10001);
211: Address sender3 = new IpAddress("127.0.0.1", 10002);
212: Vector senders = new Vector();
213: Message msg = new Message();
214: long seqno = 322649;
215:
216: senders.addElement(sender1);
217: senders.addElement(sender2);
218: senders.addElement(sender3);
219:
220: mywin.add(seqno, msg, (Vector) senders.clone()); // clone() for the fun of it...
221:
222: mywin.ack(seqno, sender1);
223: mywin.ack(seqno, sender2);
224:
225: System.out.println("entry is " + mywin.printDetails(seqno));
226: assertTrue(mywin.getNumberOfResponsesExpected(seqno) == 3);
227: assertTrue(mywin.getNumberOfResponsesReceived(seqno) == 2);
228: mywin.waitUntilAllAcksReceived(4000);
229: mywin.suspect(sender3);
230: assertTrue(mywin.size() == 0); // because suspect() removed that entry
231: }
232:
233: public AckMcastSenderWindowTest(String name) {
234: super (name);
235: }
236:
237: public void setUp() {
238: _cmd = new Cmd();
239: _win = new AckMcastSenderWindow(_cmd);
240: _tbl = new Hashtable();
241: }
242:
243: public void tearDown() {
244: _win.stop();
245: }
246:
247: class MyCommand implements AckMcastSenderWindow.RetransmitCommand {
248:
249: public void retransmit(long seqno, Message msg, Address dest) {
250: System.out.println("-- retransmitting " + seqno);
251: }
252: }
253:
254: public static Test suite() {
255: TestSuite suite;
256: suite = new TestSuite(AckMcastSenderWindowTest.class);
257: return (suite);
258: }
259:
260: public static void main(String[] args) {
261: String[] name = { AckMcastSenderWindowTest.class.getName() };
262: junit.textui.TestRunner.main(name);
263: }
264: }
|