001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import seda.util.*;
032: import java.util.*;
033:
034: /**
035: * An implementation of ResponseTimeController that uses a direct
036: * adjustment of queue thresholds based on the error in the 90th
037: * percentile response time. Allows multiple class SLAs.
038: *
039: * @author Matt Welsh
040: */
041: public class ResponseTimeControllerMulticlass extends
042: ResponseTimeControllerDirect {
043:
044: private static final boolean DEBUG = true;
045:
046: private static final int MEASUREMENT_SIZE = 100;
047: private static final long MEASUREMENT_TIME = 1000;
048: private static final double SMOOTH_CONST = 0.7;
049: private static final int NINETIETH = (MEASUREMENT_SIZE / 10) * 9; //(int)((double)MEASUREMENT_SIZE * 0.9);
050:
051: private static final double LOW_WATER = -0.1;
052: private static final double HIGH_WATER = 0.0;
053: private static final double ADDITIVE_INCREASE = 2.0;
054: private static final double MULTIPLICATIVE_INCREASE = 1.1;
055: private static final double MULTIPLICATIVE_DECREASE = 1.2; //2
056: private static final double MULTIPLICATIVE_DECREASE_LOWPRIO = 10;
057: private static final double MULTIPLICATIVE_DECREASE_HIPRIO = 1.2;
058:
059: private static final double INIT_RATE = 100.0;
060: private static final int INIT_DEPTH = 1;
061: private static final double MAX_RATE = 5000.0;
062: private static final double MIN_RATE = 0.05;
063: private static final double MIN_90th = 1.0e-5;
064: private static final int LOWCOUNT_THRESH = 20;
065:
066: private static final boolean SAVE_MAX_RATE = false;
067:
068: private static final int MAX_CLASSES = 10;
069: private int NUM_CLASSES;
070:
071: private String name;
072: private boolean enabled;
073: private cinfo carr[];
074:
075: class cinfo {
076: int theclass;
077: double adjtime;
078: double targetRT;
079: long measurements[], sortedmeasurements[];
080: int cur_measurement = 0;
081: int num_measurements = 0;
082: double curRate;
083: int lowCount = 0;
084: double ninetiethRT;
085: double err, last_err;
086: boolean adjust = false;
087: boolean preempted = false;
088: double maxRate = -1.0;
089: boolean last_increased = false;
090:
091: cinfo(int theclass, double target) {
092: this .theclass = theclass;
093: this .targetRT = target;
094: this .measurements = new long[MEASUREMENT_SIZE];
095: this .sortedmeasurements = new long[MEASUREMENT_SIZE];
096:
097: this .curRate = ((MulticlassRateLimitingPredicate) pred)
098: .getTargetRate(theclass);
099: this .adjtime = System.currentTimeMillis();
100: System.err.println("RTControllerMulticlass: Class "
101: + theclass + " targetRT " + targetRT + ", curRate "
102: + this .curRate);
103: }
104:
105: void setRate(double newrate) {
106: curRate = newrate;
107: if (curRate < MIN_RATE)
108: curRate = MIN_RATE;
109: if (curRate > MAX_RATE)
110: curRate = MAX_RATE;
111: if (SAVE_MAX_RATE && maxRate > 0.0 && curRate > maxRate)
112: curRate = maxRate;
113: ((MulticlassRateLimitingPredicate) pred).setTargetRate(
114: theclass, curRate);
115: }
116:
117: void addMeasurement(long time) {
118: measurements[cur_measurement] = time;
119: cur_measurement++;
120: num_measurements++;
121: if (cur_measurement == MEASUREMENT_SIZE) {
122: cur_measurement = 0;
123: adjust = true;
124: }
125: }
126:
127: void record90th(int numsort, long curtime) {
128: System.arraycopy(measurements, 0, sortedmeasurements, 0,
129: numsort);
130: Arrays.sort(sortedmeasurements, 0, numsort);
131: long cur = sortedmeasurements[(int) (0.9 * (double) numsort)];
132: ninetiethRT = (SMOOTH_CONST * (double) ninetiethRT * 1.0)
133: + ((1.0 - SMOOTH_CONST) * ((double) cur * 1.0));
134: if (ninetiethRT < MIN_90th)
135: ninetiethRT = 0;
136:
137: if (theclass == 0)
138: stage.getStats().record90thRT(ninetiethRT);
139: adjtime = curtime;
140:
141: // Avoid timeout causing us to always read old value
142: if (cur_measurement == 0)
143: sortedmeasurements[0] = 0L;
144: }
145:
146: boolean adjust(long curtime) {
147:
148: int numsort = MEASUREMENT_SIZE;
149: if (num_measurements > 0
150: && (curtime - adjtime) >= MEASUREMENT_TIME) {
151: adjust = true;
152: numsort = cur_measurement;
153: cur_measurement = 0;
154: }
155:
156: if (!adjust)
157: return false;
158: adjust = false;
159:
160: record90th(numsort, curtime);
161:
162: if (!enabled)
163: return false;
164: if (targetRT == -1)
165: return false;
166:
167: num_measurements = 0;
168: err = (ninetiethRT - targetRT) / targetRT;
169:
170: if (err < LOW_WATER) {
171: // We are below our target - increase our rate only
172:
173: if (preempted) {
174: // Not allowed to increase - preempted by higher priority
175: preempted = false;
176: return false;
177: }
178:
179: setRate(curRate + rateAdd(err));
180: last_increased = true;
181: lowCount = 0;
182:
183: } else if (err > HIGH_WATER) {
184: // We are above our target - reduce rates of all lower classes
185: boolean found = false;
186: for (int c2 = 0; c2 < theclass; c2++) {
187: cinfo ci2 = carr[c2];
188:
189: ci2.preempted = true;
190: if (ci2.curRate > MIN_RATE) {
191: found = true;
192: ci2.setRate(ci2.curRate
193: / MULTIPLICATIVE_DECREASE_LOWPRIO);
194: }
195: }
196:
197: if (found)
198: lowCount = 0;
199:
200: if (!found
201: && ((++lowCount >= LOWCOUNT_THRESH) || (theclass == 0))) {
202: // Didn't find anyone else to penalize; adjust ourselves
203: setRate(curRate / MULTIPLICATIVE_DECREASE);
204: if (last_increased)
205: maxRate = curRate;
206: last_increased = false;
207:
208: } else {
209: // Found someone else to penalize or not at LOWCOUNT_THRESH
210: setRate(curRate / MULTIPLICATIVE_DECREASE_HIPRIO);
211: last_increased = false;
212: }
213:
214: } else {
215: last_increased = false;
216: }
217:
218: return true;
219: }
220:
221: }
222:
223: public ResponseTimeControllerMulticlass(ManagerIF mgr,
224: StageWrapperIF stage) throws IllegalArgumentException {
225: super (mgr, stage);
226:
227: this .name = stage.getStage().getName();
228:
229: SandstormConfig config = mgr.getConfig();
230: // First count number of classes
231: for (int c = 0; c < MAX_CLASSES; c++) {
232: double t = config
233: .getDouble("stages." + name
234: + ".rtController.multiclass.class" + (c)
235: + "Target");
236: if (t == -1) {
237: t = config
238: .getDouble("global.rtController.multiclass.class"
239: + c + "Target");
240: }
241: if (t != -1)
242: NUM_CLASSES++;
243: }
244:
245: if (NUM_CLASSES == 0) {
246: NUM_CLASSES = 1;
247: }
248:
249: this .pred = new MulticlassRateLimitingPredicate(stage
250: .getStage().getSink(), NUM_CLASSES, INIT_RATE,
251: INIT_DEPTH);
252: stage.getStage().getSink().setEnqueuePredicate(pred);
253:
254: this .carr = new cinfo[NUM_CLASSES];
255: for (int c = 0; c < NUM_CLASSES; c++) {
256: double t = config
257: .getDouble("stages." + name
258: + ".rtController.multiclass.class" + (c)
259: + "Target");
260: if (t == -1) {
261: t = config
262: .getDouble("global.rtController.multiclass.class"
263: + c + "Target");
264: }
265: this .carr[c] = new cinfo(c, t);
266: }
267:
268: System.err
269: .println("RTControllerMulticlass9 <" + name
270: + ">: MEASUREMENT_SIZE=" + MEASUREMENT_SIZE
271: + ", SMOOTH_CONST=" + SMOOTH_CONST
272: + ", LOW_WATER=" + LOW_WATER + ", HIGH_WATER="
273: + HIGH_WATER + ", ADDITIVE_INCREASE="
274: + ADDITIVE_INCREASE
275: + ", MULTIPLCATIVE_DECREASE="
276: + MULTIPLICATIVE_DECREASE);
277:
278: this .enabled = true;
279: }
280:
281: // Additive increase function
282: private double rateAdd(double err) {
283: // LOW_WATER gets increase of 0
284: if (err > -0.5)
285: return 0;
286: else
287: return ADDITIVE_INCREASE * ((-1.0 * err) + LOW_WATER);
288: }
289:
290: public synchronized void adjustThreshold(QueueElementIF fetched[],
291: long procTime) {
292: long curtime = System.currentTimeMillis();
293:
294: for (int i = 0; i < fetched.length; i++) {
295: if (fetched[i] instanceof TimeStampedEvent) {
296: TimeStampedEvent ev = (TimeStampedEvent) fetched[i];
297: long time = ev.timestamp;
298: if (time != 0) {
299: int theclass = 0;
300: if (ev instanceof ClassQueueElementIF) {
301: ClassQueueElementIF cqel = (ClassQueueElementIF) ev;
302: theclass = cqel.getRequestClass();
303: if (theclass == -1)
304: theclass = 0;
305: }
306: carr[theclass].addMeasurement(curtime - time);
307: }
308: }
309: }
310:
311: boolean adjusted_any = false;
312:
313: for (int c = NUM_CLASSES - 1; c >= 0; c--) {
314: if (carr[c].adjust(curtime)) {
315: adjusted_any = true;
316: }
317: }
318:
319: if (adjusted_any) {
320: for (int c = 0; c < NUM_CLASSES; c++) {
321: if (DEBUG)
322: System.err.println("RTController <" + name
323: + "> class " + c + ": ninetiethRT "
324: + MDWUtil.format(carr[c].ninetiethRT)
325: + " target "
326: + MDWUtil.format(carr[c].targetRT)
327: + " rate now "
328: + MDWUtil.format(carr[c].curRate));
329: }
330: }
331:
332: }
333:
334: }
|