/* Copyright (C) 2003 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * CrawlController.java
 * Created on May 14, 2003
 *
 * $Id: CrawlController.java 4917 2007-02-20 22:15:23Z gojomo $
 */
package org.archive.crawler.framework;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EventObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.ReflectionException;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.admin.CrawlJob;
import org.archive.crawler.admin.StatisticsTracker;
import org.archive.crawler.datamodel.Checkpoint;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.ServerCache;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.event.CrawlURIDispositionListener;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.framework.exceptions.InitializationException;
import org.archive.crawler.io.LocalErrorFormatter;
import org.archive.crawler.io.RuntimeErrorFormatter;
import org.archive.crawler.io.StatisticsLogFormatter;
import org.archive.crawler.io.UriErrorFormatter;
import org.archive.crawler.io.UriProcessingFormatter;
import org.archive.crawler.settings.MapType;
import org.archive.crawler.settings.SettingsHandler;
import org.archive.crawler.util.CheckpointUtils;
import org.archive.io.GenerationFileHandler;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.CachedBdbMap;
import org.archive.util.FileUtils;
import org.archive.util.Reporter;
import org.archive.util.bdbje.EnhancedEnvironment;
import org.xbill.DNS.DClass;
import org.xbill.DNS.Lookup;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DbInternal;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.utilint.DbLsn;

/**
 * CrawlController collects all the classes which cooperate to
 * perform a crawl and provides a high-level interface to the
 * running crawl.
 *
 * As the "global context" for a crawl, subcomponents will
 * often reach each other through the CrawlController.
 *
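 * <p>Illustrative lifecycle sketch (assumes a fully configured
 * {@link SettingsHandler}; exception handling omitted):
 * <pre>
 * CrawlController controller = new CrawlController();
 * controller.initialize(settingsHandler); // wires up order, frontier, logs
 * controller.requestCrawlStart();         // starts toe threads and statistics
 * // ... later, e.g. from an operator request ...
 * controller.requestCrawlStop();          // begins orderly shutdown
 * </pre>
 *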
 * @author Gordon Mohr
 */
public class CrawlController implements Serializable, Reporter {
    // be robust against trivial implementation changes
    private static final long serialVersionUID =
        ArchiveUtils.classnameBasedUID(CrawlController.class, 1);

    /**
     * Messages from the crawlcontroller.
     *
     * They appear on console.
     */
    private final static Logger LOGGER =
        Logger.getLogger(CrawlController.class.getName());

    // manifest support
    /** abbreviation label for config files in manifest */
    public static final char MANIFEST_CONFIG_FILE = 'C';
    /** abbreviation label for report files in manifest */
    public static final char MANIFEST_REPORT_FILE = 'R';
    /** abbreviation label for log files in manifest */
    public static final char MANIFEST_LOG_FILE = 'L';

    // key log names
    private static final String LOGNAME_PROGRESS_STATISTICS = "progress-statistics";
    private static final String LOGNAME_URI_ERRORS = "uri-errors";
    private static final String LOGNAME_RUNTIME_ERRORS = "runtime-errors";
    private static final String LOGNAME_LOCAL_ERRORS = "local-errors";
    private static final String LOGNAME_CRAWL = "crawl";

    // key subcomponents which define and implement a crawl in progress
    private transient CrawlOrder order;
    private transient CrawlScope scope;
    private transient ProcessorChainList processorChains;

    private transient Frontier frontier;

    private transient ToePool toePool;

    private transient ServerCache serverCache;

    // This gets passed into the initialize method.
    private transient SettingsHandler settingsHandler;

    // Used to enable/disable single-threaded operation after OOM
    private volatile transient boolean singleThreadMode = false;
    private transient ReentrantLock singleThreadLock = null;

    // emergency reserve of memory to allow some progress/reporting after OOM
    private transient LinkedList<char[]> reserveMemory;
    private static final int RESERVE_BLOCKS = 1;
    private static final int RESERVE_BLOCK_SIZE = 6 * 1024 * 1024; // 6MB
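    // Hypothetical release sketch (the actual OutOfMemoryError handler lives
    // with the toe threads, outside this section): on OOM a caller could run
    //     reserveMemory.clear();   // let the reserve blocks be collected
    //     singleThreadMode = true; // limp along on a single toe thread
    // to regain enough headroom for final progress reporting.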

    // crawl state: as requested or actual

    /**
     * Crawl exit status.
     */
    private transient String sExit;

    private static final Object NASCENT = "NASCENT".intern();
    private static final Object RUNNING = "RUNNING".intern();
    private static final Object PAUSED = "PAUSED".intern();
    private static final Object PAUSING = "PAUSING".intern();
    private static final Object CHECKPOINTING = "CHECKPOINTING".intern();
    private static final Object STOPPING = "STOPPING".intern();
    private static final Object FINISHED = "FINISHED".intern();
    private static final Object STARTED = "STARTED".intern();
    private static final Object PREPARING = "PREPARING".intern();

    transient private Object state = NASCENT;

    // disk paths
    private transient File disk;     // overall disk path
    private transient File logsDisk; // for log files

    /**
     * For temp files representing state of crawler (eg queues)
     */
    private transient File stateDisk;

    /**
     * For discardable temp files (eg fetch buffers).
     */
    private transient File scratchDisk;

    /**
     * Directory that holds checkpoints.
     */
    private transient File checkpointsDisk;

    /**
     * Checkpointer.
     * Knows if a checkpoint is in progress and what the checkpoint's name is.
     * Also runs checkpoints.
     */
    private Checkpointer checkpointer;

    /**
     * Gets set to the checkpoint we're recovering from, if in
     * checkpoint-recover mode. Gets set up by {@link #getCheckpointRecover()}.
     */
    private transient Checkpoint checkpointRecover = null;

    // crawl limits
    private long maxBytes;
    private long maxDocument;
    private long maxTime;

    /**
     * A manifest of all files used/created during this crawl. Written to file
     * at the end of the crawl (the absolute last thing done).
     */
    private StringBuffer manifest;

    /**
     * Record of fileHandlers established for loggers,
     * assisting file rotation.
     */
    transient private Map<Logger, FileHandler> fileHandlers;

    /** suffix to use on active logs */
    public static final String CURRENT_LOG_SUFFIX = ".log";

    /**
     * Crawl progress logger.
     *
     * No exceptions. Logs the summary result of each URI's processing.
     */
    public transient Logger uriProcessing;

    /**
     * This logger contains unexpected runtime errors.
     *
     * Would contain errors trying to set up a job or failures inside
     * processors that they are not prepared to recover from.
     */
    public transient Logger runtimeErrors;

    /**
     * This logger is for job-scoped logging, specifically errors which
     * happen and are handled within a particular processor.
     *
     * Examples would be socket timeouts, exceptions thrown by extractors, etc.
     */
    public transient Logger localErrors;

    /**
     * Special log for URI format problems, wherever they may occur.
     */
    public transient Logger uriErrors;

    /**
     * Statistics tracker writes here at regular intervals.
     */
    private transient Logger progressStats;

    /**
     * Logger to hold job summary report.
     *
     * Large state reports made at infrequent intervals (e.g. job ending) go
     * here.
     */
    public transient Logger reports;

    protected StatisticsTracking statistics = null;

    /**
     * List of crawl status listeners.
     *
     * All iterations need to synchronize on this object if they're to avoid
     * concurrent modification exceptions.
     * See {@link java.util.Collections#synchronizedList(List)}.
     */
    private transient List<CrawlStatusListener> registeredCrawlStatusListeners =
        Collections.synchronizedList(new ArrayList<CrawlStatusListener>());

    // Since there is a high probability that there will only ever be one
    // CrawlURIDispositionListener, we will use this while there is only one:
    private transient CrawlURIDispositionListener registeredCrawlURIDispositionListener;

    // And then switch to the array once there is more than one.
    protected transient ArrayList<CrawlURIDispositionListener> registeredCrawlURIDispositionListeners;

    /** Shared bdb Environment for Frontier subcomponents */
    // TODO: investigate using multiple environments to split disk accesses
    // across separate physical disks
    private transient EnhancedEnvironment bdbEnvironment = null;

    /**
     * Keep a list of all BigMap instances made -- shouldn't be many -- so
     * that we can checkpoint.
     */
    private transient Map<String, CachedBdbMap<?, ?>> bigmaps = null;

    /**
     * Default constructor
     */
    public CrawlController() {
        super();
        // Defer most setup to initialize methods
    }

    /**
     * Starting from nothing, set up CrawlController and associated
     * classes to be ready for a first crawl.
     *
     * @param sH Settings handler.
     * @throws InitializationException
     */
    public void initialize(SettingsHandler sH) throws InitializationException {
        sendCrawlStateChangeEvent(PREPARING, CrawlJob.STATUS_PREPARING);

        this.singleThreadLock = new ReentrantLock();
        this.settingsHandler = sH;
        this.order = settingsHandler.getOrder();
        this.order.setController(this);
        this.bigmaps = new Hashtable<String, CachedBdbMap<?, ?>>();
        sExit = "";
        this.manifest = new StringBuffer();
        String onFailMessage = "";
        try {
            onFailMessage = "You must set the User-Agent and From HTTP"
                + " header values to acceptable strings. \n"
                + " User-Agent: [software-name](+[info-url])[misc]\n"
                + " From: [email-address]\n";
            order.checkUserAgentAndFrom();

            onFailMessage = "Unable to setup disk";
            if (disk == null) {
                setupDisk();
            }

            onFailMessage = "Unable to create log file(s)";
            setupLogs();

            // Figure out if we're to do a checkpoint restore. If so, get the
            // checkpointRecover instance and then put into place the old bdb
            // log files. If any of the log files already exist in the target
            // state directory, WE DO NOT OVERWRITE (makes for faster
            // recovery). CrawlController checkpoint-recovery code manages
            // restoration of the old StatisticsTracker, any BigMaps used by
            // the crawler, and the moving of bdb log files into place only.
            // Other objects interested in recovery need to ask
            // CrawlController#isCheckpointRecover to figure out if we're in
            // recovery, and then take appropriate recovery action. (These
            // objects can call CrawlController#getCheckpointRecover to get
            // the directory that might hold files/objects dropped while
            // checkpointing.) Such objects will need to use a technique other
            // than object serialization to restore settings, because they'll
            // have already been constructed by the time it comes to ask if
            // they're to recover themselves. See ARCWriterProcessor for an
            // example.
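            //
            // A minimal sketch of that pattern (hypothetical module code;
            // readStateFrom is an illustrative helper, not a Heritrix API):
            //
            //     if (controller.isCheckpointRecover()) {
            //         File dir = controller.getCheckpointRecover().getDirectory();
            //         readStateFrom(dir); // module-specific deserialization
            //     }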
            onFailMessage = "Unable to test/run checkpoint recover";
            this.checkpointRecover = getCheckpointRecover();
            if (this.checkpointRecover == null) {
                this.checkpointer =
                    new Checkpointer(this, this.checkpointsDisk);
            } else {
                setupCheckpointRecover();
            }

            onFailMessage = "Unable to setup bdb environment.";
            setupBdb();

            onFailMessage = "Unable to setup statistics";
            setupStatTracking();

            onFailMessage = "Unable to setup crawl modules";
            setupCrawlModules();
        } catch (Exception e) {
            String tmp = "On crawl: "
                + settingsHandler.getSettingsObject(null).getName()
                + " " + onFailMessage;
            LOGGER.log(Level.SEVERE, tmp, e);
            throw new InitializationException(tmp, e);
        }

        // force creation of DNS Cache now -- avoids CacheCleaner in toe-threads group
        // also cap size at 1 (we never want a cached value; 0 is non-operative)
        Lookup.getDefaultCache(DClass.IN).setMaxEntries(1);
        //dns.getRecords("localhost", Type.A, DClass.IN);

        setupToePool();
        setThresholds();

        reserveMemory = new LinkedList<char[]>();
        for (int i = 0; i < RESERVE_BLOCKS; i++) {
            reserveMemory.add(new char[RESERVE_BLOCK_SIZE]);
        }
    }

    /**
     * Does setup of checkpoint recovery.
     * Copies bdb log files into the state dir.
     * @throws IOException
     */
    protected void setupCheckpointRecover() throws IOException {
        long started = System.currentTimeMillis();
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Starting recovery setup -- copying into place "
                + "bdbje log files -- for checkpoint named "
                + this.checkpointRecover.getDisplayName());
        }
        // Mark context we're in a recovery.
        this.checkpointer.recover(this);
        this.progressStats.info("CHECKPOINT RECOVER "
            + this.checkpointRecover.getDisplayName());
        // Copy the bdb log files to the state dir so we don't damage the
        // old checkpoint. If there are thousands of log files, this can take
        // tens of minutes (1000 logs takes ~5 minutes to java copy,
        // dependent upon hardware). If a log file already exists over in the
        // target state directory, we do not overwrite -- we assume the log
        // file in the target is the same as the one we'd copy from the
        // checkpoint dir.
        File bdbSubDir = CheckpointUtils.getBdbSubDirectory(
            this.checkpointRecover.getDirectory());
        FileUtils.copyFiles(bdbSubDir, CheckpointUtils.getJeLogsFilter(),
            getStateDisk(), true, false);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Finished recovery setup for checkpoint named "
                + this.checkpointRecover.getDisplayName() + " in "
                + (System.currentTimeMillis() - started) + "ms.");
        }
    }

    protected boolean getCheckpointCopyBdbjeLogs() {
        return ((Boolean) this.order.getUncheckedAttribute(null,
            CrawlOrder.ATTR_CHECKPOINT_COPY_BDBJE_LOGS)).booleanValue();
    }

    private void setupBdb()
    throws FatalConfigurationException, AttributeNotFoundException {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        int bdbCachePercent = ((Integer) this.order.getAttribute(null,
            CrawlOrder.ATTR_BDB_CACHE_PERCENT)).intValue();
        if (bdbCachePercent > 0) {
            // Operator has expressed a preference; override BDB default or
            // je.properties value
            envConfig.setCachePercent(bdbCachePercent);
        }
        envConfig.setLockTimeout(5000000); // 5 seconds
        if (LOGGER.isLoggable(Level.FINEST)) {
            envConfig.setConfigParam("java.util.logging.level", "SEVERE");
            envConfig.setConfigParam("java.util.logging.level.evictor",
                "SEVERE");
            envConfig.setConfigParam("java.util.logging.ConsoleHandler.on",
                "true");
        }

        if (!getCheckpointCopyBdbjeLogs()) {
            // If we are not copying files on checkpoint, then set bdbje to not
            // remove its log files so that it's possible to later assemble
            // (manually) all files needed to run a recovery, using a mix of
            // current bdbje logs and those it has marked for deletion.
            envConfig.setConfigParam("je.cleaner.expunge", "false");
        }

        try {
            this.bdbEnvironment = new EnhancedEnvironment(getStateDisk(),
                envConfig);
            if (LOGGER.isLoggable(Level.FINE)) {
                // Write out the bdb configuration.
                envConfig = bdbEnvironment.getConfig();
                LOGGER.fine("BdbConfiguration: Cache percentage "
                    + envConfig.getCachePercent() + ", cache size "
                    + envConfig.getCacheSize());
            }
        } catch (DatabaseException e) {
            e.printStackTrace();
            throw new FatalConfigurationException(e.getMessage());
        }
    }

    /**
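     * <p>Illustrative use by a subcomponent (a sketch, not verbatim Heritrix
     * code; the database name and config are assumptions):
     * <pre>
     * EnhancedEnvironment env = controller.getBdbEnvironment();
     * DatabaseConfig dbConfig = new DatabaseConfig();
     * dbConfig.setAllowCreate(true);
     * Database db = env.openDatabase(null, "myModuleState", dbConfig);
     * </pre>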
     * @return the shared EnhancedEnvironment
     */
    public EnhancedEnvironment getBdbEnvironment() {
        return this.bdbEnvironment;
    }

    /**
     * @deprecated use EnhancedEnvironment's getClassCatalog() instead
     */
    public StoredClassCatalog getClassCatalog() {
        return this.bdbEnvironment.getClassCatalog();
    }

    /**
     * Register for CrawlStatus events.
     *
     * @param cl a class implementing the CrawlStatusListener interface
     *
     * @see CrawlStatusListener
     */
    public void addCrawlStatusListener(CrawlStatusListener cl) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.registeredCrawlStatusListeners.add(cl);
        }
    }

    /**
     * Register for CrawlURIDisposition events.
     *
     * @param cl a class implementing the CrawlURIDispositionListener interface
     *
     * @see CrawlURIDispositionListener
     */
    public void addCrawlURIDispositionListener(CrawlURIDispositionListener cl) {
        registeredCrawlURIDispositionListener = null;
        if (registeredCrawlURIDispositionListeners == null) {
            // First listener: keep the direct reference, used while it is
            // the only one.
            registeredCrawlURIDispositionListener = cl;
            // We expect the list to stay very small.
            registeredCrawlURIDispositionListeners =
                new ArrayList<CrawlURIDispositionListener>(1);
        }
        registeredCrawlURIDispositionListeners.add(cl);
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURISuccessful event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURISuccessful(CrawlURI)
     */
    public void fireCrawledURISuccessfulEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURISuccessful(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                    && registeredCrawlURIDispositionListeners.size() > 0) {
                Iterator it = registeredCrawlURIDispositionListeners.iterator();
                while (it.hasNext()) {
                    ((CrawlURIDispositionListener) it.next())
                        .crawledURISuccessful(curi);
                }
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURINeedRetry event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURINeedRetry(CrawlURI)
     */
    public void fireCrawledURINeedRetryEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURINeedRetry(curi);
            return;
        }

        // Go through the list.
        if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
            for (Iterator i = registeredCrawlURIDispositionListeners.iterator();
                    i.hasNext();) {
                ((CrawlURIDispositionListener) i.next())
                    .crawledURINeedRetry(curi);
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURIDisregard event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURIDisregard(CrawlURI)
     */
    public void fireCrawledURIDisregardEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURIDisregard(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                    && registeredCrawlURIDispositionListeners.size() > 0) {
                Iterator it = registeredCrawlURIDispositionListeners.iterator();
                while (it.hasNext()) {
                    ((CrawlURIDispositionListener) it.next())
                        .crawledURIDisregard(curi);
                }
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURIFailure event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURIFailure(CrawlURI)
     */
    public void fireCrawledURIFailureEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURIFailure(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                    && registeredCrawlURIDispositionListeners.size() > 0) {
                Iterator it = registeredCrawlURIDispositionListeners.iterator();
                while (it.hasNext()) {
                    ((CrawlURIDispositionListener) it.next())
                        .crawledURIFailure(curi);
                }
            }
        }
    }

    private void setupCrawlModules() throws FatalConfigurationException,
            AttributeNotFoundException, MBeanException, ReflectionException {
        if (scope == null) {
            scope = (CrawlScope) order.getAttribute(CrawlScope.ATTR_NAME);
            scope.initialize(this);
        }
        try {
            this.serverCache = new ServerCache(this);
        } catch (Exception e) {
            throw new FatalConfigurationException("Unable to"
                + " initialize frontier (Failed setup of ServerCache) " + e);
        }

        if (this.frontier == null) {
            this.frontier = (Frontier) order.getAttribute(Frontier.ATTR_NAME);
            try {
                frontier.initialize(this);
                frontier.pause(); // Pause until begun
                // Run recovery if recoverPath points to a file (if it points
                // to a directory, it's a checkpoint recovery).
                // TODO: make recover path relative to job root dir.
                if (!isCheckpointRecover()) {
                    runFrontierRecover((String) order
                        .getAttribute(CrawlOrder.ATTR_RECOVER_PATH));
                }
            } catch (IOException e) {
                throw new FatalConfigurationException(
                    "unable to initialize frontier: " + e);
            }
        }

        // Setup processors
        if (processorChains == null) {
            processorChains = new ProcessorChainList(order);
        }
    }

    protected void runFrontierRecover(String recoverPath)
            throws AttributeNotFoundException, MBeanException,
            ReflectionException, FatalConfigurationException {
        if (recoverPath == null || recoverPath.length() <= 0) {
            return;
        }
        File f = new File(recoverPath);
        if (!f.exists()) {
            LOGGER.severe("Recover file does not exist " + recoverPath);
            return;
        }
        if (!f.isFile()) {
            // It's a directory when we're supposed to be doing a checkpoint
            // recovery.
            return;
        }
        boolean retainFailures = ((Boolean) order
            .getAttribute(CrawlOrder.ATTR_RECOVER_RETAIN_FAILURES))
            .booleanValue();
        try {
            frontier.importRecoverLog(recoverPath, retainFailures);
        } catch (IOException e) {
            e.printStackTrace();
            throw (FatalConfigurationException) new FatalConfigurationException(
                "Recover.log " + recoverPath + " problem: " + e).initCause(e);
        }
    }

    private void setupDisk() throws AttributeNotFoundException {
        String diskPath = (String) order.getAttribute(null,
            CrawlOrder.ATTR_DISK_PATH);
        this.disk = getSettingsHandler()
            .getPathRelativeToWorkingDirectory(diskPath);
        this.disk.mkdirs();
        this.logsDisk = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        this.checkpointsDisk = getSettingsDir(CrawlOrder.ATTR_CHECKPOINTS_PATH);
        this.stateDisk = getSettingsDir(CrawlOrder.ATTR_STATE_PATH);
        this.scratchDisk = getSettingsDir(CrawlOrder.ATTR_SCRATCH_PATH);
    }

    /**
     * @return The logging directory, or null if there was a problem reading
     * the settings.
     */
    public File getLogsDir() {
        File f = null;
        try {
            f = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        } catch (AttributeNotFoundException e) {
            LOGGER.severe("Failed get of logs directory: " + e.getMessage());
        }
        return f;
    }

    /**
     * Return the full path to the directory named by <code>key</code>
     * in settings.
     * If the directory does not exist, it and all intermediary dirs
     * will be created.
     * @param key Key to use going to settings.
     * @return Full path to directory named by <code>key</code>.
     * @throws AttributeNotFoundException
     */
    public File getSettingsDir(String key) throws AttributeNotFoundException {
        String path = (String) order.getAttribute(null, key);
        File f = new File(path);
        if (!f.isAbsolute()) {
            f = new File(disk.getPath(), path);
        }
        if (!f.exists()) {
            f.mkdirs();
        }
        return f;
    }

    /**
     * Setup the statistics tracker.
     * The statistics object must be created before modules can use it.
     * Do it here now so that when modules retrieve the object from the
     * controller during initialization (which some do), it's in place.
     * @throws InvalidAttributeValueException
     * @throws FatalConfigurationException
     */
    private void setupStatTracking()
            throws InvalidAttributeValueException, FatalConfigurationException {
        MapType loggers = order.getLoggers();
        final String cstName = "crawl-statistics";
        if (loggers.isEmpty(null)) {
            if (!isCheckpointRecover() && this.statistics == null) {
                this.statistics = new StatisticsTracker(cstName);
            }
            loggers.addElement(null, (StatisticsTracker) this.statistics);
        }

        if (isCheckpointRecover()) {
            restoreStatisticsTracker(loggers, cstName);
        }

        for (Iterator it = loggers.iterator(null); it.hasNext();) {
            StatisticsTracking tracker = (StatisticsTracking) it.next();
            tracker.initialize(this);
            if (this.statistics == null) {
                this.statistics = tracker;
            }
        }
    }

    protected void restoreStatisticsTracker(MapType loggers,
            String replaceName) throws FatalConfigurationException {
        try {
            // Add the deserialized StatisticsTracker to the settings system.
            loggers.removeElement(loggers.globalSettings(), replaceName);
            loggers.addElement(loggers.globalSettings(),
                (StatisticsTracker) this.statistics);
        } catch (Exception e) {
            throw convertToFatalConfigurationException(e);
        }
    }

    protected FatalConfigurationException convertToFatalConfigurationException(
            Exception e) {
        FatalConfigurationException fce = new FatalConfigurationException(
            "Converted exception: " + e.getMessage());
        fce.setStackTrace(e.getStackTrace());
        return fce;
    }

    private void setupLogs() throws IOException {
        String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
        uriProcessing = Logger.getLogger(LOGNAME_CRAWL + "." + logsPath);
        runtimeErrors = Logger.getLogger(LOGNAME_RUNTIME_ERRORS + "."
            + logsPath);
        localErrors = Logger.getLogger(LOGNAME_LOCAL_ERRORS + "." + logsPath);
        uriErrors = Logger.getLogger(LOGNAME_URI_ERRORS + "." + logsPath);
        progressStats = Logger.getLogger(LOGNAME_PROGRESS_STATISTICS + "."
            + logsPath);

        this.fileHandlers = new HashMap<Logger, FileHandler>();

        setupLogFile(uriProcessing,
            logsPath + LOGNAME_CRAWL + CURRENT_LOG_SUFFIX,
            new UriProcessingFormatter(), true);

        setupLogFile(runtimeErrors,
            logsPath + LOGNAME_RUNTIME_ERRORS + CURRENT_LOG_SUFFIX,
            new RuntimeErrorFormatter(), true);

        setupLogFile(localErrors,
            logsPath + LOGNAME_LOCAL_ERRORS + CURRENT_LOG_SUFFIX,
            new LocalErrorFormatter(), true);

        setupLogFile(uriErrors,
            logsPath + LOGNAME_URI_ERRORS + CURRENT_LOG_SUFFIX,
            new UriErrorFormatter(), true);

        setupLogFile(progressStats,
            logsPath + LOGNAME_PROGRESS_STATISTICS + CURRENT_LOG_SUFFIX,
            new StatisticsLogFormatter(), true);
    }

    private void setupLogFile(Logger logger, String filename, Formatter f,
            boolean shouldManifest) throws IOException, SecurityException {
        GenerationFileHandler fh = new GenerationFileHandler(filename, true,
            shouldManifest);
        fh.setFormatter(f);
        logger.addHandler(fh);
        addToManifest(filename, MANIFEST_LOG_FILE, shouldManifest);
        logger.setUseParentHandlers(false);
        this.fileHandlers.put(logger, fh);
    }

    protected void rotateLogFiles(String generationSuffix) throws IOException {
        if (this.state != PAUSED && this.state != CHECKPOINTING) {
            throw new IllegalStateException(
                "Pause crawl before requesting log rotation.");
        }
        for (Iterator i = fileHandlers.keySet().iterator(); i.hasNext();) {
            Logger l = (Logger) i.next();
            GenerationFileHandler gfh =
                (GenerationFileHandler) fileHandlers.get(l);
            GenerationFileHandler newGfh = gfh.rotate(generationSuffix,
                CURRENT_LOG_SUFFIX);
            if (gfh.shouldManifest()) {
                addToManifest((String) newGfh.getFilenameSeries().get(1),
                    MANIFEST_LOG_FILE, newGfh.shouldManifest());
            }
            l.removeHandler(gfh);
            l.addHandler(newGfh);
            fileHandlers.put(l, newGfh);
        }
    }

    /**
     * Close all log files and remove handlers from loggers.
     */
    public void closeLogFiles() {
        for (Iterator i = fileHandlers.keySet().iterator(); i.hasNext();) {
            Logger l = (Logger) i.next();
            GenerationFileHandler gfh =
                (GenerationFileHandler) fileHandlers.get(l);
            gfh.close();
            l.removeHandler(gfh);
        }
    }

    /**
     * Sets the values for max bytes, docs and time based on crawl order.
     */
    private void setThresholds() {
        try {
            maxBytes = ((Long) order
                .getAttribute(CrawlOrder.ATTR_MAX_BYTES_DOWNLOAD)).longValue();
        } catch (Exception e) {
            maxBytes = 0;
        }
        try {
            maxDocument = ((Long) order
                .getAttribute(CrawlOrder.ATTR_MAX_DOCUMENT_DOWNLOAD))
                .longValue();
        } catch (Exception e) {
            maxDocument = 0;
        }
        try {
            maxTime = ((Long) order
                .getAttribute(CrawlOrder.ATTR_MAX_TIME_SEC)).longValue();
        } catch (Exception e) {
            maxTime = 0;
        }
    }

    /**
     * @return Object this controller is using to track crawl statistics
     */
    public StatisticsTracking getStatistics() {
        return statistics == null ?
            new StatisticsTracker("crawl-statistics") : this.statistics;
    }

    /**
     * Send crawl change event to all listeners.
     * @param newState State change we're to tell listeners about.
     * @param message Message on state change.
     * @see #sendCheckpointEvent(File) for the special-case event sending
     * telling listeners to checkpoint.
     */
    protected void sendCrawlStateChangeEvent(Object newState, String message) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.state = newState;
            for (Iterator i = this.registeredCrawlStatusListeners.iterator();
                    i.hasNext();) {
                CrawlStatusListener l = (CrawlStatusListener) i.next();
                if (newState.equals(PAUSED)) {
                    l.crawlPaused(message);
                } else if (newState.equals(RUNNING)) {
                    l.crawlResuming(message);
                } else if (newState.equals(PAUSING)) {
                    l.crawlPausing(message);
                } else if (newState.equals(STARTED)) {
                    l.crawlStarted(message);
                } else if (newState.equals(STOPPING)) {
                    l.crawlEnding(message);
                } else if (newState.equals(FINISHED)) {
                    l.crawlEnded(message);
                } else if (newState.equals(PREPARING)) {
                    l.crawlResuming(message);
                } else {
                    throw new RuntimeException("Unknown state: " + newState);
                }
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + newState + " to " + l);
                }
            }
            LOGGER.fine("Sent " + newState);
        }
    }

    /**
     * Send the checkpoint event.
     * Has its own method apart from
     * {@link #sendCrawlStateChangeEvent(Object, String)} because checkpointing
     * throws an Exception (didn't want to have to wrap all of the
     * sendCrawlStateChangeEvent calls in try/catches).
     * @param checkpointDir Where to write checkpoint state to.
     * @throws Exception
     */
    protected void sendCheckpointEvent(File checkpointDir) throws Exception {
        synchronized (this.registeredCrawlStatusListeners) {
            if (this.state != PAUSED) {
                throw new IllegalStateException("Crawler must be completely "
                    + "paused before checkpointing can start");
            }
            this.state = CHECKPOINTING;
            for (Iterator i = this.registeredCrawlStatusListeners.iterator();
                    i.hasNext();) {
                CrawlStatusListener l = (CrawlStatusListener) i.next();
                l.crawlCheckpoint(checkpointDir);
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + CHECKPOINTING + " to " + l);
                }
            }
            LOGGER.fine("Sent " + CHECKPOINTING);
        }
    }

    /**
     * Operator requested crawl begin
     */
    public void requestCrawlStart() {
        runProcessorInitialTasks();

        sendCrawlStateChangeEvent(STARTED, CrawlJob.STATUS_PENDING);
        String jobState;
        state = RUNNING;
        jobState = CrawlJob.STATUS_RUNNING;
        sendCrawlStateChangeEvent(this.state, jobState);

        // A proper exit will change this value.
        this.sExit = CrawlJob.STATUS_FINISHED_ABNORMAL;

        Thread statLogger = new Thread(statistics);
        statLogger.setName("StatLogger");
        statLogger.start();

        frontier.start();
    }

    /**
     * Called when the last toe thread exits.
     */
    protected void completeStop() {
        LOGGER.fine("Entered complete stop.");
        // Run processors' final tasks
        runProcessorFinalTasks();
        // Ok, now we are ready to exit.
        sendCrawlStateChangeEvent(FINISHED, this.sExit);
        synchronized (this.registeredCrawlStatusListeners) {
            // Remove all listeners now we're done with them.
            this.registeredCrawlStatusListeners
                .removeAll(this.registeredCrawlStatusListeners);
            this.registeredCrawlStatusListeners = null;
        }

        closeLogFiles();

        // Release references to logger file handler instances.
        this.fileHandlers = null;
        this.uriErrors = null;
        this.uriProcessing = null;
        this.localErrors = null;
        this.runtimeErrors = null;
        this.progressStats = null;
        this.reports = null;
        this.manifest = null;

        // Do cleanup.
        this.statistics = null;
        this.frontier = null;
        this.disk = null;
        this.scratchDisk = null;
        this.order = null;
        this.scope = null;
        if (this.settingsHandler != null) {
            this.settingsHandler.cleanup();
        }
        this.settingsHandler = null;
        this.reserveMemory = null;
        this.processorChains = null;
        if (this.serverCache != null) {
            this.serverCache.cleanup();
            this.serverCache = null;
        }
        if (this.checkpointer != null) {
            this.checkpointer.cleanup();
            this.checkpointer = null;
        }
        if (this.bdbEnvironment != null) {
            try {
                this.bdbEnvironment.sync();
                this.bdbEnvironment.close();
            } catch (DatabaseException e) {
                e.printStackTrace();
            }
            this.bdbEnvironment = null;
        }
        this.bigmaps = null;
        if (this.toePool != null) {
            this.toePool.cleanup();
            // I played with launching a thread here to do cleanup of the
            // ToePool ThreadGroup (making sure the cleanup thread was not
            // in the ToePool ThreadGroup). Did this because ToePools seemed
            // to be sticking around holding references to CrawlController at
            // least. Need to spend more time looking to see that this is
            // still the case even after adding the above toePool#cleanup call.
        }
        this.toePool = null;
        LOGGER.fine("Finished crawl.");
    }

    synchronized void completePause() {
        // Send a notifyAll. At least the checkpointing thread may be waiting
        // on a complete pause.
        notifyAll();
        sendCrawlStateChangeEvent(PAUSED, CrawlJob.STATUS_PAUSED);
    }

    private boolean shouldContinueCrawling() {
        if (frontier.isEmpty()) {
            this.sExit = CrawlJob.STATUS_FINISHED;
            return false;
        }

        if (maxBytes > 0 && frontier.totalBytesWritten() >= maxBytes) {
            // Hit the max byte download limit!
            sExit = CrawlJob.STATUS_FINISHED_DATA_LIMIT;
            return false;
        } else if (maxDocument > 0
                && frontier.succeededFetchCount() >= maxDocument) {
            // Hit the max document download limit!
            this.sExit = CrawlJob.STATUS_FINISHED_DOCUMENT_LIMIT;
            return false;
        } else if (maxTime > 0
                && statistics.crawlDuration() >= maxTime * 1000) {
            // Hit the max crawl-time limit!
            this.sExit = CrawlJob.STATUS_FINISHED_TIME_LIMIT;
            return false;
        }
        return state == RUNNING;
    }

    /**
     * Request a checkpoint.
     * Sets a checkpointing thread running.
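     * <p>Illustrative operator sequence (a sketch; assumes the caller polls
     * {@link #isPaused()} and handles InterruptedException):
     * <pre>
     * controller.requestCrawlPause();
     * while (!controller.isPaused()) {
     *     Thread.sleep(500); // wait for all toe threads to come to rest
     * }
     * controller.requestCrawlCheckpoint();
     * </pre>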
     * @throws IllegalStateException Thrown if crawl is not in paused state
     * (the crawl must first be paused before checkpointing).
     */
    public synchronized void requestCrawlCheckpoint()
            throws IllegalStateException {
        if (this.checkpointer == null) {
            return;
        }
        if (this.checkpointer.isCheckpointing()) {
            throw new IllegalStateException("Checkpoint already running.");
        }
        this.checkpointer.checkpoint();
    }

    /**
     * @return True if checkpointing.
     */
    public boolean isCheckpointing() {
        return this.state == CHECKPOINTING;
    }

    /**
     * Run checkpointing.
     * CrawlController takes care of managing the checkpointing/serializing
     * of bdb, the StatisticsTracker, and the CheckpointContext. Other
     * modules that want to revive themselves on checkpoint recovery need to
     * save state during their {@link CrawlStatusListener#crawlCheckpoint(File)}
     * invocation, and then in their #initialize if a module,
     * or in their #initialTask if a processor, check with the CrawlController
     * whether it's a checkpoint recovery. If it is, read in their old state
     * from the pointed-to checkpoint directory.
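     * <p>A sketch of the module side of this protocol (hypothetical module
     * code: saveStateTo/readStateFrom are illustrative helpers, and
     * getController() stands in for however the module reaches its
     * CrawlController):
     * <pre>
     * public void crawlCheckpoint(File checkpointDir) throws Exception {
     *     saveStateTo(checkpointDir); // module-specific serialization
     * }
     *
     * public void initialTasks() {
     *     if (getController().isCheckpointRecover()) {
     *         readStateFrom(getController().getCheckpointRecover()
     *             .getDirectory());
     *     }
     * }
     * </pre>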
     * <p>Default access only, to be called by Checkpointer.
     * @throws Exception
     */
    void checkpoint() throws Exception {
        // Tell registered listeners to checkpoint.
        sendCheckpointEvent(this.checkpointer
            .getCheckpointInProgressDirectory());

        // Rotate off crawler logs.
        LOGGER.fine("Rotating log files.");
        rotateLogFiles(CURRENT_LOG_SUFFIX + "."
            + this.checkpointer.getNextCheckpointName());

        // Sync the BigMap contents to bdb, if they're bdb bigmaps.
        LOGGER.fine("BigMaps.");
        checkpointBigMaps(this.checkpointer
            .getCheckpointInProgressDirectory());

        // Note, on deserialization, the super CrawlType#parent
        // needs to be restored. Parent is '/crawl-order/loggers'.
        // The settings handler for this module also needs to be
        // restored. Both of these fields are private in the
        // super class. Adding the restored ST to the crawl order should take
        // care of this.

        // Checkpoint bdb environment.
        LOGGER.fine("Bdb environment.");
        checkpointBdb(this.checkpointer
            .getCheckpointInProgressDirectory());

        // Make copy of order, seeds, and settings.
        LOGGER.fine("Copying settings.");
        copySettings(this.checkpointer
            .getCheckpointInProgressDirectory());

        // Checkpoint this crawlcontroller.
        CheckpointUtils.writeObjectToFile(this,
            this.checkpointer.getCheckpointInProgressDirectory());
    }

    /**
     * Copy off the settings.
     * @param checkpointDir Directory to write checkpoint to.
     * @throws IOException
     */
    protected void copySettings(final File checkpointDir) throws IOException {
        final List files = this.settingsHandler.getListOfAllFiles();
        boolean copiedSettingsDir = false;
        final File settingsDir = new File(this.disk, "settings");
        for (final Iterator i = files.iterator(); i.hasNext();) {
            File f = new File((String) i.next());
            if (f.getAbsolutePath().startsWith(settingsDir.getAbsolutePath())) {
                if (copiedSettingsDir) {
                    // Skip. We've already copied this member of the
                    // settings directory.
                    continue;
                }
                // Copy 'settings' dir all in one lump, not a file at a time.
                copiedSettingsDir = true;
                FileUtils.copyFiles(settingsDir,
                    new File(checkpointDir, settingsDir.getName()));
                continue;
            }
            FileUtils.copyFiles(f, f.isDirectory() ? checkpointDir
                : new File(checkpointDir, f.getName()));
        }
    }

    /**
     * Checkpoint bdb.
     * I used to do a call to log cleaning as suggested in the je-2.0 javadoc,
     * but it takes way too much time (20 minutes for a crawl of 1 million
     * items). Assume the cleaner is keeping up. Below was the log-cleaning
     * loop:
     * <pre>int totalCleaned = 0;
     * for (int cleaned = 0; (cleaned = this.bdbEnvironment.cleanLog()) != 0;
     *         totalCleaned += cleaned) {
     *     LOGGER.fine("Cleaned " + cleaned + " log files.");
     * }
     * </pre>
     * <p>I also used to do a sync. But, from Mark Hayes, sync and checkpoint
     * are effectively the same thing, only sync is not configurable. He
     * suggests doing one or the other:
     * <p>MS: Reading code, Environment.sync() is a checkpoint. Looks like
     * I don't need to call a checkpoint after calling a sync?
     * <p>MH: Right, they're almost the same thing -- just do one or the other,
     * not both. With the new API, you'll need to do a checkpoint not a
     * sync, because the sync() method has no config parameter. Don't worry
     * -- it's fine to do a checkpoint even though you're not using.
     * @param checkpointDir Directory to write checkpoint to.
     * @throws DatabaseException
     * @throws IOException
     * @throws RuntimeException Thrown if failed setup of new bdb environment.
     */
    protected void checkpointBdb(File checkpointDir)
            throws DatabaseException, IOException, RuntimeException {
        EnvironmentConfig envConfig = this.bdbEnvironment.getConfig();
        final List bkgrdThreads = Arrays.asList(new String[] {
            "je.env.runCheckpointer", "je.env.runCleaner",
            "je.env.runINCompressor" });
        try {
            // Disable background threads
            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "false");
            // Do a force checkpoint. That's what a sync does (i.e. doSync).
            CheckpointConfig chkptConfig = new CheckpointConfig();
            chkptConfig.setForce(true);

            // Mark Hayes of sleepycat says:
            // "The default for this property is false, which gives the current
            // behavior (allow deltas). If this property is true, deltas are
            // prohibited -- full versions of internal nodes are always logged
            // during the checkpoint. When a full version of an internal node
            // is logged during a checkpoint, recovery does not need to process
            // it at all. It is only fetched if needed by the application,
            // during normal DB operations after recovery. When a delta of an
            // internal node is logged during a checkpoint, recovery must
            // process it by fetching the full version of the node from earlier
            // in the log, and then applying the delta to it. This can be
            // pretty slow, since it is potentially a large amount of
            // random I/O."
            chkptConfig.setMinimizeRecoveryTime(true);
            this.bdbEnvironment.checkpoint(chkptConfig);
            LOGGER.fine("Finished bdb checkpoint.");

            // From the sleepycat folks: A trick for flipping db logs.
            EnvironmentImpl envImpl =
                DbInternal.envGetEnvironmentImpl(this.bdbEnvironment);
            long firstFileInNextSet =
                DbLsn.getFileNumber(envImpl.forceLogFileFlip());
            // So the last file in the checkpoint is firstFileInNextSet - 1.
            // Write a manifest of all log files into the bdb directory.
            final String lastBdbCheckpointLog =
                getBdbLogFileName(firstFileInNextSet - 1);
            processBdbLogs(checkpointDir, lastBdbCheckpointLog);
            LOGGER.fine("Finished processing bdb log files.");
        } finally {
            // Restore background threads.
            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "true");
        }
    }

    protected void processBdbLogs(final File checkpointDir,
            final String lastBdbCheckpointLog) throws IOException {
        File bdbDir = CheckpointUtils.getBdbSubDirectory(checkpointDir);
        if (!bdbDir.exists()) {
            bdbDir.mkdir();
        }
        PrintWriter pw = new PrintWriter(new FileOutputStream(
            new File(checkpointDir, "bdbje-logs-manifest.txt")));
        try {
            // Don't copy any beyond the last bdb log file (bdbje can keep
            // writing logs after checkpoint).
            boolean pastLastLogFile = false;
            Set<String> srcFilenames = null;
            final boolean copyFiles = getCheckpointCopyBdbjeLogs();
            do {
                FilenameFilter filter = CheckpointUtils.getJeLogsFilter();
                srcFilenames = new HashSet<String>(
                    Arrays.asList(getStateDisk().list(filter)));
                List tgtFilenames = Arrays.asList(bdbDir.list(filter));
                if (tgtFilenames != null && tgtFilenames.size() > 0) {
                    srcFilenames.removeAll(tgtFilenames);
                }
                if (srcFilenames.size() > 0) {
                    // Sort files.
                    srcFilenames = new TreeSet<String>(srcFilenames);
                    int count = 0;
                    for (final Iterator i = srcFilenames.iterator();
                            i.hasNext() && !pastLastLogFile;) {
                        String name = (String) i.next();
                        if (copyFiles) {
                            FileUtils.copyFiles(new File(getStateDisk(), name),
                                new File(bdbDir, name));
                        }
                        pw.println(name);
                        if (name.equals(lastBdbCheckpointLog)) {
                            // We're done.
                            pastLastLogFile = true;
                        }
                        count++;
                    }
                    if (LOGGER.isLoggable(Level.FINE)) {
                        LOGGER.fine("Copied " + count);
                    }
                }
            } while (!pastLastLogFile && srcFilenames != null
                && srcFilenames.size() > 0);
        } finally {
            pw.close();
        }
    }

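    /**
     * Builds a bdbje log file name from a log file number: eight zero-padded
     * hex digits plus a ".jdb" suffix (e.g. index 255 yields "000000ff.jdb";
     * example added here for illustration).
     * @param index bdbje log file number.
     * @return Filename for the given bdbje log file number.
     */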
    protected String getBdbLogFileName(final long index) {
        String lastBdbLogFileHex = Long.toHexString(index);
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < (8 - lastBdbLogFileHex.length()); i++) {
            buffer.append('0');
        }
        buffer.append(lastBdbLogFileHex);
        buffer.append(".jdb");
        return buffer.toString();
    }

    protected void setBdbjeBkgrdThreads(final EnvironmentConfig config,
            final List threads, final String setting) {
        for (final Iterator i = threads.iterator(); i.hasNext();) {
            config.setConfigParam((String) i.next(), setting);
        }
    }

    /**
     * Get recover checkpoint.
     * Returns null if we're NOT in recover mode.
     * Looks at ATTR_RECOVER_PATH and if it's a directory, assumes a
     * checkpoint recovery. If in checkpoint mode, returns a Checkpoint
     * instance if the checkpoint was VALID (else null).
     * @return Checkpoint instance if we're in recover checkpoint
     * mode and the pointed-to checkpoint was valid.
     * @see #isCheckpointRecover()
     */
    public synchronized Checkpoint getCheckpointRecover() {
        if (this.checkpointRecover != null) {
            return this.checkpointRecover;
        }
        return getCheckpointRecover(this.order);
    }

    public static Checkpoint getCheckpointRecover(final CrawlOrder order) {
        String path = (String) order.getUncheckedAttribute(null,
            CrawlOrder.ATTR_RECOVER_PATH);
        if (path == null || path.length() <= 0) {
            return null;
        }
        File rp = new File(path);
        // Assume if path is to a directory, it's a checkpoint recovery.
        Checkpoint result = null;
        if (rp.exists() && rp.isDirectory()) {
            Checkpoint cp = new Checkpoint(rp);
            if (cp.isValid()) {
                // If valid, set as result.
                result = cp;
            }
        }
        return result;
    }

    public static boolean isCheckpointRecover(final CrawlOrder order) {
        return getCheckpointRecover(order) != null;
    }

    /**
     * @return True if we're in checkpoint recover mode. Call
     * {@link #getCheckpointRecover()} to get at the Checkpoint instance
     * that has info on the checkpoint directory being recovered from.
     */
    public boolean isCheckpointRecover() {
        return this.checkpointRecover != null;
    }

    /**
     * Operator requested for crawl to stop.
     */
    public synchronized void requestCrawlStop() {
        requestCrawlStop(CrawlJob.STATUS_ABORTED);
    }

    /**
     * Operator requested for crawl to stop.
     * @param message
     */
    public synchronized void requestCrawlStop(String message) {
        if (state == STOPPING || state == FINISHED) {
            return;
        }
        if (message == null) {
            throw new IllegalArgumentException("Message cannot be null.");
        }
        this.sExit = message;
        beginCrawlStop();
    }

    /**
     * Start the process of stopping the crawl.
     */
    public void beginCrawlStop() {
        LOGGER.fine("Started.");
        sendCrawlStateChangeEvent(STOPPING, this.sExit);
        if (this.frontier != null) {
            this.frontier.terminate();
            this.frontier.unpause();
        }
        LOGGER.fine("Finished.");
    }

    /**
     * Stop the crawl temporarily.
     */
    public synchronized void requestCrawlPause() {
        if (state == PAUSING || state == PAUSED) {
            // Already about to pause
            return;
        }
        sExit = CrawlJob.STATUS_WAITING_FOR_PAUSE;
        frontier.pause();
        sendCrawlStateChangeEvent(PAUSING, this.sExit);
        if (toePool.getActiveToeCount() == 0) {
            // If all threads are already held, complete the pause now
            // (no chance to trigger off a later held thread).
            completePause();
        }
    }

    /**
     * Tell if the controller is paused
     * @return true if paused
     */
    public boolean isPaused() {
        return state == PAUSED;
    }

    public boolean isPausing() {
        return state == PAUSING;
    }

    public boolean isRunning() {
        return state == RUNNING;
    }

    /**
     * Resume crawl from paused state
     */
    public synchronized void requestCrawlResume() {
        if (state != PAUSING && state != PAUSED && state != CHECKPOINTING) {
            // Can't resume if not been told to pause or if we're in middle of
            // a checkpoint.
            return;
        }
        multiThreadMode();
        frontier.unpause();
        LOGGER.fine("Crawl resumed.");
        sendCrawlStateChangeEvent(RUNNING, CrawlJob.STATUS_RUNNING);
    }

    /**
     * @return Active toe thread count.
     */
    public int getActiveToeCount() {
        if (toePool == null) {
            return 0;
        }
        return toePool.getActiveToeCount();
    }

    private void setupToePool() {
        toePool = new ToePool(this);
        // TODO: make # of toes self-optimizing
        toePool.setSize(order.getMaxToes());
    }

    /**
     * @return The order file instance.
     */
    public CrawlOrder getOrder() {
        return order;
    }

    /**
     * @return The server cache instance.
     */
    public ServerCache getServerCache() {
        return serverCache;
    }

    /**
     * @param o Crawl order to use.
     */
    public void setOrder(CrawlOrder o) {
        order = o;
    }

    /**
     * @return The frontier.
     */
    public Frontier getFrontier() {
        return frontier;
    }

    /**
     * @return This crawl scope.
     */
    public CrawlScope getScope() {
        return scope;
    }

    /** Get the list of processor chains.
     *
     * @return the list of processor chains.
     */
    public ProcessorChainList getProcessorChainList() {
        return processorChains;
    }

    /** Get the first processor chain.
     *
     * @return the first processor chain.
     */
    public ProcessorChain getFirstProcessorChain() {
        return processorChains.getFirstChain();
    }

    /** Get the postprocessor chain.
     *
     * @return the postprocessor chain.
     */
    public ProcessorChain getPostprocessorChain() {
        return processorChains.getLastChain();
    }

    /**
     * Get the 'working' directory of the current crawl.
     * @return the 'working' directory of the current crawl.
     */
    public File getDisk() {
        return disk;
    }

    /**
     * @return Scratch disk location.
     */
    public File getScratchDisk() {
        return scratchDisk;
    }

    /**
     * @return State disk location.
     */
    public File getStateDisk() {
        return stateDisk;
    }

    /**
     * @return The number of ToeThreads
     *
     * @see ToePool#getToeCount()
     */
    public int getToeCount() {
        return this.toePool == null ? 0 : this.toePool.getToeCount();
    }

    /**
     * @return The ToePool
     */
    public ToePool getToePool() {
        return toePool;
    }

    /**
     * @return toepool one-line report
     */
    public String oneLineReportThreads() {
        return toePool.singleLineReport();
    }
1635:
1636: /**
1637: * While many settings will update automatically when the SettingsHandler is
1638: * modified, some settings need to be explicitly changed to reflect new
1639: * settings. This includes, number of toe threads and seeds.
1640: */
1641: public void kickUpdate() {
1642: toePool.setSize(order.getMaxToes());
1643:
1644: this .scope.kickUpdate();
1645: this .frontier.kickUpdate();
1646: this .processorChains.kickUpdate();
1647:
1648: // TODO: continue to generalize this, so that any major
1649: // component can get a kick when it may need to refresh its data
1650:
1651: setThresholds();
1652: }
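
    /*
     * Illustrative usage, not part of the original source: after settings
     * have been edited through the SettingsHandler (for example via the
     * admin UI), a caller pushes the changes to live components in one
     * call (the 'controller' reference is hypothetical):
     *
     *   controller.kickUpdate(); // resizes toe pool; kicks scope, frontier
     *                            // and processor chains; re-applies
     *                            // thresholds via setThresholds()
     */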

    /**
     * @return The settings handler.
     */
    public SettingsHandler getSettingsHandler() {
        return settingsHandler;
    }

    /**
     * This method iterates through processor chains to run processors'
     * initial tasks.
     */
    private void runProcessorInitialTasks() {
        for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
            for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
                    ip.hasNext();) {
                ((Processor) ip.next()).initialTasks();
            }
        }
    }

    /**
     * This method iterates through processor chains to run processors'
     * final tasks.
     */
    private void runProcessorFinalTasks() {
        for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
            for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
                    ip.hasNext();) {
                ((Processor) ip.next()).finalTasks();
            }
        }
    }

    /**
     * Kills a thread. For details see
     * {@link org.archive.crawler.framework.ToePool#killThread(int, boolean)
     * ToePool.killThread(int, boolean)}.
     * @param threadNumber Thread to kill.
     * @param replace Should thread be replaced.
     * @see org.archive.crawler.framework.ToePool#killThread(int, boolean)
     */
    public void killThread(int threadNumber, boolean replace) {
        toePool.killThread(threadNumber, replace);
    }

    /**
     * Add a file to the manifest of files used/generated by the current
     * crawl.
     *
     * TODO: It's possible for a file to be added twice if reports are
     * force-generated mid-crawl. Fix.
     *
     * @param file The filename (with absolute path) of the file to add
     * @param type The type of the file
     * @param bundle Whether the file should be included in a typical
     * bundling of crawler files.
     *
     * @see #MANIFEST_CONFIG_FILE
     * @see #MANIFEST_LOG_FILE
     * @see #MANIFEST_REPORT_FILE
     */
    public void addToManifest(String file, char type, boolean bundle) {
        manifest.append(type + (bundle ? "+" : "-") + " " + file + "\n");
    }
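
    /*
     * Example, illustrative only: each manifest entry is one line of the
     * form "<type><+|-> <absolute-path>", where '+' marks a file that
     * belongs in a typical bundle of crawler files. Assuming the
     * MANIFEST_LOG_FILE type character is 'L' and a hypothetical log path,
     *
     *   controller.addToManifest("/crawl/logs/crawl.log",
     *       MANIFEST_LOG_FILE, true);
     *
     * would append the line "L+ /crawl/logs/crawl.log".
     */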

    /**
     * Evaluate if the crawl should stop because it is finished.
     */
    public void checkFinish() {
        if (atFinish()) {
            beginCrawlStop();
        }
    }

    /**
     * Evaluate if the crawl should stop because it is finished,
     * without actually stopping the crawl.
     *
     * @return true if crawl is at a finish-possible state
     */
    public boolean atFinish() {
        return state == RUNNING && !shouldContinueCrawling();
    }

    private void readObject(ObjectInputStream stream)
            throws IOException, ClassNotFoundException {
        stream.defaultReadObject();
        // Setup status listeners
        this.registeredCrawlStatusListeners =
            Collections.synchronizedList(new ArrayList<CrawlStatusListener>());
        // Ensure no holdover singleThreadMode
        singleThreadMode = false;
    }

    /**
     * Go to single-thread mode, where only one ToeThread may
     * proceed at a time. Also acquires the single lock, so
     * no further threads will proceed past an
     * acquireContinuePermission. Caller must be sure to release the
     * lock to allow other threads to proceed one at a time.
     */
    public void singleThreadMode() {
        this.singleThreadLock.lock();
        singleThreadMode = true;
    }

    /**
     * Go back to regular multi-thread mode, where all
     * ToeThreads may proceed at once.
     */
    public void multiThreadMode() {
        this.singleThreadLock.lock();
        singleThreadMode = false;
        while (this.singleThreadLock.isHeldByCurrentThread()) {
            this.singleThreadLock.unlock();
        }
    }

    /**
     * Proceed only if allowed, giving CrawlController a chance
     * to enforce single-thread mode.
     */
    public void acquireContinuePermission() {
        if (singleThreadMode) {
            this.singleThreadLock.lock();
            if (!singleThreadMode) {
                // If changed while waiting, ignore
                while (this.singleThreadLock.isHeldByCurrentThread()) {
                    this.singleThreadLock.unlock();
                }
            }
        } // else, permission is automatic
    }

    /**
     * Relinquish continue permission at end of processing (allowing
     * another thread to proceed if in single-thread mode).
     */
    public void releaseContinuePermission() {
        if (singleThreadMode) {
            while (this.singleThreadLock.isHeldByCurrentThread()) {
                this.singleThreadLock.unlock();
            }
        } // else do nothing;
    }
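
    /*
     * Illustrative sketch, not part of the original source: the permission
     * protocol as a worker thread's processing loop would use it. While
     * single-thread mode is in effect, only one thread at a time gets past
     * acquireContinuePermission().
     *
     *   controller.acquireContinuePermission(); // may block in single-thread mode
     *   try {
     *       // ... process one CrawlURI ...
     *   } finally {
     *       controller.releaseContinuePermission(); // let the next thread run
     *   }
     */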

    public void freeReserveMemory() {
        if (!reserveMemory.isEmpty()) {
            reserveMemory.removeLast();
            System.gc();
        }
    }

    /**
     * Note that a ToeThread reached paused condition, possibly
     * completing the crawl-pause.
     */
    public synchronized void toePaused() {
        releaseContinuePermission();
        if (state == PAUSING && toePool.getActiveToeCount() == 0) {
            completePause();
        }
    }

    /**
     * Note that a ToeThread ended, possibly completing the crawl-stop.
     */
    public synchronized void toeEnded() {
        if (state == STOPPING && toePool.getActiveToeCount() == 0) {
            completeStop();
        }
    }

    /**
     * Add the order file contents to the manifest.
     * Writes configuration files and any files managed by CrawlController
     * to it; classes that manage their own files outside the settings
     * framework are responsible for adding them to the manifest themselves
     * by calling addToManifest.
     * Call before writing out reports.
     */
    public void addOrderToManifest() {
        for (Iterator it = getSettingsHandler().getListOfAllFiles().iterator();
                it.hasNext();) {
            addToManifest((String) it.next(),
                CrawlController.MANIFEST_CONFIG_FILE, true);
        }
    }

    /**
     * Log a URIException from deep inside other components to the crawl's
     * shared log.
     *
     * @param e URIException encountered
     * @param u CrawlURI where problem occurred
     * @param l String which could not be interpreted as URI without exception
     */
    public void logUriError(URIException e, UURI u, CharSequence l) {
        if (e.getReasonCode() == UURIFactory.IGNORED_SCHEME) {
            // don't log those that are intentionally ignored
            return;
        }
        Object[] array = { u, l };
        uriErrors.log(Level.INFO, e.getMessage(), array);
    }

    //
    // Reporter
    //
    public final static String PROCESSORS_REPORT = "processors";
    public final static String MANIFEST_REPORT = "manifest";
    protected final static String[] REPORTS = { PROCESSORS_REPORT,
        MANIFEST_REPORT };

    /* (non-Javadoc)
     * @see org.archive.util.Reporter#getReports()
     */
    public String[] getReports() {
        return REPORTS;
    }

    /* (non-Javadoc)
     * @see org.archive.util.Reporter#reportTo(java.io.Writer)
     */
    public void reportTo(PrintWriter writer) {
        reportTo(null, writer);
    }

    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }

    public void reportTo(String name, PrintWriter writer) {
        if (PROCESSORS_REPORT.equals(name)) {
            reportProcessorsTo(writer);
            return;
        } else if (MANIFEST_REPORT.equals(name)) {
            reportManifestTo(writer);
            return;
        } else if (name != null) {
            writer.println("requested report unknown: " + name);
        }
        singleLineReportTo(writer);
    }
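
    /*
     * Illustrative usage, not part of the original source: fetching the
     * named reports this Reporter exposes (valid names come from
     * getReports()):
     *
     *   PrintWriter out = new PrintWriter(System.out, true);
     *   controller.reportTo(CrawlController.PROCESSORS_REPORT, out);
     *   controller.reportTo(CrawlController.MANIFEST_REPORT, out);
     *   controller.reportTo(null, out); // falls through to one-line summary
     */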

    /**
     * @param writer Where to write report to.
     */
    protected void reportManifestTo(PrintWriter writer) {
        writer.print(manifest.toString());
    }

    /**
     * Compiles and returns a human-readable report on the active processors.
     * @param writer Where to write to.
     * @see org.archive.crawler.framework.Processor#report()
     */
    protected void reportProcessorsTo(PrintWriter writer) {
        writer.print("Processors report - "
            + ArchiveUtils.get12DigitDate() + "\n");
        writer.print(" Job being crawled: "
            + getOrder().getCrawlOrderName() + "\n");

        writer.print(" Number of Processors: "
            + processorChains.processorCount() + "\n");
        writer.print(" NOTE: Some processors may not return a report!\n\n");

        for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
            for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
                    ip.hasNext();) {
                writer.print(((Processor) ip.next()).report());
            }
        }
    }

    public void singleLineReportTo(PrintWriter writer) {
        // TODO: improve to be a summary of crawl state
        writer.write("[Crawl Controller]\n");
    }

    public String singleLineLegend() {
        // TODO improve
        return "nothingYet";
    }

    /**
     * Call this method to get an instance of the crawler BigMap
     * implementation. A "BigMap" is a Map that knows how to manage
     * ever-growing sets of key/value pairs. If we're in a checkpoint
     * recovery, this method will manage reinstantiation of checkpointed
     * bigmaps.
     * @param dbName Name to give any associated database. Also used as
     * part of the name when serializing out the bigmap. Needs to be unique
     * within a crawl.
     * @param keyClass Class of keys we'll be using.
     * @param valueClass Class of values we'll be using.
     * @return Map that knows how to carry large sets of key/value pairs,
     * or, if none is available, an instance of HashMap.
     * @throws Exception
     */
    public <K, V> Map<K, V> getBigMap(final String dbName,
            final Class<? super K> keyClass,
            final Class<? super V> valueClass) throws Exception {
        CachedBdbMap<K, V> result = new CachedBdbMap<K, V>(dbName);
        if (isCheckpointRecover()) {
            File baseDir = getCheckpointRecover().getDirectory();
            @SuppressWarnings("unchecked")
            CachedBdbMap<K, V> temp = CheckpointUtils.readObjectFromFile(
                result.getClass(), dbName, baseDir);
            result = temp;
        }
        result.initialize(getBdbEnvironment(), keyClass, valueClass,
            getBdbEnvironment().getClassCatalog());
        // Save reference to all big maps made so we can manage their
        // checkpointing.
        this.bigmaps.put(dbName, result);
        return result;
    }
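
    /*
     * Illustrative usage, not part of the original source: obtaining a
     * checkpoint-aware map for per-host state. The database name must be
     * unique within the crawl; the "hostStats" name and HostStats class
     * are hypothetical, and getBigMap() declares 'throws Exception'.
     *
     *   Map<String, HostStats> hostStats =
     *       controller.getBigMap("hostStats", String.class, HostStats.class);
     *   hostStats.put("example.com", new HostStats());
     */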

    protected void checkpointBigMaps(final File cpDir) throws Exception {
        for (final Iterator i = this.bigmaps.keySet().iterator();
                i.hasNext();) {
            Object key = i.next();
            Object obj = this.bigmaps.get(key);
            // TODO: I tried adding sync to custom serialization of the
            // BigMap implementation but data member counts of the BigMap
            // implementation were not being persisted properly. Look at
            // why. For now, sync in advance of serialization.
            ((CachedBdbMap) obj).sync();
            CheckpointUtils.writeObjectToFile(obj, (String) key, cpDir);
        }
    }

    /**
     * Called whenever a progress statistics logging event occurs.
     * @param e Progress statistics event.
     */
    public void progressStatisticsEvent(final EventObject e) {
        // Default is to do nothing. Subclass if you want to catch this
        // event. Later, if there is demand, add publisher/listener support.
        // Currently hacked in so the JMX-supporting subclass in CrawlJob
        // can send notifications of progressStatistics changes.
    }

    /**
     * Log to the progress statistics log.
     * @param msg Message to write to the progress statistics log.
     */
    public void logProgressStatistics(final String msg) {
        this.progressStats.info(msg);
    }

    /**
     * @return CrawlController state.
     */
    public Object getState() {
        return this.state;
    }

    public File getCheckpointsDisk() {
        return this.checkpointsDisk;
    }
}