001: /* Checkpointer
002: *
003: * $Id: Checkpointer.java 4550 2006-08-29 00:19:31Z stack-sf $
004: *
005: * Created on Apr 19, 2004
006: *
007: * Copyright (C) 2004 Internet Archive.
008: *
009: * This file is part of the Heritrix web crawler (crawler.archive.org).
010: *
011: * Heritrix is free software; you can redistribute it and/or modify
012: * it under the terms of the GNU Lesser Public License as published by
013: * the Free Software Foundation; either version 2.1 of the License, or
014: * any later version.
015: *
016: * Heritrix is distributed in the hope that it will be useful,
017: * but WITHOUT ANY WARRANTY; without even the implied warranty of
018: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
019: * GNU Lesser Public License for more details.
020: *
021: * You should have received a copy of the GNU Lesser Public License
022: * along with Heritrix; if not, write to the Free Software
023: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
024: */
025: package org.archive.crawler.framework;
026:
027: import java.io.File;
028: import java.io.FileOutputStream;
029: import java.io.IOException;
030: import java.io.Serializable;
031: import java.text.DecimalFormat;
032: import java.util.LinkedList;
033: import java.util.List;
034: import java.util.Timer;
035: import java.util.TimerTask;
036: import java.util.logging.Level;
037: import java.util.logging.Logger;
038:
039: import org.archive.crawler.datamodel.Checkpoint;
040: import org.archive.util.ArchiveUtils;
041:
042: /**
043: * Runs checkpointing.
044: * Also keeps history of crawl checkpoints Generally used by CrawlController
045: * only but also has static utility methods classes that need to participate in
046: * a checkpoint can use.
047: *
048: * @author gojomo
049: * @author stack
050: */
051: public class Checkpointer implements Serializable {
052: private static final long serialVersionUID = 7610078446694353173L;
053:
054: private final static Logger LOGGER = Logger
055: .getLogger(Checkpointer.class.getName());
056:
057: private static final String DEFAULT_PREFIX = "";
058:
059: /**
060: * String to prefix any new checkpoint names.
061: */
062: private String checkpointPrefix = DEFAULT_PREFIX;
063:
064: /**
065: * Next overall series checkpoint number.
066: */
067: private int nextCheckpoint = 1;
068:
069: /**
070: * All checkpoint names in chain prior to now. May not all still
071: * exist on disk.
072: */
073: private List predecessorCheckpoints = new LinkedList();
074:
075: /**
076: * If a checkpoint has begun, its directory under
077: * <code>checkpointDirectory</code>.
078: */
079: private transient File checkpointInProgressDir = null;
080:
081: /**
082: * If the checkpoint in progress has encountered fatal errors.
083: */
084: private transient boolean checkpointErrors = false;
085:
086: /**
087: * checkpointThread is set if a checkpoint is currently running.
088: */
089: private transient Thread checkpointThread = null;
090:
091: private transient CrawlController controller;
092:
093: /**
094: * Setup in constructor or on a call to revovery.
095: */
096: private transient Timer timerThread = null;
097:
098: public static final DecimalFormat INDEX_FORMAT = new DecimalFormat(
099: "00000");
100:
101: /**
102: * Create a new CheckpointContext with the given store directory
103: * @param cc CrawlController instance thats hosting this Checkpointer.
104: * @param checkpointDir Where to store checkpoint.
105: */
106: public Checkpointer(final CrawlController cc,
107: final File checkpointDir) {
108: this (cc, DEFAULT_PREFIX);
109: }
110:
111: /**
112: * Create a new CheckpointContext with the given store directory
113: *
114: * @param cc CrawlController instance thats hosting this Checkpointer.
115: * @param prefix Prefix for checkpoint label.
116: */
117: public Checkpointer(final CrawlController cc, final String prefix) {
118: super ();
119: initialize(cc, prefix);
120:
121: }
122:
123: protected void initialize(final CrawlController cc,
124: final String prefix) {
125: this .controller = cc;
126: this .checkpointPrefix = prefix;
127: // Period is in hours.
128: int period = Integer.parseInt(System.getProperties()
129: .getProperty(this .getClass().getName() + ".period",
130: "-1"));
131: if (period <= 0) {
132: return;
133: }
134: // Convert period from hours to milliseconds.
135: long periodMs = period * (1000 * 60 * 60);
136: TimerTask tt = new TimerTask() {
137: private CrawlController cController = cc;
138:
139: public void run() {
140: if (isCheckpointing()) {
141: LOGGER
142: .info("CheckpointTimerThread skipping checkpoint, "
143: + "already checkpointing: State: "
144: + this .cController.getState());
145: return;
146: }
147: LOGGER.info("TimerThread request checkpoint");
148: this .cController.requestCrawlCheckpoint();
149: }
150: };
151: this .timerThread = new Timer(true);
152: this .timerThread.schedule(tt, periodMs, periodMs);
153: LOGGER
154: .info("Installed Checkpoint TimerThread to checkpoint every "
155: + period + " hour(s).");
156: }
157:
158: void cleanup() {
159: if (this .timerThread != null) {
160: LOGGER.info("Cleanedup Checkpoint TimerThread.");
161: this .timerThread.cancel();
162: }
163: }
164:
165: /**
166: * @return Returns the nextCheckpoint index.
167: */
168: public int getNextCheckpoint() {
169: return this .nextCheckpoint;
170: }
171:
172: /**
173: * Run a checkpoint of the crawler.
174: */
175: public void checkpoint() {
176: String name = "Checkpoint-" + getNextCheckpointName();
177: this .checkpointThread = new CheckpointingThread(name);
178: this .checkpointThread.setDaemon(true);
179: this .checkpointThread.start();
180: }
181:
182: /**
183: * Thread to run the checkpointing.
184: * @author stack
185: */
186: public class CheckpointingThread extends Thread {
187: public CheckpointingThread(final String name) {
188: super (name);
189: }
190:
191: public CrawlController getController() {
192: return Checkpointer.this .controller;
193: }
194:
195: public void run() {
196: LOGGER.info("Started");
197: // If crawler already paused, don't resume crawling after
198: // finishing checkpointing.
199: final boolean alreadyPaused = getController().isPaused()
200: || getController().isPausing();
201: try {
202: getController().requestCrawlPause();
203: // Clear any checkpoint errors.
204: setCheckpointErrors(false);
205: if (!waitOnPaused()) {
206: checkpointFailed("Failed wait for complete pause.");
207: } else {
208: createCheckpointInProgressDirectory();
209: this .getController().checkpoint();
210: }
211: } catch (Exception e) {
212: checkpointFailed(e);
213: } finally {
214: if (!isCheckpointErrors()) {
215: writeValidity();
216: }
217: Checkpointer.this .nextCheckpoint++;
218: clearCheckpointInProgressDirectory();
219: LOGGER.info("Finished");
220: getController().completePause();
221: if (!alreadyPaused) {
222: getController().requestCrawlResume();
223: }
224: }
225: }
226:
227: private synchronized boolean waitOnPaused() {
228: // If we're paused we can exit but also exit if the crawl has been
229: // resumed by the operator.
230: while (!getController().isPaused()
231: && !getController().isRunning()) {
232: try {
233: wait(1000 * 3);
234: } catch (InterruptedException e) {
235: // May be for us.
236: }
237: }
238: return getController().isPaused();
239: }
240: }
241:
242: protected File createCheckpointInProgressDirectory() {
243: this .checkpointInProgressDir = new File(
244: Checkpointer.this .controller.getCheckpointsDisk(),
245: getNextCheckpointName());
246: this .checkpointInProgressDir.mkdirs();
247: return this .checkpointInProgressDir;
248: }
249:
250: protected void clearCheckpointInProgressDirectory() {
251: this .checkpointInProgressDir = null;
252: }
253:
254: protected CrawlController getController() {
255: return this .controller;
256: }
257:
258: /**
259: * @return next checkpoint name (zero-padding string).
260: */
261: public String getNextCheckpointName() {
262: return formatCheckpointName(this .checkpointPrefix,
263: this .nextCheckpoint);
264: }
265:
266: public static String formatCheckpointName(final String prefix,
267: final int index) {
268: return prefix + INDEX_FORMAT.format(index);
269: }
270:
271: protected void writeValidity() {
272: File valid = new File(this .checkpointInProgressDir,
273: Checkpoint.VALIDITY_STAMP_FILENAME);
274: try {
275: FileOutputStream fos = new FileOutputStream(valid);
276: fos.write(ArchiveUtils.get14DigitDate().getBytes());
277: fos.close();
278: } catch (IOException e) {
279: valid.delete();
280: }
281: }
282:
283: /**
284: * @return Checkpoint directory. Name of the directory is the name of this
285: * current checkpoint. Null if no checkpoint in progress.
286: */
287: public File getCheckpointInProgressDirectory() {
288: return this .checkpointInProgressDir;
289: }
290:
291: /**
292: * @return True if a checkpoint is in progress.
293: */
294: public boolean isCheckpointing() {
295: return this .checkpointThread != null
296: && this .checkpointThread.isAlive();
297: }
298:
299: /**
300: * Note that a checkpoint failed
301: *
302: * @param e Exception checkpoint failed on.
303: */
304: protected void checkpointFailed(Exception e) {
305: LOGGER.log(Level.WARNING, " Checkpoint failed", e);
306: checkpointFailed();
307: }
308:
309: protected void checkpointFailed(final String message) {
310: LOGGER.warning(message);
311: checkpointFailed();
312: }
313:
314: protected void checkpointFailed() {
315: this .checkpointErrors = true;
316: }
317:
318: /**
319: * @return True if current/last checkpoint failed.
320: */
321: public boolean isCheckpointFailed() {
322: return this .checkpointErrors;
323: }
324:
325: /**
326: * @return Return whether this context is at a new crawl, never-
327: * checkpointed state.
328: */
329: public boolean isAtBeginning() {
330: return nextCheckpoint == 1;
331: }
332:
333: /**
334: * Call when recovering from a checkpoint.
335: * Call this after instance has been revivifyied post-serialization to
336: * amend counters and directories that effect where checkpoints get stored
337: * from here on out.
338: * @param cc CrawlController instance.
339: */
340: public void recover(final CrawlController cc) {
341: // Prepend the checkpoint name with a little 'r' so we tell apart
342: // checkpoints made from a recovery. Allow for there being
343: // multiple 'r' prefixes.
344: initialize(cc, 'r' + this .checkpointPrefix);
345: }
346:
347: /**
348: * @return Returns the predecessorCheckpoints.
349: */
350: public List getPredecessorCheckpoints() {
351: return this .predecessorCheckpoints;
352: }
353:
354: protected boolean isCheckpointErrors() {
355: return this .checkpointErrors;
356: }
357:
358: protected void setCheckpointErrors(boolean checkpointErrors) {
359: this.checkpointErrors = checkpointErrors;
360: }
361: }
|