001: /* CrawlMapper
002: *
003: * Created on Sep 30, 2005
004: *
005: * Copyright (C) 2005 Internet Archive.
006: *
007: * This file is part of the Heritrix web crawler (crawler.archive.org).
008: *
009: * Heritrix is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU Lesser Public License as published by
011: * the Free Software Foundation; either version 2.1 of the License, or
012: * any later version.
013: *
014: * Heritrix is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: * GNU Lesser Public License for more details.
018: *
019: * You should have received a copy of the GNU Lesser Public License
020: * along with Heritrix; if not, write to the Free Software
021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: */
023: package org.archive.crawler.processor;
024:
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;

import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.deciderules.DecideRule;
import org.archive.crawler.deciderules.DecideRuleSequence;
import org.archive.crawler.framework.Processor;
import org.archive.crawler.settings.SimpleType;
import org.archive.util.ArchiveUtils;
import org.archive.util.fingerprint.ArrayLongFPCache;

import st.ata.util.FPGenerator;
046:
047: /**
048: * A simple crawl splitter/mapper, dividing up CandidateURIs/CrawlURIs
049: * between crawlers by diverting some range of URIs to local log files
050: * (which can then be imported to other crawlers).
051: *
052: * May operate on a CrawlURI (typically early in the processing chain) or
053: * its CandidateURI outlinks (late in the processing chain, after
054: * LinksScoper), or both (if inserted and configured in both places).
055: *
056: * <p>Applies a map() method, supplied by a concrete subclass, to
057: * classKeys to map URIs to crawlers by name.
058: *
059: * <p>One crawler name is distinguished as the 'local name'; URIs mapped to
060: * this name are not diverted, but continue to be processed normally.
061: *
062: * <p>If using the JMX importUris operation importing URLs dropped by
063: * a {@link CrawlMapper} instance, use <code>recoveryLog</code> style.
064: *
065: * @author gojomo
066: * @version $Date: 2006-09-26 23:47:15 +0000 (Tue, 26 Sep 2006) $, $Revision: 4671 $
067: */
068: public abstract class CrawlMapper extends Processor implements
069: FetchStatusCodes {
070: /**
071: * PrintWriter which remembers the File to which it writes.
072: */
073: private class FilePrintWriter extends PrintWriter {
074: File file;
075:
076: public FilePrintWriter(File file) throws FileNotFoundException {
077: super (new BufferedOutputStream(new FileOutputStream(file)));
078: this .file = file;
079: }
080:
081: public File getFile() {
082: return file;
083: }
084: }
085:
086: /** whether to map CrawlURI itself (if status nonpositive) */
087: public static final String ATTR_CHECK_URI = "check-uri";
088: public static final Boolean DEFAULT_CHECK_URI = Boolean.TRUE;
089:
090: /** whether to map CrawlURI's outlinks (if CandidateURIs) */
091: public static final String ATTR_CHECK_OUTLINKS = "check-outlinks";
092: public static final Boolean DEFAULT_CHECK_OUTLINKS = Boolean.TRUE;
093:
094: /** decide rules to determine if an outlink is subject to mapping */
095: public static final String ATTR_MAP_OUTLINK_DECIDE_RULES = "decide-rules";
096:
097: /** name of local crawler (URIs mapped to here are not diverted) */
098: public static final String ATTR_LOCAL_NAME = "local-name";
099: public static final String DEFAULT_LOCAL_NAME = ".";
100:
101: /** where to log diversions */
102: public static final String ATTR_DIVERSION_DIR = "diversion-dir";
103: public static final String DEFAULT_DIVERSION_DIR = "diversions";
104:
105: /** rotate logs when change occurs within this # of digits of timestamp */
106: public static final String ATTR_ROTATION_DIGITS = "rotation-digits";
107: public static final Integer DEFAULT_ROTATION_DIGITS = new Integer(
108: 10); // hourly
109:
110: /**
111: * Mapping of target crawlers to logs (PrintWriters)
112: */
113: HashMap<String, PrintWriter> diversionLogs = new HashMap<String, PrintWriter>();
114:
115: /**
116: * Truncated timestamp prefix for diversion logs; when
117: * current time doesn't match, it's time to close all
118: * current logs.
119: */
120: String logGeneration = "";
121:
122: /** name of the enclosing crawler (URIs mapped here stay put) */
123: protected String localName;
124:
125: protected ArrayLongFPCache cache;
126:
127: /**
128: * Constructor.
129: * @param name Name of this processor.
130: */
131: public CrawlMapper(String name, String description) {
132: super (name, description);
133: addElementToDefinition(new SimpleType(
134: ATTR_LOCAL_NAME,
135: "Name of local crawler node; mappings to this name "
136: + "result in normal processing (no diversion).",
137: DEFAULT_LOCAL_NAME));
138: addElementToDefinition(new SimpleType(ATTR_DIVERSION_DIR,
139: "Directory to write diversion logs.",
140: DEFAULT_DIVERSION_DIR));
141: addElementToDefinition(new SimpleType(
142: ATTR_CHECK_URI,
143: "Whether to apply the mapping to a URI being processed "
144: + "itself, for example early in processing (while its "
145: + "status is still 'unattempted').",
146: DEFAULT_CHECK_URI));
147: addElementToDefinition(new SimpleType(
148: ATTR_CHECK_OUTLINKS,
149: "Whether to apply the mapping to discovered outlinks, "
150: + "for example after extraction has occurred. ",
151: DEFAULT_CHECK_OUTLINKS));
152: addElementToDefinition(new DecideRuleSequence(
153: ATTR_MAP_OUTLINK_DECIDE_RULES));
154: addElementToDefinition(new SimpleType(
155: ATTR_ROTATION_DIGITS,
156: "Number of timestamp digits to use as prefix of log "
157: + "names (grouping all diversions from that period in "
158: + "a single log). Default is 10 (hourly log rotation).",
159: DEFAULT_ROTATION_DIGITS));
160: }
161:
162: protected void innerProcess(CrawlURI curi) {
163: String nowGeneration = ArchiveUtils.get14DigitDate().substring(
164: 0,
165: ((Integer) getUncheckedAttribute(null,
166: ATTR_ROTATION_DIGITS)).intValue());
167: if (!nowGeneration.equals(logGeneration)) {
168: updateGeneration(nowGeneration);
169: }
170:
171: if (curi.getFetchStatus() == 0
172: && ((Boolean) getUncheckedAttribute(null,
173: ATTR_CHECK_URI)).booleanValue()) {
174: // apply mapping to the CrawlURI itself
175: String target = map(curi);
176: if (!localName.equals(target)) {
177: // CrawlURI is mapped to somewhere other than here
178: curi.setFetchStatus(S_BLOCKED_BY_CUSTOM_PROCESSOR);
179: curi.addAnnotation("to:" + target);
180: curi.skipToProcessorChain(getController()
181: .getPostprocessorChain());
182: divertLog(curi, target);
183: } else {
184: // localName means keep locally; do nothing
185: }
186: }
187:
188: if (curi.getOutLinks().size() > 0
189: && ((Boolean) getUncheckedAttribute(null,
190: ATTR_CHECK_OUTLINKS)).booleanValue()) {
191: // consider outlinks for mapping
192: Iterator<CandidateURI> iter = curi.getOutCandidates()
193: .iterator();
194: while (iter.hasNext()) {
195: CandidateURI cauri = iter.next();
196: if (decideToMapOutlink(cauri)) {
197: // apply mapping to the CandidateURI
198: String target = map(cauri);
199: if (!localName.equals(target)) {
200: // CandidateURI is mapped to somewhere other than here
201: iter.remove();
202: divertLog(cauri, target);
203: } else {
204: // localName means keep locally; do nothing
205: }
206: }
207: }
208: }
209: }
210:
211: protected boolean decideToMapOutlink(CandidateURI cauri) {
212: boolean rejected = getMapOutlinkDecideRule(cauri).decisionFor(
213: cauri).equals(DecideRule.REJECT);
214: return !rejected;
215: }
216:
217: protected DecideRule getMapOutlinkDecideRule(Object o) {
218: try {
219: return (DecideRule) getAttribute(o,
220: ATTR_MAP_OUTLINK_DECIDE_RULES);
221: } catch (AttributeNotFoundException e) {
222: throw new RuntimeException(e);
223: }
224: }
225:
226: /**
227: * Close and mark as finished all existing diversion logs, and
228: * arrange for new logs to use the new generation prefix.
229: *
230: * @param nowGeneration new generation (timestamp prefix) to use
231: */
232: protected synchronized void updateGeneration(String nowGeneration) {
233: // all existing logs are of a previous generation
234: Iterator iter = diversionLogs.values().iterator();
235: while (iter.hasNext()) {
236: FilePrintWriter writer = (FilePrintWriter) iter.next();
237: writer.close();
238: writer.getFile().renameTo(
239: new File(writer.getFile().getAbsolutePath()
240: .replaceFirst("\\.open$", ".divert")));
241: }
242: diversionLogs.clear();
243: logGeneration = nowGeneration;
244: }
245:
246: /**
247: * Look up the crawler node name to which the given CandidateURI
248: * should be mapped.
249: *
250: * @param cauri CandidateURI to consider
251: * @return String node name which should handle URI
252: */
253: protected abstract String map(CandidateURI cauri);
254:
255: /**
256: * Note the given CandidateURI in the appropriate diversion log.
257: *
258: * @param cauri CandidateURI to append to a diversion log
259: * @param target String node name (log name) to receive URI
260: */
261: protected synchronized void divertLog(CandidateURI cauri,
262: String target) {
263: if (recentlySeen(cauri)) {
264: return;
265: }
266: PrintWriter diversionLog = getDiversionLog(target);
267: cauri.singleLineReportTo(diversionLog);
268: diversionLog.println();
269: }
270:
271: /**
272: * Consult the cache to determine if the given URI
273: * has been recently seen -- entering it if not.
274: *
275: * @param cauri CandidateURI to test
276: * @return true if URI was already in the cache; false otherwise
277: */
278: private boolean recentlySeen(CandidateURI cauri) {
279: long fp = FPGenerator.std64.fp(cauri.toString());
280: return !cache.add(fp);
281: }
282:
283: /**
284: * Get the diversion log for a given target crawler node node.
285: *
286: * @param target crawler node name of requested log
287: * @return PrintWriter open on an appropriately-named
288: * log file
289: */
290: protected PrintWriter getDiversionLog(String target) {
291: FilePrintWriter writer = (FilePrintWriter) diversionLogs
292: .get(target);
293: if (writer == null) {
294: String divertDirPath = (String) getUncheckedAttribute(null,
295: ATTR_DIVERSION_DIR);
296: File divertDir = new File(divertDirPath);
297: if (!divertDir.isAbsolute()) {
298: divertDir = new File(getSettingsHandler().getOrder()
299: .getController().getDisk(), divertDirPath);
300: }
301: divertDir.mkdirs();
302: File divertLog = new File(divertDir, logGeneration + "-"
303: + localName + "-to-" + target + ".open");
304: try {
305: writer = new FilePrintWriter(divertLog);
306: } catch (FileNotFoundException e) {
307: // TODO Auto-generated catch block
308: e.printStackTrace();
309: throw new RuntimeException(e);
310: }
311: diversionLogs.put(target, writer);
312: }
313: return writer;
314: }
315:
316: protected void initialTasks() {
317: super .initialTasks();
318: localName = (String) getUncheckedAttribute(null,
319: ATTR_LOCAL_NAME);
320: cache = new ArrayLongFPCache();
321: }
322: }
|