001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import seda.util.*;
032: import java.util.*;
033:
034: /**
035: * An implementation of ResponseTimeController that uses a PID control.
036: *
037: * @author Matt Welsh
038: */
039: public class ResponseTimeControllerPID extends ResponseTimeController {
040:
041: private static final boolean DEBUG = true;
042:
043: private static final boolean ADJUST_THRESHOLD = false;
044: private static final boolean ADJUST_RATE = true;
045:
046: private static final boolean BE_CREATIVE = false;
047:
048: private static final int MEASUREMENT_SIZE = 100;
049: private static final long MEASUREMENT_TIME = 5000;
050: private static final double SMOOTH_CONST = 0.8;
051: private static final double PROP_GAIN = 1.0;
052: private static final double DERIV_GAIN = -0.5;
053: private static final double INTR_GAIN = (0.2 / MEASUREMENT_SIZE);
054: private static final int NINETIETH = (MEASUREMENT_SIZE / 10) * 9; //(int)((double)MEASUREMENT_SIZE * 0.9);
055:
056: protected final static int INIT_THRESHOLD = 1;
057: protected final static int MIN_THRESHOLD = 1;
058: protected final static int MAX_THRESHOLD = 1024;
059:
060: private static final double INIT_RATE = 10.0;
061: private static final int INIT_DEPTH = 10;
062: private static final double MAX_RATE = 5000.0;
063: private static final double MIN_RATE = 0.05;
064:
065: private SinkProxy sinkProxy;
066: private long measurements[], sortedmeasurements[];
067: private double errors[], lasterr, lastinterr, totalinterr;
068: private int curThreshold, cur_measurement, cur_error;
069: private long numReceived;
070: private double curRate;
071: private double ninetiethRT, lambda;
072: private long adjtime;
073: private boolean enabled;
074:
075: public ResponseTimeControllerPID(ManagerIF mgr, StageWrapperIF stage)
076: throws IllegalArgumentException {
077: super (mgr, stage);
078: this .adjtime = System.currentTimeMillis();
079: this .sinkProxy = (SinkProxy) stage.getStage().getSink();
080: this .measurements = new long[MEASUREMENT_SIZE];
081: this .sortedmeasurements = new long[MEASUREMENT_SIZE];
082: this .errors = new double[MEASUREMENT_SIZE];
083: this .cur_measurement = 0;
084: this .cur_error = 0;
085:
086: // Add profile
087: mgr.getProfiler().add(
088: "RTControllerPID 90th-percentile RT <"
089: + stage.getStage().getName() + ">",
090: new ProfilableIF() {
091: public int profileSize() {
092: return (int) ninetiethRT;
093: }
094: });
095:
096: if (ADJUST_THRESHOLD) {
097:
098: mgr.getProfiler().add(
099: "RTControllerPID queue threshold <"
100: + stage.getStage().getName() + ">",
101: new ProfilableIF() {
102: public int profileSize() {
103: return curThreshold;
104: }
105: });
106:
107: this .pred = new QueueThresholdPredicate(stage.getStage()
108: .getSink(), MAX_THRESHOLD);
109: ((QueueThresholdPredicate) pred)
110: .setThreshold(INIT_THRESHOLD);
111: this .curThreshold = ((QueueThresholdPredicate) pred)
112: .getThreshold();
113: stage.getStage().getSink().setEnqueuePredicate(this .pred);
114:
115: System.err.println("RTControllerPID <"
116: + stage.getStage().getName()
117: + ">: ADJUST_THRESH enabled, MEASUREMENT_SIZE="
118: + MEASUREMENT_SIZE + ", SMOOTH_CONST="
119: + SMOOTH_CONST + ", PROP_GAIN=" + PROP_GAIN
120: + ", DERIV_GAIN=" + DERIV_GAIN + ", INTR_GAIN="
121: + INTR_GAIN);
122:
123: } else if (ADJUST_RATE) {
124:
125: this .pred = new RateLimitingPredicate(stage.getStage()
126: .getSink(), INIT_RATE, INIT_DEPTH);
127: this .curRate = ((RateLimitingPredicate) pred)
128: .getTargetRate();
129: stage.getStage().getSink().setEnqueuePredicate(pred);
130:
131: System.err.println("RTControllerPID <"
132: + stage.getStage().getName()
133: + ">: ADJUST_RATE enabled, MEASUREMENT_SIZE="
134: + MEASUREMENT_SIZE + ", SMOOTH_CONST="
135: + SMOOTH_CONST + ", PROP_GAIN=" + PROP_GAIN
136: + ", DERIV_GAIN=" + DERIV_GAIN + ", INTR_GAIN="
137: + INTR_GAIN);
138: }
139:
140: this .enabled = true;
141: }
142:
143: public synchronized void enable() {
144: if (enabled)
145: return;
146:
147: System.err.println("RTControllerPID <"
148: + stage.getStage().getName() + ">: Enabling");
149: if (ADJUST_THRESHOLD) {
150: this .pred = new QueueThresholdPredicate(stage.getStage()
151: .getSink(), curThreshold);
152:
153: } else if (ADJUST_RATE) {
154: this .pred = new RateLimitingPredicate(stage.getStage()
155: .getSink(), curRate, INIT_DEPTH);
156: }
157:
158: stage.getStage().getSink().setEnqueuePredicate(this .pred);
159: enabled = true;
160: }
161:
162: public synchronized void disable() {
163: if (!enabled)
164: return;
165: System.err.println("RTControllerPID <"
166: + stage.getStage().getName() + ">: Disabling");
167: this .pred = null;
168: stage.getStage().getSink().setEnqueuePredicate(null);
169: enabled = false;
170: }
171:
172: public synchronized void adjustThreshold(QueueElementIF fetched[],
173: long procTime) {
174: long curtime = System.currentTimeMillis();
175: boolean adjust = false;
176:
177: for (int i = 0; i < fetched.length; i++) {
178: if (fetched[i] instanceof TimeStampedEvent) {
179: TimeStampedEvent ev = (TimeStampedEvent) fetched[i];
180: long time = ev.timestamp;
181: if (time != 0) {
182: measurements[cur_measurement] = curtime - time;
183: cur_measurement++;
184: if (cur_measurement == MEASUREMENT_SIZE) {
185: cur_measurement = 0;
186: adjust = true;
187: }
188: }
189: }
190: }
191:
192: int numsort = MEASUREMENT_SIZE;
193: long elapsed = curtime - adjtime;
194: if (elapsed >= MEASUREMENT_TIME) {
195: adjust = true;
196: numsort = cur_measurement;
197: cur_measurement = 0;
198: }
199:
200: if (!adjust)
201: return;
202: System.arraycopy(measurements, 0, sortedmeasurements, 0,
203: numsort);
204: Arrays.sort(sortedmeasurements, 0, numsort);
205: long cur = sortedmeasurements[(int) (0.9 * (double) numsort)];
206: ninetiethRT = (SMOOTH_CONST * (double) ninetiethRT * 1.0)
207: + ((1.0 - SMOOTH_CONST) * ((double) cur * 1.0));
208: adjtime = curtime;
209: stage.getStats().record90thRT(ninetiethRT);
210:
211: int numReceived = sinkProxy.enqueueSuccessCount;
212: sinkProxy.enqueueSuccessCount = 0;
213: double cur_lambda = (numReceived * 1.0) / (elapsed * 1.0e-3);
214: lambda = (SMOOTH_CONST * lambda)
215: + ((1.0 - SMOOTH_CONST) * cur_lambda);
216:
217: // Apply PID control
218: double err = (targetRT - ninetiethRT) / targetRT;
219: double derr = (err - lasterr) / (double) (elapsed * 1.0e-3);
220: double interr = (((lasterr + err) / 2) * (double) ((elapsed) * 1.0e-3));
221: lasterr = err;
222: adjtime = curtime;
223:
224: totalinterr -= errors[cur_error];
225: totalinterr += interr;
226: errors[cur_error] = interr;
227: cur_error++;
228: if (cur_error == MEASUREMENT_SIZE)
229: cur_error = 0;
230:
231: // interr -= errors[cur_error];
232: // errors[cur_error] = interr;
233: // cur_error++; if (cur_error == MEASUREMENT_SIZE) cur_error = 0;
234: // lasterr = err; lastinterr = interr; adjtime = curtime;
235:
236: double out;
237: if (BE_CREATIVE) {
238: out = (PROP_GAIN * err * err);
239: if (err < 0) {
240: out *= -1;
241: }
242: } else {
243: out = ((PROP_GAIN * err) + (DERIV_GAIN * derr) + (INTR_GAIN * totalinterr));
244: }
245:
246: if (DEBUG)
247: System.err.println("RTControllerPID <"
248: + stage.getStage().getName() + ">: lambda "
249: + MDWUtil.format(lambda) + " 90th "
250: + MDWUtil.format(ninetiethRT) + " err "
251: + MDWUtil.format(err) + " derr "
252: + MDWUtil.format(derr) + " interr "
253: + MDWUtil.format(totalinterr) + " out "
254: + MDWUtil.format(out));
255:
256: if (!enabled)
257: return;
258:
259: if (ADJUST_THRESHOLD) {
260: curThreshold += out;
261: //curThreshold = (int)((MIN_THRESHOLD) + ((MAX_THRESHOLD - MIN_THRESHOLD) * out));
262: if (curThreshold < MIN_THRESHOLD)
263: curThreshold = MIN_THRESHOLD;
264: if (curThreshold > MAX_THRESHOLD)
265: curThreshold = MAX_THRESHOLD;
266:
267: if (DEBUG)
268: System.err.println("RTControllerPID <"
269: + stage.getStage().getName()
270: + ">: ninetiethRT " + ninetiethRT + " target "
271: + targetRT + " threshold " + curThreshold);
272: ((QueueThresholdPredicate) pred).setThreshold(curThreshold);
273:
274: } else if (ADJUST_RATE) {
275:
276: if (BE_CREATIVE) {
277: if (out < 0) {
278: //curRate /= (out * -1);
279: curRate /= 2;
280: } else {
281: curRate += out;
282: }
283: } else {
284: curRate += out;
285: }
286:
287: curRate = Math.max(MIN_RATE, curRate);
288: curRate = Math.min(MAX_RATE, curRate);
289: ((RateLimitingPredicate) pred).setTargetRate(this .curRate);
290:
291: if (DEBUG)
292: System.err.println("RTControllerPID <"
293: + stage.getStage().getName()
294: + ">: ninetiethRT " + ninetiethRT + " target "
295: + targetRT + " rate now " + curRate);
296:
297: }
298:
299: }
300:
301: }
|