001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import java.util.*;
032:
033: /**
034: * The ThreadPoolController is responsible for dynamically adusting the
035: * size of a given ThreadPool.
036: *
037: * @author Matt Welsh
038: */
039:
040: public class ThreadPoolController {
041:
042: private static final boolean DEBUG = false;
043:
044: // Multiple of standard controller delay
045: private static final int CONTROLLER_DELAY = 4;
046:
047: // Multiple of standard controller delay
048: private static final int THROUGHPUT_MEASUREMENT_DELAY = 1;
049:
050: // Multiple of standard controller delay
051: private static final int AUTO_MAX_DETECT_DELAY = 10;
052:
053: // Size of random jump down in number of threads
054: private static final int AUTO_MAX_DETECT_RANDOM_JUMP = 4;
055:
056: private static final double SMOOTH_CONST = 0.3;
057:
058: private ManagerIF mgr;
059: private Vector tpvec;
060:
061: private boolean autoMaxDetect;
062: private Thread controller;
063: private int controllerDelay, controllerThreshold;
064:
065: public ThreadPoolController(ManagerIF mgr) {
066: this .mgr = mgr;
067: tpvec = new Vector();
068:
069: SandstormConfig config = mgr.getConfig();
070: this .controllerDelay = config
071: .getInt("global.threadPool.sizeController.delay");
072: this .controllerThreshold = config
073: .getInt("global.threadPool.sizeController.threshold");
074: this .autoMaxDetect = config
075: .getBoolean("global.threadPool.sizeController.autoMaxDetect");
076:
077: start();
078: }
079:
080: public ThreadPoolController(ManagerIF mgr, int delay, int threshold) {
081: this .mgr = mgr;
082: tpvec = new Vector();
083: this .controllerDelay = delay;
084: SandstormConfig config = mgr.getConfig();
085: if (this .controllerDelay == -1) {
086: this .controllerDelay = config
087: .getInt("global.threadPool.sizeController.delay");
088: }
089: this .controllerThreshold = threshold;
090: if (this .controllerThreshold == -1) {
091: this .controllerThreshold = config
092: .getInt("global.threadPool.sizeController.threshold");
093: }
094:
095: this .autoMaxDetect = config
096: .getBoolean("global.threadPool.sizeController.autoMaxDetect");
097: start();
098: }
099:
100: /**
101: * Register a thread pool with this controller, using the queue threshold
102: * specified by the system configuration.
103: */
104: public void register(StageWrapperIF stage, ThreadPool tp) {
105: SandstormConfig config = mgr.getConfig();
106: int thresh = config.getInt("stages."
107: + stage.getStage().getName()
108: + ".threadPool.sizeController.threshold",
109: controllerThreshold);
110: tpvec.addElement(new tpcClient(stage, tp, null, thresh));
111: }
112:
113: /**
114: * Register a thread pool with this controller, using the queue threshold
115: * specified by the system configuration.
116: */
117: public void register(StageWrapperIF stage, ThreadPool tp,
118: ProfilableIF metric) {
119: tpvec.addElement(new tpcClient(stage, tp, metric,
120: controllerThreshold));
121: }
122:
123: private void start() {
124: System.err.println("ThreadPoolController: Started, delay "
125: + controllerDelay + " ms, threshold "
126: + controllerThreshold + ", autoMaxDetect "
127: + autoMaxDetect);
128: controller = new Thread(new controllerThread(), "TPC");
129: controller.start();
130: }
131:
132: /**
133: * Internal class representing a single TPC-controlled thread pool.
134: */
135: class tpcClient {
136: private StageWrapperIF stage;
137: private ThreadPool tp;
138: private int threshold;
139: private ProfilableIF metric;
140:
141: int savedThreads, avgThreads;
142: long savedTotalEvents;
143: double savedThroughput, avgThroughput;
144: long last_time, reset_time;
145:
146: tpcClient(final StageWrapperIF stage, ThreadPool tp,
147: ProfilableIF metric, int threshold) {
148: this .stage = stage;
149: this .tp = tp;
150: this .threshold = threshold;
151: this .metric = metric;
152: if (this .metric == null) {
153: this .metric = new ProfilableIF() {
154: public int profileSize() {
155: return stage.getSource().size();
156: }
157: };
158: }
159:
160: savedThreads = tp.numThreads();
161: reset_time = last_time = System.currentTimeMillis();
162:
163: mgr.getProfiler().add(
164: "TPController savedThreads <"
165: + stage.getStage().getName() + ">",
166: new ProfilableIF() {
167: public int profileSize() {
168: return (int) savedThreads;
169: }
170: });
171:
172: mgr.getProfiler().add(
173: "TPController avgThreads <"
174: + stage.getStage().getName() + ">",
175: new ProfilableIF() {
176: public int profileSize() {
177: return (int) avgThreads;
178: }
179: });
180:
181: mgr.getProfiler().add(
182: "TPController savedThroughput <"
183: + stage.getStage().getName() + ">",
184: new ProfilableIF() {
185: public int profileSize() {
186: return (int) savedThroughput;
187: }
188: });
189:
190: mgr.getProfiler().add(
191: "TPController avgThroughput <"
192: + stage.getStage().getName() + ">",
193: new ProfilableIF() {
194: public int profileSize() {
195: return (int) avgThroughput;
196: }
197: });
198: }
199: }
200:
201: /**
202: * Internal class implementing the controller.
203: */
204: class controllerThread implements Runnable {
205:
206: int adjust_count = 0;
207: Random rand;
208:
209: controllerThread() {
210: rand = new Random();
211: }
212:
213: public void run() {
214: if (DEBUG)
215: System.err.println("TP size controller: starting");
216:
217: while (true) {
218: adjustThreadPools();
219: try {
220: Thread.currentThread().sleep(controllerDelay);
221: } catch (InterruptedException ie) {
222: // Ignore
223: }
224: }
225: }
226:
227: private void adjustThreadPools() {
228:
229: adjust_count++;
230:
231: if ((adjust_count % CONTROLLER_DELAY) == 0) {
232:
233: for (int i = 0; i < tpvec.size(); i++) {
234: tpcClient tpc = (tpcClient) tpvec.elementAt(i);
235:
236: //if (DEBUG) System.err.println("TP controller: Inspecting "+tpc.tp);
237:
238: int sz = tpc.metric.profileSize();
239: //if (DEBUG) System.err.println("TP controller: "+tpc.tp+" has size "+sz+", threshold "+tpc.threshold);
240: boolean addThread = false;
241: if (sz >= tpc.threshold)
242: addThread = true;
243:
244: if (addThread) {
245: tpc.tp.addThreads(1, true);
246: }
247: }
248: }
249:
250: if ((DEBUG || autoMaxDetect)
251: && (adjust_count % THROUGHPUT_MEASUREMENT_DELAY) == 0) {
252:
253: long curTime = System.currentTimeMillis();
254:
255: for (int i = 0; i < tpvec.size(); i++) {
256: tpcClient tpc = (tpcClient) tpvec.elementAt(i);
257:
258: StageWrapper sw;
259: try {
260: sw = (StageWrapper) tpc.stage;
261: } catch (ClassCastException se) {
262: // Skip this one
263: continue;
264: }
265:
266: long events = sw.getStats().getTotalEvents();
267: long curEvents = events - tpc.savedTotalEvents;
268: tpc.savedTotalEvents = events;
269: if (DEBUG)
270: System.err.println("TP <"
271: + tpc.stage.getStage().getName()
272: + "> events " + events + " curEvents "
273: + curEvents);
274:
275: int curThreads = tpc.tp.numThreads();
276: tpc.avgThreads = (int) ((SMOOTH_CONST * curThreads) + ((1.0 - SMOOTH_CONST) * (double) (tpc.avgThreads * 1.0)));
277:
278: //double throughput = (sw.getStats().getServiceRate() * curThreads);
279: double throughput = (curEvents * 1.0)
280: / ((curTime - tpc.last_time) * 1.0e-3);
281: tpc.avgThroughput = (SMOOTH_CONST * throughput)
282: + ((1.0 - SMOOTH_CONST) * (double) (tpc.avgThroughput * 1.0));
283: if (DEBUG)
284: System.err.println("TP <"
285: + tpc.stage.getStage().getName()
286: + "> throughput " + tpc.avgThroughput);
287: tpc.last_time = curTime;
288: }
289: }
290:
291: if (autoMaxDetect
292: && (adjust_count % AUTO_MAX_DETECT_DELAY) == 0) {
293:
294: for (int i = 0; i < tpvec.size(); i++) {
295: tpcClient tpc = (tpcClient) tpvec.elementAt(i);
296:
297: // Periodically override saved values
298: //long tr = curTime - tpc.reset_time;
299: //if (rand.nextDouble() < 1.0 - Math.exp(-1.0 * (tr / 1e5)))
300: // System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Resetting saved values");
301: // tpc.reset_time = curTime;
302: // tpc.savedThreads = tpc.avgThreads;
303: // tpc.savedThroughput = tpc.avgThroughput;
304:
305: // Make random jump down
306: // int nt = (int)(rand.nextDouble() * AUTO_MAX_DETECT_RANDOM_JUMP);
307: // tpc.tp.removeThreads(nt);
308: //
309:
310: //continue;
311:
312: if (tpc.avgThroughput >= (1.0 * tpc.savedThroughput)) {
313: // Accept new state
314:
315: tpc.savedThreads = tpc.tp.numThreads();
316: tpc.savedThroughput = tpc.avgThroughput;
317: if (DEBUG)
318: System.err.println("TP controller <"
319: + tpc.stage.getStage().getName()
320: + "> Setting new state to threads="
321: + tpc.savedThreads + " tp="
322: + tpc.savedThroughput);
323:
324: // else if (tpc.avgThroughput <= (1.2 * tpc.savedThroughput))
325: // We are degrading: halve the number of threads
326:
327: // int numThreads = tpc.tp.numThreads();
328: // int newThreads = Math.max(1, numThreads / 2);
329: // System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Degrading (tp="+tpc.avgThroughput+") Reverting to threads="+tpc.savedThreads+"/"+newThreads+" stp="+tpc.savedThroughput);
330: // if (newThreads < numThreads)
331: // tpc.tp.removeThreads(numThreads - newThreads);
332: // tpc.savedThroughput = tpc.avgThroughput;
333: // tpc.savedThreads = newThreads;
334:
335: } else if (tpc.avgThroughput <= (1.2 * tpc.savedThroughput)) {
336: // Otherwise reset to savedThreads (minus random jump down)
337: // as long as the number of threads is different
338:
339: if (tpc.savedThreads != tpc.tp.numThreads()) {
340: int numThreads = tpc.tp.numThreads();
341: int nt = (int) (rand.nextDouble() * AUTO_MAX_DETECT_RANDOM_JUMP);
342: int newThreads = Math.max(1,
343: tpc.savedThreads - nt);
344:
345: if (DEBUG || autoMaxDetect)
346: System.err.println("TP controller <"
347: + tpc.stage.getStage()
348: .getName()
349: + "> Reverting to threads="
350: + tpc.savedThreads + "/"
351: + newThreads + " stp="
352: + tpc.savedThroughput);
353:
354: if (newThreads < numThreads) {
355: // Remove threads
356: tpc.tp.removeThreads(numThreads
357: - newThreads);
358: } else if (newThreads > numThreads) {
359: // Add threads
360: tpc.tp.addThreads(newThreads
361: - numThreads, true);
362: }
363: }
364: }
365: }
366: return;
367: }
368:
369: }
370: }
371: }
|