001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.apps.Haboob.cache;
026:
027: import seda.apps.Haboob.*;
028: import seda.apps.Haboob.http.*;
029: import seda.sandStorm.api.*;
030: import seda.sandStorm.core.*;
031: import seda.sandStorm.lib.http.*;
032: import seda.sandStorm.lib.aSocket.*;
033: import seda.sandStorm.lib.aDisk.*;
034: import seda.util.*;
035: import java.io.*;
036: import java.util.*;
037:
038: // For JDK1.1 Collections package
039: //import com.sun.java.util.collections.*;
040:
041: /**
042: * This version of PageCache maintains a list of cacheEntries for each
043: * page size, and attempts to reuse old entries of the same size on reject.
044: * This is the best implementation of the Haboob web page cache.
045: */
public class PageCacheSized implements EventHandlerIF, HaboobConst {

    private static final boolean DEBUG = false;
    private static final boolean PROFILE = false;

    // Don't actually read file; just store empty buffer in cache
    private static final boolean DEBUG_NO_FILE_READ = false;
    // Don't even stat file; just allocate buffer of fixed size
    private static final boolean DEBUG_NO_FILE_READ_SAMESIZE = false;
    private static final int DEBUG_NO_FILE_READ_SAMESIZE_SIZE = 8192;
    // Don't read file through aFile interface - just do it directly
    private static final boolean DEBUG_DIRECT_FILE = false;

    // Rewrite incoming filename so all cache entries hit
    private static final boolean DEBUG_SINGLE_CACHE_PAGE = false;
    // If true, rewrite all request URLs to DEBUG_SINGLE_CACHE_PAGE_FNAME
    // If false, all cache misses access same file, but different entries
    private static final boolean DEBUG_SINGLE_CACHE_PAGE_SAMENAME = false;
    // This file is of size 8192 bytes
    private static final String DEBUG_SINGLE_CACHE_PAGE_FNAME = "/dir00000/class1_7";

    // Whether to prioritize cache hits over misses
    private static final boolean PRIORITIZE_HITS = true;
    private myComparator myComp;

    // Whether to handle misses in separate stage
    private static final boolean SEPARATE_MISS_STAGE = true;
    private SinkIF missStageSink;
    // True when this instance is the separate miss-handling stage; the hit
    // stage and miss stage share the same underlying cache tables.
    private boolean missStage;

    private String DEFAULT_URL; // File name appended to directory URLs
    private String ROOT_DIR;    // Document root prepended to request URLs

    private SinkIF mysink, sendSink;
    private Hashtable pageTbl;  // Map URL -> cacheEntry
    private Hashtable sizeTbl;  // Map size -> linked list of free cacheEntries
    private Hashtable aFileTbl; // Map aFile -> cacheEntry
    private int maxCacheSize;   // Cache bound in Kb (compared against
                                // HaboobStats.cacheSizeBytes); -1 disables the bound
    private Random rand;

    private Hashtable mimeTbl; // Filename extension -> MIME type
    private static final String defaultMimeType = "text/plain";

    // Used to initialize hit stage
    public PageCacheSized() {
        missStage = false;
    }

    // Used to initialize miss stage. Shares all cache state (page, size,
    // aFile and MIME tables) with the hit stage so both stages see the
    // same cache contents.
    PageCacheSized(PageCacheSized hitStage) {
        missStage = true;
        pageTbl = hitStage.pageTbl;
        sizeTbl = hitStage.sizeTbl;
        aFileTbl = hitStage.aFileTbl;
        mimeTbl = hitStage.mimeTbl;
        rand = new Random();
        DEFAULT_URL = hitStage.DEFAULT_URL;
        ROOT_DIR = hitStage.ROOT_DIR;
        maxCacheSize = hitStage.maxCacheSize;
        if (PRIORITIZE_HITS) {
            myComp = hitStage.myComp;
        }
    }

