001: // $Id: NakackTest.java,v 1.7 2006/01/28 10:51:29 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import junit.framework.Test;
006: import junit.framework.TestCase;
007: import junit.framework.TestSuite;
008: import org.jgroups.*;
009: import org.jgroups.debug.ProtocolTester;
010: import org.jgroups.stack.IpAddress;
011: import org.jgroups.stack.Protocol;
012:
013: import java.util.Hashtable;
014: import java.util.Vector;
015:
016: public class NakackTest extends TestCase {
017: final long WAIT_TIME = 5000;
018: public final long NUM_MSGS = 10000;
019: long num_msgs_received = 0;
020: long num_msgs_sent = 0;
021:
022: public NakackTest(String name) {
023: super (name);
024: }
025:
026: public void setUp() throws Exception {
027: super .setUp();
028: num_msgs_received = 0;
029: num_msgs_sent = 0;
030: }
031:
032: public void test0() throws Exception {
033: Object mutex = new Object();
034: CheckNoGaps check = new CheckNoGaps(-1, this , mutex);
035: ProtocolTester t = new ProtocolTester("pbcast.NAKACK", check);
036: Address my_addr = new IpAddress("localhost", 10000);
037: ViewId vid = new ViewId(my_addr, 322649);
038: Vector mbrs = new Vector();
039: View view;
040:
041: mbrs.addElement(my_addr);
042: view = new View(vid, mbrs);
043:
044: t.start();
045: check.down(new Event(Event.BECOME_SERVER));
046: check.down(new Event(Event.VIEW_CHANGE, view));
047:
048: synchronized (mutex) {
049: for (long i = 0; i < NUM_MSGS; i++) {
050: if (i % 1000 == 0 && i > 0)
051: System.out.println("sending msg #" + i);
052: check.down(new Event(Event.MSG, new Message(null,
053: my_addr, new Long(i))));
054: num_msgs_sent++;
055: }
056: mutex.wait(WAIT_TIME);
057: }
058: System.out.println("\nMessages sent: " + num_msgs_sent
059: + ", messages received: " + num_msgs_received);
060: assertEquals(num_msgs_received, num_msgs_sent);
061: t.stop();
062: }
063:
064: public static Test suite() {
065: return new TestSuite(NakackTest.class);
066: }
067:
068: public static void main(String[] args) {
069: junit.textui.TestRunner.run(suite());
070: }
071:
072: private static class CheckNoGaps extends Protocol {
073: long starting_seqno = 0;
074: long num_msgs = 0;
075: Hashtable senders = new Hashtable(); // sender --> highest seqno received so far
076: NakackTest t = null;
077: Object mut = null;
078:
079: CheckNoGaps(long seqno, NakackTest t, Object mut) {
080: starting_seqno = seqno;
081: this .t = t;
082: this .mut = mut;
083: }
084:
085: public String getName() {
086: return "CheckNoGaps";
087: }
088:
089: public void up(Event evt) {
090: Message msg = null;
091: Address sender;
092: long highest_seqno, received_seqno;
093: Long s;
094:
095: if (evt == null)
096: return;
097:
098: if (evt.getType() == Event.SET_LOCAL_ADDRESS) {
099: System.out.println("local address is " + evt.getArg());
100: }
101:
102: if (evt.getType() != Event.MSG)
103: return;
104: msg = (Message) evt.getArg();
105: sender = msg.getSrc();
106: if (sender == null) {
107: log
108: .error("NakackTest.CheckNoGaps.up(): sender is null; discarding msg");
109: return;
110: }
111: s = (Long) senders.get(sender);
112: if (s == null) {
113: s = new Long(starting_seqno);
114: senders.put(sender, s);
115: }
116:
117: highest_seqno = s.longValue();
118:
119: try {
120: s = (Long) msg.getObject();
121: received_seqno = s.longValue();
122: if (received_seqno == highest_seqno + 1) {
123: // correct
124: if (received_seqno % 1000 == 0
125: && received_seqno > 0)
126: System.out.println("PASS: received msg #"
127: + received_seqno);
128: senders.put(sender, new Long(highest_seqno + 1));
129: num_msgs++;
130: if (num_msgs >= t.NUM_MSGS) {
131: synchronized (mut) {
132: t.num_msgs_received = num_msgs;
133: mut.notifyAll();
134: }
135: }
136: } else {
137: // error, terminate test
138: log.error("FAIL: received msg #" + received_seqno);
139: }
140: } catch (Exception ex) {
141: log.error("NakackTest.CheckNoGaps.up(): " + ex);
142: }
143:
144: }
145:
146: public void startUpHandler() {
147: ;
148: }
149:
150: public void startDownHandler() {
151: ;
152: }
153:
154: }
155: }
|