001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.apps.Haboob.bottleneck;
026:
027: import seda.apps.Haboob.*;
028: import seda.apps.Haboob.http.*;
029: import seda.sandStorm.api.*;
030: import seda.sandStorm.core.*;
031: import seda.sandStorm.lib.http.*;
032: import seda.sandStorm.lib.aSocket.*;
033: import seda.sandStorm.lib.aDisk.*;
034: import seda.util.*;
035: import java.io.*;
036: import java.util.*;
037:
038: /**
039: * An intentional bottleneck stage, used for demonstrating load conditioning.
040: * Does some I/O and CPU crunching to generate a dynamic web page; also
041: * provides an adaptive load shedding controller that adjusts the stage's
042: * queue threshold to meet a response time target. All of this is described
043: * in the SOSP'01 paper on SEDA, found at
044: * http://www.cs.berkeley.edu/~mdw/proj/seda/
045: *
046: * This version implements its own threshold-based response time controller.
047: *
048: */
049: public class BottleneckDirectControl implements EventHandlerIF,
050: HaboobConst {
051:
052: private static final boolean DEBUG = false;
053: private static final boolean VERBOSE = false;
054:
055: private static final int OUTPUT_STATIC_PAGE_SIZE = 8192;
056:
057: private SinkIF sendSink;
058: private Hashtable ht;
059: private Random rand;
060: private httpOKResponse static_page_response;
061:
062: // If true, allocate a large byte array and insert into a hashtable
063: private static final boolean BOTTLENECK_ALLOC = false;
064: private static final int MAX_ALLOC_SIZE = 81920;
065:
066: // If true, sleep
067: private static final boolean BOTTLENECK_SLEEP = false;
068: private static final long SLEEP_TIME = 100;
069:
070: // If true, read data from file and process sums
071: private static final boolean BOTTLENECK_PROCESSFILE = true;
072: // If true, generate random data and process sums.
073: private static final boolean BOTTLENECK_PROCESSRANDOM = false;
074: private static final String RANDOM_FILE = "/scratch/mdw/specweb99-runs/cgi-bin/random.data";
075: private static final int NUM_RUNS = 50;
076: private static final int NUM_SUMS = 50;
077: private static final int NUM_BYTES_TO_READ = 100;
078: private volatile static int sum = 0;
079: private static byte data[] = new byte[NUM_BYTES_TO_READ];
080:
081: // Adjust queue threshold to meet a response time target
082: private double targetResponseTime = -1.0;
083: private double smoothConst = -1.0;
084: private double currentResponseTime;
085: private FiniteQueue myqueue;
086: private int curThreshold;
087: private int recalcCount = 0;
088: private static final int RECALC_WINDOW = 200;
089: private static final int MAX_THRESHOLD = 1024;
090: private static final int MIN_THRESHOLD = 1;
091:
092: public void init(ConfigDataIF config) throws Exception {
093: SinkIF mysink = config.getStage().getSink();
094:
095: // XXX Kind of a hack -- get a handle to our own finitequeue
096: try {
097: myqueue = (FiniteQueue) mysink;
098: } catch (ClassCastException cce) {
099: // Whoops - try to get a SinkProxy instead
100: myqueue = (FiniteQueue) ((seda.sandStorm.internal.SinkProxy) mysink).thesink;
101: }
102:
103: // Add profile
104: ManagerIF mgr = config.getManager();
105: mgr.getProfiler().add("Bottleneck stage response time average",
106: new ProfilableIF() {
107: public int profileSize() {
108: return (int) currentResponseTime;
109: }
110: });
111: mgr.getProfiler().add("Bottleneck stage queue threshold",
112: new ProfilableIF() {
113: public int profileSize() {
114: return curThreshold;
115: }
116: });
117:
118: sendSink = config.getManager().getStage(HTTP_SEND_STAGE)
119: .getSink();
120: ht = new Hashtable();
121: rand = new Random();
122: targetResponseTime = config.getDouble("targetResponseTime");
123: System.err
124: .println("Bottleneck stage initialized, targetResponseTime "
125: + MDWUtil.format(targetResponseTime) + " ms");
126: if (targetResponseTime != -1.0) {
127: smoothConst = config.getDouble("smoothConst");
128: if (smoothConst == -1.0)
129: smoothConst = 0.5;
130: // Start out with min threshold
131: curThreshold = MIN_THRESHOLD;
132: ((QueueThresholdPredicate) myqueue.getEnqueuePredicate())
133: .setThreshold(curThreshold);
134: }
135:
136: byte response[] = new byte[OUTPUT_STATIC_PAGE_SIZE];
137: for (int i = 0; i < OUTPUT_STATIC_PAGE_SIZE; i++) {
138: response[i] = (byte) 'A';
139: }
140: static_page_response = new httpOKResponse("text/plain",
141: OUTPUT_STATIC_PAGE_SIZE);
142: BufferElement payload = static_page_response.getPayload();
143: byte paydata[] = payload.data;
144: System.arraycopy(response, 0, paydata, payload.offset,
145: payload.size);
146: }
147:
148: public void destroy() {
149: }
150:
151: public void handleEvent(QueueElementIF item) {
152: if (DEBUG)
153: System.err.println("Bottleneck: GOT QEL: " + item);
154:
155: if (item instanceof httpRequest) {
156: HaboobStats.numRequests++;
157:
158: httpRequest req = (httpRequest) item;
159: if (req.getRequest() != httpRequest.REQUEST_GET) {
160: HaboobStats.numErrors++;
161: sendSink
162: .enqueue_lossy(new httpResponder(
163: new httpBadRequestResponse(req,
164: "Only GET requests supported at this time"),
165: req, true));
166: return;
167: }
168:
169: // Do bottleneck work
170: long t1, t2;
171: if (VERBOSE)
172: t1 = System.currentTimeMillis();
173: doBottleneck();
174: if (VERBOSE) {
175: t2 = System.currentTimeMillis();
176: System.err.println("Bottleneck: " + (t2 - t1) + " ms");
177: }
178:
179: // Check response time
180: if (targetResponseTime != -1.0) {
181: synchronized (this ) {
182: long cur = System.currentTimeMillis()
183: - req.timestamp;
184: currentResponseTime = (smoothConst * currentResponseTime)
185: + ((1.0 - smoothConst) * (cur * 1.0));
186: recalcCount++;
187: if (recalcCount == RECALC_WINDOW) {
188: recalcCount = 0;
189: if (currentResponseTime < (0.9 * targetResponseTime)) {
190: curThreshold += 2;
191: if (curThreshold > MAX_THRESHOLD)
192: curThreshold = MAX_THRESHOLD;
193: } else if (currentResponseTime > (1.1 * targetResponseTime)) {
194: curThreshold /= 2;
195: if (curThreshold < MIN_THRESHOLD)
196: curThreshold = MIN_THRESHOLD;
197: }
198: System.err.println("Bottleneck: target "
199: + MDWUtil.format(targetResponseTime)
200: + ", current "
201: + MDWUtil.format(currentResponseTime)
202: + ", threshold " + curThreshold);
203: ((QueueThresholdPredicate) myqueue
204: .getEnqueuePredicate())
205: .setThreshold(curThreshold);
206: }
207: }
208: }
209:
210: // Send response
211: httpResponder resp = new httpResponder(
212: static_page_response, req, false);
213: HttpSend.sendResponse(resp);
214: return;
215:
216: } else if (item instanceof SinkClosedEvent) {
217: // Ignore
218:
219: } else {
220: System.err.println("StaticPage: Got unknown event type: "
221: + item);
222: }
223:
224: }
225:
226: public void handleEvents(QueueElementIF items[]) {
227: if (DEBUG)
228: System.err.println("Bottleneck: " + Thread.currentThread()
229: + " got " + items.length + " events");
230:
231: for (int i = 0; i < items.length; i++) {
232: handleEvent(items[i]);
233: }
234: }
235:
236: private void doBottleneck() {
237:
238: if (BOTTLENECK_ALLOC) {
239: // Allocate big chunk of memory and stash it away
240: int sz = Math.abs(rand.nextInt()) % MAX_ALLOC_SIZE;
241: int key = Math.abs(rand.nextInt());
242: ht.put(new Integer(key), new byte[sz]);
243: }
244:
245: if (BOTTLENECK_SLEEP) {
246: MDWUtil.sleep(SLEEP_TIME);
247: }
248:
249: if (BOTTLENECK_PROCESSFILE) {
250: try {
251:
252: for (int run = 0; run < NUM_RUNS; run++) {
253: RandomAccessFile raf = new RandomAccessFile(
254: RANDOM_FILE, "r");
255: for (int i = 0; i < NUM_BYTES_TO_READ; i++) {
256: raf.read(data, 0, NUM_BYTES_TO_READ);
257: // data[i] = (byte)raf.read();
258: }
259: raf.close();
260: for (int n = 0; n < NUM_SUMS; n++) {
261: for (int i = 0; i < NUM_BYTES_TO_READ; i++) {
262: sum += data[i];
263: }
264: }
265: }
266:
267: } catch (Exception e) {
268: System.err
269: .println("Warning: Bottleneck processing got exception: "
270: + e);
271: }
272: }
273:
274: if (BOTTLENECK_PROCESSRANDOM) {
275: try {
276: Random r = new Random();
277: for (int run = 0; run < NUM_RUNS; run++) {
278: r.nextBytes(data);
279: for (int n = 0; n < NUM_SUMS; n++) {
280: for (int i = 0; i < NUM_BYTES_TO_READ; i++) {
281: sum += data[i];
282: }
283: }
284: }
285:
286: } catch (Exception e) {
287: System.err
288: .println("Warning: Bottleneck processing got exception: "
289: + e);
290: }
291: }
292:
293: }
294:
295: }
|