001: /* RecordingOutputStream
002: *
003: * $Id: RecordingOutputStream.java 5080 2007-04-13 20:30:49Z gojomo $
004: *
005: * Created on Sep 23, 2003
006: *
007: * Copyright (C) 2003 Internet Archive.
008: *
009: * This file is part of the Heritrix web crawler (crawler.archive.org).
010: *
011: * Heritrix is free software; you can redistribute it and/or modify
012: * it under the terms of the GNU Lesser Public License as published by
013: * the Free Software Foundation; either version 2.1 of the License, or
014: * any later version.
015: *
016: * Heritrix is distributed in the hope that it will be useful,
017: * but WITHOUT ANY WARRANTY; without even the implied warranty of
018: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
019: * GNU Lesser Public License for more details.
020: *
021: * You should have received a copy of the GNU Lesser Public License
022: * along with Heritrix; if not, write to the Free Software
023: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
024: */
025: package org.archive.io;
026:
027: import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
028:
029: import java.io.FileOutputStream;
030: import java.io.IOException;
031: import java.io.OutputStream;
032: import java.security.MessageDigest;
033: import java.security.NoSuchAlgorithmException;
034: import java.util.logging.Level;
035: import java.util.logging.Logger;
036:
037: import org.archive.util.IoUtils;
038:
039: /**
040: * An output stream that records all writes to wrapped output
041: * stream.
042: *
043: * A RecordingOutputStream can be wrapped around any other
044: * OutputStream to record all bytes written to it. You can
045: * then request a ReplayInputStream to read those bytes.
046: *
047: * <p>The RecordingOutputStream uses an in-memory buffer and
048: * backing disk file to allow it to record streams of
049: * arbitrary length limited only by available disk space.
050: *
051: * <p>As long as the stream recorded is smaller than the
052: * in-memory buffer, no disk access will occur.
053: *
054: * <p>Recorded content can be recovered as a ReplayInputStream
055: * (via getReplayInputStream() or, for only the content after
056: * the content-begin-mark is set, getContentReplayInputStream() )
057: * or as a ReplayCharSequence (via getReplayCharSequence()).
058: *
059: * <p>This class is also used as a straight output stream
060: * by {@link RecordingInputStream} to which it records all reads.
061: * {@link RecordingInputStream} is exploiting the file backed buffer
062: * facility of this class passing <code>null</code> for the stream
063: * to wrap. TODO: Make a FileBackedOutputStream class that is
064: * subclassed by RecordingInputStream.
065: *
066: * @author gojomo
067: *
068: */
069: public class RecordingOutputStream extends OutputStream {
070: protected static Logger logger = Logger
071: .getLogger(RecordingOutputStream.class.getName());
072:
073: /**
074: * Size of recording.
075: *
076: * Later passed to ReplayInputStream on creation. It uses it to know when
077: * EOS.
078: */
079: private long size = 0;
080:
081: private String backingFilename;
082: private OutputStream diskStream = null;
083:
084: /**
085: * Buffer we write recordings to.
086: *
087: * We write all recordings here first till its full. Thereafter we
088: * write the backing file.
089: */
090: private byte[] buffer;
091:
092: /** current virtual position in the recording */
093: private long position;
094:
095: /** flag to disable recording */
096: private boolean recording;
097:
098: /**
099: * Reusable buffer for FastBufferedOutputStream
100: */
101: protected byte[] bufStreamBuf = new byte[FastBufferedOutputStream.DEFAULT_BUFFER_SIZE];
102:
103: /**
104: * True if we're to digest content.
105: */
106: private boolean shouldDigest = false;
107:
108: /**
109: * Digest instance.
110: */
111: private MessageDigest digest = null;
112:
113: /**
114: * Define for SHA1 alogarithm.
115: */
116: private static final String SHA1 = "SHA1";
117:
118: /**
119: * Maximum amount of header material to accept without the content
120: * body beginning -- if more, throw a RecorderTooMuchHeaderException.
121: * TODO: make configurable? make smaller?
122: */
123: protected static final long MAX_HEADER_MATERIAL = 1024 * 1024; // 1MB
124:
125: // configurable max length, max time limits
126: /** maximum length of material to record before throwing exception */
127: protected long maxLength = Long.MAX_VALUE;
128: /** maximum time to record before throwing exception */
129: protected long timeoutMs = Long.MAX_VALUE;
130: /** maximum rate to record (adds delays to hit target rate) */
131: protected long maxRateBytesPerMs = Long.MAX_VALUE;
132: /** time recording begins for timeout, rate calculations */
133: protected long startTime = Long.MAX_VALUE;
134:
135: /**
136: * When recording HTTP, where the content-body starts.
137: */
138: private long contentBeginMark;
139:
140: /**
141: * Stream to record.
142: */
143: private OutputStream out = null;
144:
145: // mark/reset support
146: /** furthest position reached before any reset()s */
147: private long maxPosition = 0;
148: /** remembered position to reset() to */
149: private long markPosition = 0;
150:
151: /**
152: * Create a new RecordingOutputStream.
153: *
154: * @param bufferSize Buffer size to use.
155: * @param backingFilename Name of backing file to use.
156: */
157: public RecordingOutputStream(int bufferSize, String backingFilename) {
158: this .buffer = new byte[bufferSize];
159: this .backingFilename = backingFilename;
160: recording = true;
161: }
162:
163: /**
164: * Wrap the given stream, both recording and passing along any data written
165: * to this RecordingOutputStream.
166: *
167: * @throws IOException If failed creation of backing file.
168: */
169: public void open() throws IOException {
170: this .open(null);
171: }
172:
173: /**
174: * Wrap the given stream, both recording and passing along any data written
175: * to this RecordingOutputStream.
176: *
177: * @param wrappedStream Stream to wrap. May be null for case where we
178: * want to write to a file backed stream only.
179: *
180: * @throws IOException If failed creation of backing file.
181: */
182: public void open(OutputStream wrappedStream) throws IOException {
183: if (isOpen()) {
184: // error; should not be opening/wrapping in an unclosed
185: // stream remains open
186: throw new IOException("ROS already open for "
187: + Thread.currentThread().getName());
188: }
189: this .out = wrappedStream;
190: this .position = 0;
191: this .markPosition = 0;
192: this .maxPosition = 0;
193: this .size = 0;
194: this .contentBeginMark = -1;
195: // ensure recording turned on
196: this .recording = true;
197: // Always begins false; must use startDigest() to begin
198: this .shouldDigest = false;
199: if (this .diskStream != null) {
200: closeDiskStream();
201: }
202: if (this .diskStream == null) {
203: // TODO: Fix so we only make file when its actually needed.
204: FileOutputStream fis = new FileOutputStream(
205: this .backingFilename);
206:
207: this .diskStream = new RecyclingFastBufferedOutputStream(
208: fis, bufStreamBuf);
209: }
210: startTime = System.currentTimeMillis();
211: }
212:
213: public void write(int b) throws IOException {
214: if (position < maxPosition) {
215: // revisiting previous content; do nothing but advance position
216: position++;
217: return;
218: }
219: if (recording) {
220: record(b);
221: }
222: if (this .out != null) {
223: this .out.write(b);
224: }
225: checkLimits();
226: }
227:
228: public void write(byte[] b, int off, int len) throws IOException {
229: if (position < maxPosition) {
230: if (position + len <= maxPosition) {
231: // revisiting; do nothing but advance position
232: position += len;
233: return;
234: }
235: // consume part of the array doing nothing but advancing position
236: long consumeRange = maxPosition - position;
237: position += consumeRange;
238: off += consumeRange;
239: len -= consumeRange;
240: }
241: if (recording) {
242: record(b, off, len);
243: }
244: if (this .out != null) {
245: this .out.write(b, off, len);
246: }
247: checkLimits();
248: }
249:
250: /**
251: * Check any enforced limits.
252: */
253: protected void checkLimits() throws RecorderIOException {
254: // too much material before finding end of headers?
255: if (contentBeginMark < 0) {
256: // no mark yet
257: if (position > MAX_HEADER_MATERIAL) {
258: throw new RecorderTooMuchHeaderException();
259: }
260: }
261: // overlong?
262: if (position > maxLength) {
263: throw new RecorderLengthExceededException();
264: }
265: // taking too long?
266: long duration = System.currentTimeMillis() - startTime + 1; // !divzero
267: if (duration > timeoutMs) {
268: throw new RecorderTimeoutException();
269: }
270: // need to throttle reading to hit max configured rate?
271: if (position / duration > maxRateBytesPerMs) {
272: long desiredDuration = position / maxRateBytesPerMs;
273: try {
274: Thread.sleep(desiredDuration - duration);
275: } catch (InterruptedException e) {
276: logger.log(Level.WARNING,
277: "bandwidth throttling sleep interrupted", e);
278: }
279: }
280: }
281:
282: /**
283: * Record the given byte for later recovery
284: *
285: * @param b Int to record.
286: *
287: * @exception IOException Failed write to backing file.
288: */
289: private void record(int b) throws IOException {
290: if (this .shouldDigest) {
291: this .digest.update((byte) b);
292: }
293: if (this .position >= this .buffer.length) {
294: // TODO: Its possible to call write w/o having first opened a
295: // stream. Protect ourselves against this.
296: assert this .diskStream != null : "Diskstream is null";
297: this .diskStream.write(b);
298: } else {
299: this .buffer[(int) this .position] = (byte) b;
300: }
301: this .position++;
302: }
303:
304: /**
305: * Record the given byte-array range for recovery later
306: *
307: * @param b Buffer to record.
308: * @param off Offset into buffer at which to start recording.
309: * @param len Length of buffer to record.
310: *
311: * @exception IOException Failed write to backing file.
312: */
313: private void record(byte[] b, int off, int len) throws IOException {
314: if (this .shouldDigest) {
315: assert this .digest != null : "Digest is null.";
316: this .digest.update(b, off, len);
317: }
318: tailRecord(b, off, len);
319: }
320:
321: /**
322: * Record without digesting.
323: *
324: * @param b Buffer to record.
325: * @param off Offset into buffer at which to start recording.
326: * @param len Length of buffer to record.
327: *
328: * @exception IOException Failed write to backing file.
329: */
330: private void tailRecord(byte[] b, int off, int len)
331: throws IOException {
332: if (this .position >= this .buffer.length) {
333: // TODO: Its possible to call write w/o having first opened a
334: // stream. Lets protect ourselves against this.
335: if (this .diskStream == null) {
336: throw new IOException("diskstream is null");
337: }
338: this .diskStream.write(b, off, len);
339: this .position += len;
340: } else {
341: assert this .buffer != null : "Buffer is null";
342: int toCopy = (int) Math.min(this .buffer.length
343: - this .position, len);
344: assert b != null : "Passed buffer is null";
345: System.arraycopy(b, off, this .buffer, (int) this .position,
346: toCopy);
347: this .position += toCopy;
348: // TODO verify these are +1 -1 right
349: if (toCopy < len) {
350: tailRecord(b, off + toCopy, len - toCopy);
351: }
352: }
353: }
354:
355: public void close() throws IOException {
356: if (contentBeginMark < 0) {
357: // if unset, consider 0 posn as content-start
358: // (so that a -1 never survives to replay step)
359: contentBeginMark = 0;
360: }
361: if (this .out != null) {
362: this .out.close();
363: this .out = null;
364: }
365: closeRecorder();
366: }
367:
368: protected synchronized void closeDiskStream() throws IOException {
369: if (this .diskStream != null) {
370: this .diskStream.close();
371: this .diskStream = null;
372: }
373: }
374:
375: public void closeRecorder() throws IOException {
376: recording = false;
377: closeDiskStream(); // if any
378: // This setting of size is important. Its passed to ReplayInputStream
379: // on creation. It uses it to know EOS.
380: if (this .size == 0) {
381: this .size = this .position;
382: }
383: }
384:
385: /* (non-Javadoc)
386: * @see java.io.OutputStream#flush()
387: */
388: public void flush() throws IOException {
389: if (this .out != null) {
390: this .out.flush();
391: }
392: if (this .diskStream != null) {
393: this .diskStream.flush();
394: }
395: }
396:
397: public ReplayInputStream getReplayInputStream() throws IOException {
398: return getReplayInputStream(0);
399: }
400:
401: public ReplayInputStream getReplayInputStream(long skip)
402: throws IOException {
403: // If this method is being called, then assumption must be that the
404: // stream is closed. If it ain't, then the stream gotten won't work
405: // -- the size will zero so any attempt at a read will get back EOF.
406: assert this .out == null : "Stream is still open.";
407: ReplayInputStream replay = new ReplayInputStream(this .buffer,
408: this .size, this .contentBeginMark, this .backingFilename);
409: replay.skip(skip);
410: return replay;
411: }
412:
413: /**
414: * Return a replay stream, cued up to begining of content
415: *
416: * @throws IOException
417: * @return An RIS.
418: */
419: public ReplayInputStream getContentReplayInputStream()
420: throws IOException {
421: return getReplayInputStream(this .contentBeginMark);
422: }
423:
424: public long getSize() {
425: return this .size;
426: }
427:
428: /**
429: * Remember the current position as the start of the "response
430: * body". Useful when recording HTTP traffic as a way to start
431: * replays after the headers.
432: */
433: public void markContentBegin() {
434: this .contentBeginMark = this .position;
435: startDigest();
436: }
437:
438: /**
439: * Return stored content-begin-mark (which is also end-of-headers)
440: */
441: public long getContentBegin() {
442: return this .contentBeginMark;
443: }
444:
445: /**
446: * Starts digesting recorded data, if a MessageDigest has been
447: * set.
448: */
449: public void startDigest() {
450: if (this .digest != null) {
451: this .digest.reset();
452: this .shouldDigest = true;
453: }
454: }
455:
456: /**
457: * Convenience method for setting SHA1 digest.
458: * @see #setDigest(String)
459: */
460: public void setSha1Digest() {
461: setDigest(SHA1);
462: }
463:
464: /**
465: * Sets a digest function which may be applied to recorded data.
466: * The difference between calling this method and {@link #setDigest(MessageDigest)}
467: * is that this method tries to reuse MethodDigest instance if already allocated
468: * and of appropriate algorithm.
469: * @param algorithm Message digest algorithm to use.
470: * @see #setDigest(MessageDigest)
471: */
472: public void setDigest(String algorithm) {
473: try {
474: // Reuse extant digest if its sha1 algorithm.
475: if (this .digest == null
476: || !this .digest.getAlgorithm().equals(algorithm)) {
477: setDigest(MessageDigest.getInstance(algorithm));
478: }
479: } catch (NoSuchAlgorithmException e) {
480: e.printStackTrace();
481: }
482: }
483:
484: /**
485: * Sets a digest function which may be applied to recorded data.
486: *
487: * As usually only a subset of the recorded data should
488: * be fed to the digest, you must also call startDigest()
489: * to begin digesting.
490: *
491: * @param md Message digest function to use.
492: */
493: public void setDigest(MessageDigest md) {
494: this .digest = md;
495: }
496:
497: /**
498: * Return the digest value for any recorded, digested data. Call
499: * only after all data has been recorded; otherwise, the running
500: * digest state is ruined.
501: *
502: * @return the digest final value
503: */
504: public byte[] getDigestValue() {
505: if (this .digest == null) {
506: return null;
507: }
508: return this .digest.digest();
509: }
510:
511: public ReplayCharSequence getReplayCharSequence()
512: throws IOException {
513: return getReplayCharSequence(null);
514: }
515:
516: public ReplayCharSequence getReplayCharSequence(
517: String characterEncoding) throws IOException {
518: return getReplayCharSequence(characterEncoding,
519: this .contentBeginMark);
520: }
521:
522: /**
523: * @param characterEncoding Encoding of recorded stream.
524: * @return A ReplayCharSequence Will return null if an IOException. Call
525: * close on returned RCS when done.
526: * @throws IOException
527: */
528: public ReplayCharSequence getReplayCharSequence(
529: String characterEncoding, long startOffset)
530: throws IOException {
531: // TODO: handled transfer-encoding: chunked content-bodies properly
532: float maxBytesPerChar = IoUtils
533: .encodingMaxBytesPerChar(characterEncoding);
534: if (maxBytesPerChar <= 1) {
535: // single
536: // TODO: take into account single-byte encoding may be non-default
537: return new ByteReplayCharSequence(this .buffer, this .size,
538: startOffset, this .backingFilename);
539: } else {
540: // multibyte
541: if (this .size <= this .buffer.length) {
542: // raw data is all in memory; do in memory
543: return new MultiByteReplayCharSequence(this .buffer,
544: this .size, startOffset, characterEncoding);
545:
546: } else {
547: // raw data overflows to disk; use temp file
548: ReplayInputStream ris = getReplayInputStream(startOffset);
549: ReplayCharSequence rcs = new MultiByteReplayCharSequence(
550: ris, this .backingFilename, characterEncoding);
551: ris.close();
552: return rcs;
553: }
554:
555: }
556:
557: }
558:
559: public long getResponseContentLength() {
560: return this .size - this .contentBeginMark;
561: }
562:
563: /**
564: * @return True if this ROS is open.
565: */
566: public boolean isOpen() {
567: return this .out != null;
568: }
569:
570: /**
571: * When used alongside a mark-supporting RecordingInputStream, remember
572: * a position reachable by a future reset().
573: */
574: public void mark() {
575: // remember this position for subsequent reset()
576: this .markPosition = position;
577: }
578:
579: /**
580: * When used alongside a mark-supporting RecordingInputStream, reset
581: * the position to that saved by previous mark(). Until the position
582: * again reached "new" material, none of the bytes pushed to this
583: * stream will be digested or recorded.
584: */
585: public void reset() {
586: // take note of furthest-position-reached to avoid double-recording
587: maxPosition = Math.max(maxPosition, position);
588: // reset to previous position
589: position = markPosition;
590: }
591:
592: /**
593: * Set limits on length, time, and rate to enforce.
594: *
595: * @param length
596: * @param milliseconds
597: * @param rateKBps
598: */
599: public void setLimits(long length, long milliseconds, long rateKBps) {
600: maxLength = (length > 0) ? length : Long.MAX_VALUE;
601: timeoutMs = (milliseconds > 0) ? milliseconds : Long.MAX_VALUE;
602: maxRateBytesPerMs = (rateKBps > 0) ? rateKBps * 1024 / 1000
603: : Long.MAX_VALUE;
604: }
605:
606: /**
607: * Reset limits to effectively-unlimited defaults
608: */
609: public void resetLimits() {
610: maxLength = Long.MAX_VALUE;
611: timeoutMs = Long.MAX_VALUE;
612: maxRateBytesPerMs = Long.MAX_VALUE;
613: }
614:
615: /**
616: * Return number of bytes that could be recorded without hitting
617: * length limit
618: *
619: * @return long byte count
620: */
621: public long getRemainingLength() {
622: return maxLength - position;
623: }
624: }
|