001: /* GzippedInputStream
002: *
003: * $Id: GzippedInputStream.java 4995 2007-03-12 23:48:36Z stack-sf $
004: *
005: * Created on July 5, 2004
006: *
007: * Copyright (C) 2004 Internet Archive.
008: *
009: * This file is part of the Heritrix web crawler (crawler.archive.org).
010: *
011: * Heritrix is free software; you can redistribute it and/or modify
012: * it under the terms of the GNU Lesser Public License as published by
013: * the Free Software Foundation; either version 2.1 of the License, or
014: * any later version.
015: *
016: * Heritrix is distributed in the hope that it will be useful,
017: * but WITHOUT ANY WARRANTY; without even the implied warranty of
018: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
019: * GNU Lesser Public License for more details.
020: *
021: * You should have received a copy of the GNU Lesser Public License
022: * along with Heritrix; if not, write to the Free Software
023: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
024: */
025: package org.archive.io;
026:
027: import it.unimi.dsi.fastutil.io.RepositionableStream;
028:
029: import java.io.ByteArrayOutputStream;
030: import java.io.EOFException;
031: import java.io.IOException;
032: import java.io.InputStream;
033: import java.util.Iterator;
034: import java.util.logging.Logger;
035: import java.util.zip.Deflater;
036: import java.util.zip.GZIPInputStream;
037: import java.util.zip.GZIPOutputStream;
038: import java.util.zip.Inflater;
039: import java.util.zip.ZipException;
040:
041: /**
042: * Subclass of GZIPInputStream that can handle a stream made of multiple
043: * concatenated GZIP members/records.
044: *
045: * This class is needed because GZIPInputStream only finds the first GZIP
046: * member in the file even if the file is made up of multiple GZIP members.
047: *
048: * <p>Takes an InputStream stream that implements
049: * {@link RepositionableStream} interface so it can backup over-reads done
050: * by the zlib Inflater class.
051: *
052: * <p>Use the {@link #iterator()} method to get a gzip member iterator.
053: * Calls to {@link Iterator#next()} returns the next gzip member in the
054: * stream. Cast return from {@link Iterator#next()} to InputStream.
055: *
056: * <p>Use {@link #gzipMemberSeek(long)} to position stream before reading
057: * a gzip member if doing random accessing of gzip members. Pass it offset
058: * at which gzip member starts.
059: *
060: * <p>If you need to know position at which a gzip member starts, call
061: * {@link #position()} just after a call to {@link Iterator#hasNext()}
062: * and before you call {@link Iterator#next()}.
063: *
064: * @author stack
065: */
066: public class GzippedInputStream extends GZIPInputStream implements
067: RepositionableStream {
068: /**
069: * Tail on gzip members (The CRC).
070: */
071: private static final int GZIP_TRAILER_LENGTH = 8;
072:
073: /**
074: * Utility class used probing for gzip members in stream.
075: * We need this instance to get at the readByte method.
076: */
077: private final GzipHeader gzipHeader = new GzipHeader();
078:
079: /**
080: * Buffer size used skipping over gzip members.
081: */
082: private static final int LINUX_PAGE_SIZE = 4 * 1024;
083:
084: private final long initialOffset;
085:
086: public GzippedInputStream(InputStream is) throws IOException {
087: // Have buffer match linux page size.
088: this (is, LINUX_PAGE_SIZE);
089: }
090:
091: /**
092: * @param is An InputStream that implements RespositionableStream and
093: * returns <code>true</code> when we call
094: * {@link InputStream#markSupported()} (Latter is needed so can setup
095: * an {@link Iterator} against the Gzip stream).
096: * @param size Size of blocks to use reading.
097: * @throws IOException
098: */
099: public GzippedInputStream(final InputStream is, final int size)
100: throws IOException {
101: super (checkStream(is), size);
102: if (!is.markSupported()) {
103: throw new IllegalArgumentException(
104: "GzippedInputStream requires "
105: + "a markable stream");
106: }
107: if (!(is instanceof RepositionableStream)) {
108: throw new IllegalArgumentException(
109: "GzippedInputStream requires "
110: + "a stream that implements RepositionableStream");
111: }
112: // We need to calculate the absolute offset of the current
113: // GZIP Member. Its almost always going to be zero but not
114: // always (We may have been passed a stream that is already part
115: // ways through a stream of GZIP Members). So, getting
116: // absolute offset is not exactly straight-forward. The super
117: // class, GZIPInputStream on construction reads in the GZIP Header
118: // which is a pain because I then do not know the absolute offset
119: // at which the GZIP record began. So, the call above to checkStream()
120: // marked the stream before passing it to the super calls. Then
121: // below we get current postion at just past the GZIP Header, call
122: // reset so we go back to the absolute start of the GZIP Member in
123: // the file, record the offset for later should we need to start
124: // over again in this file -- i.e. we're asked to get an iterator
125: // from Record zero on -- then we move the file position to just
126: // after the GZIP Header again so we're again aligned for inflation
127: // of the current record.
128: long afterGZIPHeader = ((RepositionableStream) is).position();
129: is.reset();
130: this .initialOffset = ((RepositionableStream) is).position();
131: ((RepositionableStream) is).position(afterGZIPHeader);
132: }
133:
134: protected static InputStream checkStream(final InputStream is)
135: throws IOException {
136: if (is instanceof RepositionableStream) {
137: // See note above in constructor on why the mark here.
138: // Also minimal gzip header is 10. IA GZIP Headers are 20 bytes.
139: // Multiply by 4 in case extra info in the header.
140: is.mark(GzipHeader.MINIMAL_GZIP_HEADER_LENGTH * 4);
141: return is;
142: }
143: throw new IOException("Passed stream does not"
144: + " implement PositionableStream");
145: }
146:
147: /**
148: * Exhaust current GZIP member content.
149: * Call this method when you think you're on the end of the
150: * GZIP member. It will clean out any dross.
151: * @param ignore Character to ignore counting characters (Usually
152: * trailing new lines).
153: * @return Count of characters skipped over.
154: * @throws IOException
155: */
156: public long gotoEOR(int ignore) throws IOException {
157: long bytesSkipped = 0;
158: if (this .inf.getTotalIn() <= 0) {
159: return bytesSkipped;
160: }
161: if (!this .inf.finished()) {
162: int read = 0;
163: while ((read = read()) != -1) {
164: if ((byte) read == (byte) ignore) {
165: continue;
166: }
167: bytesSkipped = gotoEOR() + 1;
168: break;
169: }
170: }
171: return bytesSkipped;
172: }
173:
174: /**
175: * Exhaust current GZIP member content.
176: * Call this method when you think you're on the end of the
177: * GZIP member. It will clean out any dross.
178: * @return Count of characters skipped over.
179: * @throws IOException
180: */
181: public long gotoEOR() throws IOException {
182: long bytesSkipped = 0;
183: if (this .inf.getTotalIn() <= 0) {
184: return bytesSkipped;
185: }
186: while (!this .inf.finished()) {
187: bytesSkipped += skip(Long.MAX_VALUE);
188: }
189: return bytesSkipped;
190: }
191:
192: /**
193: * Returns a GZIP Member Iterator.
194: * Has limitations. Can only get one Iterator per instance of this class;
195: * you must get new instance if you want to get Iterator again.
196: * @return Iterator over GZIP Members.
197: */
198: public Iterator iterator() {
199: final Logger logger = Logger.getLogger(this .getClass()
200: .getName());
201:
202: try {
203: // We know its a RepositionableStream else we'd have failed
204: // construction. On iterator construction, set file back to
205: // initial position so we're ready to read GZIP Members
206: // (May not always work dependent on how the
207: // RepositionableStream was implemented).
208: ((RepositionableStream) this .in)
209: .position(this .initialOffset);
210: } catch (IOException e) {
211: throw new RuntimeException(e);
212: }
213: return new Iterator() {
214: private GzippedInputStream compressedStream = GzippedInputStream.this ;
215:
216: public boolean hasNext() {
217: try {
218: gotoEOR();
219: } catch (IOException e) {
220: if ((e instanceof ZipException)
221: || (e.getMessage() != null && e
222: .getMessage().startsWith(
223: "Corrupt GZIP trailer"))) {
224: // Try skipping end of bad record; try moving to next.
225: logger.info("Skipping exception "
226: + e.getMessage());
227: } else {
228: throw new RuntimeException(e);
229: }
230: }
231: return moveToNextGzipMember();
232: }
233:
234: /**
235: * @return An InputStream onto a GZIP Member.
236: */
237: public Object next() {
238: try {
239: gzipMemberSeek();
240: } catch (IOException e) {
241: throw new RuntimeException("Failed move to EOR or "
242: + "failed header read: " + e.getMessage());
243: }
244: return this .compressedStream;
245: }
246:
247: public void remove() {
248: throw new UnsupportedOperationException();
249: }
250: };
251: }
252:
253: /**
254: * @return True if we found another record in the stream.
255: */
256: protected boolean moveToNextGzipMember() {
257: boolean result = false;
258: // Move to the next gzip member, if there is one, positioning
259: // ourselves by backing up the stream so we reread any inflater
260: // remaining bytes. Then add 8 bytes to get us past the GZIP
261: // CRC trailer block that ends all gzip members.
262: try {
263: RepositionableStream ps = (RepositionableStream) getInputStream();
264: // 8 is sizeof gzip CRC block thats on tail of gzipped
265: // record. If remaining is < 8 then experience indicates
266: // we're seeking past the gzip header -- don't backup the
267: // stream.
268: if (getInflater().getRemaining() > GZIP_TRAILER_LENGTH) {
269: ps.position(position() - getInflater().getRemaining()
270: + GZIP_TRAILER_LENGTH);
271: }
272: for (int read = -1, headerRead = 0; true; headerRead = 0) {
273: // Give a hint to underlying stream that we're going to want to
274: // do some backing up.
275: getInputStream().mark(3);
276: if ((read = getInputStream().read()) == -1) {
277: break;
278: }
279: if (compareBytes(read, GZIPInputStream.GZIP_MAGIC)) {
280: headerRead++;
281: if ((read = getInputStream().read()) == -1) {
282: break;
283: }
284: if (compareBytes(read,
285: GZIPInputStream.GZIP_MAGIC >> 8)) {
286: headerRead++;
287: if ((read = getInputStream().read()) == -1) {
288: break;
289: }
290: if (compareBytes(read, Deflater.DEFLATED)) {
291: headerRead++;
292: // Found gzip header. Backup the stream the
293: // bytes we just found and set result true.
294: getInputStream().reset();
295: result = true;
296: break;
297: }
298: }
299: // Didn't find gzip header. Reset stream but one byte
300: // futher on then redo header tests.
301: ps.position(ps.position() - headerRead);
302: }
303: }
304: } catch (IOException e) {
305: throw new RuntimeException("Failed i/o: " + e.getMessage());
306: }
307: return result;
308: }
309:
310: protected boolean compareBytes(final int a, final int b) {
311: return ((byte) (a & 0xff)) == ((byte) (b & 0xff));
312: }
313:
314: protected Inflater getInflater() {
315: return this .inf;
316: }
317:
318: protected InputStream getInputStream() {
319: return this .in;
320: }
321:
322: protected GzipHeader getGzipHeader() {
323: return this .gzipHeader;
324: }
325:
326: /**
327: * Move to next gzip member in the file.
328: */
329: protected void resetInflater() {
330: this .eos = false;
331: this .inf.reset();
332: }
333:
334: /**
335: * Read in the gzip header.
336: * @throws IOException
337: */
338: protected void readHeader() throws IOException {
339: new GzipHeader(this .in);
340: // Reset the crc for subsequent reads.
341: this .crc.reset();
342: }
343:
344: /**
345: * Seek to passed offset.
346: *
347: * After positioning the stream, it resets the inflater.
348: * Assumption is that public use of this method is only
349: * to position stream at start of a gzip member.
350: *
351: * @param position Absolute position of a gzip member start.
352: * @throws IOException
353: */
354: public void position(long position) throws IOException {
355: ((RepositionableStream) this .in).position(position);
356: resetInflater();
357: }
358:
359: public long position() throws IOException {
360: return ((RepositionableStream) this .in).position();
361: }
362:
363: /**
364: * Seek to a gzip member.
365: *
366: * Moves stream to new position, resets inflater and reads in the gzip
367: * header ready for subsequent calls to read.
368: *
369: * @param position Absolute position of a gzip member start.
370: * @throws IOException
371: */
372: public void gzipMemberSeek(long position) throws IOException {
373: position(position);
374: readHeader();
375: }
376:
377: public void gzipMemberSeek() throws IOException {
378: gzipMemberSeek(position());
379: }
380:
381: /**
382: * Gzip passed bytes.
383: * Use only when bytes is small.
384: * @param bytes What to gzip.
385: * @return A gzip member of bytes.
386: * @throws IOException
387: */
388: public static byte[] gzip(byte[] bytes) throws IOException {
389: ByteArrayOutputStream baos = new ByteArrayOutputStream();
390: GZIPOutputStream gzipOS = new GZIPOutputStream(baos);
391: gzipOS.write(bytes, 0, bytes.length);
392: gzipOS.close();
393: return baos.toByteArray();
394: }
395:
396: /**
397: * Tests passed stream is GZIP stream by reading in the HEAD.
398: * Does reposition of stream when done.
399: * @param rs An InputStream that is Repositionable.
400: * @return True if compressed stream.
401: * @throws IOException
402: */
403: public static boolean isCompressedRepositionableStream(
404: final RepositionableStream rs) throws IOException {
405: boolean result = false;
406: long p = rs.position();
407: try {
408: result = isCompressedStream((InputStream) rs);
409: } finally {
410: rs.position(p);
411: }
412: return result;
413: }
414:
415: /**
416: * Tests passed stream is gzip stream by reading in the HEAD.
417: * Does not reposition stream when done.
418: * @param is An InputStream.
419: * @return True if compressed stream.
420: * @throws IOException
421: */
422: public static boolean isCompressedStream(final InputStream is)
423: throws IOException {
424: try {
425: new GzipHeader(is);
426: } catch (NoGzipMagicException e) {
427: return false;
428: }
429: return true;
430: }
431: }
|