001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tctest;
005:
006: import com.tc.object.config.ConfigVisitor;
007: import com.tc.object.config.DSOClientConfigHelper;
008: import com.tc.object.config.TransparencyClassSpec;
009: import com.tc.simulator.app.ApplicationConfig;
010: import com.tc.simulator.listener.ListenerProvider;
011: import com.tctest.util.AbstractTransparentAppMultiplexer;
012: import com.tctest.util.DSOConfigUtil;
013: import com.tctest.util.TestUtil;
014: import com.tctest.util.Timer;
015:
016: import java.util.concurrent.BlockingQueue;
017: import java.util.concurrent.CyclicBarrier;
018: import java.util.concurrent.LinkedBlockingQueue;
019:
020: public class WorkQueueTestApp extends AbstractTransparentAppMultiplexer {
021: private static final int NUM_ITEMS = 100;
022: private static final int SIZE_ITEMS = 10;
023:
024: private final BlockingQueue<Object> queue;
025: private final CyclicBarrier readBarrier;
026: private final Object poison;
027:
028: private ItemGenerator itemGenerator = new ItemGenerator();
029:
030: public static class Item {
031: public byte[] data;
032: }
033:
034: public static class ItemGenerator {
035: public Object next() {
036: Item item = new Item();
037: item.data = new byte[SIZE_ITEMS];
038: return item;
039: }
040:
041: }
042:
043: public WorkQueueTestApp(String appId, ApplicationConfig cfg,
044: ListenerProvider listenerProvider) {
045: super (appId, cfg, listenerProvider);
046: queue = new LinkedBlockingQueue<Object>();
047: readBarrier = new CyclicBarrier(Math.min(getParticipantCount(),
048: 2));
049: poison = new Object();
050: }
051:
052: public void run(CyclicBarrier barrier, int index) throws Throwable {
053: if (index == 0) {
054: doPuts();
055: return;
056: }
057:
058: doReads();
059: }
060:
061: private void doPuts() throws Exception {
062: Timer t = new Timer();
063:
064: System.out.println("Warming up...");
065:
066: for (int i = 0; i < NUM_ITEMS; i++) {
067: queue.put(itemGenerator.next());
068: }
069:
070: // put the read barrier in the queue so that we
071: // wait for the last warmup item to be read
072: queue.put(readBarrier);
073: readBarrier.await();
074:
075: // dump the items
076: System.out.println("Putting items...");
077: int total = NUM_ITEMS * getIntensity();
078: t.start();
079: for (int i = 0; i < total; i++) {
080: queue.put(itemGenerator.next());
081: }
082:
083: // put the read barrier in the queue so that we
084: // wait for the last item to be read
085: queue.put(readBarrier);
086: readBarrier.await();
087:
088: // stop the timer
089: t.stop();
090:
091: // add one more object to the total to account
092: // for the read barrier
093: total++;
094:
095: // send poison to one reader (each reader will requeue it to
096: // kill them all)
097: queue.put(poison);
098:
099: TestUtil.printStats("" + getParticipantCount(), "nodes");
100: TestUtil.printStats("" + total, "transactions");
101: TestUtil.printStats("" + t.elapsed(), "milliseconds");
102: TestUtil.printStats("" + t.tps(total), "tps");
103: }
104:
105: private void doReads() throws Exception {
106: System.out.println("Getting items...");
107:
108: while (true) {
109: Object item = queue.take();
110: if (item instanceof CyclicBarrier) {
111: ((CyclicBarrier) item).await();
112: continue;
113: }
114: if (item == poison) {
115: break;
116: }
117: }
118:
119: queue.put(poison);
120: }
121:
122: public static void visitL1DSOConfig(ConfigVisitor visitor,
123: DSOClientConfigHelper config) {
124: TransparencyClassSpec spec = config
125: .getOrCreateSpec(WorkQueueTestApp.class.getName());
126:
127: AbstractTransparentAppMultiplexer.visitL1DSOConfig(visitor,
128: config);
129:
130: DSOConfigUtil.autoLockAndInstrumentClass(config,
131: WorkQueueTestApp.class);
132:
133: DSOConfigUtil.addRoot(spec, "queue");
134: DSOConfigUtil.addRoot(spec, "readBarrier");
135: DSOConfigUtil.addRoot(spec, "poison");
136: }
137: }
|