001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tctest;
005:
006: import com.tc.object.config.ConfigVisitor;
007: import com.tc.object.config.DSOClientConfigHelper;
008: import com.tc.object.config.TransparencyClassSpec;
009: import com.tc.simulator.app.ApplicationConfig;
010: import com.tc.simulator.listener.ListenerProvider;
011: import com.tctest.util.AbstractTransparentAppMultiplexer;
012: import com.tctest.util.DSOConfigUtil;
013: import com.tctest.util.TestUtil;
014: import com.tctest.util.Timer;
015:
016: import java.util.concurrent.BlockingQueue;
017: import java.util.concurrent.CyclicBarrier;
018: import java.util.concurrent.LinkedBlockingQueue;
019:
020: public class WorkQueuesTestApp extends
021: AbstractTransparentAppMultiplexer {
022: private static final int NUM_ITEMS = 100;
023: private static final int SIZE_ITEMS = 10;
024:
025: private final QueueMultiplexer multiplexer;
026: private final CyclicBarrier readBarrier;
027: private final Object poison;
028:
029: private ItemGenerator itemGenerator = new ItemGenerator();
030:
031: public static class Item {
032: public byte[] data;
033: }
034:
035: public static class ItemGenerator {
036: public Item next() {
037: Item item = new Item();
038: item.data = new byte[SIZE_ITEMS];
039: return item;
040: }
041:
042: }
043:
044: public WorkQueuesTestApp(String appId, ApplicationConfig cfg,
045: ListenerProvider listenerProvider) {
046: super (appId, cfg, listenerProvider);
047: multiplexer = new QueueMultiplexer();
048: readBarrier = new CyclicBarrier(Math.min(getParticipantCount(),
049: 2));
050: poison = new Object();
051: }
052:
053: public void run(CyclicBarrier barrier, int index) throws Throwable {
054: if (index == 0) {
055: doPuts();
056: return;
057: }
058:
059: doReads();
060: }
061:
062: private void doPuts() throws Exception {
063: BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(
064: 500);
065: Timer t = new Timer();
066:
067: multiplexer.start(queue);
068:
069: System.out.println("Warming up...");
070: for (int i = 0; i < NUM_ITEMS; i++) {
071: queue.put(itemGenerator.next());
072: }
073:
074: // put the read barrier in the queue so that we
075: // wait for the last warmup item to be read
076: queue.put(readBarrier);
077: readBarrier.await();
078:
079: // dump the items
080: System.out.println("Putting items...");
081: int total = NUM_ITEMS * getIntensity();
082: t.start();
083: for (int i = 0; i < total; i++) {
084: queue.put(itemGenerator.next());
085: }
086:
087: // put the read barrier in the queue so that we
088: // wait for the last item to be read
089: queue.put(readBarrier);
090: readBarrier.await();
091:
092: // stop the timer
093: t.stop();
094:
095: // add one more object to the total to account
096: // for the read barrier
097: total++;
098:
099: // send poison to all readers
100: multiplexer.putAll(poison);
101:
102: // print stats
103: TestUtil.printStats("" + getParticipantCount(), "nodes");
104: TestUtil.printStats("" + total, "transactions");
105: TestUtil.printStats("" + t.elapsed(), "milliseconds");
106: TestUtil.printStats("" + t.tps(total), "tps");
107: }
108:
109: private void doReads() throws Exception {
110: BlockingQueue<Object> queue = multiplexer.getNewOutputQueue();
111:
112: System.out.println("Getting items...");
113:
114: while (true) {
115: Object item = queue.take();
116: if (item instanceof CyclicBarrier) {
117: ((CyclicBarrier) item).await();
118: continue;
119: }
120: if (item == poison) {
121: break;
122: }
123: }
124: }
125:
126: public static void visitL1DSOConfig(ConfigVisitor visitor,
127: DSOClientConfigHelper config) {
128: TransparencyClassSpec spec = config
129: .getOrCreateSpec(WorkQueuesTestApp.class.getName());
130:
131: AbstractTransparentAppMultiplexer.visitL1DSOConfig(visitor,
132: config);
133:
134: DSOConfigUtil.autoLockAndInstrumentClass(config,
135: WorkQueuesTestApp.class);
136: DSOConfigUtil.autoLockAndInstrumentClass(config,
137: QueueMultiplexer.class, true);
138:
139: DSOConfigUtil.addRoot(spec, "multiplexer");
140: DSOConfigUtil.addRoot(spec, "readBarrier");
141: DSOConfigUtil.addRoot(spec, "poison");
142: }
143: }
|