/* AbstractFrontier
 *
 * $Id: AbstractFrontier.java 5053 2007-04-10 02:34:20Z gojomo $
 *
 * Created on Aug 17, 2004
 *
 * Copyright (C) 2004 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.archive.crawler.frontier;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import javax.management.AttributeNotFoundException;

import org.apache.commons.httpclient.HttpStatus;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlHost;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlServer;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.Frontier;
import org.archive.crawler.framework.ToeThread;
import org.archive.crawler.framework.exceptions.EndedException;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.settings.ModuleType;
import org.archive.crawler.settings.RegularExpressionConstraint;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.crawler.url.Canonicalizer;
import org.archive.net.UURI;
import org.archive.util.ArchiveUtils;

/**
 * Shared facilities for Frontier implementations.
 *
 * @author gojomo
 */
public abstract class AbstractFrontier extends ModuleType implements
        CrawlStatusListener, Frontier, FetchStatusCodes,
        CoreAttributeConstants, Serializable {
    private static final Logger logger = Logger
            .getLogger(AbstractFrontier.class.getName());

    protected transient CrawlController controller;

    /** ordinal numbers to assign to created CrawlURIs */
    protected AtomicLong nextOrdinal = new AtomicLong(1);

    /** should the frontier hold any threads asking for URIs? */
    protected boolean shouldPause = false;

    /**
     * should the frontier send an EndedException to any threads asking for
     * URIs?
     */
    protected transient boolean shouldTerminate = false;

    /**
     * how many multiples of last fetch elapsed time to wait before
     * recontacting same server
     */
    public final static String ATTR_DELAY_FACTOR = "delay-factor";

    protected final static Float DEFAULT_DELAY_FACTOR = new Float(5);

    /**
     * always wait this long after one completion before recontacting same
     * server, regardless of multiple
     */
    public final static String ATTR_MIN_DELAY = "min-delay-ms";

    // 3 secs.
    protected final static Integer DEFAULT_MIN_DELAY = new Integer(3000);

    /** never wait more than this long, regardless of multiple */
    public final static String ATTR_MAX_DELAY = "max-delay-ms";

    // 30 secs
    protected final static Integer DEFAULT_MAX_DELAY = new Integer(30000);

    /** number of hops of embeds (ERX) to bump to front of host queue */
    public final static String ATTR_PREFERENCE_EMBED_HOPS =
        "preference-embed-hops";

    protected final static Integer DEFAULT_PREFERENCE_EMBED_HOPS =
        new Integer(1);

    /** maximum per-host bandwidth usage */
    public final static String ATTR_MAX_HOST_BANDWIDTH_USAGE =
        "max-per-host-bandwidth-usage-KB-sec";

    protected final static Integer DEFAULT_MAX_HOST_BANDWIDTH_USAGE =
        new Integer(0);

    /** maximum overall bandwidth usage */
    public final static String ATTR_MAX_OVERALL_BANDWIDTH_USAGE =
        "total-bandwidth-usage-KB-sec";

    protected final static Integer DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE =
        new Integer(0);

    /** for retryable problems, seconds to wait before a retry */
    public final static String ATTR_RETRY_DELAY = "retry-delay-seconds";

    // 15 mins
    protected final static Long DEFAULT_RETRY_DELAY = new Long(900);

    /** maximum times to emit a CrawlURI without final disposition */
    public final static String ATTR_MAX_RETRIES = "max-retries";

    protected final static Integer DEFAULT_MAX_RETRIES = new Integer(30);

    public final static String ATTR_QUEUE_ASSIGNMENT_POLICY =
        "queue-assignment-policy";

    /** queue assignment to force onto CrawlURIs; intended to be overridden */
    public final static String ATTR_FORCE_QUEUE = "force-queue-assignment";

    protected final static String DEFAULT_FORCE_QUEUE = "";

    // word chars, dash, period, comma, colon
    protected final static String ACCEPTABLE_FORCE_QUEUE = "[-\\w\\.,:]*";
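    // Illustrative examples: force-queue values such as "blogspot-ips" or
    // "example.com:8080" would be accepted under this pattern.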

    /** whether to pause, rather than finish, when crawl appears done */
    public final static String ATTR_PAUSE_AT_FINISH = "pause-at-finish";

    // TODO: change default to true once well-tested
    protected final static Boolean DEFAULT_PAUSE_AT_FINISH = Boolean.FALSE;

    /** whether to pause at crawl start */
    public final static String ATTR_PAUSE_AT_START = "pause-at-start";

    protected final static Boolean DEFAULT_PAUSE_AT_START = Boolean.FALSE;
    /** whether to tag seeds with their own URI as a heritable 'source' */
    public final static String ATTR_SOURCE_TAG_SEEDS = "source-tag-seeds";

    protected final static Boolean DEFAULT_SOURCE_TAG_SEEDS = Boolean.FALSE;

    /**
     * Recover log on or off attribute.
     */
    protected final static String ATTR_RECOVERY_ENABLED =
        "recovery-log-enabled";

    protected final static Boolean DEFAULT_ATTR_RECOVERY_ENABLED =
        Boolean.TRUE;

    // top-level stats
    protected long queuedUriCount = 0; // total URIs queued to be visited

    protected long succeededFetchCount = 0;

    protected long failedFetchCount = 0;

    // URIs that are disregarded (for example because of robots.txt rules)
    protected long disregardedUriCount = 0;

    /**
     * Used when bandwidth constraints are used.
     */
    protected long totalProcessedBytes = 0;

    private transient long nextURIEmitTime = 0;

    protected long processedBytesAfterLastEmittedURI = 0;

    protected int lastMaxBandwidthKB = 0;

    /** Policy for assigning CrawlURIs to named queues */
    protected transient QueueAssignmentPolicy queueAssignmentPolicy = null;

    /**
     * Crawl replay logger.
     *
     * Currently captures Frontier/URI transitions.
     * Can be null if user chose not to run a recovery.log.
     */
    private transient FrontierJournal recover = null;

    /** file collecting report of ignored seed-file entries (if any) */
    public static final String IGNORED_SEEDS_FILENAME = "seeds.ignored";

    /**
     * @param name Name of this frontier.
     * @param description Description for this frontier.
     */
    public AbstractFrontier(String name, String description) {
        super(name, description);
        addElementToDefinition(new SimpleType(ATTR_DELAY_FACTOR,
                "How many multiples of last fetch elapsed time to wait before "
                        + "recontacting same server",
                DEFAULT_DELAY_FACTOR));
        addElementToDefinition(new SimpleType(ATTR_MAX_DELAY,
                "Never wait more than this long.", DEFAULT_MAX_DELAY));
        addElementToDefinition(new SimpleType(ATTR_MIN_DELAY,
                "Always wait this long after one completion before "
                        + "recontacting same server.", DEFAULT_MIN_DELAY));
        addElementToDefinition(new SimpleType(
                ATTR_MAX_RETRIES,
                "How many times to retry fetching a URI that failed to be "
                        + "retrieved. If zero, the crawler will get the "
                        + "robots.txt only.",
                DEFAULT_MAX_RETRIES));
        addElementToDefinition(new SimpleType(
                ATTR_RETRY_DELAY,
                "How long to wait by default until we retry fetching a"
                        + " URI that failed to be retrieved (seconds). ",
                DEFAULT_RETRY_DELAY));
        addElementToDefinition(new SimpleType(
                ATTR_PREFERENCE_EMBED_HOPS,
                "Number of embedded (or redirected) hops up to which "
                        + "a URI has higher priority scheduling. For example, if set "
                        + "to 1 (the default), items such as inline images (1-hop "
                        + "embedded resources) will be scheduled ahead of all regular "
                        + "links (or many-hop resources, like nested frames). If set to "
                        + "zero, no preferencing will occur, and embeds/redirects are "
                        + "scheduled the same as regular links.",
                DEFAULT_PREFERENCE_EMBED_HOPS));
        Type t;
        t = addElementToDefinition(new SimpleType(
                ATTR_MAX_OVERALL_BANDWIDTH_USAGE,
                "The maximum average bandwidth the crawler is allowed to use. "
                        + "The actual read speed is not affected by this setting; it only "
                        + "holds back new URIs from being processed when the bandwidth "
                        + "usage has been too high. 0 means no bandwidth limitation.",
                DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE));
        t.setOverrideable(false);
        t = addElementToDefinition(new SimpleType(
                ATTR_MAX_HOST_BANDWIDTH_USAGE,
                "The maximum average bandwidth the crawler is allowed to use per "
                        + "host. The actual read speed is not affected by this setting; "
                        + "it only holds back new URIs from being processed when the "
                        + "bandwidth usage has been too high. 0 means no bandwidth "
                        + "limitation.",
                DEFAULT_MAX_HOST_BANDWIDTH_USAGE));
        t.setExpertSetting(true);

        // Read the list of permissible choices from heritrix.properties.
        // It's a list of space- or comma-separated values.
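        // For example (illustrative), a heritrix.properties entry:
        //   org.archive.crawler.frontier.AbstractFrontier.queue-assignment-policy = \
        //     org.archive.crawler.frontier.HostnameQueueAssignmentPolicy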
        String queueStr = System.getProperty(AbstractFrontier.class.getName()
                + "." + ATTR_QUEUE_ASSIGNMENT_POLICY,
                HostnameQueueAssignmentPolicy.class.getName() + " "
                        + IPQueueAssignmentPolicy.class.getName() + " "
                        + BucketQueueAssignmentPolicy.class.getName() + " "
                        + SurtAuthorityQueueAssignmentPolicy.class.getName());
        Pattern p = Pattern.compile("\\s*,\\s*|\\s+");
        String[] queues = p.split(queueStr);
        if (queues.length <= 0) {
            throw new RuntimeException("Failed parse of "
                    + "assignment queue policy string: " + queueStr);
        }
        t = addElementToDefinition(new SimpleType(
                ATTR_QUEUE_ASSIGNMENT_POLICY,
                "Defines how to assign URIs to queues. Can assign by host, "
                        + "by IP, and into one of a fixed set of buckets (1k).",
                queues[0], queues));
        t.setExpertSetting(true);
        t.setOverrideable(false);

        t = addElementToDefinition(new SimpleType(
                ATTR_FORCE_QUEUE,
                "The queue name into which to force URIs. Should "
                        + "be left blank at global level. Specify a "
                        + "per-domain/per-host override to force URIs into "
                        + "a particular named queue, regardless of the assignment "
                        + "policy in effect (domain or IP-based politeness). "
                        + "This could be used on domains known to all be from "
                        + "the same small set of IPs (e.g. blogspot, dailykos, etc.) "
                        + "to simulate IP-based politeness, or it could be used if "
                        + "you wanted to enforce politeness over a whole domain, even "
                        + "though the subdomains are split across many IPs.",
                DEFAULT_FORCE_QUEUE));
        t.setOverrideable(true);
        t.setExpertSetting(true);
        t.addConstraint(new RegularExpressionConstraint(
                ACCEPTABLE_FORCE_QUEUE, Level.WARNING,
                "This field must contain only alphanumeric "
                        + "characters plus period, dash, comma, colon, or underscore."));
        t = addElementToDefinition(new SimpleType(
                ATTR_PAUSE_AT_START,
                "Whether to pause when the crawl begins, before any URIs "
                        + "are tried. This gives the operator a chance to verify or "
                        + "adjust the crawl before actual work begins. "
                        + "Default is false.", DEFAULT_PAUSE_AT_START));
        t = addElementToDefinition(new SimpleType(
                ATTR_PAUSE_AT_FINISH,
                "Whether to pause when the crawl appears finished, rather "
                        + "than immediately end the crawl. This gives the operator an "
                        + "opportunity to view crawl results, and possibly add URIs or "
                        + "adjust settings, while the crawl state is still available. "
                        + "Default is false.", DEFAULT_PAUSE_AT_FINISH));
        t.setOverrideable(false);

        t = addElementToDefinition(new SimpleType(
                ATTR_SOURCE_TAG_SEEDS,
                "Whether to tag seeds with their own URI as a heritable "
                        + "'source' String, which will be carried-forward to all URIs "
                        + "discovered on paths originating from that seed. When "
                        + "present, such source tags appear in the second-to-last "
                        + "crawl.log field.", DEFAULT_SOURCE_TAG_SEEDS));
        t.setOverrideable(false);

        t = addElementToDefinition(new SimpleType(
                ATTR_RECOVERY_ENABLED,
                "Set to false to disable recovery log writing. Do this if "
                        + "you are using the checkpoint feature for recovering "
                        + "crashed crawls.",
                DEFAULT_ATTR_RECOVERY_ENABLED));
        t.setExpertSetting(true);
        // No sense in it being overrideable.
        t.setOverrideable(false);
    }

    public void start() {
        if (((Boolean) getUncheckedAttribute(null, ATTR_PAUSE_AT_START))
                .booleanValue()) {
            // trigger crawl-wide pause
            controller.requestCrawlPause();
        } else {
            // simply begin
            unpause();
        }
    }

    synchronized public void pause() {
        shouldPause = true;
    }

    synchronized public void unpause() {
        shouldPause = false;
        notifyAll();
    }

    public void initialize(CrawlController c)
            throws FatalConfigurationException, IOException {
        c.addCrawlStatusListener(this);
        File logsDisk = null;
        try {
            logsDisk = c.getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        } catch (AttributeNotFoundException e) {
            logger.log(Level.SEVERE, "Failed to get logs directory", e);
        }
        if (logsDisk != null) {
            String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
            if (((Boolean) getUncheckedAttribute(null,
                    ATTR_RECOVERY_ENABLED)).booleanValue()) {
                this.recover = new RecoveryJournal(logsPath,
                        FrontierJournal.LOGNAME_RECOVER);
            }
        }
        try {
            final Class qapClass = Class.forName((String)
                    getUncheckedAttribute(null, ATTR_QUEUE_ASSIGNMENT_POLICY));
            queueAssignmentPolicy =
                (QueueAssignmentPolicy) qapClass.newInstance();
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Bad queue assignment policy class", e);
            throw new FatalConfigurationException(e.getMessage());
        }
    }

    synchronized public void terminate() {
        shouldTerminate = true;
        if (this.recover != null) {
            this.recover.close();
            this.recover = null;
        }
        unpause();
    }

    protected void doJournalFinishedSuccess(CrawlURI c) {
        if (this.recover != null) {
            this.recover.finishedSuccess(c);
        }
    }

    protected void doJournalAdded(CrawlURI c) {
        if (this.recover != null) {
            this.recover.added(c);
        }
    }

    protected void doJournalRescheduled(CrawlURI c) {
        if (this.recover != null) {
            this.recover.rescheduled(c);
        }
    }

    protected void doJournalFinishedFailure(CrawlURI c) {
        if (this.recover != null) {
            this.recover.finishedFailure(c);
        }
    }

    protected void doJournalEmitted(CrawlURI c) {
        if (this.recover != null) {
            this.recover.emitted(c);
        }
    }

    /**
     * Frontier is empty only if all queues are empty and no URIs are
     * in-process.
     *
     * @return True if queues are empty.
     */
    public synchronized boolean isEmpty() {
        return queuedUriCount == 0;
    }

    /**
     * Increment the running count of queued URIs. Synchronized because
     * operations on longs are not atomic.
     */
    protected synchronized void incrementQueuedUriCount() {
        queuedUriCount++;
    }

    /**
     * Increment the running count of queued URIs. Synchronized because
     * operations on longs are not atomic.
     *
     * @param increment amount to increment the queued count
     */
    protected synchronized void incrementQueuedUriCount(long increment) {
        queuedUriCount += increment;
    }

    /**
     * Note that a number of queued URIs have been deleted.
     *
     * @param numberOfDeletes
     */
    protected synchronized void decrementQueuedCount(long numberOfDeletes) {
        queuedUriCount -= numberOfDeletes;
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#queuedUriCount()
     */
    public long queuedUriCount() {
        return queuedUriCount;
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#finishedUriCount()
     */
    public long finishedUriCount() {
        return succeededFetchCount + failedFetchCount + disregardedUriCount;
    }

    /**
     * Increment the running count of successfully fetched URIs. Synchronized
     * because operations on longs are not atomic.
     */
    protected synchronized void incrementSucceededFetchCount() {
        succeededFetchCount++;
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#succeededFetchCount()
     */
    public long succeededFetchCount() {
        return succeededFetchCount;
    }

    /**
     * Increment the running count of failed URIs. Synchronized because
     * operations on longs are not atomic.
     */
    protected synchronized void incrementFailedFetchCount() {
        failedFetchCount++;
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#failedFetchCount()
     */
    public long failedFetchCount() {
        return failedFetchCount;
    }

    /**
     * Increment the running count of disregarded URIs. Synchronized because
     * operations on longs are not atomic.
     */
    protected synchronized void incrementDisregardedUriCount() {
        disregardedUriCount++;
    }

    public long disregardedUriCount() {
        return disregardedUriCount;
    }

    /** @deprecated misnomer; use StatisticsTracking figures instead */
    public long totalBytesWritten() {
        return totalProcessedBytes;
    }

    /**
     * Load up the seeds.
     *
     * This method is called on initialize and inside the CrawlController
     * when it wants to force reloading of configuration.
     *
     * @see org.archive.crawler.framework.CrawlController#kickUpdate()
     */
    public void loadSeeds() {
        Writer ignoredWriter = new StringWriter();
        logger.info("beginning");
        // Get the seeds to refresh.
        Iterator iter =
            this.controller.getScope().seedsIterator(ignoredWriter);
        int count = 0;
        while (iter.hasNext()) {
            UURI u = (UURI) iter.next();
            CandidateURI caUri = CandidateURI.createSeedCandidateURI(u);
            caUri.setSchedulingDirective(CandidateURI.MEDIUM);
            if (((Boolean) getUncheckedAttribute(null,
                    ATTR_SOURCE_TAG_SEEDS)).booleanValue()) {
                caUri.putString(CoreAttributeConstants.A_SOURCE_TAG,
                        caUri.toString());
                caUri.makeHeritable(CoreAttributeConstants.A_SOURCE_TAG);
            }
            schedule(caUri);
            count++;
            if (count % 1000 == 0) {
                logger.info(count + " seeds");
            }
        }
        // save ignored items (if any) where they can be consulted later
        saveIgnoredItems(ignoredWriter.toString(), controller.getDisk());
        logger.info("finished");
    }

    /**
     * Dump ignored seed items (if any) to disk; delete the file otherwise.
     * Static to allow non-derived sibling classes (frontiers not yet
     * subclassed here) to reuse.
     *
     * @param ignoredItems
     * @param dir
     */
    public static void saveIgnoredItems(String ignoredItems, File dir) {
        File ignoredFile = new File(dir, IGNORED_SEEDS_FILENAME);
        if (ignoredItems != null && ignoredItems.length() > 0) {
            try {
                BufferedWriter bw = new BufferedWriter(
                        new FileWriter(ignoredFile));
                bw.write(ignoredItems);
                bw.close();
            } catch (IOException e) {
                // TODO make an alert?
                e.printStackTrace();
            }
        } else {
            // delete any older file (if any)
            ignoredFile.delete();
        }
    }

    protected CrawlURI asCrawlUri(CandidateURI caUri) {
        CrawlURI curi;
        if (caUri instanceof CrawlURI) {
            curi = (CrawlURI) caUri;
        } else {
            curi = CrawlURI.from(caUri, nextOrdinal.getAndIncrement());
        }
        curi.setClassKey(getClassKey(curi));
        return curi;
    }

    /**
     * @param now
     * @throws InterruptedException
     * @throws EndedException
     */
    protected synchronized void preNext(long now)
            throws InterruptedException, EndedException {
        if (this.controller == null) {
            return;
        }

        // Check completion conditions
        if (this.controller.atFinish()) {
            if (((Boolean) getUncheckedAttribute(null,
                    ATTR_PAUSE_AT_FINISH)).booleanValue()) {
                this.controller.requestCrawlPause();
            } else {
                this.controller.beginCrawlStop();
            }
        }

        // enforce operator pause
        if (shouldPause) {
            while (shouldPause) {
                this.controller.toePaused();
                wait();
            }
            // exited pause; possibly finish regardless of pause-at-finish
            if (controller != null && controller.atFinish()) {
                this.controller.beginCrawlStop();
            }
        }

        // enforce operator terminate or thread retirement
        if (shouldTerminate
                || ((ToeThread) Thread.currentThread()).shouldRetire()) {
            throw new EndedException("terminated");
        }

        enforceBandwidthThrottle(now);
    }

    /**
     * Perform any special handling of the CrawlURI, such as promoting its URI
     * to seed-status, or preferencing it because it is an embed.
     *
     * @param curi
     */
    protected void applySpecialHandling(CrawlURI curi) {
        if (curi.isSeed() && curi.getVia() != null
                && curi.flattenVia().length() > 0) {
            // The only way a seed can have a non-empty via is if it is the
            // result of a seed redirect. Add it to the seeds list.
            //
            // This is a feature. This is handling for the case where a seed
            // gets immediately redirected to another page. What we're doing is
            // treating the immediate redirect target as a seed.
            this.controller.getScope().addSeed(curi);
            // And it needs rapid scheduling.
            if (curi.getSchedulingDirective() == CandidateURI.NORMAL)
                curi.setSchedulingDirective(CandidateURI.MEDIUM);
        }

        // optionally preferencing embeds up to MEDIUM
        int prefHops = ((Integer) getUncheckedAttribute(curi,
                ATTR_PREFERENCE_EMBED_HOPS)).intValue();
        if (prefHops > 0) {
            int embedHops = curi.getTransHops();
            if (embedHops > 0 && embedHops <= prefHops
                    && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
                // number of embed hops falls within the preferenced range, and
                // uri is not already MEDIUM -- so promote it
                curi.setSchedulingDirective(CandidateURI.MEDIUM);
            }
        }
    }

    /**
     * Perform fixups on a CrawlURI about to be returned via next().
     *
     * @param curi
     *            CrawlURI about to be returned by next()
     * @param q
     *            the queue from which the CrawlURI came
     */
    protected void noteAboutToEmit(CrawlURI curi, WorkQueue q) {
        curi.setHolder(q);
        // if (curi.getServer() == null) {
        //     // TODO: perhaps short-circuit the emit here,
        //     // because URI will be rejected as unfetchable
        // }
        doJournalEmitted(curi);
    }

    /**
     * @param curi
     * @return the CrawlServer to be associated with this CrawlURI
     */
    protected CrawlServer getServer(CrawlURI curi) {
        return this.controller.getServerCache().getServerFor(curi);
    }

    /**
     * Return a suitable value to wait before retrying the given URI.
     *
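     * For example, with the defaults in this class, a connect failure, lost
     * connection, or unresolvable domain waits the retry-delay-seconds
     * setting (900s by default); other statuses retry with no delay.
     *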
     * @param curi
     *            CrawlURI to be retried
     * @return seconds to wait before retry (per retry-delay-seconds)
     */
    protected long retryDelayFor(CrawlURI curi) {
        int status = curi.getFetchStatus();
        return (status == S_CONNECT_FAILED || status == S_CONNECT_LOST
                || status == S_DOMAIN_UNRESOLVABLE)
                ? ((Long) getUncheckedAttribute(curi, ATTR_RETRY_DELAY))
                        .longValue()
                : 0; // no delay for most
    }

    /**
     * Update any scheduling structures with the new information in this
     * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
     * the same host to be visited within the appropriate politeness window.
     *
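     * Illustrative example, assuming the defaults above: a fetch that took
     * 400ms suggests a wait of delay-factor (5) x 400ms = 2000ms, which the
     * min-delay-ms floor (3000ms) raises to 3000ms; max-delay-ms (30s) caps
     * the other extreme.
     *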
     * @param curi
     *            The CrawlURI
     * @return millisecond politeness delay
     */
    protected long politenessDelayFor(CrawlURI curi) {
        long durationToWait = 0;
        if (curi.containsKey(A_FETCH_BEGAN_TIME)
                && curi.containsKey(A_FETCH_COMPLETED_TIME)) {

            long completeTime = curi.getLong(A_FETCH_COMPLETED_TIME);
            long durationTaken =
                completeTime - curi.getLong(A_FETCH_BEGAN_TIME);
            durationToWait = (long) (((Float) getUncheckedAttribute(curi,
                    ATTR_DELAY_FACTOR)).floatValue() * durationTaken);

            long minDelay = ((Integer) getUncheckedAttribute(curi,
                    ATTR_MIN_DELAY)).longValue();
            if (minDelay > durationToWait) {
                // wait at least the minimum
                durationToWait = minDelay;
            }

            long maxDelay = ((Integer) getUncheckedAttribute(curi,
                    ATTR_MAX_DELAY)).longValue();
            if (durationToWait > maxDelay) {
                // wait no more than the maximum
                durationToWait = maxDelay;
            }

            long now = System.currentTimeMillis();
            int maxBandwidthKB = ((Integer) getUncheckedAttribute(curi,
                    ATTR_MAX_HOST_BANDWIDTH_USAGE)).intValue();
            if (maxBandwidthKB > 0) {
                // Enforce bandwidth limit
                CrawlHost host =
                    controller.getServerCache().getHostFor(curi);
                long minDurationToWait =
                    host.getEarliestNextURIEmitTime() - now;
                // KB/sec to bytes/ms: 1024 bytes per KB / 1000 ms per sec
                float maxBandwidth = maxBandwidthKB * 1.024F;
                long processedBytes = curi.getContentSize();
                host.setEarliestNextURIEmitTime(
                        (long) (processedBytes / maxBandwidth) + now);

                if (minDurationToWait > durationToWait) {
                    durationToWait = minDurationToWait;
                }
            }
        }
        return durationToWait;
    }

    /**
     * Ensure that any overall-bandwidth-usage limit is respected, by pausing
     * as long as necessary.
     *
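     * For example, with total-bandwidth-usage-KB-sec set to 500 the crawler
     * averages at most 512 bytes/ms, so after roughly 1MB of processed bytes
     * new URIs are held back about 2 seconds (illustrative figures).
     *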
     * @param now
     * @throws InterruptedException
     */
    private void enforceBandwidthThrottle(long now)
            throws InterruptedException {
        int maxBandwidthKB = ((Integer) getUncheckedAttribute(null,
                ATTR_MAX_OVERALL_BANDWIDTH_USAGE)).intValue();
        if (maxBandwidthKB > 0) {
            // Make sure that new bandwidth setting doesn't affect total crawl
            if (maxBandwidthKB != lastMaxBandwidthKB) {
                lastMaxBandwidthKB = maxBandwidthKB;
                processedBytesAfterLastEmittedURI = totalProcessedBytes;
            }

            // Enforce bandwidth limit
            long sleepTime = nextURIEmitTime - now;
            // KB/sec to bytes/ms: 1024 bytes per KB / 1000 ms per sec
            float maxBandwidth = maxBandwidthKB * 1.024F;
            long processedBytes =
                totalProcessedBytes - processedBytesAfterLastEmittedURI;
            long shouldHaveEmittedDiff =
                nextURIEmitTime == 0 ? 0 : nextURIEmitTime - now;
            nextURIEmitTime = (long) (processedBytes / maxBandwidth)
                    + now + shouldHaveEmittedDiff;
            processedBytesAfterLastEmittedURI = totalProcessedBytes;
            if (sleepTime > 0) {
                long targetTime = now + sleepTime;
                now = System.currentTimeMillis();
                while (now < targetTime) {
                    synchronized (this) {
                        if (logger.isLoggable(Level.FINE)) {
                            logger.fine("Frontier waits for: " + sleepTime
                                    + "ms to respect bandwidth limit.");
                        }
                        // TODO: now that this is a wait(), frontier can
                        // still schedule and finish items while waiting,
                        // which is good, but multiple threads could all
                        // wait for the same wakeTime, which somewhat
                        // spoils the throttle... should be fixed.
                        wait(targetTime - now);
                    }
                    now = System.currentTimeMillis();
                }
            }
        }
    }

    /**
     * Take note of any processor-local errors that have been entered into the
     * CrawlURI.
     *
     * @param curi
     */
    protected void logLocalizedErrors(CrawlURI curi) {
        if (curi.containsKey(A_LOCALIZED_ERRORS)) {
            List localErrors = (List) curi.getObject(A_LOCALIZED_ERRORS);
            Iterator iter = localErrors.iterator();
            while (iter.hasNext()) {
                Object[] array = { curi, iter.next() };
                controller.localErrors.log(Level.WARNING,
                        curi.getUURI().toString(), array);
            }
            // once logged, discard
            curi.remove(A_LOCALIZED_ERRORS);
        }
    }

    /**
     * Utility method to return a scratch dir for the given key's temp files.
     * Every key gets its own subdir. To avoid having any one directory with
     * thousands of files, there are also two levels of enclosing directory
     * named by the least-significant hex digits of the key string's java
     * hashcode.
     *
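     * For example (illustrative), a key whose hashCode renders in hex as
     * "12ab34cd" maps to <i>state-dir</i>/cd/34/<i>key</i>.
     *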
     * @param key
     * @return File representing scratch directory
     */
    protected File scratchDirFor(String key) {
        String hex = Integer.toHexString(key.hashCode());
        while (hex.length() < 4) {
            hex = "0" + hex;
        }
        int len = hex.length();
        return new File(this.controller.getStateDisk(),
                hex.substring(len - 2, len) + File.separator
                        + hex.substring(len - 4, len - 2) + File.separator
                        + key);
    }

    protected boolean overMaxRetries(CrawlURI curi) {
        // never retry more than the max number of times
        if (curi.getFetchAttempts() >= ((Integer) getUncheckedAttribute(curi,
                ATTR_MAX_RETRIES)).intValue()) {
            return true;
        }
        return false;
    }

    public void importRecoverLog(String pathToLog, boolean retainFailures)
            throws IOException {
        File source = new File(pathToLog);
        if (!source.isAbsolute()) {
            source = new File(getSettingsHandler().getOrder()
                    .getController().getDisk(), pathToLog);
        }
        RecoveryJournal.importRecoverLog(source, this, retainFailures);
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.framework.URIFrontier#kickUpdate()
     */
    public void kickUpdate() {
        // by default, do nothing
        // (scope will loadSeeds, if appropriate)
    }

    /**
     * Log to the main crawl.log
     *
     * @param curi
     */
    protected void log(CrawlURI curi) {
        curi.aboutToLog();
        Object[] array = { curi };
        this.controller.uriProcessing.log(Level.INFO,
                curi.getUURI().toString(), array);
    }

    protected boolean isDisregarded(CrawlURI curi) {
        switch (curi.getFetchStatus()) {
        case S_ROBOTS_PRECLUDED: // they don't want us to have it
        case S_BLOCKED_BY_CUSTOM_PROCESSOR:
        case S_OUT_OF_SCOPE: // filtered out by scope
        case S_BLOCKED_BY_USER: // filtered out by user
        case S_TOO_MANY_EMBED_HOPS: // too far from last true link
        case S_TOO_MANY_LINK_HOPS: // too far from seeds
        case S_DELETED_BY_USER: // user deleted
            return true;
        default:
            return false;
        }
    }

    /**
     * Checks if a recently completed CrawlURI that did not finish successfully
     * needs to be retried (processed again after some time elapses).
     *
     * @param curi
     *            The CrawlURI to check
     * @return True if we need to retry.
     */
    protected boolean needsRetrying(CrawlURI curi) {
        if (overMaxRetries(curi)) {
            return false;
        }

        switch (curi.getFetchStatus()) {
        case HttpStatus.SC_UNAUTHORIZED:
            // We can get here even though a positive status code usually
            // means success. We get here if there is rfc2617 credential data
            // loaded and we're supposed to go around again. See if any
            // rfc2617 credential is present and if so, assume it got loaded
            // in FetchHTTP on the expectation that we're to go around again.
            // If no rfc2617 credential is loaded, we should not be here.
            boolean loaded = curi.hasRfc2617CredentialAvatar();
            if (!loaded && logger.isLoggable(Level.INFO)) {
                logger.info("Have 401 but no creds loaded " + curi);
            }
            return loaded;
        case S_DEFERRED:
        case S_CONNECT_FAILED:
        case S_CONNECT_LOST:
        case S_DOMAIN_UNRESOLVABLE:
            // these are all worth a retry
            // TODO: consider if any others (S_TIMEOUT in some cases?) deserve
            // retry
            return true;
        default:
            return false;
        }
    }

    /**
     * Canonicalize passed uuri. It would be sweeter if this canonicalize
     * function was encapsulated by that which it canonicalizes, but because
     * settings change with context -- i.e. there may be overrides in operation
     * for a particular URI -- it's not so easy; each CandidateURI would need a
     * reference to the settings system. That's awkward to pass in.
     *
     * @param uuri Candidate URI to canonicalize.
     * @return Canonicalized version of passed <code>uuri</code>.
     */
    protected String canonicalize(UURI uuri) {
        return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
    }

    /**
     * Canonicalize passed CandidateURI. This method differs from
     * {@link #canonicalize(UURI)} in that it takes a look at
     * the CandidateURI context, possibly overriding any canonicalization
     * effect if it could make us miss content. If canonicalization produces
     * a URL that was 'alreadyseen', but the entry in the 'alreadyseen'
     * database did nothing but redirect to the current URL, we won't get the
     * current URL; we'll think we've already seen it. Examples would be
     * archive.org redirecting to www.archive.org or the inverse,
     * www.netarkivet.net redirecting to netarkivet.net (assuming stripWWW
     * rule enabled).
     * <p>Note, this method under some circumstances sets the forceFetch flag.
     *
     * @param cauri CandidateURI to examine.
     * @return Canonicalized <code>cauri</code>.
     */
    protected String canonicalize(CandidateURI cauri) {
        String canon = canonicalize(cauri.getUURI());
        if (cauri.isLocation()) {
            // If the via is not the same as where we're being redirected (i.e.
            // we're not being redirected back to the same page), AND the
            // canonicalization of the via is equal to that of the current
            // cauri, THEN forcefetch (so there is no chance of our not
            // crawling content because the alreadyseen check thinks it has
            // seen the URL before).
            // An example of a URL that redirects to itself is:
            // http://bridalelegance.com/images/buttons3/tuxedos-off.gif.
            // An example of a URL whose canonicalization equals its via's
            // canonicalization, and we want to fetch content at the
            // redirection (i.e. need to set forcefetch), is netarkivet.dk.
            if (!cauri.toString().equals(cauri.getVia().toString())
                    && canonicalize(cauri.getVia()).equals(canon)) {
                cauri.setForceFetch(true);
            }
        }
        return canon;
    }

    /**
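     * Gets the queue key for the given URI: the force-queue-assignment
     * override when non-empty, otherwise the key chosen by the configured
     * queue-assignment-policy (for example, under
     * HostnameQueueAssignmentPolicy the key is hostname-based -- an
     * illustrative, policy-dependent detail).
     *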
     * @param cauri CrawlURI we're to get a key for.
     * @return a String token representing a queue
     */
    public String getClassKey(CandidateURI cauri) {
        String queueKey = (String) getUncheckedAttribute(cauri,
                ATTR_FORCE_QUEUE);
        if ("".equals(queueKey)) {
            // Typical case, barring overrides
            queueKey = queueAssignmentPolicy.getClassKey(this.controller,
                    cauri);
        }
        return queueKey;
    }

    /**
     * @return RecoveryJournal instance. May be null.
     */
    public FrontierJournal getFrontierJournal() {
        return this.recover;
    }

    public void crawlEnding(String sExitMessage) {
        // TODO Auto-generated method stub
    }

    public void crawlEnded(String sExitMessage) {
        if (logger.isLoggable(Level.INFO)) {
            logger.info("Closing with " + Long.toString(queuedUriCount())
                    + " URLs still in queue.");
        }
    }

    public void crawlStarted(String message) {
        // TODO Auto-generated method stub
    }

    public void crawlPausing(String statusMessage) {
        // TODO Auto-generated method stub
    }

    public void crawlPaused(String statusMessage) {
        // TODO Auto-generated method stub
    }

    public void crawlResuming(String statusMessage) {
        // TODO Auto-generated method stub
    }

    public void crawlCheckpoint(File checkpointDir) throws Exception {
        if (this.recover == null) {
            return;
        }
        this.recover.checkpoint(checkpointDir);
    }

    //
    // Reporter implementation
    //
    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }

    public void reportTo(PrintWriter writer) {
        reportTo(null, writer);
    }
}
|