001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tctest;
005:
006: import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
007:
008: import com.tc.object.config.ConfigVisitor;
009: import com.tc.object.config.DSOClientConfigHelper;
010: import com.tc.object.config.Root;
011: import com.tc.simulator.app.ApplicationConfig;
012: import com.tc.simulator.listener.ListenerProvider;
013: import com.tc.util.concurrent.ThreadUtil;
014: import com.tctest.runner.AbstractTransparentApp;
015:
016: import java.util.ArrayList;
017: import java.util.Collections;
018: import java.util.Iterator;
019: import java.util.LinkedList;
020: import java.util.List;
021: import java.util.Random;
022:
023: public class SingleVMWaitNotifyTestApp extends AbstractTransparentApp {
024: private static Random random = new Random();
025: private static int TAKERS = random.nextInt(7) + 5;
026: private static int PUTTERS = random.nextInt(7) + 5;
027:
028: private static final SynchronizedLong takeCount = new SynchronizedLong(
029: 0);
030:
031: protected static final int PUTS = 200;
032:
033: // root
034: private final List queue = new LinkedList();
035:
036: public SingleVMWaitNotifyTestApp(String appId,
037: ApplicationConfig cfg, ListenerProvider listenerProvider) {
038: super (appId, cfg, listenerProvider);
039:
040: if (getParticipantCount() != 1) {
041: throw new RuntimeException("participant count must be 1");
042: }
043: }
044:
045: public static void visitL1DSOConfig(ConfigVisitor visitor,
046: DSOClientConfigHelper config) {
047: String testClassName = SingleVMWaitNotifyTestApp.class
048: .getName();
049: config.addIncludePattern(testClassName);
050: String root = "queue";
051: config.addRoot(new Root(testClassName, root, root + "Lock"),
052: true);
053: System.err.println("Adding root for " + testClassName + "."
054: + root);
055:
056: String methodExpression = "* " + testClassName + "*.*(..)";
057: System.err.println("Adding autolock for: " + methodExpression);
058: config.addWriteAutolock(methodExpression);
059:
060: //config.addIncludePattern(Item.class.getName());
061: config.addIncludePattern(testClassName + "$*");
062: }
063:
064: public void run() {
065: System.out.println("Number of putters: " + PUTTERS);
066: System.out.println("Number of takers: " + TAKERS);
067:
068: ArrayList threads = new ArrayList();
069:
070: for (int i = 0; i < PUTTERS; i++) {
071: threads.add(createPutter(i));
072: }
073:
074: for (int i = 0; i < TAKERS; i++) {
075: threads.add(createTaker(i));
076: }
077:
078: // randomize the thread positions in the array, such that they start at random times
079: Collections.shuffle(threads, random);
080: for (Iterator iter = threads.iterator(); iter.hasNext();) {
081: Thread t = (Thread) iter.next();
082: t.start();
083: }
084:
085: final Thread me = Thread.currentThread();
086: Thread timeout = new Thread(new Runnable() {
087: public void run() {
088: ThreadUtil.reallySleep(1000 * 60 * 5);
089: me.interrupt();
090: }
091: });
092: timeout.setDaemon(true);
093: timeout.start();
094:
095: waitForThreads(threads, true);
096:
097: // queue up the stop messages for the takers
098: synchronized (queue) {
099: for (int i = 0; i < TAKERS; i++) {
100: queue.add(new Item(true));
101: }
102: queue.notifyAll();
103: }
104:
105: // wait for everyone to finish (should just be the takers)
106: waitForThreads(threads, false);
107:
108: final long expected = PUTTERS * PUTS;
109: final long actual = takeCount.get();
110: if (expected != actual) {
111: throw new RuntimeException(actual + " != " + expected);
112: }
113:
114: notifyResult(Boolean.TRUE);
115: }
116:
117: private void waitForThreads(ArrayList threads, boolean justPutters) {
118: for (Iterator iter = threads.iterator(); iter.hasNext();) {
119: Thread t = (Thread) iter.next();
120:
121: if (!justPutters || t.getName().startsWith("PUTTER")) {
122: try {
123: t.join();
124: } catch (InterruptedException e) {
125: e.printStackTrace();
126: throw new RuntimeException(e);
127: }
128: }
129: }
130: }
131:
132: private Thread createTaker(int i) {
133: Thread rv = new Thread(new Runnable() {
134: public void run() {
135: try {
136: synchronized (queue) {
137: while (true) {
138: if (queue.size() > 0) {
139: Item item = (Item) queue.remove(0);
140: if (item.stop) {
141: return;
142: }
143: takeCount.increment();
144: } else {
145: try {
146: queue.wait();
147: } catch (InterruptedException e) {
148: throw new RuntimeException(e);
149: }
150: }
151: }
152: }
153: } catch (Throwable t) {
154: notifyError(t);
155: }
156: }
157: }, "TAKER " + i);
158:
159: rv.setDaemon(true);
160: return rv;
161: }
162:
163: private Thread createPutter(final int id) {
164: Thread rv = new Thread(new Runnable() {
165: public void run() {
166: try {
167: for (int i = 0; i < PUTS; i++) {
168: synchronized (queue) {
169: queue.add(new Item());
170: queue.notifyAll();
171: }
172: }
173: } catch (Throwable t) {
174: notifyError(t);
175: }
176: }
177: }, "PUTTER " + id);
178:
179: rv.setDaemon(true);
180: return rv;
181: }
182:
183: static class Item {
184: private final boolean stop;
185:
186: Item() {
187: this (false);
188: }
189:
190: Item(boolean stop) {
191: this .stop = stop;
192: }
193:
194: boolean isStop() {
195: return this.stop;
196: }
197:
198: }
199:
200: }
|