    /**
     * Stage initialization. The hit stage allocates the shared tables,
     * reads configuration (defaultURL, rootDir, maxCacheSize) and, when
     * SEPARATE_MISS_STAGE is set, creates a second PageCacheSized stage
     * that handles cache misses. The miss stage only records its sinks.
     */
    public void init(ConfigDataIF config) throws Exception {
        mysink = config.getStage().getSink();
        System.err.println("PageCacheSized: missStage=" + missStage
                + ", mysink=" + mysink);
        sendSink = config.getManager().getStage(HTTP_SEND_STAGE)
                .getSink();

        if (!missStage) {
            pageTbl = new Hashtable();
            sizeTbl = new Hashtable();
            aFileTbl = new Hashtable();
            rand = new Random();

            mimeTbl = new Hashtable();
            mimeTbl.put(".html", "text/html");
            mimeTbl.put(".gif", "image/gif");
            mimeTbl.put(".jpg", "image/jpeg");
            mimeTbl.put(".jpeg", "image/jpeg");
            mimeTbl.put(".pdf", "application/pdf");

            DEFAULT_URL = config.getString("defaultURL");
            if (DEFAULT_URL == null)
                throw new IllegalArgumentException(
                        "Must specify defaultURL");
            ROOT_DIR = config.getString("rootDir");
            if (ROOT_DIR == null)
                throw new IllegalArgumentException(
                        "Must specify rootDir");
            maxCacheSize = config.getInt("maxCacheSize");

            if (PRIORITIZE_HITS) {
                myComp = new myComparator();
            }

            if (SEPARATE_MISS_STAGE) {
                StageIF missStage = config.getManager().createStage(
                        "PageCacheSized missStage",
                        new PageCacheSized(this), null);
                missStageSink = missStage.getSink();
            }
        }
    }

    public void destroy() {
    }

    /**
     * Main event dispatch. Handles three event types:
     * httpRequest (cache lookup; hit is sent or queued on a pending entry,
     * miss is forwarded to handleCacheMiss), AFileIOCompleted (a pending
     * file read finished; completes the corresponding entry), and
     * SinkClosedEvent (connection closed; forwarded to the send stage).
     */
    public void handleEvent(QueueElementIF item) {
        if (DEBUG)
            System.err.println("PageCacheSized (missStage=" + missStage
                    + "): GOT QEL: " + item);

        if (item instanceof httpRequest) {
            HaboobStats.numRequests++;

            httpRequest req = (httpRequest) item;
            if (req.getRequest() != httpRequest.REQUEST_GET) {
                HaboobStats.numErrors++;
                HttpSend
                        .sendResponse(new httpResponder(
                                new httpBadRequestResponse(req,
                                        "Only GET requests supported at this time"),
                                req, true));
                return;
            }

            String url;
            if (DEBUG_SINGLE_CACHE_PAGE
                    && DEBUG_SINGLE_CACHE_PAGE_SAMENAME) {
                url = DEBUG_SINGLE_CACHE_PAGE_FNAME;
            } else {
                url = req.getURL();
            }

            cacheEntry entry;
            // Lock pageTbl so the lookup and the decision to treat this
            // request as a miss are atomic with respect to entries being
            // added or reused by the other stage.
            synchronized (pageTbl) {

                if (DEBUG)
                    System.err
                            .println("PageCacheSized: Checking cache for URL "
                                    + url);
                long t1 = 0, t2;
                if (PROFILE)
                    t1 = System.currentTimeMillis();
                entry = (cacheEntry) pageTbl.get(url);
                if (PROFILE) {
                    t2 = System.currentTimeMillis();
                    HaboobStats.numCacheLookup++;
                    HaboobStats.timeCacheLookup += (t2 - t1);
                }

                if (entry == null) {
                    // Got a cache miss
                    handleCacheMiss(req);
                    return;
                }
            }

            if (DEBUG)
                System.err
                        .println("PageCacheSized: Got entry " + entry);
            HaboobStats.numCacheHits++;
            // Lock the entry so the pending check cannot race with done():
            // either we see pending and enqueue before done() drains the
            // waiter list, or we see the completed entry and send directly.
            synchronized (entry) {
                if (entry.pending) {
                    // Entry still pending - wait for it
                    if (DEBUG)
                        System.err
                                .println("PageCacheSized: Entry still pending");
                    entry.addWaiter(req);
                } else {
                    // Got a hit - send it
                    if (DEBUG)
                        System.err
                                .println("PageCacheSized: Sending entry");
                    entry.send(req);
                }
            }

        } else if (item instanceof AFileIOCompleted) {

            AFileIOCompleted comp = (AFileIOCompleted) item;
            AFile af = comp.getFile();
            if (DEBUG)
                System.err.println("PageCacheSized: Got AIOComp for "
                        + af.getFilename());

            cacheEntry entry = (cacheEntry) aFileTbl.get(af);
            if (entry == null) {
                System.err
                        .println("PageCacheSized: WARNING: Got AFileIOCompleted for non-entry: "
                                + comp);
                return;
            }
            entry.done(comp);

        } else if (item instanceof SinkClosedEvent) {
            SinkClosedEvent sce = (SinkClosedEvent) item;
            if (sce.sink instanceof httpConnection) {
                // Pass on to sendSink if not a file close event
                sendSink.enqueue_lossy(sce);
            }

        } else {
            System.err
                    .println("PageCacheSized: Got unknown event type: "
                            + item);
        }
    }

