001: /* BdbFrontier
002: *
003: * $Id: BdbFrontier.java 4917 2007-02-20 22:15:23Z gojomo $
004: *
005: * Created on Sep 24, 2004
006: *
007: * Copyright (C) 2004 Internet Archive.
008: *
009: * This file is part of the Heritrix web crawler (crawler.archive.org).
010: *
011: * Heritrix is free software; you can redistribute it and/or modify
012: * it under the terms of the GNU Lesser Public License as published by
013: * the Free Software Foundation; either version 2.1 of the License, or
014: * any later version.
015: *
016: * Heritrix is distributed in the hope that it will be useful,
017: * but WITHOUT ANY WARRANTY; without even the implied warranty of
018: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
019: * GNU Lesser Public License for more details.
020: *
021: * You should have received a copy of the GNU Lesser Public License
022: * along with Heritrix; if not, write to the Free Software
023: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
024: *
025: */
026: package org.archive.crawler.frontier;
027:
028: import java.io.File;
029: import java.io.FileNotFoundException;
030: import java.io.IOException;
031: import java.io.Serializable;
032: import java.util.ArrayList;
033: import java.util.Iterator;
034: import java.util.List;
035: import java.util.logging.Logger;
036:
037: import javax.management.AttributeNotFoundException;
038:
039: import org.archive.crawler.datamodel.CrawlURI;
040: import org.archive.crawler.datamodel.UriUniqFilter;
041: import org.archive.crawler.framework.CrawlController;
042: import org.archive.crawler.framework.FrontierMarker;
043: import org.archive.crawler.framework.exceptions.FatalConfigurationException;
044: import org.archive.crawler.settings.SimpleType;
045: import org.archive.crawler.settings.Type;
046: import org.archive.crawler.util.BdbUriUniqFilter;
047: import org.archive.crawler.util.BloomUriUniqFilter;
048: import org.archive.crawler.util.CheckpointUtils;
049: import org.archive.crawler.util.DiskFPMergeUriUniqFilter;
050: import org.archive.crawler.util.MemFPMergeUriUniqFilter;
051: import org.archive.util.ArchiveUtils;
052:
053: import com.sleepycat.je.DatabaseException;
054:
055: /**
056: * A Frontier using several BerkeleyDB JE Databases to hold its record of
057: * known hosts (queues), and pending URIs.
058: *
059: * @author Gordon Mohr
060: */
061: public class BdbFrontier extends WorkQueueFrontier implements
062: Serializable {
063: // be robust against trivial implementation changes
064: private static final long serialVersionUID = ArchiveUtils
065: .classnameBasedUID(BdbFrontier.class, 1);
066:
067: private static final Logger logger = Logger
068: .getLogger(BdbFrontier.class.getName());
069:
070: /** all URIs scheduled to be crawled */
071: protected transient BdbMultipleWorkQueues pendingUris;
072:
073: /** all URI-already-included options available to be chosen */
074: private String[] AVAILABLE_INCLUDED_OPTIONS = new String[] {
075: BdbUriUniqFilter.class.getName(),
076: BloomUriUniqFilter.class.getName(),
077: MemFPMergeUriUniqFilter.class.getName(),
078: DiskFPMergeUriUniqFilter.class.getName() };
079:
080: /** URI-already-included to use (by class name) */
081: public final static String ATTR_INCLUDED = "uri-included-structure";
082:
083: private final static String DEFAULT_INCLUDED = BdbUriUniqFilter.class
084: .getName();
085:
086: /**
087: * Constructor.
088: * @param name Name for of this Frontier.
089: */
090: public BdbFrontier(String name) {
091: this (
092: name,
093: "BdbFrontier. "
094: + "A Frontier using BerkeleyDB Java Edition databases for "
095: + "persistence to disk.");
096: Type t = addElementToDefinition(new SimpleType(
097: ATTR_INCLUDED,
098: "Structure to use for tracking already-seen URIs. Non-default "
099: + "options may require additional configuration via system "
100: + "properties.", DEFAULT_INCLUDED,
101: AVAILABLE_INCLUDED_OPTIONS));
102: t.setExpertSetting(true);
103: }
104:
105: /**
106: * Create the BdbFrontier
107: *
108: * @param name
109: * @param description
110: */
111: public BdbFrontier(String name, String description) {
112: super (name, description);
113: }
114:
115: /**
116: * Create the single object (within which is one BDB database)
117: * inside which all the other queues live.
118: *
119: * @return the created BdbMultipleWorkQueues
120: * @throws DatabaseException
121: */
122: private BdbMultipleWorkQueues createMultipleWorkQueues()
123: throws DatabaseException {
124: return new BdbMultipleWorkQueues(this .controller
125: .getBdbEnvironment(), this .controller
126: .getBdbEnvironment().getClassCatalog(), this .controller
127: .isCheckpointRecover());
128: }
129:
130: /**
131: * Create a UriUniqFilter that will serve as record
132: * of already seen URIs.
133: *
134: * @return A UURISet that will serve as a record of already seen URIs
135: * @throws IOException
136: */
137: protected UriUniqFilter createAlreadyIncluded() throws IOException {
138: UriUniqFilter uuf;
139: String c = null;
140: try {
141: c = (String) getAttribute(null, ATTR_INCLUDED);
142: } catch (AttributeNotFoundException e) {
143: // Do default action if attribute not in order.
144: }
145: // TODO: avoid all this special-casing; enable some common
146: // constructor interface usable for all alt implemenations
147: if (c != null && c.equals(BloomUriUniqFilter.class.getName())) {
148: uuf = this .controller.isCheckpointRecover() ? deserializeAlreadySeen(
149: BloomUriUniqFilter.class, this .controller
150: .getCheckpointRecover().getDirectory())
151: : new BloomUriUniqFilter();
152: } else if (c != null
153: && c.equals(MemFPMergeUriUniqFilter.class.getName())) {
154: // TODO: add checkpointing for MemFPMergeUriUniqFilter
155: uuf = new MemFPMergeUriUniqFilter();
156: } else if (c != null
157: && c.equals(DiskFPMergeUriUniqFilter.class.getName())) {
158: // TODO: add checkpointing for DiskFPMergeUriUniqFilter
159: uuf = new DiskFPMergeUriUniqFilter(controller
160: .getScratchDisk());
161: } else {
162: // Assume its BdbUriUniqFilter.
163: uuf = this .controller.isCheckpointRecover() ? deserializeAlreadySeen(
164: BdbUriUniqFilter.class, this .controller
165: .getCheckpointRecover().getDirectory())
166: : new BdbUriUniqFilter(this .controller
167: .getBdbEnvironment());
168: if (this .controller.isCheckpointRecover()) {
169: // If recover, need to call reopen of the db.
170: try {
171: ((BdbUriUniqFilter) uuf).reopen(this .controller
172: .getBdbEnvironment());
173: } catch (DatabaseException e) {
174: throw new IOException(e.getMessage());
175: }
176: }
177: }
178: uuf.setDestination(this );
179: return uuf;
180: }
181:
182: protected UriUniqFilter deserializeAlreadySeen(
183: final Class<? extends UriUniqFilter> cls, final File dir)
184: throws FileNotFoundException, IOException {
185: UriUniqFilter uuf = null;
186: try {
187: logger.fine("Started deserializing " + cls.getName()
188: + " of checkpoint recover.");
189: uuf = CheckpointUtils.readObjectFromFile(cls, dir);
190: logger.fine("Finished deserializing bdbje as part "
191: + "of checkpoint recover.");
192: } catch (ClassNotFoundException e) {
193: throw new IOException("Failed to deserialize "
194: + cls.getName() + ": " + e.getMessage());
195: }
196: return uuf;
197: }
198:
199: /**
200: * Return the work queue for the given CrawlURI's classKey. URIs
201: * are ordered and politeness-delayed within their 'class'.
202: *
203: * @param curi CrawlURI to base queue on
204: * @return the found or created BdbWorkQueue
205: */
206: protected WorkQueue getQueueFor(CrawlURI curi) {
207: WorkQueue wq;
208: String classKey = curi.getClassKey();
209: synchronized (allQueues) {
210: wq = (WorkQueue) allQueues.get(classKey);
211: if (wq == null) {
212: wq = new BdbWorkQueue(classKey, this );
213: wq.setTotalBudget(((Long) getUncheckedAttribute(curi,
214: ATTR_QUEUE_TOTAL_BUDGET)).longValue());
215: allQueues.put(classKey, wq);
216: }
217: }
218: return wq;
219: }
220:
221: /**
222: * Return the work queue for the given classKey, or null
223: * if no such queue exists.
224: *
225: * @param classKey key to look for
226: * @return the found WorkQueue
227: */
228: protected WorkQueue getQueueFor(String classKey) {
229: WorkQueue wq;
230: synchronized (allQueues) {
231: wq = (WorkQueue) allQueues.get(classKey);
232: }
233: return wq;
234: }
235:
236: public FrontierMarker getInitialMarker(String regexpr,
237: boolean inCacheOnly) {
238: return pendingUris.getInitialMarker(regexpr);
239: }
240:
241: /**
242: * Return list of urls.
243: * @param marker
244: * @param numberOfMatches
245: * @param verbose
246: * @return List of URIs (strings).
247: */
248: public ArrayList<String> getURIsList(FrontierMarker marker,
249: int numberOfMatches, final boolean verbose) {
250: List curis;
251: try {
252: curis = pendingUris.getFrom(marker, numberOfMatches);
253: } catch (DatabaseException e) {
254: e.printStackTrace();
255: throw new RuntimeException(e);
256: }
257: ArrayList<String> results = new ArrayList<String>(curis.size());
258: Iterator iter = curis.iterator();
259: while (iter.hasNext()) {
260: CrawlURI curi = (CrawlURI) iter.next();
261: results.add("[" + curi.getClassKey() + "] "
262: + curi.singleLineReport());
263: }
264: return results;
265: }
266:
267: protected void initQueue() throws IOException {
268: try {
269: this .pendingUris = createMultipleWorkQueues();
270: } catch (DatabaseException e) {
271: throw (IOException) new IOException(e.getMessage())
272: .initCause(e);
273: }
274: }
275:
276: protected void closeQueue() {
277: if (this .pendingUris != null) {
278: this .pendingUris.close();
279: this .pendingUris = null;
280: }
281: }
282:
283: protected BdbMultipleWorkQueues getWorkQueues() {
284: return pendingUris;
285: }
286:
287: protected boolean workQueueDataOnDisk() {
288: return true;
289: }
290:
291: public void initialize(CrawlController c)
292: throws FatalConfigurationException, IOException {
293: super .initialize(c);
294: if (c.isCheckpointRecover()) {
295: // If a checkpoint recover, copy old values from serialized
296: // instance into this Frontier instance. Do it this way because
297: // though its possible to serialize BdbFrontier, its currently not
298: // possible to set/remove frontier attribute plugging the
299: // deserialized object back into the settings system.
300: // The below copying over is error-prone because its easy
301: // to miss a value. Perhaps there's a better way? Introspection?
302: BdbFrontier f = null;
303: try {
304: f = (BdbFrontier) CheckpointUtils.readObjectFromFile(
305: this .getClass(), this .controller
306: .getCheckpointRecover().getDirectory());
307: } catch (FileNotFoundException e) {
308: throw new FatalConfigurationException(
309: "Failed checkpoint " + "recover: "
310: + e.getMessage());
311: } catch (IOException e) {
312: throw new FatalConfigurationException(
313: "Failed checkpoint " + "recover: "
314: + e.getMessage());
315: } catch (ClassNotFoundException e) {
316: throw new FatalConfigurationException(
317: "Failed checkpoint " + "recover: "
318: + e.getMessage());
319: }
320:
321: this .nextOrdinal = f.nextOrdinal;
322: this .totalProcessedBytes = f.totalProcessedBytes;
323: this .disregardedUriCount = f.disregardedUriCount;
324: this .failedFetchCount = f.failedFetchCount;
325: this .processedBytesAfterLastEmittedURI = f.processedBytesAfterLastEmittedURI;
326: this .queuedUriCount = f.queuedUriCount;
327: this .succeededFetchCount = f.succeededFetchCount;
328: this .lastMaxBandwidthKB = f.lastMaxBandwidthKB;
329: this .readyClassQueues = f.readyClassQueues;
330: this .inactiveQueues = f.inactiveQueues;
331: this .retiredQueues = f.retiredQueues;
332: this .snoozedClassQueues = f.snoozedClassQueues;
333: this .inProcessQueues = f.inProcessQueues;
334: wakeQueues();
335: }
336: }
337:
338: public void crawlCheckpoint(File checkpointDir) throws Exception {
339: super .crawlCheckpoint(checkpointDir);
340: logger.fine("Started serializing already seen as part "
341: + "of checkpoint. Can take some time.");
342: // An explicit sync on the any deferred write dbs is needed to make the
343: // db recoverable. Sync'ing the environment doesn't work.
344: if (this .pendingUris != null) {
345: this .pendingUris.sync();
346: }
347: CheckpointUtils.writeObjectToFile(this .alreadyIncluded,
348: checkpointDir);
349: logger.fine("Finished serializing already seen as part "
350: + "of checkpoint.");
351: // Serialize ourselves.
352: CheckpointUtils.writeObjectToFile(this, checkpointDir);
353: }
354: }
|