001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import java.util.*;
032:
033: /**
034: * An implementation of ResponseTimeController that uses a direct
035: * adjustment of queue thresholds based on the error in the 90th
036: * percentile response time.
037: *
038: * @author Matt Welsh
039: */
040: public class ResponseTimeControllerDirect extends
041: ResponseTimeController {
042:
043: private static final boolean DEBUG = true;
044:
045: private static final boolean ADJUST_THRESHOLD = false;
046: private static final boolean ADJUST_RATE = true;
047:
048: private static final int MEASUREMENT_SIZE = 100;
049: private static final long MEASUREMENT_TIME = 1000;
050: private static final double SMOOTH_CONST = 0.7;
051: private static final int NINETIETH = (MEASUREMENT_SIZE / 10) * 9; //(int)((double)MEASUREMENT_SIZE * 0.9);
052:
053: private static final double LOW_WATER = 0.9;
054: private static final double HIGH_WATER = 1.2;
055: private static final double ADDITIVE_INCREASE = 0.5;
056: private static final double MULTIPLICATIVE_INCREASE = 1.1;
057: private static final double MULTIPLICATIVE_DECREASE = 2;
058:
059: protected final static int INIT_THRESHOLD = 1;
060: protected final static int MIN_THRESHOLD = 1;
061: protected final static int MAX_THRESHOLD = 1024;
062:
063: private static final double INIT_RATE = 10.0;
064: private static final int INIT_DEPTH = 10;
065: private static final double MAX_RATE = 5000.0;
066: private static final double MIN_RATE = 0.05;
067:
068: private long adjtime;
069: private long measurements[], sortedmeasurements[];
070: private int curThreshold, cur_measurement;
071: private double curRate;
072: private double ninetiethRT;
073: private boolean enabled;
074:
075: public ResponseTimeControllerDirect(ManagerIF mgr,
076: StageWrapperIF stage) throws IllegalArgumentException {
077: super (mgr, stage);
078:
079: this .measurements = new long[MEASUREMENT_SIZE];
080: this .sortedmeasurements = new long[MEASUREMENT_SIZE];
081: this .cur_measurement = 0;
082: this .adjtime = System.currentTimeMillis();
083:
084: // Add profile
085: mgr.getProfiler().add(
086: "RTController 90th-percentile RT <"
087: + stage.getStage().getName() + ">",
088: new ProfilableIF() {
089: public int profileSize() {
090: return (int) ninetiethRT;
091: }
092: });
093:
094: if (ADJUST_THRESHOLD) {
095: mgr.getProfiler().add(
096: "RTController queueThreshold <"
097: + stage.getStage().getName() + ">",
098: new ProfilableIF() {
099: public int profileSize() {
100: return curThreshold;
101: }
102: });
103:
104: this .pred = new QueueThresholdPredicate(stage.getStage()
105: .getSink(), MAX_THRESHOLD);
106: ((QueueThresholdPredicate) pred)
107: .setThreshold(INIT_THRESHOLD);
108: this .curThreshold = ((QueueThresholdPredicate) pred)
109: .getThreshold();
110: stage.getStage().getSink().setEnqueuePredicate(this .pred);
111:
112: System.err.println("RTControllerDirect <"
113: + stage.getStage().getName()
114: + ">: ADJUST_THRESH enabled, target=" + targetRT
115: + ", MEASUREMENT_SIZE=" + MEASUREMENT_SIZE
116: + ", SMOOTH_CONST=" + SMOOTH_CONST + ", LOW_WATER="
117: + LOW_WATER + ", HIGH_WATER=" + HIGH_WATER
118: + ", ADDITIVE_INCREASE=" + ADDITIVE_INCREASE
119: + ", MULTIPLCATIVE_DECREASE="
120: + MULTIPLICATIVE_DECREASE);
121:
122: } else if (ADJUST_RATE) {
123: mgr.getProfiler().add(
124: "RTController curRate <"
125: + stage.getStage().getName() + ">",
126: new ProfilableIF() {
127: public int profileSize() {
128: return (int) curRate;
129: }
130: });
131:
132: this .pred = new RateLimitingPredicate(stage.getStage()
133: .getSink(), INIT_RATE, INIT_DEPTH);
134: this .curRate = ((RateLimitingPredicate) pred)
135: .getTargetRate();
136: stage.getStage().getSink().setEnqueuePredicate(pred);
137:
138: System.err.println("RTControllerDirect <"
139: + stage.getStage().getName()
140: + ">: ADJUST_RATE enabled, target=" + targetRT
141: + ", MEASUREMENT_SIZE=" + MEASUREMENT_SIZE
142: + ", SMOOTH_CONST=" + SMOOTH_CONST + ", LOW_WATER="
143: + LOW_WATER + ", HIGH_WATER=" + HIGH_WATER
144: + ", ADDITIVE_INCREASE=" + ADDITIVE_INCREASE
145: + ", MULTIPLCATIVE_DECREASE="
146: + MULTIPLICATIVE_DECREASE);
147: }
148:
149: this .enabled = true;
150: }
151:
152: public synchronized void enable() {
153: if (enabled)
154: return;
155:
156: System.err.println("RTControllerDirect <"
157: + stage.getStage().getName() + ">: Enabling");
158: if (ADJUST_THRESHOLD) {
159: this .pred = new QueueThresholdPredicate(stage.getStage()
160: .getSink(), curThreshold);
161: } else if (ADJUST_RATE) {
162: this .pred = new RateLimitingPredicate(stage.getStage()
163: .getSink(), curRate, INIT_DEPTH);
164: }
165:
166: stage.getStage().getSink().setEnqueuePredicate(pred);
167: enabled = true;
168: }
169:
170: public synchronized void disable() {
171: if (!enabled)
172: return;
173: System.err.println("RTControllerDirect <"
174: + stage.getStage().getName() + ">: Disabling");
175: this .pred = null;
176: stage.getStage().getSink().setEnqueuePredicate(null);
177: enabled = false;
178: }
179:
180: public synchronized void adjustThreshold(QueueElementIF fetched[],
181: long procTime) {
182: long curtime = System.currentTimeMillis();
183: boolean adjust = false;
184:
185: for (int i = 0; i < fetched.length; i++) {
186: if (fetched[i] instanceof TimeStampedEvent) {
187: TimeStampedEvent ev = (TimeStampedEvent) fetched[i];
188: long time = ev.timestamp;
189: if (time != 0) {
190: measurements[cur_measurement] = curtime - time;
191: cur_measurement++;
192: if (cur_measurement == MEASUREMENT_SIZE) {
193: cur_measurement = 0;
194: adjust = true;
195: }
196: }
197: }
198: }
199:
200: int numsort = MEASUREMENT_SIZE;
201: if ((curtime - adjtime) >= MEASUREMENT_TIME) {
202: adjust = true;
203: numsort = cur_measurement;
204: cur_measurement = 0;
205: }
206:
207: if (!adjust)
208: return;
209: System.arraycopy(measurements, 0, sortedmeasurements, 0,
210: numsort);
211: Arrays.sort(sortedmeasurements, 0, numsort);
212: long cur = sortedmeasurements[(int) (0.9 * (double) numsort)];
213: ninetiethRT = (SMOOTH_CONST * (double) ninetiethRT * 1.0)
214: + ((1.0 - SMOOTH_CONST) * ((double) cur * 1.0));
215: stage.getStats().record90thRT(ninetiethRT);
216:
217: adjtime = curtime;
218:
219: if (!enabled)
220: return;
221:
222: if (ADJUST_THRESHOLD) {
223:
224: if (ninetiethRT < (LOW_WATER * targetRT)) {
225: curThreshold += ADDITIVE_INCREASE;
226: //curThreshold *= MULTIPLICATIVE_INCREASE;
227: if (curThreshold > MAX_THRESHOLD)
228: curThreshold = MAX_THRESHOLD;
229: } else if (ninetiethRT > (HIGH_WATER * targetRT)) {
230: curThreshold /= MULTIPLICATIVE_DECREASE;
231: if (curThreshold < MIN_THRESHOLD)
232: curThreshold = MIN_THRESHOLD;
233: }
234: if (DEBUG)
235: System.err.println("RTController <"
236: + stage.getStage().getName()
237: + ">: ninetiethRT " + ninetiethRT + " target "
238: + targetRT + " threshold " + curThreshold);
239: ((QueueThresholdPredicate) pred).setThreshold(curThreshold);
240:
241: } else if (ADJUST_RATE) {
242:
243: if (ninetiethRT < (LOW_WATER * targetRT)) {
244: curRate += ADDITIVE_INCREASE;
245: //curRate *= MULTIPLICATIVE_INCREASE;
246: if (curRate > MAX_RATE)
247: curRate = MAX_RATE;
248: } else if (ninetiethRT > (HIGH_WATER * targetRT)) {
249: curRate /= MULTIPLICATIVE_DECREASE;
250: if (curRate < MIN_RATE)
251: curRate = MIN_RATE;
252: }
253: if (DEBUG)
254: System.err.println("RTController <"
255: + stage.getStage().getName()
256: + ">: ninetiethRT " + ninetiethRT + " target "
257: + targetRT + " rate now " + curRate);
258: ((RateLimitingPredicate) pred).setTargetRate(this.curRate);
259:
260: }
261:
262: }
263:
264: }
|