001: /*
002: * Copyright (c) 2002-2003 by OpenSymphony
003: * All rights reserved.
004: */
005: package com.opensymphony.oscache.base;
006:
007: import com.opensymphony.oscache.general.GeneralCacheAdministrator;
008:
009: import junit.framework.Test;
010: import junit.framework.TestCase;
011: import junit.framework.TestSuite;
012:
013: import org.apache.commons.logging.Log;
014: import org.apache.commons.logging.LogFactory;
015:
016: import java.util.BitSet;
017: import java.util.Properties;
018:
019: /**
020: * Test the Cache class for any concurrency problems
021: *
022: * $Id: TestConcurrency.java 404 2007-02-24 10:21:00Z larst $
023: * @version $Revision: 404 $
024: * @author <a href="mailto:chris@chris.com">Chris Miller</a>
025: */
026: public class TestConcurrency extends TestCase {
027: private static transient final Log log = LogFactory
028: .getLog(GeneralCacheAdministrator.class); //TestConcurrency.class
029:
030: // Static variables required thru all the tests
031: private static GeneralCacheAdministrator admin = null;
032:
033: // Constants needed in the tests
034: private final String KEY = "key";
035: private final String VALUE = "This is some content";
036: private final int ITERATION_COUNT = 5; //500;
037: private final int THREAD_COUNT = 6; //600;
038: private final int UNIQUE_KEYS = 1013;
039:
/**
 * Creates the test case.
 *
 * @param str the test name, passed straight through to the JUnit
 *            {@code TestCase} constructor
 */
public TestConcurrency(String str) {
    super(str);
}
048:
049: /**
050: * This method is invoked before each testXXXX methods of the
051: * class. It set ups the variables required for each tests.
052: */
053: public void setUp() {
054: // At first invocation, create a new Cache
055: if (admin == null) {
056: Properties config = new Properties();
057: config
058: .setProperty(
059: AbstractCacheAdministrator.CACHE_CAPACITY_KEY,
060: "70");
061: config.setProperty(
062: AbstractCacheAdministrator.CACHE_BLOCKING_KEY,
063: "false");
064: admin = new GeneralCacheAdministrator();
065: assertNotNull(admin);
066: }
067: }
068:
069: /**
070: * This methods returns the name of this test class to JUnit
071: * <p>
072: * @return The name of this class
073: */
074: public static Test suite() {
075: return new TestSuite(TestConcurrency.class);
076: }
077:
078: /**
079: * Check that the cache handles simultaneous attempts to access a
080: * new cache entry correctly
081: */
082: public void testNewEntry() {
083: String key = "new";
084:
085: try {
086: admin.getFromCache(key, -1);
087: fail("NeedsRefreshException should have been thrown");
088: } catch (NeedsRefreshException nre) {
089: // Fire off another couple of threads to get the same cache entry
090: GetEntry getEntry = new GetEntry(key, VALUE, -1, false);
091: Thread thread = new Thread(getEntry);
092: thread.start();
093: getEntry = new GetEntry(key, VALUE, -1, false);
094: thread = new Thread(getEntry);
095: thread.start();
096:
097: // OK, those threads should now be blocked waiting for the new cache
098: // entry to appear. Sleep for a bit to simulate the time taken to
099: // build the cache entry
100: try {
101: Thread.sleep(500);
102: } catch (InterruptedException ie) {
103: }
104:
105: // Putting the entry in the cache should unblock the previous threads
106: admin.putInCache(key, VALUE);
107: }
108: }
109:
110: /**
111: * Check that the cache handles simultaneous attempts to access a
112: * new cache entry correctly
113: */
114: public void testNewEntryCancel() {
115: String key = "newCancel";
116: String NEW_VALUE = VALUE + "...";
117:
118: try {
119: admin.getFromCache(key, -1);
120: fail("NeedsRefreshException should have been thrown");
121: } catch (NeedsRefreshException nre) {
122: // Fire off another thread to get the same cache entry
123: GetEntry getEntry = new GetEntry(key, NEW_VALUE, -1, true);
124: Thread thread = new Thread(getEntry);
125: thread.start();
126:
127: // The above thread will be blocked waiting for the new content
128: try {
129: Thread.sleep(500);
130: } catch (InterruptedException ie) {
131: }
132:
133: // Now cancel the update (eg because an exception occurred while building the content).
134: // This will unblock the other thread and it will receive a NeedsRefreshException.
135: admin.cancelUpdate(key);
136:
137: // Wait a bit for the other thread to update the cache
138: try {
139: Thread.sleep(500);
140: } catch (InterruptedException ie) {
141: }
142:
143: try {
144: Object newValue = admin.getFromCache(key, -1);
145: assertEquals(NEW_VALUE, newValue);
146: } catch (NeedsRefreshException e) {
147: admin.cancelUpdate(key);
148: fail("A NeedsRefreshException should not have been thrown");
149: }
150: }
151: }
152:
153: /**
154: * Verify that we can concurrently access the cache without problems
155: */
156: public void testPut() {
157: Thread[] thread = new Thread[THREAD_COUNT];
158:
159: for (int idx = 0; idx < THREAD_COUNT; idx++) {
160: OSGeneralTest runner = new OSGeneralTest();
161: thread[idx] = new Thread(runner);
162: thread[idx].start();
163: }
164:
165: boolean stillAlive;
166:
167: do {
168: try {
169: Thread.sleep(100);
170: } catch (InterruptedException e) {
171: // do nothing
172: }
173:
174: stillAlive = false;
175:
176: int i = 0;
177:
178: while ((i < thread.length) && !stillAlive) {
179: stillAlive |= thread[i++].isAlive();
180: }
181: } while (stillAlive);
182: }
183:
184: /**
185: * Check that the cache handles simultaneous attempts to access a
186: * stale cache entry correctly
187: */
188: public void testStaleEntry() {
189: String key = "stale";
190: assertFalse(
191: "The cache should not be in blocking mode for this test.",
192: admin.isBlocking());
193:
194: admin.putInCache(key, VALUE);
195:
196: try {
197: // This should throw a NeedsRefreshException since the refresh
198: // period is 0
199: admin.getFromCache(key, 0);
200: fail("NeedsRefreshException should have been thrown");
201: } catch (NeedsRefreshException nre) {
202: // Fire off another thread to get the same cache entry.
203: // Since blocking mode is currently disabled we should
204: // immediately get back the stale entry
205: GetEntry getEntry = new GetEntry(key, VALUE, 0, false);
206: Thread thread = new Thread(getEntry);
207: thread.start();
208:
209: // Sleep for a bit to simulate the time taken to build the cache entry
210: try {
211: Thread.sleep(200);
212: } catch (InterruptedException ie) {
213: }
214:
215: // Putting the entry in the cache should mean that threads now retrieve
216: // the updated entry
217: String newValue = "New value";
218: admin.putInCache(key, newValue);
219:
220: getEntry = new GetEntry(key, newValue, -1, false);
221: thread = new Thread(getEntry);
222: thread.start();
223:
224: try {
225: Object fromCache = admin.getFromCache(key, -1);
226: assertEquals(newValue, fromCache);
227: } catch (NeedsRefreshException e) {
228: admin.cancelUpdate(key);
229: fail("Should not have received a NeedsRefreshException");
230: }
231:
232: // Give the GetEntry thread a chance to finish
233: try {
234: Thread.sleep(200);
235: } catch (InterruptedException ie) {
236: }
237: }
238: }
239:
240: /**
241: * A test for the updating of a stale entry when CACHE.BLOCKING = TRUE
242: */
243: public void testStaleEntryBlocking() {
244: // A test for the case where oscache.blocking = true
245: admin.destroy();
246:
247: Properties p = new Properties();
248: p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY,
249: "true");
250: admin = new GeneralCacheAdministrator(p);
251:
252: assertTrue(
253: "The cache should be in blocking mode for this test.",
254: admin.isBlocking());
255:
256: // Use a unique key in case these test entries are being persisted
257: String key = "blocking";
258: String NEW_VALUE = VALUE + " abc";
259: admin.putInCache(key, VALUE);
260:
261: try {
262: // Force a NeedsRefreshException
263: admin.getFromCache(key, 0);
264: fail("NeedsRefreshException should have been thrown");
265: } catch (NeedsRefreshException nre) {
266: // Fire off another thread to get the same cache entry.
267: // Since blocking mode is enabled this thread should block
268: // until the entry has been updated.
269: GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
270: Thread thread = new Thread(getEntry);
271: thread.start();
272:
273: // Sleep for a bit to simulate the time taken to build the cache entry
274: try {
275: Thread.sleep(200);
276: } catch (InterruptedException ie) {
277: }
278:
279: // Putting the entry in the cache should mean that threads now retrieve
280: // the updated entry
281: admin.putInCache(key, NEW_VALUE);
282:
283: getEntry = new GetEntry(key, NEW_VALUE, -1, false);
284: thread = new Thread(getEntry);
285: thread.start();
286:
287: try {
288: Object fromCache = admin.getFromCache(key, -1);
289: assertEquals(NEW_VALUE, fromCache);
290: } catch (NeedsRefreshException e) {
291: admin.cancelUpdate(key);
292: fail("Should not have received a NeedsRefreshException");
293: }
294: }
295: }
296:
297: /**
298: * Checks whether the cache handles simultaneous attempts to access a
299: * stable cache entry correctly when the blocking mode is enabled.
300: *
301: * Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times.
302: * The test is sucessfull if after some time, all threads are properly released
303: */
304: public void testConcurrentStaleGets() {
305: GeneralCacheAdministrator staticAdmin = admin;
306: admin = new GeneralCacheAdministrator(); //avoid poluting other test cases
307:
308: try {
309: // A test for the case where oscache.blocking = true
310: //admin.destroy();
311: Properties p = new Properties();
312: p.setProperty(
313: AbstractCacheAdministrator.CACHE_BLOCKING_KEY,
314: "true");
315: admin = new GeneralCacheAdministrator(p);
316:
317: assertTrue(
318: "The cache should be in blocking mode for this test.",
319: admin.isBlocking());
320:
321: int nbThreads = 50;
322: int retryByThreads = 10000;
323:
324: String key = "new";
325:
326: //First put a value
327: admin.putInCache(key, VALUE);
328:
329: try {
330: //Then test without concurrency that it is reported as stale when time-to-live is zero
331: admin.getFromCache(key, 0);
332: fail("NeedsRefreshException should have been thrown");
333: } catch (NeedsRefreshException nre) {
334: //Ok this is was is excpected, we can release the update
335: admin.cancelUpdate(key);
336: }
337:
338: //Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update
339: Thread[] spawnedThreads = new Thread[nbThreads];
340: BitSet successfullThreadTerminations = new BitSet(nbThreads); //Track which thread successfully terminated
341:
342: for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
343: GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(
344: key, 0, retryByThreads, threadIndex,
345: successfullThreadTerminations);
346: Thread thread = new Thread(getEntry);
347: spawnedThreads[threadIndex] = thread;
348: thread.start();
349: }
350:
351: // OK, those threads should now repeatidely be blocked waiting for the new cache
352: // entry to appear. Wait for all of them to terminate
353: long maxWaitingSeconds = 100;
354: int maxWaitForEachThread = 5;
355: long waitStartTime = System.currentTimeMillis();
356:
357: boolean atLeastOneThreadRunning = false;
358:
359: while ((System.currentTimeMillis() - waitStartTime) < (maxWaitingSeconds * 1000)) {
360: atLeastOneThreadRunning = false;
361:
362: //Wait a bit between each step to avoid consumming all CPU and preventing other threads from running.
363: try {
364: Thread.sleep(500);
365: } catch (InterruptedException ie) {
366: }
367:
368: //check whether all threads are done.
369: for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
370: Thread inspectedThread = spawnedThreads[threadIndex];
371:
372: try {
373: inspectedThread
374: .join(maxWaitForEachThread * 1000);
375: } catch (InterruptedException e) {
376: fail("Thread #" + threadIndex
377: + " was interrupted");
378: }
379:
380: if (inspectedThread.isAlive()) {
381: atLeastOneThreadRunning = true;
382: log
383: .error("Thread #"
384: + threadIndex
385: + " did not complete within ["
386: + ((System.currentTimeMillis() - waitStartTime) / 1000)
387: + "] s ");
388: }
389: }
390:
391: if (!atLeastOneThreadRunning) {
392: break; //while loop, test success.
393: }
394: }
395:
396: assertTrue(
397: "at least one thread did not complete within ["
398: + ((System.currentTimeMillis() - waitStartTime) / 1000)
399: + "] s ", !atLeastOneThreadRunning);
400:
401: for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
402: assertTrue("thread [" + threadIndex
403: + "] did not successfully complete. ",
404: successfullThreadTerminations.get(threadIndex));
405: }
406: } finally {
407: admin = staticAdmin;
408:
409: //Avoid po
410: }
411: }
412:
413: private class GetEntry implements Runnable {
414: String key;
415: String value;
416: boolean expectNRE;
417: int time;
418:
419: GetEntry(String key, String value, int time, boolean expectNRE) {
420: this .key = key;
421: this .value = value;
422: this .time = time;
423: this .expectNRE = expectNRE;
424: }
425:
426: public void run() {
427: try {
428: // Get from the cache
429: Object fromCache = admin.getFromCache(key, time);
430: assertEquals(value, fromCache);
431: } catch (NeedsRefreshException nre) {
432: if (!expectNRE) {
433: admin.cancelUpdate(key);
434: fail("Thread should have blocked until a new cache entry was ready");
435: } else {
436: // Put a new piece of content into the cache
437: admin.putInCache(key, value);
438: }
439: }
440: }
441: }
442:
443: /**
444: * Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update.
445: */
446: private class GetStaleEntryAndCancelUpdate implements Runnable {
447: String key;
448: int retries;
449: int time;
450: private final BitSet successfullThreadTerminations;
451: private final int threadIndex;
452:
453: GetStaleEntryAndCancelUpdate(String key, int time, int retries,
454: int threadIndex, BitSet successfullThreadTerminations) {
455: this .key = key;
456: this .time = time;
457: this .retries = retries;
458: this .threadIndex = threadIndex;
459: this .successfullThreadTerminations = successfullThreadTerminations;
460: }
461:
462: public void run() {
463: for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
464: try {
465: // Get from the cache
466: Object fromCache = admin.getFromCache(key, time);
467: assertNull("Thread index [" + retryIndex
468: + "] expected stale request [" + retryIndex
469: + "] to be received, got [" + fromCache
470: + "]", fromCache);
471: } catch (NeedsRefreshException nre) {
472: try {
473: admin.cancelUpdate(key);
474: } catch (Throwable t) {
475: log.error("Thread index [" + retryIndex
476: + "]: Unexpectedly caught exception ["
477: + t + "]", t);
478: fail("Thread index [" + retryIndex
479: + "] : Unexpectedly caught exception ["
480: + t + "]");
481: }
482: } catch (Throwable t) {
483: log.error("Thread index [" + retryIndex
484: + "] : Unexpectedly caught exception [" + t
485: + "]", t);
486: fail("Thread index [" + retryIndex
487: + "] : Unexpectedly caught exception [" + t
488: + "]");
489: }
490: }
491:
492: //Once we successfully terminate, we update the corresponding bit to let the Junit know we succeeded.
493: synchronized (successfullThreadTerminations) {
494: successfullThreadTerminations.set(threadIndex);
495: }
496: }
497: }
498:
499: private class OSGeneralTest implements Runnable {
500: public void doit(int i) {
501: int refreshPeriod = 500 /*millis*/;
502: String key = KEY + (i % UNIQUE_KEYS);
503: admin.putInCache(key, VALUE);
504:
505: try {
506: // Get from the cache
507: admin.getFromCache(KEY, refreshPeriod);
508: } catch (NeedsRefreshException nre) {
509: // Get the value
510: // Store in the cache
511: admin.putInCache(KEY, VALUE);
512: }
513:
514: // Flush occasionally
515: if ((i % (UNIQUE_KEYS + 1)) == 0) {
516: admin.getCache().flushEntry(key);
517: }
518: }
519:
520: public void run() {
521: int start = (int) (Math.random() * UNIQUE_KEYS);
522: System.out.print(start + " ");
523:
524: for (int i = start; i < (start + ITERATION_COUNT); i++) {
525: doit(i);
526: }
527: }
528: }
529: }
|