Source Code Cross Referenced for CrawlController.java in Web Crawler » heritrix » org.archive.crawler.framework


/* Copyright (C) 2003 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 * CrawlController.java
 * Created on May 14, 2003
 *
 * $Id: CrawlController.java 4917 2007-02-20 22:15:23Z gojomo $
 */
package org.archive.crawler.framework;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EventObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.ReflectionException;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.admin.CrawlJob;
import org.archive.crawler.admin.StatisticsTracker;
import org.archive.crawler.datamodel.Checkpoint;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.ServerCache;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.event.CrawlURIDispositionListener;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.framework.exceptions.InitializationException;
import org.archive.crawler.io.LocalErrorFormatter;
import org.archive.crawler.io.RuntimeErrorFormatter;
import org.archive.crawler.io.StatisticsLogFormatter;
import org.archive.crawler.io.UriErrorFormatter;
import org.archive.crawler.io.UriProcessingFormatter;
import org.archive.crawler.settings.MapType;
import org.archive.crawler.settings.SettingsHandler;
import org.archive.crawler.util.CheckpointUtils;
import org.archive.io.GenerationFileHandler;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.CachedBdbMap;
import org.archive.util.FileUtils;
import org.archive.util.Reporter;
import org.archive.util.bdbje.EnhancedEnvironment;
import org.xbill.DNS.DClass;
import org.xbill.DNS.Lookup;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DbInternal;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.utilint.DbLsn;

/**
 * CrawlController collects all the classes which cooperate to
 * perform a crawl and provides a high-level interface to the
 * running crawl.
 *
 * As the "global context" for a crawl, subcomponents will
 * often reach each other through the CrawlController.
 *
 * @author Gordon Mohr
 */
public class CrawlController implements Serializable, Reporter {
    // be robust against trivial implementation changes
    private static final long serialVersionUID = ArchiveUtils
            .classnameBasedUID(CrawlController.class, 1);

    /**
     * Messages from the CrawlController.
     *
     * They appear on the console.
     */
    private final static Logger LOGGER = Logger
            .getLogger(CrawlController.class.getName());

    // manifest support
    /** abbreviation label for config files in manifest */
    public static final char MANIFEST_CONFIG_FILE = 'C';
    /** abbreviation label for report files in manifest */
    public static final char MANIFEST_REPORT_FILE = 'R';
    /** abbreviation label for log files in manifest */
    public static final char MANIFEST_LOG_FILE = 'L';

    // key log names
    private static final String LOGNAME_PROGRESS_STATISTICS = "progress-statistics";
    private static final String LOGNAME_URI_ERRORS = "uri-errors";
    private static final String LOGNAME_RUNTIME_ERRORS = "runtime-errors";
    private static final String LOGNAME_LOCAL_ERRORS = "local-errors";
    private static final String LOGNAME_CRAWL = "crawl";

    // key subcomponents which define and implement a crawl in progress
    private transient CrawlOrder order;
    private transient CrawlScope scope;
    private transient ProcessorChainList processorChains;

    private transient Frontier frontier;

    private transient ToePool toePool;

    private transient ServerCache serverCache;

    // This gets passed into the initialize method.
    private transient SettingsHandler settingsHandler;

    // Used to enable/disable single-threaded operation after OOM
    private volatile transient boolean singleThreadMode = false;
    private transient ReentrantLock singleThreadLock = null;

    // emergency reserve of memory to allow some progress/reporting after OOM
    private transient LinkedList<char[]> reserveMemory;
    private static final int RESERVE_BLOCKS = 1;
    // 6MB; note (1 << 20) is 2 to the 20th -- in Java '^' is XOR, not power
    private static final int RESERVE_BLOCK_SIZE = 6 * (1 << 20);

    // crawl state: as requested or actual

    /**
     * Crawl exit status.
     */
    private transient String sExit;

    private static final Object NASCENT = "NASCENT".intern();
    private static final Object RUNNING = "RUNNING".intern();
    private static final Object PAUSED = "PAUSED".intern();
    private static final Object PAUSING = "PAUSING".intern();
    private static final Object CHECKPOINTING = "CHECKPOINTING".intern();
    private static final Object STOPPING = "STOPPING".intern();
    private static final Object FINISHED = "FINISHED".intern();
    private static final Object STARTED = "STARTED".intern();
    private static final Object PREPARING = "PREPARING".intern();

    transient private Object state = NASCENT;
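    // Note on the state tokens above: elsewhere in this class the current
    // state is compared by identity (e.g. 'this.state != PAUSED' in
    // rotateLogFiles, 'this.state == CHECKPOINTING' in isCheckpointing),
    // so each state must be a single shared instance. String literals are
    // already interned by the JVM, so the explicit intern() calls mainly
    // document that intent.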

    // disk paths
    private transient File disk; // overall disk path
    private transient File logsDisk; // for log files

    /**
     * For temp files representing state of crawler (e.g. queues)
     */
    private transient File stateDisk;

    /**
     * For discardable temp files (e.g. fetch buffers).
     */
    private transient File scratchDisk;

    /**
     * Directory that holds checkpoints.
     */
    private transient File checkpointsDisk;

    /**
     * Checkpointer.
     * Knows if a checkpoint is in progress and what the checkpoint's name
     * is. Also runs checkpoints.
     */
    private Checkpointer checkpointer;

    /**
     * Gets set to the checkpoint we're recovering from, if in
     * checkpoint-recover mode. Gets set up by {@link #getCheckpointRecover()}.
     */
    private transient Checkpoint checkpointRecover = null;

    // crawl limits
    private long maxBytes;
    private long maxDocument;
    private long maxTime;

    /**
     * A manifest of all files used/created during this crawl. Written to file
     * at the end of the crawl (the absolute last thing done).
     */
    private StringBuffer manifest;

    /**
     * Record of fileHandlers established for loggers,
     * assisting file rotation.
     */
    transient private Map<Logger, FileHandler> fileHandlers;

    /** suffix to use on active logs */
    public static final String CURRENT_LOG_SUFFIX = ".log";

    /**
     * Crawl progress logger.
     *
     * No exceptions. Logs the summary result of each URI's processing.
     */
    public transient Logger uriProcessing;

    /**
     * This logger contains unexpected runtime errors.
     *
     * Would contain errors trying to set up a job or failures inside
     * processors that they are not prepared to recover from.
     */
    public transient Logger runtimeErrors;

    /**
     * This logger is for job-scoped logging, specifically errors which
     * happen and are handled within a particular processor.
     *
     * Examples would be socket timeouts, exceptions thrown by extractors, etc.
     */
    public transient Logger localErrors;

    /**
     * Special log for URI format problems, wherever they may occur.
     */
    public transient Logger uriErrors;

    /**
     * Statistics tracker writes here at regular intervals.
     */
    private transient Logger progressStats;

    /**
     * Logger to hold job summary report.
     *
     * Large state reports made at infrequent intervals (e.g. job ending) go
     * here.
     */
    public transient Logger reports;

    protected StatisticsTracking statistics = null;

    /**
     * List of crawl status listeners.
     *
     * All iterations need to synchronize on this object if they're to avoid
     * concurrent modification exceptions.
     * See {@link java.util.Collections#synchronizedList(List)}.
     */
    private transient List<CrawlStatusListener> registeredCrawlStatusListeners = Collections
            .synchronizedList(new ArrayList<CrawlStatusListener>());

    // Since there is a high probability that there will only ever be one
    // CrawlURIDispositionListener, we use this field while there is only one:
    private transient CrawlURIDispositionListener registeredCrawlURIDispositionListener;

    // And then switch to the list once there is more than one.
    protected transient ArrayList<CrawlURIDispositionListener> registeredCrawlURIDispositionListeners;

    /** Shared bdb Environment for Frontier subcomponents */
    // TODO: investigate using multiple environments to split disk accesses
    // across separate physical disks
    private transient EnhancedEnvironment bdbEnvironment = null;

    /**
     * Keep a list of all BigMap instances made -- shouldn't be many -- so
     * that we can checkpoint.
     */
    private transient Map<String, CachedBdbMap<?, ?>> bigmaps = null;

    /**
     * Default constructor
     */
    public CrawlController() {
        super();
        // Defer most setup to initialize methods
    }

    /**
     * Starting from nothing, set up CrawlController and associated
     * classes to be ready for a first crawl.
     *
     * @param sH Settings handler.
     * @throws InitializationException
     */
    public void initialize(SettingsHandler sH)
            throws InitializationException {
        sendCrawlStateChangeEvent(PREPARING, CrawlJob.STATUS_PREPARING);

        this.singleThreadLock = new ReentrantLock();
        this.settingsHandler = sH;
        this.order = settingsHandler.getOrder();
        this.order.setController(this);
        this.bigmaps = new Hashtable<String, CachedBdbMap<?, ?>>();
        sExit = "";
        this.manifest = new StringBuffer();
        String onFailMessage = "";
        try {
            onFailMessage = "You must set the User-Agent and From HTTP"
                    + " header values to acceptable strings. \n"
                    + " User-Agent: [software-name](+[info-url])[misc]\n"
                    + " From: [email-address]\n";
            order.checkUserAgentAndFrom();

            onFailMessage = "Unable to setup disk";
            if (disk == null) {
                setupDisk();
            }

            onFailMessage = "Unable to create log file(s)";
            setupLogs();

            // Figure out if we're to do a checkpoint restore. If so, get the
            // checkpointRecover instance and then put into place the old bdb
            // log files. If any of the log files already exist in the target
            // state directory, WE DO NOT OVERWRITE (makes for faster
            // recovery). CrawlController checkpoint recovery code manages
            // restoration of the old StatisticsTracker, any BigMaps used by
            // the Crawler, and the moving of bdb log files into place only.
            // Other objects interested in recovery need to check whether
            // CrawlController#isCheckpointRecover is set to figure out if
            // we're in recovery, and then take appropriate recovery action
            // (these objects can call CrawlController#getCheckpointRecover
            // to get the directory that might hold files/objects dropped
            // while checkpointing). Such objects will need a technique other
            // than object serialization for restoring settings, because
            // they'll have already been constructed by the time each object
            // is asked whether it's to recover itself. See ARCWriterProcessor
            // for an example.
            onFailMessage = "Unable to test/run checkpoint recover";
            this.checkpointRecover = getCheckpointRecover();
            if (this.checkpointRecover == null) {
                this.checkpointer = new Checkpointer(this,
                        this.checkpointsDisk);
            } else {
                setupCheckpointRecover();
            }

            onFailMessage = "Unable to setup bdb environment.";
            setupBdb();

            onFailMessage = "Unable to setup statistics";
            setupStatTracking();

            onFailMessage = "Unable to setup crawl modules";
            setupCrawlModules();
        } catch (Exception e) {
            String tmp = "On crawl: "
                    + settingsHandler.getSettingsObject(null).getName()
                    + " " + onFailMessage;
            LOGGER.log(Level.SEVERE, tmp, e);
            throw new InitializationException(tmp, e);
        }

        // Force creation of the DNS cache now -- avoids CacheCleaner in the
        // toe-threads group. Also cap its size at 1 (we never want a cached
        // value; 0 is non-operative).
        Lookup.getDefaultCache(DClass.IN).setMaxEntries(1);
        //dns.getRecords("localhost", Type.A, DClass.IN);

        setupToePool();
        setThresholds();

        // Allocate RESERVE_BLOCKS blocks of emergency reserve memory.
        reserveMemory = new LinkedList<char[]>();
        for (int i = 0; i < RESERVE_BLOCKS; i++) {
            reserveMemory.add(new char[RESERVE_BLOCK_SIZE]);
        }
    }
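    // Usage sketch (illustrative only, assuming the standard Heritrix
    // settings classes; not part of this file): an embedding application
    // would typically wire a controller up along these lines --
    //
    //   XMLSettingsHandler handler = new XMLSettingsHandler(orderXmlFile);
    //   handler.initialize();
    //   CrawlController controller = new CrawlController();
    //   controller.initialize(handler);
    //   controller.requestCrawlStart();  // see requestCrawlStart() below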

    /**
     * Does setup of checkpoint recover.
     * Copies bdb log files into the state dir.
     * @throws IOException
     */
    protected void setupCheckpointRecover() throws IOException {
        long started = System.currentTimeMillis();
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Starting recovery setup -- copying into place "
                    + "bdbje log files -- for checkpoint named "
                    + this.checkpointRecover.getDisplayName());
        }
        // Mark context that we're in a recovery.
        this.checkpointer.recover(this);
        this.progressStats.info("CHECKPOINT RECOVER "
                + this.checkpointRecover.getDisplayName());
        // Copy the bdb log files to the state dir so we don't damage the
        // old checkpoint. If there are thousands of log files, this can take
        // tens of minutes (1000 logs take ~5 minutes to copy in java,
        // dependent upon hardware). If a log file already exists over in the
        // target state directory, we do not overwrite -- we assume the log
        // file in the target is the same as the one we'd copy from the
        // checkpoint dir.
        File bdbSubDir = CheckpointUtils
                .getBdbSubDirectory(this.checkpointRecover.getDirectory());
        FileUtils.copyFiles(bdbSubDir, CheckpointUtils.getJeLogsFilter(),
                getStateDisk(), true, false);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Finished recovery setup for checkpoint named "
                    + this.checkpointRecover.getDisplayName() + " in "
                    + (System.currentTimeMillis() - started) + "ms.");
        }
    }
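    // Illustrative sketch (not code from this class): a component with its
    // own on-disk state would typically participate in recovery like so --
    //
    //   if (controller.isCheckpointRecover()) {
    //       File checkpointDir =
    //               controller.getCheckpointRecover().getDirectory();
    //       // ...reload previously serialized state from checkpointDir...
    //   }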

    protected boolean getCheckpointCopyBdbjeLogs() {
        return ((Boolean) this.order.getUncheckedAttribute(null,
                CrawlOrder.ATTR_CHECKPOINT_COPY_BDBJE_LOGS)).booleanValue();
    }

    private void setupBdb() throws FatalConfigurationException,
            AttributeNotFoundException {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        int bdbCachePercent = ((Integer) this.order.getAttribute(null,
                CrawlOrder.ATTR_BDB_CACHE_PERCENT)).intValue();
        if (bdbCachePercent > 0) {
            // Operator has expressed a preference; override BDB default or
            // je.properties value
            envConfig.setCachePercent(bdbCachePercent);
        }
        envConfig.setLockTimeout(5000000); // 5 seconds (value in microseconds)
        if (LOGGER.isLoggable(Level.FINEST)) {
            envConfig.setConfigParam("java.util.logging.level", "SEVERE");
            envConfig.setConfigParam("java.util.logging.level.evictor",
                    "SEVERE");
            envConfig.setConfigParam("java.util.logging.ConsoleHandler.on",
                    "true");
        }

        if (!getCheckpointCopyBdbjeLogs()) {
            // If we are not copying files on checkpoint, then set bdbje to
            // not remove its log files, so that it's possible to later
            // assemble (manually) all that's needed to run a recovery using
            // a mix of current bdbje logs and those it has marked for
            // deletion.
            envConfig.setConfigParam("je.cleaner.expunge", "false");
        }

        try {
            this.bdbEnvironment = new EnhancedEnvironment(getStateDisk(),
                    envConfig);
            if (LOGGER.isLoggable(Level.FINE)) {
                // Write out the bdb configuration.
                envConfig = bdbEnvironment.getConfig();
                LOGGER.fine("BdbConfiguration: Cache percentage "
                        + envConfig.getCachePercent() + ", cache size "
                        + envConfig.getCacheSize());
            }
        } catch (DatabaseException e) {
            e.printStackTrace();
            throw new FatalConfigurationException(e.getMessage());
        }
    }
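    // Illustrative sketch (standard Berkeley DB JE calls; the database name
    // is hypothetical): Frontier subcomponents share this environment and
    // open their databases roughly like --
    //
    //   DatabaseConfig dbConfig = new DatabaseConfig();
    //   dbConfig.setAllowCreate(true);
    //   Database db = controller.getBdbEnvironment()
    //           .openDatabase(null, "pendingUris", dbConfig);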

    /**
     * @return the shared EnhancedEnvironment
     */
    public EnhancedEnvironment getBdbEnvironment() {
        return this.bdbEnvironment;
    }

    /**
     * @deprecated use EnhancedEnvironment's getClassCatalog() instead
     */
    public StoredClassCatalog getClassCatalog() {
        return this.bdbEnvironment.getClassCatalog();
    }

    /**
     * Register for CrawlStatus events.
     *
     * @param cl a class implementing the CrawlStatusListener interface
     *
     * @see CrawlStatusListener
     */
    public void addCrawlStatusListener(CrawlStatusListener cl) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.registeredCrawlStatusListeners.add(cl);
        }
    }

    /**
     * Register for CrawlURIDisposition events.
     *
     * @param cl a class implementing the CrawlURIDispositionListener interface
     *
     * @see CrawlURIDispositionListener
     */
    public void addCrawlURIDispositionListener(
            CrawlURIDispositionListener cl) {
        registeredCrawlURIDispositionListener = null;
        if (registeredCrawlURIDispositionListeners == null) {
            // First listener: keep a direct reference, used only while it is
            // the only one.
            registeredCrawlURIDispositionListener = cl;
            // We expect the list to stay very small.
            registeredCrawlURIDispositionListeners =
                    new ArrayList<CrawlURIDispositionListener>(1);
        }
        registeredCrawlURIDispositionListeners.add(cl);
    }
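    // Illustrative sketch (the listener class named here is hypothetical):
    // a reporting module would register for disposition events like --
    //
    //   controller.addCrawlURIDispositionListener(new MyDispositionLogger());
    //
    // after which the fire* methods below deliver each CrawlURI outcome
    // to it.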

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURISuccessful event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURISuccessful(CrawlURI)
     */
    public void fireCrawledURISuccessfulEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURISuccessful(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                    && registeredCrawlURIDispositionListeners.size() > 0) {
                for (CrawlURIDispositionListener listener
                        : registeredCrawlURIDispositionListeners) {
                    listener.crawledURISuccessful(curi);
                }
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURINeedRetry event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURINeedRetry(CrawlURI)
     */
    public void fireCrawledURINeedRetryEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURINeedRetry(curi);
            return;
        }

        // Go through the list.
        if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
            for (CrawlURIDispositionListener listener
                    : registeredCrawlURIDispositionListeners) {
                listener.crawledURINeedRetry(curi);
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURIDisregard event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi -
     *            The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURIDisregard(CrawlURI)
     */
    public void fireCrawledURIDisregardEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURIDisregard(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                    && registeredCrawlURIDispositionListeners.size() > 0) {
                for (CrawlURIDispositionListener listener
                        : registeredCrawlURIDispositionListeners) {
                    listener.crawledURIDisregard(curi);
                }
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURIFailure event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURIFailure(CrawlURI)
     */
    public void fireCrawledURIFailureEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURIFailure(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                    && registeredCrawlURIDispositionListeners.size() > 0) {
                for (CrawlURIDispositionListener listener
                        : registeredCrawlURIDispositionListeners) {
                    listener.crawledURIFailure(curi);
                }
            }
        }
    }

    private void setupCrawlModules() throws FatalConfigurationException,
            AttributeNotFoundException, MBeanException, ReflectionException {
        if (scope == null) {
            scope = (CrawlScope) order.getAttribute(CrawlScope.ATTR_NAME);
            scope.initialize(this);
        }
        try {
            this.serverCache = new ServerCache(this);
        } catch (Exception e) {
            throw new FatalConfigurationException("Unable to"
                    + " initialize frontier (Failed setup of ServerCache) "
                    + e);
        }

        if (this.frontier == null) {
            this.frontier = (Frontier) order.getAttribute(Frontier.ATTR_NAME);
            try {
                frontier.initialize(this);
                frontier.pause(); // Pause until begun
                // Run recovery if recoverPath points to a file (if it points
                // to a directory, it's a checkpoint recovery).
                // TODO: make recover path relative to job root dir.
                if (!isCheckpointRecover()) {
                    runFrontierRecover((String) order
                            .getAttribute(CrawlOrder.ATTR_RECOVER_PATH));
                }
            } catch (IOException e) {
                throw new FatalConfigurationException(
                        "unable to initialize frontier: " + e);
            }
        }

        // Setup processors
        if (processorChains == null) {
            processorChains = new ProcessorChainList(order);
        }
    }

    protected void runFrontierRecover(String recoverPath)
            throws AttributeNotFoundException, MBeanException,
            ReflectionException, FatalConfigurationException {
        if (recoverPath == null || recoverPath.length() <= 0) {
            return;
        }
        File f = new File(recoverPath);
        if (!f.exists()) {
            LOGGER.severe("Recover file does not exist " + recoverPath);
            return;
        }
        if (!f.isFile()) {
            // It's a directory if we're supposed to be doing a checkpoint
            // recover.
            return;
        }
        boolean retainFailures = ((Boolean) order
                .getAttribute(CrawlOrder.ATTR_RECOVER_RETAIN_FAILURES))
                .booleanValue();
        try {
            frontier.importRecoverLog(recoverPath, retainFailures);
        } catch (IOException e) {
            e.printStackTrace();
            throw (FatalConfigurationException) new FatalConfigurationException(
                    "Recover.log " + recoverPath + " problem: " + e)
                    .initCause(e);
        }
    }

    private void setupDisk() throws AttributeNotFoundException {
        String diskPath = (String) order.getAttribute(null,
                CrawlOrder.ATTR_DISK_PATH);
        this.disk = getSettingsHandler()
                .getPathRelativeToWorkingDirectory(diskPath);
        this.disk.mkdirs();
        this.logsDisk = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        this.checkpointsDisk = getSettingsDir(CrawlOrder.ATTR_CHECKPOINTS_PATH);
        this.stateDisk = getSettingsDir(CrawlOrder.ATTR_STATE_PATH);
        this.scratchDisk = getSettingsDir(CrawlOrder.ATTR_SCRATCH_PATH);
    }

    /**
     * @return The logging directory, or null if there was a problem reading
     * the settings.
     */
    public File getLogsDir() {
        File f = null;
        try {
            f = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        } catch (AttributeNotFoundException e) {
            LOGGER.severe("Failed get of logs directory: " + e.getMessage());
        }
        return f;
    }

    /**
     * Return the full path to the directory named by <code>key</code>
     * in settings.
     * If the directory does not exist, it and all intermediary dirs
     * will be created.
     * @param key Key to use going to settings.
     * @return Full path to directory named by <code>key</code>.
     * @throws AttributeNotFoundException
     */
    public File getSettingsDir(String key)
            throws AttributeNotFoundException {
        String path = (String) order.getAttribute(null, key);
        File f = new File(path);
        if (!f.isAbsolute()) {
            f = new File(disk.getPath(), path);
        }
        if (!f.exists()) {
            f.mkdirs();
        }
        return f;
    }

    /**
     * Setup the statistics tracker.
     * The statistics object must be created before modules can use it.
     * Do it here now so that when modules retrieve the object from the
     * controller during initialization (which some do), it's in place.
     * @throws InvalidAttributeValueException
     * @throws FatalConfigurationException
     */
    private void setupStatTracking()
            throws InvalidAttributeValueException,
            FatalConfigurationException {
        MapType loggers = order.getLoggers();
        final String cstName = "crawl-statistics";
        if (loggers.isEmpty(null)) {
            if (!isCheckpointRecover() && this.statistics == null) {
                this.statistics = new StatisticsTracker(cstName);
            }
            loggers.addElement(null, (StatisticsTracker) this.statistics);
        }

        if (isCheckpointRecover()) {
            restoreStatisticsTracker(loggers, cstName);
        }

        for (Iterator it = loggers.iterator(null); it.hasNext();) {
            StatisticsTracking tracker = (StatisticsTracking) it.next();
            tracker.initialize(this);
            if (this.statistics == null) {
                this.statistics = tracker;
            }
        }
    }

    protected void restoreStatisticsTracker(MapType loggers,
            String replaceName) throws FatalConfigurationException {
        try {
            // Add the deserialized stats tracker to the settings system.
            loggers.removeElement(loggers.globalSettings(), replaceName);
            loggers.addElement(loggers.globalSettings(),
                    (StatisticsTracker) this.statistics);
        } catch (Exception e) {
            throw convertToFatalConfigurationException(e);
        }
    }

    protected FatalConfigurationException convertToFatalConfigurationException(
            Exception e) {
        FatalConfigurationException fce = new FatalConfigurationException(
                "Converted exception: " + e.getMessage());
        fce.setStackTrace(e.getStackTrace());
        return fce;
    }

    private void setupLogs() throws IOException {
        String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
        uriProcessing = Logger.getLogger(LOGNAME_CRAWL + "." + logsPath);
        runtimeErrors = Logger.getLogger(LOGNAME_RUNTIME_ERRORS + "."
                + logsPath);
        localErrors = Logger.getLogger(LOGNAME_LOCAL_ERRORS + "." + logsPath);
        uriErrors = Logger.getLogger(LOGNAME_URI_ERRORS + "." + logsPath);
        progressStats = Logger.getLogger(LOGNAME_PROGRESS_STATISTICS + "."
                + logsPath);

        this.fileHandlers = new HashMap<Logger, FileHandler>();

        setupLogFile(uriProcessing, logsPath + LOGNAME_CRAWL
                + CURRENT_LOG_SUFFIX, new UriProcessingFormatter(), true);

        setupLogFile(runtimeErrors, logsPath + LOGNAME_RUNTIME_ERRORS
                + CURRENT_LOG_SUFFIX, new RuntimeErrorFormatter(), true);

        setupLogFile(localErrors, logsPath + LOGNAME_LOCAL_ERRORS
                + CURRENT_LOG_SUFFIX, new LocalErrorFormatter(), true);

        setupLogFile(uriErrors, logsPath + LOGNAME_URI_ERRORS
                + CURRENT_LOG_SUFFIX, new UriErrorFormatter(), true);

        setupLogFile(progressStats, logsPath + LOGNAME_PROGRESS_STATISTICS
                + CURRENT_LOG_SUFFIX, new StatisticsLogFormatter(), true);
    }

    private void setupLogFile(Logger logger, String filename, Formatter f,
            boolean shouldManifest) throws IOException, SecurityException {
        GenerationFileHandler fh = new GenerationFileHandler(filename, true,
                shouldManifest);
        fh.setFormatter(f);
        logger.addHandler(fh);
        addToManifest(filename, MANIFEST_LOG_FILE, shouldManifest);
        logger.setUseParentHandlers(false);
        this.fileHandlers.put(logger, fh);
    }

    protected void rotateLogFiles(String generationSuffix)
            throws IOException {
        if (this.state != PAUSED && this.state != CHECKPOINTING) {
            throw new IllegalStateException(
                    "Pause crawl before requesting log rotation.");
        }
        for (Logger l : fileHandlers.keySet()) {
            GenerationFileHandler gfh =
                    (GenerationFileHandler) fileHandlers.get(l);
            GenerationFileHandler newGfh = gfh.rotate(generationSuffix,
                    CURRENT_LOG_SUFFIX);
            if (gfh.shouldManifest()) {
                addToManifest((String) newGfh.getFilenameSeries().get(1),
                        MANIFEST_LOG_FILE, newGfh.shouldManifest());
            }
            l.removeHandler(gfh);
            l.addHandler(newGfh);
            fileHandlers.put(l, newGfh);
        }
    }
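    // Illustrative note (an assumption; the call site is outside this
    // section): given the state check above, rotation is expected to be
    // driven by checkpointing while the crawl is paused, with a generation
    // suffix derived from the checkpoint, e.g. rotateLogFiles(".cp00001");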

    /**
     * Close all log files and remove handlers from loggers.
     */
    public void closeLogFiles() {
        for (Logger l : fileHandlers.keySet()) {
            GenerationFileHandler gfh =
                    (GenerationFileHandler) fileHandlers.get(l);
            gfh.close();
            l.removeHandler(gfh);
        }
    }

    /**
     * Sets the values for max bytes, docs and time based on crawl order.
     */
    private void setThresholds() {
        try {
            maxBytes = ((Long) order
                    .getAttribute(CrawlOrder.ATTR_MAX_BYTES_DOWNLOAD))
                    .longValue();
        } catch (Exception e) {
            maxBytes = 0;
        }
        try {
            maxDocument = ((Long) order
                    .getAttribute(CrawlOrder.ATTR_MAX_DOCUMENT_DOWNLOAD))
                    .longValue();
        } catch (Exception e) {
            maxDocument = 0;
        }
        try {
            maxTime = ((Long) order
                    .getAttribute(CrawlOrder.ATTR_MAX_TIME_SEC))
                    .longValue();
        } catch (Exception e) {
            maxTime = 0;
        }
    }

    /**
     * @return Object this controller is using to track crawl statistics
     */
    public StatisticsTracking getStatistics() {
        return statistics == null ? new StatisticsTracker("crawl-statistics")
                : this.statistics;
    }

    /**
     * Send crawl change event to all listeners.
     * @param newState State change we're to tell listeners about.
     * @param message Message on state change.
     * @see #sendCheckpointEvent(File) for the special-case event telling
     * listeners to checkpoint.
     */
    protected void sendCrawlStateChangeEvent(Object newState,
            String message) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.state = newState;
            for (CrawlStatusListener l : this.registeredCrawlStatusListeners) {
                if (newState.equals(PAUSED)) {
                    l.crawlPaused(message);
                } else if (newState.equals(RUNNING)) {
                    l.crawlResuming(message);
                } else if (newState.equals(PAUSING)) {
                    l.crawlPausing(message);
                } else if (newState.equals(STARTED)) {
                    l.crawlStarted(message);
                } else if (newState.equals(STOPPING)) {
                    l.crawlEnding(message);
                } else if (newState.equals(FINISHED)) {
                    l.crawlEnded(message);
                } else if (newState.equals(PREPARING)) {
                    l.crawlResuming(message);
                } else {
                    throw new RuntimeException("Unknown state: " + newState);
                }
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + newState + " to " + l);
                }
            }
            LOGGER.fine("Sent " + newState);
        }
    }

    /**
     * Send the checkpoint event.
     * Has its own method apart from
     * {@link #sendCrawlStateChangeEvent(Object, String)} because checkpointing
     * throws an Exception (didn't want to have to wrap every
     * sendCrawlStateChangeEvent call in try/catches).
     * @param checkpointDir Where to write checkpoint state to.
     * @throws Exception
     */
    protected void sendCheckpointEvent(File checkpointDir) throws Exception {
        synchronized (this.registeredCrawlStatusListeners) {
            if (this.state != PAUSED) {
                throw new IllegalStateException("Crawler must be completely "
                        + "paused before checkpointing can start");
            }
            this.state = CHECKPOINTING;
            for (CrawlStatusListener l : this.registeredCrawlStatusListeners) {
                l.crawlCheckpoint(checkpointDir);
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + CHECKPOINTING + " to " + l);
                }
            }
            LOGGER.fine("Sent " + CHECKPOINTING);
        }
    }

    /**
     * Operator requested that the crawl begin.
     */
    public void requestCrawlStart() {
        runProcessorInitialTasks();

        sendCrawlStateChangeEvent(STARTED, CrawlJob.STATUS_PENDING);
        String jobState;
        state = RUNNING;
        jobState = CrawlJob.STATUS_RUNNING;
        sendCrawlStateChangeEvent(this.state, jobState);

        // A proper exit will change this value.
        this.sExit = CrawlJob.STATUS_FINISHED_ABNORMAL;

        Thread statLogger = new Thread(statistics);
        statLogger.setName("StatLogger");
        statLogger.start();

        frontier.start();
    }

    /**
     * Called when the last toethread exits.
     */
    protected void completeStop() {
        LOGGER.fine("Entered complete stop.");
        // Run processors' final tasks
        runProcessorFinalTasks();
        // Ok, now we are ready to exit.
        sendCrawlStateChangeEvent(FINISHED, this.sExit);
        synchronized (this.registeredCrawlStatusListeners) {
            // Remove all listeners now that we're done with them.
            this.registeredCrawlStatusListeners
                    .removeAll(this.registeredCrawlStatusListeners);
            this.registeredCrawlStatusListeners = null;
        }

        closeLogFiles();

        // Release references to logger file handler instances.
        this.fileHandlers = null;
        this.uriErrors = null;
        this.uriProcessing = null;
        this.localErrors = null;
        this.runtimeErrors = null;
        this.progressStats = null;
        this.reports = null;
        this.manifest = null;

        // Do cleanup.
        this.statistics = null;
        this.frontier = null;
        this.disk = null;
        this.scratchDisk = null;
        this.order = null;
        this.scope = null;
        if (this.settingsHandler != null) {
            this.settingsHandler.cleanup();
        }
        this.settingsHandler = null;
        this.reserveMemory = null;
        this.processorChains = null;
        if (this.serverCache != null) {
            this.serverCache.cleanup();
            this.serverCache = null;
        }
        if (this.checkpointer != null) {
            this.checkpointer.cleanup();
            this.checkpointer = null;
        }
        if (this.bdbEnvironment != null) {
            try {
                this.bdbEnvironment.sync();
                this.bdbEnvironment.close();
            } catch (DatabaseException e) {
                e.printStackTrace();
            }
            this.bdbEnvironment = null;
        }
        this.bigmaps = null;
        if (this.toePool != null) {
            this.toePool.cleanup();
            // I experimented with launching a thread here to clean up the
            // ToePool ThreadGroup (making sure the cleanup thread was not
            // in the ToePool ThreadGroup), because ToePools seemed to be
            // sticking around holding references to CrawlController. Need
            // to spend more time checking whether that is still the case
            // even after the toePool#cleanup call above.
        }
        this.toePool = null;
        LOGGER.fine("Finished crawl.");
    }
1100:
1101:            synchronized void completePause() {
1102:                // Send a notifyAll. At least the checkpointing thread may be
1103:                // waiting on a complete pause.
1104:                notifyAll();
1105:                sendCrawlStateChangeEvent(PAUSED, CrawlJob.STATUS_PAUSED);
1106:            }
1107:
1108:            private boolean shouldContinueCrawling() {
1109:                if (frontier.isEmpty()) {
1110:                    this.sExit = CrawlJob.STATUS_FINISHED;
1111:                    return false;
1112:                }
1113:
1114:                if (maxBytes > 0 && frontier.totalBytesWritten() >= maxBytes) {
1115:                    // Hit the max byte download limit!
1116:                    sExit = CrawlJob.STATUS_FINISHED_DATA_LIMIT;
1117:                    return false;
1118:                } else if (maxDocument > 0
1119:                        && frontier.succeededFetchCount() >= maxDocument) {
1120:                    // Hit the max document download limit!
1121:                    this.sExit = CrawlJob.STATUS_FINISHED_DOCUMENT_LIMIT;
1122:                    return false;
1123:                } else if (maxTime > 0
1124:                        && statistics.crawlDuration() >= maxTime * 1000) {
1125:                    // Hit the max crawl-time limit!
1126:                    this.sExit = CrawlJob.STATUS_FINISHED_TIME_LIMIT;
1127:                    return false;
1128:                }
1129:                return state == RUNNING;
1130:            }
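
            /* Note the units in the time check above: maxTime is configured in
             * seconds while statistics.crawlDuration() reports milliseconds,
             * hence the multiplication by 1000. For example, a maxTime of 3600
             * ends the crawl once crawlDuration() reaches 3600 * 1000 =
             * 3,600,000 ms, i.e. one hour.
             */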
1131:
1132:            /**
1133:             * Request a checkpoint.
1134:             * Sets a checkpointing thread running.
1135:             * @throws IllegalStateException Thrown if the crawl is not in the
1136:             * paused state (the crawl must first be paused before checkpointing).
1137:             */
1138:            public synchronized void requestCrawlCheckpoint()
1139:                    throws IllegalStateException {
1140:                if (this.checkpointer == null) {
1141:                    return;
1142:                }
1143:                if (this.checkpointer.isCheckpointing()) {
1144:                    throw new IllegalStateException(
1145:                            "Checkpoint already running.");
1146:                }
1147:                this.checkpointer.checkpoint();
1148:            }
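
            /* An illustrative checkpoint sequence (a sketch only; the waiting
             * logic is elided, and a real operator UI would poll state or
             * listen for crawl-state-change events):
             * <pre>
             *   controller.requestCrawlPause();
             *   // ... wait until controller.isPaused() ...
             *   controller.requestCrawlCheckpoint();
             *   // ... wait until !controller.isCheckpointing() ...
             *   controller.requestCrawlResume();
             * </pre>
             */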
1149:
1150:            /**
1151:             * @return True if checkpointing.
1152:             */
1153:            public boolean isCheckpointing() {
1154:                return this.state == CHECKPOINTING;
1155:            }
1156:
1157:            /**
1158:             * Run checkpointing.
1159:             * CrawlController takes care of checkpointing/serializing bdb, the
1160:             * StatisticsTracker, and the CheckpointContext.  Other modules that
1161:             * want to revive themselves on checkpoint recovery need to save state
1162:             * during their {@link CrawlStatusListener#crawlCheckpoint(File)}
1163:             * invocation. Then, in their #initialize if a module or their
1164:             * #initialTask if a processor, they check with the CrawlController
1165:             * whether this is a checkpoint recovery; if it is, they read their
1166:             * old state back from the pointed-to checkpoint directory.
1167:             * <p>Default access; only to be called by Checkpointer.
1168:             * @throws Exception
1169:             */
1170:            void checkpoint() throws Exception {
1171:                // Tell registered listeners to checkpoint.
1172:                sendCheckpointEvent(this.checkpointer
1173:                        .getCheckpointInProgressDirectory());
1174:
1175:                // Rotate off crawler logs.
1176:                LOGGER.fine("Rotating log files.");
1177:                rotateLogFiles(CURRENT_LOG_SUFFIX + "."
1178:                        + this.checkpointer.getNextCheckpointName());
1179:
1180:                // Sync the BigMap contents to bdb, if they're bdb bigmaps.
1181:                LOGGER.fine("BigMaps.");
1182:                checkpointBigMaps(this.checkpointer
1183:                        .getCheckpointInProgressDirectory());
1184:
1185:                // Note, on deserialization, the super CrawlType#parent
1186:                // needs to be restored. Parent is '/crawl-order/loggers'.
1187:                // The settings handler for this module also needs to be
1188:                // restored. Both of these fields are private in the
1189:                // super class. Adding the restored ST to the crawl order
1190:                // should take care of this.
1191:
1192:                // Checkpoint the bdb environment.
1193:                LOGGER.fine("Bdb environment.");
1194:                checkpointBdb(this.checkpointer
1195:                        .getCheckpointInProgressDirectory());
1196:
1197:                // Make a copy of the order, seeds, and settings.
1198:                LOGGER.fine("Copying settings.");
1199:                copySettings(this.checkpointer
1200:                        .getCheckpointInProgressDirectory());
1201:
1202:                // Checkpoint this CrawlController.
1203:                CheckpointUtils.writeObjectToFile(this, this.checkpointer
1204:                        .getCheckpointInProgressDirectory());
1205:            }
1206:
1207:            /**
1208:             * Copy off the settings.
1209:             * @param checkpointDir Directory to write checkpoint to.
1210:             * @throws IOException 
1211:             */
1212:            protected void copySettings(final File checkpointDir)
1213:                    throws IOException {
1214:                final List files = this.settingsHandler.getListOfAllFiles();
1215:                boolean copiedSettingsDir = false;
1216:                final File settingsDir = new File(this.disk, "settings");
1217:                for (final Iterator i = files.iterator(); i.hasNext();) {
1218:                    File f = new File((String) i.next());
1219:                    if (f.getAbsolutePath().startsWith(
1220:                            settingsDir.getAbsolutePath())) {
1221:                        if (copiedSettingsDir) {
1222:                            // Skip.  We've already copied this member of the
1223:                            // settings directory.
1224:                            continue;
1225:                        }
1226:                        // Copy 'settings' dir all in one lump, not a file at a time.
1227:                        copiedSettingsDir = true;
1228:                        FileUtils.copyFiles(settingsDir, new File(
1229:                                checkpointDir, settingsDir.getName()));
1230:                        continue;
1231:                    }
1232:                    FileUtils.copyFiles(f, f.isDirectory() ? checkpointDir
1233:                            : new File(checkpointDir, f.getName()));
1234:                }
1235:            }
1236:
1237:            /**
1238:             * Checkpoint bdb.
1239:             * I used to do a call to log cleaning as suggested in the je-2.0
1240:             * javadoc, but it takes way too much time (20 minutes for a crawl of
1241:             * 1 million items). Assume the cleaner is keeping up. Below was the log cleaning loop.
1242:             * <pre>int totalCleaned = 0;
1243:             * for (int cleaned = 0; (cleaned = this.bdbEnvironment.cleanLog()) != 0;
1244:             *  totalCleaned += cleaned) {
1245:             *      LOGGER.fine("Cleaned " + cleaned + " log files.");
1246:             * }
1247:             * </pre>
1248:             * <p>I also used to do a sync. But, from Mark Hayes, sync and checkpoint
1249:             * are effectively the same thing, only sync is not configurable.  He suggests
1250:             * doing one or the other:
1251:             * <p>MS: Reading code, Environment.sync() is a checkpoint.  Looks like
1252:             * I don't need to call a checkpoint after calling a sync?
1253:             * <p>MH: Right, they're almost the same thing -- just do one or the other,
1254:             * not both.  With the new API, you'll need to do a checkpoint not a
1255:             * sync, because the sync() method has no config parameter.  Don't worry
1256:             * -- it's fine to do a checkpoint even though you're not using.
1257:             * @param checkpointDir Directory to write checkpoint to.
1258:             * @throws DatabaseException 
1259:             * @throws IOException 
1260:             * @throws RuntimeException Thrown if failed setup of new bdb environment.
1261:             */
1262:            protected void checkpointBdb(File checkpointDir)
1263:                    throws DatabaseException, IOException, RuntimeException {
1264:                EnvironmentConfig envConfig = this.bdbEnvironment.getConfig();
1265:                final List bkgrdThreads = Arrays.asList(new String[] {
1266:                        "je.env.runCheckpointer", "je.env.runCleaner",
1267:                        "je.env.runINCompressor" });
1268:                try {
1269:                    // Disable background threads
1270:                    setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "false");
1271:                    // Do a force checkpoint.  That's what a sync does (i.e. doSync).
1272:                    CheckpointConfig chkptConfig = new CheckpointConfig();
1273:                    chkptConfig.setForce(true);
1274:
1275:                    // Mark Hayes of sleepycat says:
1276:                    // "The default for this property is false, which gives the current
1277:                    // behavior (allow deltas).  If this property is true, deltas are
1278:                    // prohibited -- full versions of internal nodes are always logged
1279:                    // during the checkpoint. When a full version of an internal node
1280:                    // is logged during a checkpoint, recovery does not need to process
1281:                    // it at all.  It is only fetched if needed by the application,
1282:                    // during normal DB operations after recovery. When a delta of an
1283:                    // internal node is logged during a checkpoint, recovery must
1284:                    // process it by fetching the full version of the node from earlier
1285:                    // in the log, and then applying the delta to it.  This can be
1286:                    // pretty slow, since it is potentially a large amount of
1287:                    // random I/O."
1288:                    chkptConfig.setMinimizeRecoveryTime(true);
1289:                    this.bdbEnvironment.checkpoint(chkptConfig);
1290:                    LOGGER.fine("Finished bdb checkpoint.");
1291:
1292:                    // From the sleepycat folks: A trick for flipping db logs.
1293:                    EnvironmentImpl envImpl = DbInternal
1294:                            .envGetEnvironmentImpl(this.bdbEnvironment);
1295:                    long firstFileInNextSet = DbLsn.getFileNumber(envImpl
1296:                            .forceLogFileFlip());
1297:                    // So the last file in the checkpoint is firstFileInNextSet - 1.
1298:                    // Write manifest of all log files into the bdb directory.
1299:                    final String lastBdbCheckpointLog = getBdbLogFileName(firstFileInNextSet - 1);
1300:                    processBdbLogs(checkpointDir, lastBdbCheckpointLog);
1301:                    LOGGER.fine("Finished processing bdb log files.");
1302:                } finally {
1303:                    // Restore background threads.
1304:                    setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "true");
1305:                }
1306:            }
1307:
1308:            protected void processBdbLogs(final File checkpointDir,
1309:                    final String lastBdbCheckpointLog) throws IOException {
1310:                File bdbDir = CheckpointUtils.getBdbSubDirectory(checkpointDir);
1311:                if (!bdbDir.exists()) {
1312:                    bdbDir.mkdir();
1313:                }
1314:                PrintWriter pw = new PrintWriter(new FileOutputStream(new File(
1315:                        checkpointDir, "bdbje-logs-manifest.txt")));
1316:                try {
1317:                    // Don't copy any beyond the last bdb log file (bdbje can keep
1318:                    // writing logs after checkpoint).
1319:                    boolean pastLastLogFile = false;
1320:                    Set<String> srcFilenames = null;
1321:                    final boolean copyFiles = getCheckpointCopyBdbjeLogs();
1322:                    do {
1323:                        FilenameFilter filter = CheckpointUtils
1324:                                .getJeLogsFilter();
1325:                        srcFilenames = new HashSet<String>(Arrays
1326:                                .asList(getStateDisk().list(filter)));
1327:                        String[] tgtFilenames = bdbDir.list(filter);
1328:                        if (tgtFilenames != null && tgtFilenames.length > 0) {
1329:                            srcFilenames.removeAll(Arrays.asList(tgtFilenames));
1330:                        }
1331:                        if (srcFilenames.size() > 0) {
1332:                            // Sort files.
1333:                            srcFilenames = new TreeSet<String>(srcFilenames);
1334:                            int count = 0;
1335:                            for (final Iterator i = srcFilenames.iterator(); i
1336:                                    .hasNext()
1337:                                    && !pastLastLogFile;) {
1338:                                String name = (String) i.next();
1339:                                if (copyFiles) {
1340:                                    FileUtils.copyFiles(new File(
1341:                                            getStateDisk(), name), new File(
1342:                                            bdbDir, name));
1343:                                }
1344:                                pw.println(name);
1345:                                if (name.equals(lastBdbCheckpointLog)) {
1346:                                    // We're done.
1347:                                    pastLastLogFile = true;
1348:                                }
1349:                                count++;
1350:                            }
1351:                            if (LOGGER.isLoggable(Level.FINE)) {
1352:                                LOGGER.fine("Copied " + count);
1353:                            }
1354:                        }
1355:                    } while (!pastLastLogFile && srcFilenames != null
1356:                            && srcFilenames.size() > 0);
1357:                } finally {
1358:                    pw.close();
1359:                }
1360:            }
1361:
1362:            protected String getBdbLogFileName(final long index) {
1363:                String lastBdbLogFileHex = Long.toHexString(index);
1364:                StringBuffer buffer = new StringBuffer();
1365:                for (int i = 0; i < (8 - lastBdbLogFileHex.length()); i++) {
1366:                    buffer.append('0');
1367:                }
1368:                buffer.append(lastBdbLogFileHex);
1369:                buffer.append(".jdb");
1370:                return buffer.toString();
1371:            }
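
            /* Worked example: getBdbLogFileName(26) hex-encodes 26 as "1a",
             * zero-pads it to eight digits, and returns "0000001a.jdb",
             * matching bdbje's on-disk log file naming.
             */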
1372:
1373:            protected void setBdbjeBkgrdThreads(final EnvironmentConfig config,
1374:                    final List threads, final String setting) {
1375:                for (final Iterator i = threads.iterator(); i.hasNext();) {
1376:                    config.setConfigParam((String) i.next(), setting);
1377:                }
1378:            }
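
            /* Each iteration above is a single mutable-property write on the
             * je environment config, e.g.:
             * <pre>
             *   config.setConfigParam("je.env.runCleaner", "false");
             * </pre>
             * which toggles the named daemon (checkpointer, cleaner, or IN
             * compressor) off or on.
             */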
1379:
1380:            /**
1381:             * Get recover checkpoint.
1382:             * Returns null if we're NOT in recover mode.
1383:             * Looks at ATTR_RECOVER_PATH and, if it points at a directory, assumes
1384:             * a checkpoint recover. If in checkpoint mode, returns a Checkpoint
1385:             * instance if the checkpoint was VALID (else null).
1386:             * @return Checkpoint instance if we're in recover checkpoint
1387:             * mode and the pointed-to checkpoint was valid.
1388:             * @see #isCheckpointRecover()
1389:             */
1390:            public synchronized Checkpoint getCheckpointRecover() {
1391:                if (this.checkpointRecover != null) {
1392:                    return this.checkpointRecover;
1393:                }
1394:                return getCheckpointRecover(this.order);
1395:            }
1396:
1397:            public static Checkpoint getCheckpointRecover(final CrawlOrder order) {
1398:                String path = (String) order.getUncheckedAttribute(null,
1399:                        CrawlOrder.ATTR_RECOVER_PATH);
1400:                if (path == null || path.length() <= 0) {
1401:                    return null;
1402:                }
1403:                File rp = new File(path);
1404:                // Assume that if the path is to a directory, it's a checkpoint recover.
1405:                Checkpoint result = null;
1406:                if (rp.exists() && rp.isDirectory()) {
1407:                    Checkpoint cp = new Checkpoint(rp);
1408:                    if (cp.isValid()) {
1409:                        // if valid, set as result.
1410:                        result = cp;
1411:                    }
1412:                }
1413:                return result;
1414:            }
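
            /* A sketch of the expected configuration (the path below is
             * hypothetical):
             * <pre>
             *   // ATTR_RECOVER_PATH pointed at a checkpoint directory, e.g.
             *   //   /crawls/myjob/checkpoints/cp00001
             *   Checkpoint cp = CrawlController.getCheckpointRecover(order);
             *   // cp is non-null only if that directory holds a valid checkpoint
             * </pre>
             */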
1415:
1416:            public static boolean isCheckpointRecover(final CrawlOrder order) {
1417:                return getCheckpointRecover(order) != null;
1418:            }
1419:
1420:            /**
1421:             * @return True if we're in checkpoint recover mode. Call
1422:             * {@link #getCheckpointRecover()} to get at the Checkpoint instance
1423:             * that has info on the checkpoint directory being recovered from.
1424:             */
1425:            public boolean isCheckpointRecover() {
1426:                return this.checkpointRecover != null;
1427:            }
1428:
1429:            /**
1430:             * Operator requested that the crawl stop.
1431:             */
1432:            public synchronized void requestCrawlStop() {
1433:                requestCrawlStop(CrawlJob.STATUS_ABORTED);
1434:            }
1435:
1436:            /**
1437:             * Operator requested that the crawl stop.
1438:             * @param message 
1439:             */
1440:            public synchronized void requestCrawlStop(String message) {
1441:                if (state == STOPPING || state == FINISHED) {
1442:                    return;
1443:                }
1444:                if (message == null) {
1445:                    throw new IllegalArgumentException(
1446:                            "Message cannot be null.");
1447:                }
1448:                this.sExit = message;
1449:                beginCrawlStop();
1450:            }
1451:
1452:            /**
1453:             * Start the process of stopping the crawl. 
1454:             */
1455:            public void beginCrawlStop() {
1456:                LOGGER.fine("Started.");
1457:                sendCrawlStateChangeEvent(STOPPING, this.sExit);
1458:                if (this.frontier != null) {
1459:                    this.frontier.terminate();
1460:                    this.frontier.unpause();
1461:                }
1462:                LOGGER.fine("Finished.");
1463:            }
1464:
1465:            /**
1466:             * Stop the crawl temporarily.
1467:             */
1468:            public synchronized void requestCrawlPause() {
1469:                if (state == PAUSING || state == PAUSED) {
1470:                    // Already about to pause
1471:                    return;
1472:                }
1473:                sExit = CrawlJob.STATUS_WAITING_FOR_PAUSE;
1474:                frontier.pause();
1475:                sendCrawlStateChangeEvent(PAUSING, this.sExit);
1476:                if (toePool.getActiveToeCount() == 0) {
1477:                    // if all threads already held, complete pause now
1478:                    // (no chance to trigger off later held thread)
1479:                    completePause();
1480:                }
1481:            }
1482:
1483:            /**
1484:             * Tell if the controller is paused
1485:             * @return true if paused
1486:             */
1487:            public boolean isPaused() {
1488:                return state == PAUSED;
1489:            }
1490:
1491:            public boolean isPausing() {
1492:                return state == PAUSING;
1493:            }
1494:
1495:            public boolean isRunning() {
1496:                return state == RUNNING;
1497:            }
1498:
1499:            /**
1500:             * Resume crawl from paused state
1501:             */
1502:            public synchronized void requestCrawlResume() {
1503:                if (state != PAUSING && state != PAUSED
1504:                        && state != CHECKPOINTING) {
1505:                    // Can't resume if we haven't been told to pause or if we're
1506:                    // in the middle of a checkpoint.
1507:                    return;
1508:                }
1509:                multiThreadMode();
1510:                frontier.unpause();
1511:                LOGGER.fine("Crawl resumed.");
1512:                sendCrawlStateChangeEvent(RUNNING, CrawlJob.STATUS_RUNNING);
1513:            }
1514:
1515:            /**
1516:             * @return Active toe thread count.
1517:             */
1518:            public int getActiveToeCount() {
1519:                if (toePool == null) {
1520:                    return 0;
1521:                }
1522:                return toePool.getActiveToeCount();
1523:            }
1524:
1525:            private void setupToePool() {
1526:                toePool = new ToePool(this);
1527:                // TODO: make # of toes self-optimizing
1528:                toePool.setSize(order.getMaxToes());
1529:            }
1530:
1531:            /**
1532:             * @return The order file instance.
1533:             */
1534:            public CrawlOrder getOrder() {
1535:                return order;
1536:            }
1537:
1538:            /**
1539:             * @return The server cache instance.
1540:             */
1541:            public ServerCache getServerCache() {
1542:                return serverCache;
1543:            }
1544:
1545:            /**
1546:             * @param o The new crawl order.
1547:             */
1548:            public void setOrder(CrawlOrder o) {
1549:                order = o;
1550:            }
1551:
1552:            /**
1553:             * @return The frontier.
1554:             */
1555:            public Frontier getFrontier() {
1556:                return frontier;
1557:            }
1558:
1559:            /**
1560:             * @return This crawl scope.
1561:             */
1562:            public CrawlScope getScope() {
1563:                return scope;
1564:            }
1565:
1566:            /** Get the list of processor chains.
1567:             *
1568:             * @return the list of processor chains.
1569:             */
1570:            public ProcessorChainList getProcessorChainList() {
1571:                return processorChains;
1572:            }
1573:
1574:            /** Get the first processor chain.
1575:             *
1576:             * @return the first processor chain.
1577:             */
1578:            public ProcessorChain getFirstProcessorChain() {
1579:                return processorChains.getFirstChain();
1580:            }
1581:
1582:            /** Get the postprocessor chain.
1583:             *
1584:             * @return the postprocessor chain.
1585:             */
1586:            public ProcessorChain getPostprocessorChain() {
1587:                return processorChains.getLastChain();
1588:            }
1589:
1590:            /**
1591:             * Get the 'working' directory of the current crawl.
1592:             * @return the 'working' directory of the current crawl.
1593:             */
1594:            public File getDisk() {
1595:                return disk;
1596:            }
1597:
1598:            /**
1599:             * @return Scratch disk location.
1600:             */
1601:            public File getScratchDisk() {
1602:                return scratchDisk;
1603:            }
1604:
1605:            /**
1606:             * @return State disk location.
1607:             */
1608:            public File getStateDisk() {
1609:                return stateDisk;
1610:            }
1611:
1612:            /**
1613:             * @return The number of ToeThreads
1614:             *
1615:             * @see ToePool#getToeCount()
1616:             */
1617:            public int getToeCount() {
1618:                return this.toePool == null ? 0 : this.toePool.getToeCount();
1619:            }
1620:
1621:            /**
1622:             * @return The ToePool
1623:             */
1624:            public ToePool getToePool() {
1625:                return toePool;
1626:            }
1627:
1628:            /**
1629:             * @return toepool one-line report
1630:             */
1631:            public String oneLineReportThreads() {
1632:                // Delegates to the ToePool's single-line report.
1633:                return toePool.singleLineReport();
1634:            }
1635:
1636:            /**
1637:             * While many settings will update automatically when the SettingsHandler is
1638:             * modified, some settings need to be explicitly changed to reflect new
1639:             * settings. These include the number of toe threads and the seeds.
1640:             */
1641:            public void kickUpdate() {
1642:                toePool.setSize(order.getMaxToes());
1643:
1644:                this.scope.kickUpdate();
1645:                this.frontier.kickUpdate();
1646:                this.processorChains.kickUpdate();
1647:
1648:                // TODO: continue to generalize this, so that any major 
1649:                // component can get a kick when it may need to refresh its data
1650:
1651:                setThresholds();
1652:            }
1653:
1654:            /**
1655:             * @return The settings handler.
1656:             */
1657:            public SettingsHandler getSettingsHandler() {
1658:                return settingsHandler;
1659:            }
1660:
1661:            /**
1662:             * This method iterates through processor chains to run processors' initial
1663:             * tasks.
1664:             *
1665:             */
1666:            private void runProcessorInitialTasks() {
1667:                for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
1668:                    for (Iterator ip = ((ProcessorChain) ic.next()).iterator(); ip
1669:                            .hasNext();) {
1670:                        ((Processor) ip.next()).initialTasks();
1671:                    }
1672:                }
1673:            }
1674:
1675:            /**
1676:             * This method iterates through processor chains to run processors' final
1677:             * tasks.
1678:             *
1679:             */
1680:            private void runProcessorFinalTasks() {
1681:                for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
1682:                    for (Iterator ip = ((ProcessorChain) ic.next()).iterator(); ip
1683:                            .hasNext();) {
1684:                        ((Processor) ip.next()).finalTasks();
1685:                    }
1686:                }
1687:            }
1688:
1689:            /**
1690:             * Kills a thread. For details see
1691:             * {@link org.archive.crawler.framework.ToePool#killThread(int, boolean)
1692:             * ToePool.killThread(int, boolean)}.
1693:             * @param threadNumber Thread to kill.
1694:             * @param replace Should thread be replaced.
1695:             * @see org.archive.crawler.framework.ToePool#killThread(int, boolean)
1696:             */
1697:            public void killThread(int threadNumber, boolean replace) {
1698:                toePool.killThread(threadNumber, replace);
1699:            }
1700:
1701:            /**
1702:             * Add a file to the manifest of files used/generated by the current
1703:             * crawl.
1704:             * 
1705:             * TODO: It's possible for a file to be added twice if reports are
1706:             * force-generated mid-crawl.  Fix.
1707:             *
1708:             * @param file The filename (with absolute path) of the file to add
1709:             * @param type The type of the file
1710:             * @param bundle Should the file be included in a typical bundling of
1711:             *           crawler files.
1712:             *
1713:             * @see #MANIFEST_CONFIG_FILE
1714:             * @see #MANIFEST_LOG_FILE
1715:             * @see #MANIFEST_REPORT_FILE
1716:             */
1717:            public void addToManifest(String file, char type, boolean bundle) {
1718:                manifest
1719:                        .append(type + (bundle ? "+" : "-") + " " + file + "\n");
1720:            }
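
            /* Worked example of the manifest line format: assuming the
             * MANIFEST_LOG_FILE type character is 'L', then
             * addToManifest("/crawl/logs/crawl.log", MANIFEST_LOG_FILE, true)
             * appends the line "L+ /crawl/logs/crawl.log"; passing bundle as
             * false would yield "L- /crawl/logs/crawl.log" instead.
             */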
1721:
1722:            /**
1723:             * Evaluate if the crawl should stop because it is finished.
1724:             */
1725:            public void checkFinish() {
1726:                if (atFinish()) {
1727:                    beginCrawlStop();
1728:                }
1729:            }
1730:
1731:            /**
1732:             * Evaluate if the crawl should stop because it is finished,
1733:             * without actually stopping the crawl.
1734:             * 
1735:             * @return true if crawl is at a finish-possible state
1736:             */
1737:            public boolean atFinish() {
1738:                return state == RUNNING && !shouldContinueCrawling();
1739:            }
1740:
1741:            private void readObject(ObjectInputStream stream)
1742:                    throws IOException, ClassNotFoundException {
1743:                stream.defaultReadObject();
1744:                // Setup status listeners
1745:                this.registeredCrawlStatusListeners = Collections
1746:                        .synchronizedList(new ArrayList<CrawlStatusListener>());
1747:                // Ensure no holdover singleThreadMode
1748:                singleThreadMode = false;
1749:            }
1750:
1751:            /**
1752:             * Go to single thread mode, where only one ToeThread may
1753:             * proceed at a time. Also acquires the single lock, so 
1754:             * no further threads will proceed past an 
1755:             * acquireContinuePermission. Caller must be sure to release the
1756:             * lock to allow other threads to proceed one at a time.
1757:             */
1758:            public void singleThreadMode() {
1759:                this.singleThreadLock.lock();
1760:                singleThreadMode = true;
1761:            }
1762:
1763:            /**
1764:             * Go back to regular multi-thread mode, where all
1765:             * ToeThreads may proceed at once.
1766:             */
1767:            public void multiThreadMode() {
1768:                this.singleThreadLock.lock();
1769:                singleThreadMode = false;
1770:                while (this.singleThreadLock.isHeldByCurrentThread()) {
1771:                    this.singleThreadLock.unlock();
1772:                }
1773:            }
1774:
1775:            /**
1776:             * Proceed only if allowed, giving CrawlController a chance
1777:             * to enforce single-thread mode.
1778:             */
1779:            public void acquireContinuePermission() {
1780:                if (singleThreadMode) {
1781:                    this.singleThreadLock.lock();
1782:                    if (!singleThreadMode) {
1783:                        // If changed while waiting, ignore
1784:                        while (this.singleThreadLock.isHeldByCurrentThread()) {
1785:                            this.singleThreadLock.unlock();
1786:                        }
1787:                    }
1788:                } // else, permission is automatic
1789:            }
1790:
1791:            /**
1792:             * Relinquish continue permission at end of processing (allowing
1793:             * another thread to proceed if in single-thread mode). 
1794:             */
1795:            public void releaseContinuePermission() {
1796:                if (singleThreadMode) {
1797:                    while (this.singleThreadLock.isHeldByCurrentThread()) {
1798:                        this.singleThreadLock.unlock();
1799:                    }
1800:                } // else do nothing.
1801:            }
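
            /* How the three methods above cooperate, as a sketch (the
             * worker-side bracketing is what ToeThreads are expected to do):
             * <pre>
             *   // in a worker thread, around each unit of work:
             *   controller.acquireContinuePermission();
             *   try {
             *       // ... process one CrawlURI ...
             *   } finally {
             *       controller.releaseContinuePermission();
             *   }
             *
             *   // operator side: serialize workers, later restore parallelism
             *   controller.singleThreadMode();
             *   // ... threads now proceed one at a time ...
             *   controller.multiThreadMode();
             * </pre>
             */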
1802:
1803:            public void freeReserveMemory() {
1804:                if (!reserveMemory.isEmpty()) {
1805:                    reserveMemory.removeLast();
1806:                    System.gc();
1807:                }
1808:            }
1809:
1810:            /**
1811:             * Note that a ToeThread reached paused condition, possibly
1812:             * completing the crawl-pause. 
1813:             */
1814:            public synchronized void toePaused() {
1815:                releaseContinuePermission();
1816:                if (state == PAUSING && toePool.getActiveToeCount() == 0) {
1817:                    completePause();
1818:                }
1819:            }
1820:
1821:            /**
1822:             * Note that a ToeThread ended, possibly completing the crawl-stop. 
1823:             */
1824:            public synchronized void toeEnded() {
1825:                if (state == STOPPING && toePool.getActiveToeCount() == 0) {
1826:                    completeStop();
1827:                }
1828:            }
1829:
1830:            /**
1831:             * Add order file contents to the manifest.
1832:             * Writes configuration files and any files managed by CrawlController
1833:             * to it. Classes managing other files, excluding the settings
1834:             * framework, are responsible for adding their files to the manifest
1835:             * themselves, by calling addToManifest.
1836:             * Call before writing out reports.
1837:             */
1838:            public void addOrderToManifest() {
1839:                for (Iterator it = getSettingsHandler().getListOfAllFiles()
1840:                        .iterator(); it.hasNext();) {
1841:                    addToManifest((String) it.next(),
1842:                            CrawlController.MANIFEST_CONFIG_FILE, true);
1843:                }
1844:            }
1845:
1846:            /**
1847:             * Log a URIException from deep inside other components to the crawl's
1848:             * shared log. 
1849:             * 
1850:             * @param e URIException encountered
1851:             * @param u CrawlURI where the problem occurred
1852:             * @param l String which could not be interpreted as a URI without exception
1853:             */
1854:            public void logUriError(URIException e, UURI u, CharSequence l) {
1855:                if (e.getReasonCode() == UURIFactory.IGNORED_SCHEME) {
1856:                    // don't log those that are intentionally ignored
1857:                    return;
1858:                }
1859:                Object[] array = { u, l };
1860:                uriErrors.log(Level.INFO, e.getMessage(), array);
1861:            }
1862:
1863:            // 
1864:            // Reporter
1865:            //
1866:            public final static String PROCESSORS_REPORT = "processors";
1867:            public final static String MANIFEST_REPORT = "manifest";
1868:            protected final static String[] REPORTS = { PROCESSORS_REPORT,
1869:                    MANIFEST_REPORT };
1870:
1871:            /* (non-Javadoc)
1872:             * @see org.archive.util.Reporter#getReports()
1873:             */
1874:            public String[] getReports() {
1875:                return REPORTS;
1876:            }
1877:
1878:            /* (non-Javadoc)
1879:             * @see org.archive.util.Reporter#reportTo(java.io.Writer)
1880:             */
1881:            public void reportTo(PrintWriter writer) {
1882:                reportTo(null, writer);
1883:            }
1884:
1885:            public String singleLineReport() {
1886:                return ArchiveUtils.singleLineReport(this);
1887:            }
1888:
1889:            public void reportTo(String name, PrintWriter writer) {
1890:                if (PROCESSORS_REPORT.equals(name)) {
1891:                    reportProcessorsTo(writer);
1892:                    return;
1893:                } else if (MANIFEST_REPORT.equals(name)) {
1894:                    reportManifestTo(writer);
1895:                    return;
1896:                } else if (name != null) {
1897:                    writer.println("requested report unknown: " + name);
1898:                }
1899:                singleLineReportTo(writer);
1900:            }
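
            /* For example, to dump the manifest report:
             * <pre>
             *   controller.reportTo(CrawlController.MANIFEST_REPORT, writer);
             * </pre>
             * An unrecognized non-null name prints a notice and then falls
             * back to the single-line report, as does a null name.
             */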
1901:
1902:            /**
1903:             * @param writer Where to write report to.
1904:             */
1905:            protected void reportManifestTo(PrintWriter writer) {
1906:                writer.print(manifest.toString());
1907:            }
1908:
1909:            /**
1910:             * Compiles and returns a human readable report on the active processors.
1911:             * @param writer Where to write to.
1912:             * @see org.archive.crawler.framework.Processor#report()
1913:             */
1914:            protected void reportProcessorsTo(PrintWriter writer) {
1915:                writer.print("Processors report - "
1916:                        + ArchiveUtils.get12DigitDate() + "\n");
1917:                writer.print("  Job being crawled:    "
1918:                        + getOrder().getCrawlOrderName() + "\n");
1919:
1920:                writer.print("  Number of Processors: "
1921:                        + processorChains.processorCount() + "\n");
1922:                writer
1923:                        .print("  NOTE: Some processors may not return a report!\n\n");
1924:
1925:                for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
1926:                    for (Iterator ip = ((ProcessorChain) ic.next()).iterator(); ip
1927:                            .hasNext();) {
1928:                        writer.print(((Processor) ip.next()).report());
1929:                    }
1930:                }
1931:            }
1932:
1933:            public void singleLineReportTo(PrintWriter writer) {
1934:                // TODO: improve to be a summary of crawl state
1935:                writer.write("[Crawl Controller]\n");
1936:            }
1937:
1938:            public String singleLineLegend() {
1939:                // TODO improve
1940:                return "nothingYet";
1941:            }
1942:
1943:            /**
1944:             * Call this method to get instance of the crawler BigMap implementation.
1945:             * A "BigMap" is a Map that knows how to manage ever-growing sets of
1946:             * key/value pairs. If we're in a checkpoint recovery, this method will
1947:             * manage reinstantiation of checkpointed bigmaps.
1948:             * @param dbName Name to give any associated database.  Also used
1949:             * as part of the name when serializing out the bigmap.  Needs to be unique to a crawl.
1950:             * @param keyClass Class of keys we'll be using.
1951:             * @param valueClass Class of values we'll be using.
1952:             * @return Map that knows how to carry large sets of key/value pairs or
1953:             * if none available, returns instance of HashMap.
1954:             * @throws Exception
1955:             */
1956:            public <K, V> Map<K, V> getBigMap(final String dbName,
1957:                    final Class<? super K> keyClass,
1958:                    final Class<? super V> valueClass) throws Exception {
1959:                CachedBdbMap<K, V> result = new CachedBdbMap<K, V>(dbName);
1960:                if (isCheckpointRecover()) {
1961:                    File baseDir = getCheckpointRecover().getDirectory();
1962:                    @SuppressWarnings("unchecked")
1963:                    CachedBdbMap<K, V> temp = CheckpointUtils
1964:                            .readObjectFromFile(result.getClass(), dbName,
1965:                                    baseDir);
1966:                    result = temp;
1967:                }
1968:                result.initialize(getBdbEnvironment(), keyClass, valueClass,
1969:                        getBdbEnvironment().getClassCatalog());
1970:                // Save reference to all big maps made so can manage their
1971:                // checkpointing.
1972:                this.bigmaps.put(dbName, result);
1973:                return result;
1974:            }
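
            /* A hedged usage sketch (the database name and value type are
             * illustrative): a module wanting a disk-backed map keyed by
             * hostname might call
             * <pre>
             *   Map<String, CrawlHost> hosts = controller.getBigMap(
             *       "hostsCache", String.class, CrawlHost.class);
             * </pre>
             * On checkpoint recovery, the same call transparently reloads the
             * serialized CachedBdbMap from the checkpoint directory before
             * re-initializing it against the current bdb environment.
             */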
1975:
1976:            protected void checkpointBigMaps(final File cpDir) throws Exception {
1977:                for (final Iterator i = this.bigmaps.keySet().iterator(); i
1978:                        .hasNext();) {
1979:                    Object key = i.next();
1980:                    Object obj = this.bigmaps.get(key);
1981:                    // TODO: I tried adding sync to the custom serialization of the
1982:                    // BigMap implementation, but its data member counts were not
1983:                    // being persisted properly.  Look at why.  For now, do the
1984:                    // sync in advance of serialization.
1985:                    ((CachedBdbMap) obj).sync();
1986:                    CheckpointUtils.writeObjectToFile(obj, (String) key, cpDir);
1987:                }
1988:            }
1989:
1990:            /**
1991:             * Called whenever a progress-statistics logging event occurs.
1992:             * @param e Progress statistics event.
1993:             */
1994:            public void progressStatisticsEvent(final EventObject e) {
1995:                // Default is to do nothing.  Subclass if you want to catch this
1996:                // event.  Later, if there is demand, add publisher/listener support.
1997:                // Currently hacked in so the CrawlJob subclass, added to support
1998:                // JMX, can send notifications of progressStatistics changes.
1999:            }
2000:
2001:            /**
2002:             * Log to the progress statistics log.
2003:             * @param msg Message to write to the progress statistics log.
2004:             */
2005:            public void logProgressStatistics(final String msg) {
2006:                this.progressStats.info(msg);
2007:            }
2008:
2009:            /**
2010:             * @return CrawlController state.
2011:             */
2012:            public Object getState() {
2013:                return this.state;
2014:            }
2015:
2016:            public File getCheckpointsDisk() {
2017:                return this.checkpointsDisk;
2018:            }
2019:        }