001: package org.jgroups.protocols;
002:
003: import junit.framework.Test;
004: import junit.framework.TestCase;
005: import junit.framework.TestSuite;
006: import org.jgroups.Event;
007: import org.jgroups.Global;
008: import org.jgroups.Message;
009: import org.jgroups.View;
010: import org.jgroups.debug.Simulator;
011: import org.jgroups.stack.IpAddress;
012: import org.jgroups.stack.Protocol;
013:
014: import java.nio.ByteBuffer;
015: import java.util.Properties;
016: import java.util.Vector;
017:
018: /**
019: * Tests the fragmentation (FRAG) protocol for http://jira.jboss.com/jira/browse/JGRP-215
020: * @author Bela Ban
021: */
022: public class FRAG_Test extends TestCase {
023: IpAddress a1;
024: Vector members;
025: View v;
026: static Simulator s = null;
027: static int num_done = 0;
028:
029: static Sender[] senders = null;
030:
031: public static final int SIZE = 10000; // bytes
032: public static final int NUM_MSGS = 10;
033: public static final int NUM_THREADS = 100;
034:
035: public FRAG_Test(String name) {
036: super (name);
037: }
038:
039: public void setUp() throws Exception {
040: super .setUp();
041: a1 = new IpAddress(1111);
042: members = new Vector();
043: members.add(a1);
044: v = new View(a1, 1, members);
045: s = new Simulator();
046: s.setLocalAddress(a1);
047: s.setView(v);
048: s.addMember(a1);
049: Protocol frag = createProtocol();
050: System.out.println("protocol to be tested: " + frag);
051: Properties props = new Properties();
052: props.setProperty("frag_size", "512");
053: props.setProperty("up_thread", "false");
054: props.setProperty("down_thread", "false");
055: frag.setPropertiesInternal(props);
056: Protocol[] stack = new Protocol[] { frag };
057: s.setProtocolStack(stack);
058: s.start();
059: }
060:
061: protected Protocol createProtocol() {
062: return new FRAG();
063: }
064:
065: public void tearDown() throws Exception {
066: super .tearDown();
067: s.stop();
068: }
069:
070: public void testFragmentation() throws InterruptedException {
071: FRAG_Test.Receiver r = new FRAG_Test.Receiver();
072: s.setReceiver(r);
073:
074: senders = new Sender[NUM_THREADS];
075: for (int i = 0; i < senders.length; i++) {
076: senders[i] = new Sender(i);
077: }
078:
079: for (int i = 0; i < senders.length; i++) {
080: Sender sender = senders[i];
081: sender.start();
082: }
083:
084: for (int i = 0; i < senders.length; i++) {
085: Sender sender = senders[i];
086: sender.join(5000);
087: if (sender.isAlive()) {
088: System.err.println("sender #" + i
089: + " could not be joined (still alive)");
090: }
091: }
092:
093: int sent = 0, received = 0, corrupted = 0;
094: for (int i = 0; i < senders.length; i++) {
095: Sender sender = senders[i];
096: received += sender.getNumReceived();
097: sent += sender.getNumSent();
098: corrupted += sender.getNumCorrupted();
099: }
100:
101: System.out.println("sent: " + sent + ", received: " + received
102: + ", corrupted: " + corrupted);
103: assertEquals("sent and received should be the same", sent,
104: received);
105: assertEquals("we should have 0 corrupted messages", 0,
106: corrupted);
107: }
108:
109: static class Sender extends Thread {
110: int id = -1;
111: int num_sent = 0;
112: int num_received = 0;
113: int num_corrupted = 0;
114: boolean done = false;
115:
116: public int getIdent() {
117: return id;
118: }
119:
120: public int getNumReceived() {
121: return num_received;
122: }
123:
124: public int getNumSent() {
125: return num_sent;
126: }
127:
128: public int getNumCorrupted() {
129: return num_corrupted;
130: }
131:
132: public Sender(int id) {
133: super ("sender #" + id);
134: this .id = id;
135: }
136:
137: public void run() {
138: byte[] buf = createBuffer(id);
139: Message msg;
140: Event evt;
141:
142: for (int i = 0; i < NUM_MSGS; i++) {
143: msg = new Message(null, null, buf);
144: evt = new Event(Event.MSG, msg);
145: s.send(evt);
146: num_sent++;
147: }
148:
149: synchronized (this ) {
150: try {
151: while (!done)
152: this .wait(500);
153: num_done++;
154: System.out.println("thread #" + id + " is done ("
155: + num_done + ")");
156: } catch (InterruptedException e) {
157: }
158: }
159: }
160:
161: private byte[] createBuffer(int id) {
162: ByteBuffer buf = ByteBuffer.allocate(SIZE);
163: int elements = SIZE / Global.INT_SIZE;
164: for (int i = 0; i < elements; i++) {
165: buf.putInt(id);
166: }
167: return buf.array();
168: }
169:
170: /** 1 int has already been read by the Receiver */
171: public void verify(ByteBuffer buf) {
172: boolean corrupted = false;
173:
174: int num_elements = (SIZE / Global.INT_SIZE) - 1;
175: int tmp;
176: for (int i = 0; i < num_elements; i++) {
177: tmp = buf.getInt();
178: if (tmp != id) {
179: corrupted = true;
180: break;
181: }
182: }
183:
184: if (corrupted)
185: num_corrupted++;
186: else
187: num_received++;
188:
189: if (num_corrupted + num_received >= NUM_MSGS) {
190: synchronized (this ) {
191: done = true;
192: this .notify();
193: }
194: }
195: }
196: }
197:
198: static class Receiver implements Simulator.Receiver {
199: int received = 0;
200:
201: public void receive(Event evt) {
202: if (evt.getType() == Event.MSG) {
203: received++;
204: if (received % 1000 == 0)
205: System.out.println("<== " + received);
206:
207: Message msg = (Message) evt.getArg();
208: byte[] data = msg.getBuffer();
209: ByteBuffer buf = ByteBuffer.wrap(data);
210: int id = buf.getInt();
211: Sender sender = senders[id];
212: sender.verify(buf);
213: }
214: }
215: }
216:
217: public static Test suite() {
218: return new TestSuite(FRAG_Test.class);
219: }
220:
221: public static void main(String[] args) {
222: junit.textui.TestRunner.run(FRAG_Test.suite());
223: }
224: }
|