    /**
     * Orders a batch of events so cache hits are processed before misses,
     * smaller hits before larger ones, and httpRequests before all other
     * event types. isHit() returns the entry size for a completed hit,
     * or -1 for a miss/pending entry.
     * NOTE(review): ordering is inconsistent with equals, which is fine
     * for Arrays.sort but would not be safe for sorted collections.
     */
    class myComparator implements Comparator {
        public int compare(Object o1, Object o2) {
            if ((o1 instanceof httpRequest)
                    && (o2 instanceof httpRequest)) {
                httpRequest req1 = (httpRequest) o1;
                httpRequest req2 = (httpRequest) o2;
                int req1sz = isHit(req1);
                int req2sz = isHit(req2);
                if ((req1sz != -1) && (req2sz != -1)) {
                    if (req1sz < req2sz)
                        return -1;
                    else if (req1sz > req2sz)
                        return 1;
                    else
                        return 0;
                } else if ((req1sz == -1) && (req2sz != -1)) {
                    return 1;
                } else if ((req1sz != -1) && (req2sz == -1)) {
                    return -1;
                } else {
                    return 0;
                }
            } else if ((o1 instanceof httpRequest)
                    && (!(o2 instanceof httpRequest))) {
                return -1;
            } else if (!(o1 instanceof httpRequest)
                    && ((o2 instanceof httpRequest))) {
                return 1;
            } else {
                return 0;
            }
        }
    }

    /**
     * Batch dispatch: optionally sorts the batch so hits are served
     * first (see myComparator), then handles each event in order.
     */
    public void handleEvents(QueueElementIF items[]) {
        if (PRIORITIZE_HITS) {
            // Sort entries first
            Arrays.sort(items, myComp);
        }

        for (int i = 0; i < items.length; i++) {
            handleEvent(items[i]);
        }
    }

    // Returns the cached page size if the request is a completed cache
    // hit, or -1 if the page is absent or its read is still pending.
    private int isHit(httpRequest req) {
        cacheEntry entry = (cacheEntry) pageTbl.get(req.getURL());
        if ((entry != null) && (!entry.pending))
            return entry.size;
        else
            return -1;
    }

