001: /* $Id: ArchiveReader.java 4996 2007-03-13 00:08:58Z stack-sf $
002: *
003: * Created on August 21st, 2006
004: *
005: * Copyright (C) 2006 Internet Archive.
006: *
007: * This file is part of the Heritrix web crawler (crawler.archive.org).
008: *
009: * Heritrix is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU Lesser Public License as published by
011: * the Free Software Foundation; either version 2.1 of the License, or
012: * any later version.
013: *
014: * Heritrix is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: * GNU Lesser Public License for more details.
018: *
019: * You should have received a copy of the GNU Lesser Public License
020: * along with Heritrix; if not, write to the Free Software
021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: */
023: package org.archive.io;
024:
025: import it.unimi.dsi.fastutil.io.RepositionableStream;
026:
027: import java.io.BufferedInputStream;
028: import java.io.BufferedWriter;
029: import java.io.EOFException;
030: import java.io.File;
031: import java.io.FileWriter;
032: import java.io.IOException;
033: import java.io.InputStream;
034: import java.util.ArrayList;
035: import java.util.Iterator;
036: import java.util.List;
037: import java.util.logging.Level;
038: import java.util.logging.Logger;
039: import org.apache.commons.cli.Option;
040: import org.apache.commons.cli.Options;
041: import org.archive.util.MimetypeUtils;
042:
043: /**
044: * Reader for an Archive file of Archive {@link ArchiveRecord}s.
045: * @author stack
046: * @version $Date: 2007-03-13 00:08:58 +0000 (Tue, 13 Mar 2007) $ $Version$
047: */
048: public abstract class ArchiveReader implements ArchiveFileConstants {
/**
 * Is this Archive file compressed?
 */
private boolean compressed = false;

/**
 * Should we digest as we read?
 */
private boolean digest = true;

/**
 * Should the parse be strict?
 */
private boolean strict = false;

/**
 * Archive file input stream.
 *
 * Keep it around so we can close it when done.
 *
 * <p>Set via {@link #setIn(InputStream)} in subclass construction. Must
 * support the {@link RepositionableStream} interface. Kept private;
 * subclasses get at it through {@link #getIn()}/{@link #getInputStream()}.
 */
private InputStream in = null;

/**
 * Maximum amount of recoverable exceptions in a row.
 * If more than this amount in a row, we'll let out the exception rather
 * than go back in for yet another retry.
 */
public static final int MAX_ALLOWED_RECOVERABLES = 10;

/**
 * The Record currently being read.
 *
 * Keep this ongoing reference so we'll close the record even if the caller
 * doesn't.
 */
private ArchiveRecord currentRecord = null;

/**
 * Descriptive string for the Archive file we're going against:
 * full path, url, etc. -- depends on context in which file was made.
 */
private String identifier = null;

/**
 * Archive file version.
 */
private String version = null;
099:
100: protected ArchiveReader() {
101: super ();
102: }
103:
104: /**
105: * Convenience method used by subclass constructors.
106: * @param i Identifier for Archive file this reader goes against.
107: */
108: protected void initialize(final String i) {
109: setReaderIdentifier(i);
110: }
111:
112: /**
113: * Convenience method for constructors.
114: *
115: * @param f File to read.
116: * @param offset Offset at which to start reading.
117: * @return InputStream to read from.
118: * @throws IOException If failed open or fail to get a memory
119: * mapped byte buffer on file.
120: */
121: protected InputStream getInputStream(final File f, final long offset)
122: throws IOException {
123: return new RandomAccessBufferedInputStream(
124: new RandomAccessInputStream(f, offset));
125: }
126:
127: public boolean isCompressed() {
128: return this .compressed;
129: }
130:
131: /**
132: * Get record at passed <code>offset</code>.
133: *
134: * @param offset Byte index into file at which a record starts.
135: * @return An Archive Record reference.
136: * @throws IOException
137: */
138: public ArchiveRecord get(long offset) throws IOException {
139: cleanupCurrentRecord();
140: RepositionableStream ps = (RepositionableStream) this .in;
141: long currentOffset = ps.position();
142: if (currentOffset != offset) {
143: currentOffset = offset;
144: ps.position(offset);
145: }
146: return createArchiveRecord(this .in, currentOffset);
147: }
148:
149: /**
150: * @return Return Archive Record created against current offset.
151: * @throws IOException
152: */
153: public ArchiveRecord get() throws IOException {
154: return createArchiveRecord(this .in,
155: ((RepositionableStream) this .in).position());
156: }
157:
158: public void close() throws IOException {
159: if (this .in != null) {
160: this .in.close();
161: this .in = null;
162: }
163: }
164:
165: /**
166: * Rewinds stream to start of the Archive file.
167: * @throws IOException if stream is not resettable.
168: */
169: protected void rewind() throws IOException {
170: cleanupCurrentRecord();
171: if (this .in instanceof RepositionableStream) {
172: try {
173: ((RepositionableStream) this .in).position(0);
174: } catch (IOException e) {
175: throw new RuntimeException(e);
176: }
177: } else {
178: throw new IOException("Stream is not resettable.");
179: }
180: }
181:
/**
 * Cleanout the current record if there is one.
 *
 * <p>Closes the in-progress record first, then skips the stream past the
 * record trailer so we are lined up at the start of the next record.
 *
 * <p>Note: if close/gotoEOR throws, {@link #currentRecord} is deliberately
 * left non-null; the iterator's hasNext() reads its header when logging
 * the cleanup failure.
 * @throws IOException
 */
protected void cleanupCurrentRecord() throws IOException {
    if (this .currentRecord != null) {
        this .currentRecord.close();
        gotoEOR(this .currentRecord);
        this .currentRecord = null;
    }
}
193:
/**
 * Return an Archive Record homed on <code>offset</code> into
 * <code>is</code>.
 * @param is Stream to read Record from.
 * @param offset Offset to find Record at.
 * @return ArchiveRecord instance.
 * @throws IOException
 */
protected abstract ArchiveRecord createArchiveRecord(
InputStream is, long offset) throws IOException;

/**
 * Skip over any trailing new lines at end of the record so we're lined up
 * ready to read the next.
 * @param record Record just read; implementations position the stream
 * just past its end-of-record trailer.
 * @throws IOException
 */
protected abstract void gotoEOR(ArchiveRecord record)
throws IOException;

/**
 * @return File extension for this Archive type, without the leading dot
 * (e.g. 'arc' or 'warc' -- see {@link #getStrippedFileName}).
 */
public abstract String getFileExtension();

/**
 * @return File extension for this Archive type including the leading dot.
 */
public abstract String getDotFileExtension();
217:
218: /**
219: * @return Version of this Archive file.
220: */
221: public String getVersion() {
222: return this .version;
223: }
224:
225: /**
226: * Validate the Archive file.
227: *
228: * This method iterates over the file throwing exception if it fails
229: * to successfully parse any record.
230: *
231: * <p>Assumes the stream is at the start of the file.
232: * @return List of all read Archive Headers.
233: *
234: * @throws IOException
235: */
236: public List validate() throws IOException {
237: return validate(-1);
238: }
239:
240: /**
241: * Validate the Archive file.
242: *
243: * This method iterates over the file throwing exception if it fails
244: * to successfully parse.
245: *
246: * <p>We start validation from whereever we are in the stream.
247: *
248: * @param noRecords Number of records expected. Pass -1 if number is
249: * unknown.
250: *
251: * @return List of all read metadatas. As we validate records, we add
252: * a reference to the read metadata.
253: *
254: * @throws IOException
255: */
256: public List validate(int noRecords) throws IOException {
257: List<ArchiveRecordHeader> hs = new ArrayList<ArchiveRecordHeader>();
258: int count = 0;
259: setStrict(true);
260: for (Iterator<ArchiveRecord> i = iterator(); i.hasNext();) {
261: count++;
262: ArchiveRecord r = i.next();
263: if (r.getHeader().getLength() <= 0
264: && r.getHeader().getMimetype().equals(
265: MimetypeUtils.NO_TYPE_MIMETYPE)) {
266: throw new IOException("ARCRecord content is empty.");
267: }
268: r.close();
269: // Add reference to metadata into a list of metadatas.
270: hs.add(r.getHeader());
271: }
272:
273: if (noRecords != -1) {
274: if (count != noRecords) {
275: throw new IOException("Count of records, "
276: + Integer.toString(count)
277: + " is less than expected "
278: + Integer.toString(noRecords));
279: }
280: }
281:
282: return hs;
283: }
284:
285: /**
286: * Test Archive file is valid.
287: * Assumes the stream is at the start of the file. Be aware that this
288: * method makes a pass over the whole file.
289: * @return True if file can be successfully parsed.
290: */
291: public boolean isValid() {
292: boolean valid = false;
293: try {
294: validate();
295: valid = true;
296: } catch (Exception e) {
297: // File is not valid if exception thrown parsing.
298: valid = false;
299: }
300:
301: return valid;
302: }
303:
304: /**
305: * @return Returns the strict.
306: */
307: public boolean isStrict() {
308: return this .strict;
309: }
310:
311: /**
312: * @param s The strict to set.
313: */
314: public void setStrict(boolean s) {
315: this .strict = s;
316: }
317:
318: /**
319: * @param d True if we're to digest.
320: */
321: public void setDigest(boolean d) {
322: this .digest = d;
323: }
324:
325: /**
326: * @return True if we're digesting as we read.
327: */
328: public boolean isDigest() {
329: return this .digest;
330: }
331:
332: protected Logger getLogger() {
333: return Logger.getLogger(this .getClass().getName());
334: }
335:
336: protected InputStream getInputStream() {
337: return this .in;
338: }
339:
340: /**
341: * Returns an ArchiveRecord iterator.
342: * Of note, on IOException, especially if ZipException reading compressed
343: * ARCs, rather than fail the iteration, try moving to the next record.
344: * If {@link ArchiveReader#strict} is not set, this will usually succeed.
345: * @return An iterator over ARC records.
346: */
347: public Iterator<ArchiveRecord> iterator() {
348: // Eat up any record outstanding.
349: try {
350: cleanupCurrentRecord();
351: } catch (IOException e) {
352: throw new RuntimeException(e);
353: }
354:
355: // Now reset stream to the start of the arc file.
356: try {
357: rewind();
358: } catch (IOException e) {
359: throw new RuntimeException(e);
360: }
361: return new ArchiveRecordIterator();
362: }
363:
364: protected void setCompressed(boolean compressed) {
365: this .compressed = compressed;
366: }
367:
368: /**
369: * @return The current ARC record or null if none.
370: * After construction has the arcfile header record.
371: * @see #get()
372: */
373: protected ArchiveRecord getCurrentRecord() {
374: return this .currentRecord;
375: }
376:
377: protected ArchiveRecord currentRecord(
378: final ArchiveRecord currentRecord) {
379: this .currentRecord = currentRecord;
380: return currentRecord;
381: }
382:
383: protected InputStream getIn() {
384: return in;
385: }
386:
387: protected void setIn(InputStream in) {
388: this .in = in;
389: }
390:
391: protected void setVersion(String version) {
392: this .version = version;
393: }
394:
395: public String getReaderIdentifier() {
396: return this .identifier;
397: }
398:
399: protected void setReaderIdentifier(final String i) {
400: this .identifier = i;
401: }
402:
403: /**
404: * Log on stderr.
405: * Logging should go via the logging system. This method
406: * bypasses the logging system going direct to stderr.
407: * Should not generally be used. Its used for rare messages
408: * that come of cmdline usage of ARCReader ERRORs and WARNINGs.
409: * Override if using ARCReader in a context where no stderr or
410: * where you'd like to redirect stderr to other than System.err.
411: * @param level Level to log message at.
412: * @param message Message to log.
413: */
414: public void logStdErr(Level level, String message) {
415: System.err.println(level.toString() + " " + message);
416: }
417:
418: /**
419: * Add buffering to RandomAccessInputStream.
420: */
421: protected class RandomAccessBufferedInputStream extends
422: BufferedInputStream implements RepositionableStream {
423:
424: public RandomAccessBufferedInputStream(
425: RandomAccessInputStream is) throws IOException {
426: super (is);
427: }
428:
429: public RandomAccessBufferedInputStream(
430: RandomAccessInputStream is, int size)
431: throws IOException {
432: super (is, size);
433: }
434:
435: public long position() throws IOException {
436: // Current position is the underlying files position
437: // minus the amount thats in the buffer yet to be read.
438: return ((RandomAccessInputStream) this .in).position()
439: - (this .count - this .pos);
440: }
441:
442: public void position(long position) throws IOException {
443: // Force refill of buffer whenever there's been a seek.
444: this .pos = 0;
445: this .count = 0;
446: ((RandomAccessInputStream) this .in).position(position);
447: }
448: }
449:
/**
 * Inner ArchiveRecord Iterator class.
 * Throws RuntimeExceptions in {@link #hasNext()} and {@link #next()} if
 * trouble pulling record from underlying stream.
 * @author stack
 */
protected class ArchiveRecordIterator implements
Iterator<ArchiveRecord> {
// Logger named for the concrete iterator class.
private final Logger logger = Logger.getLogger(this .getClass()
.getName());

/**
 * @return True if we have more records to read.
 * @exception RuntimeException Can throw an IOException wrapped in a
 * RuntimeException if a problem reading underlying stream (Corrupted
 * gzip, etc.).
 */
public boolean hasNext() {
// Call close on any extant record. This will scoot us past
// any content not yet read.
try {
cleanupCurrentRecord();
} catch (IOException e) {
// In strict mode any cleanup failure aborts the iteration.
if (isStrict()) {
throw new RuntimeException(e);
}
// Premature EOF means nothing more to read: end iteration
// rather than attempt a skip.
if (e instanceof EOFException) {
logger.warning("Premature EOF cleaning up "
+ currentRecord.getHeader().toString()
+ ": " + e.getMessage());
return false;
}
// If not strict, try going again. We might be able to skip
// over the bad record.
logger
.warning("Trying skip of failed record cleanup of "
+ currentRecord.getHeader().toString()
+ ": " + e.getMessage());
}
return innerHasNext();
}

// There are more records iff bytes remain on the underlying stream.
protected boolean innerHasNext() {
long offset = -1;
try {
// Note the current offset so it can be reported on failure.
offset = ((RepositionableStream) getInputStream())
.position();
return getInputStream().available() > 0;
} catch (IOException e) {
throw new RuntimeException("Offset " + offset, e);
}
}

/**
 * Tries to move to next record if we get
 * {@link RecoverableIOException}. If not <code>strict</code>
 * tries to move to next record if we get an
 * {@link IOException}.
 * @return Next object.
 * @exception RuntimeException Throws a runtime exception,
 * usually a wrapping of an IOException, if trouble getting
 * a record (Throws exception rather than return null).
 */
public ArchiveRecord next() {
long offset = -1;
try {
// Remember where this record attempt started, for messages.
offset = ((RepositionableStream) getInputStream())
.position();
return exceptionNext();
} catch (IOException e) {
if (!isStrict()) {
// Retry though an IOE. Maybe we will succeed reading
// subsequent record.
try {
if (hasNext()) {
getLogger().warning(
"Bad Record. Trying skip "
+ "(Current offset "
+ offset + "): "
+ e.getMessage());
return exceptionNext();
}
// Else we are at last record. Iterator#next is
// expecting value. We do not have one. Throw exception.
throw new RuntimeException(
"Retried but no next "
+ "record (Offset " + offset
+ ")", e);
} catch (IOException e1) {
throw new RuntimeException(
"After retry (Offset " + offset + ")",
e1);
}
}
throw new RuntimeException("(Offset " + offset + ")", e);
}
}

/**
 * A next that throws exceptions and has handling of
 * recoverable exceptions moving us to next record. Can call
 * hasNext which itself may throw exceptions.
 * @return Next record.
 * @throws IOException
 * @throws RuntimeException Thrown when we've reached maximum
 * retries.
 */
protected ArchiveRecord exceptionNext() throws IOException,
RuntimeException {
ArchiveRecord result = null;
IOException ioe = null;
// Retry up to MAX_ALLOWED_RECOVERABLES times in a row on a
// RecoverableIOException; any other IOException propagates.
for (int i = MAX_ALLOWED_RECOVERABLES; i > 0
&& result == null; i--) {
ioe = null;
try {
result = innerNext();
} catch (RecoverableIOException e) {
ioe = e;
getLogger().warning(e.getMessage());
if (hasNext()) {
continue;
}
// No records left. Throw exception rather than
// return null. The caller is expecting to get
// back a record since they've just called
// hasNext.
break;
}
}
if (ioe != null) {
// Then we did MAX_ALLOWED_RECOVERABLES retries. Throw
// the recoverable ioe wrapped in a RuntimeException so
// it goes out pass checks for IOE.
throw new RuntimeException("Retried "
+ MAX_ALLOWED_RECOVERABLES + " times in a row",
ioe);
}
return result;
}

// Read the record that starts at the stream's current offset.
protected ArchiveRecord innerNext() throws IOException {
return get(((RepositionableStream) getInputStream())
.position());
}

/** Removal is not supported by this iterator. */
public void remove() {
throw new UnsupportedOperationException();
}
}
599:
600: protected static String stripExtension(final String name,
601: final String ext) {
602: return (!name.endsWith(ext)) ? name : name.substring(0, name
603: .length()
604: - ext.length());
605: }
606:
607: /**
608: * @return short name of Archive file.
609: */
610: public String getFileName() {
611: return (new File(getReaderIdentifier())).getName();
612: }
613:
614: /**
615: * @return short name of Archive file.
616: */
617: public String getStrippedFileName() {
618: return getStrippedFileName(getFileName(), getDotFileExtension());
619: }
620:
621: /**
622: * @param name Name of ARCFile.
623: * @param dotFileExtension '.arc' or '.warc', etc.
624: * @return short name of Archive file.
625: */
626: public static String getStrippedFileName(String name,
627: final String dotFileExtension) {
628: name = stripExtension(name,
629: ArchiveFileConstants.DOT_COMPRESSED_FILE_EXTENSION);
630: return stripExtension(name, dotFileExtension);
631: }
632:
633: /**
634: * @param value Value to test.
635: * @return True if value is 'true', else false.
636: */
637: protected static boolean getTrueOrFalse(final String value) {
638: if (value == null || value.length() <= 0) {
639: return false;
640: }
641: return Boolean.TRUE.toString().equals(value.toLowerCase());
642: }
643:
644: /**
645: * @param format Format to use outputting.
646: * @throws IOException
647: * @throws java.text.ParseException
648: * @return True if handled.
649: */
650: protected boolean output(final String format) throws IOException,
651: java.text.ParseException {
652: boolean result = true;
653: // long start = System.currentTimeMillis();
654:
655: // Write output as pseudo-CDX file. See
656: // http://www.archive.org/web/researcher/cdx_legend.php
657: // and http://www.archive.org/web/researcher/example_cdx.php.
658: // Hash is hard-coded straight SHA-1 hash of content.
659: if (format.equals(DUMP)) {
660: // No point digesting dumping.
661: setDigest(false);
662: dump(false);
663: } else if (format.equals(GZIP_DUMP)) {
664: // No point digesting dumping.
665: setDigest(false);
666: dump(true);
667: } else if (format.equals(CDX)) {
668: cdxOutput(false);
669: } else if (format.equals(CDX_FILE)) {
670: cdxOutput(true);
671: } else {
672: result = false;
673: }
674: return result;
675: }
676:
677: protected void cdxOutput(boolean toFile) throws IOException {
678: BufferedWriter cdxWriter = null;
679: if (toFile) {
680: String cdxFilename = stripExtension(getReaderIdentifier(),
681: DOT_COMPRESSED_FILE_EXTENSION);
682: cdxFilename = stripExtension(cdxFilename,
683: getDotFileExtension());
684: cdxFilename += ('.' + CDX);
685: cdxWriter = new BufferedWriter(new FileWriter(cdxFilename));
686: }
687:
688: String header = "CDX b e a m s c "
689: + ((isCompressed()) ? "V" : "v") + " n g";
690: if (toFile) {
691: cdxWriter.write(header);
692: cdxWriter.newLine();
693: } else {
694: System.out.println(header);
695: }
696:
697: String strippedFileName = getStrippedFileName();
698: try {
699: for (Iterator<ArchiveRecord> ii = iterator(); ii.hasNext();) {
700: ArchiveRecord r = ii.next();
701: if (toFile) {
702: cdxWriter.write(r.outputCdx(strippedFileName));
703: cdxWriter.newLine();
704: } else {
705: System.out.println(r.outputCdx(strippedFileName));
706: }
707: }
708: } finally {
709: if (toFile) {
710: cdxWriter.close();
711: }
712: }
713: }
714:
715: /**
716: * Output passed record using passed format specifier.
717: * @param format What format to use outputting.
718: * @throws IOException
719: * @return True if handled.
720: */
721: public boolean outputRecord(final String format) throws IOException {
722: boolean result = true;
723: if (format.equals(CDX)) {
724: System.out.println(get().outputCdx(getStrippedFileName()));
725: } else if (format.equals(ArchiveFileConstants.DUMP)) {
726: // No point digesting if dumping content.
727: setDigest(false);
728: get().dump();
729: } else {
730: result = false;
731: }
732: return result;
733: }
734:
/**
 * Dump this file on STDOUT
 * @param compress True if dumped output is compressed.
 * @throws IOException
 * @throws java.text.ParseException
 */
public abstract void dump(final boolean compress)
throws IOException, java.text.ParseException;

/**
 * @param f Local file to delete when the returned reader is closed.
 * @return an ArchiveReader that will delete a local file on close. Used
 * when we bring Archive files local and need to clean up afterward.
 */
public abstract ArchiveReader getDeleteFileOnCloseReader(
final File f);
750:
751: /**
752: * Output passed record using passed format specifier.
753: * @param r ARCReader instance to output.
754: * @param format What format to use outputting.
755: * @throws IOException
756: */
757: protected static void outputRecord(final ArchiveReader r,
758: final String format) throws IOException {
759: if (!r.outputRecord(format)) {
760: throw new IOException("Unsupported format"
761: + " (or unsupported on a single record): " + format);
762: }
763: }
764:
765: /**
766: * @return Base Options object filled out with help, digest, strict, etc.
767: * options.
768: */
769: protected static Options getOptions() {
770: Options options = new Options();
771: options.addOption(new Option("h", "help", false,
772: "Prints this message and exits."));
773: options.addOption(new Option("o", "offset", true,
774: "Outputs record at this offset into file."));
775: options.addOption(new Option("d", "digest", true,
776: "Pass true|false. Expensive. Default: true (SHA-1)."));
777: options
778: .addOption(new Option("s", "strict", false,
779: "Strict mode. Fails parse if incorrectly formatted file."));
780: options.addOption(new Option("f", "format", true,
781: "Output options: 'cdx', cdxfile', 'dump', 'gzipdump',"
782: + "'or 'nohead'. Default: 'cdx'."));
783: return options;
784: }
785: }
|