001: /* $Id: ExperimentalWARCWriterProcessor.java 4935 2007-02-23 00:27:24Z gojomo $
002: *
003: * Created on August 1st, 2006.
004: *
005: * Copyright (C) 2006 Internet Archive.
006: *
007: * This file is part of the Heritrix web crawler (crawler.archive.org).
008: *
009: * Heritrix is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU Lesser Public License as published by
011: * the Free Software Foundation; either version 2.1 of the License, or
012: * any later version.
013: *
014: * Heritrix is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: * GNU Lesser Public License for more details.
018: *
019: * You should have received a copy of the GNU Lesser Public License
020: * along with Heritrix; if not, write to the Free Software
021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: */
023: package org.archive.crawler.writer;
024:
025: import java.io.ByteArrayInputStream;
026: import java.io.IOException;
027: import java.net.URI;
028: import java.net.URISyntaxException;
029: import java.util.Collection;
030: import java.util.HashMap;
031: import java.util.Map;
032: import java.util.concurrent.atomic.AtomicInteger;
033: import java.util.logging.Level;
034: import java.util.logging.Logger;
035:
036: import org.apache.commons.httpclient.Header;
037: import org.apache.commons.httpclient.HttpMethodBase;
038: import org.apache.commons.httpclient.HttpStatus;
039: import org.archive.crawler.datamodel.CoreAttributeConstants;
040: import org.archive.crawler.datamodel.CrawlURI;
041: import org.archive.crawler.datamodel.FetchStatusCodes;
042: import org.archive.crawler.deciderules.recrawl.IdenticalDigestDecideRule;
043: import org.archive.crawler.event.CrawlStatusListener;
044: import org.archive.crawler.extractor.Link;
045: import org.archive.crawler.framework.WriterPoolProcessor;
046: import org.archive.crawler.settings.SimpleType;
047: import org.archive.crawler.settings.Type;
048: import org.archive.io.ReplayInputStream;
049: import org.archive.io.WriterPoolMember;
050: import org.archive.io.WriterPoolSettings;
051: import org.archive.io.warc.ExperimentalWARCWriter;
052: import org.archive.io.warc.WARCConstants;
053: import org.archive.io.warc.WARCWriterPool;
054: import org.archive.uid.GeneratorFactory;
055: import org.archive.util.ArchiveUtils;
056: import org.archive.util.anvl.ANVLRecord;
057:
058: /**
059: * Experimental WARCWriterProcessor.
060: * Goes against the pending release of 0.12 of the WARC specification, the
061: * "Marcel Marceau" release. See <a href="https://archive-access.svn.sourceforge.net/svnroot/archive-access/branches/gjm_warc_0_12/warc/warc_file_format.html">latest revision</a>
* for current state. The 0.10 WARC implementation has been moved to
063: * {@link ExperimentalV10WARCWriterProcessor}.
064: *
065: * <p>TODO: Remove ANVLRecord. Rename NameValue or use RFC822
066: * (commons-httpclient?) or find something else.
067: *
068: * @author stack
069: */
070: public class ExperimentalWARCWriterProcessor extends
071: WriterPoolProcessor implements CoreAttributeConstants,
072: CrawlStatusListener, WriterPoolSettings, FetchStatusCodes,
073: WARCConstants {
074: private static final long serialVersionUID = 6182850087635847443L;
075:
076: private final Logger logger = Logger.getLogger(this .getClass()
077: .getName());
078:
079: /**
080: * Key for whether to write 'request' type records where possible
081: */
082: public static final String ATTR_WRITE_REQUESTS = "write-requests";
083:
084: /**
085: * Key for whether to write 'metadata' type records where possible
086: */
087: public static final String ATTR_WRITE_METADATA = "write-metadata";
088:
/**
* Key for whether to write 'revisit' type records when a URI's
* previous fetch had an identical content digest
*/
093: public static final String ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS = "write-revisit-for-identical-digests";
094:
095: /**
096: * Key for whether to write 'revisit' type records for server
097: * "304 not modified" responses
098: */
099: public static final String ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED = "write-revisit-for-not-modified";
100:
101: /**
102: * Default path list.
103: */
104: private static final String[] DEFAULT_PATH = { "warcs" };
105:
106: protected String[] getDefaultPath() {
107: return DEFAULT_PATH;
108: }
109:
110: /**
111: * @param name Name of this writer.
112: */
113: public ExperimentalWARCWriterProcessor(final String name) {
114: super (name, "Experimental WARCWriter processor (Version 0.12)");
115: Type e = addElementToDefinition(new SimpleType(
116: ATTR_WRITE_REQUESTS,
117: "Whether to write 'request' type records. "
118: + "Default is true.", new Boolean(true)));
119: e.setOverrideable(true);
120: e.setExpertSetting(true);
121: e = addElementToDefinition(new SimpleType(ATTR_WRITE_METADATA,
122: "Whether to write 'metadata' type records. "
123: + "Default is true.", new Boolean(true)));
124: e.setOverrideable(true);
125: e.setExpertSetting(true);
126: e = addElementToDefinition(new SimpleType(
127: ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS,
128: "Whether to write 'revisit' type records when a URI's "
129: + "history indicates the previous fetch had an identical "
130: + "content digest. " + "Default is true.",
131: new Boolean(true)));
132: e.setOverrideable(true);
133: e.setExpertSetting(true);
134: e = addElementToDefinition(new SimpleType(
135: ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED,
136: "Whether to write 'revisit' type records when a "
137: + "304-Not Modified response is received. "
138: + "Default is true.", new Boolean(true)));
139: e.setOverrideable(true);
140: e.setExpertSetting(true);
141: }
142:
143: protected void setupPool(final AtomicInteger serialNo) {
144: setPool(new WARCWriterPool(serialNo, this ,
145: getPoolMaximumActive(), getPoolMaximumWait()));
146: }
147:
148: /**
149: * Writes a CrawlURI and its associated data to store file.
150: *
151: * Currently this method understands the following uri types: dns, http, and
152: * https.
153: *
154: * @param curi CrawlURI to process.
155: *
156: */
157: protected void innerProcess(CrawlURI curi) {
158: // If failure, or we haven't fetched the resource yet, return
159: if (curi.getFetchStatus() <= 0) {
160: return;
161: }
162:
163: // If no recorded content at all, don't write record.
164: long recordLength = curi.getContentSize();
165: if (recordLength <= 0) {
166: // getContentSize() should be > 0 if any material (even just
167: // HTTP headers with zero-length body) is available.
168: return;
169: }
170:
171: String scheme = curi.getUURI().getScheme().toLowerCase();
172: try {
173: if (shouldWrite(curi)) {
174: write(scheme, curi);
175: } else {
176: logger.info("This writer does not write out scheme "
177: + scheme + " content");
178: }
179: } catch (IOException e) {
180: curi.addLocalizedError(this .getName(), e, "WriteRecord: "
181: + curi.toString());
182: logger.log(Level.SEVERE, "Failed write of Record: "
183: + curi.toString(), e);
184: }
185: }
186:
187: protected void write(final String lowerCaseScheme,
188: final CrawlURI curi) throws IOException {
189: WriterPoolMember writer = getPool().borrowFile();
190: long position = writer.getPosition();
191: // See if we need to open a new file because we've exceeed maxBytes.
192: // Call to checkFileSize will open new file if we're at maximum for
193: // current file.
194: writer.checkSize();
195: if (writer.getPosition() != position) {
196: // We just closed the file because it was larger than maxBytes.
197: // Add to the totalBytesWritten the size of the first record
198: // in the file, if any.
199: setTotalBytesWritten(getTotalBytesWritten()
200: + (writer.getPosition() - position));
201: position = writer.getPosition();
202: }
203:
204: ExperimentalWARCWriter w = (ExperimentalWARCWriter) writer;
205: try {
206: // Write a request, response, and metadata all in the one
207: // 'transaction'.
208: final URI baseid = getRecordID();
209: final String timestamp = ArchiveUtils.getLog14Date(curi
210: .getLong(A_FETCH_BEGAN_TIME));
211: if (lowerCaseScheme.startsWith("http")) {
212: // Add named fields for ip, checksum, and relate the metadata
213: // and request to the resource field.
214: // TODO: Use other than ANVL (or rename ANVL as NameValue or
215: // use RFC822 (commons-httpclient?).
216: ANVLRecord headers = new ANVLRecord(5);
217: if (curi.getContentDigest() != null) {
218: headers.addLabelValue(HEADER_KEY_CHECKSUM, curi
219: .getContentDigestSchemeString());
220: }
221: headers.addLabelValue(HEADER_KEY_IP,
222: getHostAddress(curi));
223: URI rid;
224:
225: if (IdenticalDigestDecideRule.hasIdenticalDigest(curi)
226: && ((Boolean) getUncheckedAttribute(curi,
227: ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS))) {
228: rid = writeRevisitDigest(w, timestamp,
229: HTTP_RESPONSE_MIMETYPE, baseid, curi,
230: headers);
231: } else if (curi.getFetchStatus() == HttpStatus.SC_NOT_MODIFIED
232: && ((Boolean) getUncheckedAttribute(curi,
233: ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED))) {
234: rid = writeRevisitNotModified(w, timestamp, baseid,
235: curi, headers);
236: } else {
237: if (curi.isTruncatedFetch()) {
238: String value = curi.isTimeTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_TIME
239: : curi.isLengthTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_LEN
240: : curi.isHeaderTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_HEAD
241: :
242: // TODO: Add this to spec.
243: TRUNCATED_VALUE_UNSPECIFIED;
244: headers.addLabelValue(HEADER_KEY_TRUNCATED,
245: value);
246: }
247: rid = writeResponse(w, timestamp,
248: HTTP_RESPONSE_MIMETYPE, baseid, curi,
249: headers);
250: }
251:
252: headers = new ANVLRecord(1);
253: headers.addLabelValue(HEADER_KEY_CONCURRENT_TO,
254: '<' + rid.toString() + '>');
255:
256: if (((Boolean) getUncheckedAttribute(curi,
257: ATTR_WRITE_REQUESTS))) {
258: writeRequest(w, timestamp, HTTP_REQUEST_MIMETYPE,
259: baseid, curi, headers);
260: }
261: if (((Boolean) getUncheckedAttribute(curi,
262: ATTR_WRITE_METADATA))) {
263: writeMetadata(w, timestamp, baseid, curi, headers);
264: }
265: } else if (lowerCaseScheme.equals("dns")) {
266: ANVLRecord headers = null;
267: String ip = curi.getString(A_DNS_SERVER_IP_LABEL);
268: if (ip != null && ip.length() > 0) {
269: headers = new ANVLRecord(1);
270: headers.addLabelValue(HEADER_KEY_IP, ip);
271: }
272: writeResponse(w, timestamp, curi.getContentType(),
273: baseid, curi, headers);
274: } else {
275: logger.warning("No handler for scheme "
276: + lowerCaseScheme);
277: }
278: } catch (IOException e) {
279: // Invalidate this file (It gets a '.invalid' suffix).
280: getPool().invalidateFile(writer);
281: // Set the writer to null otherwise the pool accounting
282: // of how many active writers gets skewed if we subsequently
283: // do a returnWriter call on this object in the finally block.
284: writer = null;
285: throw e;
286: } finally {
287: if (writer != null) {
288: setTotalBytesWritten(getTotalBytesWritten()
289: + (writer.getPosition() - position));
290: getPool().returnFile(writer);
291: }
292: }
293: checkBytesWritten();
294: }
295:
296: protected URI writeRequest(final ExperimentalWARCWriter w,
297: final String timestamp, final String mimetype,
298: final URI baseid, final CrawlURI curi,
299: final ANVLRecord namedFields) throws IOException {
300: final URI uid = qualifyRecordID(baseid, TYPE, REQUEST);
301: ReplayInputStream ris = curi.getHttpRecorder()
302: .getRecordedOutput().getReplayInputStream();
303: try {
304: w.writeRequestRecord(curi.toString(), timestamp, mimetype,
305: uid, namedFields, ris, curi.getHttpRecorder()
306: .getRecordedOutput().getSize());
307: } finally {
308: if (ris != null) {
309: ris.close();
310: }
311: }
312: return uid;
313: }
314:
315: protected URI writeResponse(final ExperimentalWARCWriter w,
316: final String timestamp, final String mimetype,
317: final URI baseid, final CrawlURI curi,
318: final ANVLRecord namedFields) throws IOException {
319: ReplayInputStream ris = curi.getHttpRecorder()
320: .getRecordedInput().getReplayInputStream();
321: try {
322: w.writeResponseRecord(curi.toString(), timestamp, mimetype,
323: baseid, namedFields, ris, curi.getHttpRecorder()
324: .getRecordedInput().getSize());
325: } finally {
326: if (ris != null) {
327: ris.close();
328: }
329: }
330: return baseid;
331: }
332:
333: protected URI writeRevisitDigest(final ExperimentalWARCWriter w,
334: final String timestamp, final String mimetype,
335: final URI baseid, final CrawlURI curi,
336: final ANVLRecord namedFields) throws IOException {
337: long revisedLength = curi.getHttpRecorder().getRecordedInput()
338: .getContentBegin();
339: revisedLength = revisedLength > 0 ? revisedLength : curi
340: .getHttpRecorder().getRecordedInput().getSize();
341: namedFields.addLabelValue(HEADER_KEY_PROFILE,
342: PROFILE_REVISIT_IDENTICAL_DIGEST);
343: namedFields.addLabelValue(HEADER_KEY_TRUNCATED,
344: NAMED_FIELD_TRUNCATED_VALUE_LEN);
345: ReplayInputStream ris = curi.getHttpRecorder()
346: .getRecordedInput().getReplayInputStream();
347: try {
348: w.writeRevisitRecord(curi.toString(), timestamp, mimetype,
349: baseid, namedFields, ris, revisedLength);
350: } finally {
351: if (ris != null) {
352: ris.close();
353: }
354: }
355: return baseid;
356: }
357:
358: protected URI writeRevisitNotModified(
359: final ExperimentalWARCWriter w, final String timestamp,
360: final URI baseid, final CrawlURI curi,
361: final ANVLRecord namedFields) throws IOException {
362: namedFields.addLabelValue(HEADER_KEY_PROFILE,
363: PROFILE_REVISIT_NOT_MODIFIED);
364: // save just enough context to understand basis of not-modified
365: if (curi.containsKey(A_HTTP_TRANSACTION)) {
366: HttpMethodBase method = (HttpMethodBase) curi
367: .getObject(A_HTTP_TRANSACTION);
368: saveHeader(A_ETAG_HEADER, method, namedFields,
369: HEADER_KEY_ETAG);
370: saveHeader(A_LAST_MODIFIED_HEADER, method, namedFields,
371: HEADER_KEY_LAST_MODIFIED);
372: }
373: // truncate to zero-length (all necessary info is above)
374: namedFields.addLabelValue(HEADER_KEY_TRUNCATED,
375: NAMED_FIELD_TRUNCATED_VALUE_LEN);
376: ReplayInputStream ris = curi.getHttpRecorder()
377: .getRecordedInput().getReplayInputStream();
378: try {
379: w.writeRevisitRecord(curi.toString(), timestamp, null,
380: baseid, namedFields, ris, 0);
381: } finally {
382: if (ris != null) {
383: ris.close();
384: }
385: }
386: return baseid;
387: }
388:
389: /**
390: * Save a header from the given HTTP operation into the
391: * provider headers under a new name
392: *
393: * @param origName header name to get if present
394: * @param method http operation containing headers
395: */
396: protected void saveHeader(String origName, HttpMethodBase method,
397: ANVLRecord headers, String newName) {
398: Header header = method.getResponseHeader(origName);
399: if (header != null) {
400: headers.addLabelValue(newName, header.getValue());
401: }
402: }
403:
404: protected URI writeMetadata(final ExperimentalWARCWriter w,
405: final String timestamp, final URI baseid,
406: final CrawlURI curi, final ANVLRecord namedFields)
407: throws IOException {
408: final URI uid = qualifyRecordID(baseid, TYPE, METADATA);
409: // Get some metadata from the curi.
410: // TODO: Get all curi metadata.
411: // TODO: Use other than ANVL (or rename ANVL as NameValue or use
412: // RFC822 (commons-httpclient?).
413: ANVLRecord r = new ANVLRecord();
414: if (curi.isSeed()) {
415: r.addLabel("seed");
416: } else {
417: if (curi.forceFetch()) {
418: r.addLabel("force-fetch");
419: }
420: r.addLabelValue("via", curi.flattenVia());
421: r.addLabelValue("pathFromSeed", curi.getPathFromSeed());
422: if (curi.containsKey(A_SOURCE_TAG)) {
423: r.addLabelValue("sourceTag", curi
424: .getString(A_SOURCE_TAG));
425: }
426: }
427:
428: // Add outlinks though they are effectively useless without anchor text.
429: Collection<Link> links = curi.getOutLinks();
430: if (links != null && links.size() > 0) {
431: for (Link link : links) {
432: r.addLabelValue("outlink", link.toString());
433: }
434: }
435:
436: // TODO: Other curi fields to write to metadata.
437: //
438: // Credentials
439: //
440: // fetch-began-time: 1154569278774
441: // fetch-completed-time: 1154569281816
442: //
443: // Annotations.
444:
445: byte[] b = r.getUTF8Bytes();
446: w.writeMetadataRecord(curi.toString(), timestamp,
447: ANVLRecord.MIMETYPE, uid, namedFields,
448: new ByteArrayInputStream(b), b.length);
449: return uid;
450: }
451:
452: protected URI getRecordID() throws IOException {
453: URI result;
454: try {
455: result = GeneratorFactory.getFactory().getRecordID();
456: } catch (URISyntaxException e) {
457: throw new IOException(e.toString());
458: }
459: return result;
460: }
461:
462: protected URI qualifyRecordID(final URI base, final String key,
463: final String value) throws IOException {
464: URI result;
465: Map<String, String> qualifiers = new HashMap<String, String>(1);
466: qualifiers.put(key, value);
467: try {
468: result = GeneratorFactory.getFactory().qualifyRecordID(
469: base, qualifiers);
470: } catch (URISyntaxException e) {
471: throw new IOException(e.toString());
472: }
473: return result;
474: }
475:
476: @Override
477: protected String getFirstrecordStylesheet() {
478: return "/warcinfobody.xsl";
479: }
480: }
|