001: package org.jgroups.tests;
002:
003: import junit.framework.Test;
004: import junit.framework.TestSuite;
005: import org.jgroups.*;
006: import org.jgroups.util.Util;
007: import org.jgroups.mux.MuxChannel;
008:
009: import java.util.List;
010: import java.util.LinkedList;
011:
012: /**
013: * Test the multiplexer functionality provided by JChannelFactory, especially the service views and cluster views
014: * @author Bela Ban
015: * @version $Id: MultiplexerViewTest.java,v 1.10.2.1 2006/12/04 22:45:49 vlada Exp $
016: */
017: public class MultiplexerViewTest extends ChannelTestBase {
018: private Channel c1, c2, c3, c4;
019: static final BlockEvent BLOCK_EVENT = new BlockEvent();
020: static final UnblockEvent UNBLOCK_EVENT = new UnblockEvent();
021: JChannelFactory factory, factory2;
022:
023: public MultiplexerViewTest(String name) {
024: super (name);
025: }
026:
027: public void setUp() throws Exception {
028: super .setUp();
029: factory = new JChannelFactory();
030: factory.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
031:
032: factory2 = new JChannelFactory();
033: factory2.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
034: }
035:
036: public void tearDown() throws Exception {
037: Util.sleep(1000);
038: if (c2 != null)
039: c2.close();
040: if (c1 != null)
041: c1.close();
042: factory.destroy();
043: factory2.destroy();
044: super .tearDown();
045: }
046:
047: public void testBasicLifeCycle() throws Exception {
048: c1 = factory.createMultiplexerChannel(
049: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
050: System.out.println("c1: " + c1);
051: assertTrue(c1.isOpen());
052: assertFalse(c1.isConnected());
053: View v = c1.getView();
054: assertNull(v);
055:
056: ((MuxChannel) c1).getClusterView();
057: assertNull(v);
058: Address local_addr = c1.getLocalAddress();
059: assertNull(local_addr);
060:
061: c1.connect("bla");
062: assertTrue(c1.isConnected());
063: local_addr = c1.getLocalAddress();
064: assertNotNull(local_addr);
065: v = c1.getView();
066: assertNotNull(v);
067: assertEquals(1, v.size());
068: v = ((MuxChannel) c1).getClusterView();
069: assertNotNull(v);
070: assertEquals(1, v.size());
071:
072: c1.disconnect();
073: assertFalse(c1.isConnected());
074: assertTrue(c1.isOpen());
075: local_addr = c1.getLocalAddress();
076: assertNull(local_addr);
077: c1.close();
078: assertFalse(c1.isOpen());
079: }
080:
081: public void testBlockPush() throws Exception {
082: c1 = factory.createMultiplexerChannel(
083: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
084: c1.setOpt(Channel.BLOCK, Boolean.TRUE);
085: MyReceiver receiver = new MyReceiver();
086: c1.setReceiver(receiver);
087: c1.connect("bla");
088:
089: c2 = factory2.createMultiplexerChannel(
090: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
091: c2.setOpt(Channel.BLOCK, Boolean.TRUE);
092: MyReceiver receiver2 = new MyReceiver();
093: c2.setReceiver(receiver2);
094: c2.connect("bla");
095:
096: //let async block events propagate
097: Util.sleep(1000);
098:
099: List events = receiver.getEvents();
100: int num_events = events.size();
101: System.out.println("-- receiver: " + events);
102: assertEquals(
103: "we should have a BLOCK, UNBLOCK, BLOCK, UNBLOCK,BLOCK, UNBLOCK, BLOCK, UNBLOCK sequence",
104: 8, num_events);
105: Object evt = events.remove(0);
106: assertTrue("evt=" + evt, evt instanceof BlockEvent);
107: evt = events.remove(0);
108: assertTrue("evt=" + evt, evt instanceof UnblockEvent);
109: evt = events.remove(0);
110: assertTrue("evt=" + evt, evt instanceof BlockEvent);
111: evt = events.remove(0);
112: assertTrue("evt=" + evt, evt instanceof UnblockEvent);
113:
114: events = receiver2.getEvents();
115: num_events = events.size();
116: System.out.println("-- receiver2: " + events);
117: evt = events.remove(0);
118: assertTrue(evt instanceof BlockEvent);
119: evt = events.remove(0);
120: assertTrue(evt instanceof UnblockEvent);
121: }
122:
123: public void testBlockPush2() throws Exception {
124: c1 = factory.createMultiplexerChannel(
125: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
126: c1.setOpt(Channel.BLOCK, Boolean.TRUE);
127: MyReceiver receiver = new MyReceiver();
128: c1.setReceiver(receiver);
129: c1.connect("bla");
130: // Util.sleep(500);
131:
132: c2 = factory.createMultiplexerChannel(
133: MUX_CHANNEL_CONFIG_STACK_NAME, "service-2");
134: c2.setOpt(Channel.BLOCK, Boolean.TRUE);
135: MyReceiver receiver2 = new MyReceiver();
136: c2.setReceiver(receiver2);
137: c2.connect("bla");
138: // Util.sleep(500);
139:
140: c3 = factory.createMultiplexerChannel(
141: MUX_CHANNEL_CONFIG_STACK_NAME, "service-3");
142: c3.setOpt(Channel.BLOCK, Boolean.TRUE);
143: MyReceiver receiver3 = new MyReceiver();
144: c3.setReceiver(receiver3);
145: c3.connect("bla");
146: // Util.sleep(500);
147:
148: c4 = factory2.createMultiplexerChannel(
149: MUX_CHANNEL_CONFIG_STACK_NAME, "service-3");
150: c4.setOpt(Channel.BLOCK, Boolean.TRUE);
151: MyReceiver receiver4 = new MyReceiver();
152: c4.setReceiver(receiver4);
153: c4.connect("bla");
154:
155: //let asynch block/unblock events propagate
156: Util.sleep(1000);
157: List events = receiver.getEvents();
158: checkBlockAndUnBlock(events, "receiver",
159: new Object[] { BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT,
160: UNBLOCK_EVENT, BLOCK_EVENT, UNBLOCK_EVENT,
161: BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT,
162: UNBLOCK_EVENT, BLOCK_EVENT, UNBLOCK_EVENT });
163:
164: events = receiver2.getEvents();
165: checkBlockAndUnBlock(events, "receiver2",
166: new Object[] { BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT,
167: UNBLOCK_EVENT, BLOCK_EVENT, UNBLOCK_EVENT,
168: BLOCK_EVENT, UNBLOCK_EVENT, });
169:
170: events = receiver3.getEvents();
171: checkBlockAndUnBlock(events, "receiver3", new Object[] {
172: BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT, UNBLOCK_EVENT,
173: BLOCK_EVENT, UNBLOCK_EVENT });
174:
175: events = receiver4.getEvents();
176: System.out.println("-- [receiver4] events: " + events);
177: // now the new joiner should *not* have a block event !
178: // assertFalse("new joiner should not have a BlockEvent", events.contains(new BlockEvent()));
179: checkBlockAndUnBlock(events, "receiver4",
180: new Object[] { BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT,
181: UNBLOCK_EVENT });
182:
183: receiver.clear();
184: receiver2.clear();
185: receiver3.clear();
186: receiver4.clear();
187:
188: System.out.println("-- Closing c4");
189:
190: c4.close();
191: //close under FLUSH is not *synchronous* as connect/getState
192: //so we have to sleep here
193: Util.sleep(5000);
194:
195: events = receiver.getEvents();
196: checkBlockAndUnBlock(events, "receiver",
197: new Object[] { BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT,
198: UNBLOCK_EVENT });
199:
200: events = receiver2.getEvents();
201: checkBlockAndUnBlock(events, "receiver2",
202: new Object[] { BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT,
203: UNBLOCK_EVENT });
204:
205: events = receiver3.getEvents();
206: checkBlockAndUnBlock(events, "receiver3",
207: new Object[] { BLOCK_EVENT, UNBLOCK_EVENT, BLOCK_EVENT,
208: UNBLOCK_EVENT });
209:
210: events = receiver4.getEvents();
211: System.out.println("-- [receiver4] events: " + events);
212: // now the new joiner should *not* have a block event !
213: assertFalse("new joiner should not have a BlockEvent", events
214: .contains(new BlockEvent()));
215:
216: }
217:
218: private void checkBlockAndUnBlock(List events, String service_name,
219: Object[] seq) {
220: int num_events = events.size();
221: System.out.println("-- [" + service_name + "] events: "
222: + events);
223: assertEquals("[" + service_name
224: + "] we should have the following sequence: "
225: + Util.array2String(seq), seq.length, num_events);
226:
227: Object expected_type;
228: Object actual_type;
229: for (int i = 0; i < seq.length; i++) {
230: expected_type = seq[i];
231: actual_type = events.remove(0);
232: assertEquals("element should be "
233: + expected_type.getClass().getName(), actual_type
234: .getClass(), expected_type.getClass());
235: }
236:
237: }
238:
239: public void testTwoServicesOneChannel() throws Exception {
240: c1 = factory.createMultiplexerChannel(
241: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
242: c2 = factory.createMultiplexerChannel(
243: MUX_CHANNEL_CONFIG_STACK_NAME, "service-2");
244: c1.connect("bla");
245: c2.connect("blo");
246:
247: View v = ((MuxChannel) c1).getClusterView(), v2 = ((MuxChannel) c2)
248: .getClusterView();
249: assertNotNull(v);
250: assertNotNull(v2);
251: assertEquals(v, v2);
252: assertEquals(1, v.size());
253: assertEquals(1, v2.size());
254:
255: v = c1.getView();
256: v2 = c2.getView();
257: assertNotNull(v);
258: assertNotNull(v2);
259: assertEquals(v, v2);
260: assertEquals(1, v.size());
261: assertEquals(1, v2.size());
262: }
263:
264: public void testTwoServicesTwoChannels() throws Exception {
265: View v, v2;
266: c1 = factory.createMultiplexerChannel(
267: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
268: c2 = factory.createMultiplexerChannel(
269: MUX_CHANNEL_CONFIG_STACK_NAME, "service-2");
270: c1.connect("bla");
271: c2.connect("blo");
272:
273: c3 = factory2.createMultiplexerChannel(
274: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
275: c4 = factory2.createMultiplexerChannel(
276: MUX_CHANNEL_CONFIG_STACK_NAME, "service-2");
277: c3.connect("foo");
278:
279: Util.sleep(500);
280: v = ((MuxChannel) c1).getClusterView();
281: v2 = ((MuxChannel) c3).getClusterView();
282: assertNotNull(v);
283: assertNotNull(v2);
284: assertEquals(2, v2.size());
285: assertEquals(v, v2);
286:
287: v = c1.getView();
288: v2 = c3.getView();
289: assertNotNull(v);
290: assertNotNull(v2);
291: assertEquals(2, v2.size());
292: assertEquals(v, v2);
293:
294: v = c2.getView();
295: assertEquals(1, v.size()); // c4 has not joined yet
296:
297: c4.connect("bar");
298:
299: Util.sleep(500);
300: v = c2.getView();
301: v2 = c4.getView();
302: assertEquals(2, v.size());
303: assertEquals(v, v2);
304:
305: c3.disconnect();
306:
307: Util.sleep(500);
308: v = c1.getView();
309: assertEquals(1, v.size());
310: v = c2.getView();
311: assertEquals(2, v.size());
312: v = c4.getView();
313: assertEquals(2, v.size());
314:
315: c3.close();
316: c2.close();
317: Util.sleep(500);
318: v = c4.getView();
319: assertEquals(1, v.size());
320: }
321:
322: public void testReconnect() throws Exception {
323: View v;
324: c1 = factory.createMultiplexerChannel(
325: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
326: c1.connect("bla");
327:
328: c3 = factory2.createMultiplexerChannel(
329: MUX_CHANNEL_CONFIG_STACK_NAME, "service-1");
330: c4 = factory2.createMultiplexerChannel(
331: MUX_CHANNEL_CONFIG_STACK_NAME, "service-2");
332: c3.connect("foo");
333: c4.connect("bar");
334:
335: Util.sleep(500);
336: v = c1.getView();
337: assertEquals(2, v.size());
338: c3.disconnect();
339:
340: Util.sleep(500);
341: v = c1.getView();
342: assertEquals(1, v.size());
343:
344: c3.connect("foobar");
345: Util.sleep(500);
346: v = c1.getView();
347: assertEquals(2, v.size());
348:
349: c4.close();
350: Util.sleep(500);
351: v = c1.getView();
352: assertEquals(2, v.size());
353: }
354:
355: public static Test suite() {
356: return new TestSuite(MultiplexerViewTest.class);
357: }
358:
359: public static void main(String[] args) {
360: junit.textui.TestRunner.run(MultiplexerViewTest.suite());
361: }
362:
363: private static class MyReceiver extends ExtendedReceiverAdapter {
364: List events = new LinkedList();
365:
366: public List getEvents() {
367: return events;
368: }
369:
370: public void clear() {
371: events.clear();
372: }
373:
374: public void block() {
375: events.add(new BlockEvent());
376: }
377:
378: public void unblock() {
379: events.add(new UnblockEvent());
380: }
381:
382: public void viewAccepted(View new_view) {
383: }
384: }
385: }
|