001: package org.jgroups.tests;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.io.ObjectInputStream;
006: import java.io.ObjectOutputStream;
007: import java.io.OutputStream;
008: import java.util.LinkedList;
009: import java.util.List;
010: import java.util.Map;
011: import java.util.TreeMap;
012:
013: import junit.framework.Test;
014: import junit.framework.TestSuite;
015:
016: import org.jgroups.Channel;
017: import org.jgroups.JChannelFactory;
018: import org.jgroups.Message;
019: import org.jgroups.View;
020: import org.jgroups.util.Util;
021:
022: import EDU.oswego.cs.dl.util.concurrent.Semaphore;
023:
024: /**
025: * Tests concurrent startup with state transfer and concurrent state tranfer.
026: * @author bela
027: * @version $Id: ConcurrentStartupTest.java,v 1.16.2.1 2006/12/04 22:45:49 vlada Exp $
028: */
029: public class ConcurrentStartupTest extends ChannelTestBase {
030:
031: private int mod = 1;
032:
033: public void setUp() throws Exception {
034: super .setUp();
035: mod = 1;
036: CHANNEL_CONFIG = System.getProperty("channel.conf.flush",
037: "flush-udp.xml");
038: }
039:
040: public boolean useBlocking() {
041: return true;
042: }
043:
044: public void testConcurrentStartupLargeState() {
045: concurrentStartupHelper(true, false);
046: }
047:
048: public void testConcurrentStartupSmallState() {
049: concurrentStartupHelper(false, true);
050: }
051:
052: /**
053: * Tests concurrent startup and message sending directly after joining
054: * See doc/design/ConcurrentStartupTest.txt for details. This will only work 100% correctly once we have
055: * FLUSH support (JGroups 2.4)
056: *
057: * NOTE: This test is not guaranteed to pass at 100% rate until combined join
058: * and state transfer using one FLUSH phase is introduced (Jgroups 2.5)[1].
059: *
060: * [1] http://jira.jboss.com/jira/browse/JGRP-236
061: *
062: *
063: */
064: protected void concurrentStartupHelper(boolean largeState,
065: boolean useDispatcher) {
066: String[] names = null;
067:
068: //mux applications on top of same channel have to have unique name
069: if (isMuxChannelUsed()) {
070: names = createMuxApplicationNames(1);
071: } else {
072: names = new String[] { "A", "B", "C", "D" };
073: }
074:
075: int count = names.length;
076:
077: ConcurrentStartupChannel[] channels = new ConcurrentStartupChannel[count];
078: try {
079: // Create a semaphore and take all its permits
080: Semaphore semaphore = new Semaphore(count);
081: takeAllPermits(semaphore, count);
082:
083: // Create activation threads that will block on the semaphore
084: for (int i = 0; i < count; i++) {
085: if (largeState) {
086: if (isMuxChannelUsed()) {
087: channels[i] = new ConcurrentStartupChannelWithLargeState(
088: names[i], muxFactory[i
089: % getMuxFactoryCount()],
090: semaphore);
091: } else {
092: channels[i] = new ConcurrentStartupChannelWithLargeState(
093: semaphore, names[i], useDispatcher);
094: }
095: } else {
096:
097: if (isMuxChannelUsed()) {
098: channels[i] = new ConcurrentStartupChannel(
099: names[i], muxFactory[i
100: % getMuxFactoryCount()],
101: semaphore);
102: } else {
103: channels[i] = new ConcurrentStartupChannel(
104: names[i], semaphore, useDispatcher);
105: }
106: }
107:
108: // Release one ticket at a time to allow the thread to start working
109: channels[i].start();
110: semaphore.release(1);
111: sleepRandom(1500);
112: }
113:
114: // Make sure everyone is in sync
115: if (isMuxChannelUsed()) {
116: blockUntilViewsReceived(channels, getMuxFactoryCount(),
117: 60000);
118: } else {
119: blockUntilViewsReceived(channels, 60000);
120: }
121:
122: // Sleep to ensure the threads get all the semaphore tickets
123: sleepThread(1000);
124:
125: // Reacquire the semaphore tickets; when we have them all
126: // we know the threads are done
127: acquireSemaphore(semaphore, 60000, count);
128:
129: //Sleep to ensure async message arrive
130: sleepThread(3000);
131:
132: //do test verification
133: List[] lists = new List[count];
134: for (int i = 0; i < count; i++) {
135: lists[i] = channels[i].getList();
136: }
137:
138: Map[] mods = new Map[count];
139: for (int i = 0; i < count; i++) {
140: mods[i] = channels[i].getModifications();
141: }
142:
143: printLists(lists);
144: printModifications(mods);
145:
146: int len = lists.length;
147: for (int i = 0; i < lists.length; i++) {
148: List l = lists[i];
149: assertEquals("list #" + i + " should have " + len
150: + " elements", len, l.size());
151: }
152: } catch (Exception ex) {
153: log.warn("Exception encountered during test", ex);
154: } finally {
155: for (int i = 0; i < count; i++) {
156: sleepThread(500);
157: channels[i].cleanup();
158: }
159: }
160: }
161:
162: public void testConcurrentLargeStateTranfer() {
163: concurrentStateTranferHelper(true, false);
164: }
165:
166: public void testConcurrentSmallStateTranfer() {
167: concurrentStateTranferHelper(false, true);
168: }
169:
170: /**
171: * Tests concurrent state transfer. This test should pass at 100% rate when [1]
172: * is solved.
173: *
174: * [1]http://jira.jboss.com/jira/browse/JGRP-332
175: *
176: *
177: */
178: protected void concurrentStateTranferHelper(boolean largeState,
179: boolean useDispatcher) {
180: String[] names = null;
181:
182: //mux applications on top of same channel have to have unique name
183: if (isMuxChannelUsed()) {
184: names = createMuxApplicationNames(1);
185: } else {
186: names = new String[] { "A", "B", "C", "D" };
187: }
188:
189: int count = names.length;
190: ConcurrentStateTransfer[] channels = new ConcurrentStateTransfer[count];
191:
192: //Create a semaphore and take all its tickets
193: Semaphore semaphore = new Semaphore(count);
194: takeAllPermits(semaphore, count);
195:
196: try {
197:
198: // Create activation threads that will block on the semaphore
199: for (int i = 0; i < count; i++) {
200: if (largeState) {
201: if (isMuxChannelUsed()) {
202: channels[i] = new ConcurrentLargeStateTransfer(
203: names[i], muxFactory[i
204: % getMuxFactoryCount()],
205: semaphore);
206: } else {
207: channels[i] = new ConcurrentLargeStateTransfer(
208: names[i], semaphore, useDispatcher);
209: }
210: } else {
211: if (isMuxChannelUsed()) {
212: channels[i] = new ConcurrentStateTransfer(
213: names[i], muxFactory[i
214: % getMuxFactoryCount()],
215: semaphore);
216: } else {
217: channels[i] = new ConcurrentStateTransfer(
218: names[i], semaphore, useDispatcher);
219: }
220: }
221:
222: // Start threads and let them join the channel
223: channels[i].start();
224: sleepThread(2000);
225: }
226:
227: // Make sure everyone is in sync
228: if (isMuxChannelUsed()) {
229: blockUntilViewsReceived(channels, getMuxFactoryCount(),
230: 60000);
231: } else {
232: blockUntilViewsReceived(channels, 60000);
233: }
234:
235: sleepThread(2000);
236: //Unleash hell !
237: semaphore.release(count);
238:
239: // Sleep to ensure the threads get all the semaphore tickets
240: sleepThread(2000);
241:
242: //Reacquire the semaphore tickets; when we have them all
243: //we know the threads are done
244: acquireSemaphore(semaphore, 60000, count);
245:
246: //Sleep to ensure async message arrive
247: sleepThread(3000);
248: //do test verification
249: List[] lists = new List[count];
250: for (int i = 0; i < count; i++) {
251: lists[i] = channels[i].getList();
252: }
253:
254: Map[] mods = new Map[count];
255: for (int i = 0; i < count; i++) {
256: mods[i] = channels[i].getModifications();
257: }
258:
259: printLists(lists);
260: printModifications(mods);
261:
262: int len = lists.length;
263: for (int i = 0; i < lists.length; i++) {
264: List l = lists[i];
265: assertEquals("list #" + i + " should have " + len
266: + " elements", len, l.size());
267: }
268: } catch (Exception ex) {
269: log.warn("Exception encountered during test", ex);
270: } finally {
271: for (int i = 0; i < count; i++) {
272: sleepThread(500);
273: channels[i].cleanup();
274: }
275: }
276: }
277:
278: protected int getMod() {
279: synchronized (this ) {
280: int retval = mod;
281: mod++;
282: return retval;
283: }
284: }
285:
286: protected void printModifications(Map[] modifications) {
287: for (int i = 0; i < modifications.length; i++) {
288: Map modification = modifications[i];
289: log.info("modifications for #" + i + ": " + modification);
290: }
291: }
292:
293: protected void printLists(List[] lists) {
294: for (int i = 0; i < lists.length; i++) {
295: List l = lists[i];
296: log.info(i + ": " + l);
297: }
298: }
299:
300: protected class ConcurrentStateTransfer extends
301: ConcurrentStartupChannel {
302: public ConcurrentStateTransfer(String name,
303: Semaphore semaphore, boolean useDispatcher)
304: throws Exception {
305: super (name, semaphore, useDispatcher);
306: channel.connect("test");
307: }
308:
309: public ConcurrentStateTransfer(String name,
310: JChannelFactory factory, Semaphore semaphore)
311: throws Exception {
312: super (name, factory, semaphore);
313: channel.connect("test");
314: }
315:
316: public void useChannel() throws Exception {
317: boolean success = channel.getState(null, 30000);
318: log.info("channel.getState at " + getName()
319: + getLocalAddress() + " returned " + success);
320: channel.send(null, null, channel.getLocalAddress());
321: }
322: }
323:
324: protected class ConcurrentLargeStateTransfer extends
325: ConcurrentStateTransfer {
326: public ConcurrentLargeStateTransfer(String name,
327: Semaphore semaphore, boolean useDispatcher)
328: throws Exception {
329: super (name, semaphore, useDispatcher);
330: }
331:
332: public ConcurrentLargeStateTransfer(String name,
333: JChannelFactory factory, Semaphore semaphore)
334: throws Exception {
335: super (name, factory, semaphore);
336: }
337:
338: public void setState(byte[] state) {
339: super .setState(state);
340: sleepThread(5000);
341: }
342:
343: public byte[] getState() {
344: sleepThread(5000);
345: return super .getState();
346: }
347:
348: public void getState(OutputStream ostream) {
349: super .getState(ostream);
350: sleepThread(5000);
351: }
352:
353: public void setState(InputStream istream) {
354: super .setState(istream);
355: sleepThread(5000);
356: }
357: }
358:
359: protected class ConcurrentStartupChannelWithLargeState extends
360: ConcurrentStartupChannel {
361: public ConcurrentStartupChannelWithLargeState(
362: Semaphore semaphore, String name, boolean useDispatcher)
363: throws Exception {
364: super (name, semaphore, useDispatcher);
365: }
366:
367: public ConcurrentStartupChannelWithLargeState(String name,
368: JChannelFactory f, Semaphore semaphore)
369: throws Exception {
370: super (name, f, semaphore);
371: }
372:
373: public void setState(byte[] state) {
374: super .setState(state);
375: sleepThread(5000);
376: }
377:
378: public byte[] getState() {
379: sleepThread(5000);
380: return super .getState();
381: }
382:
383: public void getState(OutputStream ostream) {
384: super .getState(ostream);
385: sleepThread(5000);
386: }
387:
388: public void setState(InputStream istream) {
389: super .setState(istream);
390: sleepThread(5000);
391: }
392: }
393:
394: protected class ConcurrentStartupChannel extends
395: PushChannelApplicationWithSemaphore {
396: final List l = new LinkedList();
397:
398: Channel ch;
399:
400: int modCount = 1;
401:
402: final Map mods = new TreeMap();
403:
404: public ConcurrentStartupChannel(String name, Semaphore semaphore)
405: throws Exception {
406: super (name, semaphore, true);
407: }
408:
409: public ConcurrentStartupChannel(String name, JChannelFactory f,
410: Semaphore semaphore) throws Exception {
411: super (name, f, semaphore);
412: }
413:
414: public ConcurrentStartupChannel(String name,
415: Semaphore semaphore, boolean useDispatcher)
416: throws Exception {
417: super (name, semaphore, useDispatcher);
418: }
419:
420: public void useChannel() throws Exception {
421: channel.connect("test");
422: channel.getState(null, 25000);
423: channel.send(null, null, channel.getLocalAddress());
424: }
425:
426: List getList() {
427: return l;
428: }
429:
430: Map getModifications() {
431: return mods;
432: }
433:
434: public void receive(Message msg) {
435: if (msg.getBuffer() == null)
436: return;
437: Object obj = msg.getObject();
438: synchronized (this ) {
439: l.add(obj);
440: Integer key = new Integer(getMod());
441: mods.put(key, obj);
442: }
443: }
444:
445: public void viewAccepted(View new_view) {
446: super .viewAccepted(new_view);
447: synchronized (this ) {
448: Integer key = new Integer(getMod());
449: mods.put(key, new_view.getVid());
450: }
451: }
452:
453: public void setState(byte[] state) {
454: super .setState(state);
455: try {
456: List tmp = (List) Util.objectFromByteBuffer(state);
457: synchronized (this ) {
458: l.clear();
459: l.addAll(tmp);
460: log.info("-- [#" + getName() + " ("
461: + channel.getLocalAddress()
462: + ")]: state is " + l);
463: Integer key = new Integer(getMod());
464: mods.put(key, tmp);
465: }
466: } catch (Exception e) {
467: e.printStackTrace();
468: }
469: }
470:
471: public byte[] getState() {
472: super .getState();
473: List tmp = null;
474: synchronized (this ) {
475: tmp = new LinkedList(l);
476: try {
477: return Util.objectToByteBuffer(tmp);
478: } catch (Exception e) {
479: e.printStackTrace();
480: return null;
481: }
482: }
483: }
484:
485: public void getState(OutputStream ostream) {
486: super .getState(ostream);
487: ObjectOutputStream oos = null;
488: try {
489: oos = new ObjectOutputStream(ostream);
490: List tmp = null;
491: synchronized (this ) {
492: tmp = new LinkedList(l);
493: }
494: oos.writeObject(tmp);
495: oos.flush();
496: } catch (IOException e) {
497: e.printStackTrace();
498: } finally {
499: Util.close(oos);
500: }
501: }
502:
503: public void setState(InputStream istream) {
504: super .setState(istream);
505: ObjectInputStream ois = null;
506: try {
507: ois = new ObjectInputStream(istream);
508: List tmp = (List) ois.readObject();
509: synchronized (this ) {
510: l.clear();
511: l.addAll(tmp);
512: log.info("-- [#" + getName() + " ("
513: + channel.getLocalAddress()
514: + ")]: state is " + l);
515: Integer key = new Integer(getMod());
516: mods.put(key, tmp);
517: }
518: } catch (Exception e) {
519: e.printStackTrace();
520: } finally {
521: Util.close(ois);
522: }
523: }
524: }
525:
526: public static Test suite() {
527: return new TestSuite(ConcurrentStartupTest.class);
528: }
529:
530: public static void main(String[] args) {
531: String[] testCaseName = { ConcurrentStartupTest.class.getName() };
532: junit.textui.TestRunner.main(testCaseName);
533: }
534: }
|