001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tctest;
005:
006: import com.tc.logging.TCLogger;
007: import com.tc.logging.TCLogging;
008: import com.tc.object.config.ConfigVisitor;
009: import com.tc.object.config.DSOClientConfigHelper;
010: import com.tc.object.config.Root;
011: import com.tc.simulator.app.ApplicationConfig;
012: import com.tc.simulator.listener.ListenerProvider;
013: import com.tctest.runner.AbstractTransparentApp;
014:
015: import java.util.ArrayList;
016: import java.util.Arrays;
017: import java.util.HashSet;
018: import java.util.Iterator;
019: import java.util.LinkedList;
020: import java.util.List;
021: import java.util.Random;
022: import java.util.Set;
023:
024: public class WaitNotifySystemTestApp extends AbstractTransparentApp {
025: private static final TCLogger logger = TCLogging
026: .getTestingLogger(WaitNotifySystemTestApp.class);
027:
028: // roots
029: private final List queue = new LinkedList();
030: private final Set takers = new HashSet();
031: private final Set putters = new HashSet();
032: private final Set workers = new HashSet();
033: private final List takeCounts = new LinkedList();
034: private final Flag first = new Flag();
035:
036: private static final int PUTS = 50;
037: private static final boolean debug = true;
038: private Random random;
039:
040: public WaitNotifySystemTestApp(String globalId,
041: ApplicationConfig cfg, ListenerProvider listenerProvider) {
042: super (globalId, cfg, listenerProvider);
043:
044: if (getParticipantCount() < 3) {
045: throw new RuntimeException(
046: "Must have at least 3 participants to run this test");
047: }
048:
049: }
050:
051: public static void visitL1DSOConfig(ConfigVisitor visitor,
052: DSOClientConfigHelper config) {
053: String testClassName = WaitNotifySystemTestApp.class.getName();
054:
055: List roots = Arrays.asList(new Object[] { "queue", "takers",
056: "putters", "workers", "takeCounts", "first" });
057: for (Iterator iter = roots.iterator(); iter.hasNext();) {
058: String root = (String) iter.next();
059: config.addRoot(
060: new Root(testClassName, root, root + "Lock"), true);
061: System.err.println("Adding root for " + testClassName + "."
062: + root);
063: }
064:
065: String methodExpression = "* " + testClassName + "*.*(..)";
066: System.err.println("Adding autolock for: " + methodExpression);
067: config.addWriteAutolock(methodExpression);
068:
069: config.addIncludePattern(Flag.class.getName());
070: config.addIncludePattern(WorkItem.class.getName());
071: }
072:
073: public void run() {
074: random = new Random(new Random(System.currentTimeMillis()
075: + getApplicationId().hashCode()).nextLong());
076:
077: try {
078: run0();
079: notifyResult(Boolean.TRUE);
080: } catch (Throwable t) {
081: notifyError(t);
082: notifyResult(Boolean.FALSE);
083: }
084: }
085:
086: public void run0() throws Throwable {
087: final long id = new Long(getApplicationId()).longValue();
088:
089: if (first.attemptSet()) {
090: // I am the master, bow before me
091: runMaster(getParticipantCount() - 1);
092: } else {
093: if (random.nextBoolean()) {
094: runPutter(id);
095: } else {
096: runTaker(id);
097: }
098: }
099: }
100:
101: private void runTaker(long id) throws InterruptedException {
102: Thread.currentThread().setName("TAKER-" + id);
103:
104: Long myID = new Long(id);
105: synchronized (takers) {
106: takers.add(myID);
107: }
108:
109: synchronized (workers) {
110: workers.add(myID);
111: workers.notify();
112: }
113:
114: log("STARTED");
115:
116: long count = 0;
117: try {
118: synchronized (queue) {
119: while (true) {
120: if (queue.size() > 0) {
121: WorkItem wi = (WorkItem) queue.remove(0);
122: // log("TOOK " + wi);
123: if (wi.isStop()) {
124: log("took a stop item");
125: return;
126: }
127: count++;
128: } else {
129: if (random.nextBoolean()) {
130: long millis = random.nextInt(10000);
131: if (random.nextBoolean()) {
132: int nanos = random.nextInt(10000);
133: // log("wait(" + millis + ", " + nanos + ")");
134: queue.wait(millis, nanos);
135: } else {
136: // log("wait(" + millis + ")");
137: queue.wait(millis);
138: }
139: } else {
140: // log("wait()");
141: queue.wait();
142: }
143: }
144: }
145: }
146:
147: } finally {
148: log("adding to takeCount");
149: synchronized (takeCounts) {
150: takeCounts.add(new Long(count));
151: }
152:
153: log("removing self from takers set");
154: synchronized (takers) {
155: takers.remove(myID);
156: takers.notify();
157: }
158:
159: log("ENDED");
160: }
161: }
162:
163: private void runPutter(long id) {
164: Thread.currentThread().setName("PUTTER-" + id);
165:
166: Long myID = new Long(id);
167: synchronized (putters) {
168: putters.add(myID);
169: }
170:
171: synchronized (workers) {
172: workers.add(myID);
173: workers.notify();
174: }
175:
176: log("STARTED");
177:
178: try {
179: for (int i = 0; i < PUTS; i++) {
180: synchronized (queue) {
181: WorkItem newWork = new WorkItem(myID.toString()
182: + "-" + i);
183:
184: // log("PUTTING new work: " + newWork);
185:
186: queue.add(newWork);
187:
188: if (random.nextBoolean()) {
189: // log("notify all");
190: queue.notifyAll();
191: } else {
192: // log("notify");
193: queue.notify();
194: }
195: }
196: }
197: } finally {
198: log("removing self from putters set");
199: synchronized (putters) {
200: putters.remove(myID);
201: putters.notify();
202: }
203:
204: log("ENDED");
205: }
206: }
207:
208: private void runMaster(int workerCount) throws InterruptedException {
209: Thread.currentThread().setName("MASTER");
210:
211: waitForAllWorkers(workerCount);
212:
213: log("All worker nodes started");
214:
215: final Long workerIDs[];
216: synchronized (workers) {
217: workerIDs = (Long[]) workers.toArray(new Long[] {});
218: }
219:
220: final long extraTakerID = getUniqueId(workerIDs);
221: final List next = new ArrayList(workers);
222: next.add(new Long(extraTakerID));
223: final long extraPutterID = getUniqueId((Long[]) next
224: .toArray(new Long[] {}));
225:
226: // start up another taker and putter locally. Do this for two reasons:
227: // 1) Taker/Putter choice is made randomly, thus it is possible that there are only putters or takers out there
228: // 2) To force some wait/notify in the intra-VM context
229: Thread extraTaker = new Thread(new Runnable() {
230: public void run() {
231: try {
232: runTaker(extraTakerID);
233: } catch (Throwable t) {
234: WaitNotifySystemTestApp.this .notifyError(t);
235: }
236: }
237: });
238: extraTaker.start();
239:
240: Thread extraPutter = new Thread(new Runnable() {
241: public void run() {
242: try {
243: runPutter(extraPutterID);
244: } catch (Throwable t) {
245: WaitNotifySystemTestApp.this .notifyError(t);
246: }
247: }
248: });
249: extraPutter.start();
250:
251: workerCount += 2;
252: waitForAllWorkers(workerCount);
253:
254: log("Extra workers started");
255:
256: final int numTakers;
257: synchronized (takers) {
258: numTakers = takers.size();
259: }
260: log("takers count = " + numTakers);
261:
262: final int numPutters = workerCount - numTakers;
263: log("putters count = " + numPutters);
264:
265: // wait for all putters to finish
266: synchronized (putters) {
267: while (putters.size() > 0) {
268: log("waiting for putters: " + putters.size());
269: putters.wait();
270: }
271: }
272:
273: log("All putters done");
274:
275: // tell the takers to stop
276: synchronized (queue) {
277: for (int i = 0; i < numTakers; i++) {
278: queue.add(WorkItem.STOP);
279: }
280: queue.notifyAll();
281: }
282:
283: log("Takers told to stop");
284:
285: // wait for all the takers to finish
286: synchronized (takers) {
287: while (takers.size() > 0) {
288: log("waiting for takers: " + takers.size());
289: takers.wait();
290: }
291: }
292:
293: log("Takers all done");
294:
295: // total up the work items each taker saw
296: long total = 0;
297: synchronized (takeCounts) {
298: log("Collecting take counts");
299:
300: if (takeCounts.size() != numTakers) {
301: // shouldn't happen, but if something is wrong, it might be useful to know how many take counts were there
302: throw new RuntimeException(
303: "Wrong number of take counts: "
304: + takeCounts.size() + " != "
305: + numTakers);
306: }
307:
308: for (Iterator iter = takeCounts.iterator(); iter.hasNext();) {
309: Long count = (Long) iter.next();
310: total += count.longValue();
311: }
312: }
313:
314: // verify the results
315: final int expectedTotal = numPutters * PUTS;
316: if (total != expectedTotal) {
317: throw new RuntimeException("Expected " + expectedTotal
318: + ", but we got " + total);
319: }
320: }
321:
322: private void waitForAllWorkers(int workerCount)
323: throws InterruptedException {
324: // wait for everyone to start
325: synchronized (workers) {
326: while (workers.size() < workerCount) {
327: final int lastCount = workers.size();
328: log("waiting for workers " + workers.size());
329: workers.wait();
330: if (lastCount == workers.size()) {
331: throw new Error("Size didn't change!!!");
332: }
333: }
334: }
335: }
336:
337: private long getUniqueId(Long[] workerIDs) {
338: while (true) {
339: final long candidate = random.nextInt(Integer.MAX_VALUE);
340: boolean okay = true;
341: for (int i = 0; i < workerIDs.length; i++) {
342: if (workerIDs[i].longValue() == candidate) {
343: okay = false;
344: break;
345: }
346:
347: if (okay) {
348: return candidate;
349: }
350: }
351: }
352: }
353:
354: private static void log(String msg) {
355: if (debug)
356: logger.info(msg);
357: }
358:
359: private static class Flag {
360: private boolean set = false;
361:
362: synchronized boolean attemptSet() {
363: if (!set) {
364: set = true;
365: return true;
366: }
367: return false;
368: }
369: }
370:
371: private static class WorkItem {
372: static final WorkItem STOP = new WorkItem("STOP");
373:
374: private final String name;
375:
376: WorkItem(String name) {
377: this .name = name;
378: }
379:
380: boolean isStop() {
381: return STOP.name.equals(name);
382: }
383:
384: public String toString() {
385: return this.name;
386: }
387: }
388:
389: }
|