    /**
     * Handles a cache miss. In the hit stage (with SEPARATE_MISS_STAGE
     * set) the request is simply forwarded to the miss stage. Otherwise
     * the file is opened and stat'd (directories trigger either a
     * DEFAULT_URL retry or a redirect to url+"/"), a cacheEntry of the
     * right size is obtained, and an asynchronous read is issued; the
     * AFileIOCompleted event later completes the entry.
     */
    private void handleCacheMiss(httpRequest req) {
        String url;
        String fname;
        long t1 = 0, t2;

        if (SEPARATE_MISS_STAGE && !missStage) {
            if (!missStageSink.enqueue_lossy(req)) {
                System.err
                        .println("PageCacheSized: WARNING: Could not enqueue "
                                + req + " to missStageSink");
            }
            return;
        }

        if (DEBUG)
            System.err
                    .println("PageCacheSized: Handling cache miss for "
                            + req);
        HaboobStats.numCacheMisses++;

        if (DEBUG_SINGLE_CACHE_PAGE) {
            if (DEBUG_SINGLE_CACHE_PAGE_SAMENAME) {
                // Rewrite url
                url = DEBUG_SINGLE_CACHE_PAGE_FNAME;
                fname = ROOT_DIR + url;
            } else {
                // Rewrite fname, not url
                url = req.getURL();
                fname = ROOT_DIR + DEBUG_SINGLE_CACHE_PAGE_FNAME;
            }
        } else {
            url = req.getURL();
            fname = ROOT_DIR + url;
        }

        AFile af = null;
        AFileStat stat = null;
        cacheEntry entry;

        if (DEBUG_NO_FILE_READ && DEBUG_NO_FILE_READ_SAMESIZE) {
            // Create bogus entry
            if (DEBUG)
                System.err
                        .println("PageCacheSized: Creating bogus cacheEntry");
            entry = getEntry(req, null,
                    DEBUG_NO_FILE_READ_SAMESIZE_SIZE);

        } else if (DEBUG_DIRECT_FILE) {

            // Don't use AFile - just read file directly
            // NOTE(review): raf is not closed on the exception path (if
            // readFully throws, the handle leaks) — debug-only code path.
            try {
                File f = new File(fname);
                RandomAccessFile raf = new RandomAccessFile(f, "r");
                if (DEBUG)
                    System.err.println("PageCacheSized: Got file size "
                            + f.length());
                entry = getEntry(req, null, (int) f.length());
                if (DEBUG)
                    System.err
                            .println("PageCacheSized: Reading file directly, length "
                                    + f.length()
                                    + ", entrysize "
                                    + entry.response.getPayload()
                                            .getBytes().length);
                BufferElement payload = entry.response.getPayload();
                raf.readFully(payload.getBytes(), payload.offset,
                        payload.size);
                raf.close();
                entry.pending = false;
                httpResponder respd = new httpResponder(entry.response,
                        req);
                HttpSend.sendResponse(respd);
                return;

            } catch (IOException ioe) {
                // File not found
                System.err
                        .println("PageCacheSized: Could not open file "
                                + fname + ": " + ioe);
                ioe.printStackTrace();
                HaboobStats.numErrors++;
                httpNotFoundResponse notfound = new httpNotFoundResponse(
                        req, ioe.getMessage());
                HttpSend.sendResponse(new httpResponder(notfound, req,
                        true));
                return;
            }

        } else {

            // Loop because a directory URL is retried once with
            // DEFAULT_URL appended to the filename.
            while (true) {
                // Open file and stat it to determine size
                try {
                    af = new AFile(fname, mysink, false, true);
                    stat = af.stat();
                    break;

                } catch (FileIsDirectoryException fde) {
                    // Tried to open a directory

                    if (url.endsWith("/")) {
                        // Replace file with DEFAULT_URL and try again
                        if (fname.endsWith("/")) {
                            fname = fname + DEFAULT_URL;
                        } else {
                            fname = fname + "/" + DEFAULT_URL;
                        }
                        continue;
                    } else {
                        // Redirect to url+"/" (so that img src works in the document)
                        String newURL = url + "/";
                        httpRedirectResponse redirect = new httpRedirectResponse(
                                req, newURL);
                        HttpSend.sendResponse(new httpResponder(
                                redirect, req, true));
                        return;
                    }

                } catch (IOException ioe) {
                    // File not found
                    System.err
                            .println("PageCacheSized: Could not open file "
                                    + fname + ": " + ioe);
                    HaboobStats.numErrors++;
                    httpNotFoundResponse notfound = new httpNotFoundResponse(
                            req, ioe.getMessage());
                    HttpSend.sendResponse(new httpResponder(notfound,
                            req, true));
                    return;
                }
            }

            // Allocate entry
            if (DEBUG)
                System.err.println("PageCacheSized: Got file size "
                        + stat.length);
            entry = getEntry(req, af, (int) stat.length);
        }

        if (DEBUG_NO_FILE_READ) {
            // Pretend we got it already
            entry.done(null);
        } else {
            // Issue read
            entry.doRead();
        }
    }

    // Maps a URL to a MIME type by linear scan over registered extensions
    // (endsWith match); falls back to defaultMimeType.
    private String getMimeType(String url) {
        Enumeration e = mimeTbl.keys();
        while (e.hasMoreElements()) {
            String key = (String) e.nextElement();
            if (url.endsWith(key))
                return (String) mimeTbl.get(key);
        }
        return defaultMimeType;
    }

