001: /*
002: * Copyright (c) 2002-2007 by OpenSymphony
003: * All rights reserved.
004: */
005: package com.opensymphony.oscache.base;
006:
007: import com.opensymphony.oscache.general.GeneralCacheAdministrator;
008:
009: import junit.framework.Test;
010: import junit.framework.TestCase;
011: import junit.framework.TestSuite;
012:
013: import net.sourceforge.groboutils.junit.v1.MultiThreadedTestRunner;
014: import net.sourceforge.groboutils.junit.v1.TestRunnable;
015:
016: import org.apache.commons.logging.Log;
017: import org.apache.commons.logging.LogFactory;
018:
019: import java.util.Properties;
020:
021: /**
022: * Test the Cache class for any concurrency problems
023: *
024: * $Id: TestConcurrency.java 404 2007-02-24 10:21:00Z larst $
025: * @version $Revision: 404 $
026: */
027: public class TestConcurrency2 extends TestCase {
028:
029: private static transient final Log log = LogFactory
030: .getLog(GeneralCacheAdministrator.class); //TestConcurrency2.class
031:
032: // Static variables required thru all the tests
033: private static GeneralCacheAdministrator admin = null;
034:
035: // Constants needed in the tests
036: private final String KEY = "key";
037: private final String VALUE = "This is some content";
038: private final int ITERATION_COUNT = 1000;
039: private final int THREAD_COUNT = 3;
040: private final int UNIQUE_KEYS = 1013;
041:
042: /**
043: * Class constructor.
044: * <p>
045: * @param str The test name (required by JUnit)
046: */
047: public TestConcurrency2(String str) {
048: super (str);
049: }
050:
051: /**
052: * This method is invoked before each testXXXX methods of the
053: * class. It set ups the variables required for each tests.
054: */
055: public void setUp() {
056: // At first invocation, create a new Cache
057: if (admin == null) {
058: Properties config = new Properties();
059: config
060: .setProperty(
061: AbstractCacheAdministrator.CACHE_CAPACITY_KEY,
062: "70");
063: config.setProperty(
064: AbstractCacheAdministrator.CACHE_BLOCKING_KEY,
065: "false");
066: admin = new GeneralCacheAdministrator(config);
067: assertNotNull(admin);
068: }
069: }
070:
071: /**
072: * This methods returns the name of this test class to JUnit
073: * <p>
074: * @return The name of this class
075: */
076: public static Test suite() {
077: return new TestSuite(TestConcurrency2.class);
078: }
079:
080: /**
081: * Check that the cache handles simultaneous attempts to access a
082: * new cache entry correctly
083: */
084: public void testNewEntry() {
085: String key = "new";
086:
087: try {
088: admin.getFromCache(key, -1);
089: fail("NeedsRefreshException should have been thrown");
090: } catch (NeedsRefreshException nre) {
091: // Fire off another couple of threads to get the same cache entry
092: GetEntry getEntry1 = new GetEntry(key, VALUE, -1, false);
093: GetEntry getEntry2 = new GetEntry(key, VALUE, -1, false);
094:
095: // OK, those threads should be blocked waiting for the new cache
096: // entry to appear. Sleep for a bit to simulate the time taken to
097: // build the cache entry
098: PutInCache putInCache = new PutInCache(key, VALUE, 500);
099:
100: // pass that instance to the MTTR
101: TestRunnable[] trs = { getEntry1, getEntry2, putInCache };
102: MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(
103: trs);
104:
105: // kickstarts the MTTR & fires off threads
106: try {
107: mttr.runTestRunnables(5000);
108: } catch (Throwable e) {
109: fail("Thread should have blocked until a new cache entry was ready");
110: }
111: }
112: }
113:
114: /**
115: * Check that the cache handles simultaneous attempts to access a
116: * new cache entry correctly
117: */
118: public void testNewEntryCancel() {
119: final String key = "newCancel";
120: final String NEW_VALUE = VALUE + "...";
121:
122: try {
123: admin.getFromCache(key, -1);
124: fail("NeedsRefreshException should have been thrown");
125: } catch (NeedsRefreshException nre) {
126: // Fire off another thread to get the same cache entry
127: // We can't use GrobeUtils, because joining functionality is missing
128: GetEntrySimple getEntry = new GetEntrySimple(key,
129: NEW_VALUE, CacheEntry.INDEFINITE_EXPIRY, true);
130: Thread thread = new Thread(getEntry);
131: thread.start();
132:
133: // The above thread will be blocked waiting for the new content
134: try {
135: Thread.sleep(500);
136: } catch (InterruptedException ie) {
137: }
138:
139: // Now cancel the update (eg because an exception occurred while building the content).
140: // This will unblock the other thread and it will receive a NeedsRefreshException.
141: admin.cancelUpdate(key);
142:
143: // Wait a bit for the other thread to update the cache
144: try {
145: Thread.sleep(500);
146: } catch (InterruptedException ie) {
147: }
148:
149: try {
150: Object newValue = admin.getFromCache(key,
151: CacheEntry.INDEFINITE_EXPIRY);
152: assertEquals(NEW_VALUE, newValue);
153: } catch (NeedsRefreshException e) {
154: admin.cancelUpdate(key);
155: e.printStackTrace();
156: fail("A NeedsRefreshException should not have been thrown. content="
157: + e.getCacheContent() + ", " + e.getMessage());
158: }
159: }
160: }
161:
162: /**
163: * Verify that we can concurrently access the cache without problems
164: */
165: public void testPut() {
166: Thread[] thread = new Thread[THREAD_COUNT];
167:
168: for (int idx = 0; idx < THREAD_COUNT; idx++) {
169: OSGeneralTest runner = new OSGeneralTest();
170: thread[idx] = new Thread(runner);
171: thread[idx].start();
172: }
173:
174: boolean stillAlive;
175:
176: do {
177: try {
178: Thread.sleep(100);
179: } catch (InterruptedException e) {
180: // do nothing
181: }
182:
183: stillAlive = false;
184:
185: int i = 0;
186:
187: while ((i < thread.length) && !stillAlive) {
188: stillAlive |= thread[i++].isAlive();
189: }
190: } while (stillAlive);
191: }
192:
193: /**
194: * Check that the cache handles simultaneous attempts to access a
195: * stale cache entry correctly
196: */
197: public void testStaleEntry() {
198: String key = "stale";
199: assertFalse(
200: "The cache should not be in blocking mode for this test.",
201: admin.isBlocking());
202:
203: admin.putInCache(key, VALUE);
204:
205: try {
206: // This should throw a NeedsRefreshException since the refresh
207: // period is 0
208: admin.getFromCache(key, 0);
209: fail("NeedsRefreshException should have been thrown");
210: } catch (NeedsRefreshException nre) {
211: // Fire off another thread to get the same cache entry.
212: // Since blocking mode is currently disabled we should
213: // immediately get back the stale entry
214: GetEntry getEntry = new GetEntry(key, VALUE, 0, false);
215: Thread thread = new Thread(getEntry);
216: thread.start();
217:
218: // Sleep for a bit to simulate the time taken to build the cache entry
219: try {
220: Thread.sleep(200);
221: } catch (InterruptedException ie) {
222: }
223:
224: // Putting the entry in the cache should mean that threads now retrieve
225: // the updated entry
226: String newValue = "New value";
227: admin.putInCache(key, newValue);
228:
229: getEntry = new GetEntry(key, newValue, -1, false);
230: thread = new Thread(getEntry);
231: thread.start();
232:
233: try {
234: Object fromCache = admin.getFromCache(key, -1);
235: assertEquals(newValue, fromCache);
236: } catch (NeedsRefreshException e) {
237: admin.cancelUpdate(key);
238: fail("Should not have received a NeedsRefreshException");
239: }
240:
241: // Give the GetEntry thread a chance to finish
242: try {
243: Thread.sleep(200);
244: } catch (InterruptedException ie) {
245: }
246: }
247: }
248:
249: /**
250: * A test for the updating of a stale entry when CACHE.BLOCKING = TRUE
251: */
252: public void testStaleEntryBlocking() {
253: // A test for the case where oscache.blocking = true
254: admin.destroy();
255:
256: Properties p = new Properties();
257: p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY,
258: "true");
259: admin = new GeneralCacheAdministrator(p);
260:
261: assertTrue(
262: "The cache should be in blocking mode for this test.",
263: admin.isBlocking());
264:
265: // Use a unique key in case these test entries are being persisted
266: String key = "blocking";
267: String NEW_VALUE = VALUE + " abc";
268: admin.putInCache(key, VALUE);
269:
270: try {
271: // Force a NeedsRefreshException
272: admin.getFromCache(key, 0);
273: fail("NeedsRefreshException should have been thrown");
274: } catch (NeedsRefreshException nre) {
275: // Fire off another thread to get the same cache entry.
276: // Since blocking mode is enabled this thread should block
277: // until the entry has been updated.
278: GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
279: Thread thread = new Thread(getEntry);
280: thread.start();
281:
282: // Sleep for a bit to simulate the time taken to build the cache entry
283: try {
284: Thread.sleep(20);
285: } catch (InterruptedException ie) {
286: }
287:
288: // Putting the entry in the cache should mean that threads now retrieve
289: // the updated entry
290: admin.putInCache(key, NEW_VALUE);
291:
292: getEntry = new GetEntry(key, NEW_VALUE, -1, false);
293: thread = new Thread(getEntry);
294: thread.start();
295:
296: try {
297: Object fromCache = admin.getFromCache(key, -1);
298: assertEquals(NEW_VALUE, fromCache);
299: } catch (NeedsRefreshException e) {
300: admin.cancelUpdate(key);
301: fail("Should not have received a NeedsRefreshException");
302: }
303: }
304: }
305:
306: private static final int RETRY_BY_THREADS = 100000;
307: private static final int NB_THREADS = 4;
308:
309: /**
310: * Checks whether the cache handles simultaneous attempts to access a
311: * stable cache entry correctly when the blocking mode is enabled.
312: *
313: * Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times.
314: * The test is sucessfull if after some time, all threads are properly released
315: */
316: public void testConcurrentStaleGets() {
317: GeneralCacheAdministrator staticAdmin = admin;
318: //admin = new GeneralCacheAdministrator(); //avoid poluting other test cases
319:
320: try {
321: // A test for the case where oscache.blocking = true
322: //admin.destroy();
323: Properties p = new Properties();
324: p.setProperty(
325: AbstractCacheAdministrator.CACHE_BLOCKING_KEY,
326: "true");
327: admin = new GeneralCacheAdministrator(p);
328:
329: assertTrue(
330: "The cache should be in blocking mode for this test.",
331: admin.isBlocking());
332:
333: String key = "new";
334:
335: //First put a value
336: admin.putInCache(key, VALUE);
337:
338: try {
339: //Then test without concurrency that it is reported as stale when time-to-live is zero
340: admin.getFromCache(key, 0);
341: fail("NeedsRefreshException should have been thrown");
342: } catch (NeedsRefreshException nre) {
343: //Ok this is was is excpected, we can release the update
344: admin.cancelUpdate(key);
345: }
346:
347: //Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update
348: TestRunnable[] spawnedThreads = new TestRunnable[NB_THREADS];
349:
350: for (int threadIndex = 0; threadIndex < NB_THREADS; threadIndex++) {
351: spawnedThreads[threadIndex] = new GetStaleEntryAndCancelUpdate(
352: key, 0, RETRY_BY_THREADS);
353: }
354: MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(
355: spawnedThreads);
356:
357: //kickstarts the MTTR & fires off threads
358: try {
359: mttr.runTestRunnables(120 * 1000);
360: } catch (Throwable e) {
361: fail("at least one thread did not complete");
362: e.printStackTrace();
363: }
364:
365: } finally {
366: // avoid poluting other test cases
367: admin = staticAdmin;
368: }
369: }
370:
371: private class GetEntry extends TestRunnable {
372: String key;
373: String value;
374: boolean expectNRE;
375: int time;
376:
377: GetEntry(String key, String value, int time, boolean expectNRE) {
378: this .key = key;
379: this .value = value;
380: this .time = time;
381: this .expectNRE = expectNRE;
382: }
383:
384: public void runTest() {
385: try {
386: // Get from the cache
387: Object fromCache = admin.getFromCache(key, time);
388: assertEquals(value, fromCache);
389: } catch (NeedsRefreshException nre) {
390: if (!expectNRE) {
391: admin.cancelUpdate(key);
392: fail("Thread should have blocked until a new cache entry was ready");
393: } else {
394: // Put a new piece of content into the cache
395: admin.putInCache(key, value);
396: }
397: }
398: }
399: }
400:
401: private class GetEntrySimple extends GetEntry {
402: GetEntrySimple(String key, String value, int time,
403: boolean expectNRE) {
404: super (key, value, time, expectNRE);
405: }
406:
407: public void run() {
408: runTest();
409: }
410:
411: }
412:
413: private class PutInCache extends TestRunnable {
414:
415: String key;
416: String value;
417: long wait;
418:
419: PutInCache(String key, String value, long wait) {
420: this .key = key;
421: this .value = value;
422: this .wait = wait;
423: }
424:
425: public void runTest() {
426: try {
427: Thread.sleep(wait);
428: } catch (InterruptedException ie) {
429: fail("PutInCache thread shouldn't be interrupted.");
430: }
431: admin.putInCache(key, value);
432: }
433: }
434:
435: /**
436: * Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update.
437: */
438: private class GetStaleEntryAndCancelUpdate extends TestRunnable {
439: String key;
440: int retries;
441: int time;
442:
443: GetStaleEntryAndCancelUpdate(String key, int time, int retries) {
444: this .key = key;
445: this .time = time;
446: this .retries = retries;
447: }
448:
449: public void runTest() {
450: for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
451: try {
452: // Get from the cache
453: Object fromCache = admin.getFromCache(key, time);
454: assertNull("Thread index [" + retryIndex
455: + "] expected stale request [" + retryIndex
456: + "] to be received, got [" + fromCache
457: + "]", fromCache);
458: } catch (NeedsRefreshException nre) {
459: try {
460: admin.cancelUpdate(key);
461: } catch (Throwable t) {
462: log.error("Thread index [" + retryIndex
463: + "]: Unexpectedly caught exception ["
464: + t + "]", t);
465: fail("Thread index [" + retryIndex
466: + "] : Unexpectedly caught exception ["
467: + t + "]");
468: }
469: } catch (Throwable t) {
470: log.error("Thread index [" + retryIndex
471: + "] : Unexpectedly caught exception [" + t
472: + "]", t);
473: fail("Thread index [" + retryIndex
474: + "] : Unexpectedly caught exception [" + t
475: + "]");
476: }
477: }
478: }
479: }
480:
481: private class OSGeneralTest extends TestRunnable {
482: public void doit(int i) {
483: int refreshPeriod = 500 /*millis*/;
484: String key = KEY + (i % UNIQUE_KEYS);
485: admin.putInCache(key, VALUE);
486:
487: try {
488: // Get from the cache
489: admin.getFromCache(KEY, refreshPeriod);
490: } catch (NeedsRefreshException nre) {
491: // Get the value
492: // Store in the cache
493: admin.putInCache(KEY, VALUE);
494: }
495:
496: // Flush occasionally
497: if ((i % (UNIQUE_KEYS + 1)) == 0) {
498: admin.getCache().flushEntry(key);
499: }
500: }
501:
502: public void runTest() {
503: int start = (int) (Math.random() * UNIQUE_KEYS);
504: System.out.print(start + " ");
505:
506: for (int i = start; i < (start + ITERATION_COUNT); i++) {
507: doit(i);
508: }
509: }
510: }
511:
512: }
|