0001: /* $Id: WorkQueueFrontier.java 5046 2007-04-10 01:40:08Z gojomo $
0002: * Created on Sep 24, 2004
0003: *
0004: * Copyright (C) 2004 Internet Archive.
0005: *
0006: * This file is part of the Heritrix web crawler (crawler.archive.org).
0007: *
0008: * Heritrix is free software; you can redistribute it and/or modify
0009: * it under the terms of the GNU Lesser Public License as published by
0010: * the Free Software Foundation; either version 2.1 of the License, or
0011: * any later version.
0012: *
0013: * Heritrix is distributed in the hope that it will be useful,
0014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0016: * GNU Lesser Public License for more details.
0017: *
0018: * You should have received a copy of the GNU Lesser Public License
0019: * along with Heritrix; if not, write to the Free Software
0020: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0021: *
0022: */
0023: package org.archive.crawler.frontier;
0024:
0025: import java.io.IOException;
0026: import java.io.PrintWriter;
0027: import java.io.Serializable;
0028: import java.util.ArrayList;
0029: import java.util.Collection;
0030: import java.util.Collections;
0031: import java.util.Date;
0032: import java.util.HashMap;
0033: import java.util.Iterator;
0034: import java.util.Map;
0035: import java.util.SortedSet;
0036: import java.util.Timer;
0037: import java.util.TimerTask;
0038: import java.util.TreeSet;
0039: import java.util.logging.Level;
0040: import java.util.logging.Logger;
0041:
0042: import org.apache.commons.collections.Bag;
0043: import org.apache.commons.collections.BagUtils;
0044: import org.apache.commons.collections.bag.HashBag;
0045: import org.archive.crawler.datamodel.CandidateURI;
0046: import org.archive.crawler.datamodel.CoreAttributeConstants;
0047: import org.archive.crawler.datamodel.CrawlURI;
0048: import org.archive.crawler.datamodel.FetchStatusCodes;
0049: import org.archive.crawler.datamodel.UriUniqFilter;
0050: import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;
0051: import org.archive.crawler.framework.CrawlController;
0052: import org.archive.crawler.framework.Frontier;
0053: import org.archive.crawler.framework.exceptions.EndedException;
0054: import org.archive.crawler.framework.exceptions.FatalConfigurationException;
0055: import org.archive.crawler.settings.SimpleType;
0056: import org.archive.crawler.settings.Type;
0057: import org.archive.net.UURI;
0058: import org.archive.util.ArchiveUtils;
0059:
0060: import com.sleepycat.collections.StoredIterator;
0061:
0062: import java.util.concurrent.BlockingQueue;
0063: import java.util.concurrent.LinkedBlockingQueue;
0064: import java.util.concurrent.TimeUnit;
0065:
0066: /**
0067: * A common Frontier base using several queues to hold pending URIs.
0068: *
0069: * Uses in-memory map of all known 'queues' inside a single database.
0070: * Round-robins between all queues.
0071: *
0072: * @author Gordon Mohr
0073: * @author Christian Kohlschuetter
0074: */
0075: public abstract class WorkQueueFrontier extends AbstractFrontier
0076: implements FetchStatusCodes, CoreAttributeConstants,
0077: HasUriReceiver, Serializable {
/** Fixed serialization version for this Serializable frontier. */
private static final long serialVersionUID = 570384305871965843L;
0079:
0080: public class WakeTask extends TimerTask {
0081: @Override
0082: public void run() {
0083: synchronized (snoozedClassQueues) {
0084: if (this != nextWake) {
0085: // an intervening waketask was made
0086: return;
0087: }
0088: wakeQueues();
0089: }
0090: }
0091: }
0092:
0093: /** truncate reporting of queues at some large but not unbounded number */
0094: private static final int REPORT_MAX_QUEUES = 2000;
0095:
0096: /**
0097: * If we know that only a small amount of queues is held in memory,
0098: * we can avoid using a disk-based BigMap.
0099: * This only works efficiently if the WorkQueue does not hold its
0100: * entries in memory as well.
0101: */
0102: private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000;
0103:
0104: /**
0105: * When a snooze target for a queue is longer than this amount, and
0106: * there are already ready queues, deactivate rather than snooze
0107: * the current queue -- so other more responsive sites get a chance
0108: * in active rotation. (As a result, queue's next try may be much
0109: * further in the future than the snooze target delay.)
0110: */
0111: public final static String ATTR_SNOOZE_DEACTIVATE_MS = "snooze-deactivate-ms";
0112: public static Long DEFAULT_SNOOZE_DEACTIVATE_MS = new Long(
0113: 5 * 60 * 1000); // 5 minutes
0114:
0115: private static final Logger logger = Logger
0116: .getLogger(WorkQueueFrontier.class.getName());
0117:
0118: /** whether to hold queues INACTIVE until needed for throughput */
0119: public final static String ATTR_HOLD_QUEUES = "hold-queues";
0120: protected final static Boolean DEFAULT_HOLD_QUEUES = new Boolean(
0121: true);
0122:
0123: /** amount to replenish budget on each activation (duty cycle) */
0124: public final static String ATTR_BALANCE_REPLENISH_AMOUNT = "balance-replenish-amount";
0125: protected final static Integer DEFAULT_BALANCE_REPLENISH_AMOUNT = new Integer(
0126: 3000);
0127:
0128: /** whether to hold queues INACTIVE until needed for throughput */
0129: public final static String ATTR_ERROR_PENALTY_AMOUNT = "error-penalty-amount";
0130: protected final static Integer DEFAULT_ERROR_PENALTY_AMOUNT = new Integer(
0131: 100);
0132:
0133: /** total expenditure to allow a queue before 'retiring' it */
0134: public final static String ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget";
0135: protected final static Long DEFAULT_QUEUE_TOTAL_BUDGET = new Long(
0136: -1);
0137:
0138: /** cost assignment policy to use (by class name) */
0139: public final static String ATTR_COST_POLICY = "cost-policy";
0140: protected final static String DEFAULT_COST_POLICY = UnitCostAssignmentPolicy.class
0141: .getName();
0142:
0143: /** target size of ready queues backlog */
0144: public final static String ATTR_TARGET_READY_QUEUES_BACKLOG = "target-ready-backlog";
0145: protected final static Integer DEFAULT_TARGET_READY_QUEUES_BACKLOG = new Integer(
0146: 50);
0147:
/** those UURIs which are already in-process (or processed), and
 thus should not be rescheduled. Created via createAlreadyIncluded() in
 initialize(); closed and nulled in crawlEnded(). */
protected transient UriUniqFilter alreadyIncluded;

/** All known queues, keyed by classKey (classKey -> WorkQueue).
 * initialize() chooses either an in-memory synchronized HashMap or a
 * controller-supplied BigMap.
 */
protected transient Map<String, WorkQueue> allQueues = null;
// of classKey -> ClassKeyQueue

/**
 * All per-class queues whose first item may be handed out.
 * Linked-list of keys for the queues; next() polls it for work.
 */
protected BlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<String>();

/** Target (minimum) size to keep readyClassQueues; clamped to at least 1 */
protected int targetSizeForReadyQueues;

/**
 * All 'inactive' queues, not yet in active rotation.
 * Linked-list of keys for the queues.
 */
protected BlockingQueue<String> inactiveQueues = new LinkedBlockingQueue<String>();

/**
 * 'retired' queues, no longer considered for activation.
 * Linked-list of keys for queues. kickUpdate() moves them back to inactive.
 */
protected BlockingQueue<String> retiredQueues = new LinkedBlockingQueue<String>();

/** all per-class queues from whom a URI is outstanding; next() adds one
 * occurrence per emitted URI and finished() removes one */
protected Bag inProcessQueues = BagUtils
        .synchronizedBag(new HashBag()); // of ClassKeyQueue

/**
 * All per-class queues held in snoozed state, sorted by wake time.
 * NOTE(review): ordering comes from WorkQueue's natural ordering, presumably
 * its wake time; a queue's wake time must not change while it is in this set.
 */
protected SortedSet<WorkQueue> snoozedClassQueues = Collections
        .synchronizedSortedSet(new TreeSet<WorkQueue>());

/** Timer for tasks which wake head item of snoozedClassQueues */
protected transient Timer wakeTimer;

/** Task for next wake; WakeTask instances other than this one do nothing */
protected transient WakeTask nextWake;

/** Non-retired queue with the largest count seen by sendToQueue() */
protected WorkQueue longestActiveQueue = null;

/** how long to wait for a ready queue when there's nothing snoozed */
private static final long DEFAULT_WAIT = 1000; // 1 second

/** a policy for assigning 'cost' values to CrawlURIs */
private transient CostAssignmentPolicy costAssignmentPolicy;

/** all policies available to be chosen; offered as the allowed values of
 * the cost-policy setting in the constructor.
 * NOTE(review): non-static and non-transient, so it is part of each
 * instance's serialized state despite its constant-style name. */
String[] AVAILABLE_COST_POLICIES = new String[] {
        ZeroCostAssignmentPolicy.class.getName(),
        UnitCostAssignmentPolicy.class.getName(),
        WagCostAssignmentPolicy.class.getName(),
        AntiCalendarCostAssignmentPolicy.class.getName() };
0208:
0209: /**
0210: * Create the CommonFrontier
0211: *
0212: * @param name
0213: * @param description
0214: */
0215: public WorkQueueFrontier(String name, String description) {
0216: // The 'name' of all frontiers should be the same (URIFrontier.ATTR_NAME)
0217: // therefore we'll ignore the supplied parameter.
0218: super (Frontier.ATTR_NAME, description);
0219: Type t = addElementToDefinition(new SimpleType(
0220: ATTR_HOLD_QUEUES,
0221: "Whether to hold newly-created per-host URI work"
0222: + " queues until needed to stay busy. If false (default),"
0223: + " all queues may contribute URIs for crawling at all"
0224: + " times. If true, queues begin (and collect URIs) in"
0225: + " an 'inactive' state, and only when the Frontier needs"
0226: + " another queue to keep all ToeThreads busy will new"
0227: + " queues be activated.", DEFAULT_HOLD_QUEUES));
0228: t.setExpertSetting(true);
0229: t.setOverrideable(false);
0230: t = addElementToDefinition(new SimpleType(
0231: ATTR_BALANCE_REPLENISH_AMOUNT,
0232: "Amount to replenish a queue's activity balance when it becomes "
0233: + "active. Larger amounts mean more URIs will be tried from the "
0234: + "queue before it is deactivated in favor of waiting queues. "
0235: + "Default is 3000",
0236: DEFAULT_BALANCE_REPLENISH_AMOUNT));
0237: t.setExpertSetting(true);
0238: t.setOverrideable(true);
0239: t = addElementToDefinition(new SimpleType(
0240: ATTR_ERROR_PENALTY_AMOUNT,
0241: "Amount to additionally penalize a queue when one of"
0242: + "its URIs fails completely. Accelerates deactivation or "
0243: + "full retirement of problem queues and unresponsive sites. "
0244: + "Default is 100",
0245: DEFAULT_ERROR_PENALTY_AMOUNT));
0246: t.setExpertSetting(true);
0247: t.setOverrideable(true);
0248: t = addElementToDefinition(new SimpleType(
0249: ATTR_QUEUE_TOTAL_BUDGET,
0250: "Total activity expenditure allowable to a single queue; queues "
0251: + "over this expenditure will be 'retired' and crawled no more. "
0252: + "Default of -1 means no ceiling on activity expenditures is "
0253: + "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET));
0254: t.setExpertSetting(true);
0255: t.setOverrideable(true);
0256:
0257: t = addElementToDefinition(new SimpleType(
0258: ATTR_COST_POLICY,
0259: "Policy for calculating the cost of each URI attempted. "
0260: + "The default UnitCostAssignmentPolicy considers the cost of "
0261: + "each URI to be '1'.", DEFAULT_COST_POLICY,
0262: AVAILABLE_COST_POLICIES));
0263: t.setExpertSetting(true);
0264:
0265: t = addElementToDefinition(new SimpleType(
0266: ATTR_SNOOZE_DEACTIVATE_MS,
0267: "Threshold above which any 'snooze' delay will cause the "
0268: + "affected queue to go inactive, allowing other queues a "
0269: + "chance to rotate into active state. Typically set to be "
0270: + "longer than the politeness pauses between successful "
0271: + "fetches, but shorter than the connection-failed "
0272: + "'retry-delay-seconds'. (Default is 5 minutes.)",
0273: DEFAULT_SNOOZE_DEACTIVATE_MS));
0274: t.setExpertSetting(true);
0275: t.setOverrideable(false);
0276: t = addElementToDefinition(new SimpleType(
0277: ATTR_TARGET_READY_QUEUES_BACKLOG,
0278: "Target size for backlog of ready queues. This many queues "
0279: + "will be brought into 'ready' state even if a thread is "
0280: + "not waiting. Only has effect if 'hold-queues' is true. "
0281: + "Default is 50.",
0282: DEFAULT_TARGET_READY_QUEUES_BACKLOG));
0283: t.setExpertSetting(true);
0284: t.setOverrideable(false);
0285: }
0286:
0287: /**
0288: * Initializes the Frontier, given the supplied CrawlController.
0289: *
0290: * @see org.archive.crawler.framework.Frontier#initialize(org.archive.crawler.framework.CrawlController)
0291: */
0292: public void initialize(CrawlController c)
0293: throws FatalConfigurationException, IOException {
0294: // Call the super method. It sets up frontier journalling.
0295: super .initialize(c);
0296: this .controller = c;
0297:
0298: this .targetSizeForReadyQueues = (Integer) getUncheckedAttribute(
0299: null, ATTR_TARGET_READY_QUEUES_BACKLOG);
0300: if (this .targetSizeForReadyQueues < 1) {
0301: this .targetSizeForReadyQueues = 1;
0302: }
0303: this .wakeTimer = new Timer("waker for " + c.toString());
0304:
0305: try {
0306: if (workQueueDataOnDisk()
0307: && queueAssignmentPolicy.maximumNumberOfKeys() >= 0
0308: && queueAssignmentPolicy.maximumNumberOfKeys() <= MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
0309: this .allQueues = Collections
0310: .synchronizedMap(new HashMap<String, WorkQueue>());
0311: } else {
0312: this .allQueues = c.getBigMap("allqueues", String.class,
0313: WorkQueue.class);
0314: if (logger.isLoggable(Level.FINE)) {
0315: Iterator i = this .allQueues.keySet().iterator();
0316: try {
0317: for (; i.hasNext();) {
0318: logger.fine((String) i.next());
0319: }
0320: } finally {
0321: StoredIterator.close(i);
0322: }
0323: }
0324: }
0325: this .alreadyIncluded = createAlreadyIncluded();
0326: initQueue();
0327: } catch (IOException e) {
0328: e.printStackTrace();
0329: throw (FatalConfigurationException) new FatalConfigurationException(
0330: e.getMessage()).initCause(e);
0331: } catch (Exception e) {
0332: e.printStackTrace();
0333: throw (FatalConfigurationException) new FatalConfigurationException(
0334: e.getMessage()).initCause(e);
0335: }
0336:
0337: initCostPolicy();
0338:
0339: loadSeeds();
0340: }
0341:
0342: /**
0343: * Set (or reset after configuration change) the cost policy in effect.
0344: *
0345: * @throws FatalConfigurationException
0346: */
0347: private void initCostPolicy() throws FatalConfigurationException {
0348: try {
0349: costAssignmentPolicy = (CostAssignmentPolicy) Class
0350: .forName(
0351: (String) getUncheckedAttribute(null,
0352: ATTR_COST_POLICY)).newInstance();
0353: } catch (Exception e) {
0354: e.printStackTrace();
0355: throw new FatalConfigurationException(e.getMessage());
0356: }
0357: }
0358:
0359: /* (non-Javadoc)
0360: * @see org.archive.crawler.frontier.AbstractFrontier#crawlEnded(java.lang.String)
0361: */
0362: public void crawlEnded(String sExitMessage) {
0363: // Cleanup. CrawlJobs persist after crawl has finished so undo any
0364: // references.
0365: if (this .alreadyIncluded != null) {
0366: this .alreadyIncluded.close();
0367: this .alreadyIncluded = null;
0368: }
0369:
0370: this .queueAssignmentPolicy = null;
0371:
0372: try {
0373: closeQueue();
0374: } catch (IOException e) {
0375: // FIXME exception handling
0376: e.printStackTrace();
0377: }
0378: this .wakeTimer.cancel();
0379:
0380: this .allQueues.clear();
0381: this .allQueues = null;
0382: this .inProcessQueues = null;
0383: this .readyClassQueues = null;
0384: this .snoozedClassQueues = null;
0385: this .inactiveQueues = null;
0386: this .retiredQueues = null;
0387:
0388: this .costAssignmentPolicy = null;
0389:
0390: // Clearing controller is a problem. We get NPEs in #preNext.
0391: super .crawlEnded(sExitMessage);
0392: this .controller = null;
0393: }
0394:
/**
 * Create a UriUniqFilter that will serve as record
 * of already seen URIs.
 *
 * Called once from initialize(). schedule() adds candidate URIs to the
 * returned filter, next() may ask it to flush, and crawlEnded() closes it.
 * URIs that pass the filter presumably come back through receive(), since
 * this class is the filter's HasUriReceiver; confirm in implementations.
 *
 * @return A UriUniqFilter that will serve as a record of already seen URIs
 * @throws IOException if the filter's backing storage cannot be created
 */
protected abstract UriUniqFilter createAlreadyIncluded()
        throws IOException;
0404:
0405: /**
0406: * Arrange for the given CandidateURI to be visited, if it is not
0407: * already scheduled/completed.
0408: *
0409: * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI)
0410: */
0411: public void schedule(CandidateURI caUri) {
0412: // Canonicalization may set forceFetch flag. See
0413: // #canonicalization(CandidateURI) javadoc for circumstance.
0414: String canon = canonicalize(caUri);
0415: if (caUri.forceFetch()) {
0416: alreadyIncluded.addForce(canon, caUri);
0417: } else {
0418: alreadyIncluded.add(canon, caUri);
0419: }
0420: }
0421:
0422: /**
0423: * Accept the given CandidateURI for scheduling, as it has
0424: * passed the alreadyIncluded filter.
0425: *
0426: * Choose a per-classKey queue and enqueue it. If this
0427: * item has made an unready queue ready, place that
0428: * queue on the readyClassQueues queue.
0429: * @param caUri CandidateURI.
0430: */
0431: public void receive(CandidateURI caUri) {
0432: CrawlURI curi = asCrawlUri(caUri);
0433: applySpecialHandling(curi);
0434: sendToQueue(curi);
0435: // Update recovery log.
0436: doJournalAdded(curi);
0437: }
0438:
0439: /* (non-Javadoc)
0440: * @see org.archive.crawler.frontier.AbstractFrontier#asCrawlUri(org.archive.crawler.datamodel.CandidateURI)
0441: */
0442: protected CrawlURI asCrawlUri(CandidateURI caUri) {
0443: CrawlURI curi = super .asCrawlUri(caUri);
0444: // force cost to be calculated, pre-insert
0445: getCost(curi);
0446: return curi;
0447: }
0448:
0449: /**
0450: * Send a CrawlURI to the appropriate subqueue.
0451: *
0452: * @param curi
0453: */
0454: protected void sendToQueue(CrawlURI curi) {
0455: WorkQueue wq = getQueueFor(curi);
0456: synchronized (wq) {
0457: wq.enqueue(this , curi);
0458: if (!wq.isRetired()) {
0459: incrementQueuedUriCount();
0460: }
0461: if (!wq.isHeld()) {
0462: wq.setHeld();
0463: if (holdQueues()
0464: && readyClassQueues.size() >= targetSizeForReadyQueues()) {
0465: deactivateQueue(wq);
0466: } else {
0467: replenishSessionBalance(wq);
0468: readyQueue(wq);
0469: }
0470: }
0471: WorkQueue laq = longestActiveQueue;
0472: if (!wq.isRetired()
0473: && ((laq == null) || wq.getCount() > laq.getCount())) {
0474: longestActiveQueue = wq;
0475: }
0476: }
0477: }
0478:
0479: /**
0480: * Whether queues should start inactive (only becoming active when needed
0481: * to keep the crawler busy), or if queues should start out ready.
0482: *
0483: * @return true if new queues should held inactive
0484: */
0485: private boolean holdQueues() {
0486: return ((Boolean) getUncheckedAttribute(null, ATTR_HOLD_QUEUES))
0487: .booleanValue();
0488: }
0489:
0490: /**
0491: * Put the given queue on the readyClassQueues queue
0492: * @param wq
0493: */
0494: private void readyQueue(WorkQueue wq) {
0495: try {
0496: wq.setActive(this , true);
0497: readyClassQueues.put(wq.getClassKey());
0498: } catch (InterruptedException e) {
0499: e.printStackTrace();
0500: System.err.println("unable to ready queue " + wq);
0501: // propagate interrupt up
0502: throw new RuntimeException(e);
0503: }
0504: }
0505:
0506: /**
0507: * Put the given queue on the inactiveQueues queue
0508: * @param wq
0509: */
0510: private void deactivateQueue(WorkQueue wq) {
0511: try {
0512: wq.setSessionBalance(0); // zero out session balance
0513: inactiveQueues.put(wq.getClassKey());
0514: wq.setActive(this , false);
0515: } catch (InterruptedException e) {
0516: e.printStackTrace();
0517: System.err.println("unable to deactivate queue " + wq);
0518: // propagate interrupt up
0519: throw new RuntimeException(e);
0520: }
0521: }
0522:
0523: /**
0524: * Put the given queue on the retiredQueues queue
0525: * @param wq
0526: */
0527: private void retireQueue(WorkQueue wq) {
0528: try {
0529: retiredQueues.put(wq.getClassKey());
0530: decrementQueuedCount(wq.getCount());
0531: wq.setRetired(true);
0532: wq.setActive(this , false);
0533: } catch (InterruptedException e) {
0534: e.printStackTrace();
0535: System.err.println("unable to retire queue " + wq);
0536: // propagate interrupt up
0537: throw new RuntimeException(e);
0538: }
0539: }
0540:
0541: /**
0542: * Accomodate any changes in settings.
0543: *
0544: * @see org.archive.crawler.framework.Frontier#kickUpdate()
0545: */
0546: public void kickUpdate() {
0547: super .kickUpdate();
0548: int target = (Integer) getUncheckedAttribute(null,
0549: ATTR_TARGET_READY_QUEUES_BACKLOG);
0550: if (target < 1) {
0551: target = 1;
0552: }
0553: this .targetSizeForReadyQueues = target;
0554: try {
0555: initCostPolicy();
0556: } catch (FatalConfigurationException fce) {
0557: throw new RuntimeException(fce);
0558: }
0559: // The rules for a 'retired' queue may have changed; so,
0560: // unretire all queues to 'inactive'. If they still qualify
0561: // as retired/overbudget next time they come up, they'll
0562: // be re-retired; if not, they'll get a chance to become
0563: // active under the new rules.
0564: Object key = this .retiredQueues.poll();
0565: while (key != null) {
0566: WorkQueue q = (WorkQueue) this .allQueues.get(key);
0567: if (q != null) {
0568: unretireQueue(q);
0569: }
0570: key = this .retiredQueues.poll();
0571: }
0572: }
0573:
0574: /**
0575: * Restore a retired queue to the 'inactive' state.
0576: *
0577: * @param q
0578: */
0579: private void unretireQueue(WorkQueue q) {
0580: deactivateQueue(q);
0581: q.setRetired(false);
0582: incrementQueuedUriCount(q.getCount());
0583: }
0584:
/**
 * Return the work queue for the given CrawlURI's classKey. URIs
 * are ordered and politeness-delayed within their 'class'.
 * If the requested queue is not found, a new instance is created.
 * sendToQueue() uses this to find the queue a URI is added to.
 *
 * @param curi CrawlURI to base queue on
 * @return the found or created WorkQueue (never expected to be null;
 *         sendToQueue() synchronizes on the result directly)
 */
protected abstract WorkQueue getQueueFor(CrawlURI curi);
0594:
/**
 * Return the work queue for the given classKey, or null
 * if no such queue exists. Unlike getQueueFor(CrawlURI), this does not
 * create a queue.
 *
 * @param classKey key to look for
 * @return the found WorkQueue, or null if none exists
 */
protected abstract WorkQueue getQueueFor(String classKey);
0603:
/**
 * Return the next CrawlURI to be processed (and presumably
 * visited/fetched) by a worker thread.
 *
 * Relies on the readyClassQueues having been loaded with
 * any work queues that are eligible to provide a URI.
 *
 * Each pass of the outer loop does four things. (1) It runs the preNext()
 * checks. (2) It activates inactive queues until readyClassQueues reaches
 * the backlog target or none remain. (3) It waits up to DEFAULT_WAIT ms
 * for a ready queue key. (4) It emits that queue's head URI if the URI
 * still maps to that queue; otherwise it moves the URI to its new queue
 * and examines the next head.
 *
 * The loop only exits by returning a non-null CrawlURI or by throwing;
 * this method never returns null.
 *
 * @return next CrawlURI to be processed (never null).
 * @throws InterruptedException if interrupted while waiting on
 *         readyClassQueues (and possibly from preNext)
 * @throws EndedException when shouldTerminate is set (and possibly from
 *         preNext)
 *
 * @see org.archive.crawler.framework.Frontier#next()
 */
public CrawlURI next() throws InterruptedException, EndedException {
    while (true) { // loop left only by explicit return or exception
        long now = System.currentTimeMillis();

        // Do common checks for pause, terminate, bandwidth-hold
        preNext(now);

        // Top up the ready backlog from the inactive queues. The
        // readyClassQueues monitor serializes this top-up across callers
        // (NOTE(review): presumably so that concurrent callers do not all
        // activate queues for the same shortfall; confirm).
        synchronized (readyClassQueues) {
            int activationsNeeded = targetSizeForReadyQueues()
                    - readyClassQueues.size();
            while (activationsNeeded > 0
                    && !inactiveQueues.isEmpty()) {
                activateInactiveQueue();
                activationsNeeded--;
            }
        }

        WorkQueue readyQ = null;
        // Wait up to DEFAULT_WAIT ms; a null key means the wait timed out.
        Object key = readyClassQueues.poll(DEFAULT_WAIT,
                TimeUnit.MILLISECONDS);
        if (key != null) {
            readyQ = (WorkQueue) this .allQueues.get(key);
        }
        if (readyQ != null) {
            while (true) { // loop left by explicit return or break on empty
                CrawlURI curi = null;
                synchronized (readyQ) {
                    curi = readyQ.peek(this );
                    if (curi != null) {
                        // check if curi belongs in different queue
                        String currentQueueKey = getClassKey(curi);
                        if (currentQueueKey.equals(curi
                                .getClassKey())) {
                            // curi was in right queue, emit. readyQ is not
                            // put back on readyClassQueues here; finished()
                            // later re-enqueues or snoozes it.
                            noteAboutToEmit(curi, readyQ);
                            inProcessQueues.add(readyQ);
                            return curi;
                        }
                        // URI's assigned queue has changed since it
                        // was queued (eg because its IP has become
                        // known). Requeue to new queue. The queued count
                        // is decremented here because sendToQueue() will
                        // increment it again.
                        curi.setClassKey(currentQueueKey);
                        readyQ.dequeue(this );
                        decrementQueuedCount(1);
                        curi.setHolderKey(null);
                        // curi will be requeued to true queue after lock
                        // on readyQ is released, to prevent deadlock
                    } else {
                        // readyQ is empty and ready: it's exhausted
                        // release held status, allowing any subsequent
                        // enqueues to again put queue in ready
                        readyQ.clearHeld();
                        break;
                    }
                }
                if (curi != null) {
                    // complete the requeuing begun earlier
                    sendToQueue(curi);
                }
            }
        } else {
            // ReadyQ key wasn't in all queues: unexpected
            if (key != null) {
                logger.severe("Key " + key
                        + " in readyClassQueues but not allQueues");
            }
        }

        if (shouldTerminate) {
            // skip subsequent steps if already on last legs
            throw new EndedException("shouldTerminate is true");
        }

        if (inProcessQueues.size() == 0) {
            // Nothing was ready or in progress or imminent to wake; ensure
            // any piled-up pending-scheduled URIs are considered
            this .alreadyIncluded.requestFlush();
        }
    }
}
0695:
0696: private int targetSizeForReadyQueues() {
0697: return targetSizeForReadyQueues;
0698: }
0699:
0700: /**
0701: * Return the 'cost' of a CrawlURI (how much of its associated
0702: * queue's budget it depletes upon attempted processing)
0703: *
0704: * @param curi
0705: * @return the associated cost
0706: */
0707: private int getCost(CrawlURI curi) {
0708: int cost = curi.getHolderCost();
0709: if (cost == CrawlURI.UNCALCULATED) {
0710: cost = costAssignmentPolicy.costOf(curi);
0711: curi.setHolderCost(cost);
0712: }
0713: return cost;
0714: }
0715:
0716: /**
0717: * Activate an inactive queue, if any are available.
0718: */
0719: private void activateInactiveQueue() {
0720: Object key = this .inactiveQueues.poll();
0721: if (key == null) {
0722: return;
0723: }
0724: WorkQueue candidateQ = (WorkQueue) this .allQueues.get(key);
0725: if (candidateQ != null) {
0726: synchronized (candidateQ) {
0727: replenishSessionBalance(candidateQ);
0728: if (candidateQ.isOverBudget()) {
0729: // if still over-budget after an activation & replenishing,
0730: // retire
0731: retireQueue(candidateQ);
0732: return;
0733: }
0734: long now = System.currentTimeMillis();
0735: long delay_ms = candidateQ.getWakeTime() - now;
0736: if (delay_ms > 0) {
0737: // queue still due for snoozing
0738: snoozeQueue(candidateQ, now, delay_ms);
0739: return;
0740: }
0741: candidateQ.setWakeTime(0); // clear obsolete wake time, if any
0742: readyQueue(candidateQ);
0743: if (logger.isLoggable(Level.FINE)) {
0744: logger.fine("ACTIVATED queue: "
0745: + candidateQ.getClassKey());
0746:
0747: }
0748: }
0749: }
0750: }
0751:
0752: /**
0753: * Replenish the budget of the given queue by the appropriate amount.
0754: *
0755: * @param queue queue to replenish
0756: */
0757: private void replenishSessionBalance(WorkQueue queue) {
0758: // get a CrawlURI for override context purposes
0759: CrawlURI contextUri = queue.peek(this );
0760: // TODO: consider confusing cross-effects of this and IP-based politeness
0761: queue.setSessionBalance(((Integer) getUncheckedAttribute(
0762: contextUri, ATTR_BALANCE_REPLENISH_AMOUNT)).intValue());
0763: // reset total budget (it may have changed)
0764: // TODO: is this the best way to be sensitive to potential mid-crawl changes
0765: long totalBudget = ((Long) getUncheckedAttribute(contextUri,
0766: ATTR_QUEUE_TOTAL_BUDGET)).longValue();
0767: queue.setTotalBudget(totalBudget);
0768: queue.unpeek(); // don't insist on that URI being next released
0769: }
0770:
0771: /**
0772: * Enqueue the given queue to either readyClassQueues or inactiveQueues,
0773: * as appropriate.
0774: *
0775: * @param wq
0776: */
0777: private void reenqueueQueue(WorkQueue wq) {
0778: if (wq.isOverBudget()) {
0779: // if still over budget, deactivate
0780: if (logger.isLoggable(Level.FINE)) {
0781: logger.fine("DEACTIVATED queue: " + wq.getClassKey());
0782: }
0783: deactivateQueue(wq);
0784: } else {
0785: readyQueue(wq);
0786: }
0787: }
0788:
0789: /**
0790: * Wake any queues sitting in the snoozed queue whose time has come.
0791: */
0792: void wakeQueues() {
0793: synchronized (snoozedClassQueues) {
0794: long now = System.currentTimeMillis();
0795: long nextWakeDelay = 0;
0796: int wokenQueuesCount = 0;
0797: while (true) {
0798: if (snoozedClassQueues.isEmpty()) {
0799: return;
0800: }
0801: WorkQueue peek = (WorkQueue) snoozedClassQueues.first();
0802: nextWakeDelay = peek.getWakeTime() - now;
0803: if (nextWakeDelay <= 0) {
0804: snoozedClassQueues.remove(peek);
0805: peek.setWakeTime(0);
0806: reenqueueQueue(peek);
0807: wokenQueuesCount++;
0808: } else {
0809: break;
0810: }
0811: }
0812: this .nextWake = new WakeTask();
0813: this .wakeTimer.schedule(nextWake, nextWakeDelay);
0814: }
0815: }
0816:
/**
 * Note that the previously emitted CrawlURI has completed
 * its processing (for now).
 *
 * The CrawlURI may be scheduled to retry, if appropriate,
 * and other related URIs may become eligible for release
 * via the next next() call, as a result of finished().
 *
 * Dispositions, checked in this order:
 * <ul>
 * <li>retire directive (A_FORCE_RETIRE): the URI is not dequeued and
 * its whole queue is retired;</li>
 * <li>retry (needsRetrying): the URI is not dequeued; its queue is
 * snoozed for the retry delay or re-enqueued immediately;</li>
 * <li>otherwise the URI is dequeued, logged, and counted as a success,
 * a disregard, or a failure. Its queue is then snoozed for the
 * politeness delay or re-enqueued.</li>
 * </ul>
 *
 * @param curi a URI previously emitted by next(); its holder is the
 *        queue it came from
 * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI)
 */
public void finished(CrawlURI curi) {
    long now = System.currentTimeMillis();

    curi.incrementFetchAttempts();
    logLocalizedErrors(curi);
    WorkQueue wq = (WorkQueue) curi.getHolder();
    // With assertions enabled, verify curi is still the head of its queue.
    assert (wq.peek(this ) == curi) : "unexpected peek " + wq;
    // Balances the inProcessQueues.add(readyQ) made in next().
    inProcessQueues.remove(wq, 1);

    if (includesRetireDirective(curi)) {
        // CrawlURI is marked to trigger retirement of its queue
        curi.processingCleanup();
        wq.unpeek();
        wq.update(this , curi); // rewrite any changes
        retireQueue(wq);
        return;
    }

    if (needsRetrying(curi)) {
        // Consider errors which can be retried, leaving uri atop queue
        if (curi.getFetchStatus() != S_DEFERRED) {
            wq.expend(getCost(curi)); // all retries but DEFERRED cost
        }
        long delay_sec = retryDelayFor(curi);
        curi.processingCleanup(); // lose state that shouldn't burden retry
        synchronized (wq) {
            wq.unpeek();
            // TODO: consider if this should happen automatically inside unpeek()
            wq.update(this , curi); // rewrite any changes
            if (delay_sec > 0) {
                long delay_ms = delay_sec * 1000;
                snoozeQueue(wq, now, delay_ms);
            } else {
                reenqueueQueue(wq);
            }
        }
        // Let everyone interested know that it will be retried.
        controller.fireCrawledURINeedRetryEvent(curi);
        doJournalRescheduled(curi);
        return;
    }

    // Curi will definitely be disposed of without retry, so remove from queue
    wq.dequeue(this );
    decrementQueuedCount(1);
    log(curi);

    if (curi.isSuccess()) {
        // NOTE(review): this read-modify-write is not under any lock visible
        // here; if finished() runs on several threads at once, updates
        // could be lost. Confirm the field's type and threading.
        totalProcessedBytes += curi.getRecordedSize();
        incrementSucceededFetchCount();
        // Let everyone know in case they want to do something before we strip the curi.
        controller.fireCrawledURISuccessfulEvent(curi);
        doJournalFinishedSuccess(curi);
        wq.expend(getCost(curi)); // successes cost
    } else if (isDisregarded(curi)) {
        // Check for codes that mean that while we the crawler did
        // manage to schedule it, it must be disregarded for some reason.
        // (Disregards are not charged against the queue's budget.)
        incrementDisregardedUriCount();
        // Let interested listeners know of disregard disposition.
        controller.fireCrawledURIDisregardEvent(curi);
        // if exception, also send to crawlErrors
        if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
            Object[] array = { curi };
            controller.runtimeErrors.log(Level.WARNING, curi
                    .getUURI().toString(), array);
        }
        // TODO: consider reinstating forget-uri
    } else {
        // Otherwise it is a FAILURE: note & log
        //Let interested listeners know of failed disposition.
        this .controller.fireCrawledURIFailureEvent(curi);
        // if exception, also send to crawlErrors
        if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
            Object[] array = { curi };
            this .controller.runtimeErrors.log(Level.WARNING, curi
                    .getUURI().toString(), array);
        }
        incrementFailedFetchCount();
        // let queue note error (extra penalty from error-penalty-amount)
        wq.noteError(((Integer) getUncheckedAttribute(curi,
                ATTR_ERROR_PENALTY_AMOUNT)).intValue());
        doJournalFinishedFailure(curi);
        wq.expend(getCost(curi)); // failures cost
    }

    // Politeness: hold the queue back before its next URI is released.
    long delay_ms = politenessDelayFor(curi);
    synchronized (wq) {
        if (delay_ms > 0) {
            snoozeQueue(wq, now, delay_ms);
        } else {
            reenqueueQueue(wq);
        }
    }

    curi.stripToMinimal();
    curi.processingCleanup();

}
0926:
0927: private boolean includesRetireDirective(CrawlURI curi) {
0928: return curi.containsKey(A_FORCE_RETIRE)
0929: && (Boolean) curi.getObject(A_FORCE_RETIRE);
0930: }
0931:
0932: /**
0933: * Place the given queue into 'snoozed' state, ineligible to
0934: * supply any URIs for crawling, for the given amount of time.
0935: *
0936: * @param wq queue to snooze
0937: * @param now time now in ms
0938: * @param delay_ms time to snooze in ms
0939: */
0940: private void snoozeQueue(WorkQueue wq, long now, long delay_ms) {
0941: long nextTime = now + delay_ms;
0942: wq.setWakeTime(nextTime);
0943: long snoozeToInactiveDelayMs = ((Long) getUncheckedAttribute(
0944: null, ATTR_SNOOZE_DEACTIVATE_MS)).longValue();
0945: if (delay_ms > snoozeToInactiveDelayMs
0946: && !inactiveQueues.isEmpty()) {
0947: deactivateQueue(wq);
0948: } else {
0949: synchronized (snoozedClassQueues) {
0950: snoozedClassQueues.add(wq);
0951: if (wq == snoozedClassQueues.first()) {
0952: this .nextWake = new WakeTask();
0953: this .wakeTimer.schedule(nextWake, delay_ms);
0954: }
0955: }
0956: }
0957: }
0958:
0959: /**
0960: * Forget the given CrawlURI. This allows a new instance
0961: * to be created in the future, if it is reencountered under
0962: * different circumstances.
0963: *
0964: * @param curi The CrawlURI to forget
0965: */
0966: protected void forget(CrawlURI curi) {
0967: logger.finer("Forgetting " + curi);
0968: alreadyIncluded.forget(canonicalize(curi.getUURI()), curi);
0969: }
0970:
0971: /** (non-Javadoc)
0972: * @see org.archive.crawler.framework.Frontier#discoveredUriCount()
0973: */
0974: public long discoveredUriCount() {
0975: return (this .alreadyIncluded != null) ? this .alreadyIncluded
0976: .count() : 0;
0977: }
0978:
0979: /**
0980: * @param match String to match.
0981: * @return Number of items deleted.
0982: */
0983: public long deleteURIs(String match) {
0984: long count = 0;
0985: // TODO: DANGER/ values() may not work right from CachedBdbMap
0986: Iterator iter = allQueues.keySet().iterator();
0987: while (iter.hasNext()) {
0988: WorkQueue wq = getQueueFor(((String) iter.next()));
0989: wq.unpeek();
0990: count += wq.deleteMatching(this , match);
0991: }
0992: decrementQueuedCount(count);
0993: return count;
0994: }
0995:
0996: //
0997: // Reporter implementation
0998: //
0999:
1000: public static String STANDARD_REPORT = "standard";
1001: public static String ALL_NONEMPTY = "nonempty";
1002: public static String ALL_QUEUES = "all";
1003: protected static String[] REPORTS = { STANDARD_REPORT,
1004: ALL_NONEMPTY, ALL_QUEUES };
1005:
1006: public String[] getReports() {
1007: return REPORTS;
1008: }
1009:
1010: /**
1011: * @param w Where to write to.
1012: */
1013: public void singleLineReportTo(PrintWriter w) {
1014: if (this .allQueues == null) {
1015: return;
1016: }
1017: int allCount = allQueues.size();
1018: int inProcessCount = inProcessQueues.uniqueSet().size();
1019: int readyCount = readyClassQueues.size();
1020: int snoozedCount = snoozedClassQueues.size();
1021: int activeCount = inProcessCount + readyCount + snoozedCount;
1022: int inactiveCount = inactiveQueues.size();
1023: int retiredCount = retiredQueues.size();
1024: int exhaustedCount = allCount - activeCount - inactiveCount
1025: - retiredCount;
1026: w.print(allCount);
1027: w.print(" queues: ");
1028: w.print(activeCount);
1029: w.print(" active (");
1030: w.print(inProcessCount);
1031: w.print(" in-process; ");
1032: w.print(readyCount);
1033: w.print(" ready; ");
1034: w.print(snoozedCount);
1035: w.print(" snoozed); ");
1036: w.print(inactiveCount);
1037: w.print(" inactive; ");
1038: w.print(retiredCount);
1039: w.print(" retired; ");
1040: w.print(exhaustedCount);
1041: w.print(" exhausted");
1042: w.flush();
1043: }
1044:
1045: /* (non-Javadoc)
1046: * @see org.archive.util.Reporter#singleLineLegend()
1047: */
1048: public String singleLineLegend() {
1049: return "total active in-process ready snoozed inactive retired exhausted";
1050: }
1051:
1052: /**
1053: * This method compiles a human readable report on the status of the frontier
1054: * at the time of the call.
1055: * @param name Name of report.
1056: * @param writer Where to write to.
1057: */
1058: public synchronized void reportTo(String name, PrintWriter writer) {
1059: if (ALL_NONEMPTY.equals(name)) {
1060: allNonemptyReportTo(writer);
1061: return;
1062: }
1063: if (ALL_QUEUES.equals(name)) {
1064: allQueuesReportTo(writer);
1065: return;
1066: }
1067: if (name != null && !STANDARD_REPORT.equals(name)) {
1068: writer.print(name);
1069: writer.print(" unavailable; standard report:\n");
1070: }
1071: standardReportTo(writer);
1072: }
1073:
1074: /** Compact report of all nonempty queues (one queue per line)
1075: *
1076: * @param writer
1077: */
1078: private void allNonemptyReportTo(PrintWriter writer) {
1079: ArrayList<WorkQueue> inProcessQueuesCopy;
1080: synchronized (this .inProcessQueues) {
1081: // grab a copy that will be stable against mods for report duration
1082: @SuppressWarnings("unchecked")
1083: Collection<WorkQueue> inProcess = this .inProcessQueues;
1084: inProcessQueuesCopy = new ArrayList<WorkQueue>(inProcess);
1085: }
1086: writer.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1087: queueSingleLinesTo(writer, inProcessQueuesCopy.iterator());
1088:
1089: writer.print("\n -----===== READY QUEUES =====-----\n");
1090: queueSingleLinesTo(writer, this .readyClassQueues.iterator());
1091:
1092: writer.print("\n -----===== SNOOZED QUEUES =====-----\n");
1093: queueSingleLinesTo(writer, this .snoozedClassQueues.iterator());
1094:
1095: writer.print("\n -----===== INACTIVE QUEUES =====-----\n");
1096: queueSingleLinesTo(writer, this .inactiveQueues.iterator());
1097:
1098: writer.print("\n -----===== RETIRED QUEUES =====-----\n");
1099: queueSingleLinesTo(writer, this .retiredQueues.iterator());
1100: }
1101:
1102: /** Compact report of all nonempty queues (one queue per line)
1103: *
1104: * @param writer
1105: */
1106: private void allQueuesReportTo(PrintWriter writer) {
1107: queueSingleLinesTo(writer, allQueues.keySet().iterator());
1108: }
1109:
1110: /**
1111: * Writer the single-line reports of all queues in the
1112: * iterator to the writer
1113: *
1114: * @param writer to receive report
1115: * @param iterator over queues of interest.
1116: */
1117: private void queueSingleLinesTo(PrintWriter writer,
1118: Iterator iterator) {
1119: Object obj;
1120: WorkQueue q;
1121: boolean legendWritten = false;
1122: while (iterator.hasNext()) {
1123: obj = iterator.next();
1124: if (obj == null) {
1125: continue;
1126: }
1127: q = (obj instanceof WorkQueue) ? (WorkQueue) obj
1128: : (WorkQueue) this .allQueues.get(obj);
1129: if (q == null) {
1130: writer.print(" ERROR: " + obj);
1131: }
1132: if (!legendWritten) {
1133: writer.println(q.singleLineLegend());
1134: legendWritten = true;
1135: }
1136: q.singleLineReportTo(writer);
1137: }
1138: }
1139:
1140: /**
1141: * @param w Writer to print to.
1142: */
1143: private void standardReportTo(PrintWriter w) {
1144: int allCount = allQueues.size();
1145: int inProcessCount = inProcessQueues.uniqueSet().size();
1146: int readyCount = readyClassQueues.size();
1147: int snoozedCount = snoozedClassQueues.size();
1148: int activeCount = inProcessCount + readyCount + snoozedCount;
1149: int inactiveCount = inactiveQueues.size();
1150: int retiredCount = retiredQueues.size();
1151: int exhaustedCount = allCount - activeCount - inactiveCount
1152: - retiredCount;
1153:
1154: w.print("Frontier report - ");
1155: w.print(ArchiveUtils.get12DigitDate());
1156: w.print("\n");
1157: w.print(" Job being crawled: ");
1158: w.print(controller.getOrder().getCrawlOrderName());
1159: w.print("\n");
1160: w.print("\n -----===== STATS =====-----\n");
1161: w.print(" Discovered: ");
1162: w.print(Long.toString(discoveredUriCount()));
1163: w.print("\n");
1164: w.print(" Queued: ");
1165: w.print(Long.toString(queuedUriCount()));
1166: w.print("\n");
1167: w.print(" Finished: ");
1168: w.print(Long.toString(finishedUriCount()));
1169: w.print("\n");
1170: w.print(" Successfully: ");
1171: w.print(Long.toString(succeededFetchCount()));
1172: w.print("\n");
1173: w.print(" Failed: ");
1174: w.print(Long.toString(failedFetchCount()));
1175: w.print("\n");
1176: w.print(" Disregarded: ");
1177: w.print(Long.toString(disregardedUriCount()));
1178: w.print("\n");
1179: w.print("\n -----===== QUEUES =====-----\n");
1180: w.print(" Already included size: ");
1181: w.print(Long.toString(alreadyIncluded.count()));
1182: w.print("\n");
1183: w.print(" pending: ");
1184: w.print(Long.toString(alreadyIncluded.pending()));
1185: w.print("\n");
1186: w.print("\n All class queues map size: ");
1187: w.print(Long.toString(allCount));
1188: w.print("\n");
1189: w.print(" Active queues: ");
1190: w.print(activeCount);
1191: w.print("\n");
1192: w.print(" In-process: ");
1193: w.print(inProcessCount);
1194: w.print("\n");
1195: w.print(" Ready: ");
1196: w.print(readyCount);
1197: w.print("\n");
1198: w.print(" Snoozed: ");
1199: w.print(snoozedCount);
1200: w.print("\n");
1201: w.print(" Inactive queues: ");
1202: w.print(inactiveCount);
1203: w.print("\n");
1204: w.print(" Retired queues: ");
1205: w.print(retiredCount);
1206: w.print("\n");
1207: w.print(" Exhausted queues: ");
1208: w.print(exhaustedCount);
1209: w.print("\n");
1210:
1211: w.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1212: @SuppressWarnings("unchecked")
1213: Collection<WorkQueue> inProcess = inProcessQueues;
1214: ArrayList<WorkQueue> copy = extractSome(inProcess,
1215: REPORT_MAX_QUEUES);
1216: appendQueueReports(w, copy.iterator(), copy.size(),
1217: REPORT_MAX_QUEUES);
1218:
1219: w.print("\n -----===== READY QUEUES =====-----\n");
1220: appendQueueReports(w, this .readyClassQueues.iterator(),
1221: this .readyClassQueues.size(), REPORT_MAX_QUEUES);
1222:
1223: w.print("\n -----===== SNOOZED QUEUES =====-----\n");
1224: copy = extractSome(snoozedClassQueues, REPORT_MAX_QUEUES);
1225: appendQueueReports(w, copy.iterator(), copy.size(),
1226: REPORT_MAX_QUEUES);
1227:
1228: WorkQueue longest = longestActiveQueue;
1229: if (longest != null) {
1230: w.print("\n -----===== LONGEST QUEUE =====-----\n");
1231: longest.reportTo(w);
1232: }
1233:
1234: w.print("\n -----===== INACTIVE QUEUES =====-----\n");
1235: appendQueueReports(w, this .inactiveQueues.iterator(),
1236: this .inactiveQueues.size(), REPORT_MAX_QUEUES);
1237:
1238: w.print("\n -----===== RETIRED QUEUES =====-----\n");
1239: appendQueueReports(w, this .retiredQueues.iterator(),
1240: this .retiredQueues.size(), REPORT_MAX_QUEUES);
1241:
1242: w.flush();
1243: }
1244:
1245: /**
1246: * Extract some of the elements in the given collection to an
1247: * ArrayList. This method synchronizes on the given collection's
1248: * monitor. The returned list will never contain more than the
1249: * specified maximum number of elements.
1250: *
1251: * @param c the collection whose elements to extract
1252: * @param max the maximum number of elements to extract
1253: * @return the extraction
1254: */
1255: private static <T> ArrayList<T> extractSome(Collection<T> c, int max) {
1256: // Try to guess a sane initial capacity for ArrayList
1257: // Hopefully given collection won't grow more than 10 items
1258: // between now and the synchronized block...
1259: int initial = Math.min(c.size() + 10, max);
1260: int count = 0;
1261: ArrayList<T> list = new ArrayList<T>(initial);
1262: synchronized (c) {
1263: Iterator<T> iter = c.iterator();
1264: while (iter.hasNext() && (count < max)) {
1265: list.add(iter.next());
1266: count++;
1267: }
1268: }
1269: return list;
1270: }
1271:
1272: /**
1273: * Append queue report to general Frontier report.
1274: * @param w StringBuffer to append to.
1275: * @param iterator An iterator over
1276: * @param total
1277: * @param max
1278: */
1279: protected void appendQueueReports(PrintWriter w, Iterator iterator,
1280: int total, int max) {
1281: Object obj;
1282: WorkQueue q;
1283: for (int count = 0; iterator.hasNext() && (count < max); count++) {
1284: obj = iterator.next();
1285: if (obj == null) {
1286: continue;
1287: }
1288: q = (obj instanceof WorkQueue) ? (WorkQueue) obj
1289: : (WorkQueue) this .allQueues.get(obj);
1290: if (q == null) {
1291: w.print("WARNING: No report for queue " + obj);
1292: }
1293: q.reportTo(w);
1294: }
1295: if (total > max) {
1296: w.print("...and " + (total - max) + " more.\n");
1297: }
1298: }
1299:
1300: /**
1301: * Force logging, etc. of operator- deleted CrawlURIs
1302: *
1303: * @see org.archive.crawler.framework.Frontier#deleted(org.archive.crawler.datamodel.CrawlURI)
1304: */
1305: public synchronized void deleted(CrawlURI curi) {
1306: //treat as disregarded
1307: controller.fireCrawledURIDisregardEvent(curi);
1308: log(curi);
1309: incrementDisregardedUriCount();
1310: curi.stripToMinimal();
1311: curi.processingCleanup();
1312: }
1313:
1314: public void considerIncluded(UURI u) {
1315: this .alreadyIncluded.note(canonicalize(u));
1316: CrawlURI temp = new CrawlURI(u);
1317: temp.setClassKey(getClassKey(temp));
1318: getQueueFor(temp).expend(getCost(temp));
1319: }
1320:
/**
 * Subclass hook, presumably for setting up the concrete frontier's queue
 * storage (inferred from the name; confirm against implementations and
 * the caller).
 *
 * @throws IOException if the subclass's setup fails with an I/O error
 */
protected abstract void initQueue() throws IOException;

/**
 * Subclass hook, presumably the counterpart of initQueue() that releases
 * the queue storage (inferred from the name; confirm against
 * implementations and the caller).
 *
 * @throws IOException if the subclass's teardown fails with an I/O error
 */
protected abstract void closeQueue() throws IOException;

/**
 * Returns <code>true</code> if the WorkQueue implementation of this
 * Frontier stores its workload on disk instead of relying
 * on serialization mechanisms.
 *
 * @return a constant boolean value for this class/instance
 */
protected abstract boolean workQueueDataOnDisk();
1333:
1334: public FrontierGroup getGroup(CrawlURI curi) {
1335: return getQueueFor(curi);
1336: }
1337:
1338: public long averageDepth() {
1339: int inProcessCount = inProcessQueues.uniqueSet().size();
1340: int readyCount = readyClassQueues.size();
1341: int snoozedCount = snoozedClassQueues.size();
1342: int activeCount = inProcessCount + readyCount + snoozedCount;
1343: int inactiveCount = inactiveQueues.size();
1344: int totalQueueCount = (activeCount + inactiveCount);
1345: return (totalQueueCount == 0) ? 0 : queuedUriCount
1346: / totalQueueCount;
1347: }
1348:
1349: public float congestionRatio() {
1350: int inProcessCount = inProcessQueues.uniqueSet().size();
1351: int readyCount = readyClassQueues.size();
1352: int snoozedCount = snoozedClassQueues.size();
1353: int activeCount = inProcessCount + readyCount + snoozedCount;
1354: int inactiveCount = inactiveQueues.size();
1355: return (float) (activeCount + inactiveCount)
1356: / (inProcessCount + snoozedCount);
1357: }
1358:
1359: public long deepestUri() {
1360: return longestActiveQueue == null ? -1 : longestActiveQueue
1361: .getCount();
1362: }
1363:
1364: /* (non-Javadoc)
1365: * @see org.archive.crawler.framework.Frontier#isEmpty()
1366: */
1367: public synchronized boolean isEmpty() {
1368: return queuedUriCount == 0 && alreadyIncluded.pending() == 0;
1369: }
1370: }
|