    // Obtain a new cache entry (either allocating a new entry or
    // reusing an old one). When the cache is over maxCacheSize (Kb),
    // tries to evict-by-reuse a non-pending entry of exactly the same
    // size from sizeTbl; if none is available it allocates anyway,
    // temporarily exceeding the bound.
    private cacheEntry getEntry(httpRequest req, AFile af, int size) {
        cacheEntry entry = null;

        if (DEBUG)
            System.err.println("PageCacheSized: Finding entry of size "
                    + size);

        if ((maxCacheSize != -1)
                && (HaboobStats.cacheSizeBytes + size > maxCacheSize * 1024)) {
            // Cache is full, try to reuse entry
            if (DEBUG)
                System.err
                        .println("PageCacheSized: Cache is full (size "
                                + (HaboobStats.cacheSizeBytes / 1024)
                                + " Kb)");
            Integer isz = new Integer(size);
            ssLinkedList ll = (ssLinkedList) sizeTbl.get(isz);
            if ((ll == null) || (ll.size() == 0)) {
                // No entries available, allocate new
                if (DEBUG)
                    System.err
                            .println("PageCacheSized: No entry of this size, allocating");
                return new cacheEntry(req, af, size);
            } else {
                // Reuse entry: rotate through the size list at most once,
                // skipping entries whose reads are still pending.
                if (DEBUG)
                    System.err.println("PageCacheSized: Sizelist has "
                            + ll.size() + " elements");
                boolean found = false;
                int count = 0;
                while (count < ll.size()) {
                    entry = (cacheEntry) ll.remove_head();
                    if (entry.pending) {
                        ll.add_to_tail(entry);
                        count++;
                    } else {
                        if (DEBUG)
                            System.err
                                    .println("PageCacheSized: Reusing entry "
                                            + entry);
                        found = true;
                        break;
                    }
                }
                if (!found) {
                    // All entries pending, allocate anyway
                    if (DEBUG)
                        System.err
                                .println("PageCacheSized: All entries pending, allocating new");
                    return new cacheEntry(req, af, size);
                }

                // Place back on list and reuse
                ll.add_to_tail(entry);
                entry.reuse(req, af);
                return entry;
            }

        } else {
            if (DEBUG)
                System.err
                        .println("PageCacheSized: Cache not full (size "
                                + (HaboobStats.cacheSizeBytes / 1024)
                                + " Kb), allocating");
            // Cache not full, go ahead and allocate
            return new cacheEntry(req, af, size);
        }

    }

    /**
     * A single cached page: a preallocated httpOKResponse whose payload
     * buffer is filled by an asynchronous file read. While the read is
     * pending, requests queue on the waiting list; done() drains them.
     * Entries are registered in pageTbl (by URL), aFileTbl (by open
     * file) and sizeTbl (by payload size, for reuse on eviction).
     */
    private class cacheEntry {
        httpOKResponse response; // Preallocated response wrapping the payload buffer
        boolean pending;         // True until the file read completes
        int size;                // Payload size in bytes
        AFile af;                // Open file handle; null once read completes
        ssLinkedList waiting, sizeList; // Waiting requests; owning sizeTbl list
        String url;
        long tStartRead, tEndRead; // PROFILE timestamps for the file read

        // Allocate a new cache entry
        private cacheEntry(httpRequest req, AFile af, int size) {
            // NOTE(review): this DEBUG println dereferences af before the
            // null check below; it would NPE if DEBUG were enabled and af
            // were null (the DEBUG_NO_FILE_READ/DEBUG_DIRECT_FILE paths).
            if (DEBUG)
                System.err
                        .println("PageCacheSized: Allocating new cache entry for "
                                + af.getFilename() + ", size=" + size);

            if (af == null) {
                this.response = new httpOKResponse("text/plain", size);
            } else {
                this.response = new httpOKResponse(getMimeType(af
                        .getFilename()), size);
            }
            this.size = size;
            this.url = req.getURL();
            this.af = af;
            pending = true;
            waiting = new ssLinkedList();
            addWaiter(req);

            // Add to pageTbl
            pageTbl.put(url, this);
            // Add to aFileTbl
            if (af != null) {
                aFileTbl.put(af, this);
            }
            // Add to sizeTbl
            Integer isz = new Integer(size);
            ssLinkedList ll = (ssLinkedList) sizeTbl.get(isz);
            if (ll == null) {
                ll = new ssLinkedList();
                sizeTbl.put(isz, ll);
            }
            ll.add_to_tail(this);
            this.sizeList = ll;
            HaboobStats.cacheSizeEntries++;
            HaboobStats.cacheSizeBytes += size;
        }

        // Reuse a cache entry: rebind this entry (and its same-size
        // payload buffer) to a new URL/file, closing the old file and
        // updating the pageTbl and aFileTbl mappings.
        /* FIXME: Avoid reuse of cache entry that is currently being
         * written out to another socket? (Maintain 'write count' which
         * is incremented for each send, decremented for each SinkDrainedEvent,
         * and SinkClosedEvent (when the associated SinkDrainedEvent did
         * not arrive yet due to the conn being closed first).
         */
        private synchronized void reuse(httpRequest req, AFile af) {
            // NOTE(review): this DEBUG println also dereferences af before
            // any null check; af may be null on the debug code paths.
            if (DEBUG)
                System.err.println("PageCacheSized: entry " + this
                        + " being reused for " + af.getFilename());
            if (this.af != null) {
                aFileTbl.remove(this.af);
                this.af.close();
            }
            this.af = af;
            if (af != null) {
                aFileTbl.put(af, this);
            }
            synchronized (pageTbl) {
                pageTbl.remove(url);
                this.url = req.getURL();
                pageTbl.put(url, this);
            }
            pending = true;
            waiting.remove_all();
            addWaiter(req);
        }

