001: /* Copyright (C) 2003 Internet Archive.
002: *
003: * This file is part of the Heritrix web crawler (crawler.archive.org).
004: *
005: * Heritrix is free software; you can redistribute it and/or modify
006: * it under the terms of the GNU Lesser Public License as published by
007: * the Free Software Foundation; either version 2.1 of the License, or
008: * any later version.
009: *
010: * Heritrix is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013: * GNU Lesser Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser Public License
016: * along with Heritrix; if not, write to the Free Software
017: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * ToeThread.java
020: * Created on May 14, 2003
021: *
022: * $Header$
023: */
024: package org.archive.crawler.framework;
025:
026: import java.io.PrintWriter;
027: import java.util.HashMap;
028: import java.util.logging.Level;
029: import java.util.logging.Logger;
030:
031: import org.archive.crawler.datamodel.CoreAttributeConstants;
032: import org.archive.crawler.datamodel.CrawlOrder;
033: import org.archive.crawler.datamodel.CrawlURI;
034: import org.archive.crawler.datamodel.FetchStatusCodes;
035: import org.archive.crawler.datamodel.InstancePerThread;
036: import org.archive.crawler.framework.exceptions.EndedException;
037: import org.archive.util.ArchiveUtils;
038: import org.archive.util.DevUtils;
039: import org.archive.util.HttpRecorder;
040: import org.archive.util.HttpRecorderMarker;
041: import org.archive.util.ProgressStatisticsReporter;
042: import org.archive.util.Reporter;
043:
044: import com.sleepycat.util.RuntimeExceptionWrapper;
045:
046: /**
047: * One "worker thread"; asks for CrawlURIs, processes them,
048: * repeats unless told otherwise.
049: *
050: * @author Gordon Mohr
051: */
052: public class ToeThread extends Thread implements
053: CoreAttributeConstants, FetchStatusCodes, HttpRecorderMarker,
054: Reporter, ProgressStatisticsReporter {
055: private static final String STEP_NASCENT = "NASCENT";
056: private static final String STEP_ABOUT_TO_GET_URI = "ABOUT_TO_GET_URI";
057: private static final String STEP_FINISHED = "FINISHED";
058: private static final String STEP_ABOUT_TO_BEGIN_CHAIN = "ABOUT_TO_BEGIN_CHAIN";
059: private static final String STEP_ABOUT_TO_BEGIN_PROCESSOR = "ABOUT_TO_BEGIN_PROCESSOR";
060: private static final String STEP_DONE_WITH_PROCESSORS = "DONE_WITH_PROCESSORS";
061: private static final String STEP_HANDLING_RUNTIME_EXCEPTION = "HANDLING_RUNTIME_EXCEPTION";
062: private static final String STEP_ABOUT_TO_RETURN_URI = "ABOUT_TO_RETURN_URI";
063: private static final String STEP_FINISHING_PROCESS = "FINISHING_PROCESS";
064:
065: private static Logger logger = Logger
066: .getLogger("org.archive.crawler.framework.ToeThread");
067:
068: private CrawlController controller;
069: private int serialNumber;
070:
071: /**
072: * Each ToeThead has an instance of HttpRecord that gets used
073: * over and over by each request.
074: *
075: * @see org.archive.util.HttpRecorderMarker
076: */
077: private HttpRecorder httpRecorder = null;
078:
079: private HashMap<String, Processor> localProcessors = new HashMap<String, Processor>();
080: private String currentProcessorName = "";
081:
082: private String coreName;
083: private CrawlURI currentCuri;
084: private long lastStartTime;
085: private long lastFinishTime;
086:
087: // activity monitoring, debugging, and problem detection
088: private String step = STEP_NASCENT;
089: private long atStepSince;
090:
091: // default priority; may not be meaningful in recent JVMs
092: private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY - 2;
093:
094: // indicator that a thread is now surplus based on current desired
095: // count; it should wrap up cleanly
096: private volatile boolean shouldRetire = false;
097:
098: /**
099: * Create a ToeThread
100: *
101: * @param g ToeThreadGroup
102: * @param sn serial number
103: */
104: public ToeThread(ToePool g, int sn) {
105: // TODO: add crawl name?
106: super (g, "ToeThread #" + sn);
107: coreName = "ToeThread #" + sn + ": ";
108: controller = g.getController();
109: serialNumber = sn;
110: setPriority(DEFAULT_PRIORITY);
111: int outBufferSize = ((Integer) controller.getOrder()
112: .getUncheckedAttribute(null,
113: CrawlOrder.ATTR_RECORDER_OUT_BUFFER))
114: .intValue();
115: int inBufferSize = ((Integer) controller.getOrder()
116: .getUncheckedAttribute(null,
117: CrawlOrder.ATTR_RECORDER_IN_BUFFER)).intValue();
118: httpRecorder = new HttpRecorder(controller.getScratchDisk(),
119: "tt" + sn + "http", outBufferSize, inBufferSize);
120: lastFinishTime = System.currentTimeMillis();
121: }
122:
123: /** (non-Javadoc)
124: * @see java.lang.Thread#run()
125: */
126: public void run() {
127: String name = controller.getOrder().getCrawlOrderName();
128: logger.fine(getName() + " started for order '" + name + "'");
129:
130: try {
131: while (true) {
132: // TODO check for thread-abort? or is waiting for interrupt enough?
133: continueCheck();
134:
135: setStep(STEP_ABOUT_TO_GET_URI);
136:
137: CrawlURI curi = controller.getFrontier().next();
138:
139: synchronized (this ) {
140: continueCheck();
141: setCurrentCuri(curi);
142: }
143:
144: processCrawlUri();
145:
146: setStep(STEP_ABOUT_TO_RETURN_URI);
147: continueCheck();
148:
149: synchronized (this ) {
150: controller.getFrontier().finished(currentCuri);
151: setCurrentCuri(null);
152: }
153:
154: setStep(STEP_FINISHING_PROCESS);
155: lastFinishTime = System.currentTimeMillis();
156: controller.releaseContinuePermission();
157: if (shouldRetire) {
158: break; // from while(true)
159: }
160: }
161: } catch (EndedException e) {
162: // crawl ended (or thread was retired), so allow thread to end
163: } catch (Exception e) {
164: // everything else (including interruption)
165: logger.log(Level.SEVERE, "Fatal exception in " + getName(),
166: e);
167: } catch (OutOfMemoryError err) {
168: seriousError(err);
169: } finally {
170: controller.releaseContinuePermission();
171: }
172: setCurrentCuri(null);
173: // Do cleanup so that objects can be GC.
174: this .httpRecorder.closeRecorders();
175: this .httpRecorder = null;
176: localProcessors = null;
177:
178: logger.fine(getName() + " finished for order '" + name + "'");
179: setStep(STEP_FINISHED);
180: controller.toeEnded();
181: controller = null;
182: }
183:
184: /**
185: * Set currentCuri, updating thread name as appropriate
186: * @param curi
187: */
188: private void setCurrentCuri(CrawlURI curi) {
189: if (curi == null) {
190: setName(coreName);
191: } else {
192: setName(coreName + curi);
193: }
194: currentCuri = curi;
195: }
196:
197: /**
198: * @param s
199: */
200: private void setStep(String s) {
201: step = s;
202: atStepSince = System.currentTimeMillis();
203: }
204:
205: private void seriousError(Error err) {
206: // try to prevent timeslicing until we have a chance to deal with OOM
207: // TODO: recognize that new JVM priority indifference may make this
208: // priority-jumbling pointless
209: setPriority(DEFAULT_PRIORITY + 1);
210: if (controller != null) {
211: // hold all ToeThreads from proceeding to next processor
212: controller.singleThreadMode();
213: // TODO: consider if SoftReferences would be a better way to
214: // engineer a soft-landing for low-memory conditions
215: controller.freeReserveMemory();
216: controller.requestCrawlPause();
217: if (controller.getFrontier().getFrontierJournal() != null) {
218: controller.getFrontier().getFrontierJournal()
219: .seriousError(getName() + err.getMessage());
220: }
221: }
222:
223: // OutOfMemory etc.
224: String extraInfo = DevUtils.extraInfo();
225: System.err.println("<<<");
226: System.err.println(ArchiveUtils.getLog17Date());
227: System.err.println(err);
228: System.err.println(extraInfo);
229: err.printStackTrace(System.err);
230:
231: if (controller != null) {
232: PrintWriter pw = new PrintWriter(System.err);
233: controller.getToePool().compactReportTo(pw);
234: pw.flush();
235: }
236: System.err.println(">>>");
237: // DevUtils.sigquitSelf();
238:
239: String context = "unknown";
240: if (currentCuri != null) {
241: // update fetch-status, saving original as annotation
242: currentCuri
243: .addAnnotation("err=" + err.getClass().getName());
244: currentCuri.addAnnotation("os"
245: + currentCuri.getFetchStatus());
246: currentCuri.setFetchStatus(S_SERIOUS_ERROR);
247: context = currentCuri.singleLineReport() + " in "
248: + currentProcessorName;
249: }
250: String message = "Serious error occured trying "
251: + "to process '" + context + "'\n" + extraInfo;
252: logger.log(Level.SEVERE, message.toString(), err);
253: setPriority(DEFAULT_PRIORITY);
254: }
255:
256: /**
257: * Perform checks as to whether normal execution should proceed.
258: *
259: * If an external interrupt is detected, throw an interrupted exception.
260: * Used before anything that should not be attempted by a 'zombie' thread
261: * that the Frontier/Crawl has given up on.
262: *
263: * Otherwise, if the controller's memoryGate has been closed,
264: * hold until it is opened. (Provides a better chance of
265: * being able to complete some tasks after an OutOfMemoryError.)
266: *
267: * @throws InterruptedException
268: */
269: private void continueCheck() throws InterruptedException {
270: if (Thread.interrupted()) {
271: throw new InterruptedException("die request detected");
272: }
273: controller.acquireContinuePermission();
274: }
275:
276: /**
277: * Pass the CrawlURI to all appropriate processors
278: *
279: * @throws InterruptedException
280: */
281: private void processCrawlUri() throws InterruptedException {
282: currentCuri.setThreadNumber(this .serialNumber);
283: currentCuri.setNextProcessorChain(controller
284: .getFirstProcessorChain());
285: lastStartTime = System.currentTimeMillis();
286: // System.out.println(currentCuri);
287: try {
288: while (currentCuri.nextProcessorChain() != null) {
289: setStep(STEP_ABOUT_TO_BEGIN_CHAIN);
290: // Starting on a new processor chain.
291: currentCuri.setNextProcessor(currentCuri
292: .nextProcessorChain().getFirstProcessor());
293: currentCuri.setNextProcessorChain(currentCuri
294: .nextProcessorChain().getNextProcessorChain());
295:
296: while (currentCuri.nextProcessor() != null) {
297: setStep(STEP_ABOUT_TO_BEGIN_PROCESSOR);
298: Processor currentProcessor = getProcessor(currentCuri
299: .nextProcessor());
300: currentProcessorName = currentProcessor.getName();
301: continueCheck();
302: // long memBefore = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
303: currentProcessor.process(currentCuri);
304: // long memAfter = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
305: // System.out.println((memAfter-memBefore)+"K in "+currentProcessorName);
306: }
307: }
308: setStep(STEP_DONE_WITH_PROCESSORS);
309: currentProcessorName = "";
310: } catch (RuntimeExceptionWrapper e) {
311: // Workaround to get cause from BDB
312: if (e.getCause() == null) {
313: e.initCause(e.getCause());
314: }
315: recoverableProblem(e);
316: } catch (AssertionError ae) {
317: // This risks leaving crawl in fatally inconsistent state,
318: // but is often reasonable for per-Processor assertion problems
319: recoverableProblem(ae);
320: } catch (RuntimeException e) {
321: recoverableProblem(e);
322: } catch (StackOverflowError err) {
323: recoverableProblem(err);
324: } catch (Error err) {
325: // OutOfMemory and any others
326: seriousError(err);
327: }
328: }
329:
330: /**
331: * Handling for exceptions and errors that are possibly recoverable.
332: *
333: * @param e
334: */
335: private void recoverableProblem(Throwable e) {
336: Object previousStep = step;
337: setStep(STEP_HANDLING_RUNTIME_EXCEPTION);
338: e.printStackTrace(System.err);
339: currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION);
340: // store exception temporarily for logging
341: currentCuri.addAnnotation("err=" + e.getClass().getName());
342: currentCuri.putObject(A_RUNTIME_EXCEPTION, e);
343: String message = "Problem " + e
344: + " occured when trying to process '"
345: + currentCuri.toString() + "' at step " + previousStep
346: + " in " + currentProcessorName + "\n";
347: logger.log(Level.SEVERE, message.toString(), e);
348: }
349:
350: private Processor getProcessor(Processor processor) {
351: if (!(processor instanceof InstancePerThread)) {
352: // just use the shared Processor
353: return processor;
354: }
355: // must use local copy of processor
356: Processor localProcessor = (Processor) localProcessors
357: .get(processor.getClass().getName());
358: if (localProcessor == null) {
359: localProcessor = processor.spawn(this .getSerialNumber());
360: localProcessors.put(processor.getClass().getName(),
361: localProcessor);
362: }
363: return localProcessor;
364: }
365:
366: /**
367: * @return Return toe thread serial number.
368: */
369: public int getSerialNumber() {
370: return this .serialNumber;
371: }
372:
373: /**
374: * Used to get current threads HttpRecorder instance.
375: * Implementation of the HttpRecorderMarker interface.
376: * @return Returns instance of HttpRecorder carried by this thread.
377: * @see org.archive.util.HttpRecorderMarker#getHttpRecorder()
378: */
379: public HttpRecorder getHttpRecorder() {
380: return this .httpRecorder;
381: }
382:
383: /** Get the CrawlController acossiated with this thread.
384: *
385: * @return Returns the CrawlController.
386: */
387: public CrawlController getController() {
388: return controller;
389: }
390:
391: /**
392: * Terminates a thread.
393: *
394: * <p> Calling this method will ensure that the current thread will stop
395: * processing as soon as possible (note: this may be never). Meant to
396: * 'short circuit' hung threads.
397: *
398: * <p> Current crawl uri will have its fetch status set accordingly and
399: * will be immediately returned to the frontier.
400: *
401: * <p> As noted before, this does not ensure that the thread will stop
402: * running (ever). But once evoked it will not try and communicate with
403: * other parts of crawler and will terminate as soon as control is
404: * established.
405: */
406: protected void kill() {
407: this .interrupt();
408: synchronized (this ) {
409: if (currentCuri != null) {
410: currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED);
411: controller.getFrontier().finished(currentCuri);
412: }
413: }
414: }
415:
416: /**
417: * @return Current step (For debugging/reporting, give abstract step
418: * where this thread is).
419: */
420: public Object getStep() {
421: return step;
422: }
423:
424: /**
425: * Is this thread processing a URI, not paused or waiting for a URI?
426: * @return whether thread is actively processing a URI
427: */
428: public boolean isActive() {
429: // if alive and not waiting in/for frontier.next(), we're 'active'
430: return this .isAlive() && (currentCuri != null);
431: }
432:
433: /**
434: * Request that this thread retire (exit cleanly) at the earliest
435: * opportunity.
436: */
437: public void retire() {
438: shouldRetire = true;
439: }
440:
441: /**
442: * Whether this thread should cleanly retire at the earliest
443: * opportunity.
444: *
445: * @return True if should retire.
446: */
447: public boolean shouldRetire() {
448: return shouldRetire;
449: }
450:
451: //
452: // Reporter implementation
453: //
454:
455: /**
456: * Compiles and returns a report on its status.
457: * @param name Report name.
458: * @param pw Where to print.
459: */
460: public void reportTo(String name, PrintWriter pw) {
461: // name is ignored for now: only one kind of report
462:
463: pw.print("[");
464: pw.println(getName());
465:
466: // Make a local copy of the currentCuri reference in case it gets
467: // nulled while we're using it. We're doing this because
468: // alternative is synchronizing and we don't want to do this --
469: // it causes hang ups as controller waits on a lock for this thread,
470: // something it gets easily enough on old threading model but something
471: // it can wait interminably for on NPTL threading model.
472: // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
473: CrawlURI c = currentCuri;
474: if (c != null) {
475: pw.print(" ");
476: c.singleLineReportTo(pw);
477: pw.print(" ");
478: pw.print(c.getFetchAttempts());
479: pw.print(" attempts");
480: pw.println();
481: pw.print(" ");
482: pw.print("in processor: ");
483: pw.print(currentProcessorName);
484: } else {
485: pw.print(" -no CrawlURI- ");
486: }
487: pw.println();
488:
489: long now = System.currentTimeMillis();
490: long time = 0;
491:
492: pw.print(" ");
493: if (lastFinishTime > lastStartTime) {
494: // That means we finished something after we last started something
495: // or in other words we are not working on anything.
496: pw.print("WAITING for ");
497: time = now - lastFinishTime;
498: } else if (lastStartTime > 0) {
499: // We are working on something
500: pw.print("ACTIVE for ");
501: time = now - lastStartTime;
502: }
503: pw.print(ArchiveUtils.formatMillisecondsToConventional(time));
504: pw.println();
505:
506: pw.print(" ");
507: pw.print("step: ");
508: pw.print(step);
509: pw.print(" for ");
510: pw.print(ArchiveUtils.formatMillisecondsToConventional(System
511: .currentTimeMillis()
512: - atStepSince));
513: pw.println();
514:
515: StackTraceElement[] ste = this .getStackTrace();
516: for (int i = 0; i < ste.length; i++) {
517: pw.print(" ");
518: pw.print(ste[i].toString());
519: pw.println();
520: }
521: pw.print("]");
522: pw.println();
523:
524: pw.flush();
525: }
526:
527: /**
528: * @param w PrintWriter to write to.
529: */
530: public void singleLineReportTo(PrintWriter w) {
531: w.print("#");
532: w.print(this .serialNumber);
533:
534: // Make a local copy of the currentCuri reference in case it gets
535: // nulled while we're using it. We're doing this because
536: // alternative is synchronizing and we don't want to do this --
537: // it causes hang ups as controller waits on a lock for this thread,
538: // something it gets easily enough on old threading model but something
539: // it can wait interminably for on NPTL threading model.
540: // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
541: CrawlURI c = currentCuri;
542: if (c != null) {
543: w.print(" ");
544: w.print(currentProcessorName);
545: w.print(" ");
546: w.print(c.toString());
547: w.print(" (");
548: w.print(c.getFetchAttempts());
549: w.print(") ");
550: } else {
551: w.print(" [no CrawlURI] ");
552: }
553:
554: long now = System.currentTimeMillis();
555: long time = 0;
556:
557: if (lastFinishTime > lastStartTime) {
558: // That means we finished something after we last started something
559: // or in other words we are not working on anything.
560: w.print("WAITING for ");
561: time = now - lastFinishTime;
562: } else if (lastStartTime > 0) {
563: // We are working on something
564: w.print("ACTIVE for ");
565: time = now - lastStartTime;
566: }
567: w.print(ArchiveUtils.formatMillisecondsToConventional(time));
568: w.print(" at ");
569: w.print(step);
570: w.print(" for ");
571: w.print(ArchiveUtils.formatMillisecondsToConventional(now
572: - atStepSince));
573: w.print("\n");
574: w.flush();
575: }
576:
577: /* (non-Javadoc)
578: * @see org.archive.util.Reporter#singleLineLegend()
579: */
580: public String singleLineLegend() {
581: return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep";
582: }
583:
584: /* (non-Javadoc)
585: * @see org.archive.util.Reporter#getReports()
586: */
587: public String[] getReports() {
588: // for now none but the default
589: return new String[] {};
590: }
591:
592: public void reportTo(PrintWriter writer) {
593: reportTo(null, writer);
594: }
595:
596: /* (non-Javadoc)
597: * @see org.archive.util.Reporter#singleLineReport()
598: */
599: public String singleLineReport() {
600: return ArchiveUtils.singleLineReport(this );
601: }
602:
603: public void progressStatisticsLine(PrintWriter writer) {
604: writer.print(getController().getStatistics()
605: .getProgressStatisticsLine());
606: writer.print("\n");
607: }
608:
609: public void progressStatisticsLegend(PrintWriter writer) {
610: writer.print(getController().getStatistics()
611: .progressStatisticsLegend());
612: writer.print("\n");
613: }
614:
615: public String getCurrentProcessorName() {
616: return currentProcessorName;
617: }
618: }
|