001: /* RecordingInputStream
002: *
003: * $Id: RecordingInputStream.java 5080 2007-04-13 20:30:49Z gojomo $
004: *
005: * Created on Sep 24, 2003
006: *
007: * Copyright (C) 2003 Internet Archive.
008: *
009: * This file is part of the Heritrix web crawler (crawler.archive.org).
010: *
011: * Heritrix is free software; you can redistribute it and/or modify
012: * it under the terms of the GNU Lesser Public License as published by
013: * the Free Software Foundation; either version 2.1 of the License, or
014: * any later version.
015: *
016: * Heritrix is distributed in the hope that it will be useful,
017: * but WITHOUT ANY WARRANTY; without even the implied warranty of
018: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
019: * GNU Lesser Public License for more details.
020: *
021: * You should have received a copy of the GNU Lesser Public License
022: * along with Heritrix; if not, write to the Free Software
023: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
024: */
025: package org.archive.io;
026:
027: import java.io.File;
028: import java.io.FileOutputStream;
029: import java.io.IOException;
030: import java.io.InputStream;
031: import java.net.SocketException;
032: import java.net.SocketTimeoutException;
033: import java.security.MessageDigest;
034: import java.util.logging.Level;
035: import java.util.logging.Logger;
036:
037: /**
038: * Stream which records all data read from it, which it acquires from a wrapped
039: * input stream.
040: *
041: * Makes use of a RecordingOutputStream for recording because of its being
042: * file backed so we can write massive amounts of data w/o worrying about
043: * overflowing memory.
044: *
045: * @author gojomo
046: *
047: */
048: public class RecordingInputStream extends InputStream {
049:
050: protected static Logger logger = Logger
051: .getLogger("org.archive.io.RecordingInputStream");
052:
053: /**
054: * Where we are recording to.
055: */
056: private RecordingOutputStream recordingOutputStream;
057:
058: /**
059: * Stream to record.
060: */
061: private InputStream in = null;
062:
063: /**
064: * Reusable buffer to avoid reallocation on each readFullyUntil
065: */
066: protected byte[] drainBuffer = new byte[16 * 1024];
067:
068: /**
069: * Create a new RecordingInputStream.
070: *
071: * @param bufferSize Size of buffer to use.
072: * @param backingFilename Name of backing file.
073: */
074: public RecordingInputStream(int bufferSize, String backingFilename) {
075: this .recordingOutputStream = new RecordingOutputStream(
076: bufferSize, backingFilename);
077: }
078:
079: public void open(InputStream wrappedStream) throws IOException {
080: logger.fine(Thread.currentThread().getName() + " opening "
081: + wrappedStream + ", "
082: + Thread.currentThread().getName());
083: if (isOpen()) {
084: // error; should not be opening/wrapping in an unclosed
085: // stream remains open
086: throw new IOException("RIS already open for "
087: + Thread.currentThread().getName());
088: }
089: this .in = wrappedStream;
090: this .recordingOutputStream.open();
091: }
092:
093: public int read() throws IOException {
094: if (!isOpen()) {
095: throw new IOException("Stream closed "
096: + Thread.currentThread().getName());
097: }
098: int b = this .in.read();
099: if (b != -1) {
100: assert this .recordingOutputStream != null : "ROS is null "
101: + Thread.currentThread().getName();
102: this .recordingOutputStream.write(b);
103: }
104: return b;
105: }
106:
107: public int read(byte[] b, int off, int len) throws IOException {
108: if (!isOpen()) {
109: throw new IOException("Stream closed "
110: + Thread.currentThread().getName());
111: }
112: int count = this .in.read(b, off, len);
113: if (count > 0) {
114: assert this .recordingOutputStream != null : "ROS is null "
115: + Thread.currentThread().getName();
116: this .recordingOutputStream.write(b, off, count);
117: }
118: return count;
119: }
120:
121: public int read(byte[] b) throws IOException {
122: if (!isOpen()) {
123: throw new IOException("Stream closed "
124: + Thread.currentThread().getName());
125: }
126: int count = this .in.read(b);
127: if (count > 0) {
128: assert this .recordingOutputStream != null : "ROS is null "
129: + Thread.currentThread().getName();
130: this .recordingOutputStream.write(b, 0, count);
131: }
132: return count;
133: }
134:
135: public void close() throws IOException {
136: if (logger.isLoggable(Level.FINE)) {
137: logger
138: .fine(Thread.currentThread().getName()
139: + " closing " + this .in + ", "
140: + Thread.currentThread().getName());
141: }
142: if (this .in != null) {
143: this .in.close();
144: this .in = null;
145: }
146: this .recordingOutputStream.close();
147: }
148:
149: public ReplayInputStream getReplayInputStream() throws IOException {
150: return this .recordingOutputStream.getReplayInputStream();
151: }
152:
153: public ReplayInputStream getContentReplayInputStream()
154: throws IOException {
155: return this .recordingOutputStream.getContentReplayInputStream();
156: }
157:
158: public long readFully() throws IOException {
159: while (read(drainBuffer) != -1) {
160: // Empty out stream.
161: continue;
162: }
163: return this .recordingOutputStream.getSize();
164: }
165:
166: /**
167: * Read all of a stream (Or read until we timeout or have read to the max).
168: * @param softMaxLength Maximum length to read; if zero or < 0, then no
169: * limit. If met, return normally.
170: * @param hardMaxLength Maximum length to read; if zero or < 0, then no
171: * limit. If exceeded, throw RecorderLengthExceededException
172: * @param timeout Timeout in milliseconds for total read; if zero or
173: * negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
174: * RecorderTimeoutException
175: * @param maxBytesPerMs How many bytes per millisecond.
176: * @throws IOException failed read.
177: * @throws RecorderLengthExceededException
178: * @throws RecorderTimeoutException
179: * @throws InterruptedException
180: */
181: public void readFullyOrUntil(long softMaxLength)
182: throws IOException, RecorderLengthExceededException,
183: RecorderTimeoutException, InterruptedException {
184: // Check we're open before proceeding.
185: if (!isOpen()) {
186: // TODO: should this be a noisier exception-raising error?
187: return;
188: }
189:
190: long totalBytes = 0L;
191: long bytesRead = -1L;
192: long maxToRead = -1;
193: while (true) {
194: try {
195: // read no more than soft max
196: maxToRead = (softMaxLength <= 0) ? drainBuffer.length
197: : Math.min(drainBuffer.length, softMaxLength
198: - totalBytes);
199: // nor more than hard max
200: maxToRead = Math.min(maxToRead, recordingOutputStream
201: .getRemainingLength());
202: // but always at least 1 (to trigger hard max exception
203: maxToRead = Math.max(maxToRead, 1);
204:
205: bytesRead = read(drainBuffer, 0, (int) maxToRead);
206: if (bytesRead == -1) {
207: break;
208: }
209: totalBytes += bytesRead;
210:
211: if (Thread.interrupted()) {
212: throw new InterruptedException(
213: "Interrupted during IO");
214: }
215: } catch (SocketTimeoutException e) {
216: // A socket timeout is just a transient problem, meaning
217: // nothing was available in the configured timeout period,
218: // but something else might become available later.
219: // Take this opportunity to check the overall
220: // timeout (below). One reason for this timeout is
221: // servers that keep up the connection, 'keep-alive', even
222: // though we asked them to not keep the connection open.
223: if (logger.isLoggable(Level.FINE)) {
224: logger.log(Level.FINE, "socket timeout", e);
225: }
226: // check for overall timeout
227: recordingOutputStream.checkLimits();
228: } catch (SocketException se) {
229: throw se;
230: } catch (NullPointerException e) {
231: // [ 896757 ] NPEs in Andy's Th-Fri Crawl.
232: // A crawl was showing NPE's in this part of the code but can
233: // not reproduce. Adding this rethrowing catch block w/
234: // diagnostics to help should we come across the problem in the
235: // future.
236: throw new NullPointerException("Stream " + this .in
237: + ", " + e.getMessage() + " "
238: + Thread.currentThread().getName());
239: }
240:
241: // if have read 'enough', just finish
242: if (softMaxLength > 0 && totalBytes >= softMaxLength) {
243: break; // return
244: }
245: }
246: }
247:
248: public long getSize() {
249: return this .recordingOutputStream.getSize();
250: }
251:
252: public void markContentBegin() {
253: this .recordingOutputStream.markContentBegin();
254: }
255:
256: public long getContentBegin() {
257: return this .recordingOutputStream.getContentBegin();
258: }
259:
260: public void startDigest() {
261: this .recordingOutputStream.startDigest();
262: }
263:
264: /**
265: * Convenience method for setting SHA1 digest.
266: */
267: public void setSha1Digest() {
268: this .recordingOutputStream.setSha1Digest();
269: }
270:
271: /**
272: * Sets a digest algorithm which may be applied to recorded data.
273: * As usually only a subset of the recorded data should
274: * be fed to the digest, you must also call startDigest()
275: * to begin digesting.
276: *
277: * @param algorithm
278: */
279: public void setDigest(String algorithm) {
280: this .recordingOutputStream.setDigest(algorithm);
281: }
282:
283: /**
284: * Sets a digest function which may be applied to recorded data.
285: * As usually only a subset of the recorded data should
286: * be fed to the digest, you must also call startDigest()
287: * to begin digesting.
288: *
289: * @param md
290: */
291: public void setDigest(MessageDigest md) {
292: this .recordingOutputStream.setDigest(md);
293: }
294:
295: /**
296: * Return the digest value for any recorded, digested data. Call
297: * only after all data has been recorded; otherwise, the running
298: * digest state is ruined.
299: *
300: * @return the digest final value
301: */
302: public byte[] getDigestValue() {
303: return this .recordingOutputStream.getDigestValue();
304: }
305:
306: public ReplayCharSequence getReplayCharSequence()
307: throws IOException {
308: return getReplayCharSequence(null);
309: }
310:
311: /**
312: * @param characterEncoding Encoding of recorded stream.
313: * @return A ReplayCharSequence Will return null if an IOException. Call
314: * close on returned RCS when done.
315: * @throws IOException
316: */
317: public ReplayCharSequence getReplayCharSequence(
318: String characterEncoding) throws IOException {
319: return this .recordingOutputStream
320: .getReplayCharSequence(characterEncoding);
321: }
322:
323: public long getResponseContentLength() {
324: return this .recordingOutputStream.getResponseContentLength();
325: }
326:
327: public void closeRecorder() throws IOException {
328: this .recordingOutputStream.closeRecorder();
329: }
330:
331: /**
332: * @param tempFile
333: * @throws IOException
334: */
335: public void copyContentBodyTo(File tempFile) throws IOException {
336: FileOutputStream fos = new FileOutputStream(tempFile);
337: ReplayInputStream ris = getContentReplayInputStream();
338: ris.readFullyTo(fos);
339: fos.close();
340: ris.close();
341: }
342:
343: /**
344: * @return True if we've been opened.
345: */
346: public boolean isOpen() {
347: return this .in != null;
348: }
349:
350: @Override
351: public synchronized void mark(int readlimit) {
352: this .in.mark(readlimit);
353: this .recordingOutputStream.mark();
354: }
355:
356: @Override
357: public boolean markSupported() {
358: return this .in.markSupported();
359: }
360:
361: @Override
362: public synchronized void reset() throws IOException {
363: this .in.reset();
364: this .recordingOutputStream.reset();
365: }
366:
367: /**
368: * Set limits to be enforced by internal recording-out
369: */
370: public void setLimits(long hardMax, long timeoutMs, long maxRateKBps) {
371: recordingOutputStream
372: .setLimits(hardMax, timeoutMs, maxRateKBps);
373: }
374: }
|