001: package org.jgroups.tests;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.io.OutputStream;
006: import java.util.ArrayList;
007: import java.util.Collection;
008: import java.util.Collections;
009: import java.util.HashMap;
010: import java.util.Iterator;
011: import java.util.LinkedList;
012: import java.util.List;
013: import java.util.Map;
014: import java.util.Properties;
015:
016: import junit.framework.Test;
017: import junit.framework.TestSuite;
018:
019: import org.jgroups.Address;
020: import org.jgroups.BlockEvent;
021: import org.jgroups.Channel;
022: import org.jgroups.ChannelException;
023: import org.jgroups.Event;
024: import org.jgroups.ExtendedReceiverAdapter;
025: import org.jgroups.GetStateEvent;
026: import org.jgroups.JChannel;
027: import org.jgroups.JChannelFactory;
028: import org.jgroups.Message;
029: import org.jgroups.SetStateEvent;
030: import org.jgroups.UnblockEvent;
031: import org.jgroups.View;
032: import org.jgroups.mux.MuxChannel;
033: import org.jgroups.stack.Protocol;
034: import org.jgroups.util.Util;
035:
036: import EDU.oswego.cs.dl.util.concurrent.Semaphore;
037:
038: /**
039: * Tests the FLUSH protocol, requires flush-udp.xml in ./conf to be present and configured to use FLUSH
040: * @author Bela Ban
041: * @version $Id: FlushTest.java,v 1.16.2.2 2007/04/25 15:24:21 vlada Exp $
042: */
043: public class FlushTest extends ChannelTestBase {
/**
 * Creates the test case without an explicit name.
 */
public FlushTest() {
    super();
}
048:
/**
 * Creates the test case with the given name.
 *
 * @param name name handed on to the superclass constructor
 */
public FlushTest(String name) {
    super(name);
}
053:
054: Channel c1, c2, c3;
055:
056: static final String CONFIG = "flush-udp.xml";
057:
058: public void setUp() throws Exception {
059: super .setUp();
060: CHANNEL_CONFIG = System.getProperty("channel.conf.flush",
061: "flush-udp.xml");
062: }
063:
064: public void tearDown() throws Exception {
065: if (c3 != null) {
066: c3.close();
067: assertFalse(c3.isOpen());
068: assertFalse(c3.isConnected());
069: c3 = null;
070: }
071:
072: if (c2 != null) {
073: c2.close();
074: assertFalse(c2.isOpen());
075: assertFalse(c2.isConnected());
076: c2 = null;
077: }
078:
079: if (c1 != null) {
080: c1.close();
081: assertFalse(c1.isOpen());
082: assertFalse(c1.isConnected());
083: c1 = null;
084: }
085:
086: Util.sleep(500);
087: super .tearDown();
088: }
089:
090: public boolean useBlocking() {
091: return true;
092: }
093:
094: public void testSingleChannel() throws Exception {
095: Semaphore s = new Semaphore(1);
096: FlushTestReceiver receivers[] = new FlushTestReceiver[] { new FlushTestReceiver(
097: "c1", s, false) };
098: receivers[0].start();
099: s.release(1);
100:
101: //Make sure everyone is in sync
102: blockUntilViewsReceived(receivers, 60000);
103:
104: // Sleep to ensure the threads get all the semaphore tickets
105: sleepThread(1000);
106:
107: // Reacquire the semaphore tickets; when we have them all
108: // we know the threads are done
109: try {
110: acquireSemaphore(s, 60000, 1);
111: } catch (Exception e) {
112: e.printStackTrace();
113: } finally {
114: receivers[0].cleanup();
115: sleepThread(1000);
116: }
117:
118: checkEventSequence(receivers[0], false);
119:
120: }
121:
122: /**
123: * Tests issue #1 in http://jira.jboss.com/jira/browse/JGRP-335
124: */
125: public void testJoinFollowedByUnicast() throws ChannelException {
126: c1 = createChannel();
127: c1.setReceiver(new SimpleReplier(c1, true));
128: c1.connect("test");
129:
130: Address target = c1.getLocalAddress();
131: Message unicast_msg = new Message(target);
132:
133: c2 = createChannel();
134: c2.setReceiver(new SimpleReplier(c2, false));
135: c2.connect("test");
136:
137: // now send unicast, this might block as described in the case
138: c2.send(unicast_msg);
139: // if we don't get here this means we'd time out
140: }
141:
142: /**
143: * Tests issue #2 in http://jira.jboss.com/jira/browse/JGRP-335
144: */
145: public void testStateTransferFollowedByUnicast()
146: throws ChannelException {
147: c1 = createChannel();
148: c1.setReceiver(new SimpleReplier(c1, true));
149: c1.connect("test");
150:
151: Address target = c1.getLocalAddress();
152: Message unicast_msg = new Message(target);
153:
154: c2 = createChannel();
155: c2.setReceiver(new SimpleReplier(c2, false));
156: c2.connect("test");
157:
158: // Util.sleep(100);
159: log.info("\n** Getting the state **");
160: c2.getState(null, 10000);
161: // now send unicast, this might block as described in the case
162: c2.send(unicast_msg);
163: }
164:
165: /**
166: * Tests emition of block/unblock/get|set state events in both mux and bare
167: * channel mode. In mux mode this test creates getFactoryCount() real channels
168: * and creates only one mux application on top of each channel. In bare
169: * channel mode 4 real channels are created.
170: *
171: */
172: public void testBlockingNoStateTransfer() {
173: String[] names = null;
174: if (isMuxChannelUsed()) {
175: names = createMuxApplicationNames(1);
176: testChannels(names, false, getMuxFactoryCount());
177: } else {
178: names = createApplicationNames(4);
179: testChannels(names, false, 4);
180: }
181: }
182:
183: /**
184: * Tests emition of block/unblock/get|set state events in mux mode. In this
185: * test all mux applications share the same "real" channel. This test runs
186: * only when mux.on=true. This test does not take into account
187: * -Dmux.factorycount parameter.
188: *
189: */
190: public void testBlockingSharedMuxFactory() {
191: String[] names = null;
192: int muxFactoryCount = 1;
193: if (isMuxChannelUsed()) {
194: names = createMuxApplicationNames(4, muxFactoryCount);
195: testChannels(names, muxFactoryCount, false,
196: new ChannelAssertable(1));
197: }
198: }
199:
200: /**
201: * Tests emition of block/unblock/get|set state events in mux mode. In this
202: * test there will be exactly two real channels created where each real
203: * channel has two mux applications on top of it. This test runs
204: * only when mux.on=true. This test does not take into account
205: * -Dmux.factorycount parameter.
206: *
207: */
208: public void testBlockingUnsharedMuxFactoryMultipleService() {
209: String[] names = null;
210: int muxFactoryCount = 2;
211: if (isMuxChannelUsed()) {
212: names = createMuxApplicationNames(2, muxFactoryCount);
213: testChannels(names, muxFactoryCount, false,
214: new ChannelAssertable(2));
215: }
216: }
217:
218: /**
219: * Tests emition of block/unblock/set|get state events for both
220: * mux and bare channel depending on mux.on parameter. In mux mode there
221: * will be only one mux channel for each "real" channel created and the
222: * number of real channels created is getMuxFactoryCount().
223: *
224: */
225: public void testBlockingWithStateTransfer() {
226: String[] names = null;
227: if (isMuxChannelUsed()) {
228: names = createMuxApplicationNames(1);
229: testChannels(names, true, getMuxFactoryCount());
230: } else {
231: names = createApplicationNames(4);
232: testChannels(names, true, 4);
233: }
234: }
235:
236: /**
237: * Tests emition of block/unblock/set|get state events in mux mode setup
238: * where each "real" channel has two mux service on top of it. The
239: * number of real channels created is getMuxFactoryCount(). This test runs
240: * only when mux.on=true.
241: *
242: */
243: public void testBlockingWithStateTransferAndMultipleServiceMuxChannel() {
244: String[] names = null;
245: if (isMuxChannelUsed()) {
246: names = createMuxApplicationNames(2);
247: testChannels(names, true, getMuxFactoryCount());
248: }
249: }
250:
251: private void testChannels(String names[], int muxFactoryCount,
252: boolean useTransfer, Assertable a) {
253: int count = names.length;
254:
255: ArrayList channels = new ArrayList(count);
256: try {
257: // Create a semaphore and take all its permits
258: Semaphore semaphore = new Semaphore(count);
259: takeAllPermits(semaphore, count);
260:
261: // Create channels and their threads that will block on the semaphore
262: for (int i = 0; i < count; i++) {
263: FlushTestReceiver channel = null;
264: if (isMuxChannelUsed()) {
265: channel = new FlushTestReceiver(names[i],
266: muxFactory[i % muxFactoryCount], semaphore,
267: useTransfer);
268: } else {
269: channel = new FlushTestReceiver(names[i],
270: semaphore, useTransfer);
271: }
272: channels.add(channel);
273:
274: // Release one ticket at a time to allow the thread to start working
275: channel.start();
276: if (!useTransfer) {
277: semaphore.release(1);
278: }
279: sleepThread(2000);
280: }
281:
282: if (isMuxChannelUsed()) {
283: blockUntilViewsReceived(channels, muxFactoryCount,
284: 60000);
285: } else {
286: blockUntilViewsReceived(channels, 60000);
287: }
288:
289: //if state transfer is used release all at once
290: //clear all channels of view events
291: if (useTransfer) {
292: for (Iterator iter = channels.iterator(); iter
293: .hasNext();) {
294: FlushTestReceiver app = (FlushTestReceiver) iter
295: .next();
296: app.clear();
297:
298: }
299: semaphore.release(count);
300: }
301:
302: // Sleep to ensure the threads get all the semaphore tickets
303: sleepThread(1000);
304:
305: // Reacquire the semaphore tickets; when we have them all
306: // we know the threads are done
307: acquireSemaphore(semaphore, 60000, count);
308:
309: //do general asserts about channels
310: a.verify(channels);
311:
312: //kill random member
313: FlushTestReceiver randomRecv = (FlushTestReceiver) channels
314: .remove(RANDOM.nextInt(count));
315: log.info("Closing random member " + randomRecv.getName()
316: + " at " + randomRecv.getLocalAddress());
317: ChannelCloseAssertable closeAssert = new ChannelCloseAssertable(
318: randomRecv);
319: randomRecv.cleanup();
320:
321: //let the view propagate and verify related asserts
322: sleepThread(5000);
323: closeAssert.verify(channels);
324:
325: //verify block/unblock/view/get|set state sequence
326:
327: for (Iterator iter = channels.iterator(); iter.hasNext();) {
328: FlushTestReceiver receiver = (FlushTestReceiver) iter
329: .next();
330: if (useTransfer) {
331: checkEventStateTransferSequence(receiver);
332: } else {
333: checkEventSequence(receiver, isMuxChannelUsed());
334: }
335: }
336: } catch (Exception ex) {
337: log.warn("Exception encountered during test", ex);
338: fail("Exception encountered during test execution");
339: } finally {
340: for (Iterator iter = channels.iterator(); iter.hasNext();) {
341: FlushTestReceiver app = (FlushTestReceiver) iter.next();
342: app.cleanup();
343: sleepThread(500);
344: }
345: }
346: }
347:
348: public void testChannels(String names[], boolean useTransfer,
349: int viewSize) {
350: testChannels(names, getMuxFactoryCount(), useTransfer,
351: new ChannelAssertable(viewSize));
352: }
353:
354: private class ChannelCloseAssertable implements Assertable {
355: ChannelApplication app;
356: View viewBeforeClose;
357: Address appAddress;
358: String muxId;
359:
360: public ChannelCloseAssertable(ChannelApplication app) {
361: this .app = app;
362: this .viewBeforeClose = app.getChannel().getView();
363: appAddress = app.getChannel().getLocalAddress();
364: if (app.isUsingMuxChannel()) {
365: MuxChannel mch = (MuxChannel) app.getChannel();
366: muxId = mch.getId();
367: }
368: }
369:
370: public void verify(Object verifiable) {
371: Collection channels = (Collection) verifiable;
372: Channel ch = app.getChannel();
373: assertFalse("Channel open", ch.isOpen());
374: assertFalse("Chnanel connected", ch.isConnected());
375:
376: //if this channel had more than one member then verify that
377: //the other member does not have departed member in its view
378: if (viewBeforeClose.getMembers().size() > 1) {
379: for (Iterator iter = channels.iterator(); iter
380: .hasNext();) {
381: FlushTestReceiver receiver = (FlushTestReceiver) iter
382: .next();
383: Channel channel = receiver.getChannel();
384: boolean pairServiceFound = (receiver
385: .isUsingMuxChannel() && muxId
386: .equals(((MuxChannel) channel).getId()));
387: if (pairServiceFound
388: || !receiver.isUsingMuxChannel()) {
389: assertTrue("Removed from view, address "
390: + appAddress + " view is "
391: + channel.getView(), !channel.getView()
392: .getMembers().contains(appAddress));
393: }
394: }
395: }
396: }
397: }
398:
399: private class ChannelAssertable implements Assertable {
400: int expectedViewSize = 0;
401:
402: public ChannelAssertable(int expectedViewSize) {
403: this .expectedViewSize = expectedViewSize;
404: }
405:
406: public void verify(Object verifiable) {
407: Collection channels = (Collection) verifiable;
408: for (Iterator iter = channels.iterator(); iter.hasNext();) {
409: FlushTestReceiver receiver = (FlushTestReceiver) iter
410: .next();
411: Channel ch = receiver.getChannel();
412: assertEquals("Correct view", ch.getView().getMembers()
413: .size(), expectedViewSize);
414: assertTrue("Channel open", ch.isOpen());
415: assertTrue("Chnanel connected", ch.isConnected());
416: assertNotNull("Valid address ", ch.getLocalAddress());
417: assertTrue("Address included in view ", ch.getView()
418: .getMembers().contains(ch.getLocalAddress()));
419: assertNotNull("Valid cluster name ", ch
420: .getClusterName());
421: }
422:
423: //verify views for pair services created on top of different "real" channels
424: if (expectedViewSize > 1 && isMuxChannelUsed()) {
425: for (Iterator iter = channels.iterator(); iter
426: .hasNext();) {
427: FlushTestReceiver receiver = (FlushTestReceiver) iter
428: .next();
429: MuxChannel ch = (MuxChannel) receiver.getChannel();
430: int servicePairs = 1;
431: for (Iterator it = channels.iterator(); it
432: .hasNext();) {
433: FlushTestReceiver receiver2 = (FlushTestReceiver) it
434: .next();
435: MuxChannel ch2 = (MuxChannel) receiver2
436: .getChannel();
437: if (ch.getId().equals(ch2.getId())
438: && !ch.getLocalAddress().equals(
439: ch2.getLocalAddress())) {
440: assertEquals(
441: "Correct view for service pair", ch
442: .getView(), ch2.getView());
443: assertTrue("Presence in view", ch.getView()
444: .getMembers().contains(
445: ch.getLocalAddress()));
446: assertTrue("Presence in view", ch.getView()
447: .getMembers().contains(
448: ch2.getLocalAddress()));
449: assertTrue("Presence in view", ch2
450: .getView().getMembers().contains(
451: ch2.getLocalAddress()));
452: assertTrue("Presence in view", ch2
453: .getView().getMembers().contains(
454: ch.getLocalAddress()));
455: servicePairs++;
456: }
457: }
458: assertEquals("Correct service count",
459: expectedViewSize, servicePairs);
460: }
461: }
462: }
463: }
464:
465: private void checkEventSequence(FlushTestReceiver receiver,
466: boolean isMuxUsed) {
467: List events = receiver.getEvents();
468: String eventString = "[" + receiver.getName() + "|"
469: + receiver.getLocalAddress() + ",events:" + events;
470: log.info(eventString);
471: assertNotNull(events);
472: int size = events.size();
473: for (int i = 0; i < size; i++) {
474: Object event = events.get(i);
475: if (event instanceof BlockEvent) {
476: if (i + 1 < size) {
477: Object ev = events.get(i + 1);
478: if (isMuxUsed) {
479: assertTrue(
480: "After Block should be View or Unblock"
481: + eventString,
482: ev instanceof View
483: || ev instanceof UnblockEvent);
484: } else {
485: assertTrue("After Block should be View "
486: + eventString,
487: events.get(i + 1) instanceof View);
488: }
489: }
490: if (i != 0) {
491: assertTrue("Before Block should be Unblock "
492: + eventString,
493: events.get(i - 1) instanceof UnblockEvent);
494: }
495: }
496: if (event instanceof View) {
497: if (i + 1 < size) {
498: assertTrue("After View should be Unblock "
499: + eventString,
500: events.get(i + 1) instanceof UnblockEvent);
501: }
502: assertTrue(
503: "Before View should be Block " + eventString,
504: events.get(i - 1) instanceof BlockEvent);
505: }
506: if (event instanceof UnblockEvent) {
507: if (i + 1 < size) {
508: assertTrue("After UnBlock should be Block "
509: + eventString,
510: events.get(i + 1) instanceof BlockEvent);
511: }
512:
513: Object ev = events.get(i - 1);
514: if (isMuxUsed) {
515: assertTrue("Before UnBlock should be View or Block"
516: + eventString, ev instanceof View
517: || ev instanceof BlockEvent);
518: } else {
519: assertTrue("Before UnBlock should be View "
520: + eventString,
521: events.get(i - 1) instanceof View);
522: }
523: }
524: }
525: receiver.clear();
526: }
527:
528: private void checkEventStateTransferSequence(
529: FlushTestReceiver receiver) {
530: List events = receiver.getEvents();
531: String eventString = "[" + receiver.getName() + ",events:"
532: + events;
533: log.info(eventString);
534: assertNotNull(events);
535: int size = events.size();
536: for (int i = 0; i < size; i++) {
537: Object event = events.get(i);
538: if (event instanceof BlockEvent) {
539: if (i + 1 < size) {
540: Object o = events.get(i + 1);
541: assertTrue(
542: "After Block should be state|unblock|view"
543: + eventString,
544: o instanceof SetStateEvent
545: || o instanceof GetStateEvent
546: || o instanceof UnblockEvent
547: || o instanceof View);
548: } else if (i != 0) {
549: Object o = events.get(i + 1);
550: assertTrue(
551: "Before Block should be state or Unblock "
552: + eventString,
553: o instanceof SetStateEvent
554: || o instanceof GetStateEvent
555: || o instanceof UnblockEvent);
556: }
557: }
558: if (event instanceof SetStateEvent
559: || event instanceof GetStateEvent) {
560: if (i + 1 < size) {
561: assertTrue("After state should be Unblock "
562: + eventString,
563: events.get(i + 1) instanceof UnblockEvent);
564: }
565: assertTrue("Before state should be Block "
566: + eventString,
567: events.get(i - 1) instanceof BlockEvent);
568: }
569:
570: if (event instanceof UnblockEvent) {
571: if (i + 1 < size) {
572: assertTrue("After UnBlock should be Block "
573: + eventString,
574: events.get(i + 1) instanceof BlockEvent);
575: } else {
576: Object o = events.get(size - 2);
577: assertTrue(
578: "Before UnBlock should be block|state|view "
579: + eventString,
580: o instanceof SetStateEvent
581: || o instanceof GetStateEvent
582: || o instanceof BlockEvent
583: || o instanceof View);
584: }
585: }
586:
587: }
588: receiver.clear();
589: }
590:
591: protected Channel createChannel() throws ChannelException {
592: Channel ret = new JChannel(CHANNEL_CONFIG);
593: ret.setOpt(Channel.BLOCK, Boolean.TRUE);
594: Protocol flush = ((JChannel) ret).getProtocolStack()
595: .findProtocol("FLUSH");
596: if (flush != null) {
597: Properties p = new Properties();
598: p.setProperty("timeout", "0");
599: flush.setProperties(p);
600:
601: // send timeout up and down the stack, so other protocols can use the same value too
602: Map map = new HashMap();
603: map.put("flush_timeout", new Long(0));
604: flush.passUp(new Event(Event.CONFIG, map));
605: flush.passDown(new Event(Event.CONFIG, map));
606: }
607: return ret;
608: }
609:
610: private interface Assertable {
611: public void verify(Object verifiable);
612: }
613:
614: private class FlushTestReceiver extends
615: PushChannelApplicationWithSemaphore {
616: List events;
617:
618: boolean shouldFetchState;
619:
620: protected FlushTestReceiver(String name, Semaphore semaphore,
621: boolean shouldFetchState) throws Exception {
622: super (name, semaphore);
623: this .shouldFetchState = shouldFetchState;
624: events = Collections.synchronizedList(new LinkedList());
625: channel.connect("test");
626: }
627:
628: protected FlushTestReceiver(String name,
629: JChannelFactory factory, Semaphore semaphore,
630: boolean shouldFetchState) throws Exception {
631: super (name, factory, semaphore);
632: this .shouldFetchState = shouldFetchState;
633: events = Collections.synchronizedList(new LinkedList());
634: channel.connect("test");
635: }
636:
637: public void clear() {
638: events.clear();
639: }
640:
641: public List getEvents() {
642: return new LinkedList(events);
643: }
644:
645: public void block() {
646: events.add(new BlockEvent());
647: }
648:
649: public void unblock() {
650: events.add(new UnblockEvent());
651: }
652:
653: public void viewAccepted(View new_view) {
654: events.add(new_view);
655: }
656:
657: public byte[] getState() {
658: events.add(new GetStateEvent(null, null));
659: return new byte[] { 'b', 'e', 'l', 'a' };
660: }
661:
662: public void setState(byte[] state) {
663: events.add(new SetStateEvent(null, null));
664: }
665:
666: public void getState(OutputStream ostream) {
667: events.add(new GetStateEvent(null, null));
668: byte[] payload = new byte[] { 'b', 'e', 'l', 'a' };
669: try {
670: ostream.write(payload);
671: } catch (IOException e) {
672: e.printStackTrace();
673: } finally {
674: Util.close(ostream);
675: }
676: }
677:
678: public void setState(InputStream istream) {
679: events.add(new SetStateEvent(null, null));
680: byte[] payload = new byte[4];
681: try {
682: istream.read(payload);
683: } catch (IOException e) {
684: e.printStackTrace();
685: } finally {
686: Util.close(istream);
687: }
688: }
689:
690: protected void useChannel() throws Exception {
691: if (shouldFetchState) {
692: channel.getState(null, 25000);
693: }
694: }
695: }
696:
697: private class SimpleReplier extends ExtendedReceiverAdapter {
698: Channel channel;
699:
700: boolean handle_requests = false;
701:
702: public SimpleReplier(Channel channel, boolean handle_requests) {
703: this .channel = channel;
704: this .handle_requests = handle_requests;
705: }
706:
707: public void receive(Message msg) {
708: Message reply = new Message(msg.getSrc());
709: try {
710: log.info("-- MySimpleReplier["
711: + channel.getLocalAddress()
712: + "]: received message from " + msg.getSrc());
713: if (handle_requests) {
714: log.info(", sending reply");
715: channel.send(reply);
716: } else
717: System.out.println("\n");
718: } catch (Exception e) {
719: e.printStackTrace();
720: }
721: }
722:
723: public void viewAccepted(View new_view) {
724: log.info("-- MySimpleReplier[" + channel.getLocalAddress()
725: + "]: viewAccepted(" + new_view + ")");
726: }
727:
728: public void block() {
729: log.info("-- MySimpleReplier[" + channel.getLocalAddress()
730: + "]: block()");
731: }
732:
733: public void unblock() {
734: log.info("-- MySimpleReplier[" + channel.getLocalAddress()
735: + "]: unblock()");
736: }
737: }
738:
739: public static Test suite() {
740: return new TestSuite(FlushTest.class);
741: }
742:
743: public static void main(String[] args) {
744: junit.textui.TestRunner.run(FlushTest.suite());
745: }
746: }
|