001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tctest;
005:
006: import com.tc.exception.TCRuntimeException;
007: import com.tc.object.config.ConfigVisitor;
008: import com.tc.object.config.DSOClientConfigHelper;
009: import com.tc.object.config.TransparencyClassSpec;
010: import com.tc.simulator.app.ApplicationConfig;
011: import com.tc.simulator.listener.ListenerProvider;
012: import com.tc.util.Assert;
013: import com.tctest.runner.AbstractTransparentApp;
014:
015: import java.util.ArrayList;
016: import java.util.List;
017: import java.util.concurrent.Callable;
018: import java.util.concurrent.CancellationException;
019: import java.util.concurrent.CyclicBarrier;
020: import java.util.concurrent.ExecutionException;
021: import java.util.concurrent.ExecutorService;
022: import java.util.concurrent.FutureTask;
023: import java.util.concurrent.LinkedBlockingQueue;
024: import java.util.concurrent.ThreadPoolExecutor;
025: import java.util.concurrent.TimeUnit;
026: import java.util.concurrent.TimeoutException;
027:
028: public class FutureTaskTestApp extends AbstractTransparentApp {
029:
030: private final CyclicBarrier barrier;
031: private final CyclicBarrier barrier2 = new CyclicBarrier(2);
032:
033: private final int NUM_OF_ITEMS = 500;
034: private final DataRoot root = new DataRoot();
035: private final LinkedBlockingQueue workerQueue = new LinkedBlockingQueue();
036: private final LinkedBlockingQueue resultQueue = new LinkedBlockingQueue();
037:
038: public FutureTaskTestApp(String appId, ApplicationConfig cfg,
039: ListenerProvider listenerProvider) {
040: super (appId, cfg, listenerProvider);
041: barrier = new CyclicBarrier(getParticipantCount());
042: }
043:
044: public void run() {
045: try {
046: int index = barrier.await();
047:
048: testSharedDuringRunning(index);
049: testWithCallable(index);
050: testWithRunnable(index);
051: testWithMyFutureTask(index);
052: testWithLinkedBlockingQueue(index);
053: testWithExecutorService(index);
054:
055: } catch (Throwable t) {
056: notifyError(t);
057: }
058: }
059:
060: private void testSharedDuringRunning(int index) throws Exception {
061: if (index == 0) {
062: FutureTask task = new MyFutureTask(new MyCallable());
063: Thread thread1 = new Thread(new GetUnSharedRunnable(task));
064: thread1.start();
065: Thread.sleep(1000);
066: root.setTask(task);
067: task.run();
068: barrier2.await();
069: }
070:
071: barrier.await();
072: }
073:
074: private void testWithExecutorService(int index) throws Exception {
075: long startTime = System.currentTimeMillis();
076: if (index == 0) {
077: List list = new ArrayList();
078: for (int i = 0; i < NUM_OF_ITEMS; i++) {
079: Callable callable = new MyCallable(i);
080: list.add(callable);
081: }
082: root.setList(list);
083: }
084:
085: barrier.await();
086:
087: if (index == 1) {
088: ExecutorService service = new ThreadPoolExecutor(10, 10,
089: 10L, TimeUnit.SECONDS, new LinkedBlockingQueue());
090: List futures = service.invokeAll(root.getList());
091: root.setTasksList(futures);
092: } else {
093: List tasksList = root.getTasksList();
094: while (tasksList == null) {
095: tasksList = root.getTasksList();
096: }
097: for (int i = 0; i < NUM_OF_ITEMS; i++) {
098: System.err.println("Getting Task " + i);
099: Assert.assertEquals(root, ((FutureTask) tasksList
100: .get(i)).get());
101: }
102: }
103:
104: long endTime = System.currentTimeMillis();
105: System.err.println("Elapsed time in ExecutorService: "
106: + (endTime - startTime));
107:
108: barrier.await();
109: }
110:
111: private void testWithLinkedBlockingQueue(int index)
112: throws Exception {
113: long startTime = System.currentTimeMillis();
114: if (index == 0) {
115: for (int i = 0; i < NUM_OF_ITEMS; i++) {
116: System.err.println("Putting task " + i);
117: FutureTask task = new MyFutureTask(new MyCallable(i));
118: workerQueue.put(task);
119: }
120: workerQueue.put("STOP");
121: workerQueue.put("STOP");
122: } else {
123: while (true) {
124: Object o = workerQueue.take();
125: if ("STOP".equals(o)) {
126: break;
127: } else {
128: FutureTask task = (FutureTask) o;
129: task.run();
130: resultQueue.put(task);
131: }
132: }
133: }
134: if (index == 0) {
135: for (int i = 0; i < NUM_OF_ITEMS; i++) {
136: FutureTask task = (FutureTask) resultQueue.take();
137:
138: Assert.assertEquals(root, task.get());
139: }
140: }
141:
142: long endTime = System.currentTimeMillis();
143: System.err.println("Elapsed time in LinkedBlockingQueue: "
144: + (endTime - startTime));
145:
146: barrier.await();
147: }
148:
149: private void testWithMyFutureTask(int index) throws Exception {
150: FutureTask task = new MyFutureTask(new MyCallable());
151: basicRunTask(index, task);
152:
153: task = new MyFutureTask(new MyLongCallable());
154: basicCancelTask(index, task);
155:
156: task = new MyFutureTask(new MySemiLongCallable());
157: basicCancelTaskWithCompletion(index, task);
158:
159: task = new MyFutureTask(new MyLongCallable());
160: timeoutGetTask(index, task);
161:
162: task = new MyFutureTask(new MyCallable());
163: basicSet(index, (MyFutureTask) task);
164:
165: task = new MyFutureTask(new MyCallable());
166: basicSetException(index, (MyFutureTask) task);
167:
168: task = new MyFutureTask(new MyCallable());
169: basicRunAndResetException(index, (MyFutureTask) task);
170: }
171:
172: private void testWithRunnable(int index) throws Exception {
173: FutureTask task = new FutureTask(new MyRunnable(), root);
174: basicRunTask(index, task);
175:
176: task = new FutureTask(new MyLongRunnable(), root);
177: basicCancelTask(index, task);
178:
179: task = new FutureTask(new MySemiLongRunnable(), root);
180: basicCancelTaskWithCompletion(index, task);
181:
182: task = new FutureTask(new MyLongRunnable(), root);
183: basicCancelTask(index, task);
184:
185: task = new FutureTask(new MyLongRunnable(), root);
186: timeoutGetTask(index, task);
187: }
188:
189: private void testWithCallable(int index) throws Exception {
190: FutureTask task = new FutureTask(new MyCallable());
191: basicRunTask(index, task);
192:
193: task = new FutureTask(new MyLongCallable());
194: basicCancelTask(index, task);
195:
196: task = new FutureTask(new MySemiLongCallable());
197: basicCancelTaskWithCompletion(index, task);
198:
199: task = new FutureTask(new MyLongCallable());
200: timeoutGetTask(index, task);
201:
202: task = new FutureTask(new MyThrowable());
203: basicRunTaskWithException(index, task);
204: }
205:
206: private void basicSet(int index, MyFutureTask task)
207: throws Exception {
208: if (index == 0) {
209: root.setTask(task);
210: }
211:
212: barrier.await();
213:
214: if (index == 1) {
215: ((MyFutureTask) root.getTask()).set(root);
216: }
217:
218: Object o = root.getTask().get();
219: while (o == null) {
220: o = root.getTask().get();
221: }
222: Assert.assertEquals(root, o);
223:
224: Assert.assertTrue(root.getTask().isDone());
225:
226: barrier.await();
227: }
228:
229: private void basicRunAndResetException(int index, MyFutureTask task)
230: throws Exception {
231: if (index == 0) {
232: root.setTask(task);
233: }
234:
235: barrier.await();
236:
237: if (index == 1) {
238: boolean flag = ((MyFutureTask) root.getTask())
239: .runAndReset();
240: Assert.assertTrue(flag);
241: }
242:
243: barrier.await();
244:
245: Assert.assertFalse(root.getTask().isDone());
246:
247: barrier.await();
248: }
249:
250: private void basicSetException(int index, MyFutureTask task)
251: throws Exception {
252: final String exceptionMsg = "Test setting InterruptedException";
253: if (index == 0) {
254: root.setTask(task);
255: }
256:
257: barrier.await();
258:
259: if (index == 1) {
260: ((MyFutureTask) root.getTask())
261: .setException(new InterruptedException(exceptionMsg));
262: }
263:
264: barrier.await();
265:
266: try {
267: root.getTask().get();
268: throw new AssertionError(
269: "Should have thrown an ExecutionException.");
270: } catch (ExecutionException e) {
271: Assert
272: .assertEquals(exceptionMsg, e.getCause()
273: .getMessage());
274: }
275:
276: barrier.await();
277: }
278:
279: private void timeoutGetTask(int index, FutureTask longTask)
280: throws Exception {
281: if (index == 0) {
282: root.setTask(longTask);
283: }
284:
285: barrier.await();
286:
287: if (index == 1) {
288: root.getTask().run();
289: } else if (index == 0) {
290: try {
291: root.getTask().get(10000, TimeUnit.MILLISECONDS);
292: throw new AssertionError(
293: "Should have thrown a TimeoutException.");
294: } catch (TimeoutException e) {
295: root.getTask().cancel(true);
296: }
297: }
298:
299: barrier.await();
300:
301: Assert.assertTrue(root.getTask().isCancelled());
302:
303: Assert.assertTrue(root.getTask().isDone());
304:
305: barrier.await();
306:
307: }
308:
309: private void basicCancelTask(int index, FutureTask longTask)
310: throws Exception {
311: if (index == 0) {
312: root.setTask(longTask);
313: }
314:
315: barrier.await();
316:
317: if (index == 1) {
318: root.getTask().run();
319: } else if (index == 0) {
320: root.getTask().cancel(true);
321: }
322:
323: barrier.await();
324:
325: Assert.assertTrue(root.getTask().isCancelled());
326:
327: Assert.assertTrue(root.getTask().isDone());
328:
329: try {
330: root.getTask().get();
331: throw new AssertionError(
332: "Could have thrown a CancellationException.");
333: } catch (CancellationException e) {
334: // Expected
335: }
336:
337: barrier.await();
338:
339: }
340:
341: private void basicCancelTaskWithCompletion(int index,
342: FutureTask longTask) throws Exception {
343: if (index == 0) {
344: root.setTask(longTask);
345: }
346:
347: barrier.await();
348:
349: if (index == 1) {
350: root.getTask().run();
351: } else if (index == 0) {
352: root.getTask().cancel(false);
353: }
354:
355: barrier.await();
356:
357: Assert.assertTrue(root.getTask().isCancelled());
358:
359: Assert.assertTrue(root.getTask().isDone());
360:
361: try {
362: root.getTask().get();
363: throw new AssertionError(
364: "Could have thrown a CancellationException.");
365: } catch (CancellationException e) {
366: // Expected
367: }
368:
369: barrier.await();
370:
371: }
372:
373: private void basicRunTaskWithException(int index, FutureTask task)
374: throws Exception {
375: if (index == 0) {
376: root.setTask(task);
377: }
378:
379: barrier.await();
380:
381: if (index == 1) {
382: root.getTask().run();
383: }
384:
385: try {
386: root.getTask().get();
387: throw new AssertionError(
388: "Should have thrown an ExecutionException");
389: } catch (ExecutionException e) {
390: Assert
391: .assertTrue(e.getCause() instanceof InterruptedException);
392: Assert.assertEquals(MyThrowable.EXCEPTION_MSG, e.getCause()
393: .getMessage());
394: }
395:
396: barrier.await();
397: }
398:
399: private void basicRunTask(int index, FutureTask task)
400: throws Exception {
401: if (index == 0) {
402: root.setTask(task);
403: }
404:
405: barrier.await();
406:
407: if (index == 1) {
408:
409: root.getTask().run();
410: }
411:
412: Assert.assertEquals(root, root.getTask().get());
413:
414: Assert.assertTrue(root.getTask().isDone());
415:
416: barrier.await();
417: }
418:
419: public static void visitL1DSOConfig(ConfigVisitor visitor,
420: DSOClientConfigHelper config) {
421: String testClass = FutureTaskTestApp.class.getName();
422: TransparencyClassSpec spec = config.getOrCreateSpec(testClass);
423:
424: config.addIncludePattern(testClass + "$*");
425:
426: String methodExpression = "* " + testClass + "*.*(..)";
427: config.addWriteAutolock(methodExpression);
428:
429: spec.addRoot("barrier", "barrier");
430: spec.addRoot("barrier2", "barrier2");
431: spec.addRoot("root", "root");
432: spec.addRoot("workerQueue", "workerQueue");
433: spec.addRoot("resultQueue", "resultQueue");
434: }
435:
436: private static class DataRoot {
437: private FutureTask task;
438: private List list;
439: private List tasksList;
440:
441: public DataRoot() {
442: super ();
443: }
444:
445: public synchronized void setTask(FutureTask task) {
446: this .task = task;
447: }
448:
449: public synchronized FutureTask getTask() {
450: return this .task;
451: }
452:
453: public List getList() {
454: return list;
455: }
456:
457: public synchronized void setList(List list) {
458: this .list = list;
459: }
460:
461: public synchronized List getTasksList() {
462: return tasksList;
463: }
464:
465: public synchronized void setTasksList(List tasksList) {
466: this .tasksList = tasksList;
467: }
468: }
469:
470: private class MyLongCallable implements Callable {
471: public Object call() throws Exception {
472: while (true) {
473: if (Thread.interrupted()) {
474: throw new InterruptedException();
475: }
476: Thread.sleep(10000);
477: }
478: }
479: }
480:
481: private class MySemiLongCallable implements Callable {
482: public Object call() throws Exception {
483: Thread.sleep(30000);
484: return root;
485: }
486: }
487:
488: private class MyCallable implements Callable {
489: private Integer value;
490:
491: public MyCallable(int i) {
492: this .value = new Integer(i);
493: }
494:
495: public MyCallable() {
496: super ();
497: }
498:
499: public Object call() throws Exception {
500: if (value != null) {
501: System.err.println("Running call() in MyCallable: "
502: + value);
503: }
504: return root;
505: }
506: }
507:
508: private class MyThrowable implements Callable {
509: public static final String EXCEPTION_MSG = "Test InterruptException";
510:
511: public Object call() throws Exception {
512: throw new InterruptedException(EXCEPTION_MSG);
513: }
514: }
515:
516: private class MyLongRunnable implements Runnable {
517: public void run() {
518: try {
519: while (true) {
520: if (Thread.interrupted()) {
521: throw new InterruptedException();
522: }
523: Thread.sleep(10000);
524: }
525: } catch (Exception e) {
526: throw new TCRuntimeException(e);
527: }
528: }
529: }
530:
531: private class MySemiLongRunnable implements Runnable {
532: public void run() {
533: try {
534: Thread.sleep(30000);
535: } catch (Exception e) {
536: throw new TCRuntimeException(e);
537: }
538: }
539: }
540:
541: private class MyRunnable implements Runnable {
542: public void run() {
543: // do nothing
544: }
545: }
546:
547: private class MyFutureTask extends FutureTask {
548: public MyFutureTask(Callable callable) {
549: super (callable);
550: }
551:
552: public MyFutureTask(Runnable runnable, Object result) {
553: super (runnable, result);
554: }
555:
556: public synchronized void set(Object v) {
557: super .set(v);
558: }
559:
560: public synchronized void setException(Throwable t) {
561: super .setException(t);
562: }
563:
564: public boolean runAndReset() {
565: return super .runAndReset();
566: }
567: }
568:
569: private class GetUnSharedRunnable implements Runnable {
570: private FutureTask task;
571:
572: public GetUnSharedRunnable(FutureTask task) {
573: this .task = task;
574: }
575:
576: public void run() {
577: try {
578: Assert.assertEquals(root, task.get());
579: barrier2.await();
580: } catch (Exception e) {
581: throw new TCRuntimeException(e);
582: }
583: }
584: }
585:
586: }
|