001: package org.jgroups.tests;
002:
003: import junit.framework.Test;
004: import junit.framework.TestCase;
005: import junit.framework.TestSuite;
006: import org.jgroups.*;
007: import org.jgroups.util.Util;
008: import org.jgroups.util.Promise;
009:
010: import java.io.InputStream;
011: import java.io.ObjectInputStream;
012: import java.io.ObjectOutputStream;
013: import java.io.OutputStream;
014: import java.util.Collections;
015: import java.util.HashMap;
016: import java.util.Map;
017: import java.util.Set;
018:
019: /**
020: * Tests correct state transfer while other members continue sending messages to the group
021: * @author Bela Ban
022: * @version $Id: StateTransferTest.java,v 1.10 2006/09/22 12:33:12 belaban Exp $
023: */
024: public class StateTransferTest extends TestCase {
025: final int NUM = 10000;
026: final int NUM_THREADS = 2;
027: String props = "udp.xml";
028:
029: public StateTransferTest(String name) {
030: super (name);
031: }
032:
033: protected void setUp() throws Exception {
034: props = System.getProperty("props", props);
035: log("Using configuration file " + props);
036: super .setUp();
037: }
038:
039: public void testStateTransferWhileSending() throws Exception {
040: Worker[] workers = new Worker[NUM_THREADS];
041:
042: int from = 0, to = NUM;
043:
044: for (int i = 0; i < workers.length; i++) {
045: workers[i] = new Worker(from, to);
046: from += NUM;
047: to += NUM;
048: }
049:
050: for (int i = 0; i < workers.length; i++) {
051: Worker worker = workers[i];
052: worker.start();
053: Util.sleep(50); // to have threads join the group a bit later and get the state
054: }
055:
056: for (int i = 0; i < workers.length; i++) {
057: Worker worker = workers[i];
058: worker.waitUntilDone();
059: }
060: for (int i = 0; i < workers.length; i++) {
061: Worker worker = workers[i];
062: worker.stop();
063: }
064:
065: log("\n\nhashmaps\n");
066: for (int i = 0; i < workers.length; i++) {
067: Worker w = workers[i];
068: Map m = w.getMap();
069: log("map has " + m.size() + " elements");
070: assertEquals(NUM * NUM_THREADS, m.size());
071: }
072:
073: Set keys = workers[0].getMap().keySet();
074: for (int i = 0; i < workers.length; i++) {
075: Worker w = workers[i];
076: Map m = w.getMap();
077: Set s = m.keySet();
078: assertEquals(keys, s);
079: }
080: }
081:
082: class Worker implements Runnable {
083: JChannel ch;
084: int to;
085: int from;
086: final Promise promise = new Promise();
087: Thread t;
088: Receiver receiver;
089:
090: public Worker(int from, int to) {
091: this .to = to;
092: this .from = from;
093: }
094:
095: public Map getMap() {
096: return receiver.getMap();
097: }
098:
099: void start() throws Exception {
100: ch = new JChannel(props);
101: ch.connect("StateTransferTest-Group");
102: receiver = new Receiver(ch, promise);
103: boolean rc = ch.getState(null, 10000);
104: if (rc)
105: log("state transfer: OK");
106: else {
107: if (ch.getView().size() == 1)
108: log("state transfer: OK");
109: else
110: log("state transfer: FAIL");
111: }
112:
113: receiver.setName("Receiver [" + from + " - " + to + "]");
114: receiver.start();
115: if (rc)
116: promise.getResult();
117:
118: t = new Thread(this );
119: t.setName("Worker [" + from + " - " + to + "]");
120: t.start();
121: }
122:
123: public void stop() {
124: ch.close();
125: }
126:
127: void waitUntilDone() throws InterruptedException {
128: t.join();
129: receiver.join();
130: }
131:
132: public void run() {
133: Object[] data = new Object[2];
134: log("Worker thread started (sending msgs from " + from
135: + " to " + to + " (excluding " + to + ")");
136: for (int i = from; i < to; i++) {
137: data[0] = new Integer(i);
138: data[1] = "Value #" + i;
139: try {
140: ch.send(null, null, data);
141: if (i % 1000 == 0)
142: log("sent " + i);
143: // log("sent " + data[0]);
144: } catch (Exception e) {
145: e.printStackTrace();
146: break;
147: }
148: }
149: }
150: }
151:
152: class Receiver extends Thread {
153: JChannel ch;
154: Promise promise;
155: Map map;
156:
157: public Receiver(JChannel ch, Promise promise) {
158: this .ch = ch;
159: this .promise = promise;
160: map = Collections.synchronizedMap(new HashMap(NUM
161: * NUM_THREADS));
162: }
163:
164: public Map getMap() {
165: return map;
166: }
167:
168: public void run() {
169: Object obj, prev_val;
170: Object[] data;
171: int num_received = 0, to_be_received = NUM * NUM_THREADS;
172:
173: log("Receiver thread started");
174: while (ch.isConnected()) {
175: try {
176: obj = ch.receive(0);
177: if (obj instanceof Message) {
178: data = (Object[]) ((Message) obj).getObject();
179: prev_val = map.put(data[0], data[1]);
180: if (prev_val != null) // we have a duplicate value
181: continue;
182: num_received = map.size();
183: if (num_received % 1000 == 0)
184: log("received " + num_received);
185:
186: // log("received " + data[0] + " total: " + num_received + ")");
187:
188: if (num_received >= to_be_received) {
189: log("DONE: received " + num_received
190: + " messages");
191: break;
192: }
193: } else if (obj instanceof View) {
194: log("VIEW: " + obj);
195: } else if (obj instanceof GetStateEvent) {
196: byte[] state = Util.objectToByteBuffer(map);
197: log("returning state, map has " + map.size()
198: + " elements");
199: ch.returnState(state);
200: } else if (obj instanceof SetStateEvent) {
201: byte state[] = ((SetStateEvent) obj).getArg();
202: if (state == null) {
203: log("received null state");
204: } else {
205: Map tmp = (Map) Util
206: .objectFromByteBuffer(state);
207: log("received state, map has " + tmp.size()
208: + " elements");
209: map = Collections.synchronizedMap(tmp);
210: }
211: promise.setResult(Boolean.TRUE);
212: } else if (obj instanceof StreamingGetStateEvent) {
213: StreamingGetStateEvent evt = (StreamingGetStateEvent) obj;
214: OutputStream stream = evt.getArg();
215: ObjectOutputStream out = new ObjectOutputStream(
216: stream);
217: synchronized (map) {
218: out.writeObject(map);
219: }
220: out.close();
221: } else if (obj instanceof StreamingSetStateEvent) {
222: StreamingSetStateEvent evt = (StreamingSetStateEvent) obj;
223: InputStream stream = evt.getArg();
224: ObjectInputStream in = new ObjectInputStream(
225: stream);
226: map = Collections.synchronizedMap((Map) in
227: .readObject());
228: in.close();
229: promise.setResult(Boolean.TRUE);
230: }
231: } catch (Exception e) {
232: log("receiver thread terminated due to exception: "
233: + e);
234: break;
235: }
236: }
237: log("Receiver thread terminated");
238: }
239: }
240:
241: static void log(String msg) {
242: System.out.println(Thread.currentThread() + " -- " + msg);
243: }
244:
245: public static Test suite() {
246: return new TestSuite(StateTransferTest.class);
247: }
248:
249: public static void main(String[] args) {
250: junit.textui.TestRunner.run(suite());
251: }
252:
253: }
|