/* RecoveryJournal
 *
 * $Id: RecoveryJournal.java 4969 2007-03-08 18:57:41Z gojomo $
 *
 * Created on Jul 20, 2004
 *
 * Copyright (C) 2004 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.archive.crawler.frontier;

import it.unimi.dsi.mg4j.util.MutableString;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Frontier;
import org.archive.crawler.io.CrawlerJournal;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;

/**
 * Helper class for managing a simple Frontier change-events journal which is
 * useful for recovering from crawl problems.
 *
 * By replaying the journal into a new Frontier, its state (at least with
 * respect to URIs alreadyIncluded and in pending queues) will match that of
 * the original Frontier, allowing a pseudo-resume of a previous crawl, at
 * least as far as URI visitation/coverage is concerned.
 *
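 * A minimal replay sketch (the path and Frontier acquisition here are
 * illustrative; in a real crawl these are wired up by the job's
 * configuration):
 * <pre>
 * Frontier frontier = ...; // a freshly constructed, empty Frontier
 * RecoveryJournal.importRecoverLog(
 *     new File("/old-job/logs", "recover.gz"),
 *     frontier,
 *     false); // false: do not treat 'Ff' failures as already included
 * // returns once enough URIs are re-queued to begin crawling; the
 * // remainder of the journal is replayed in a background thread
 * </pre>
 *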
 * @author gojomo
 */
public class RecoveryJournal extends CrawlerJournal
implements FrontierJournal {
    private static final Logger LOGGER =
        Logger.getLogger(RecoveryJournal.class.getName());

    public final static String F_ADD = "F+ ";
    public final static String F_EMIT = "Fe ";
    public final static String F_RESCHEDULE = "Fr ";
    public final static String F_SUCCESS = "Fs ";
    public final static String F_FAILURE = "Ff ";
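
    // Journal lines consist of one of the tags above followed by a URI.
    // An F+ line, as written by added(), also carries the pathFromSeed
    // and via fields, e.g. (URIs illustrative):
    //   F+ http://example.com/page L http://example.com/
    //   Fs http://example.com/page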

    // show recovery progress every this many lines
    private static final int PROGRESS_INTERVAL = 1000000;

    // once this many URIs are queued during recovery, allow
    // crawl to begin, while enqueuing of other URIs from log
    // continues in background
    private static final long ENOUGH_TO_START_CRAWLING = 100000;

    /**
     * Create a new recovery journal at the given location
     *
     * @param path Directory to make the recovery journal in.
     * @param filename Name to use for recovery journal file.
     * @throws IOException
     */
    public RecoveryJournal(String path, String filename)
            throws IOException {
        super(path, filename);
        timestamp_interval = 10000; // write timestamp lines occasionally
    }

    public synchronized void added(CrawlURI curi) {
        accumulatingBuffer.length(0);
        accumulatingBuffer.append(F_ADD).append(curi.toString())
            .append(" ").append(curi.getPathFromSeed())
            .append(" ").append(curi.flattenVia());
        writeLine(accumulatingBuffer);
    }

    public void finishedSuccess(CrawlURI curi) {
        finishedSuccess(curi.toString());
    }

    public void finishedSuccess(UURI uuri) {
        finishedSuccess(uuri.toString());
    }

    protected void finishedSuccess(String uuri) {
        writeLine(F_SUCCESS, uuri);
    }

    public void emitted(CrawlURI curi) {
        writeLine(F_EMIT, curi.toString());
    }

    public void finishedFailure(CrawlURI curi) {
        finishedFailure(curi.toString());
    }

    public void finishedFailure(UURI uuri) {
        finishedFailure(uuri.toString());
    }

    public void finishedFailure(String u) {
        writeLine(F_FAILURE, u);
    }

    public void rescheduled(CrawlURI curi) {
        writeLine(F_RESCHEDULE, curi.toString());
    }

    /**
     * Utility method for scanning a recovery journal and applying it to
     * a Frontier.
     *
     * @param source Recover log path.
     * @param frontier Frontier reference.
     * @param retainFailures whether failure ('Ff') URIs should also be
     * considered already-included
     * @throws IOException
     *
     * @see org.archive.crawler.framework.Frontier#importRecoverLog(String, boolean)
     */
    public static void importRecoverLog(final File source,
            final Frontier frontier, final boolean retainFailures)
            throws IOException {
        if (source == null) {
            throw new IllegalArgumentException("Passed source file is null.");
        }
        LOGGER.info("recovering frontier completion state from " + source);

        // first, fill alreadyIncluded with successes (and possibly failures),
        // and count the total lines
        final int lines =
            importCompletionInfoFromLog(source, frontier, retainFailures);

        LOGGER.info("finished completion state; recovering queues from "
            + source);

        // now, re-add anything that was in old frontier and not already
        // registered as finished. Do this in a separate thread that signals
        // this thread once ENOUGH_TO_START_CRAWLING URIs have been queued.
        final CountDownLatch recoveredEnough = new CountDownLatch(1);
        new Thread(new Runnable() {
            public void run() {
                importQueuesFromLog(source, frontier, lines, recoveredEnough);
            }
        }, "queuesRecoveryThread").start();

        try {
            // wait until at least ENOUGH_TO_START_CRAWLING URIs queued
            recoveredEnough.await();
        } catch (InterruptedException e) {
            // interrupted while waiting for the recovery thread to queue
            // enough URIs; restore interrupt status and proceed
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Import just the SUCCESS (and possibly FAILURE) URIs from the given
     * recovery log into the frontier as considered included.
     *
     * @param source recovery log file to use
     * @param frontier frontier to update
     * @param retainFailures whether failure ('Ff') URIs should count as done
     * @return number of lines in recovery log (for reference)
     * @throws IOException
     */
    private static int importCompletionInfoFromLog(File source,
            Frontier frontier, boolean retainFailures) throws IOException {
        // Scan log for all 'Fs' lines: add as 'alreadyIncluded'
        BufferedInputStream is = getBufferedInput(source);
        // create MutableString of good starting size (will grow if necessary)
        MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
        int lines = 0;
        try {
            while (readLine(is, read)) {
                lines++;
                boolean wasSuccess = read.startsWith(F_SUCCESS);
                if (wasSuccess
                        || (retainFailures && read.startsWith(F_FAILURE))) {
                    // retrieve first (only) URL on line
                    String s = read.subSequence(3, read.length()).toString();
                    try {
                        UURI u = UURIFactory.getInstance(s);
                        frontier.considerIncluded(u);
                        if (wasSuccess) {
                            if (frontier.getFrontierJournal() != null) {
                                frontier.getFrontierJournal()
                                    .finishedSuccess(u);
                            }
                        } else {
                            // carry forward the failure, in case a future
                            // recovery chooses not to retain failures as
                            // finished
                            if (frontier.getFrontierJournal() != null) {
                                frontier.getFrontierJournal()
                                    .finishedFailure(u);
                            }
                        }
                    } catch (URIException e) {
                        // bad URI on this line; skip it and continue
                        e.printStackTrace();
                    }
                }
                if ((lines % PROGRESS_INTERVAL) == 0) {
                    // every 1 million lines, print progress
                    LOGGER.info("at line " + lines
                        + " alreadyIncluded count = "
                        + frontier.discoveredUriCount());
                }
            }
        } catch (EOFException e) {
            // expected in some uncleanly-closed recovery logs; ignore
        } finally {
            is.close();
        }
        return lines;
    }

    /**
     * Read a line from the given buffered input stream into the
     * MutableString, treating LF, CR, or CRLF as the line terminator.
     * Return true if a line was read; false if EOF.
     *
     * @param is stream to read from
     * @param read MutableString to fill with the line's content
     * @return True if we read a line.
     * @throws IOException
     */
    private static boolean readLine(BufferedInputStream is,
            MutableString read) throws IOException {
        read.length(0);
        int c = is.read();
        while ((c != -1) && c != '\n' && c != '\r') {
            read.append((char) c);
            c = is.read();
        }
        if (c == -1 && read.length() == 0) {
            // EOF and none read; return false
            return false;
        }
        if (c == '\r') {
            // consume LF following CR, if present (CRLF terminator)
            is.mark(1);
            if (is.read() != '\n') {
                is.reset();
            }
        }
        // a line (possibly blank) was read
        return true;
    }

    /**
     * Import all ADDs from given recovery log into the frontier's queues
     * (excepting those the frontier drops as already having been included).
     *
     * @param source recovery log file to use
     * @param frontier frontier to update
     * @param lines total lines noted in recovery log earlier
     * @param enough latch signalling 'enough' URIs queued to begin crawling
     */
    private static void importQueuesFromLog(File source, Frontier frontier,
            int lines, CountDownLatch enough) {
        BufferedInputStream is;
        // create MutableString of good starting size (will grow if necessary)
        MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
        long queuedAtStart = frontier.queuedUriCount();
        long queuedDuringRecovery = 0;
        int qLines = 0;

        try {
            // Scan log for all 'F+' lines: if not alreadyIncluded,
            // schedule for visitation
            is = getBufferedInput(source);
            try {
                while (readLine(is, read)) {
                    qLines++;
                    if (read.startsWith(F_ADD)) {
                        UURI u;
                        CharSequence[] args = splitOnSpaceRuns(read);
                        try {
                            u = UURIFactory.getInstance(args[1].toString());
                            String pathFromSeed = (args.length > 2)
                                ? args[2].toString() : "";
                            UURI via = (args.length > 3)
                                ? UURIFactory.getInstance(args[3].toString())
                                : null;
                            String viaContext = (args.length > 4)
                                ? args[4].toString() : "";
                            CandidateURI caUri = new CandidateURI(u,
                                pathFromSeed, via, viaContext);
                            frontier.schedule(caUri);

                            queuedDuringRecovery =
                                frontier.queuedUriCount() - queuedAtStart;
                            if (((queuedDuringRecovery + 1)
                                    % ENOUGH_TO_START_CRAWLING) == 0) {
                                enough.countDown();
                            }
                        } catch (URIException e) {
                            // bad URI on this F+ line; skip it and continue
                            e.printStackTrace();
                        }
                    }
                    if ((qLines % PROGRESS_INTERVAL) == 0) {
                        // every 1 million lines, print progress
                        LOGGER.info("through line " + qLines + "/" + lines
                            + " queued count = " + frontier.queuedUriCount());
                    }
                }
            } catch (EOFException e) {
                // no problem: untidy end of recovery journal
            } finally {
                is.close();
            }
        } catch (IOException e) {
            // unable to read the recovery journal; log and give up on
            // recovering queue contents
            LOGGER.severe("problem reading recovery journal " + source
                + ": " + e);
        }
        LOGGER.info("finished recovering frontier from " + source + " "
            + qLines + " lines processed");
        enough.countDown();
    }

    /**
     * Return an array of the subsequences of the passed-in sequence,
     * split on space runs.
     *
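     * For example, <code>"F+ http://a/  L"</code> (note the run of two
     * spaces) yields <code>["F+", "http://a/", "L"]</code>.
     *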
     * @param read sequence to split
     * @return Array of the subsequences, split on space runs.
     */
    private static CharSequence[] splitOnSpaceRuns(CharSequence read) {
        int lastStart = 0;
        ArrayList<CharSequence> segs = new ArrayList<CharSequence>(5);
        int i;
        for (i = 0; i < read.length(); i++) {
            if (read.charAt(i) == ' ') {
                segs.add(read.subSequence(lastStart, i));
                i++;
                while (i < read.length() && read.charAt(i) == ' ') {
                    // skip any space runs
                    i++;
                }
                lastStart = i;
            }
        }
        if (lastStart < read.length()) {
            segs.add(read.subSequence(lastStart, i));
        }
        return segs.toArray(new CharSequence[segs.size()]);
    }
}