001: package org.jgroups.tests;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.io.OutputStream;
006: import java.util.ArrayList;
007: import java.util.List;
008: import java.util.Random;
009:
010: import junit.framework.Test;
011: import junit.framework.TestCase;
012: import junit.framework.TestSuite;
013:
014: import org.jgroups.Address;
015: import org.jgroups.BlockEvent;
016: import org.jgroups.Channel;
017: import org.jgroups.ExtendedReceiver;
018: import org.jgroups.JChannel;
019: import org.jgroups.Message;
020: import org.jgroups.StreamingGetStateEvent;
021: import org.jgroups.StreamingSetStateEvent;
022: import org.jgroups.TimeoutException;
023: import org.jgroups.UnblockEvent;
024: import org.jgroups.View;
025: import org.jgroups.ViewId;
026: import org.jgroups.blocks.RpcDispatcher;
027: import org.jgroups.util.Util;
028:
029: /**
030: * Tests streaming state transfer for both pull and push mode of channel
031: * operations. Size of the transfer is configurable. Test runner should
032: * specify "pull" and "size" parameter as JVM parameters when running this
033: * test. If not specified default values are to use push mode and transfer
034: * size of 100 MB.
035: *
036: * <p>
037: *
038: * To specify pull mode and size transfer of 500 MB test runner should pass
039: * JVM parameters:
040: *
041: * <p>
042: * -Dpull=true -Dsize=500
043: *
044: *
045: * @author Vladimir Blagojevic
046: * @version $Id$
047: *
048: */
049: public class StreamingStateTransferTest extends TestCase {
050:
051: private final static String CHANNEL_PROPS = "streaming-state-transfer.xml";
052: private final static int INITIAL_NUMBER_OF_MEMBERS = 5;
053: private int runningTime = 1000 * 50; // 50 secs
054: private Random r = new Random();
055: private boolean usePullMode = false;
056: private boolean useDisp = false;
057: private int size = 100; //100MB
058:
059: private final static int MEGABYTE = 1048576;
060:
061: public StreamingStateTransferTest(String arg0) {
062: super (arg0);
063: }
064:
065: public void testTransfer() throws Exception {
066: long start = System.currentTimeMillis();
067: boolean running = true;
068: List members = new ArrayList();
069:
070: //first spawn and join
071: for (int i = 0; i < INITIAL_NUMBER_OF_MEMBERS; i++) {
072: GroupMember member = new GroupMember(usePullMode, useDisp,
073: size);
074: members.add(member);
075: Thread t = new Thread(member);
076: t.start();
077: Util.sleep(getRandomDelayInSeconds(10, 12) * 1000);
078: }
079:
080: for (; running;) {
081:
082: //and then flip a coin
083: if (r.nextBoolean()) {
084: Util.sleep(getRandomDelayInSeconds(10, 12) * 1000);
085: GroupMember member = new GroupMember(usePullMode,
086: useDisp, size);
087: members.add(member);
088: Thread t = new Thread(member);
089: t.start();
090: } else if (members.size() > 1) {
091: Util.sleep(getRandomDelayInSeconds(3, 8) * 1000);
092: GroupMember unluckyBastard = (GroupMember) members
093: .get(r.nextInt(members.size()));
094: if (!unluckyBastard.isCoordinator()) {
095: members.remove(unluckyBastard);
096: unluckyBastard.stopRunning();
097: } else {
098: System.out.println("Not killing coordinator ");
099: }
100: }
101: running = System.currentTimeMillis() - start <= runningTime;
102: System.out.println("Running time "
103: + ((System.currentTimeMillis() - start) / 1000)
104: + " secs");
105: }
106: System.out.println("Done");
107: }
108:
109: protected int getRandomDelayInSeconds(int from, int to) {
110: return from + r.nextInt(to - from);
111: }
112:
113: protected void setUp() throws Exception {
114:
115: //NOTE use -Ddisp=true|false -Dpull=true|false -Dsize=int (size of transfer)
116:
117: String prop = System.getProperty("disp");
118: if (prop != null) {
119: useDisp = prop.equalsIgnoreCase("true");
120: System.out.println("Using parameter disp=" + useDisp);
121: }
122: prop = System.getProperty("pull");
123: if (prop != null) {
124: usePullMode = prop.equalsIgnoreCase("true");
125: System.out.println("Using parameter usePullMode="
126: + usePullMode);
127: }
128:
129: prop = System.getProperty("size");
130: if (prop != null) {
131: size = Integer.parseInt(System.getProperty("size"));
132: System.out.println("Using parameter size=" + size);
133: }
134: super .setUp();
135: }
136:
137: protected void tearDown() throws Exception {
138: super .tearDown();
139: }
140:
141: public static Test suite() {
142: return new TestSuite(StreamingStateTransferTest.class);
143: }
144:
145: public static void main(String[] args) {
146: String[] testCaseName = { StreamingStateTransferTest.class
147: .getName() };
148: junit.textui.TestRunner.main(testCaseName);
149: }
150:
151: private static class GroupMember implements Runnable,
152: ExtendedReceiver {
153: JChannel ch = null;
154: View currentView;
155: volatile boolean running = true;
156: private int stateSize;
157: private int bufferSize = 8 * 1024;
158: private boolean usePullMode;
159: private Random ran = new Random();
160: private boolean useDispacher;
161:
162: public GroupMember(boolean pullMode, boolean dispMode, int size) {
163: setStateSize(size * MEGABYTE);
164: setUsePullMode(pullMode);
165: setUseDispatcher(dispMode);
166: try {
167: ch = new JChannel(CHANNEL_PROPS);
168: ch.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
169: ch.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
170: ch.setOpt(Channel.BLOCK, Boolean.TRUE);
171: if (useDispacher) {
172: RpcDispatcher disp = new RpcDispatcher(ch, this ,
173: this , this );
174: } else if (!usePullMode) {
175: ch.setReceiver(this );
176: }
177: ch.connect("transfer");
178: } catch (Exception e) {
179: e.printStackTrace();
180: }
181: }
182:
183: public final void setUsePullMode(boolean usePullMode) {
184: this .usePullMode = usePullMode;
185: }
186:
187: public final void setUseDispatcher(boolean useDispacher) {
188: this .useDispacher = useDispacher;
189: }
190:
191: public String getAddress() {
192: if (ch != null && ch.isConnected()) {
193: return ch.getLocalAddress().toString();
194: }
195: return null;
196: }
197:
198: public void stopRunning() {
199: running = false;
200: System.out.println("Disconnect " + getAddress());
201: if (ch != null)
202: ch.close();
203: }
204:
205: protected boolean isCoordinator() {
206: if (ch == null)
207: return false;
208: Object local_addr = ch.getLocalAddress();
209: if (local_addr == null)
210: return false;
211: View view = ch.getView();
212: if (view == null)
213: return false;
214: ViewId vid = view.getVid();
215: if (vid == null)
216: return false;
217: Object coord = vid.getCoordAddress();
218: if (coord == null)
219: return false;
220: return local_addr.equals(coord);
221: }
222:
223: public final void setStateSize(int stateSize) {
224: this .stateSize = stateSize;
225: }
226:
227: public void run() {
228: Runnable r = new Runnable() {
229: public void run() {
230: try {
231: if (ran.nextBoolean()) {
232: ch.getState(null, 5000);
233: } else {
234: String randomStateId = Long.toString(Math
235: .abs(ran.nextLong()), 36);
236: ch.getState(null, randomStateId, 5000);
237: }
238: } catch (Exception e) {
239: e.printStackTrace();
240: }
241: }
242: };
243: if (usePullMode) {
244:
245: //when BLOCK events are turned on, pbcast.FLUSH is used and we use pull channel mode
246: //we have to getState on a separate thread. Why? Because joining member has to immediatelly
247: //go into receive and fetch/respond to block event which is received as part of state transfer.
248: new Thread(r).start();
249: } else {
250: r.run();
251: }
252: while (running) {
253: Object msgReceived = null;
254: try {
255: msgReceived = ch.receive(0);
256: if (msgReceived instanceof BlockEvent) {
257: this .block();
258: ch.blockOk();
259: } else if (msgReceived instanceof UnblockEvent) {
260: this .unblock();
261: }
262: if (!running) {
263: // I am not a group member anymore so
264: // I will discard any transient message I
265: // receive
266: } else {
267: if (msgReceived instanceof View) {
268: } else if (msgReceived instanceof StreamingGetStateEvent) {
269: StreamingGetStateEvent evt = (StreamingGetStateEvent) msgReceived;
270: if (evt.getStateId() != null) {
271: this .getState(evt.getStateId(), evt
272: .getArg());
273: } else {
274: this .getState(evt.getArg());
275: }
276: } else if (msgReceived instanceof StreamingSetStateEvent) {
277: StreamingSetStateEvent evt = (StreamingSetStateEvent) msgReceived;
278: if (evt.getStateId() != null) {
279: this .setState(evt.getStateId(), evt
280: .getArg());
281: } else {
282: this .setState(evt.getArg());
283: }
284: }
285: }
286:
287: } catch (TimeoutException e) {
288: } catch (Exception e) {
289: ch.close();
290: running = false;
291: }
292: }
293: }
294:
295: public void getState(OutputStream ostream) {
296: InputStream stream = Thread.currentThread()
297: .getContextClassLoader().getResourceAsStream(
298: "org/jgroups/JChannel.class");
299: System.out.println(Thread.currentThread() + " at "
300: + getAddress() + " is sending state of "
301: + (stateSize / MEGABYTE) + " MB");
302:
303: int markSize = 1024 * 100; //100K should be enough
304: byte buffer[] = new byte[bufferSize];
305: int bytesRead = -1;
306: int size = stateSize;
307: try {
308: while (size > 0) {
309: stream.mark(markSize);
310: bytesRead = stream.read(buffer);
311: ostream.write(buffer);
312: stream.reset();
313: size = size - bytesRead;
314: }
315: } catch (IOException e) {
316: e.printStackTrace();
317: } finally {
318: try {
319: ostream.flush();
320: ostream.close();
321: } catch (IOException e) {
322: e.printStackTrace();
323: }
324: }
325: }
326:
327: public void setState(InputStream istream) {
328: int totalRead = 0;
329: byte buffer[] = new byte[bufferSize];
330: int bytesRead = -1;
331: long start = System.currentTimeMillis();
332: try {
333: while ((bytesRead = istream.read(buffer)) >= 0) {
334: totalRead += bytesRead;
335: }
336: } catch (IOException e) {
337: e.printStackTrace();
338: } finally {
339: try {
340: istream.close();
341: } catch (IOException e) {
342: e.printStackTrace();
343: }
344: }
345: long readingTime = System.currentTimeMillis() - start;
346: System.out.println(Thread.currentThread() + " at "
347: + getAddress() + " read state of "
348: + (totalRead / MEGABYTE) + " MB in " + readingTime
349: + " msec");
350: }
351:
352: public void receive(Message msg) {
353: }
354:
355: public void setState(byte[] state) {
356: }
357:
358: public void viewAccepted(View new_view) {
359: }
360:
361: public void suspect(Address suspected_mbr) {
362: }
363:
364: public void block() {
365: // System.out.println("Block at " + ch.getLocalAddress());
366: }
367:
368: public void unblock() {
369: // System.out.println("Unblock at " + ch.getLocalAddress());
370: }
371:
372: public byte[] getState() {
373: return null;
374: }
375:
376: public byte[] getState(String state_id) {
377: return null;
378: }
379:
380: public void setState(String state_id, byte[] state) {
381: }
382:
383: public void getState(String state_id, OutputStream ostream) {
384: System.out
385: .println("Writing partial streaming state transfer for "
386: + state_id);
387: getState(ostream);
388: }
389:
390: public void setState(String state_id, InputStream istream) {
391: System.out
392: .println("Reading partial streaming state transfer for "
393: + state_id);
394: setState(istream);
395: }
396: }
397: }
|