0001: /* AdaptiveRevisitHostQueue
0002: *
0003: * Created on Sep 13, 2004
0004: *
0005: * Copyright (C) 2004 Kristinn Sigurðsson.
0006: *
0007: * This file is part of the Heritrix web crawler (crawler.archive.org).
0008: *
0009: * Heritrix is free software; you can redistribute it and/or modify
0010: * it under the terms of the GNU Lesser Public License as published by
0011: * the Free Software Foundation; either version 2.1 of the License, or
0012: * any later version.
0013: *
0014: * Heritrix is distributed in the hope that it will be useful,
0015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0017: * GNU Lesser Public License for more details.
0018: *
0019: * You should have received a copy of the GNU Lesser Public License
0020: * along with Heritrix; if not, write to the Free Software
0021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0022: */
0023: package org.archive.crawler.frontier;
0024:
0025: import java.io.IOException;
0026: import java.util.logging.Level;
0027: import java.util.logging.Logger;
0028:
0029: import org.archive.crawler.datamodel.CandidateURI;
0030: import org.archive.crawler.datamodel.CrawlSubstats;
0031: import org.archive.crawler.datamodel.CrawlURI;
0032: import org.archive.crawler.framework.Frontier.FrontierGroup;
0033: import org.archive.util.ArchiveUtils;
0034:
0035: import com.sleepycat.bind.EntryBinding;
0036: import com.sleepycat.bind.serial.ClassCatalog;
0037: import com.sleepycat.bind.serial.SerialBinding;
0038: import com.sleepycat.bind.serial.StoredClassCatalog;
0039: import com.sleepycat.bind.serial.TupleSerialKeyCreator;
0040: import com.sleepycat.bind.tuple.StringBinding;
0041: import com.sleepycat.bind.tuple.TupleBinding;
0042: import com.sleepycat.bind.tuple.TupleInput;
0043: import com.sleepycat.bind.tuple.TupleOutput;
0044: import com.sleepycat.je.Cursor;
0045: import com.sleepycat.je.Database;
0046: import com.sleepycat.je.DatabaseConfig;
0047: import com.sleepycat.je.DatabaseEntry;
0048: import com.sleepycat.je.DatabaseException;
0049: import com.sleepycat.je.Environment;
0050: import com.sleepycat.je.LockMode;
0051: import com.sleepycat.je.OperationStatus;
0052: import com.sleepycat.je.SecondaryConfig;
0053: import com.sleepycat.je.SecondaryDatabase;
0054:
0055: /**
0056: * A priority based queue of CrawlURIs. Each queue should represent
0057: * one host (although this is not enforced in this class). Items are ordered
0058: * by the scheduling directive and time of next processing (in that order)
0059: * and also indexed by the URI.
0060: * <p>
0061: * The HQ does no calculations on the 'time of next processing.' It always
0062: * relies on values already set on the CrawlURI.
0063: * <p>
0064: * Note: Class is not 'thread safe.' In multi threaded environment the caller
0065: * must ensure that two threads do not make overlapping calls.
0066: * <p>
0067: * Any BDB DatabaseException will be converted to an IOException by public
0068: * methods. This includes preserving the original stacktrace, in favor of the
0069: * one created for the IOException, so that the true source of the exception
0070: * is not lost.
0071: *
0072: * @author Kristinn Sigurdsson
0073: */
0074: public class AdaptiveRevisitHostQueue implements
0075: AdaptiveRevisitAttributeConstants, FrontierGroup {
0076:
0077: // TODO: Need to be able to remove URIs, both by name and reg.expr.
0078:
0079: // Constant declarations
0080: /** HQ contains no queued CrawlURIs elements. This state only occurs after
0081: * queue creation before the first add. After the first item is added the
0082: * state can never become empty again. */
0083: public static final int HQSTATE_EMPTY = 0;
0084: /** HQ has a CrawlURI ready for processing */
0085: public static final int HQSTATE_READY = 1;
0086: /** HQ has maximum number of CrawlURI currently being processed. This number
0087: * is either equal to the 'valence' (maximum number of simultaneous
0088: * connections to a host) or (if smaller) the total number of CrawlURIs
0089: * in the HQ. */
0090: public static final int HQSTATE_BUSY = 2;
0091: /** HQ is in a suspended state until it can be woken back up */
0092: public static final int HQSTATE_SNOOZED = 3;
0093:
0094: // Internal class variables
0095: /** Name of the host that this AdaptiveRevisitHostQueue represents */
0096: final String hostName;
0097: /** Last known state of HQ -- ALL methods should use getState() to read
0098: * this value, never read it directly. */
0099: int state;
0100: /** Time (in milliseconds) when the HQ will next be ready to issue a URI
0101: * for processing. When setting this value, methods should use the
0102: * setter method {@link #setNextReadyTime(long) setNextReadyTime()}
0103: */
0104: long nextReadyTime;
0105: /** Time (in milliseconds) when each URI 'slot' becomes available again.<p>
0106: * Any positive value larger than the current time signifies a taken slot
0107: * where the URI has completed processing but the politeness wait has not
0108: * ended. <p>
0109: * A zero or positive value smaller than the current time in milliseconds
0110: * signifies an empty slot.<p>
0111: * Any negative value signifies a slot for a URI that is being processed.
0112: * <p>
0113: * Methods should never write directly to this, rather use the
0114: * {@link #updateWakeUpTimeSlot(long) updateWakeUpTimeSlot()} and
0115: * {@link #useWakeUpTimeSlot() useWakeUpTimeSlot()} methods as needed.
0116: */
0117: long[] wakeUpTime;
0118: /** Number of simultaneous connections permitted to this host. I.e. this
0119: * many URIs can be issued before state of HQ becomes busy until one of
0120: * them is returned via the update method. */
0121: int valence;
0122: /**
0123: * Size of queue. That is, the number of CrawlURIs that have been added to
0124: * it, including any that are currently being processed.
0125: */
0126: long size;
0127: /** Number of URIs belonging to this queue that are being processed at the
0128: * moment. This number will always be in the range of 0 - valence
0129: */
0130: long inProcessing;
0131: /** The AdaptiveRevisitHostQueueList that contains this class. This
0132: * reference is
0133: * maintained to inform the owning class of changes to the sort order
0134: * value. Value may be null, in which case no notices are made.*/
0135: private AdaptiveRevisitQueueList owner;
0136: /** Logger */
0137: private static final Logger logger = Logger
0138: .getLogger(AdaptiveRevisitHostQueue.class.getName());
0139:
0140: protected CrawlSubstats substats = new CrawlSubstats();
0141:
0142: // Berkeley DB - All class member variables related to BDB JE
0143: // Databases
0144: /** Database containing the URI priority queue, indexed by the
0145: * URI string. */
0146: protected Database primaryUriDB;
0147: /** Secondary index into {@link #primaryUriDB the primary DB}, URIs indexed
0148: * by the time when they can next be processed again. */
0149: protected SecondaryDatabase secondaryUriDB;
0150: /** A database containing those URIs that are currently being processed. */
0151: protected Database processingUriDB;
0152: // Serialization support
0153: /** For BDB serialization of objects */
0154: protected StoredClassCatalog classCatalog;
0155: /** A binding for the serialization of the primary key (URI string) */
0156: protected EntryBinding primaryKeyBinding;
0157: /** A binding for the serialization of the CrawlURI object */
0158: protected EntryBinding crawlURIBinding;
0159:
0160: // Cursors into databases
0161:
0162: /**
0163: * Constructor
0164: *
0165: * @param hostName Name of the host this queue represents. This name must
0166: * be unique for all HQs in the same Environment.
0167: * @param env Berkeley DB Environment. All BDB databases created will use
0168: * it.
0169: * @param catalog Db for bdb class serialization.
0170: * @param valence The total number of simultanous URIs that the HQ can issue
0171: * for processing. Once this many URIs have been issued for
0172: * processing, the HQ will go into {@link #HQSTATE_BUSY busy}
0173: * state until at least one of the URI is
0174: * {@link #update(CrawlURI, boolean, long) updated}.
0175: * Value should be larger then zero. Zero and negative values
0176: * will be treated same as 1.
0177: *
0178: * @throws IOException if an error occurs opening/creating the
0179: * database
0180: */
0181: public AdaptiveRevisitHostQueue(String hostName, Environment env,
0182: StoredClassCatalog catalog, int valence) throws IOException {
0183: try {
0184: if (valence < 1) {
0185: this .valence = 1;
0186: } else {
0187: this .valence = valence;
0188: }
0189: wakeUpTime = new long[valence];
0190: for (int i = 0; i < valence; i++) {
0191: wakeUpTime[i] = 0; // 0 means open slot.
0192: }
0193:
0194: inProcessing = 0;
0195:
0196: this .hostName = hostName;
0197:
0198: state = HQSTATE_EMPTY; //HQ is initially empty.
0199: nextReadyTime = Long.MAX_VALUE; //Empty and busy HQ get this value.
0200:
0201: // Set up the primary URI database, it is indexed by URI names
0202: DatabaseConfig dbConfig = new DatabaseConfig();
0203: dbConfig.setTransactional(false);
0204: dbConfig.setAllowCreate(true);
0205: primaryUriDB = env.openDatabase(null, hostName, dbConfig);
0206:
0207: this .classCatalog = catalog;
0208:
0209: // Set up a DB for storing URIs being processed
0210: DatabaseConfig dbConfig2 = new DatabaseConfig();
0211: dbConfig2.setTransactional(false);
0212: dbConfig2.setAllowCreate(true);
0213: processingUriDB = env.openDatabase(null, hostName
0214: + "/processing", dbConfig2);
0215:
0216: // Create a primitive binding for the primary key (URI string)
0217: primaryKeyBinding = TupleBinding
0218: .getPrimitiveBinding(String.class);
0219: // Create a serial binding for the CrawlURI object
0220: crawlURIBinding = new SerialBinding(classCatalog,
0221: CrawlURI.class);
0222:
0223: // Open a secondary database to allow accessing the primary
0224: // database by the secondary key value.
0225: SecondaryConfig secConfig = new SecondaryConfig();
0226: secConfig.setAllowCreate(true);
0227: secConfig.setSortedDuplicates(true);
0228: secConfig.setKeyCreator(new OrderOfProcessingKeyCreator(
0229: classCatalog, CrawlURI.class));
0230: secondaryUriDB = env.openSecondaryDatabase(null, hostName
0231: + "/timeOfProcessing", primaryUriDB, secConfig);
0232:
0233: // Check if we are opening an existing DB...
0234: size = countCrawlURIs();
0235: if (size > 0) {
0236: // If size > 0 then we just opened an existing DB.
0237: // Set nextReadyTime;
0238: nextReadyTime = peek().getLong(
0239: A_TIME_OF_NEXT_PROCESSING);
0240: // Move any items in processingUriDB into the primariUriDB, ensure
0241: // that they wind up on top!
0242: flushProcessingURIs();
0243: state = HQSTATE_READY;
0244: }
0245: } catch (DatabaseException e) {
0246: // Blanket catch all DBExceptions and convert to IOExceptions.
0247: IOException e2 = new IOException(e.getMessage());
0248: e2.setStackTrace(e.getStackTrace());
0249: throw e2;
0250: }
0251: }
0252:
0253: /**
0254: * Returns the HQ's name
0255: * @return the HQ's name
0256: */
0257: public String getHostName() {
0258: return hostName;
0259: }
0260:
0261: /**
0262: * Add a CrawlURI to this host queue.
0263: * <p>
0264: * Calls can optionally chose to have the time of next processing value
0265: * override existing values for the URI if the existing values are 'later'
0266: * then the new ones.
0267: *
0268: * @param curi The CrawlURI to add.
0269: * @param overrideSetTimeOnDups If true then the time of next processing for
0270: * the supplied URI will override the any
0271: * existing time for it already stored in the HQ.
0272: * If false, then no changes will be made to any
0273: * existing values of the URI. Note: Will never
0274: * override with a later time.
0275: * @throws IOException When an error occurs accessing the database
0276: */
0277: public void add(CrawlURI curi, boolean overrideSetTimeOnDups)
0278: throws IOException {
0279: if (logger.isLoggable(Level.FINER)) {
0280: logger.finer("Adding " + curi.toString());
0281: }
0282: try {
0283: if (inProcessing(curi.toString())) {
0284: // If it is currently being processed, then it is already been
0285: // added and we sure as heck can't fetch it any sooner!
0286: return;
0287: }
0288:
0289: OperationStatus opStatus = strictAdd(curi, false);
0290:
0291: long curiProcessingTime = curi
0292: .getLong(A_TIME_OF_NEXT_PROCESSING);
0293:
0294: if (opStatus == OperationStatus.KEYEXIST) {
0295: // Override an existing URI
0296: // We need to extract the old CrawlURI (it contains vital
0297: // info on past crawls), check its scheduling directive
0298: // and (possibly) its time of next fetch and update if it
0299: // will promote the URI to an earlier processing time.
0300: boolean update = false;
0301: CrawlURI curiExisting = getCrawlURI(curi.toString());
0302: long oldCuriProcessingTime = curiExisting
0303: .getLong(A_TIME_OF_NEXT_PROCESSING);
0304: if (curi.getSchedulingDirective() < curiExisting
0305: .getSchedulingDirective()) {
0306: // New scheduling directive is of higher importance,
0307: // update to promote URI.
0308: curiExisting.setSchedulingDirective(curi
0309: .getSchedulingDirective());
0310: update = true;
0311: }
0312: if ((curiProcessingTime < oldCuriProcessingTime)
0313: && (overrideSetTimeOnDups || update)) {
0314: // We update the processing time if it is earlier then
0315: // the original and either overrideSetTimeOnDups was set
0316: // or update is true, meaning a higher priority scheduling
0317: // directive for this URI.
0318: curiExisting.putLong(A_TIME_OF_NEXT_PROCESSING,
0319: curiProcessingTime);
0320: update = true;
0321: }
0322: if (update) {
0323: opStatus = strictAdd(curiExisting, true); //Override
0324: } else {
0325: return;
0326: }
0327: } else if (opStatus == OperationStatus.SUCCESS) {
0328: // Just inserted a brand new CrawlURI into the queue.
0329: size++;
0330: }
0331:
0332: // Finally, check if insert (fresh add or override) into DB was
0333: // successful and if so check if we need to update nextReadyTime.
0334: if (opStatus == OperationStatus.SUCCESS) {
0335: if (curiProcessingTime < nextReadyTime) {
0336: // Update nextReadyTime to reflect new value.
0337: setNextReadyTime(curiProcessingTime);
0338: }
0339: if (state == HQSTATE_EMPTY) {
0340: // Definately no longer empty.
0341: state = HQSTATE_READY;
0342: }
0343: } else {
0344: // Something went wrong. Throw an exception.
0345: throw new DatabaseException(
0346: "Error on add into database for " + "CrawlURI "
0347: + curi.toString() + ". "
0348: + opStatus.toString());
0349: }
0350: } catch (DatabaseException e) {
0351: // Blanket catch all DBExceptions and convert to IOExceptions.
0352: IOException e2 = new IOException(e.getMessage());
0353: e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0354: throw e2;
0355: }
0356: reorder(); // May need a reorder.
0357: }
0358:
0359: /**
0360: * An internal method for adding URIs to the queue.
0361: *
0362: * @param curi The CrawlURI to add
0363: * @param overrideDuplicates If true then any existing CrawlURI in the DB
0364: * will be overwritten. If false insert into the
0365: * queue is only performed if the key doesn't
0366: * already exist.
0367: * @return The OperationStatus object returned by the put method.
0368: *
0369: * @throws DatabaseException
0370: */
0371: protected OperationStatus strictAdd(CrawlURI curi,
0372: boolean overrideDuplicates) throws DatabaseException {
0373: DatabaseEntry keyEntry = new DatabaseEntry();
0374: DatabaseEntry dataEntry = new DatabaseEntry();
0375: primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
0376: crawlURIBinding.objectToEntry(curi, dataEntry);
0377: OperationStatus opStatus = null;
0378: if (overrideDuplicates) {
0379: opStatus = primaryUriDB.put(null, keyEntry, dataEntry);
0380: } else {
0381: opStatus = primaryUriDB.putNoOverwrite(null, keyEntry,
0382: dataEntry);
0383: }
0384:
0385: return opStatus;
0386: }
0387:
0388: /**
0389: * Flush any CrawlURIs in the processingUriDB into the primaryUriDB. URIs
0390: * flushed will have their 'time of next fetch' maintained and the
0391: * nextReadyTime will be updated if needed.
0392: * <p>
0393: * No change is made to the list of available slots.
0394: *
0395: * @throws DatabaseException if one occurs while flushing
0396: */
0397: protected void flushProcessingURIs() throws DatabaseException {
0398: Cursor processingCursor = processingUriDB
0399: .openCursor(null, null);
0400: DatabaseEntry keyEntry = new DatabaseEntry();
0401: DatabaseEntry dataEntry = new DatabaseEntry();
0402:
0403: while (true) {
0404: OperationStatus opStatus = processingCursor.getFirst(
0405: keyEntry, dataEntry, LockMode.DEFAULT);
0406:
0407: if (opStatus == OperationStatus.SUCCESS) {
0408: // Got one!
0409: CrawlURI curi = (CrawlURI) crawlURIBinding
0410: .entryToObject(dataEntry);
0411: // Delete it from processingUriDB
0412: deleteInProcessing(curi.toString());
0413: // Add to processingUriDB;
0414: strictAdd(curi, false); // Ignore any duplicates. Go with the
0415: // ones already in the queue.
0416: // Update nextReadyTime if needed.
0417: long curiNextReadyTime = curi
0418: .getLong(A_TIME_OF_NEXT_PROCESSING);
0419: if (curiNextReadyTime < nextReadyTime) {
0420: setNextReadyTime(curiNextReadyTime);
0421: }
0422: } else {
0423: // No more entries in processingUriDB
0424: processingCursor.close();
0425: return;
0426: }
0427: }
0428: }
0429:
0430: /**
0431: * Count all entries in both primaryUriDB and processingUriDB.
0432: * <p>
0433: * This method is needed since BDB does not provide a simple way of counting
0434: * entries.
0435: * <p>
0436: * Note: This is an expensive operation, requires a loop through the entire
0437: * queue!
0438: * @return the number of distinct CrawlURIs in the HQ.
0439: * @throws DatabaseException
0440: */
0441: protected long countCrawlURIs() throws DatabaseException {
0442: // TODO: Instead of all this, the value should be simply read from the
0443: // database.
0444: long count = 0;
0445:
0446: DatabaseEntry keyEntry = new DatabaseEntry();
0447: DatabaseEntry dataEntry = new DatabaseEntry();
0448:
0449: // Count URIs in the queue
0450: Cursor primaryCursor = primaryUriDB.openCursor(null, null);
0451: OperationStatus opStatus = primaryCursor.getFirst(keyEntry,
0452: dataEntry, LockMode.DEFAULT);
0453: while (opStatus == OperationStatus.SUCCESS) {
0454: count++;
0455: opStatus = primaryCursor.getNext(keyEntry, dataEntry,
0456: LockMode.DEFAULT);
0457: }
0458:
0459: primaryCursor.close();
0460:
0461: // Now count URIs in the processingUriDB
0462: Cursor processingCursor = processingUriDB
0463: .openCursor(null, null);
0464: opStatus = processingCursor.getFirst(keyEntry, dataEntry,
0465: LockMode.DEFAULT);
0466: while (opStatus == OperationStatus.SUCCESS) {
0467: count++;
0468: opStatus = processingCursor.getNext(keyEntry, dataEntry,
0469: LockMode.DEFAULT);
0470: }
0471:
0472: processingCursor.close();
0473: return count;
0474: }
0475:
0476: /**
0477: * Returns true if this HQ has a CrawlURI matching the uri string currently
0478: * being processed. False otherwise.
0479: *
0480: * @param uri Uri to check
0481: * @return true if this HQ has a CrawlURI matching the uri string currently
0482: * being processed. False otherwise.
0483: *
0484: * @throws DatabaseException
0485: */
0486: protected boolean inProcessing(String uri) throws DatabaseException {
0487: DatabaseEntry keyEntry = new DatabaseEntry();
0488: DatabaseEntry dataEntry = new DatabaseEntry();
0489:
0490: StringBinding.stringToEntry(uri, keyEntry);
0491:
0492: OperationStatus opStatus = processingUriDB.get(null, keyEntry,
0493: dataEntry, LockMode.DEFAULT);
0494:
0495: if (opStatus == OperationStatus.SUCCESS) {
0496: return true;
0497: }
0498:
0499: return false; //Not found
0500: }
0501:
0502: /**
0503: * Removes a URI from the list of URIs belonging to this HQ and are
0504: * currently being processed.
0505: * <p>
0506: * Returns true if successful, false if the URI was not found.
0507: *
0508: * @param uri The URI string of the CrawlURI to delete.
0509: *
0510: * @throws DatabaseException
0511: * @throws IllegalStateException if the URI was not on the list
0512: */
0513: protected void deleteInProcessing(String uri)
0514: throws DatabaseException {
0515: DatabaseEntry keyEntry = new DatabaseEntry();
0516:
0517: StringBinding.stringToEntry(uri, keyEntry);
0518:
0519: OperationStatus opStatus = processingUriDB.delete(null,
0520: keyEntry);
0521:
0522: if (opStatus != OperationStatus.SUCCESS) {
0523: if (opStatus == OperationStatus.NOTFOUND) {
0524: throw new IllegalStateException(
0525: "Trying to deleta a "
0526: + "non-existant URI from the list of URIs being "
0527: + "processed. HQ: " + hostName
0528: + ", CrawlURI: " + uri);
0529: }
0530: throw new DatabaseException("Error occured deleting URI: "
0531: + uri + " from HQ " + hostName + " list "
0532: + "of URIs currently being processed. "
0533: + opStatus.toString());
0534: }
0535: }
0536:
0537: /**
0538: * Adds a CrawlURI to the list of CrawlURIs belonging to this HQ and are
0539: * being processed at the moment.
0540: *
0541: * @param curi
0542: * The CrawlURI to add to the list
0543: * @throws DatabaseException
0544: * @throws IllegalStateException
0545: * if the CrawlURI is already in the list of URIs being
0546: * processed.
0547: */
0548: protected void addInProcessing(CrawlURI curi)
0549: throws DatabaseException, IllegalStateException {
0550: DatabaseEntry keyEntry = new DatabaseEntry();
0551: DatabaseEntry dataEntry = new DatabaseEntry();
0552:
0553: StringBinding.stringToEntry(curi.toString(), keyEntry);
0554: crawlURIBinding.objectToEntry(curi, dataEntry);
0555:
0556: OperationStatus opStatus = processingUriDB.putNoOverwrite(null,
0557: keyEntry, dataEntry);
0558:
0559: if (opStatus != OperationStatus.SUCCESS) {
0560: if (opStatus == OperationStatus.KEYEXIST) {
0561: throw new IllegalStateException(
0562: "Can not insert duplicate "
0563: + "URI into list of URIs being processed. "
0564: + "HQ: " + hostName + ", CrawlURI: "
0565: + curi.toString());
0566: }
0567: throw new DatabaseException(
0568: "Error occured adding CrawlURI: " + curi.toString()
0569: + " to HQ " + hostName + " list "
0570: + "of URIs currently being processed. "
0571: + opStatus.toString());
0572: }
0573: }
0574:
0575: /**
0576: * Returns the CrawlURI associated with the specified URI (string) or null
0577: * if no such CrawlURI is queued in this HQ. If CrawlURI is being processed
0578: * it is not considered to be <i>queued </i> and this method will return
0579: * null for any such URIs.
0580: *
0581: * @param uri
0582: * A string representing the URI
0583: * @return the CrawlURI associated with the specified URI (string) or null
0584: * if no such CrawlURI is queued in this HQ.
0585: *
0586: * @throws DatabaseException
0587: * if a errors occurs reading the database
0588: */
0589: protected CrawlURI getCrawlURI(String uri) throws DatabaseException {
0590: DatabaseEntry keyEntry = new DatabaseEntry();
0591: DatabaseEntry dataEntry = new DatabaseEntry();
0592:
0593: primaryKeyBinding.objectToEntry(uri, keyEntry);
0594: primaryUriDB.get(null, keyEntry, dataEntry, LockMode.DEFAULT);
0595:
0596: CrawlURI curi = (CrawlURI) crawlURIBinding
0597: .entryToObject(dataEntry);
0598:
0599: return curi;
0600: }
0601:
0602: /**
0603: * Update CrawlURI that has completed processing.
0604: *
0605: * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's
0606: * {@link #next() next()} method.
0607: * @param needWait If true then the URI was processed successfully,
0608: * requiring a period of suspended action on that host. If
0609: * valence is > 1 then seperate times are maintained for
0610: * each slot.
0611: * @param wakeupTime If new state is
0612: * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
0613: * then this parameter should contain the time (in
0614: * milliseconds) when it will be safe to wake the HQ up
0615: * again. Otherwise this parameter will be ignored.
0616: *
0617: * @throws IllegalStateException if the CrawlURI
0618: * does not match a CrawlURI issued for crawling by this HQ's
0619: * {@link AdaptiveRevisitHostQueue#next() next()}.
0620: * @throws IOException if an error occurs accessing the database
0621: */
0622: public void update(CrawlURI curi, boolean needWait, long wakeupTime)
0623: throws IllegalStateException, IOException {
0624: update(curi, needWait, wakeupTime, false);
0625: }
0626:
0627: /**
0628: * Update CrawlURI that has completed processing.
0629: *
0630: * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's
0631: * {@link #next() next()} method.
0632: * @param needWait If true then the URI was processed successfully,
0633: * requiring a period of suspended action on that host. If
0634: * valence is > 1 then seperate times are maintained for
0635: * each slot.
0636: * @param wakeupTime If new state is
0637: * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
0638: * then this parameter should contain the time (in
0639: * milliseconds) when it will be safe to wake the HQ up
0640: * again. Otherwise this parameter will be ignored.
0641: * @param forgetURI If true, the URI will be deleted from the queue.
0642: *
0643: * @throws IllegalStateException if the CrawlURI
0644: * does not match a CrawlURI issued for crawling by this HQ's
0645: * {@link AdaptiveRevisitHostQueue#next() next()}.
0646: * @throws IOException if an error occurs accessing the database
0647: */
0648: public void update(CrawlURI curi, boolean needWait,
0649: long wakeupTime, boolean forgetURI)
0650: throws IllegalStateException, IOException {
0651: if (logger.isLoggable(Level.FINE)) {
0652: logger.fine("Updating " + curi.toString());
0653: }
0654: try {
0655: // First add it to the regular queue (if not forgetting it).
0656: if (forgetURI == false) {
0657: OperationStatus opStatus = strictAdd(curi, false);
0658: if (opStatus != OperationStatus.SUCCESS) {
0659: if (opStatus == OperationStatus.KEYEXIST) {
0660: throw new IllegalStateException(
0661: "Trying to update a"
0662: + " CrawlURI failed because it was in the queue"
0663: + " of URIs waiting for processing. URIs currently"
0664: + " being processsed can never be in that queue."
0665: + " HQ: " + hostName
0666: + ", CrawlURI: "
0667: + curi.toString());
0668: }
0669: }
0670:
0671: // Check if we need to update nextReadyTime
0672: long curiTimeOfNextProcessing = curi
0673: .getLong(A_TIME_OF_NEXT_PROCESSING);
0674: if (nextReadyTime > curiTimeOfNextProcessing) {
0675: setNextReadyTime(curiTimeOfNextProcessing);
0676: }
0677:
0678: } else {
0679: size--;
0680: }
0681:
0682: // Then remove from list of in processing URIs
0683: deleteInProcessing(curi.toString());
0684:
0685: inProcessing--;
0686:
0687: // Update the wakeUpTime slot.
0688: if (needWait == false) {
0689: // Ok, no wait then. Set wake up time to 0.
0690: wakeupTime = 0;
0691: }
0692:
0693: updateWakeUpTimeSlot(wakeupTime);
0694: } catch (DatabaseException e) {
0695: // Blanket catch all DBExceptions and convert to IOExceptions.
0696: IOException e2 = new IOException(e.getMessage());
0697: e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0698: throw e2;
0699: }
0700: }
0701:
0702: /**
0703: * Returns the 'top' URI in the AdaptiveRevisitHostQueue.
0704: * <p>
0705: * HQ state will be set to {@link AdaptiveRevisitHostQueue#HQSTATE_BUSY busy} if this
0706: * method returns normally.
0707:
0708: *
0709: * @return a CrawlURI ready for processing
0710: *
0711: * @throws IllegalStateException if the HostQueues current state is not
0712: * ready {@link AdaptiveRevisitHostQueue#HQSTATE_READY ready}
0713: * @throws IOException if an error occurs reading from the database
0714: */
0715: public CrawlURI next() throws IllegalStateException, IOException {
0716: try {
0717: // Ok, lets issue a URI, first check state and reserve slot.
0718: if (getState() != HQSTATE_READY
0719: || useWakeUpTimeSlot() == false) {
0720: throw new IllegalStateException(
0721: "Can not issue next URI when " + "HQ "
0722: + hostName + " state is "
0723: + getStateByName());
0724: }
0725:
0726: DatabaseEntry keyEntry = new DatabaseEntry();
0727:
0728: // Get the top URI
0729: CrawlURI curi = peek();
0730:
0731: // Add it to processingUriDB
0732: addInProcessing(curi);
0733:
0734: // Delete it from the primaryUriDB
0735: primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
0736: OperationStatus opStatus = primaryUriDB.delete(null,
0737: keyEntry);
0738:
0739: if (opStatus != OperationStatus.SUCCESS) {
0740: throw new DatabaseException(
0741: "Error occured removing URI: "
0742: + curi.toString() + " from HQ "
0743: + hostName
0744: + " priority queue for processing. "
0745: + opStatus.toString());
0746: }
0747:
0748: // Finally update nextReadyTime with new top if one exists.
0749: CrawlURI top = peek();
0750: long nextReady = Long.MAX_VALUE;
0751: if (top != null) {
0752: nextReady = top.getLong(A_TIME_OF_NEXT_PROCESSING);
0753: }
0754: inProcessing++;
0755: setNextReadyTime(nextReady);
0756: logger.fine("Issuing " + curi.toString());
0757: return curi;
0758: } catch (DatabaseException e) {
0759: // Blanket catch all DBExceptions and convert to IOExceptions.
0760: IOException e2 = new IOException(e.getMessage());
0761: e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0762: throw e2;
0763: }
0764: }
0765:
0766: /**
0767: * Returns the URI with the earliest time of next processing. I.e. the URI
0768: * at the head of this host based priority queue.
0769: * <p>
0770: * Note: This method will return the head CrawlURI regardless of wether it
0771: * is safe to start processing it or not. CrawlURI will remain in the queue.
0772: * The returned CrawlURI should only be used for queue inspection, it can
0773: * <i>not</i> be updated and returned to the queue. To get URIs ready for
0774: * processing use {@link #next() next()}.
0775: *
0776: * @return the URI with the earliest time of next processing or null if
0777: * the queue is empty or all URIs are currently being processed.
0778: * @throws IllegalStateException
0779: *
0780: * @throws IOException if an error occurs reading from the database
0781: */
0782: public CrawlURI peek() throws IllegalStateException, IOException {
0783: try {
0784:
0785: DatabaseEntry keyEntry = new DatabaseEntry();
0786: DatabaseEntry dataEntry = new DatabaseEntry();
0787:
0788: CrawlURI curi = null;
0789: Cursor secondaryCursor = secondaryUriDB.openCursor(null,
0790: null);
0791:
0792: OperationStatus opStatus = secondaryCursor.getFirst(
0793: keyEntry, dataEntry, LockMode.DEFAULT);
0794:
0795: if (opStatus == OperationStatus.SUCCESS) {
0796: curi = (CrawlURI) crawlURIBinding
0797: .entryToObject(dataEntry);
0798: } else {
0799: if (opStatus == OperationStatus.NOTFOUND) {
0800: curi = null;
0801: } else {
0802: throw new IOException("Error occured in "
0803: + "AdaptiveRevisitHostQueue.peek()."
0804: + opStatus.toString());
0805: }
0806: }
0807: secondaryCursor.close();
0808: return curi;
0809: } catch (DatabaseException e) {
0810: // Blanket catch all DBExceptions and convert to IOExceptions.
0811: IOException e2 = new IOException(e.getMessage());
0812: e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0813: throw e2;
0814: }
0815: }
0816:
0817: /**
0818: * Returns the current state of the HQ.
0819: *
0820: * @return the current state of the HQ.
0821: *
0822: * @see AdaptiveRevisitHostQueue#HQSTATE_BUSY
0823: * @see AdaptiveRevisitHostQueue#HQSTATE_EMPTY
0824: * @see AdaptiveRevisitHostQueue#HQSTATE_READY
0825: * @see AdaptiveRevisitHostQueue#HQSTATE_SNOOZED
0826: */
0827: public int getState() {
0828: if (state != HQSTATE_EMPTY) {
0829: // Need to confirm state
0830: if (isBusy()) {
0831: state = HQSTATE_BUSY;
0832: } else {
0833: long currentTime = System.currentTimeMillis();
0834: long wakeTime = getEarliestWakeUpTimeSlot();
0835:
0836: if (wakeTime > currentTime
0837: || nextReadyTime > currentTime) {
0838: state = HQSTATE_SNOOZED;
0839: } else {
0840: state = HQSTATE_READY;
0841: }
0842: }
0843: }
0844: return state;
0845: }
0846:
0847: /**
0848: * Returns the time when the HQ will next be ready to issue a URI.
0849: * <p>
0850: * If the queue is in a {@link #HQSTATE_SNOOZED snoozed} state then this
0851: * time will be in the future and reflects either the time when the HQ will
0852: * again be able to issue URIs for processing because politness constraints
0853: * have ended, or when a URI next becomes available for visit, whichever is
0854: * larger.
0855: * <p>
0856: * If the queue is in a {@link #HQSTATE_READY ready} state this time will
0857: * be in the past and reflect the earliest time when the HQ had a URI ready
0858: * for processing, taking time spent snoozed for politness concerns into
0859: * account.
0860: * <p>
0861: * If the HQ is in any other state then the return value of this method is
0862: * equal to Long.MAX_VALUE.
0863: * <p>
0864: * This value may change each time a URI is added, issued or updated.
0865: *
0866: * @return the time when the HQ will next be ready to issue a URI
0867: */
0868: public long getNextReadyTime() {
0869: if (getState() == HQSTATE_BUSY || getState() == HQSTATE_EMPTY) {
0870: // Have no idea when HQ next be able issue a URI
0871: return Long.MAX_VALUE;
0872: }
0873: long wakeTime = getEarliestWakeUpTimeSlot();
0874: return nextReadyTime > wakeTime ? nextReadyTime : wakeTime;
0875: }
0876:
0877: /**
0878: * Updates nextReadyTime (if smaller) with the supplied value
0879: * @param newTime the new value of nextReady Time;
0880: */
0881: protected void setNextReadyTime(long newTime) {
0882: if (logger.isLoggable(Level.FINEST)) {
0883: logger.finest("Setting next ready to new value " + newTime
0884: + " from " + getNextReadyTime());
0885: }
0886: nextReadyTime = newTime;
0887: reorder();
0888: }
0889:
0890: /**
0891: * Method is called whenever something has been done that might have
0892: * changed the value of the 'published' time of next ready. If an owner
0893: * has been specified it will be notified that the value may have changed..
0894: */
0895: protected void reorder() {
0896: if (owner != null) {
0897: owner.reorder(this );
0898: }
0899: }
0900:
0901: /**
0902: * Same as {@link #getState() getState()} except this method returns a
0903: * human readable name for the state instead of its constant integer value.
0904: * <p>
0905: * Should only be used for reports, error messages and other strings
0906: * intended for human eyes.
0907: *
0908: * @return the human readable name of the current state
0909: */
0910: public String getStateByName() {
0911: switch (getState()) {
0912: case HQSTATE_BUSY:
0913: return "busy";
0914: case HQSTATE_EMPTY:
0915: return "empty";
0916: case HQSTATE_READY:
0917: return "ready";
0918: case HQSTATE_SNOOZED:
0919: return "snoozed";
0920: }
0921: // This should be impossible unless new states are added without
0922: // updating this method.
0923: return "undefined";
0924: }
0925:
0926: /**
0927: * Returns the size of the HQ. That is, the number of URIs queued,
0928: * including any that are currently being processed.
0929: *
0930: * @return the size of the HQ.
0931: */
0932: public long getSize() {
0933: return size;
0934: }
0935:
0936: /**
0937: * Set the AdaptiveRevisitQueueList object that contains this HQ. Will cause
0938: * that
0939: * object to be notified (via
0940: * {@link AdaptiveRevisitQueueList#reorder(AdaptiveRevisitHostQueue)
0941: * reorder()} when the
0942: * value used for sorting the list of HQs changes.
0943: * @param owner the ARHostQueueList object that contains this HQ.
0944: */
0945: public void setOwner(AdaptiveRevisitQueueList owner) {
0946: this .owner = owner;
0947: }
0948:
0949: /**
0950: * Cleanup all open Berkeley Database objects.
0951: * <p>
0952: * Does <I>not</I> close the Environment.
0953: *
0954: * @throws IOException if an error occurs closing a database object
0955: */
0956: public void close() throws IOException {
0957: try {
0958: secondaryUriDB.close();
0959: processingUriDB.close();
0960: primaryUriDB.close();
0961: } catch (DatabaseException e) {
0962: // Blanket catch all DBExceptions and convert to IOExceptions.
0963: IOException e2 = new IOException(e.getMessage());
0964: e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0965: throw e2;
0966: }
0967: }
0968:
0969: /**
0970: * If true then the HQ has no available slot for issuing URIs.
0971: * <p>
0972: * I.e. number of in processing URIs = valence.
0973: *
0974: * @return true if number of in processing URIs = valence
0975: */
0976: private boolean isBusy() {
0977: return inProcessing == valence;
0978: }
0979:
0980: /**
0981: * Overwrites a used (-1) value in wakeUpTime[] with the supplied value.
0982: * @param newVal
0983: */
0984: private void updateWakeUpTimeSlot(long newVal) {
0985: for (int i = 0; i < valence; i++) {
0986: if (wakeUpTime[i] == -1) {
0987: wakeUpTime[i] = newVal;
0988: }
0989: }
0990: reorder();
0991: }
0992:
0993: /**
0994: * A new URI is being issued. Set the wakeup time on an unused slot to -1.
0995: *
0996: * @return true if a slot was successfully reserved. False otherwise.
0997: */
0998: private boolean useWakeUpTimeSlot() {
0999: for (int i = 0; i < valence; i++) {
1000: if (wakeUpTime[i] > -1
1001: && wakeUpTime[i] <= System.currentTimeMillis()) {
1002: wakeUpTime[i] = -1;
1003: return true;
1004: }
1005: }
1006: reorder();
1007: return false;
1008: }
1009:
1010: /**
1011: * Returns the earliest time when a wake up slot will become available. If
1012: * one is already available then this time will be in the past.
1013: * <p>
1014: * If all slots are taken with URIs currently being processed (i.e. HQ state
1015: * is {@link #HQSTATE_BUSY busy} then this will return Long.MAX_VALUE;
1016: * @return the earliest time when a wake up slot will become available
1017: */
1018: private long getEarliestWakeUpTimeSlot() {
1019: long earliest = Long.MAX_VALUE;
1020: for (int i = 0; i < valence; i++) {
1021: if (wakeUpTime[i] > -1 && wakeUpTime[i] < earliest) {
1022: earliest = wakeUpTime[i];
1023: }
1024: }
1025: return earliest;
1026: }
1027:
1028: /**
1029: * Returns a report detailing the status of this HQ.
1030: * @param max Maximum number of URIs to show. 0 equals no limit.
1031: * @return a report detailing the status of this HQ.
1032: */
1033: public String report(int max) {
1034: try {
1035: StringBuffer ret = new StringBuffer(256);
1036: ret.append("AdaptiveRevisitHostQueue: " + hostName + "\n");
1037: ret.append("Size: " + size + "\n");
1038: ret.append("State: " + getStateByName() + "\n");
1039: if (getState() == HQSTATE_BUSY) {
1040: ret.append("Processing URIs: \n");
1041: Cursor processingCursor = processingUriDB.openCursor(
1042: null, null);
1043: reportURIs(ret, processingCursor, valence);
1044: processingCursor.close();
1045: } else {
1046: ret
1047: .append("Next ready: "
1048: + ArchiveUtils
1049: .formatMillisecondsToConventional(getNextReadyTime()
1050: - System
1051: .currentTimeMillis())
1052: + "\n");
1053: }
1054: ret.append("Top URIs: \n");
1055:
1056: Cursor secondaryCursor = secondaryUriDB.openCursor(null,
1057: null);
1058: reportURIs(ret, secondaryCursor, max);
1059: secondaryCursor.close();
1060: return ret.toString();
1061: } catch (DatabaseException e) {
1062: return "Exception occured compiling report:\n"
1063: + e.getMessage();
1064: }
1065: }
1066:
1067: /**
1068: * Adds a report of the first <code>max</code> URIs that the cursor points
1069: * to to the stringbuffer object.
1070: *
1071: * @param ret The stringbuffer to append to
1072: * @param cursor The cursor pointing at a URI database
1073: * @param max Maximum number of URIs to report on. If fewer URIs are in the
1074: * database, all URIs are shown
1075: * @throws DatabaseException if an error occurs
1076: */
1077: private void reportURIs(StringBuffer ret, Cursor cursor, int max)
1078: throws DatabaseException {
1079: DatabaseEntry keyEntry = new DatabaseEntry();
1080: DatabaseEntry dataEntry = new DatabaseEntry();
1081: OperationStatus opStatus = cursor.getFirst(keyEntry, dataEntry,
1082: LockMode.DEFAULT);
1083: if (max == 0) {
1084: // No limit on the number of values returned.
1085: max = Integer.MAX_VALUE;
1086: }
1087: int i = 0;
1088: while (i < max && opStatus == OperationStatus.SUCCESS) {
1089: CrawlURI tmp = (CrawlURI) crawlURIBinding
1090: .entryToObject(dataEntry);
1091: ret.append(" URI: " + tmp.toString() + "\n");
1092: switch (tmp.getSchedulingDirective()) {
1093: case CandidateURI.HIGHEST:
1094: ret.append(" Sched. directive: HIGHEST\n");
1095: break;
1096: case CandidateURI.HIGH:
1097: ret.append(" Sched. directive: HIGH\n");
1098: break;
1099: case CandidateURI.MEDIUM:
1100: ret.append(" Sched. directive: MEDIUM\n");
1101: break;
1102: case CandidateURI.NORMAL:
1103: ret.append(" Sched. directive: NORMAL\n");
1104: break;
1105: }
1106: ret.append(" Next processing: ");
1107: long nextProcessing = tmp
1108: .getLong(A_TIME_OF_NEXT_PROCESSING)
1109: - System.currentTimeMillis();
1110: if (nextProcessing < 0) {
1111: ret.append("Overdue ");
1112: nextProcessing = nextProcessing * -1;
1113: }
1114: ret.append(ArchiveUtils
1115: .formatMillisecondsToConventional(nextProcessing)
1116: + "\n");
1117: if (tmp.getFetchStatus() != 0) {
1118: ret.append(" Last fetch status: "
1119: + tmp.getFetchStatus() + "\n");
1120: }
1121: if (tmp.containsKey(A_WAIT_INTERVAL)) {
1122: ret.append(" Wait interval: "
1123: + ArchiveUtils
1124: .formatMillisecondsToConventional(tmp
1125: .getLong(A_WAIT_INTERVAL))
1126: + "\n");
1127: }
1128: if (tmp.containsKey(A_NUMBER_OF_VISITS)) {
1129: ret.append(" Visits: "
1130: + tmp.getInt(A_NUMBER_OF_VISITS) + "\n");
1131: }
1132: if (tmp.containsKey(A_NUMBER_OF_VERSIONS)) {
1133: ret.append(" Versions: "
1134: + tmp.getInt(A_NUMBER_OF_VERSIONS) + "\n");
1135: }
1136:
1137: opStatus = cursor.getNext(keyEntry, dataEntry,
1138: LockMode.DEFAULT);
1139: i++;
1140: }
1141: }
1142:
1143: /**
1144: * Creates the secondary key for the secondary index.
1145: * <p>
1146: * The secondary index is the scheduling directive (first sorting) and
1147: * the time of next processing (sorted from earlies to latest within each
1148: * scheduling directive). If the scheduling directive is missing or
1149: * unknown NORMAL will be assumed.
1150: */
1151: private static class OrderOfProcessingKeyCreator extends
1152: TupleSerialKeyCreator {
1153:
1154: /**
1155: * Constructor. Invokes parent constructor.
1156: *
1157: * @param classCatalog is the catalog to hold shared class information
1158: * and for a database should be a
1159: * StoredClassCatalog.
1160: * @param dataClass is the CrawlURI class.
1161: */
1162: public OrderOfProcessingKeyCreator(ClassCatalog classCatalog,
1163: Class dataClass) {
1164: super (classCatalog, dataClass);
1165: }
1166:
1167: /* (non-Javadoc)
1168: * @see com.sleepycat.bind.serial.TupleSerialKeyCreator#createSecondaryKey(com.sleepycat.bind.tuple.TupleInput, java.lang.Object, com.sleepycat.bind.tuple.TupleOutput)
1169: */
1170: public boolean createSecondaryKey(TupleInput primaryKeyInput,
1171: Object dataInput, TupleOutput indexKeyOutput) {
1172: CrawlURI curi = (CrawlURI) dataInput;
1173: int directive = curi.getSchedulingDirective();
1174: // Can not rely on the default directive constants having a good
1175: // sort order
1176: switch (directive) {
1177: case CandidateURI.HIGHEST:
1178: directive = 0;
1179: break;
1180: case CandidateURI.HIGH:
1181: directive = 1;
1182: break;
1183: case CandidateURI.MEDIUM:
1184: directive = 2;
1185: break;
1186: case CandidateURI.NORMAL:
1187: directive = 3;
1188: break;
1189: default:
1190: directive = 3; // If directive missing or unknown
1191: }
1192:
1193: indexKeyOutput.writeInt(directive);
1194: long timeOfNextProcessing = curi
1195: .getLong(A_TIME_OF_NEXT_PROCESSING);
1196:
1197: indexKeyOutput.writeLong(timeOfNextProcessing);
1198: return true;
1199: }
1200: }
1201:
1202: /* (non-Javadoc)
1203: * @see org.archive.crawler.datamodel.CrawlSubstats.HasCrawlSubstats#getSubstats()
1204: */
1205: public CrawlSubstats getSubstats() {
1206: return substats;
1207: }
1208:
1209: }
|