0001: package org.jgroups.tests;
0002:
0003: import junit.framework.Test;
0004: import junit.framework.TestSuite;
0005: import org.jgroups.*;
0006: import org.jgroups.mux.MuxChannel;
0007: import org.jgroups.stack.IpAddress;
0008: import org.jgroups.stack.ProtocolStack;
0009: import org.jgroups.stack.Protocol;
0010: import org.jgroups.util.Util;
0011:
0012: import java.util.*;
0013: import java.io.*;
0014:
0015: /**
0016: * Test the multiplexer functionality provided by JChannelFactory
0017: * @author Bela Ban
0018: * @version $Id: MultiplexerTest.java,v 1.31.2.1 2006/12/04 22:45:49 vlada Exp $
0019: */
0020: public class MultiplexerTest extends ChannelTestBase {
0021: private Cache c1, c2, c1_repl, c2_repl;
0022: private Channel ch1, ch2, ch1_repl, ch2_repl;
0023: JChannelFactory factory, factory2;
0024:
/**
 * Creates the test case.
 *
 * @param name test name, handed unchanged to the superclass constructor
 */
public MultiplexerTest(String name) {
    super(name);
}
0028:
0029: public void setUp() throws Exception {
0030: super .setUp();
0031: factory = new JChannelFactory();
0032: factory.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
0033:
0034: factory2 = new JChannelFactory();
0035: factory2.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
0036: }
0037:
0038: public void tearDown() throws Exception {
0039: if (ch1_repl != null)
0040: ch1_repl.close();
0041: if (ch2_repl != null)
0042: ch2_repl.close();
0043: if (ch1 != null)
0044: ch1.close();
0045: if (ch2 != null)
0046: ch2.close();
0047: if (ch1 != null) {
0048: assertFalse(((MuxChannel) ch1).getChannel().isOpen());
0049: assertFalse(((MuxChannel) ch1).getChannel().isConnected());
0050: }
0051: if (ch2 != null) {
0052: assertFalse(((MuxChannel) ch2).getChannel().isOpen());
0053: assertFalse(((MuxChannel) ch2).getChannel().isConnected());
0054: }
0055: if (ch1_repl != null) {
0056: assertFalse(((MuxChannel) ch1_repl).getChannel().isOpen());
0057: assertFalse(((MuxChannel) ch1_repl).getChannel()
0058: .isConnected());
0059: }
0060: if (ch2_repl != null) {
0061: assertFalse(((MuxChannel) ch2_repl).getChannel().isOpen());
0062: assertFalse(((MuxChannel) ch2_repl).getChannel()
0063: .isConnected());
0064: }
0065:
0066: if (c1 != null)
0067: c1.clear();
0068: if (c2 != null)
0069: c2.clear();
0070: if (c1_repl != null)
0071: c1_repl.clear();
0072: if (c2_repl != null)
0073: c2_repl.clear();
0074:
0075: ch1_repl = ch2_repl = ch1 = ch2 = null;
0076: c1 = c2 = c1_repl = c2_repl = null;
0077:
0078: super .tearDown();
0079: }
0080:
0081: public void testReplicationWithOneChannel() throws Exception {
0082: ch1 = factory.createMultiplexerChannel(
0083: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0084: ch1.connect("bla");
0085: c1 = new Cache(ch1, "cache-1");
0086: assertEquals("cache has to be empty initially", 0, c1.size());
0087: c1.put("name", "Bela");
0088: Util.sleep(300); // we need to wait because replication is asynchronous here
0089: assertEquals(1, c1.size());
0090: assertEquals("Bela", c1.get("name"));
0091: }
0092:
0093: public void testLifecycle() throws Exception {
0094: ch1 = factory.createMultiplexerChannel(
0095: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0096: assertTrue(ch1.isOpen());
0097: assertFalse(ch1.isConnected());
0098:
0099: ch1.connect("bla");
0100: assertTrue(ch1.isOpen());
0101: assertTrue(ch1.isConnected());
0102:
0103: ch2 = factory.createMultiplexerChannel(
0104: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0105: assertTrue(ch2.isOpen());
0106: assertFalse(ch2.isConnected());
0107:
0108: ch2.connect("bla");
0109: assertTrue(ch2.isOpen());
0110: assertTrue(ch2.isConnected());
0111:
0112: ch2.disconnect();
0113: assertTrue(ch2.isOpen());
0114: assertFalse(ch2.isConnected());
0115:
0116: ch2.connect("bla");
0117: assertTrue(ch2.isOpen());
0118: assertTrue(ch2.isConnected());
0119:
0120: ch2.disconnect();
0121: assertTrue(ch2.isOpen());
0122: assertFalse(ch2.isConnected());
0123:
0124: ch2.close();
0125: assertFalse(ch2.isOpen());
0126: assertFalse(ch2.isConnected());
0127:
0128: ch2 = factory.createMultiplexerChannel(
0129: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0130: ch2.connect("bla");
0131: assertTrue(ch2.isOpen());
0132: assertTrue(ch2.isConnected());
0133:
0134: ch2.close();
0135: assertFalse(ch2.isOpen());
0136: assertFalse(ch2.isConnected());
0137: }
0138:
0139: public void testDisconnect() throws Exception {
0140: ch1 = factory.createMultiplexerChannel(
0141: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0142: assertTrue(ch1.isOpen());
0143: assertFalse(ch1.isConnected());
0144: assertTrue(((MuxChannel) ch1).getChannel().isOpen());
0145: assertFalse(((MuxChannel) ch1).getChannel().isConnected());
0146:
0147: ch1.connect("bla");
0148: assertTrue(ch1.isOpen());
0149: assertTrue(ch1.isConnected());
0150: assertTrue(((MuxChannel) ch1).getChannel().isOpen());
0151: assertTrue(((MuxChannel) ch1).getChannel().isConnected());
0152:
0153: ch2 = factory.createMultiplexerChannel(
0154: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0155: assertTrue(ch2.isOpen());
0156: assertFalse(ch2.isConnected());
0157:
0158: ch1.disconnect();
0159: assertTrue(ch1.isOpen());
0160: assertFalse(ch1.isConnected());
0161:
0162: ch1.connect("bla");
0163: assertTrue(ch1.isOpen());
0164: assertTrue(ch1.isConnected());
0165:
0166: ch1.close();
0167: assertFalse(ch1.isOpen());
0168: assertFalse(ch1.isConnected());
0169: assertTrue(((MuxChannel) ch1).getChannel().isOpen());
0170: assertTrue(((MuxChannel) ch1).getChannel().isConnected());
0171:
0172: ch2.close();
0173: assertFalse(ch2.isOpen());
0174: assertFalse(ch2.isConnected());
0175: }
0176:
0177: public void testDisconnect2() throws Exception {
0178: ch1 = factory.createMultiplexerChannel(
0179: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0180: assertTrue(ch1.isOpen());
0181: assertFalse(ch1.isConnected());
0182:
0183: ch1.connect("bla");
0184: assertTrue(ch1.isOpen());
0185: assertTrue(ch1.isConnected());
0186:
0187: ch2 = factory.createMultiplexerChannel(
0188: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0189: assertTrue(ch2.isOpen());
0190: assertFalse(ch2.isConnected());
0191:
0192: ch1.disconnect();
0193: assertTrue(ch1.isOpen());
0194: assertFalse(ch1.isConnected());
0195:
0196: assertTrue(ch2.isOpen());
0197: assertFalse(ch2.isConnected());
0198:
0199: ch1.connect("bla");
0200: assertTrue(ch1.isOpen());
0201: assertTrue(ch1.isConnected());
0202:
0203: assertTrue(ch2.isOpen());
0204: assertFalse(ch2.isConnected());
0205: }
0206:
0207: public void testClose() throws Exception {
0208: ch1 = factory.createMultiplexerChannel(
0209: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0210: ch1.connect("bla");
0211: ch2 = factory.createMultiplexerChannel(
0212: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0213: ch2.connect("bla");
0214: ch1.close();
0215: ch2.close();
0216: }
0217:
0218: public void testReplicationWithTwoChannels() throws Exception {
0219: ch1 = factory.createMultiplexerChannel(
0220: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0221: c1 = new Cache(ch1, "cache-1");
0222: assertEquals("cache has to be empty initially", 0, c1.size());
0223: ch1.connect("bla");
0224:
0225: ch1_repl = factory2.createMultiplexerChannel(
0226: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0227: c1_repl = new Cache(ch1_repl, "cache-1-repl");
0228: assertEquals("cache has to be empty initially", 0, c1_repl
0229: .size());
0230: ch1_repl.connect("bla");
0231:
0232: View v = ch1_repl.getView();
0233: assertNotNull(v);
0234: assertEquals(2, v.size());
0235:
0236: // System.out.println("****** [c1] PUT(name, Bela) *******");
0237: c1.put("name", "Bela");
0238: if (ch1.flushSupported())
0239: ch1.startFlush(5000, true);
0240: else
0241: Util.sleep(10000);
0242:
0243: System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);
0244:
0245: assertEquals(1, c1.size());
0246: assertEquals("Bela", c1.get("name"));
0247:
0248: assertEquals(1, c1_repl.size());
0249: assertEquals("Bela", c1_repl.get("name"));
0250:
0251: c1.put("id", new Long(322649));
0252: c1_repl.put("hobbies", "biking");
0253: c1_repl.put("bike", "Centurion");
0254: if (ch1.flushSupported())
0255: ch1.startFlush(5000, true);
0256: else
0257: Util.sleep(10000);
0258:
0259: System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);
0260:
0261: assertEquals(4, c1.size());
0262: assertEquals(4, c1_repl.size());
0263:
0264: assertEquals(new Long(322649), c1.get("id"));
0265: assertEquals(new Long(322649), c1_repl.get("id"));
0266:
0267: assertEquals("biking", c1.get("hobbies"));
0268: assertEquals("biking", c1_repl.get("hobbies"));
0269:
0270: assertEquals("Centurion", c1.get("bike"));
0271: assertEquals("Centurion", c1_repl.get("bike"));
0272: }
0273:
0274: public void testReplicationWithReconnect() throws Exception {
0275: ch1 = factory.createMultiplexerChannel(
0276: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0277: ch1.connect("bla");
0278: c1 = new Cache(ch1, "cache-1");
0279: assertEquals("cache has to be empty initially", 0, c1.size());
0280: c1.put("name", "Bela");
0281: Util.sleep(300); // we need to wait because replication is asynchronous here
0282: assertEquals(1, c1.size());
0283: assertEquals("Bela", c1.get("name"));
0284:
0285: ch1.disconnect();
0286:
0287: ch1.connect("bla");
0288:
0289: c2 = new Cache(ch1, "cache-1");
0290: assertEquals("cache has to be empty initially", 0, c2.size());
0291: c2.put("name", "Bela");
0292: Util.sleep(300); // we need to wait because replication is asynchronous here
0293: assertEquals(1, c2.size());
0294: assertEquals("Bela", c2.get("name"));
0295:
0296: }
0297:
0298: public void testStateTransfer() throws Exception {
0299: ch1 = factory.createMultiplexerChannel(
0300: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0301: ch1.connect("bla");
0302: c1 = new Cache(ch1, "cache-1");
0303: assertEquals("cache has to be empty initially", 0, c1.size());
0304:
0305: ch1_repl = factory2.createMultiplexerChannel(
0306: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0307:
0308: c1.put("name", "Bela");
0309: c1.put("id", new Long(322649));
0310: c1.put("hobbies", "biking");
0311: c1.put("bike", "Centurion");
0312:
0313: ch1_repl.connect("bla");
0314: c1_repl = new Cache(ch1_repl, "cache-1-repl");
0315: boolean rc = ch1_repl.getState(null, 5000);
0316: System.out.println("state transfer: " + rc);
0317: Util.sleep(500);
0318:
0319: System.out.println("c1_repl: " + c1_repl);
0320: assertEquals("initial state should have been transferred", 4,
0321: c1_repl.size());
0322:
0323: assertEquals(new Long(322649), c1.get("id"));
0324: assertEquals(new Long(322649), c1_repl.get("id"));
0325:
0326: assertEquals("biking", c1.get("hobbies"));
0327: assertEquals("biking", c1_repl.get("hobbies"));
0328:
0329: assertEquals("Centurion", c1.get("bike"));
0330: assertEquals("Centurion", c1_repl.get("bike"));
0331: }
0332:
0333: public void testStateTransferWithTwoApplications() throws Exception {
0334: ch1 = factory.createMultiplexerChannel(
0335: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0336: ch1.connect("bla");
0337: c1 = new Cache(ch1, "cache-1");
0338: assertEquals("cache has to be empty initially", 0, c1.size());
0339:
0340: ch2 = factory.createMultiplexerChannel(
0341: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0342: ch2.connect("bla");
0343: c2 = new Cache(ch2, "cache-2");
0344: assertEquals("cache has to be empty initially", 0, c2.size());
0345:
0346: ch1_repl = factory2.createMultiplexerChannel(
0347: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0348:
0349: ch2_repl = factory2.createMultiplexerChannel(
0350: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0351:
0352: c1.put("name", "cache-1");
0353: c2.put("name", "cache-2");
0354:
0355: ch1_repl.connect("bla");
0356: c1_repl = new Cache(ch1_repl, "cache-1-repl");
0357: boolean rc = ch1_repl.getState(null, 5000);
0358: System.out.println("state transfer: " + rc);
0359:
0360: ch2_repl.connect("bla");
0361: c2_repl = new Cache(ch2_repl, "cache-2-repl");
0362: rc = ch2_repl.getState(null, 5000);
0363: System.out.println("state transfer: " + rc);
0364: Util.sleep(500);
0365:
0366: System.out.println("Caches after state transfers:");
0367: System.out.println("c1: " + c1);
0368: System.out.println("c1_repl: " + c1_repl);
0369: System.out.println("c2: " + c2);
0370: System.out.println("c2_repl: " + c2_repl);
0371:
0372: assertEquals(1, c1.size());
0373: assertEquals(1, c1_repl.size());
0374:
0375: assertEquals(1, c2.size());
0376: assertEquals(1, c2_repl.size());
0377:
0378: assertEquals("cache-1", c1.get("name"));
0379: assertEquals("cache-1", c1_repl.get("name"));
0380:
0381: assertEquals("cache-2", c2.get("name"));
0382: assertEquals("cache-2", c2_repl.get("name"));
0383: }
0384:
0385: public void testStateTransferWithRegistration() throws Exception {
0386: ch1 = factory.createMultiplexerChannel(
0387: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0388: ch1.connect("bla");
0389: c1 = new Cache(ch1, "cache-1");
0390: assertEquals("cache has to be empty initially", 0, c1.size());
0391:
0392: ch2 = factory.createMultiplexerChannel(
0393: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0394: ch2.connect("bla");
0395: c2 = new Cache(ch2, "cache-2");
0396: assertEquals("cache has to be empty initially", 0, c2.size());
0397: c1.put("name", "cache-1");
0398: c2.put("name", "cache-2");
0399:
0400: ch1_repl = factory2.createMultiplexerChannel(
0401: MUX_CHANNEL_CONFIG_STACK_NAME, "c1", true, null); // register for state transfer
0402: ch2_repl = factory2.createMultiplexerChannel(
0403: MUX_CHANNEL_CONFIG_STACK_NAME, "c2", true, null); // register for state transfer
0404:
0405: ch1_repl.connect("bla");
0406: c1_repl = new Cache(ch1_repl, "cache-1-repl");
0407: boolean rc = ch1_repl.getState(null, 5000); // this will *not* trigger the state transfer protocol
0408: System.out.println("state transfer: " + rc);
0409:
0410: ch2_repl.connect("bla");
0411: c2_repl = new Cache(ch2_repl, "cache-2-repl");
0412: rc = ch2_repl.getState(null, 5000); // only *this* will trigger the state transfer
0413: System.out.println("state transfer: " + rc);
0414: Util.sleep(500);
0415:
0416: System.out.println("Caches after state transfers:");
0417: System.out.println("c1: " + c1);
0418: System.out.println("c1_repl: " + c1_repl);
0419: System.out.println("c2: " + c2);
0420: System.out.println("c2_repl: " + c2_repl);
0421:
0422: assertEquals(1, c1.size());
0423: assertEquals(1, c1_repl.size());
0424:
0425: assertEquals(1, c2.size());
0426: assertEquals(1, c2_repl.size());
0427:
0428: assertEquals("cache-1", c1.get("name"));
0429: assertEquals("cache-1", c1_repl.get("name"));
0430:
0431: assertEquals("cache-2", c2.get("name"));
0432: assertEquals("cache-2", c2_repl.get("name"));
0433: c1.clear();
0434: c1_repl.clear();
0435: c2.clear();
0436: c2_repl.clear();
0437: }
0438:
0439: private void setCorrectPortRange(Channel ch) {
0440: ProtocolStack stack = ((MuxChannel) ch).getProtocolStack();
0441: Protocol tcpping = stack.findProtocol("TCPPING");
0442: if (tcpping == null)
0443: return;
0444:
0445: Properties props = tcpping.getProperties();
0446: String port_range = props.getProperty("port_range");
0447: if (port_range != null) {
0448: System.out.println("port_range in TCPPING: " + port_range
0449: + ", setting it to 2");
0450: port_range = "2";
0451: Properties p = new Properties();
0452: // p.putAll(props);
0453: p.setProperty("port_range", port_range);
0454: tcpping.setProperties(p);
0455: }
0456: }
0457:
0458: public void testStateTransferWithReconnect() throws Exception {
0459: ch1 = factory.createMultiplexerChannel(
0460: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0461: setCorrectPortRange(ch1);
0462:
0463: assertTrue(ch1.isOpen());
0464: assertFalse(ch1.isConnected());
0465: ch1.connect("bla");
0466: assertTrue(ch1.isOpen());
0467: assertTrue(ch1.isConnected());
0468: assertServiceAndClusterView(ch1, 1, 1);
0469:
0470: c1 = new Cache(ch1, "cache-1");
0471: assertEquals("cache has to be empty initially", 0, c1.size());
0472:
0473: ch1_repl = factory2.createMultiplexerChannel(
0474: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0475: setCorrectPortRange(ch1_repl);
0476: assertTrue(ch1_repl.isOpen());
0477: assertFalse(ch1_repl.isConnected());
0478:
0479: c1.put("name", "Bela");
0480: c1.put("id", new Long(322649));
0481: c1.put("hobbies", "biking");
0482: c1.put("bike", "Centurion");
0483:
0484: ch1_repl.connect("bla");
0485: assertTrue(ch1_repl.isOpen());
0486: assertTrue(ch1_repl.isConnected());
0487: assertServiceAndClusterView(ch1_repl, 2, 2);
0488: Util.sleep(500);
0489: assertServiceAndClusterView(ch1, 2, 2);
0490:
0491: c1_repl = new Cache(ch1_repl, "cache-1-repl");
0492: boolean rc = ch1_repl.getState(null, 5000);
0493: System.out.println("state transfer: " + rc);
0494: Util.sleep(500);
0495:
0496: System.out.println("c1_repl: " + c1_repl);
0497: assertEquals("initial state should have been transferred", 4,
0498: c1_repl.size());
0499: assertEquals(new Long(322649), c1.get("id"));
0500: assertEquals(new Long(322649), c1_repl.get("id"));
0501:
0502: assertEquals("biking", c1.get("hobbies"));
0503: assertEquals("biking", c1_repl.get("hobbies"));
0504:
0505: assertEquals("Centurion", c1.get("bike"));
0506: assertEquals("Centurion", c1_repl.get("bike"));
0507:
0508: ch1_repl.disconnect();
0509: assertTrue(ch1_repl.isOpen());
0510: assertFalse(ch1_repl.isConnected());
0511: Util.sleep(1000);
0512: assertServiceAndClusterView(ch1, 1, 1);
0513:
0514: c1_repl.clear();
0515:
0516: ch1_repl.connect("bla");
0517: assertTrue(ch1_repl.isOpen());
0518: assertTrue(ch1_repl.isConnected());
0519: assertServiceAndClusterView(ch1_repl, 2, 2);
0520: Util.sleep(300);
0521: assertServiceAndClusterView(ch1, 2, 2);
0522:
0523: assertEquals("cache has to be empty initially", 0, c1_repl
0524: .size());
0525:
0526: rc = ch1_repl.getState(null, 5000);
0527: System.out.println("state transfer: " + rc);
0528: Util.sleep(500);
0529:
0530: System.out.println("c1_repl: " + c1_repl);
0531: assertEquals("initial state should have been transferred", 4,
0532: c1_repl.size());
0533:
0534: assertEquals(new Long(322649), c1.get("id"));
0535: assertEquals(new Long(322649), c1_repl.get("id"));
0536:
0537: assertEquals("biking", c1.get("hobbies"));
0538: assertEquals("biking", c1_repl.get("hobbies"));
0539:
0540: assertEquals("Centurion", c1.get("bike"));
0541: assertEquals("Centurion", c1_repl.get("bike"));
0542:
0543: // Now see what happens if we reconnect the first channel
0544: // But first, add another MuxChannel on that JChannel
0545: // just so it remains coordinator (test that it doesn't
0546: // ask for state from itself)
0547: ch2 = factory.createMultiplexerChannel(
0548: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0549: setCorrectPortRange(ch2);
0550: assertTrue(ch2.isOpen());
0551: assertFalse(ch2.isConnected());
0552: assertServiceAndClusterView(ch1, 2, 2);
0553: assertServiceAndClusterView(ch1_repl, 2, 2);
0554:
0555: ch1.disconnect();
0556: //sleep a bit and thus let asynch VIEW to propagate to other channel
0557: Util.sleep(500);
0558: assertTrue(ch1.isOpen());
0559: assertFalse(ch1.isConnected());
0560: assertServiceAndClusterView(ch1_repl, 1, 1);
0561: assertTrue(ch2.isOpen());
0562: assertFalse(ch2.isConnected());
0563:
0564: c1.clear();
0565:
0566: ch1.connect("bla");
0567: assertTrue(ch1.isOpen());
0568: assertTrue(ch1.isConnected());
0569: assertServiceAndClusterView(ch1, 2, 2);
0570: Util.sleep(500);
0571: assertServiceAndClusterView(ch1_repl, 2, 2);
0572: assertTrue(ch2.isOpen());
0573: assertFalse(ch2.isConnected());
0574:
0575: assertEquals("cache has to be empty initially", 0, c1.size());
0576:
0577: rc = ch1.getState(null, 5000);
0578: System.out.println("state transfer: " + rc);
0579: Util.sleep(500);
0580:
0581: System.out.println("c1: " + c1);
0582: assertEquals("initial state should have been transferred", 4,
0583: c1.size());
0584:
0585: assertEquals(new Long(322649), c1.get("id"));
0586: assertEquals(new Long(322649), c1_repl.get("id"));
0587:
0588: assertEquals("biking", c1.get("hobbies"));
0589: assertEquals("biking", c1_repl.get("hobbies"));
0590:
0591: assertEquals("Centurion", c1.get("bike"));
0592: assertEquals("Centurion", c1_repl.get("bike"));
0593: }
0594:
0595: private void assertServiceAndClusterView(Channel ch,
0596: int num_service_view_mbrs, int num_cluster_view_mbrs) {
0597: View service_view, cluster_view;
0598: service_view = ch.getView();
0599: cluster_view = ((MuxChannel) ch).getClusterView();
0600:
0601: String msg = "cluster view=" + cluster_view + ", service view="
0602: + service_view;
0603:
0604: assertNotNull(service_view);
0605: assertNotNull(cluster_view);
0606:
0607: assertEquals(msg, num_service_view_mbrs, service_view.size());
0608: assertEquals(msg, num_cluster_view_mbrs, cluster_view.size());
0609: }
0610:
0611: public void testStateTransferFromSelfWithRegularChannel()
0612: throws Exception {
0613: JChannel ch = new JChannel();
0614: ch.connect("X");
0615: try {
0616: boolean rc = ch.getState(null, 2000);
0617: assertFalse("getState() on singleton should return false",
0618: rc);
0619: } finally {
0620: ch.close();
0621: }
0622: }
0623:
0624: public void testStateTransferFromSelf() throws Exception {
0625: ch1 = factory.createMultiplexerChannel(
0626: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0627: ch1.connect("bla");
0628: boolean rc = ch1.getState(null, 2000);
0629: assertFalse("getState() on singleton should return false", rc);
0630: ch2 = factory.createMultiplexerChannel(
0631: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0632: ch2.connect("foo");
0633: rc = ch2.getState(null, 2000);
0634: assertFalse("getState() on singleton should return false", rc);
0635: }
0636:
0637: public void testAdditionalData() throws Exception {
0638: byte[] additional_data = new byte[] { 'b', 'e', 'l', 'a' };
0639: ch1 = factory.createMultiplexerChannel(
0640: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0641: Map m = new HashMap(1);
0642: m.put("additional_data", additional_data);
0643: ch1.down(new Event(Event.CONFIG, m));
0644: ch1.connect("bla");
0645: IpAddress local_addr = (IpAddress) ch1.getLocalAddress();
0646: assertNotNull(local_addr);
0647: byte[] tmp = local_addr.getAdditionalData();
0648: assertNotNull(tmp);
0649: assertEquals(tmp, additional_data);
0650:
0651: ch2 = factory.createMultiplexerChannel(
0652: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0653: ch2.connect("foo");
0654: local_addr = (IpAddress) ch2.getLocalAddress();
0655: assertNotNull(local_addr);
0656: tmp = local_addr.getAdditionalData();
0657: assertNotNull(tmp);
0658: assertEquals(tmp, additional_data);
0659: }
0660:
0661: public void testAdditionalData2() throws Exception {
0662: byte[] additional_data = new byte[] { 'b', 'e', 'l', 'a' };
0663: byte[] additional_data2 = new byte[] { 'm', 'i', 'c', 'h', 'i' };
0664: ch1 = factory.createMultiplexerChannel(
0665: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0666: ch1.connect("bla");
0667: IpAddress local_addr = (IpAddress) ch1.getLocalAddress();
0668: assertNotNull(local_addr);
0669: byte[] tmp = local_addr.getAdditionalData();
0670: assertNull(tmp);
0671:
0672: ch2 = factory.createMultiplexerChannel(
0673: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0674: Map m = new HashMap(1);
0675: m.put("additional_data", additional_data);
0676: ch2.down(new Event(Event.CONFIG, m));
0677: ch2.connect("foo");
0678: local_addr = (IpAddress) ch2.getLocalAddress();
0679: assertNotNull(local_addr);
0680: tmp = local_addr.getAdditionalData();
0681: assertNotNull(tmp);
0682: assertEquals(tmp, additional_data);
0683:
0684: local_addr = (IpAddress) ch1.getLocalAddress();
0685: assertNotNull(local_addr);
0686: tmp = local_addr.getAdditionalData();
0687: assertNotNull(tmp);
0688: assertEquals(tmp, additional_data);
0689:
0690: m.clear();
0691: m.put("additional_data", additional_data2);
0692: ch2.down(new Event(Event.CONFIG, m));
0693: local_addr = (IpAddress) ch2.getLocalAddress();
0694: assertNotNull(local_addr);
0695: tmp = local_addr.getAdditionalData();
0696: assertNotNull(tmp);
0697: assertEquals(tmp, additional_data2);
0698: assertFalse(Arrays.equals(tmp, additional_data));
0699: }
0700:
0701: public void testGetSubstates() throws Exception {
0702: ch1 = factory.createMultiplexerChannel(
0703: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0704: ch1.connect("bla");
0705: c1 = new ExtendedCache(ch1, "cache-1");
0706: assertEquals("cache has to be empty initially", 0, c1.size());
0707:
0708: ch2 = factory.createMultiplexerChannel(
0709: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0710: ch2.connect("bla");
0711: c2 = new ExtendedCache(ch2, "cache-2");
0712: assertEquals("cache has to be empty initially", 0, c2.size());
0713:
0714: for (int i = 0; i < 10; i++) {
0715: c1.put(new Integer(i), new Integer(i));
0716: c2.put(new Integer(i), new Integer(i));
0717: }
0718:
0719: ch1_repl = factory2.createMultiplexerChannel(
0720: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0721: ch2_repl = factory2.createMultiplexerChannel(
0722: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0723: ch1_repl.connect("bla");
0724: c1_repl = new ExtendedCache(ch1_repl, "cache-1-repl");
0725: boolean rc = ch1_repl.getState(null, "odd", 5000);
0726: System.out.println("state transfer: " + rc);
0727:
0728: ch2_repl.connect("bla");
0729: c2_repl = new ExtendedCache(ch2_repl, "cache-2-repl");
0730: rc = ch2_repl.getState(null, "even", 5000);
0731: System.out.println("state transfer: " + rc);
0732: Util.sleep(500);
0733:
0734: System.out.println("Caches after state transfers:");
0735: System.out.println("c1: " + c1);
0736: System.out.println("c2: " + c2);
0737:
0738: System.out
0739: .println("c1_repl (removed odd substate): " + c1_repl);
0740: System.out.println("c2_repl (removed even substate): "
0741: + c2_repl);
0742:
0743: assertEquals(5, c1_repl.size());
0744: assertEquals(5, c2_repl.size());
0745:
0746: _testEvenNumbersPresent(c1_repl);
0747: _testOddNumbersPresent(c2_repl);
0748: }
0749:
0750: private void _testEvenNumbersPresent(Cache c) {
0751: Integer[] evens = new Integer[] { new Integer(0),
0752: new Integer(2), new Integer(4), new Integer(6),
0753: new Integer(8) };
0754: _testNumbersPresent(c, evens);
0755:
0756: }
0757:
0758: private void _testOddNumbersPresent(Cache c) {
0759: Integer[] odds = new Integer[] { new Integer(1),
0760: new Integer(3), new Integer(5), new Integer(7),
0761: new Integer(9) };
0762: _testNumbersPresent(c, odds);
0763: }
0764:
0765: private void _testNumbersPresent(Cache c, Integer[] numbers) {
0766: int len = numbers.length;
0767: assertEquals(len, c.size());
0768: for (int i = 0; i < numbers.length; i++) {
0769: Integer number = numbers[i];
0770: assertEquals(number, c.get(number));
0771: }
0772: }
0773:
0774: public void testGetSubstatesMultipleTimes() throws Exception {
0775: ch1 = factory.createMultiplexerChannel(
0776: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0777: ch1.connect("bla");
0778: c1 = new ExtendedCache(ch1, "cache-1");
0779: assertEquals("cache has to be empty initially", 0, c1.size());
0780:
0781: ch2 = factory.createMultiplexerChannel(
0782: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0783: ch2.connect("bla");
0784: c2 = new ExtendedCache(ch2, "cache-2");
0785: assertEquals("cache has to be empty initially", 0, c2.size());
0786:
0787: for (int i = 0; i < 10; i++) {
0788: c1.put(new Integer(i), new Integer(i));
0789: c2.put(new Integer(i), new Integer(i));
0790: }
0791:
0792: ch1_repl = factory2.createMultiplexerChannel(
0793: MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0794: ch2_repl = factory2.createMultiplexerChannel(
0795: MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0796: ch1_repl.connect("bla");
0797: c1_repl = new ExtendedCache(ch1_repl, "cache-1-repl");
0798: boolean rc = ch1_repl.getState(null, "odd", 5000);
0799: System.out.println("state transfer: " + rc);
0800:
0801: ch2_repl.connect("bla");
0802: c2_repl = new ExtendedCache(ch2_repl, "cache-2-repl");
0803: rc = ch2_repl.getState(null, "even", 5000);
0804: System.out.println("state transfer: " + rc);
0805: Util.sleep(500);
0806: _testOddNumbersPresent(c2_repl);
0807:
0808: System.out.println("Caches after state transfers:");
0809: System.out.println("c1: " + c1);
0810: System.out.println("c2: " + c2);
0811: System.out
0812: .println("c1_repl (removed odd substate): " + c1_repl);
0813: System.out.println("c2_repl (removed even substate): "
0814: + c2_repl);
0815:
0816: assertEquals(5, c2_repl.size());
0817: rc = ch2_repl.getState(null, "odd", 5000);
0818: Util.sleep(500);
0819: System.out
0820: .println("c2_repl (removed odd substate): " + c2_repl);
0821: _testEvenNumbersPresent(c2_repl);
0822:
0823: assertEquals(5, c2_repl.size());
0824: rc = ch2_repl.getState(null, "even", 5000);
0825: Util.sleep(500);
0826: System.out.println("c2_repl (removed even substate): "
0827: + c2_repl);
0828: _testOddNumbersPresent(c2_repl);
0829:
0830: assertEquals(5, c2_repl.size());
0831: rc = ch2_repl.getState(null, "odd", 5000);
0832: Util.sleep(500);
0833: System.out
0834: .println("c2_repl (removed odd substate): " + c2_repl);
0835: _testEvenNumbersPresent(c2_repl);
0836: }
0837:
0838: public static Test suite() {
0839: return new TestSuite(MultiplexerTest.class);
0840: }
0841:
0842: public static void main(String[] args) {
0843: junit.textui.TestRunner.run(MultiplexerTest.suite());
0844: }
0845:
0846: private static class Cache extends ExtendedReceiverAdapter {
0847: final Map data = new HashMap();
0848: Channel ch;
0849: String name;
0850:
0851: public Cache(Channel ch, String name) {
0852: this .ch = ch;
0853: this .name = name;
0854: this .ch.setReceiver(this );
0855: }
0856:
0857: protected Object get(Object key) {
0858: synchronized (data) {
0859: return data.get(key);
0860: }
0861: }
0862:
0863: protected void put(Object key, Object val) throws Exception {
0864: Object[] buf = new Object[2];
0865: buf[0] = key;
0866: buf[1] = val;
0867: Message msg = new Message(null, null, buf);
0868: ch.send(msg);
0869: }
0870:
0871: protected int size() {
0872: synchronized (data) {
0873: return data.size();
0874: }
0875: }
0876:
0877: public void receive(Message msg) {
0878: Object[] modification = (Object[]) msg.getObject();
0879: Object key = modification[0];
0880: Object val = modification[1];
0881: synchronized (data) {
0882: // System.out.println("****** [" + name + "] received PUT(" + key + ", " + val + ") " + " from " + msg.getSrc() + " *******");
0883: data.put(key, val);
0884: }
0885: }
0886:
0887: public byte[] getState() {
0888: byte[] state = null;
0889: synchronized (data) {
0890: try {
0891: state = Util.objectToByteBuffer(data);
0892: } catch (Exception e) {
0893: e.printStackTrace();
0894: return null;
0895: }
0896: }
0897: return state;
0898: }
0899:
0900: public byte[] getState(String state_id) {
0901: return getState();
0902: }
0903:
0904: public void setState(byte[] state) {
0905: Map m;
0906: try {
0907: m = (Map) Util.objectFromByteBuffer(state);
0908: synchronized (data) {
0909: data.clear();
0910: data.putAll(m);
0911: }
0912: } catch (Exception e) {
0913: e.printStackTrace();
0914: }
0915: }
0916:
0917: public void setState(String state_id, byte[] state) {
0918: setState(state);
0919: }
0920:
0921: public void getState(OutputStream ostream) {
0922: ObjectOutputStream oos = null;
0923: try {
0924: oos = new ObjectOutputStream(ostream);
0925: synchronized (data) {
0926: oos.writeObject(data);
0927: }
0928: oos.flush();
0929: } catch (IOException e) {
0930: } finally {
0931: try {
0932: if (oos != null)
0933: oos.close();
0934: } catch (IOException e) {
0935: System.err.println(e);
0936: }
0937: }
0938: }
0939:
0940: public void getState(String state_id, OutputStream ostream) {
0941: getState(ostream);
0942: }
0943:
0944: public void setState(InputStream istream) {
0945: ObjectInputStream ois = null;
0946: try {
0947: ois = new ObjectInputStream(istream);
0948: Map m = (Map) ois.readObject();
0949: synchronized (data) {
0950: data.clear();
0951: data.putAll(m);
0952: }
0953:
0954: } catch (Exception e) {
0955: } finally {
0956: try {
0957: if (ois != null)
0958: ois.close();
0959: } catch (IOException e) {
0960: System.err.println(e);
0961: }
0962: }
0963: }
0964:
0965: public void setState(String state_id, InputStream istream) {
0966: setState(istream);
0967: }
0968:
0969: public void clear() {
0970: synchronized (data) {
0971: data.clear();
0972: }
0973: }
0974:
0975: public void viewAccepted(View new_view) {
0976: log("view is " + new_view);
0977: }
0978:
0979: public String toString() {
0980: return data.toString();
0981: }
0982:
0983: private void log(String msg) {
0984: System.out.println("-- [" + name + "] " + msg);
0985: }
0986:
0987: }
0988:
0989: static class ExtendedCache extends Cache {
0990:
0991: public ExtendedCache(Channel ch, String name) {
0992: super (ch, name);
0993: }
0994:
0995: public byte[] getState(String state_id) {
0996: Map copy = null;
0997: synchronized (data) {
0998: copy = new HashMap(data);
0999: }
1000: for (Iterator it = copy.keySet().iterator(); it.hasNext();) {
1001: Integer key = (Integer) it.next();
1002: if (state_id.equals("odd") && key.intValue() % 2 != 0)
1003: it.remove();
1004: else if (state_id.equals("even")
1005: && key.intValue() % 2 == 0)
1006: it.remove();
1007: }
1008: try {
1009: return Util.objectToByteBuffer(copy);
1010: } catch (Exception e) {
1011: e.printStackTrace();
1012: return null;
1013: }
1014: }
1015:
1016: public void getState(String state_id, OutputStream os) {
1017: Map copy = null;
1018: synchronized (data) {
1019: copy = new HashMap(data);
1020: }
1021: for (Iterator it = copy.keySet().iterator(); it.hasNext();) {
1022: Integer key = (Integer) it.next();
1023: if (state_id.equals("odd") && key.intValue() % 2 != 0)
1024: it.remove();
1025: else if (state_id.equals("even")
1026: && key.intValue() % 2 == 0)
1027: it.remove();
1028: }
1029: ObjectOutputStream oos = null;
1030: try {
1031: oos = new ObjectOutputStream(os);
1032: oos.writeObject(copy);
1033: oos.flush();
1034: } catch (IOException e) {
1035: } finally {
1036: try {
1037: if (oos != null)
1038: oos.close();
1039: } catch (IOException e) {
1040: System.err.println(e);
1041: }
1042: }
1043: }
1044:
1045: public void setState(String state_id, InputStream is) {
1046: setState(is);
1047: }
1048:
1049: public void setState(String state_id, byte[] state) {
1050: setState(state);
1051: }
1052:
1053: public String toString() {
1054: synchronized (data) {
1055: Set keys = new TreeSet(data.keySet());
1056: StringBuffer sb = new StringBuffer();
1057: for (Iterator it = keys.iterator(); it.hasNext();) {
1058: Object o = it.next();
1059: sb.append(o).append("=").append(data.get(o))
1060: .append(" ");
1061: }
1062: return sb.toString();
1063: }
1064: }
1065: }
1066:
1067: }
|