001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import java.util.*;
032: import seda.util.*;
033:
034: /**
035: * AggThrottle is used by thread managers to adjust their aggregation
036: * level based on observations of stage throughput.
037: *
038: * @author Matt Welsh
039: */
040: class AggThrottle {
041:
042: private static final boolean DEBUG = false;
043:
044: private StageWrapperIF stage;
045: private String name;
046: private ManagerIF mgr;
047:
048: private double bestThroughput, lastThroughput;
049: private int bestTarget;
050: private long lastEvents;
051: private long lastMeasurementTime;
052: private int measurementCount, adjustCount;
053:
054: private static final int STATE_DECREASING = 0;
055: private static final int STATE_INCREASING = 1;
056: private int state = STATE_DECREASING;
057: private int increase_count = 0;
058:
059: private static final int ADJUST_DELAY = 5;
060:
061: private int minAggregation = 8;
062: // private int maxAggregation = -1;
063: private int maxAggregation = 1000;
064: private int recalcWindow = 1000;
065: private double smoothConst = 0.7;
066:
067: private static final double REDUCE_FACTOR = 1.2;
068: private static final double INCREASE_FACTOR = 1.2;
069: private static final double LOW_WATER = 0.90;
070: private static final double HIGH_WATER = 0.98;
071: private static final double VERY_LOW_WATER = 0.2;
072: private static final double VERY_HIGH_WATER = 2.0;
073:
074: private int aggregationTarget;
075: private Random rand = new Random();
076:
077: AggThrottle(StageWrapperIF stage, ManagerIF mgr) {
078: this .stage = stage;
079: this .name = stage.getStage().getName();
080: this .mgr = mgr;
081: SandstormConfig config = mgr.getConfig();
082:
083: this .minAggregation = config.getInt(
084: "global.batchController.minBatch", minAggregation);
085: this .maxAggregation = config.getInt(
086: "global.batchController.maxBatch", maxAggregation);
087: // this.recalcWindow = config.getInt("global.batchController.recalcWindow",
088: // recalcWindow);
089: this .smoothConst = config.getDouble(
090: "global.batchController.smoothConst", smoothConst);
091:
092: System.err.println("AggThrottle <" + name
093: + "> created: minBatch " + minAggregation
094: + ", maxBatch " + maxAggregation + ", recalcWindow "
095: + recalcWindow);
096: this .aggregationTarget = this .maxAggregation;
097:
098: lastThroughput = 0.0;
099: bestThroughput = 0.0;
100: bestTarget = aggregationTarget;
101: lastEvents = 0;
102: lastMeasurementTime = System.currentTimeMillis();
103: measurementCount = adjustCount = 0;
104:
105: mgr.getProfiler().add(
106: "AggThrottle throughput for <" + name + ">",
107: new ProfilableIF() {
108: public int profileSize() {
109: //int foo = getAggTarget(); // Recalculate
110: return (int) lastThroughput;
111: }
112: });
113: mgr.getProfiler().add(
114: "AggThrottle bestThroughput for <" + name + ">",
115: new ProfilableIF() {
116: public int profileSize() {
117: //int foo = getAggTarget(); // Recalculate
118: return (int) bestThroughput;
119: }
120: });
121: mgr.getProfiler().add(
122: "AggThrottle aggTarget for <" + name + ">",
123: new ProfilableIF() {
124: public int profileSize() {
125: //int foo = getAggTarget(); // Recalculate
126: return aggregationTarget;
127: }
128: });
129: }
130:
131: public String toString() {
132: return "AggThrottle <" + name + ">";
133: }
134:
135: synchronized int getAggTarget() {
136:
137: long cur_time = System.currentTimeMillis();
138: long time_elapsed = cur_time - lastMeasurementTime;
139:
140: if (time_elapsed < recalcWindow) {
141: return aggregationTarget;
142: }
143:
144: // measurementCount++;
145: // if ((measurementCount % recalcWindow) != 0) {
146: // return aggregationTarget;
147: // }
148:
149: long events = stage.getStats().getTotalEvents();
150: long curEvents = events - lastEvents;
151: lastEvents = events;
152:
153: lastMeasurementTime = cur_time;
154:
155: double throughput = (curEvents * 1.0)
156: / ((double) time_elapsed * 1.0e-3);
157: double avgThroughput = (smoothConst * lastThroughput)
158: + ((1.0 - smoothConst) * throughput);
159:
160: adjustCount++;
161: if ((adjustCount % ADJUST_DELAY) == 0) {
162:
163: if (avgThroughput < (VERY_LOW_WATER * bestThroughput)) {
164: aggregationTarget = maxAggregation;
165: state = STATE_DECREASING;
166: }
167:
168: if (avgThroughput >= (VERY_HIGH_WATER * bestThroughput)) {
169: aggregationTarget = maxAggregation;
170: state = STATE_DECREASING;
171: }
172:
173: if (state == STATE_DECREASING) {
174: if (avgThroughput <= (LOW_WATER * bestThroughput)) {
175: // Fell below low water - increase
176: //bestThroughput = avgThroughput;
177: state = STATE_INCREASING;
178: aggregationTarget *= INCREASE_FACTOR;
179: if (aggregationTarget > maxAggregation)
180: aggregationTarget = maxAggregation;
181: } else if (avgThroughput > bestThroughput) {
182: // Better throughput - save and decrease
183: bestThroughput = avgThroughput;
184: aggregationTarget /= REDUCE_FACTOR;
185: if (aggregationTarget < minAggregation)
186: aggregationTarget = minAggregation;
187: } else {
188: // Just decrease
189: aggregationTarget /= REDUCE_FACTOR;
190: if (aggregationTarget < minAggregation)
191: aggregationTarget = minAggregation;
192: }
193:
194: } else if (state == STATE_INCREASING) {
195: if (avgThroughput > bestThroughput) {
196: // Better throughput - save
197: bestThroughput = avgThroughput;
198: }
199: if (avgThroughput >= (HIGH_WATER * bestThroughput)) {
200: // Start decreasing
201: state = STATE_DECREASING;
202: aggregationTarget /= REDUCE_FACTOR;
203: if (aggregationTarget < minAggregation)
204: aggregationTarget = minAggregation;
205: // } else if (avgThroughput <= (LOW_WATER*bestThroughput)) {
206: // Fell below low water - decrease
207: //bestThroughput = avgThroughput;
208: // state = STATE_DECREASING;
209: // aggregationTarget /= REDUCE_FACTOR;
210: // if (aggregationTarget < minAggregation) aggregationTarget = minAggregation;
211: } else {
212: // Just increase
213: aggregationTarget *= INCREASE_FACTOR;
214: if (aggregationTarget > maxAggregation) {
215: // Maxed out, so save best throughput and start decreasing
216: aggregationTarget = maxAggregation;
217: state = STATE_DECREASING;
218: bestThroughput = avgThroughput;
219: }
220: }
221: }
222:
223: // Randomly reset best estimate if not below LOW_WATER
224: // if (rand.nextDouble() <= 0.2) {
225: // if (avgThroughput >= (LOW_WATER*bestThroughput)) {
226: // bestThroughput = avgThroughput;
227: // }
228: // }
229:
230: // Randomly switch direction
231: if (rand.nextDouble() <= 0.0) {
232: if (state == STATE_INCREASING) {
233: state = STATE_DECREASING;
234: aggregationTarget /= REDUCE_FACTOR;
235: if (aggregationTarget < minAggregation)
236: aggregationTarget = minAggregation;
237: } else {
238: state = STATE_INCREASING;
239: aggregationTarget *= INCREASE_FACTOR;
240: if (aggregationTarget > maxAggregation)
241: aggregationTarget = maxAggregation;
242: }
243: }
244:
245: // Randomly reset
246: if (rand.nextDouble() <= 0.00) {
247: state = STATE_DECREASING;
248: aggregationTarget = maxAggregation;
249: bestThroughput = 0.0;
250: }
251: }
252:
253: if (DEBUG)
254: System.err.println("AggThrottle <" + name
255: + ">: avgThroughput "
256: + MDWUtil.format(avgThroughput) + ", last "
257: + MDWUtil.format(lastThroughput) + ", state "
258: + ((state == 0) ? "dec" : "inc") + ", aggTarget "
259: + aggregationTarget);
260:
261: //if ((adjustCount % ADJUST_DELAY) == 0) lastThroughput = avgThroughput;
262: lastThroughput = avgThroughput;
263: return aggregationTarget;
264: }
265:
266: }
|