001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tctest;
006:
007: import com.tc.object.config.ConfigVisitor;
008: import com.tc.object.config.DSOClientConfigHelper;
009: import com.tc.object.config.TransparencyClassSpec;
010: import com.tc.simulator.app.ApplicationConfig;
011: import com.tc.simulator.listener.ListenerProvider;
012: import com.tc.util.Assert;
013: import com.tctest.runner.AbstractTransparentApp;
014:
015: import java.util.concurrent.CyclicBarrier;
016: import java.util.concurrent.LinkedBlockingQueue;
017:
018: public class LinkedBlockingQueueMultiThreadTestApp extends
019: AbstractTransparentApp {
020: private static final int NUM_OF_PUTS = 1000;
021: private static final int NUM_OF_THREADS = 5;
022:
023: private LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
024: private final CyclicBarrier barrier;
025: private int[] putCount;
026: private int[] getCount;
027:
028: public LinkedBlockingQueueMultiThreadTestApp(String appId,
029: ApplicationConfig cfg, ListenerProvider listenerProvider) {
030: super (appId, cfg, listenerProvider);
031: barrier = new CyclicBarrier(getParticipantCount());
032: }
033:
034: public void run() {
035: try {
036: int index = barrier.await();
037:
038: final CyclicBarrier localBarrier = new CyclicBarrier(
039: NUM_OF_THREADS + 1);
040: if (index == 0) {
041: Thread[] putter = new Thread[NUM_OF_THREADS];
042: final int[] localPutCount = new int[NUM_OF_THREADS];
043: for (int i = 0; i < NUM_OF_THREADS; i++) {
044: final int k = i;
045: localPutCount[k] = 0;
046: putter[k] = new Thread(new Runnable() {
047: public void run() {
048: try {
049: localBarrier.await();
050: for (int j = 0; j < NUM_OF_PUTS; j++) {
051: int seed = (this .hashCode() ^ (int) System
052: .nanoTime());
053: localPutCount[k] += seed;
054: queue.put(new Integer(seed));
055: }
056: localBarrier.await();
057: } catch (Throwable t) {
058: throw new AssertionError(t);
059: }
060: }
061:
062: });
063: }
064: for (int i = 0; i < NUM_OF_THREADS; i++) {
065: putter[i].start();
066: }
067: localBarrier.await();
068: localBarrier.await();
069: this .putCount = localPutCount;
070: } else {
071: Thread[] getter = new Thread[NUM_OF_THREADS];
072: final int[] localGetCount = new int[NUM_OF_THREADS];
073: for (int i = 0; i < NUM_OF_THREADS; i++) {
074: final int k = i;
075: localGetCount[k] = 0;
076: getter[k] = new Thread(new Runnable() {
077: public void run() {
078: try {
079: localBarrier.await();
080: for (int j = 0; j < NUM_OF_PUTS; j++) {
081: Integer o = (Integer) queue.take();
082: localGetCount[k] += o.intValue();
083: }
084: localBarrier.await();
085: } catch (Throwable t) {
086: throw new AssertionError(t);
087: }
088:
089: }
090: });
091: }
092: for (int i = 0; i < NUM_OF_THREADS; i++) {
093: getter[i].start();
094: }
095: localBarrier.await();
096: localBarrier.await();
097: this .getCount = localGetCount;
098: }
099: barrier.await();
100:
101: if (index == 0) {
102: int totalPutCount = 0;
103: int totalGetCount = 0;
104: for (int i = 0; i < NUM_OF_THREADS; i++) {
105: totalPutCount += putCount[i];
106: totalGetCount += getCount[i];
107: }
108: Assert.assertEquals(totalPutCount, totalGetCount);
109: }
110:
111: barrier.await();
112: } catch (Throwable t) {
113: notifyError(t);
114: }
115: }
116:
117: public static void visitL1DSOConfig(ConfigVisitor visitor,
118: DSOClientConfigHelper config) {
119: String testClass = LinkedBlockingQueueMultiThreadTestApp.class
120: .getName();
121: TransparencyClassSpec spec = config.getOrCreateSpec(testClass);
122:
123: config.addIncludePattern(testClass + "$*");
124:
125: String methodExpression = "* " + testClass + "*.*(..)";
126: config.addWriteAutolock(methodExpression);
127:
128: spec.addRoot("queue", "queue");
129: spec.addRoot("barrier", "barrier");
130: spec.addRoot("putCount", "putCount");
131: spec.addRoot("getCount", "getCount");
132: }
133:
134: }
|