001: /*
002: * LowDiskPauseProcessor
003: *
004: * $Id: LowDiskPauseProcessor.java 4654 2006-09-25 20:19:54Z paul_jack $
005: *
006: * Created on Jun 5, 2003
007: *
008: * Copyright (C) 2003 Internet Archive.
009: *
010: * This file is part of the Heritrix web crawler (crawler.archive.org).
011: *
012: * Heritrix is free software; you can redistribute it and/or modify
013: * it under the terms of the GNU Lesser Public License as published by
014: * the Free Software Foundation; either version 2.1 of the License, or
015: * any later version.
016: *
017: * Heritrix is distributed in the hope that it will be useful,
018: * but WITHOUT ANY WARRANTY; without even the implied warranty of
019: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
020: * GNU Lesser Public License for more details.
021: *
022: * You should have received a copy of the GNU Lesser Public License
023: * along with Heritrix; if not, write to the Free Software
024: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
025: */
026: package org.archive.crawler.postprocessor;
027:
028: import java.io.IOException;
029: import java.util.Arrays;
030: import java.util.List;
031: import java.util.logging.Level;
032: import java.util.logging.Logger;
033: import java.util.regex.Matcher;
034: import java.util.regex.Pattern;
035:
036: import org.archive.crawler.datamodel.CrawlURI;
037: import org.archive.crawler.framework.Processor;
038: import org.archive.crawler.settings.SimpleType;
039: import org.archive.crawler.settings.Type;
040: import org.archive.util.IoUtils;
041:
042: /**
043: * Processor module which uses 'df -k', where available and with
044: * the expected output format (on Linux), to monitor available
045: * disk space and pause the crawl if free space on monitored
046: * filesystems falls below certain thresholds.
047: */
048: public class LowDiskPauseProcessor extends Processor {
049:
050: private static final long serialVersionUID = 3338337700768396302L;
051:
052: /**
053: * Logger.
054: */
055: private static final Logger logger = Logger
056: .getLogger(LowDiskPauseProcessor.class.getName());
057:
058: /**
059: * List of mounts to monitor; should match "Mounted on" column of 'df' output
060: */
061: public static final String ATTR_MONITOR_MOUNTS = "monitor-mounts";
062: public static final String DEFAULT_MONITOR_MOUNTS = "";
063:
064: /**
065: * Space available level below which a crawl-pause should be triggered.
066: */
067: public static final String ATTR_PAUSE_THRESHOLD = "pause-threshold-kb";
068: public static final int DEFAULT_PAUSE_THRESHOLD = 500 * 1024; // 500MB
069:
070: /**
071: * Amount of content received between each recheck of free space
072: */
073: public static final String ATTR_RECHECK_THRESHOLD = "recheck-threshold-kb";
074: public static final int DEFAULT_RECHECK_THRESHOLD = 200 * 1024; // 200MB
075:
076: protected int contentSinceCheck = 0;
077:
078: public static final Pattern VALID_DF_OUTPUT = Pattern
079: .compile("(?s)^Filesystem\\s+1K-blocks\\s+Used\\s+Available\\s+Use%\\s+Mounted on\\n.*");
080: public static final Pattern AVAILABLE_EXTRACTOR = Pattern
081: .compile("(?m)\\s(\\d+)\\s+\\d+%\\s+(\\S+)$");
082:
083: /**
084: * @param name Name of this writer.
085: */
086: public LowDiskPauseProcessor(String name) {
087: super (name, "LowDiskPause processor");
088: Type e = addElementToDefinition(new SimpleType(
089: ATTR_MONITOR_MOUNTS,
090: "Space-delimited list of filessystem mounts whose "
091: + "'available' space should be monitored via 'df' "
092: + "(if available).", DEFAULT_MONITOR_MOUNTS));
093: e.setOverrideable(false);
094: e = addElementToDefinition(new SimpleType(
095: ATTR_PAUSE_THRESHOLD,
096: "When available space on any monitored mounts falls "
097: + "below this threshold, the crawl will be paused. ",
098: new Integer(DEFAULT_PAUSE_THRESHOLD)));
099: e = addElementToDefinition(new SimpleType(
100: ATTR_RECHECK_THRESHOLD,
101: "Available space via 'df' is rechecked after every "
102: + "increment of this much content (uncompressed) is "
103: + "observed. ", new Integer(
104: DEFAULT_RECHECK_THRESHOLD)));
105: e.setOverrideable(false);
106: }
107:
108: /**
109: * Notes a CrawlURI's content size in its running tally. If the
110: * recheck increment of content has passed through since the last
111: * available-space check, checks available space and pauses the
112: * crawl if any monitored mounts are below the configured threshold.
113: *
114: * @param curi CrawlURI to process.
115: */
116: protected void innerProcess(CrawlURI curi) {
117: contentSinceCheck += curi.getContentSize();
118: synchronized (this ) {
119: if (contentSinceCheck / 1024 > ((Integer) getUncheckedAttribute(
120: null, ATTR_RECHECK_THRESHOLD)).intValue()) {
121: checkAvailableSpace(curi);
122: contentSinceCheck = 0;
123: }
124: }
125: }
126:
127: /**
128: * Probe via 'df' to see if monitored mounts have fallen
129: * below the pause available threshold. If so, request a
130: * crawl pause.
131: * @param curi Current context.
132: */
133: private void checkAvailableSpace(CrawlURI curi) {
134: try {
135: String df = IoUtils.readFullyAsString(Runtime.getRuntime()
136: .exec("df -k").getInputStream());
137: Matcher matcher = VALID_DF_OUTPUT.matcher(df);
138: if (!matcher.matches()) {
139: logger
140: .severe("'df -k' output unacceptable for low-disk checking");
141: return;
142: }
143: List monitoredMounts = Arrays
144: .asList(((String) getUncheckedAttribute(null,
145: ATTR_MONITOR_MOUNTS)).split("\\s*"));
146: matcher = AVAILABLE_EXTRACTOR.matcher(df);
147: while (matcher.find()) {
148: String mount = matcher.group(2);
149: if (monitoredMounts.contains(mount)) {
150: long availKilobytes = Long.parseLong(matcher
151: .group(1));
152: int thresholdKilobytes = ((Integer) getUncheckedAttribute(
153: null, ATTR_PAUSE_THRESHOLD)).intValue();
154: if (availKilobytes < thresholdKilobytes) {
155: getController().requestCrawlPause();
156: logger.log(Level.SEVERE, "Low Disk Pause",
157: availKilobytes + "K available on "
158: + mount + " (below threshold "
159: + thresholdKilobytes + "K)");
160: break;
161: }
162: }
163: }
164: } catch (IOException e) {
165: curi.addLocalizedError(this .getName(), e,
166: "problem checking available space via 'df'");
167: }
168: }
169: }
|