001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.apps.Haboob.cache;
026:
027: import seda.apps.Haboob.*;
028: import seda.sandStorm.api.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.lib.http.*;
031: import seda.sandStorm.lib.aSocket.*;
032: import seda.sandStorm.lib.aDisk.*;
033: import seda.util.*;
034: import java.io.*;
035: import java.util.*;
036:
037: /**
038: * This implementation of the Haboob cache maintains a fixed-size set
039: * of buffers that cache recently-accessed Web pages. It turns out this
040: * approach does not work well -- see PageCacheSized for a better
041: * alternative. (This implementation may be broken.)
042: */
043: public class BufferCache implements EventHandlerIF, HaboobConst {
044:
045: private static final boolean DEBUG = false;
046:
047: // Don't actually read file; just store empty buffer in cache
048: private static final boolean DEBUG_NO_FILE_READ = true;
049: // Don't even stat file; just allocate buffer of fixed size
050: private static final boolean DEBUG_NO_FILE_READ_SAMESIZE = true;
051: // Send static httpOKResponse coupled with buffer
052: private static final boolean DEBUG_NO_FILE_READ_RESP = true;
053: // Send static httpOKResponse coupled with buffer, but payload and
054: // header separate
055: private static final boolean DEBUG_NO_FILE_READ_RESP_TWOWRITES = true;
056:
057: private String DEFAULT_URL;
058: private String ROOT_DIR;
059:
060: private SinkIF mysink, sendSink;
061: private Hashtable aFileTbl; // Map aFile -> outstandingRead
062: private int bufferSize;
063: private int numBuffers;
064: private ssLinkedList freeResps; // List of free static responses
065: private ssLinkedList freeBuffers; // List of free buffers
066: private ssLinkedList waitingRequests; // Requests waiting for free buffer
067:
068: private Hashtable mimeTbl; // Filename extension -> MIME type
069: private static final String defaultMimeType = "text/plain";
070:
071: public void init(ConfigDataIF config) throws Exception {
072: mysink = config.getStage().getSink();
073: sendSink = config.getManager().getStage(HTTP_SEND_STAGE)
074: .getSink();
075: aFileTbl = new Hashtable();
076:
077: mimeTbl = new Hashtable();
078: mimeTbl.put(".html", "text/html");
079: mimeTbl.put(".gif", "image/gif");
080: mimeTbl.put(".jpg", "image/jpeg");
081: mimeTbl.put(".jpeg", "image/jpeg");
082: mimeTbl.put(".pdf", "application/pdf");
083:
084: DEFAULT_URL = config.getString("defaultURL");
085: if (DEFAULT_URL == null)
086: throw new IllegalArgumentException(
087: "Must specify defaultURL");
088: ROOT_DIR = config.getString("rootDir");
089: if (ROOT_DIR == null)
090: throw new IllegalArgumentException("Must specify rootDir");
091:
092: bufferSize = config.getInt("bufferSize");
093: if (bufferSize == -1)
094: throw new IllegalArgumentException(
095: "Must specify bufferSize");
096: numBuffers = config.getInt("numBuffers");
097: if (numBuffers == -1)
098: throw new IllegalArgumentException(
099: "Must specify numBuffers");
100:
101: // Allocate buffer pool
102: freeBuffers = new ssLinkedList();
103: freeResps = new ssLinkedList();
104: for (int i = 0; i < numBuffers; i++) {
105: BufferElement buf;
106: if (DEBUG_NO_FILE_READ_RESP) {
107: if (DEBUG_NO_FILE_READ_RESP_TWOWRITES) {
108: buf = new BufferElement(bufferSize);
109: httpOKResponse resp = new httpOKResponse(
110: "text/html", buf);
111: buf.userTag = resp;
112: freeResps.add_to_tail(resp);
113: } else {
114: httpOKResponse resp = new httpOKResponse(
115: "text/html", bufferSize, mysink);
116: buf = resp.getPayload();
117: freeResps.add_to_tail(resp);
118: resp.getBuffers(true)[0].userTag = resp;
119: }
120: } else {
121: buf = new BufferElement(bufferSize);
122: }
123: buf.compQ = mysink;
124: freeBuffers.add_to_tail(buf);
125: }
126: waitingRequests = new ssLinkedList();
127: System.err.println("BufferCache: Started with " + numBuffers
128: + " buffers of " + bufferSize + " bytes each");
129: }
130:
131: public void destroy() {
132: }
133:
134: public void handleEvent(QueueElementIF item) {
135: if (DEBUG)
136: System.err.println("BufferCache: GOT QEL: " + item);
137:
138: if (item instanceof httpRequest) {
139: HaboobStats.numRequests++;
140:
141: httpRequest req = (httpRequest) item;
142: if (req.getRequest() != httpRequest.REQUEST_GET) {
143: HaboobStats.numErrors++;
144: sendSink
145: .enqueue_lossy(new httpResponder(
146: new httpBadRequestResponse(req,
147: "Only GET requests supported at this time"),
148: req, true));
149: return;
150: }
151:
152: handleRequest(req);
153:
154: } else if (item instanceof AFileIOCompleted) {
155: AFileIOCompleted comp = (AFileIOCompleted) item;
156: AFile af = comp.getFile();
157: outstandingRead or = (outstandingRead) aFileTbl.get(af);
158: // Could be null if the connection was closed while we did a file read
159: if (or != null)
160: or.processFileIO(comp);
161:
162: } else if (item instanceof SinkDrainedEvent) {
163: if (DEBUG)
164: System.err.println("BufferCache: Got SDE: " + item);
165:
166: SinkDrainedEvent sde = (SinkDrainedEvent) item;
167: BufferElement buf = (BufferElement) (sde.element);
168: outstandingRead or = null;
169: if (!DEBUG_NO_FILE_READ_SAMESIZE) {
170: or = (outstandingRead) (buf.userTag);
171: } else if (DEBUG_NO_FILE_READ_RESP) {
172: httpOKResponse resp = (httpOKResponse) (buf.userTag);
173: freeResps.add_to_tail(resp);
174: } else {
175: freeBuffer(buf);
176: }
177: if (DEBUG_NO_FILE_READ_SAMESIZE || or.processNetIO(sde)) {
178: if (DEBUG)
179: System.err
180: .println("BufferCache: Finished with buffer, "
181: + waitingRequests.size()
182: + " waiters, "
183: + freeBuffers.size()
184: + " freebufs");
185: // Finished with buffer, fire off next waiter
186: httpRequest req;
187: boolean ret = false;
188: do {
189: req = (httpRequest) waitingRequests.remove_head();
190: if (req != null) {
191: if (DEBUG)
192: System.err
193: .println("BufferCache: Firing off waiter "
194: + req);
195: ret = handleRequest(req);
196: }
197: } while ((req != null) && (ret == false));
198: }
199:
200: } else if (item instanceof AFileEOFReached) {
201: // Ignore
202:
203: } else if (item instanceof SinkClosedEvent) {
204: SinkClosedEvent sce = (SinkClosedEvent) item;
205: if (sce.sink instanceof httpConnection) {
206: httpConnection hc = (httpConnection) sce.sink;
207: outstandingRead or = (outstandingRead) hc.userTag;
208: if (or != null)
209: or.cleanup();
210: }
211:
212: } else {
213: System.err.println("BufferCache: Got unknown event type: "
214: + item);
215: }
216:
217: }
218:
219: public void handleEvents(QueueElementIF items[]) {
220: for (int i = 0; i < items.length; i++) {
221: handleEvent(items[i]);
222: }
223: }
224:
225: // Return false if error occurs
226: private boolean handleRequest(httpRequest req) {
227: String url;
228: String fname;
229:
230: url = req.getURL();
231: fname = ROOT_DIR + url;
232:
233: AFile af = null;
234: AFileStat stat;
235: httpOKResponse resp;
236: outstandingRead or;
237: BufferElement buf;
238:
239: if (DEBUG_NO_FILE_READ_RESP) {
240: // Send static response
241: resp = (httpOKResponse) freeResps.remove_head();
242: if (resp == null) {
243: System.err
244: .println("BufferCache: No free responses -- waiting!");
245: waitingRequests.add_to_tail(req);
246: return true;
247: }
248: sendSink.enqueue_lossy(new httpResponder(resp, req, false));
249: return true;
250: }
251:
252: // Try to get a free buffer
253: buf = (BufferElement) freeBuffers.remove_head();
254: if (buf == null) {
255: if (DEBUG)
256: System.err
257: .println("BufferCache: No free buffers, waiting.");
258: waitingRequests.add_to_tail(req);
259: return true;
260: }
261: if (DEBUG)
262: System.err.println("BufferCache: Got free buffer, "
263: + waitingRequests.size() + " waiters, "
264: + freeBuffers.size() + " freebufs");
265:
266: if (DEBUG_NO_FILE_READ && DEBUG_NO_FILE_READ_SAMESIZE) {
267: // Just send response with the free buffer
268: resp = new httpOKResponse("text/html", buf);
269: sendSink.enqueue_lossy(new httpResponder(resp, req, false));
270: // Will free buffer and fire off waiters when net I/O completes
271: return true;
272: }
273:
274: // Open file and stat it to determine size
275: try {
276: af = new AFile(fname, mysink, false, true);
277: stat = af.stat();
278: if (stat.isDirectory) {
279: af.close();
280: fname = fname + "/" + DEFAULT_URL;
281: af = new AFile(fname, mysink, false, true);
282: stat = af.stat();
283: }
284: } catch (IOException ioe) {
285: // File not found
286: System.err.println("BufferCache: Could not open file "
287: + fname + ": " + ioe);
288: HaboobStats.numErrors++;
289: httpNotFoundResponse notfound = new httpNotFoundResponse(
290: req, ioe.getMessage());
291: sendSink.enqueue_lossy(new httpResponder(notfound, req,
292: true));
293: freeBuffer(buf);
294: return false;
295: }
296:
297: // Create OR and fire off read
298: or = new outstandingRead(req, af, buf);
299: return true;
300: }
301:
302: private void freeBuffer(BufferElement buf) {
303: freeBuffers.add_to_tail(buf);
304: }
305:
306: private String getMimeType(String url) {
307: Enumeration e = mimeTbl.keys();
308: while (e.hasMoreElements()) {
309: String key = (String) e.nextElement();
310: if (url.endsWith(key))
311: return (String) mimeTbl.get(key);
312: }
313: return defaultMimeType;
314: }
315:
316: private class outstandingRead {
317: httpRequest request;
318: httpOKResponse response;
319: AFile af;
320: BufferElement buf, bufToSend;
321: int cur_file_offset;
322: int file_size;
323: ssLinkedList waiting;
324: boolean first = true;
325:
326: outstandingRead(httpRequest req, AFile af, BufferElement buf) {
327: this .request = req;
328: this .af = af;
329: this .file_size = (int) (af.stat().length);
330: this .cur_file_offset = 0;
331: this .response = new httpOKResponse(getMimeType(af
332: .getFilename()), null, file_size);
333: this .buf = buf;
334: req.getConnection().userTag = this ;
335: buf.userTag = this ;
336:
337: if (!DEBUG_NO_FILE_READ) {
338: aFileTbl.put(af, this );
339: if (DEBUG)
340: System.err.println("BufferCache: OR (file_size "
341: + file_size + ") doing initial read");
342: try {
343: af.read(buf);
344: } catch (SinkException se) {
345: // Should not happen!
346: System.err
347: .println("BufferCache: Warning: Got SE doing af.read(): "
348: + se);
349: cleanup();
350: }
351: } else {
352: // DEBUG_NO_FILE_READ: Fake out file read
353: AFileIOCompleted fake = new AFileIOCompleted(null, Math
354: .min(file_size, buf.size));
355: processFileIO(fake);
356: }
357: }
358:
359: // Process an AFile IO Completion on this file
360: void processFileIO(AFileIOCompleted comp) {
361: // Can issue next socket write
362:
363: if (DEBUG)
364: System.err
365: .println("BufferCache: Processing IOCompletion "
366: + comp);
367: int l = comp.sizeCompleted;
368: if (l < buf.size) {
369: if (DEBUG)
370: System.err
371: .println("BufferCache: Creating bufToSize, size "
372: + l);
373: bufToSend = new BufferElement(buf.data, 0, l);
374: bufToSend.userTag = this ;
375: bufToSend.compQ = mysink;
376: } else {
377: if (DEBUG)
378: System.err
379: .println("BufferCache: Using original buffer");
380: bufToSend = buf;
381: }
382: if (DEBUG)
383: System.err.println("BufferCache: Setting payload");
384: response.setPayload(bufToSend);
385:
386: httpResponder respd;
387: if (first) {
388: // Send header with response
389: if (DEBUG)
390: System.err.println("BufferCache: OR (file_size "
391: + file_size
392: + ") sending initial response, size " + l);
393: respd = new httpResponder(response, request, false,
394: true);
395: first = false;
396: } else {
397: // DON'T send header with response
398: if (DEBUG)
399: System.err.println("BufferCache: OR (file_size "
400: + file_size + ") sending response, size "
401: + l);
402: respd = new httpResponder(response, request, false,
403: false);
404: }
405: sendSink.enqueue_lossy(respd);
406: }
407:
408: // Process network I/O completion
409: boolean processNetIO(SinkDrainedEvent sde) {
410: // Issue next file read?
411: if (DEBUG)
412: System.err.println("BufferCache: OR (file_size "
413: + file_size
414: + ") incrementing cur_file_offset by "
415: + bufToSend.size);
416: cur_file_offset += bufToSend.size;
417: if (cur_file_offset < file_size) {
418:
419: if (!DEBUG_NO_FILE_READ) {
420: try {
421: if (DEBUG)
422: System.err
423: .println("BufferCache: OR (file_size "
424: + file_size
425: + ") issuing next read");
426: af.read(buf);
427: } catch (SinkException se) {
428: // Should not happen!
429: System.err
430: .println("BufferCache: Warning: Got SE doing af.read(): "
431: + se);
432: cleanup();
433: return true;
434: }
435: return false;
436: } else {
437: // DEBUG_NO_FILE_READ: Fake out file read
438: AFileIOCompleted fake = new AFileIOCompleted(null,
439: Math.min(file_size - cur_file_offset,
440: buf.size));
441: processFileIO(fake);
442: return false;
443: }
444:
445: } else {
446: // We are done with this file
447: cleanup();
448: return true;
449: }
450: }
451:
452: void cleanup() {
453: if (DEBUG)
454: System.err.println("BufferCache: OR (file_size "
455: + file_size + ") finished");
456: buf.userTag = null;
457: request.getConnection().userTag = null;
458: freeBuffer(buf);
459: af.close();
460: aFileTbl.remove(af);
461: }
462:
463: }
464:
465: }
|