001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tctest.restart.system;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
008: import EDU.oswego.cs.dl.util.concurrent.LinkedNode;
009: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
010: import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
011:
012: import com.tc.exception.TCRuntimeException;
013: import com.tc.object.config.ConfigVisitor;
014: import com.tc.object.config.DSOClientConfigHelper;
015: import com.tc.object.config.TransparencyClassSpec;
016: import com.tc.object.config.spec.SynchronizedIntSpec;
017: import com.tc.simulator.app.ApplicationConfig;
018: import com.tc.simulator.listener.ListenerProvider;
019: import com.tc.simulator.listener.OutputListener;
020: import com.tc.util.Assert;
021: import com.tc.util.DebugUtil;
022: import com.tctest.runner.AbstractTransparentApp;
023:
024: import java.util.ArrayList;
025: import java.util.Collection;
026: import java.util.Date;
027: import java.util.HashMap;
028: import java.util.HashSet;
029: import java.util.Iterator;
030: import java.util.List;
031: import java.util.Map;
032:
033: public class ObjectDataTestApp extends AbstractTransparentApp {
034: public static final String SYNCHRONOUS_WRITE = "synch-write";
035:
036: private int threadCount = 10;
037: private int workSize = 1 * 100;
038: private int testObjectDepth = 1 * 50;
039: // I had to dial this down considerably because this takes a long time to run.
040: private int iterationCount = 1 * 2;
041: private List workQueue = new ArrayList();
042: private Collection resultSet = new HashSet();
043: private SynchronizedInt complete = new SynchronizedInt(0);
044: private OutputListener out;
045: private SynchronizedInt nodes = new SynchronizedInt(0);
046:
047: public ObjectDataTestApp(String appId, ApplicationConfig cfg,
048: ListenerProvider listenerProvider) {
049: super (appId, cfg, listenerProvider);
050: this .out = listenerProvider.getOutputListener();
051: }
052:
053: public void run() {
054: try {
055: // set up workers
056: WorkerFactory wf = new WorkerFactory(getApplicationId(),
057: workQueue, resultSet, complete);
058:
059: for (int i = 0; i < threadCount; i++) {
060: Worker worker = wf.newWorker();
061: new Thread(worker).start();
062: }
063:
064: if (nodes.increment() == 1) {
065: // if we are the first participant, we control the work queue and do the verifying
066: // System.err.println("Populating work queue...");
067: populateWorkQueue(workSize, testObjectDepth, workQueue);
068: for (int i = 0; i < iterationCount; i++) {
069: synchronized (resultSet) {
070: while (resultSet.size() < workSize) {
071: try {
072: resultSet.wait();
073: } catch (InterruptedException e) {
074: throw new TCRuntimeException(e);
075: }
076: }
077: }
078: verify(i + 1, resultSet);
079: if (i != (iterationCount - 1)) {
080: synchronized (resultSet) {
081: for (Iterator iter = resultSet.iterator(); iter
082: .hasNext();) {
083: put(workQueue, iter.next());
084: iter.remove();
085: }
086: }
087: }
088: }
089: for (int i = 0; i < wf.getGlobalWorkerCount(); i++) {
090: put(workQueue, "STOP");
091: }
092: }
093: } catch (Exception e) {
094: throw new TCRuntimeException(e);
095: }
096: }
097:
098: private void populateWorkQueue(int size, int depth, List queue) {
099: System.err.println(" Thread - "
100: + Thread.currentThread().getName()
101: + " inside populateWorkQueue !");
102: for (int i = 0; i < size; i++) {
103: TestObject to = new TestObject("" + i);
104: to.populate(depth);
105: put(queue, to);
106: }
107: }
108:
109: private void verify(int expectedValue, Collection results) {
110: synchronized (results) {
111: Assert.assertEquals(workSize, results.size());
112: int cnt = 0;
113: for (Iterator i = results.iterator(); i.hasNext();) {
114: TestObject to = (TestObject) i.next();
115: if (!to.validate(expectedValue)) {
116: throw new RuntimeException("Failed!");
117: }
118: System.out.println("Verified object " + (cnt++));
119: }
120: }
121: }
122:
123: public final void println(Object o) {
124: try {
125: out.println(o);
126: } catch (InterruptedException e) {
127: throw new TCRuntimeException(e);
128: }
129: }
130:
131: public static void visitL1DSOConfig(ConfigVisitor visitor,
132: DSOClientConfigHelper config) {
133: visitL1DSOConfig(visitor, config, new HashMap());
134: }
135:
136: public static void visitL1DSOConfig(ConfigVisitor visitor,
137: DSOClientConfigHelper config, Map optionalAttributes) {
138: DebugUtil.DEBUG = true;
139:
140: boolean isSynchronousWrite = false;
141: if (optionalAttributes.size() > 0) {
142: isSynchronousWrite = Boolean.valueOf(
143: (String) optionalAttributes
144: .get(ObjectDataTestApp.SYNCHRONOUS_WRITE))
145: .booleanValue();
146: }
147:
148: visitor.visit(config, Barriers.class);
149:
150: String testClassName = ObjectDataTestApp.class.getName();
151: TransparencyClassSpec spec = config
152: .getOrCreateSpec(testClassName);
153:
154: String idProviderClassname = IDProvider.class.getName();
155: config.addIncludePattern(idProviderClassname);
156:
157: String linkedQueueClassname = LinkedQueue.class.getName();
158: config.addIncludePattern(linkedQueueClassname);
159:
160: String linkedNodeClassname = LinkedNode.class.getName();
161: config.addIncludePattern(linkedNodeClassname);
162: //
163: // String syncIntClassname = SynchronizedInt.class.getName();
164: // config.addIncludeClass(syncIntClassname);
165: //
166: // String syncVarClassname = SynchronizedVariable.class.getName();
167: // config.addIncludeClass(syncVarClassname);
168:
169: String testObjectClassname = TestObject.class.getName();
170: config.addIncludePattern(testObjectClassname);
171:
172: String workerClassname = Worker.class.getName();
173: config.addIncludePattern(workerClassname);
174:
175: // Create Roots
176: spec.addRoot("workQueue", testClassName + ".workQueue");
177: spec.addRoot("resultSet", testClassName + ".resultSet");
178: spec.addRoot("complete", testClassName + ".complete");
179: spec.addRoot("nodes", testClassName + ".nodes");
180:
181: String workerFactoryClassname = WorkerFactory.class.getName();
182: config.addIncludePattern(workerFactoryClassname);
183: TransparencyClassSpec workerFactorySpec = config
184: .getOrCreateSpec(workerFactoryClassname);
185: workerFactorySpec.addRoot("globalWorkerCount",
186: workerFactoryClassname + ".globalWorkerCount");
187:
188: // Create locks
189: String verifyExpression = "* " + testClassName + ".verify(..)";
190: addWriteAutolock(config, isSynchronousWrite, verifyExpression);
191:
192: String runExpression = "* " + testClassName + ".run(..)";
193: addWriteAutolock(config, isSynchronousWrite, runExpression);
194:
195: String populateWorkQueueExpression = "* " + testClassName
196: + ".populateWorkQueue(..)";
197: addWriteAutolock(config, isSynchronousWrite,
198: populateWorkQueueExpression);
199:
200: String putExpression = "* " + testClassName + ".put(..)";
201: addWriteAutolock(config, isSynchronousWrite, putExpression);
202:
203: String takeExpression = "* " + testClassName + ".take(..)";
204: addWriteAutolock(config, isSynchronousWrite, takeExpression);
205:
206: // TestObject config
207: String incrementExpression = "* " + testObjectClassname
208: + ".increment(..)";
209: addWriteAutolock(config, isSynchronousWrite,
210: incrementExpression);
211:
212: String populateExpression = "* " + testObjectClassname
213: + ".populate(..)";
214: addWriteAutolock(config, isSynchronousWrite, populateExpression);
215:
216: String validateExpression = "* " + testObjectClassname
217: + ".validate(..)";
218: config.addReadAutolock(validateExpression);
219:
220: // Worker factory config
221: String workerFactoryExpression = "* " + workerFactoryClassname
222: + ".*(..)";
223: addWriteAutolock(config, isSynchronousWrite,
224: workerFactoryExpression);
225:
226: // Worker config
227: String workerRunExpression = "* " + workerClassname
228: + ".run(..)";
229: addWriteAutolock(config, isSynchronousWrite,
230: workerRunExpression);
231:
232: new SynchronizedIntSpec().visit(visitor, config);
233:
234: // IDProvider config
235: String nextIDExpression = "* " + idProviderClassname
236: + ".nextID(..)";
237: addWriteAutolock(config, isSynchronousWrite, nextIDExpression);
238:
239: DebugUtil.DEBUG = false;
240: }
241:
242: private static void addWriteAutolock(DSOClientConfigHelper config,
243: boolean isSynchronousWrite, String methodPattern) {
244: if (isSynchronousWrite) {
245: config.addSynchronousWriteAutolock(methodPattern);
246: debugPrintln("***** doing a synchronous write");
247: } else {
248: config.addWriteAutolock(methodPattern);
249: }
250: }
251:
252: private static void debugPrintln(String s) {
253: if (DebugUtil.DEBUG) {
254: System.err.println(s);
255: }
256: }
257:
258: public static final class WorkerFactory {
259: private int localWorkerCount = 0;
260: private final SynchronizedInt globalWorkerCount; // = new SynchronizedInt(0);
261: private final List workQueue;
262: private final Collection results;
263: private final Collection localWorkers = new HashSet();
264: private final SynchronizedInt complete;
265: private final String appId;
266:
267: public WorkerFactory(String appId, List workQueue,
268: Collection results, SynchronizedInt complete) {
269: this .appId = appId;
270: this .workQueue = workQueue;
271: this .results = results;
272: this .complete = complete;
273: this .globalWorkerCount = new SynchronizedInt(0);
274: }
275:
276: public Worker newWorker() {
277: localWorkerCount++;
278: int globalWorkerID = globalWorkerCount.increment();
279: // System.err.println("Worker: " + globalWorkerID);
280: Worker rv = new Worker("(" + appId + ") : Worker "
281: + globalWorkerID + "," + localWorkerCount,
282: this .workQueue, this .results, this .complete);
283: this .localWorkers.add(rv);
284: return rv;
285: }
286:
287: public int getGlobalWorkerCount() {
288: return globalWorkerCount.get();
289: }
290: }
291:
292: protected static final class Worker implements Runnable {
293:
294: private final String name;
295: private final List workQueue;
296: private final Collection results;
297: private final SynchronizedInt workCompletedCount = new SynchronizedInt(
298: 0);
299: private final SynchronizedInt objectChangeCount = new SynchronizedInt(
300: 0);
301:
302: public Worker(String name, List workQueue, Collection results,
303: SynchronizedInt complete) {
304: this .name = name;
305: this .workQueue = workQueue;
306: this .results = results;
307: }
308:
309: public void run() {
310: Thread.currentThread().setName(name);
311: try {
312: while (true) {
313: TestObject to;
314: Object o = take(workQueue);
315: if (o instanceof TestObject) {
316: to = (TestObject) o;
317: System.err.println(name + " : Got : " + to);
318: } else if ("STOP".equals(o)) {
319: return;
320: } else {
321: throw new RuntimeException("Unexpected task: "
322: + o);
323: }
324: objectChangeCount.add(to.increment());
325: synchronized (results) {
326: results.add(to);
327: results.notifyAll();
328: }
329: workCompletedCount.increment();
330: }
331: } catch (Exception e) {
332: throw new TCRuntimeException(e);
333: }
334: }
335:
336: public int getWorkCompletedCount() {
337: return this .workCompletedCount.get();
338: }
339:
340: public int getObjectChangeCount() {
341: return this .objectChangeCount.get();
342: }
343: }
344:
345: public static final class Barriers {
346: private final Map barriers;
347: private final int nodeCount;
348:
349: public static void visitL1DSOConfig(ConfigVisitor visitor,
350: DSOClientConfigHelper config) {
351: String classname = Barriers.class.getName();
352: TransparencyClassSpec spec = config
353: .getOrCreateSpec(classname);
354: spec.addRoot("barriers", classname + ".barriers");
355: String barriersExpression = "* " + classname + ".*(..)";
356: config.addWriteAutolock(barriersExpression);
357:
358: String cyclicBarrierClassname = CyclicBarrier.class
359: .getName();
360: config.addIncludePattern(cyclicBarrierClassname);
361:
362: // CyclicBarrier config
363: String cyclicBarrierExpression = "* "
364: + cyclicBarrierClassname + ".*(..)";
365: config.addWriteAutolock(cyclicBarrierExpression);
366: }
367:
368: public Barriers(int nodeCount) {
369: this .barriers = new HashMap();
370: this .nodeCount = nodeCount;
371: }
372:
373: public int barrier(int barrierID) {
374: try {
375: return getOrCreateBarrier(barrierID).barrier();
376: } catch (InterruptedException e) {
377: throw new TCRuntimeException(e);
378: }
379: }
380:
381: private CyclicBarrier getOrCreateBarrier(int barrierID) {
382: synchronized (barriers) {
383: Integer key = new Integer(barrierID);
384: CyclicBarrier rv = (CyclicBarrier) barriers.get(key);
385: if (rv == null) {
386: rv = new CyclicBarrier(this .nodeCount);
387: this .barriers.put(key, rv);
388: }
389: return rv;
390: }
391: }
392:
393: }
394:
395: protected static final class TestObject {
396: private TestObject child;
397: private int counter;
398: private List activity = new ArrayList();
399: private String id;
400:
401: public TestObject(String id) {
402: this .id = id;
403: }
404:
405: private synchronized void addActivity(Object msg) {
406: activity.add(msg + "\n");
407: }
408:
409: public void populate(int count) {
410: TestObject to = this ;
411: for (int i = 0; i < count; i++) {
412: synchronized (to) {
413: addActivity(this + ": Populated : (i,count) = ("
414: + i + "," + count + ") @ " + new Date()
415: + " by thread "
416: + Thread.currentThread().getName());
417: to.child = new TestObject(id + "," + i);
418: }
419: to = to.child;
420: }
421: }
422:
423: public int increment() {
424: TestObject to = this ;
425: int currentValue = Integer.MIN_VALUE;
426: int changeCounter = 0;
427: do {
428: synchronized (to) {
429: // XXX: This synchronization is here to provide transaction boundaries, not because other threads will be
430: // fussing with this object.
431: if (currentValue == Integer.MIN_VALUE) {
432: currentValue = to.counter;
433: }
434: if (currentValue != to.counter) {
435: throw new RuntimeException(
436: "Expected current value="
437: + currentValue
438: + ", actual current value="
439: + to.counter);
440: }
441: to.addActivity(this
442: + ": increment <inside loop> : old value="
443: + to.counter + ", thread="
444: + Thread.currentThread().getName() + " - "
445: + to.counter + " @ " + new Date());
446: to.counter++;
447: changeCounter++;
448: }
449: } while ((to = to.getChild()) != null);
450: return changeCounter;
451: }
452:
453: public boolean validate(int expectedValue) {
454: TestObject to = this ;
455: do {
456: // XXX: This synchronization is here to provide transaction boundaries, not because other threads will be
457: // fussing with this object.
458: synchronized (to) {
459: if (to.counter != expectedValue) {
460: System.err.println("Expected " + expectedValue
461: + " but found: " + to.counter
462: + " on Test Object : " + to);
463: System.err.println(" To Activities = "
464: + to.activity);
465: System.err.println(" This Activities = "
466: + activity);
467: return false;
468: }
469: }
470: } while ((to = to.getChild()) != null);
471: return true;
472: }
473:
474: private synchronized TestObject getChild() {
475: return child;
476: }
477:
478: public String toString() {
479: return "TestObject@" + System.identityHashCode(this ) + "("
480: + id + ")={ counter = " + counter + " }";
481: }
482: }
483:
484: protected static final class IDProvider {
485: private int current;
486:
487: public synchronized Integer nextID() {
488: int rv = current++;
489: // System.err.println("Issuing new id: " + rv);
490: return new Integer(rv);
491: }
492:
493: public synchronized Integer getCurrentID() {
494: return new Integer(current);
495: }
496: }
497:
498: private static Object take(List workQueue2) {
499: synchronized (workQueue2) {
500: while (workQueue2.size() == 0) {
501: try {
502: System.err.println(Thread.currentThread().getName()
503: + " : Going to sleep : Size = "
504: + workQueue2.size());
505: workQueue2.wait();
506: System.err.println(Thread.currentThread().getName()
507: + " : Waking from sleep : Size = "
508: + workQueue2.size());
509: } catch (InterruptedException e) {
510: throw new RuntimeException(e);
511: }
512: }
513: return workQueue2.remove(0);
514: }
515: }
516:
517: private static void put(List workQueue2, Object o) {
518: synchronized (workQueue2) {
519: workQueue2.add(o);
520: workQueue2.notify();
521: }
522: }
523: }
|