001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import java.util.*;
032:
033: /**
034: * AggTPSThreadManager is a refinement of the TPSTM; it attempts to
035: * schedule stages to improve aggregation. The basic algorithm is to
036: * maintain a tunable "aggregation target", the minimum queue size
037: * threshold which triggers the execution of a stage's handler. This
038: * aggregation target is increased when more than 1 stage can meet
039: * the target, and reduced when no stages can meet it. A target of 1
040: * is equivalent to the TPSTM algorithm.
041: *
042: * @author Matt Welsh
043: */
044: class AggTPSThreadManager implements ThreadManagerIF, sandStormConst {
045:
/** Enables the informational trace messages printed by worker and governor threads. */
private static final boolean DEBUG = true;
/** Enables the per-dispatch trace messages printed inside the worker loop. */
private static final boolean DEBUG_VERBOSE = false;

// NOTE(review): presumably the intended starting size of each thread pool;
// confirm against the pool construction code before relying on it.
private static final int INITIAL_THREADPOOL_SIZE = 1;

/**
 * Upper bound on the number of events handed to a handler in one call, read
 * from "global.maxBatch". A value of -1 makes workers dequeue everything
 * that is available.
 */
private int maxAggregation;

/** One stageInfo entry per registered stage. */
private Vector stages;
/** List of threadPool objects consulted by stop() and by the governor. */
private Vector threadpools;
/** Thread group in which every worker thread and the governor are created. */
private ThreadGroup tg;
/** Governor thread; created lazily by register() when the governor is enabled. */
private Thread governor;
private boolean useGovernor;
/**
 * Governor tuning knobs: sleep time between inspections (passed to
 * Thread.sleep, i.e. milliseconds), the largest size a pool may grow to, and
 * the queue length that triggers adding a thread.
 */
private int governorDelay, governorMaxThreads, governorThreshold;

// Number of events we would like to wait for before scheduling a stage
private int aggregationTarget = 1;
/** Monitor guarding reads and writes of aggregationTarget. */
private Object lock;

/**
 * Create a thread manager configured from the given Sandstorm configuration.
 *
 * Reads the governor settings from the "global.AggTPSTM.governor.*" keys
 * and the batch limit from "global.maxBatch".
 *
 * @param config configuration object supplying the settings above
 */
AggTPSThreadManager(SandstormConfig config) {
    this.useGovernor = config.getBoolean("global.AggTPSTM.governor.enable");
    this.governorDelay = config.getInt("global.AggTPSTM.governor.delay");
    this.governorMaxThreads = config.getInt("global.AggTPSTM.governor.maxThreads");
    this.governorThreshold = config.getInt("global.AggTPSTM.governor.threshold");

    maxAggregation = config.getInt("global.maxBatch");

    stages = new Vector(1);
    // Must be non-null: stop() and the governor call methods on this list,
    // and leaving it unassigned made both fail with a NullPointerException.
    threadpools = new Vector(1);
    tg = new ThreadGroup("AggTPSThreadManager");
    lock = new Object();
}
079:
080: /**
081: * Register a stage with this thread manager.
082: */
083: public void register(StageWrapperIF stage) {
084:
085: if (useGovernor && (governor == null)) {
086: System.err
087: .println("AggTPSThreadManager: Starting thread governor");
088: governor = new Thread(tg, new governorThread(),
089: "AggTPSTM Governor");
090: governor.start();
091: }
092:
093: System.err
094: .println("AggTPSThreadManager: Starting thread pool for "
095: + stage + ", maxAggregation " + maxAggregation);
096: stageInfo si = new stageInfo(stage);
097: stages.addElement(si);
098: si.start();
099: }
100:
101: /**
102: * Deregister a stage with this thread manager.
103: */
104: public void deregister(StageWrapperIF stage) {
105: System.err.println("AggTPSThreadManager: Deregistering stage "
106: + stage);
107: Enumeration e = stages.elements();
108: while (e.hasMoreElements()) {
109: stageInfo stageinfo = (stageInfo) e.nextElement();
110: if (stageinfo.stage == stage) {
111: stageinfo.stop();
112: }
113: }
114: if (!stages.removeElement(stage))
115: throw new IllegalArgumentException("Stage " + stage
116: + " not registered with this TM");
117: }
118:
119: /**
120: * Stop the thread manager and all threads managed by it.
121: */
122: public void deregisterAll() {
123: Enumeration e = stages.elements();
124: while (e.hasMoreElements()) {
125: stageInfo stageinfo = (stageInfo) e.nextElement();
126: deregister(stageinfo.stage);
127: }
128: }
129:
130: /**
131: * Stop the thread manager and all threads managed by it.
132: */
133: public void stop() {
134: System.err.println("AggTPSThreadManager: Stopping "
135: + threadpools.size() + " threadpools");
136: tg.stop();
137: }
138:
139: /**
140: * Internal class representing state for a given stage.
141: */
142: class stageInfo {
143: StageWrapperIF stage;
144: threadPool tp;
145:
146: stageInfo(StageWrapperIF stage) {
147: this .stage = stage;
148: // Create a threadPool for each stage
149: tp = new threadPool(stage, stage.getSource());
150: }
151:
152: void start() {
153: tp.start();
154: }
155:
156: void stop() {
157: tp.stop();
158: }
159: }
160:
161: /**
162: * Internal class representing a single AggTPSTM-managed thread.
163: */
164: class appThread implements Runnable {
165:
166: private StageWrapperIF wrapper;
167: private SourceIF source;
168: private String name;
169: private threadPool mytp;
170:
171: appThread(StageWrapperIF wrapper, SourceIF source, String name,
172: threadPool tp) {
173: this .wrapper = wrapper;
174: this .source = source;
175: this .name = name;
176: this .mytp = tp;
177: }
178:
179: public void run() {
180: if (DEBUG)
181: System.err.println(name + ": starting, source is "
182: + source);
183: int aTarget;
184: boolean needToBlock = false;
185:
186: while (true) {
187:
188: try {
189:
190: synchronized (lock) {
191: aTarget = aggregationTarget;
192: }
193:
194: if (aTarget > 1) {
195:
196: // First check if my queue has enough elements
197: if (source.size() >= aTarget) {
198: if (DEBUG_VERBOSE)
199: System.err
200: .println(name
201: + ": "
202: + source.size()
203: + " elements in queue, dispatching");
204: QueueElementIF fetched[];
205: if (maxAggregation == -1) {
206: fetched = source.dequeue_all();
207: } else {
208: fetched = source
209: .dequeue(maxAggregation);
210: }
211: wrapper.getEventHandler().handleEvents(
212: fetched);
213: needToBlock = false;
214: } else {
215: needToBlock = true;
216: }
217:
218: // Now check other stages
219: int numActive = 0;
220:
221: while (numActive == 0) {
222:
223: // Is any other stage ready to run?
224: for (int i = 0; i < stages.size(); i++) {
225: stageInfo si = (stageInfo) stages
226: .elementAt(i);
227: if (si.tp.source.size() >= aTarget) {
228: // Wake it up
229: synchronized (si.tp) {
230: si.tp.notifyAll();
231: }
232: numActive++;
233: }
234: }
235: }
236:
237: if (numActive == 0) {
238: // Reduce aggregationTarget
239: synchronized (lock) {
240: aggregationTarget /= 2;
241: if (DEBUG)
242: System.err
243: .println("aggTPS: numActive is 0, decreasing aggregationTarget to "
244: + aggregationTarget);
245:
246: if (aggregationTarget == 1) {
247: // Wake up every pool
248: for (int i = 0; i < stages.size(); i++) {
249: stageInfo si = (stageInfo) stages
250: .elementAt(i);
251: // Wake it up
252: synchronized (si.tp) {
253: si.tp.notifyAll();
254: }
255: }
256: }
257: }
258: } else if (numActive > 1) {
259: // Increase aggregationTarget
260: synchronized (lock) {
261: aggregationTarget *= 2;
262: if (DEBUG)
263: System.err
264: .println("aggTPS: numActive is "
265: + numActive
266: + ", increasing aggregation target to "
267: + aggregationTarget);
268: }
269: }
270:
271: if (needToBlock) {
272: // Wait for another thread to signal
273: synchronized (mytp) {
274: try {
275: mytp.wait();
276: } catch (InterruptedException ie) {
277: // Ignore
278: }
279: }
280: }
281:
282: } else {
283:
284: // If aggregationTarget is 1, all we can do is block
285: if (DEBUG_VERBOSE)
286: System.err.println(name
287: + ": Blocking dequeue");
288: QueueElementIF fetched[];
289: if (maxAggregation == -1) {
290: fetched = source.blocking_dequeue_all(-1);
291: } else {
292: fetched = source.blocking_dequeue(-1,
293: maxAggregation);
294: }
295: if (DEBUG_VERBOSE)
296: System.err.println(name + ": Got "
297: + fetched.length + " elements");
298: wrapper.getEventHandler().handleEvents(fetched);
299: }
300:
301: } catch (Exception e) {
302: System.err
303: .println("AggTPSThreadManager: appThread ["
304: + name + "] got exception " + e);
305: e.printStackTrace();
306: }
307: }
308: }
309: }
310:
311: class threadPool {
312: String stagename;
313: StageWrapperIF wrapper;
314: SourceIF source;
315: private Vector threads;
316:
317: threadPool(StageWrapperIF wrapper, SourceIF source) {
318: this .wrapper = wrapper;
319: this .source = source;
320: this .stagename = wrapper.getStage().getName();
321: threads = new Vector(1);
322: addThreads(1, false);
323: }
324:
325: void addThreads(int num, boolean start) {
326: for (int i = 0; i < num; i++) {
327: String name = "AggTPSTM-" + numThreads() + " <"
328: + stagename + ">";
329: Thread t = new Thread(tg, new appThread(wrapper,
330: source, name, this ), name);
331: threads.addElement(t);
332: if (start)
333: t.start();
334: }
335: }
336:
337: int numThreads() {
338: return threads.size();
339: }
340:
341: void start() {
342: System.err.println(" <" + stagename + "> pool: Starting "
343: + numThreads() + " threads");
344: for (int i = 0; i < threads.size(); i++) {
345: Thread t = (Thread) threads.elementAt(i);
346: t.start();
347: }
348: }
349:
350: void stop() {
351: System.err.println(" <" + stagename + "> pool: Stopping "
352: + numThreads() + " threads");
353: for (int i = 0; i < threads.size(); i++) {
354: Thread t = (Thread) threads.elementAt(i);
355: t.stop();
356: }
357: }
358:
359: public String toString() {
360: return "AggTPSTM threadPool (size=" + numThreads()
361: + ") for <" + stagename + ">";
362: }
363:
364: }
365:
366: /**
367: * Internal class implementing a thread governor - analyses appThread
368: * queue lengths and adjusts thread pool sizes accordingly.
369: */
370: class governorThread implements Runnable {
371:
372: public void run() {
373: if (DEBUG)
374: System.err.println("AggTPSTM Governor: starting");
375:
376: while (true) {
377: adjustThreadPools();
378: try {
379: Thread.currentThread().sleep(governorDelay);
380: } catch (InterruptedException ie) {
381: // Ignore
382: }
383: }
384: }
385:
386: private void adjustThreadPools() {
387: // Really dumb algorithm for now
388: for (int i = 0; i < threadpools.size(); i++) {
389: threadPool pool = (threadPool) threadpools.elementAt(i);
390:
391: if (DEBUG)
392: System.err.println("AggTPSTM Governor: Inspecting "
393: + pool);
394:
395: // Only adjust pools pulling data from a SourceIF/SinkIF pair
396: if (pool.source instanceof SinkIF) {
397: SinkIF sink = (SinkIF) pool.source;
398: int sz = sink.size();
399: if (DEBUG)
400: System.err.println("AggTPSTM Governor: size "
401: + sz + ", thresh " + governorThreshold);
402: if (sz == governorThreshold) {
403: // Queue is full, add a thread
404: int numt = pool.numThreads();
405: if (numt < governorMaxThreads) {
406: System.err
407: .println("AggTPSTM Governor: Adding thread to pool "
408: + pool);
409: pool.addThreads(1, true);
410: } else {
411: if (DEBUG)
412: System.err
413: .println("AggTPSTM Governor: Pool "
414: + pool
415: + " already at max");
416: }
417: }
418: }
419: }
420: }
421: }
422:
423: }
|