001: /* $Id: ExperimentalWARCWriterProcessor.java 4935 2007-02-23 00:27:24Z gojomo $
002: *
003: * Created on August 1st, 2006.
004: *
005: * Copyright (C) 2006 Internet Archive.
006: *
007: * This file is part of the Heritrix web crawler (crawler.archive.org).
008: *
009: * Heritrix is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU Lesser Public License as published by
011: * the Free Software Foundation; either version 2.1 of the License, or
012: * any later version.
013: *
014: * Heritrix is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: * GNU Lesser Public License for more details.
018: *
019: * You should have received a copy of the GNU Lesser Public License
020: * along with Heritrix; if not, write to the Free Software
021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: */
023: package org.archive.crawler.writer;
024:
025: import java.io.ByteArrayInputStream;
026: import java.io.IOException;
027: import java.net.URI;
028: import java.net.URISyntaxException;
029: import java.util.Collection;
030: import java.util.HashMap;
031: import java.util.List;
032: import java.util.Map;
033: import java.util.concurrent.atomic.AtomicInteger;
034: import java.util.logging.Level;
035: import java.util.logging.Logger;
036:
037: import org.archive.crawler.datamodel.CoreAttributeConstants;
038: import org.archive.crawler.datamodel.CrawlURI;
039: import org.archive.crawler.datamodel.FetchStatusCodes;
040: import org.archive.crawler.event.CrawlStatusListener;
041: import org.archive.crawler.extractor.Link;
042: import org.archive.crawler.framework.WriterPoolProcessor;
043: import org.archive.io.ReplayInputStream;
044: import org.archive.io.WriterPoolMember;
045: import org.archive.io.WriterPoolSettings;
046: import org.archive.io.warc.WARCConstants;
047: import org.archive.io.warc.v10.ExperimentalWARCWriter;
048: import org.archive.io.warc.v10.WARCWriterPool;
049: import org.archive.uid.GeneratorFactory;
050: import org.archive.util.ArchiveUtils;
051: import org.archive.util.anvl.ANVLRecord;
052:
053: /**
054: * Experimental WARCWriterProcessor.
055: * Implements 0.10 version of the WARC Specification since superceded by
056: * version 0.12.
057: *
058: * @author stack
059: * @deprecated See {@link ExperimentalWARCWriter}
060: */
061: public class ExperimentalV10WARCWriterProcessor extends
062: WriterPoolProcessor implements CoreAttributeConstants,
063: CrawlStatusListener, WriterPoolSettings, FetchStatusCodes,
064: WARCConstants {
065:
066: private static final long serialVersionUID = 188656957531675821L;
067:
068: private final Logger logger = Logger.getLogger(this .getClass()
069: .getName());
070:
071: /**
072: * Default path list.
073: */
074: private static final String[] DEFAULT_PATH = { "warcs" };
075:
076: protected String[] getDefaultPath() {
077: return DEFAULT_PATH;
078: }
079:
080: /**
081: * @param name Name of this writer.
082: */
083: public ExperimentalV10WARCWriterProcessor(String name) {
084: super (name, "Experimental WARCWriter processor");
085: }
086:
087: protected void setupPool(final AtomicInteger serialNo) {
088: setPool(new WARCWriterPool(serialNo, this ,
089: getPoolMaximumActive(), getPoolMaximumWait()));
090: }
091:
092: /**
093: * Writes a CrawlURI and its associated data to store file.
094: *
095: * Currently this method understands the following uri types: dns, http, and
096: * https.
097: *
098: * @param curi
099: * CrawlURI to process.
100: *
101: */
102: protected void innerProcess(CrawlURI curi) {
103: // If failure, or we haven't fetched the resource yet, return
104: if (curi.getFetchStatus() <= 0) {
105: return;
106: }
107:
108: // If no recorded content at all, don't write record.
109: long recordLength = curi.getContentSize();
110: if (recordLength <= 0) {
111: // getContentSize() should be > 0 if any material (even just
112: // HTTP headers with zero-length body) is available.
113: return;
114: }
115:
116: String scheme = curi.getUURI().getScheme().toLowerCase();
117: try {
118: if (shouldWrite(curi)) {
119: write(scheme, curi);
120: } else {
121: logger.info("This writer does not write out scheme "
122: + scheme + " content");
123: }
124: } catch (IOException e) {
125: curi.addLocalizedError(this .getName(), e, "WriteRecord: "
126: + curi.toString());
127: logger.log(Level.SEVERE, "Failed write of Record: "
128: + curi.toString(), e);
129: }
130: }
131:
132: protected void write(final String lowerCaseScheme,
133: final CrawlURI curi) throws IOException {
134: WriterPoolMember writer = getPool().borrowFile();
135: long position = writer.getPosition();
136: // See if we need to open a new file because we've exceeed maxBytes.
137: // Call to checkFileSize will open new file if we're at maximum for
138: // current file.
139: writer.checkSize();
140: if (writer.getPosition() != position) {
141: // We just closed the file because it was larger than maxBytes.
142: // Add to the totalBytesWritten the size of the first record
143: // in the file, if any.
144: setTotalBytesWritten(getTotalBytesWritten()
145: + (writer.getPosition() - position));
146: position = writer.getPosition();
147: }
148:
149: ExperimentalWARCWriter w = (ExperimentalWARCWriter) writer;
150: try {
151: // Write a request, response, and metadata all in the one
152: // 'transaction'.
153: final URI baseid = getRecordID();
154: final String timestamp = ArchiveUtils.get14DigitDate(curi
155: .getLong(A_FETCH_BEGAN_TIME));
156: if (lowerCaseScheme.startsWith("http")) {
157: // Add named fields for ip, checksum, and relate the metadata
158: // and request to the resource field.
159: ANVLRecord r = new ANVLRecord();
160: if (curi.getContentDigest() != null) {
161: // TODO: This is digest for content -- doesn't include
162: // response headers.
163: r.addLabelValue(NAMED_FIELD_CHECKSUM_LABEL, curi
164: .getContentDigestSchemeString());
165: }
166: r.addLabelValue(NAMED_FIELD_IP_LABEL,
167: getHostAddress(curi));
168: URI rid = writeResponse(w, timestamp,
169: HTTP_RESPONSE_MIMETYPE, baseid, curi, r);
170: r = new ANVLRecord(1);
171: r.addLabelValue(NAMED_FIELD_RELATED_LABEL, rid
172: .toString());
173: writeRequest(w, timestamp, HTTP_REQUEST_MIMETYPE,
174: baseid, curi, r);
175: writeMetadata(w, timestamp, baseid, curi, r);
176: } else if (lowerCaseScheme.equals("dns")) {
177: String ip = curi.getString(A_DNS_SERVER_IP_LABEL);
178: ANVLRecord r = null;
179: if (ip != null && ip.length() > 0) {
180: r = new ANVLRecord();
181: r.addLabelValue(NAMED_FIELD_IP_LABEL, ip);
182: }
183: writeResponse(w, timestamp, curi.getContentType(),
184: baseid, curi, r);
185: } else {
186: logger.warning("No handler for scheme "
187: + lowerCaseScheme);
188: }
189: } catch (IOException e) {
190: // Invalidate this file (It gets a '.invalid' suffix).
191: getPool().invalidateFile(writer);
192: // Set the writer to null otherwise the pool accounting
193: // of how many active writers gets skewed if we subsequently
194: // do a returnWriter call on this object in the finally block.
195: writer = null;
196: throw e;
197: } finally {
198: if (writer != null) {
199: setTotalBytesWritten(getTotalBytesWritten()
200: + (writer.getPosition() - position));
201: getPool().returnFile(writer);
202: }
203: }
204: checkBytesWritten();
205: }
206:
207: protected URI writeRequest(final ExperimentalWARCWriter w,
208: final String timestamp, final String mimetype,
209: final URI baseid, final CrawlURI curi,
210: final ANVLRecord namedFields) throws IOException {
211: final URI uid = qualifyRecordID(baseid, TYPE, REQUEST);
212: ReplayInputStream ris = curi.getHttpRecorder()
213: .getRecordedOutput().getReplayInputStream();
214: try {
215: w.writeRequestRecord(curi.toString(), timestamp, mimetype,
216: uid, namedFields, ris, curi.getHttpRecorder()
217: .getRecordedOutput().getSize());
218: } finally {
219: if (ris != null) {
220: ris.close();
221: }
222: }
223: return uid;
224: }
225:
226: protected URI writeResponse(final ExperimentalWARCWriter w,
227: final String timestamp, final String mimetype,
228: final URI baseid, final CrawlURI curi,
229: final ANVLRecord namedFields) throws IOException {
230: ReplayInputStream ris = curi.getHttpRecorder()
231: .getRecordedInput().getReplayInputStream();
232: try {
233: w.writeResponseRecord(curi.toString(), timestamp, mimetype,
234: baseid, namedFields, ris, curi.getHttpRecorder()
235: .getRecordedInput().getSize());
236: } finally {
237: if (ris != null) {
238: ris.close();
239: }
240: }
241: return baseid;
242: }
243:
244: protected URI writeMetadata(final ExperimentalWARCWriter w,
245: final String timestamp, final URI baseid,
246: final CrawlURI curi, final ANVLRecord namedFields)
247: throws IOException {
248: final URI uid = qualifyRecordID(baseid, TYPE, METADATA);
249: // Get some metadata from the curi.
250: // TODO: Get all curi metadata.
251: ANVLRecord r = new ANVLRecord();
252: if (curi.isSeed()) {
253: r.addLabel("seed");
254: } else {
255: if (curi.forceFetch()) {
256: r.addLabel("force-fetch");
257: }
258: r.addLabelValue("via", curi.flattenVia());
259: r.addLabelValue("pathFromSeed", curi.getPathFromSeed());
260: }
261: Collection<Link> links = curi.getOutLinks();
262: if (links != null && links.size() > 0) {
263: for (Link link : links) {
264: r.addLabelValue("outlink", link.toString());
265: }
266: }
267: if (curi.isTruncatedFetch()) {
268: String value = curi.isTimeTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_TIME
269: : curi.isLengthTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_LEN
270: : curi.isHeaderTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_HEAD
271: : NAMED_FIELD_TRUNCATED_VALUE_UNSPECIFIED;
272:
273: r.addLabelValue(NAMED_FIELD_TRUNCATED, value);
274: }
275:
276: // TODO: Other curi fields to write to metadata.
277: //
278: // Credentials
279: //
280: // fetch-began-time: 1154569278774
281: // fetch-completed-time: 1154569281816
282: //
283: // Annotations.
284:
285: byte[] b = r.getUTF8Bytes();
286: w.writeMetadataRecord(curi.toString(), timestamp,
287: ANVLRecord.MIMETYPE, uid, namedFields,
288: new ByteArrayInputStream(b), b.length);
289: return uid;
290: }
291:
292: protected URI getRecordID() throws IOException {
293: URI result;
294: try {
295: result = GeneratorFactory.getFactory().getRecordID();
296: } catch (URISyntaxException e) {
297: throw new IOException(e.toString());
298: }
299: return result;
300: }
301:
302: protected URI qualifyRecordID(final URI base, final String key,
303: final String value) throws IOException {
304: URI result;
305: Map<String, String> qualifiers = new HashMap<String, String>(1);
306: qualifiers.put(key, value);
307: try {
308: result = GeneratorFactory.getFactory().qualifyRecordID(
309: base, qualifiers);
310: } catch (URISyntaxException e) {
311: throw new IOException(e.toString());
312: }
313: return result;
314: }
315:
316: public List getMetadata() {
317: // TODO: As ANVL?
318: return null;
319: }
320: }
|