0001: /**
0002: * Copyright 2003-2007 Luck Consulting Pty Ltd
0003: *
0004: * Licensed under the Apache License, Version 2.0 (the "License");
0005: * you may not use this file except in compliance with the License.
0006: * You may obtain a copy of the License at
0007: *
0008: * http://www.apache.org/licenses/LICENSE-2.0
0009: *
0010: * Unless required by applicable law or agreed to in writing, software
0011: * distributed under the License is distributed on an "AS IS" BASIS,
0012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013: * See the License for the specific language governing permissions and
0014: * limitations under the License.
0015: */package net.sf.ehcache.distribution;
0016:
0017: import junit.framework.AssertionFailedError;
0018: import net.sf.ehcache.AbstractCacheTest;
0019: import net.sf.ehcache.Cache;
0020: import net.sf.ehcache.CacheException;
0021: import net.sf.ehcache.CacheManager;
0022: import net.sf.ehcache.Ehcache;
0023: import net.sf.ehcache.Element;
0024: import net.sf.ehcache.StopWatch;
0025: import net.sf.ehcache.ThreadKiller;
0026: import net.sf.ehcache.management.ManagementService;
0027: import net.sf.ehcache.event.CountingCacheEventListener;
0028: import org.apache.commons.logging.Log;
0029: import org.apache.commons.logging.LogFactory;
0030:
0031: import java.io.IOException;
0032: import java.io.Serializable;
0033: import java.rmi.RemoteException;
0034: import java.util.ArrayList;
0035: import java.util.Arrays;
0036: import java.util.Date;
0037: import java.util.List;
0038: import java.util.Random;
0039:
0040: /**
0041: * Tests replication of Cache events
0042: * <p/>
0043: * Note these tests need a live network interface running in multicast mode to work
0044: * <p/>
0045: * If running involving RMIAsynchronousCacheReplicator individually the test will fail because
0046: * the VM will gobble up the SoftReferences rather than allocating more memory. Uncomment the
0047: * forceVMGrowth() method usage in setup.
0048: *
0049: * @author Greg Luck
0050: * @version $Id: RMICacheReplicatorTest.java 575 2008-01-30 07:22:04Z gregluck $
0051: */
0052: public class RMICacheReplicatorTest extends AbstractCacheTest {
0053:
0054: /**
0055: * A value to represent replicate asynchronously
0056: */
0057: protected static final boolean ASYNCHRONOUS = true;
0058:
0059: /**
0060: * A value to represent replicate synchronously
0061: */
0062: protected static final boolean SYNCHRONOUS = false;
0063:
0064: private static final Log LOG = LogFactory
0065: .getLog(RMICacheReplicatorTest.class.getName());
0066:
0067: /**
0068: * CacheManager 1 in the cluster
0069: */
0070: protected CacheManager manager1;
0071: /**
0072: * CacheManager 2 in the cluster
0073: */
0074: protected CacheManager manager2;
0075: /**
0076: * CacheManager 3 in the cluster
0077: */
0078: protected CacheManager manager3;
0079: /**
0080: * CacheManager 4 in the cluster
0081: */
0082: protected CacheManager manager4;
0083: /**
0084: * CacheManager 5 in the cluster
0085: */
0086: protected CacheManager manager5;
0087: /**
0088: * CacheManager 6 in the cluster
0089: */
0090: protected CacheManager manager6;
0091:
0092: /**
0093: * The name of the cache under test
0094: */
0095: protected String cacheName = "sampleCache1";
0096: /**
0097: * CacheManager 1 of 2s cache being replicated
0098: */
0099: protected Ehcache cache1;
0100:
0101: /**
0102: * CacheManager 2 of 2s cache being replicated
0103: */
0104: protected Ehcache cache2;
0105:
0106: /**
0107: * Allows setup to be the same
0108: */
0109: protected String cacheNameBase = "ehcache-distributed";
0110:
0111: /**
0112: * {@inheritDoc}
0113: * Sets up two caches: cache1 is local. cache2 is to be receive updates
0114: *
0115: * @throws Exception
0116: */
0117: protected void setUp() throws Exception {
0118: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0119: return;
0120: }
0121:
0122: //Required to get SoftReference tests to pass. The VM clean up SoftReferences rather than allocating
0123: // memory to -Xmx!
0124: //forceVMGrowth();
0125: //System.gc();
0126: MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
0127:
0128: CountingCacheEventListener.resetCounters();
0129: manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0130: + "distribution/ehcache-distributed1.xml");
0131: manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0132: + "distribution/ehcache-distributed2.xml");
0133: manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0134: + "distribution/ehcache-distributed3.xml");
0135: manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0136: + "distribution/ehcache-distributed4.xml");
0137: manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0138: + "distribution/ehcache-distributed5.xml");
0139:
0140: //manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-jndi6.xml");
0141:
0142: //allow cluster to be established
0143: Thread.sleep(1020);
0144:
0145: cache1 = manager1.getCache(cacheName);
0146: cache1.removeAll();
0147:
0148: cache2 = manager2.getCache(cacheName);
0149: cache2.removeAll();
0150:
0151: //enable distributed removeAlls to finish
0152: waitForProgagate();
0153:
0154: }
0155:
0156: /**
0157: * {@inheritDoc}
0158: *
0159: * @throws Exception
0160: */
0161: protected void tearDown() throws Exception {
0162:
0163: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0164: return;
0165: }
0166:
0167: if (manager1 != null) {
0168: manager1.shutdown();
0169: }
0170: if (manager2 != null) {
0171: manager2.shutdown();
0172: }
0173: if (manager3 != null) {
0174: manager3.shutdown();
0175: }
0176: if (manager4 != null) {
0177: manager4.shutdown();
0178: }
0179: if (manager5 != null) {
0180: manager5.shutdown();
0181: }
0182: if (manager6 != null) {
0183: manager6.shutdown();
0184: }
0185: Thread.sleep(5000);
0186:
0187: List threads = JVMUtil.enumerateThreads();
0188: for (int i = 0; i < threads.size(); i++) {
0189: Thread thread = (Thread) threads.get(i);
0190: if (thread.getName().equals("Replication Thread")) {
0191: fail("There should not be any replication threads running after shutdown");
0192: }
0193: }
0194:
0195: }
0196:
0197: /**
0198: * 5 cache managers should means that each cache has four remote peers
0199: */
0200: public void testRemoteCachePeersEqualsNumberOfCacheManagersInCluster() {
0201:
0202: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0203: return;
0204: }
0205:
0206: CacheManagerPeerProvider provider = manager1
0207: .getCachePeerProvider();
0208: List remotePeersOfCache1 = provider
0209: .listRemoteCachePeers(cache1);
0210: assertEquals(4, remotePeersOfCache1.size());
0211: }
0212:
0213: /**
0214: * Does a new cache manager in the cluster get detected?
0215: */
0216: public void testRemoteCachePeersDetectsNewCacheManager()
0217: throws InterruptedException {
0218:
0219: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0220: return;
0221: }
0222:
0223: CacheManagerPeerProvider provider = manager1
0224: .getCachePeerProvider();
0225: List remotePeersOfCache1 = provider
0226: .listRemoteCachePeers(cache1);
0227: assertEquals(4, remotePeersOfCache1.size());
0228:
0229: //Add new CacheManager to cluster
0230: manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0231: + "distribution/ehcache-distributed6.xml");
0232:
0233: //Allow detection to occur
0234: Thread.sleep(10020);
0235:
0236: remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
0237: assertEquals(5, remotePeersOfCache1.size());
0238: }
0239:
0240: /**
0241: * Does a down cache manager in the cluster get removed?
0242: */
0243: public void testRemoteCachePeersDetectsDownCacheManager()
0244: throws InterruptedException {
0245:
0246: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0247: return;
0248: }
0249:
0250: CacheManagerPeerProvider provider = manager1
0251: .getCachePeerProvider();
0252: List remotePeersOfCache1 = provider
0253: .listRemoteCachePeers(cache1);
0254: assertEquals(4, remotePeersOfCache1.size());
0255:
0256: //Drop a CacheManager from the cluster
0257: manager5.shutdown();
0258:
0259: //Allow change detection to occur. Heartbeat 1 second and is not stale until 5000
0260: Thread.sleep(11020);
0261: remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
0262:
0263: assertEquals(3, remotePeersOfCache1.size());
0264: }
0265:
0266: /**
0267: * Does a down cache manager in the cluster get removed?
0268: */
0269: public void testRemoteCachePeersDetectsDownCacheManagerSlow()
0270: throws InterruptedException {
0271:
0272: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0273: return;
0274: }
0275:
0276: try {
0277: CacheManagerPeerProvider provider = manager1
0278: .getCachePeerProvider();
0279: List remotePeersOfCache1 = provider
0280: .listRemoteCachePeers(cache1);
0281: assertEquals(4, remotePeersOfCache1.size());
0282:
0283: MulticastKeepaliveHeartbeatSender
0284: .setHeartBeatInterval(2000);
0285: Thread.sleep(2000);
0286:
0287: //Drop a CacheManager from the cluster
0288: manager5.shutdown();
0289:
0290: //Insufficient time for it to timeout
0291: remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
0292: assertEquals(4, remotePeersOfCache1.size());
0293: } finally {
0294: MulticastKeepaliveHeartbeatSender
0295: .setHeartBeatInterval(1000);
0296: Thread.sleep(2000);
0297: }
0298:
0299: }
0300:
0301: /**
0302: * Tests put and remove initiated from cache1 in a cluster
0303: * <p/>
0304: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0305: */
0306: public void testPutProgagatesFromAndToEveryCacheManagerAndCache()
0307: throws CacheException, InterruptedException {
0308:
0309: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0310: return;
0311: }
0312:
0313: //Put
0314: String[] cacheNames = manager1.getCacheNames();
0315: int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
0316: Arrays.sort(cacheNames);
0317: for (int i = 0; i < cacheNames.length; i++) {
0318: String name = cacheNames[i];
0319: manager1.getCache(name).put(
0320: new Element("" + i, new Integer(i)));
0321: //Add some non serializable elements that should not get propagated
0322: manager1.getCache(name).put(
0323: new Element("nonSerializable" + i, new Object()));
0324: }
0325:
0326: waitForProgagate();
0327:
0328: int count2 = 0;
0329: int count3 = 0;
0330: int count4 = 0;
0331: int count5 = 0;
0332: for (int i = 0; i < cacheNames.length; i++) {
0333: String name = cacheNames[i];
0334: Element element2 = manager2.getCache(name).get("" + i);
0335: if (element2 != null) {
0336: count2++;
0337: }
0338: Element nonSerializableElement2 = manager2.getCache(name)
0339: .get("nonSerializable" + i);
0340: if (nonSerializableElement2 != null) {
0341: count2++;
0342: }
0343: Element element3 = manager3.getCache(name).get("" + i);
0344: if (element3 != null) {
0345: count3++;
0346: }
0347: Element element4 = manager4.getCache(name).get("" + i);
0348: if (element4 != null) {
0349: count4++;
0350: }
0351: Element element5 = manager5.getCache(name).get("" + i);
0352: if (element5 != null) {
0353: count5++;
0354: }
0355: }
0356: assertEquals(numberOfCaches, count2);
0357: assertEquals(numberOfCaches, count3);
0358: assertEquals(numberOfCaches, count4);
0359: assertEquals(numberOfCaches, count5);
0360:
0361: }
0362:
0363: /**
0364: * Tests what happens when a CacheManager in the cluster comes and goes. In ehcache-1.2.4 this would cause the new RMI CachePeers in the CacheManager to
0365: * be permanently corrupt.
0366: */
0367: public void testPutProgagatesFromAndToEveryCacheManagerAndCacheDirty()
0368: throws CacheException, InterruptedException {
0369:
0370: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0371: return;
0372: }
0373:
0374: manager3.shutdown();
0375:
0376: Thread.sleep(11020);
0377:
0378: manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0379: + "distribution/ehcache-distributed3.xml");
0380: Thread.sleep(11020);
0381:
0382: //Put
0383: String[] cacheNames = manager1.getCacheNames();
0384: int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
0385: Arrays.sort(cacheNames);
0386: for (int i = 0; i < cacheNames.length; i++) {
0387: String name = cacheNames[i];
0388: manager1.getCache(name).put(
0389: new Element("" + i, new Integer(i)));
0390: //Add some non serializable elements that should not get propagated
0391: manager1.getCache(name).put(
0392: new Element("nonSerializable" + i, new Object()));
0393: }
0394:
0395: waitForProgagate();
0396:
0397: int count2 = 0;
0398: int count3 = 0;
0399: int count4 = 0;
0400: int count5 = 0;
0401: for (int i = 0; i < cacheNames.length; i++) {
0402: String name = cacheNames[i];
0403: Element element2 = manager2.getCache(name).get("" + i);
0404: if (element2 != null) {
0405: count2++;
0406: }
0407: Element nonSerializableElement2 = manager2.getCache(name)
0408: .get("nonSerializable" + i);
0409: if (nonSerializableElement2 != null) {
0410: count2++;
0411: }
0412: Element element3 = manager3.getCache(name).get("" + i);
0413: if (element3 != null) {
0414: count3++;
0415: }
0416: Element element4 = manager4.getCache(name).get("" + i);
0417: if (element4 != null) {
0418: count4++;
0419: }
0420: Element element5 = manager5.getCache(name).get("" + i);
0421: if (element5 != null) {
0422: count5++;
0423: }
0424: }
0425: assertEquals(numberOfCaches, count2);
0426: assertEquals(numberOfCaches, count3);
0427: assertEquals(numberOfCaches, count4);
0428: assertEquals(numberOfCaches, count5);
0429:
0430: }
0431:
0432: /**
0433: * Enables long stabilty runs using replication to be done.
0434: * <p/>
0435: * This test has been run in a profile for 15 hours without any observed issues.
0436: *
0437: * @throws InterruptedException
0438: */
0439: public void manualStabilityTest() throws InterruptedException {
0440: forceVMGrowth();
0441:
0442: ManagementService.registerMBeans(manager3, createMBeanServer(),
0443: true, true, true, true);
0444: while (true) {
0445: testBigPutsProgagatesAsynchronous();
0446: }
0447: }
0448:
0449: /**
0450: * Non JUnit invocation of stability test to get cleaner run
0451: *
0452: * @param args
0453: * @throws InterruptedException
0454: */
0455: public static void main(String[] args) throws Exception {
0456: RMICacheReplicatorTest replicatorTest = new RMICacheReplicatorTest();
0457: replicatorTest.setUp();
0458: replicatorTest.manualStabilityTest();
0459: }
0460:
0461: /**
0462: * The number of caches there should be.
0463: */
0464: protected int getNumberOfReplicatingCachesInCacheManager() {
0465: return 55;
0466: }
0467:
0468: /**
0469: * Performance and capacity tests.
0470: * <p/>
0471: * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
0472: * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
0473: * it has fully received.
0474: * <p/>
0475: * r37 and earlier - initial implementation
0476: * 38 seconds to get all notifications with 6 peers, 2000 Elements and 400 byte payload
0477: * 18 seconds to get all notifications with 2 peers, 2000 Elements and 400 byte payload
0478: * 40 seconds to get all notifications with 2 peers, 2000 Elements and 10k payload
0479: * 22 seconds to get all notifications with 2 peers, 2000 Elements and 1k payload
0480: * 26 seconds to get all notifications with 2 peers, 200 Elements and 100k payload
0481: * <p/>
0482: * r38 - RMI stub lookup on registration rather than at each lookup. Saves quite a few lookups. Also change to 5 second heartbeat
0483: * 38 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (1 second heartbeat)
0484: * 16 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
0485: * 13 seconds to get 2000 notifications with 2 peers, Elements with 400 byte payload
0486: * <p/>
0487: * r39 - Batching asyn replicator. Send all queued messages in one RMI call once per second.
0488: * 2 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
0489: */
0490: public void testBigPutsProgagatesAsynchronous()
0491: throws CacheException, InterruptedException {
0492:
0493: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0494: return;
0495: }
0496:
0497: //Give everything a chance to startup
0498: //Thread.sleep(10000);
0499: StopWatch stopWatch = new StopWatch();
0500: Integer index = null;
0501: for (int i = 0; i < 2; i++) {
0502: for (int j = 0; j < 1000; j++) {
0503: index = new Integer(((1000 * i) + j));
0504: cache1
0505: .put(new Element(
0506: index,
0507: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0508: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0509: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0510: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0511: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0512: }
0513:
0514: }
0515: long elapsed = stopWatch.getElapsedTime();
0516: long putTime = ((elapsed / 1000));
0517: LOG.info("Put Elapsed time: " + putTime);
0518: //assertTrue(putTime < 8);
0519:
0520: assertEquals(2000, cache1.getSize());
0521:
0522: Thread.sleep(2000);
0523: assertEquals(2000, manager2.getCache("sampleCache1").getSize());
0524: assertEquals(2000, manager3.getCache("sampleCache1").getSize());
0525: assertEquals(2000, manager4.getCache("sampleCache1").getSize());
0526: assertEquals(2000, manager5.getCache("sampleCache1").getSize());
0527:
0528: CountingCacheEventListener.resetCounters();
0529:
0530: }
0531:
0532: /**
0533: * Performance and capacity tests.
0534: * <p/>
0535: */
0536: public void testBootstrap() throws CacheException,
0537: InterruptedException, RemoteException {
0538:
0539: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0540: return;
0541: }
0542:
0543: //load up some data
0544: StopWatch stopWatch = new StopWatch();
0545: Integer index = null;
0546: for (int i = 0; i < 2; i++) {
0547: for (int j = 0; j < 1000; j++) {
0548: index = new Integer(((1000 * i) + j));
0549: cache1
0550: .put(new Element(
0551: index,
0552: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0553: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0554: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0555: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0556: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0557: }
0558:
0559: }
0560: long elapsed = stopWatch.getElapsedTime();
0561: long putTime = ((elapsed / 1000));
0562: LOG.info("Put Elapsed time: " + putTime);
0563:
0564: assertEquals(2000, cache1.getSize());
0565:
0566: Thread.sleep(7000);
0567: assertEquals(2000, manager2.getCache("sampleCache1").getSize());
0568: assertEquals(2000, manager3.getCache("sampleCache1").getSize());
0569: assertEquals(2000, manager4.getCache("sampleCache1").getSize());
0570: assertEquals(2000, manager5.getCache("sampleCache1").getSize());
0571:
0572: //now test bootstrap
0573: manager1.addCache("bootStrapResults");
0574: Cache cache = manager1.getCache("bootStrapResults");
0575: List cachePeers = manager1.getCacheManagerPeerProvider()
0576: .listRemoteCachePeers(cache1);
0577: CachePeer cachePeer = (CachePeer) cachePeers.get(0);
0578:
0579: List keys = cachePeer.getKeys();
0580: assertEquals(2000, keys.size());
0581:
0582: Element firstElement = cachePeer.getQuiet((Serializable) keys
0583: .get(0));
0584: long size = firstElement.getSerializedSize();
0585: assertEquals(574, size);
0586:
0587: int chunkSize = (int) (5000000 / size);
0588:
0589: List requestChunk = new ArrayList();
0590: for (int i = 0; i < keys.size(); i++) {
0591: Serializable serializable = (Serializable) keys.get(i);
0592: requestChunk.add(serializable);
0593: if (requestChunk.size() == chunkSize) {
0594: fetchAndPutElements(cache, requestChunk, cachePeer);
0595: requestChunk.clear();
0596: }
0597: }
0598: //get leftovers
0599: fetchAndPutElements(cache, requestChunk, cachePeer);
0600:
0601: assertEquals(keys.size(), cache.getSize());
0602:
0603: }
0604:
0605: private void fetchAndPutElements(Ehcache cache, List requestChunk,
0606: CachePeer cachePeer) throws RemoteException {
0607: List receivedChunk = cachePeer.getElements(requestChunk);
0608: for (int i = 0; i < receivedChunk.size(); i++) {
0609: Element element = (Element) receivedChunk.get(i);
0610: assertNotNull(element);
0611: cache.put(element, true);
0612: }
0613:
0614: }
0615:
0616: /**
0617: * Drive everything to point of breakage within a 64MB VM.
0618: */
0619: public void xTestHugePutsBreaksAsynchronous()
0620: throws CacheException, InterruptedException {
0621:
0622: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0623: return;
0624: }
0625:
0626: //Give everything a chance to startup
0627: StopWatch stopWatch = new StopWatch();
0628: Integer index = null;
0629: for (int i = 0; i < 500; i++) {
0630: for (int j = 0; j < 1000; j++) {
0631: index = new Integer(((1000 * i) + j));
0632: cache1
0633: .put(new Element(
0634: index,
0635: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0636: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0637: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0638: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0639: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0640: }
0641:
0642: }
0643: long elapsed = stopWatch.getElapsedTime();
0644: long putTime = ((elapsed / 1000));
0645: LOG.info("Put Elapsed time: " + putTime);
0646: //assertTrue(putTime < 8);
0647:
0648: assertEquals(100000, cache1.getSize());
0649:
0650: Thread.sleep(100000);
0651: assertEquals(20000, manager2.getCache("sampleCache1").getSize());
0652: assertEquals(20000, manager3.getCache("sampleCache1").getSize());
0653: assertEquals(20000, manager4.getCache("sampleCache1").getSize());
0654: assertEquals(20000, manager5.getCache("sampleCache1").getSize());
0655:
0656: }
0657:
0658: /**
0659: * Performance and capacity tests.
0660: * <p/>
0661: * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
0662: * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
0663: * it has fully received.
0664: * <p/>
0665: * 4 seconds to get all remove notifications with 6 peers, 5000 Elements and 400 byte payload
0666: */
0667: public void testBigRemovesProgagatesAsynchronous()
0668: throws CacheException, InterruptedException {
0669:
0670: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0671: return;
0672: }
0673:
0674: //Give everything a chance to startup
0675: Integer index = null;
0676: for (int i = 0; i < 5; i++) {
0677: for (int j = 0; j < 1000; j++) {
0678: index = new Integer(((1000 * i) + j));
0679: cache1
0680: .put(new Element(
0681: index,
0682: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0683: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0684: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0685: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0686: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0687: }
0688:
0689: }
0690: Thread.sleep(8000);
0691: assertEquals(5000, cache1.getSize());
0692: assertEquals(5000, manager2.getCache("sampleCache1").getSize());
0693: assertEquals(5000, manager3.getCache("sampleCache1").getSize());
0694: assertEquals(5000, manager4.getCache("sampleCache1").getSize());
0695: assertEquals(5000, manager5.getCache("sampleCache1").getSize());
0696:
0697: //Let the disk stores catch up before the next stage of the test
0698: Thread.sleep(2000);
0699:
0700: StopWatch stopWatch = new StopWatch();
0701:
0702: for (int i = 0; i < 5; i++) {
0703: for (int j = 0; j < 1000; j++) {
0704: index = new Integer(((1000 * i) + j));
0705: cache1.remove(index);
0706: }
0707: }
0708:
0709: int timeForPropagate = 10000;
0710:
0711: Thread.sleep(timeForPropagate);
0712: assertEquals(0, cache1.getSize());
0713: assertEquals(0, manager2.getCache("sampleCache1").getSize());
0714: assertEquals(0, manager3.getCache("sampleCache1").getSize());
0715: assertEquals(0, manager4.getCache("sampleCache1").getSize());
0716: assertEquals(0, manager5.getCache("sampleCache1").getSize());
0717:
0718: LOG.info("Remove Elapsed time: " + timeForPropagate);
0719:
0720: }
0721:
0722: /**
0723: * Performance and capacity tests.
0724: * <p/>
0725: * 5 seconds to send all notifications synchronously with 5 peers, 2000 Elements and 400 byte payload
0726: * The numbers given below are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
0727: * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
0728: * it has fully received.
0729: */
0730: public void testBigPutsProgagatesSynchronous()
0731: throws CacheException, InterruptedException {
0732:
0733: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0734: return;
0735: }
0736:
0737: //Give everything a chance to startup
0738: StopWatch stopWatch = new StopWatch();
0739: Integer index;
0740: for (int i = 0; i < 2; i++) {
0741: for (int j = 0; j < 1000; j++) {
0742: index = new Integer(((1000 * i) + j));
0743: manager1
0744: .getCache("sampleCache3")
0745: .put(
0746: new Element(
0747: index,
0748: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0749: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0750: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0751: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0752: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0753: }
0754:
0755: }
0756: long elapsed = stopWatch.getElapsedTime();
0757: long putTime = ((elapsed / 1000));
0758: LOG.info("Put and Propagate Synchronously Elapsed time: "
0759: + putTime + " seconds");
0760:
0761: assertEquals(2000, manager1.getCache("sampleCache3").getSize());
0762: assertEquals(2000, manager2.getCache("sampleCache3").getSize());
0763: assertEquals(2000, manager3.getCache("sampleCache3").getSize());
0764: assertEquals(2000, manager4.getCache("sampleCache3").getSize());
0765: assertEquals(2000, manager5.getCache("sampleCache3").getSize());
0766:
0767: }
0768:
0769: /**
0770: * manager1 adds a replicating cache, then manager2 and so on. Then we remove one. Does everything work as expected?
0771: */
0772: public void testPutWithNewCacheAddedProgressively()
0773: throws InterruptedException {
0774:
0775: manager1.addCache("progressiveAddCache");
0776: manager2.addCache("progressiveAddCache");
0777:
0778: //The cluster will not have formed yet, so it will fail
0779: try {
0780: putTest(manager1.getCache("progressiveAddCache"), manager2
0781: .getCache("progressiveAddCache"), ASYNCHRONOUS);
0782: fail();
0783: } catch (AssertionFailedError e) {
0784: //expected
0785: }
0786:
0787: //The cluster will now have formed yet, so it will succeed
0788: putTest(manager1.getCache("progressiveAddCache"), manager2
0789: .getCache("progressiveAddCache"), ASYNCHRONOUS);
0790:
0791: Cache secondCache = manager2.getCache("progressiveAddCache");
0792:
0793: //The second peer disappears. The test will fail.
0794: manager2.removeCache("progressiveAddCache");
0795: try {
0796: putTest(manager1.getCache("progressiveAddCache"),
0797: secondCache, ASYNCHRONOUS);
0798: fail();
0799: } catch (IllegalStateException e) {
0800: //The second cache will not alive. Expected. But no other exception is caught and this will otherwise fail.
0801:
0802: }
0803:
0804: }
0805:
0806: /**
0807: * Test various cache configurations for cache1 - explicit setting of:
0808: * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
0809: */
0810: public void testPutWithExplicitReplicationConfig()
0811: throws InterruptedException {
0812: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0813: return;
0814: }
0815: putTest(manager1.getCache("sampleCache1"), manager2
0816: .getCache("sampleCache1"), ASYNCHRONOUS);
0817: }
0818:
0819: /**
0820: * Test various cache configurations for cache1 - explicit setting of:
0821: * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
0822: */
0823: public void testPutWithThreadKiller() throws InterruptedException {
0824: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0825: return;
0826: }
0827: putTestWithThreadKiller(manager1.getCache("sampleCache1"),
0828: manager2.getCache("sampleCache1"), ASYNCHRONOUS);
0829: }
0830:
0831: /**
0832: * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
0833: * of a remote event by a CachePeer.
0834: */
0835: public void testRemotelyReceivedPutNotifiesCountingListener()
0836: throws InterruptedException {
0837: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0838: return;
0839: }
0840: putTest(manager1.getCache("sampleCache1"), manager2
0841: .getCache("sampleCache1"), ASYNCHRONOUS);
0842: assertEquals(1, CountingCacheEventListener.getCacheElementsPut(
0843: manager1.getCache("sampleCache1")).size());
0844: assertEquals(1, CountingCacheEventListener.getCacheElementsPut(
0845: manager2.getCache("sampleCache1")).size());
0846:
0847: }
0848:
0849: /**
0850: * Test various cache configurations for cache1 - explicit setting of:
0851: * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
0852: */
0853: public void testPutWithExplicitReplicationSynchronousConfig()
0854: throws InterruptedException {
0855: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0856: return;
0857: }
0858: putTest(manager1.getCache("sampleCache3"), manager2
0859: .getCache("sampleCache3"), SYNCHRONOUS);
0860: }
0861:
0862: /**
0863: * Test put replicated for cache4 - no properties.
0864: * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
0865: */
0866: public void testPutWithEmptyReplicationPropertiesConfig()
0867: throws InterruptedException {
0868: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0869: return;
0870: }
0871: putTest(manager1.getCache("sampleCache4"), manager2
0872: .getCache("sampleCache4"), ASYNCHRONOUS);
0873: }
0874:
0875: /**
0876: * Test put replicated for cache4 - missing replicatePuts property.
0877: * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
0878: * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
0879: */
0880: public void testPutWithOneMissingReplicationPropertyConfig()
0881: throws InterruptedException {
0882: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0883: return;
0884: }
0885: putTest(manager1.getCache("sampleCache5"), manager2
0886: .getCache("sampleCache5"), ASYNCHRONOUS);
0887: }
0888:
0889: /**
0890: * Tests put and remove initiated from cache1 in a cluster
0891: * <p/>
0892: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0893: */
0894: public void putTest(Ehcache fromCache, Ehcache toCache,
0895: boolean asynchronous) throws CacheException,
0896: InterruptedException {
0897:
0898: Serializable key = new Date();
0899: Serializable value = new Date();
0900: Element sourceElement = new Element(key, value);
0901:
0902: //Put
0903: fromCache.put(sourceElement);
0904: int i = 0;
0905:
0906: if (asynchronous) {
0907: waitForProgagate();
0908: }
0909:
0910: //Should have been replicated to toCache.
0911: Element deliveredElement = toCache.get(key);
0912: assertEquals(sourceElement, deliveredElement);
0913:
0914: }
0915:
0916: /**
0917: * Tests put and remove initiated from cache1 in a cluster
0918: * <p/>
0919: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0920: */
0921: public void putTestWithThreadKiller(Ehcache fromCache,
0922: Ehcache toCache, boolean asynchronous)
0923: throws CacheException, InterruptedException {
0924:
0925: fromCache.put(new Element("thread killer", new ThreadKiller()));
0926: if (asynchronous) {
0927: waitForProgagate();
0928: }
0929:
0930: Serializable key = new Date();
0931: Serializable value = new Date();
0932: Element sourceElement = new Element(key, value);
0933:
0934: //Put
0935: fromCache.put(sourceElement);
0936:
0937: if (asynchronous) {
0938: waitForProgagate();
0939: }
0940:
0941: //Should have been replicated to toCache.
0942: Element deliveredElement = toCache.get(key);
0943: assertEquals(sourceElement, deliveredElement);
0944:
0945: }
0946:
0947: /**
0948: * Checks that a put received from a remote cache notifies any registered listeners.
0949: * <p/>
0950: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0951: */
0952: public void testRemotePutNotificationGetsToOtherListeners()
0953: throws CacheException, InterruptedException {
0954:
0955: if (JVMUtil.isSingleRMIRegistryPerVM()) {
0956: return;
0957: }
0958:
0959: Serializable key = new Date();
0960: Serializable value = new Date();
0961: Element element1 = new Element(key, value);
0962:
0963: //Put
0964: cache1.put(new Element("1", new Date()));
0965: cache1.put(new Element("2", new Date()));
0966: cache1.put(new Element("3", new Date()));
0967:
0968: //Nonserializable and non deliverable put
0969: Object nonSerializableObject = new Object();
0970: cache1.put(new Element(nonSerializableObject, new Object()));
0971:
0972: waitForProgagate();
0973:
0974: //local initiating cache's counting listener should have been notified
0975: assertEquals(4, CountingCacheEventListener.getCacheElementsPut(
0976: cache1).size());
0977: //remote receiving caches' counting listener should have been notified
0978: assertEquals(3, CountingCacheEventListener.getCacheElementsPut(
0979: cache2).size());
0980:
0981: //Update
0982: cache1.put(new Element("1", new Date()));
0983: cache1.put(new Element("2", new Date()));
0984: cache1.put(new Element("3", new Date()));
0985:
0986: //Nonserializable and non deliverable put
0987: cache1.put(new Element(nonSerializableObject, new Object()));
0988:
0989: waitForProgagate();
0990:
0991: //local initiating cache's counting listener should have been notified
0992: assertEquals(4, CountingCacheEventListener
0993: .getCacheElementsUpdated(cache1).size());
0994: //remote receiving caches' counting listener should have been notified
0995: assertEquals(3, CountingCacheEventListener
0996: .getCacheElementsUpdated(cache2).size());
0997:
0998: //Remove
0999: cache1.remove("1");
1000: cache1.remove("2");
1001: cache1.remove("3");
1002: cache1.remove(nonSerializableObject);
1003:
1004: waitForProgagate();
1005:
1006: //local initiating cache's counting listener should have been notified
1007: assertEquals(4, CountingCacheEventListener
1008: .getCacheElementsRemoved(cache1).size());
1009: //remote receiving caches' counting listener should have been notified
1010: assertEquals(3, CountingCacheEventListener
1011: .getCacheElementsRemoved(cache2).size());
1012:
1013: }
1014:
1015: /**
1016: * Test various cache configurations for cache1 - explicit setting of:
1017: * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1018: */
1019: public void testRemoveWithExplicitReplicationConfig()
1020: throws InterruptedException {
1021: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1022: return;
1023: }
1024: removeTest(manager1.getCache("sampleCache1"), manager2
1025: .getCache("sampleCache1"), ASYNCHRONOUS);
1026: }
1027:
1028: /**
1029: * Test various cache configurations for cache1 - explicit setting of:
1030: * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1031: */
1032: public void testRemoveWithExplicitReplicationSynchronousConfig()
1033: throws InterruptedException {
1034: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1035: return;
1036: }
1037: removeTest(manager1.getCache("sampleCache3"), manager2
1038: .getCache("sampleCache3"), SYNCHRONOUS);
1039: }
1040:
1041: /**
1042: * Test put replicated for cache4 - no properties.
1043: * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
1044: */
1045: public void testRemoveWithEmptyReplicationPropertiesConfig()
1046: throws InterruptedException {
1047: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1048: return;
1049: }
1050: removeTest(manager1.getCache("sampleCache4"), manager2
1051: .getCache("sampleCache4"), ASYNCHRONOUS);
1052: }
1053:
1054: /**
1055: * Tests put and remove initiated from a cache to another cache in a cluster
1056: * <p/>
1057: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1058: */
1059: public void removeTest(Ehcache fromCache, Ehcache toCache,
1060: boolean asynchronous) throws CacheException,
1061: InterruptedException {
1062:
1063: Serializable key = new Date();
1064: Serializable value = new Date();
1065: Element element1 = new Element(key, value);
1066:
1067: //Put
1068: fromCache.put(element1);
1069:
1070: if (asynchronous) {
1071: waitForProgagate();
1072: }
1073:
1074: //Should have been replicated to cache2.
1075: Element element2 = toCache.get(key);
1076: assertEquals(element1, element2);
1077:
1078: //Remove
1079: fromCache.remove(key);
1080: if (asynchronous) {
1081: waitForProgagate();
1082: }
1083:
1084: //Should have been replicated to cache2.
1085: element2 = toCache.get(key);
1086: assertNull(element2);
1087:
1088: }
1089:
1090: /**
1091: * test removeAll sync
1092: */
1093: public void testRemoveAllAsynchronous() throws Exception {
1094: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1095: return;
1096: }
1097: removeAllTest(manager1.getCache("sampleCache1"), manager2
1098: .getCache("sampleCache1"), ASYNCHRONOUS);
1099: }
1100:
1101: /**
1102: * test removeAll async
1103: */
1104: public void testRemoveAllSynchronous() throws Exception {
1105: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1106: return;
1107: }
1108: removeAllTest(manager1.getCache("sampleCache3"), manager2
1109: .getCache("sampleCache3"), SYNCHRONOUS);
1110: }
1111:
1112: /**
1113: * Tests removeAll initiated from a cache to another cache in a cluster
1114: * <p/>
1115: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1116: */
1117: public void removeAllTest(Ehcache fromCache, Ehcache toCache,
1118: boolean asynchronous) throws Exception {
1119:
1120: //removeAll is distributed. Stop it colliding with the rest of the test
1121: waitForProgagate();
1122:
1123: Serializable key = new Date();
1124: Serializable value = new Date();
1125: Element element1 = new Element(key, value);
1126:
1127: //Put
1128: fromCache.put(element1);
1129:
1130: if (asynchronous) {
1131: waitForProgagate();
1132: }
1133:
1134: //Should have been replicated to cache2.
1135: Element element2 = toCache.get(key);
1136: assertEquals(element1, element2);
1137:
1138: //Remove
1139: fromCache.removeAll();
1140: if (asynchronous) {
1141: waitForProgagate();
1142: }
1143:
1144: //Should have been replicated to cache2.
1145: element2 = toCache.get(key);
1146: assertNull(element2);
1147: assertEquals(0, toCache.getSize());
1148:
1149: }
1150:
1151: /**
1152: * Test various cache configurations for cache1 - explicit setting of:
1153: * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1154: */
1155: public void testUpdateWithExplicitReplicationConfig()
1156: throws Exception {
1157: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1158: return;
1159: }
1160: updateViaCopyTest(manager1.getCache("sampleCache1"), manager2
1161: .getCache("sampleCache1"), ASYNCHRONOUS);
1162: }
1163:
1164: /**
1165: * Test various cache configurations for cache1 - explicit setting of:
1166: * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1167: */
1168: public void testUpdateWithExplicitReplicationSynchronousConfig()
1169: throws Exception {
1170: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1171: return;
1172: }
1173: updateViaCopyTest(manager1.getCache("sampleCache3"), manager2
1174: .getCache("sampleCache3"), SYNCHRONOUS);
1175: }
1176:
1177: /**
1178: * Test put replicated for cache4 - no properties.
1179: * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
1180: */
1181: public void testUpdateWithEmptyReplicationPropertiesConfig()
1182: throws Exception {
1183: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1184: return;
1185: }
1186: updateViaCopyTest(manager1.getCache("sampleCache4"), manager2
1187: .getCache("sampleCache4"), ASYNCHRONOUS);
1188: }
1189:
1190: /**
1191: * Tests put and update through copy initiated from cache1 in a cluster
1192: * <p/>
1193: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1194: */
1195: public void updateViaCopyTest(Ehcache fromCache, Ehcache toCache,
1196: boolean asynchronous) throws Exception {
1197:
1198: fromCache.removeAll();
1199: toCache.removeAll();
1200:
1201: //removeAll is distributed. Stop it colliding with the rest of the test
1202: waitForProgagate();
1203:
1204: Serializable key = new Date();
1205: Serializable value = new Date();
1206: Element element1 = new Element(key, value);
1207:
1208: //Put
1209: fromCache.put(element1);
1210: if (asynchronous) {
1211: waitForProgagate();
1212: }
1213:
1214: //Should have been replicated to cache2.
1215: Element element2 = toCache.get(key);
1216: assertEquals(element1, element2);
1217:
1218: //Update
1219: Element updatedElement1 = new Element(key, new Date());
1220:
1221: fromCache.put(updatedElement1);
1222: if (asynchronous) {
1223: waitForProgagate();
1224: }
1225:
1226: //Should have been replicated to cache2.
1227: Element receivedUpdatedElement2 = toCache.get(key);
1228: assertEquals(updatedElement1, receivedUpdatedElement2);
1229:
1230: }
1231:
1232: /**
1233: * Tests put and update through invalidation initiated from cache1 in a cluster
1234: * <p/>
1235: * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1236: */
1237: public void testUpdateViaInvalidate() throws CacheException,
1238: InterruptedException, IOException {
1239:
1240: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1241: return;
1242: }
1243:
1244: cache1 = manager1.getCache("sampleCache2");
1245: cache1.removeAll();
1246:
1247: cache2 = manager2.getCache("sampleCache2");
1248: cache2.removeAll();
1249:
1250: //removeAll is distributed. Stop it colliding with the rest of the test
1251: waitForProgagate();
1252:
1253: String key = "1";
1254: Serializable value = new Date();
1255: Element element1 = new Element(key, value);
1256:
1257: //Put
1258: cache1.put(element1);
1259: waitForProgagate();
1260:
1261: //Should have been replicated to cache2.
1262: Element element2 = cache2.get(key);
1263: assertEquals(element1, element2);
1264:
1265: //Update
1266: cache1.put(element1);
1267: waitForProgagate();
1268: waitForProgagate();
1269: waitForProgagate();
1270:
1271: //Should have been removed in cache2.
1272: element2 = cache2.get(key);
1273: assertNull(element2);
1274:
1275: }
1276:
1277: /**
1278: * What happens when two cache instances replicate to each other and a change is initiated
1279: */
1280: public void testInfiniteNotificationsLoop()
1281: throws InterruptedException {
1282:
1283: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1284: return;
1285: }
1286:
1287: Serializable key = "1";
1288: Serializable value = new Date();
1289: Element element = new Element(key, value);
1290:
1291: //Put
1292: cache1.put(element);
1293: waitForProgagate();
1294:
1295: //Should have been replicated to cache2.
1296: Element element2 = cache2.get(key);
1297: assertEquals(element, element2);
1298:
1299: //Remove
1300: cache1.remove(key);
1301: assertNull(cache1.get(key));
1302:
1303: //Should have been replicated to cache2.
1304: waitForProgagate();
1305: element2 = cache2.get(key);
1306: assertNull(element2);
1307:
1308: //Put into 2
1309: Element element3 = new Element("3", "ddsfds");
1310: cache2.put(element3);
1311: waitForProgagate();
1312: Element element4 = cache2.get("3");
1313: assertEquals(element3, element4);
1314:
1315: }
1316:
1317: /**
1318: * Shows result of perf problem and fix in flushReplicationQueue
1319: * <p/>
1320: * Behaviour before change:
1321: * <p/>
1322: * INFO: Items written: 10381
1323: * Oct 29, 2007 11:40:04 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1324: * INFO: Items written: 29712
1325: * Oct 29, 2007 11:40:57 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1326: * INFO: Items written: 1
1327: * Oct 29, 2007 11:40:58 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1328: * INFO: Items written: 32354
1329: * Oct 29, 2007 11:42:34 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1330: * INFO: Items written: 322
1331: * Oct 29, 2007 11:42:35 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1332: * INFO: Items written: 41909
1333: * <p/>
1334: * Behaviour after change:
1335: * INFO: Items written: 26356
1336: * Oct 29, 2007 11:44:39 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1337: * INFO: Items written: 33656
1338: * Oct 29, 2007 11:44:40 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1339: * INFO: Items written: 32234
1340: * Oct 29, 2007 11:44:42 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1341: * INFO: Items written: 38677
1342: * Oct 29, 2007 11:44:43 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1343: * INFO: Items written: 43418
1344: * Oct 29, 2007 11:44:44 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1345: * INFO: Items written: 31277
1346: * Oct 29, 2007 11:44:45 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1347: * INFO: Items written: 27769
1348: * Oct 29, 2007 11:44:46 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1349: * INFO: Items written: 29596
1350: * Oct 29, 2007 11:44:47 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1351: * INFO: Items written: 17142
1352: * Oct 29, 2007 11:44:48 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1353: * INFO: Items written: 14775
1354: * Oct 29, 2007 11:44:49 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1355: * INFO: Items written: 4088
1356: * Oct 29, 2007 11:44:51 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1357: * INFO: Items written: 5492
1358: * Oct 29, 2007 11:44:52 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1359: * INFO: Items written: 10188
1360: *
1361: * Also no pauses noted.
1362: */
1363: public void testReplicatePerf() throws InterruptedException {
1364:
1365: if (manager2 != null) {
1366: manager2.shutdown();
1367: }
1368: if (manager3 != null) {
1369: manager3.shutdown();
1370: }
1371: if (manager4 != null) {
1372: manager4.shutdown();
1373: }
1374: if (manager5 != null) {
1375: manager5.shutdown();
1376: }
1377: if (manager6 != null) {
1378: manager6.shutdown();
1379: }
1380:
1381: //wait for cluster to drop back to just one: manager1
1382: waitForProgagate();
1383: waitForProgagate();
1384:
1385: long start = System.currentTimeMillis();
1386: final String keyBase = Long.toString(start);
1387: int count = 0;
1388:
1389: for (int i = 0; i < 100000; i++) {
1390: final String key = keyBase + ':'
1391: + Integer.toString((int) (Math.random() * 1000.0));
1392: cache1.put(new Element(key, "My Test"));
1393: cache1.get(key);
1394: cache1.remove(key);
1395: count++;
1396:
1397: final long end = System.currentTimeMillis();
1398: if (end - start >= 1000) {
1399: start = end;
1400: LOG.info("Items written: " + count);
1401: //make sure it does not choke
1402: assertTrue(count > 1000);
1403: count = 0;
1404: }
1405: }
1406: }
1407:
1408: /**
1409: * Need to wait for async
1410: *
1411: * @throws InterruptedException
1412: */
1413: protected void waitForProgagate() throws InterruptedException {
1414: Thread.sleep(2000);
1415: }
1416:
1417: /**
1418: * Need to wait for async
1419: *
1420: * @throws InterruptedException
1421: */
1422: protected void waitForSlowProgagate() throws InterruptedException {
1423: Thread.sleep(6000);
1424: }
1425:
1426: /**
1427: * Distributed operations create extra scope for deadlock.
1428: * This test checks whether a distributed deadlock scenario exists for synchronous replication
1429: * of each distributed operation all at once.
1430: * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1431: * and multi process safe.
1432: * <p/>
1433: * Carefully tailored to exercise:
1434: * <ol>
1435: * <li>overflow to disk. We put in 20 things and the memory size is 10
1436: * <li>each peer is working on the same set of keys thus maximising contention
1437: * <li>we do puts, gets and removes to explore all the execution paths
1438: * </ol>
1439: * If a deadlock occurs, processing will stop until a SocketTimeout exception is thrown and
1440: * the deadlock will be released.
1441: */
1442: public void testCacheOperationsSynchronousMultiThreaded()
1443: throws Exception, InterruptedException {
1444:
1445: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1446: return;
1447: }
1448:
1449: // Run a set of threads, that attempt to fetch the elements
1450: final List executables = new ArrayList();
1451:
1452: executables
1453: .add(new ClusterExecutable(manager1, "sampleCache3"));
1454: executables
1455: .add(new ClusterExecutable(manager2, "sampleCache3"));
1456: executables
1457: .add(new ClusterExecutable(manager3, "sampleCache3"));
1458:
1459: runThreads(executables);
1460: }
1461:
1462: /**
1463: * Distributed operations create extra scope for deadlock.
1464: * This test checks whether a distributed deadlock scenario exists for asynchronous replication
1465: * of each distributed operation all at once.
1466: * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1467: * and multi process safe.
1468: * It uses sampleCache2, which is configured to be asynchronous
1469: * <p/>
1470: * Carefully tailored to exercise:
1471: * <ol>
1472: * <li>overflow to disk. We put in 20 things and the memory size is 10
1473: * <li>each peer is working on the same set of keys thus maximising contention
1474: * <li>we do puts, gets and removes to explore all the execution paths
1475: * </ol>
1476: */
1477: public void testCacheOperationsAynchronousMultiThreaded()
1478: throws Exception, InterruptedException {
1479:
1480: if (JVMUtil.isSingleRMIRegistryPerVM()) {
1481: return;
1482: }
1483:
1484: // Run a set of threads, that attempt to fetch the elements
1485: final List executables = new ArrayList();
1486:
1487: executables
1488: .add(new ClusterExecutable(manager1, "sampleCache2"));
1489: executables
1490: .add(new ClusterExecutable(manager2, "sampleCache2"));
1491: executables
1492: .add(new ClusterExecutable(manager3, "sampleCache2"));
1493:
1494: runThreads(executables);
1495: }
1496:
1497: /**
1498: * An Exececutable which allows the CacheManager to be set
1499: */
1500: class ClusterExecutable implements Executable {
1501:
1502: private CacheManager manager;
1503: private String cacheName;
1504:
1505: /**
1506: * Construct with CacheManager
1507: *
1508: * @param manager
1509: */
1510: public ClusterExecutable(CacheManager manager, String cacheName) {
1511: this .manager = manager;
1512: this .cacheName = cacheName;
1513: }
1514:
1515: /**
1516: * Execute
1517: *
1518: * @throws Exception
1519: */
1520: public void execute() throws Exception {
1521: Random random = new Random();
1522:
1523: for (int i = 0; i < 20; i++) {
1524: Integer key = new Integer((i));
1525: int operationSelector = random.nextInt(4);
1526: Cache cache = manager.getCache(cacheName);
1527: if (operationSelector == 100) {
1528: cache.get(key);
1529: if (LOG.isDebugEnabled()) {
1530: LOG.debug(cache.getGuid() + ": get " + key);
1531: }
1532: } else if (operationSelector == 100) {
1533: cache.remove(key);
1534: if (LOG.isDebugEnabled()) {
1535: LOG.debug(cache.getGuid() + ": remove " + key);
1536: }
1537: } else if (operationSelector == 2) {
1538: cache
1539: .put(new Element(
1540: key,
1541: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1542: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1543: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1544: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1545: + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
1546: if (LOG.isDebugEnabled()) {
1547: LOG.debug(cache.getGuid() + ": put " + key);
1548: }
1549: } else {
1550: //every twelfth time 1/4 * 1/3 = 1/12
1551: if (random.nextInt(3) == 1) {
1552: LOG.debug("cache.removeAll()");
1553: cache.removeAll();
1554: }
1555: }
1556: }
1557:
1558: }
1559: }
1560:
1561: }
|