001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import seda.util.*;
032: import java.util.*;
033:
034: /**
035: * An implementation of ResponseTimeController that models the stage
036: * as an M/M/1 queue.
037: *
038: * @author Matt Welsh
039: */
040: public class ResponseTimeControllerMM1 extends ResponseTimeController {
041:
042: private static final boolean DEBUG = true;
043:
044: private static final boolean ADJUST_THRESHOLD = false;
045: private static final boolean ADJUST_RATE = true;
046:
047: protected final static int INIT_THRESHOLD = 1;
048: protected final static int MIN_THRESHOLD = 1;
049: protected final static int MAX_THRESHOLD = 1024;
050:
051: private static final double INIT_RATE = -1.0;
052: private static final int INIT_DEPTH = 100;
053: private static final double MIN_RATE = 0.5;
054:
055: private static final boolean DEBUG_CAP_RATE = false;
056: private static final double DEBUG_RATE = 100000.0;
057:
058: private static final boolean MOVING_AVERAGE = true;
059: private static final int MEASUREMENT_SIZE = 200;
060:
061: // Arashi runs
062: private static final int ESTIMATION_SIZE = 5000;
063: private static final long ESTIMATION_TIME = 5000;
064:
065: // Arashi runs
066: // private static final int ESTIMATION_SIZE = 500;
067: // private static final long ESTIMATION_TIME = 2000;
068:
069: // Original benchmarking
070: // private static final int ESTIMATION_SIZE = 100;
071: // private static final long ESTIMATION_TIME = 1000;
072:
073: private static final double SMOOTH_CONST = 0.1;
074: private static final int NINETIETH = (MEASUREMENT_SIZE / 10) * 9; //(int)((double)MEASUREMENT_SIZE * 0.9);
075:
076: private static final boolean BIDIRECTIONAL_FILTER = true;
077: private static final double SMOOTH_CONST_UP = 0.9;
078: private static final double SMOOTH_CONST_DOWN = 0.1;
079:
080: private SinkProxy sinkProxy;
081: private long measurements[], sortedmeasurements[];
082: private int curThreshold, cur_measurement;
083: private double curRate;
084: private double measured_mu, measured_lambda, est_ninetiethRT;
085: private double total_measured_mu, count_measured_mu,
086: total_measured_lambda, count_measured_lambda,
087: total_est_ninetiethRT, count_est_ninetiethRT;
088: private double ninetiethRT, totalNinetiethRT;
089: private int countNinetiethRT;
090: private long lasttime, totalProcTime;
091: private long startProcTime, endProcTime;
092: private int numProcessed, numReceived, numEst;
093: private double avgNumThreads = 1.0;
094: private int totalNumThreads = 0, countNumThreads = 0;
095: private boolean enabled;
096:
097: public ResponseTimeControllerMM1(ManagerIF mgr, StageWrapperIF stage)
098: throws IllegalArgumentException {
099: super (mgr, stage);
100: this .sinkProxy = (SinkProxy) stage.getStage().getSink();
101: this .lasttime = System.currentTimeMillis();
102:
103: if (ADJUST_THRESHOLD) {
104: this .pred = new QueueThresholdPredicate(stage.getStage()
105: .getSink(), INIT_THRESHOLD);
106: this .curThreshold = ((QueueThresholdPredicate) pred)
107: .getThreshold();
108: }
109: if (ADJUST_RATE) {
110: this .pred = new RateLimitingPredicate(stage.getStage()
111: .getSink(), INIT_RATE, INIT_DEPTH);
112: this .curRate = ((RateLimitingPredicate) pred)
113: .getTargetRate();
114: }
115: stage.getStage().getSink().setEnqueuePredicate(pred);
116: enabled = true;
117:
118: this .measurements = new long[MEASUREMENT_SIZE];
119: this .sortedmeasurements = new long[MEASUREMENT_SIZE];
120: this .cur_measurement = 0;
121: this .startProcTime = Long.MAX_VALUE;
122: this .endProcTime = 0L;
123:
124: // Add profile
125: mgr.getProfiler().add(
126: "RTControllerMM1 90th-percentile RT <"
127: + stage.getStage().getName() + ">",
128: new ProfilableIF() {
129: public int profileSize() {
130: return (int) ninetiethRT;
131: }
132: });
133: mgr.getProfiler().add(
134: "RTControllerMM1 lambda <" + stage.getStage().getName()
135: + ">", new ProfilableIF() {
136: public int profileSize() {
137: return (int) measured_lambda;
138: }
139: });
140: mgr.getProfiler().add(
141: "RTControllerMM1 mu <" + stage.getStage().getName()
142: + ">", new ProfilableIF() {
143: public int profileSize() {
144: return (int) measured_mu;
145: }
146: });
147: mgr.getProfiler().add(
148: "RTControllerMM1 est90thRT <"
149: + stage.getStage().getName() + ">",
150: new ProfilableIF() {
151: public int profileSize() {
152: return (int) est_ninetiethRT;
153: }
154: });
155: mgr.getProfiler().add(
156: "RTControllerMM1 avgNumThreads <"
157: + stage.getStage().getName() + ">",
158: new ProfilableIF() {
159: public int profileSize() {
160: return (int) avgNumThreads;
161: }
162: });
163:
164: if (ADJUST_THRESHOLD) {
165: System.err.print("RTControllerMM1 <"
166: + stage.getStage().getName()
167: + ">: ADJUST_THRESHOLD enabled, INIT_THRESHOLD="
168: + INIT_THRESHOLD + ", ESTIMATION_SIZE="
169: + ESTIMATION_SIZE + ", ESTIMATION_TIME="
170: + ESTIMATION_TIME);
171: mgr.getProfiler().add(
172: "RTControllerMM1 queueThreshold <"
173: + stage.getStage().getName() + ">",
174: new ProfilableIF() {
175: public int profileSize() {
176: return curThreshold;
177: }
178: });
179: }
180:
181: if (ADJUST_RATE) {
182: System.err.print("RTControllerMM1 <"
183: + stage.getStage().getName()
184: + ">: ADJUST_RATE enabled, INIT_DEPTH="
185: + INIT_DEPTH + ", ESTIMATION_SIZE="
186: + ESTIMATION_SIZE + ", ESTIMATION_TIME="
187: + ESTIMATION_TIME);
188: if (BIDIRECTIONAL_FILTER) {
189: System.err.println(", SMOOTH_CONST_UP="
190: + SMOOTH_CONST_UP + ", SMOOTH_CONST_DOWN="
191: + SMOOTH_CONST_DOWN);
192: } else {
193: System.err.println(", SMOOTH_CONST=" + SMOOTH_CONST);
194: }
195:
196: mgr.getProfiler().add(
197: "RTControllerMM1 queueRate <"
198: + stage.getStage().getName() + ">",
199: new ProfilableIF() {
200: public int profileSize() {
201: return (int) curRate;
202: }
203: });
204: mgr.getProfiler().add(
205: "RTControllerMM1 tokenBucket <"
206: + stage.getStage().getName() + ">",
207: new ProfilableIF() {
208: public int profileSize() {
209: return ((RateLimitingPredicate) pred)
210: .getBucketSize();
211: }
212: });
213: }
214:
215: System.err.println("RTControllerMM1 <"
216: + stage.getStage().getName()
217: + ">: initialized, targetRT=" + targetRT + " ms");
218: }
219:
220: public synchronized void enable() {
221: if (enabled)
222: return;
223:
224: System.err.println("RTControllerMM1 <"
225: + stage.getStage().getName() + ">: Enabling");
226: if (ADJUST_THRESHOLD) {
227: this .pred = new QueueThresholdPredicate(stage.getStage()
228: .getSink(), curThreshold);
229:
230: } else if (ADJUST_RATE) {
231: this .pred = new RateLimitingPredicate(stage.getStage()
232: .getSink(), curRate, INIT_DEPTH);
233: }
234: stage.getStage().getSink().setEnqueuePredicate(this .pred);
235: enabled = true;
236: }
237:
238: public synchronized void disable() {
239: if (!enabled)
240: return;
241: System.err.println("RTControllerMM1 <"
242: + stage.getStage().getName() + ">: Disabling");
243: this .pred = null;
244: stage.getStage().getSink().setEnqueuePredicate(null);
245: enabled = false;
246: }
247:
248: // Measure 90thRT, mu, lambda, estimate RT from model
249: public synchronized void adjustThreshold(QueueElementIF fetched[],
250: long startTime, long endTime, boolean isFirst,
251: int numThreads) {
252: // if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: adjustThreshold called, fetched.len="+fetched.length+", time="+(endTime-startTime)+", isFirst="+isFirst+", numThreads="+numThreads);
253:
254: boolean adjust_meas = false;
255: boolean adjust_est = false;
256:
257: if (MOVING_AVERAGE) {
258: avgNumThreads = (SMOOTH_CONST * avgNumThreads)
259: + ((1.0 - SMOOTH_CONST) * (double) (numThreads * 1.0));
260: } else {
261: totalNumThreads += numThreads;
262: countNumThreads++;
263: avgNumThreads = (totalNumThreads * 1.0)
264: / (countNumThreads * 1.0);
265: }
266:
267: numProcessed += fetched.length;
268: totalProcTime += endTime - startTime;
269:
270: //if (startTime < startProcTime) startProcTime = startTime;
271: //if (endTime > endProcTime) endProcTime = endTime;
272:
273: /*
274: if ((endTime <= startProcTime) || (startTime >= endProcTime)) {
275: totalProcTime += endTime - startTime;
276: startProcTime = startTime; endProcTime = endTime;
277: } else {
278: if (startTime < startProcTime) {
279: totalProcTime += startProcTime - startTime;
280: startProcTime = startTime;
281: }
282: if (endTime > endProcTime) {
283: totalProcTime += endTime - endProcTime;
284: endProcTime = endTime;
285: }
286: }
287: */
288:
289: //System.err.println("RTControllerMM1 <"+stage.getStage().getName()+"> S="+(startTime-startProcTime)+" E="+(endTime-startProcTime));
290: if (!isFirst)
291: return;
292:
293: // On every iteration reset the timespan
294: //totalProcTime += endProcTime - startProcTime;
295: //startProcTime = Long.MAX_VALUE; endProcTime = 0L;
296:
297: numReceived += sinkProxy.enqueueSuccessCount;
298: sinkProxy.enqueueSuccessCount = 0;
299:
300: // Measure actual 90th RT
301: long curtime = System.currentTimeMillis();
302: for (int i = 0; i < fetched.length; i++) {
303: if (fetched[i] instanceof TimeStampedEvent) {
304: adjust_est = true;
305: TimeStampedEvent ev = (TimeStampedEvent) fetched[i];
306: long time = ev.timestamp;
307: if (time != 0) {
308: measurements[cur_measurement] = curtime - time;
309: cur_measurement++;
310: if (cur_measurement == MEASUREMENT_SIZE) {
311: cur_measurement = 0;
312: adjust_meas = true;
313: break; // XXX MDW TESTING
314: }
315: }
316: }
317: }
318:
319: // XXX MDW: Continuously update
320: adjust_meas = true;
321: if (adjust_meas) {
322: System.arraycopy(measurements, 0, sortedmeasurements, 0,
323: MEASUREMENT_SIZE);
324: Arrays.sort(sortedmeasurements);
325: long cur = sortedmeasurements[NINETIETH];
326:
327: if (MOVING_AVERAGE) {
328: ninetiethRT = (SMOOTH_CONST * (double) ninetiethRT * 1.0)
329: + ((1.0 - SMOOTH_CONST) * ((double) (cur) * 1.0));
330: } else {
331: totalNinetiethRT += cur;
332: countNinetiethRT++;
333: ninetiethRT = (totalNinetiethRT * 1.0)
334: / (countNinetiethRT * 1.0);
335: }
336: stage.getStats().record90thRT(ninetiethRT);
337:
338: }
339:
340: // XXX MDW: Always adjust estimated lambda/mu
341: //if (!adjust_est) return;
342:
343: long elapsed = curtime - lasttime;
344: numEst++;
345: if ((numEst == ESTIMATION_SIZE) || (elapsed >= ESTIMATION_TIME)) {
346: numEst = 0;
347: } else {
348: return;
349: }
350:
351: lasttime = curtime;
352:
353: //System.err.println("RT: recv "+numReceived+" proc "+numProcessed);
354:
355: // Estimate 90th RT using M/M/1 model
356: // Assume mu scales linearly with number of threads
357: double mu_scaling = Math.log(avgNumThreads) + 1.0;
358:
359: if (DEBUG)
360: System.err.println("\nRT: numProcessed " + numProcessed
361: + " mu_scaling " + mu_scaling + " totalProcTime "
362: + totalProcTime);
363:
364: // Don't recalculate if we don't have enough data - avoid large mu
365: // spikes due to fast measurements
366: if ((totalProcTime < 2) || (numProcessed < 2))
367: return;
368:
369: // XXX TESTING
370: if (elapsed < 2)
371: return;
372:
373: double cur_mu = (numProcessed * mu_scaling * 1.0)
374: / (totalProcTime * 1.0e-3);
375: double cur_lambda = (numReceived * 1.0) / (elapsed * 1.0e-3);
376:
377: if (MOVING_AVERAGE) {
378: if (BIDIRECTIONAL_FILTER) {
379: if (cur_mu < measured_mu) {
380: measured_mu = (SMOOTH_CONST_DOWN
381: * (double) measured_mu * 1.0)
382: + ((1.0 - SMOOTH_CONST_DOWN) * cur_mu);
383: } else {
384: measured_mu = (SMOOTH_CONST_UP
385: * (double) measured_mu * 1.0)
386: + ((1.0 - SMOOTH_CONST_UP) * cur_mu);
387: }
388:
389: } else {
390: measured_mu = (SMOOTH_CONST * (double) measured_mu * 1.0)
391: + ((1.0 - SMOOTH_CONST) * cur_mu);
392: }
393:
394: measured_lambda = (SMOOTH_CONST * measured_lambda)
395: + ((1.0 - SMOOTH_CONST) * cur_lambda);
396:
397: } else {
398: total_measured_mu += numProcessed * mu_scaling;
399: count_measured_mu += totalProcTime * 1.0e-3;
400: measured_mu = total_measured_mu / count_measured_mu;
401: total_measured_lambda += numReceived;
402: count_measured_lambda += elapsed * 1.0e-3;
403: measured_lambda = total_measured_lambda
404: / count_measured_lambda;
405: }
406:
407: double rho = measured_lambda / measured_mu;
408: // XXX MDW: This is wrong - for waiting time
409: //double est = (((1.0/measured_mu)/(1.0-rho)) * Math.log(10.0)) * 1.0e3;
410: double est = (((1.0 / measured_mu) / (1.0 - rho)) * 2.3) * 1.0e3;
411: if (DEBUG)
412: System.err
413: .println("\nRT: cur_mu " + cur_mu + ", cur_lambda "
414: + cur_lambda + ", est90th " + est);
415: if (est >= 0.0) {
416: if (MOVING_AVERAGE) {
417: est_ninetiethRT = (SMOOTH_CONST
418: * (double) est_ninetiethRT * 1.0)
419: + ((1.0 - SMOOTH_CONST) * ((double) (est) * 1.0));
420: } else {
421: total_est_ninetiethRT += est;
422: count_est_ninetiethRT++;
423: est_ninetiethRT = (total_est_ninetiethRT * 1.0)
424: / (count_est_ninetiethRT * 1.0);
425: }
426: }
427:
428: numProcessed = 0;
429: numReceived = 0;
430: totalProcTime = 0L;
431:
432: if (DEBUG)
433: System.err.println("RTControllerMM1 <"
434: + stage.getStage().getName() + ">: ninetiethRT "
435: + ninetiethRT + " est "
436: + MDWUtil.format(est_ninetiethRT) + " mu "
437: + MDWUtil.format(measured_mu) + " lambda "
438: + MDWUtil.format(measured_lambda));
439:
440: if (!enabled)
441: return;
442:
443: // Now do threshold scaling
444: if (ADJUST_THRESHOLD) {
445: if (est < 0.0) {
446: // If under overload
447: if (DEBUG)
448: System.err.println("RT: Overload detected");
449: // XXX MDW TESTING: Was 0.9
450: curThreshold = MIN_THRESHOLD;
451: } else {
452: if (est_ninetiethRT < (0.5 * targetRT)) {
453: curThreshold += 1;
454: if (curThreshold > MAX_THRESHOLD)
455: curThreshold = MAX_THRESHOLD;
456: } else if (est_ninetiethRT >= (1.2 * targetRT)) {
457: curThreshold /= 2;
458: if (curThreshold < MIN_THRESHOLD)
459: curThreshold = MIN_THRESHOLD;
460: }
461: }
462: ((QueueThresholdPredicate) pred).setThreshold(curThreshold);
463: if (DEBUG)
464: System.err.println("RTControllerMM1 <"
465: + stage.getStage().getName()
466: + "> threshold now " + curThreshold);
467: }
468:
469: // Do rate scaling
470: if (ADJUST_RATE) {
471: if (est < 0.0) {
472: // If under overload
473: if (DEBUG)
474: System.err.println("RT: Overload detected");
475: // XXX MDW TESTING: Was 0.9
476: this .curRate = measured_mu * 0.1;
477: } else {
478:
479: // XXX TESTING: Maybe don't adjust if we are under the target,
480: // but, then we may be rejecting requests needlessly
481: // } else if (est_ninetiethRT >= targetRT) {
482:
483: this .curRate = measured_mu - (2.302)
484: / (targetRT / 1.0e3);
485: if (this .curRate < 0.0) {
486: // If the target is not feasible
487: if (DEBUG)
488: System.err.println("RT: Target infeasible");
489: this .curRate = measured_mu * 0.1;
490: }
491: }
492:
493: if (DEBUG_CAP_RATE) {
494: this .curRate = DEBUG_RATE;
495: }
496:
497: this .curRate = Math.max(MIN_RATE, this .curRate);
498: ((RateLimitingPredicate) pred).setTargetRate(this .curRate);
499:
500: if (DEBUG)
501: System.err.println("RTControllerMM1 <"
502: + stage.getStage().getName() + "> rate now "
503: + curRate);
504: }
505: }
506:
507: public void adjustThreshold(QueueElementIF fetched[], long procTime) {
508: throw new IllegalArgumentException("Not supported");
509: }
510:
511: }
|