        // Initiate file read. On a SinkException, all waiting requests
        // get a 404 and the entry is freed.
        void doRead() {
            if (DEBUG)
                System.err
                        .println("PageCacheSized: Initiating read on entry "
                                + this);
            if (af == null)
                return;
            try {
                if (PROFILE)
                    tStartRead = System.currentTimeMillis();
                af.read(response.getPayload());
            } catch (SinkException se) {
                System.err
                        .println("PageCacheSized: Got SinkException attempting read on "
                                + af + ": " + se);
                HaboobStats.numErrors++;
                httpRequest waiter;
                while ((waiter = (httpRequest) waiting.remove_head()) != null) {
                    httpNotFoundResponse notfound = new httpNotFoundResponse(
                            waiter, se.getMessage());
                    httpResponder respd = new httpResponder(notfound,
                            waiter, true);
                    HttpSend.sendResponse(respd);
                }
                free();
            }
        }

        // Free cache entry and remove from system for GC
        // NOTE(review): HaboobStats.cacheSizeEntries/cacheSizeBytes are
        // incremented on allocation but not decremented here — confirm
        // whether this undercounting of frees is intentional.
        void free() {
            System.err.println("PageCacheSized: Freeing entry " + this);
            if (af != null) {
                aFileTbl.remove(af);
                af.close();
                af = null;
            }
            pageTbl.remove(url);
            sizeList.remove_item(this);
            response = null;
        }

        // Queue a request to be answered when the pending read completes.
        synchronized void addWaiter(httpRequest req) {
            waiting.add_to_tail(req);
        }

        httpOKResponse getResponse() {
            return response;
        }

        // Send response to all waiters when done reading. comp is null on
        // the DEBUG_NO_FILE_READ path; otherwise a short read is fatal.
        synchronized void done(AFileIOCompleted comp) {
            if (DEBUG)
                System.err
                        .println("PageCacheSized: Done with file read on "
                                + this);

            if ((comp != null) && (comp.sizeCompleted != size)) {
                throw new RuntimeException(
                        "PageCacheSized: WARNING: Got "
                                + comp.sizeCompleted
                                + " bytes read, expecting " + size);
            }

            if (af != null) {
                af.close();
                aFileTbl.remove(af);
                af = null;
            }
            if (PROFILE) {
                tEndRead = System.currentTimeMillis();
                HaboobStats.numFileRead++;
                HaboobStats.timeFileRead += (tEndRead - tStartRead);
            }

            pending = false;
            httpRequest waiter;

            while ((waiter = (httpRequest) waiting.remove_head()) != null) {
                httpResponder respd = new httpResponder(response,
                        waiter);
                HttpSend.sendResponse(respd);
            }
        }

        // Send cache entry on hit
        void send(httpRequest req) {
            httpResponder respd = new httpResponder(response, req);
            HttpSend.sendResponse(respd);
        }

        public String toString() {
            if (af != null) {
                return "cacheEntry [af=" + af.getFilename() + ", size="
                        + size + "]";
            } else {
                return "cacheEntry [size=" + size + "]";
            }
        }
    }

    // An experiment: Try running cache misses in a separate stage.
    // NOTE(review): the active SEPARATE_MISS_STAGE path in init() creates
    // a second PageCacheSized rather than this class — this inner class
    // appears unused by the enclosing code shown here; confirm before
    // relying on it.
    class CacheMissStage implements EventHandlerIF {
        SinkIF mysink;

        public void init(ConfigDataIF config) throws Exception {
            // NOTE(review): mysink is printed before it is assigned, so
            // this always logs the field's pre-init value (null).
            System.err.println("CacheMissStage: mysink " + mysink);
            mysink = config.getStage().getSink();
        }

        public void destroy() throws Exception {
        }

        public void handleEvent(QueueElementIF event) {
            // Actually run the enclosing class method, since
            // a cache entry may be loaded by an earlier request
            // in the pipeline.
            System.err.println("CacheMissStage: handling " + event);
            PageCacheSized.this.handleEvent(event);
        }

        public void handleEvents(QueueElementIF items[]) {
            for (int i = 0; i < items.length; i++) {
                handleEvent(items[i]);
            }
        }

    }

}
|