001: package org.archive.crawler.frontier;
002:
003: import java.io.IOException;
004: import java.io.PrintWriter;
005: import java.io.Serializable;
006: import java.util.logging.Level;
007: import java.util.logging.Logger;
008:
009: import org.archive.crawler.datamodel.CrawlSubstats;
010: import org.archive.crawler.datamodel.CrawlURI;
011: import org.archive.crawler.framework.Frontier;
012: import org.archive.util.ArchiveUtils;
013: import org.archive.util.Reporter;
014:
015: /**
016: * A single queue of related URIs to visit, grouped by a classKey
017: * (typically "hostname:port" or similar)
018: *
019: * @author gojomo
020: * @author Christian Kohlschuetter
021: */
022: public abstract class WorkQueue implements Frontier.FrontierGroup,
023: Comparable, Serializable, Reporter {
024: private static final Logger logger = Logger
025: .getLogger(WorkQueue.class.getName());
026:
027: /** The classKey */
028: protected final String classKey;
029:
030: private boolean active = true;
031:
032: /** Total number of stored items */
033: private long count = 0;
034:
035: /** Total number of items ever enqueued */
036: private long enqueueCount = 0;
037:
038: /** Whether queue is already in lifecycle stage */
039: private boolean isHeld = false;
040:
041: /** Time to wake, if snoozed */
042: private long wakeTime = 0;
043:
044: /** Running 'budget' indicating whether queue should stay active */
045: private int sessionBalance = 0;
046:
047: /** Cost of the last item to be charged against queue */
048: private int lastCost = 0;
049:
050: /** Total number of items charged against queue; with totalExpenditure
051: * can be used to calculate 'average cost'. */
052: private long costCount = 0;
053:
054: /** Running tally of total expenditures on this queue */
055: private long totalExpenditure = 0;
056:
057: /** Total to spend on this queue over its lifetime */
058: private long totalBudget = 0;
059:
060: /** The next item to be returned */
061: private CrawlURI peekItem = null;
062:
063: /** Last URI enqueued */
064: private String lastQueued;
065:
066: /** Last URI peeked */
067: private String lastPeeked;
068:
069: /** time of last dequeue (disposition of some URI) **/
070: private long lastDequeueTime;
071:
072: /** count of errors encountered */
073: private long errorCount = 0;
074:
075: /** Substats for all CrawlURIs in this group */
076: protected CrawlSubstats substats = new CrawlSubstats();
077:
078: private boolean retired;
079:
080: public WorkQueue(final String pClassKey) {
081: this .classKey = pClassKey;
082: }
083:
084: /**
085: * Delete URIs matching the given pattern from this queue.
086: * @param frontier
087: * @param match
088: * @return count of deleted URIs
089: */
090: public long deleteMatching(final WorkQueueFrontier frontier,
091: String match) {
092: try {
093: final long deleteCount = deleteMatchingFromQueue(frontier,
094: match);
095: this .count -= deleteCount;
096: return deleteCount;
097: } catch (IOException e) {
098: //FIXME better exception handling
099: e.printStackTrace();
100: throw new RuntimeException(e);
101: }
102: }
103:
104: /**
105: * Add the given CrawlURI, noting its addition in running count. (It
106: * should not already be present.)
107: *
108: * @param frontier Work queues manager.
109: * @param curi CrawlURI to insert.
110: */
111: public synchronized void enqueue(final WorkQueueFrontier frontier,
112: CrawlURI curi) {
113: try {
114: insert(frontier, curi, false);
115: } catch (IOException e) {
116: //FIXME better exception handling
117: e.printStackTrace();
118: throw new RuntimeException(e);
119: }
120: count++;
121: enqueueCount++;
122: }
123:
124: /**
125: * Return the topmost queue item -- and remember it,
126: * such that even later higher-priority inserts don't
127: * change it.
128: *
129: * TODO: evaluate if this is really necessary
130: * @param frontier Work queues manager
131: *
132: * @return topmost queue item, or null
133: */
134: public CrawlURI peek(final WorkQueueFrontier frontier) {
135: if (peekItem == null && count > 0) {
136: try {
137: peekItem = peekItem(frontier);
138: } catch (IOException e) {
139: //FIXME better exception handling
140: logger.log(Level.SEVERE, "peek failure", e);
141: e.printStackTrace();
142: // throw new RuntimeException(e);
143: }
144: if (peekItem != null) {
145: lastPeeked = peekItem.toString();
146: }
147: }
148: return peekItem;
149: }
150:
151: /**
152: * Remove the peekItem from the queue and adjusts the count.
153: *
154: * @param frontier Work queues manager.
155: */
156: public synchronized void dequeue(final WorkQueueFrontier frontier) {
157: try {
158: deleteItem(frontier, peekItem);
159: } catch (IOException e) {
160: //FIXME better exception handling
161: e.printStackTrace();
162: throw new RuntimeException(e);
163: }
164: unpeek();
165: count--;
166: lastDequeueTime = System.currentTimeMillis();
167: }
168:
169: /**
170: * Set the session 'activity budget balance' to the given value
171: *
172: * @param balance to use
173: */
174: public void setSessionBalance(int balance) {
175: this .sessionBalance = balance;
176: }
177:
178: /**
179: * Return current session 'activity budget balance'
180: *
181: * @return session balance
182: */
183: public int getSessionBalance() {
184: return this .sessionBalance;
185: }
186:
187: /**
188: * Set the total expenditure level allowable before queue is
189: * considered inherently 'over-budget'.
190: *
191: * @param budget
192: */
193: public void setTotalBudget(long budget) {
194: this .totalBudget = budget;
195: }
196:
197: /**
198: * Check whether queue has temporarily or permanently exceeded
199: * its budget.
200: *
201: * @return true if queue is over its set budget(s)
202: */
203: public boolean isOverBudget() {
204: // check whether running balance is depleted
205: // or totalExpenditure exceeds totalBudget
206: return this .sessionBalance <= 0
207: || (this .totalBudget >= 0 && this .totalExpenditure > this .totalBudget);
208: }
209:
210: /**
211: * Return the tally of all expenditures on this queue
212: *
213: * @return total amount expended on this queue
214: */
215: public long getTotalExpenditure() {
216: return totalExpenditure;
217: }
218:
219: /**
220: * Increase the internal running budget to be used before
221: * deactivating the queue
222: *
223: * @param amount amount to increment
224: * @return updated budget value
225: */
226: public int incrementSessionBalance(int amount) {
227: this .sessionBalance = this .sessionBalance + amount;
228: return this .sessionBalance;
229: }
230:
231: /**
232: * Decrease the internal running budget by the given amount.
233: * @param amount tp decrement
234: * @return updated budget value
235: */
236: public int expend(int amount) {
237: this .sessionBalance = this .sessionBalance - amount;
238: this .totalExpenditure = this .totalExpenditure + amount;
239: this .lastCost = amount;
240: this .costCount++;
241: return this .sessionBalance;
242: }
243:
244: /**
245: * A URI should not have been charged against queue (eg
246: * it was disregarded); return the amount expended
247: * @param amount to return
248: * @return updated budget value
249: */
250: public int refund(int amount) {
251: this .sessionBalance = this .sessionBalance + amount;
252: this .totalExpenditure = this .totalExpenditure - amount;
253: this .costCount--;
254: return this .sessionBalance;
255: }
256:
257: /**
258: * Note an error and assess an extra penalty.
259: * @param penalty additional amount to deduct
260: */
261: public void noteError(int penalty) {
262: this .sessionBalance = this .sessionBalance - penalty;
263: this .totalExpenditure = this .totalExpenditure + penalty;
264: errorCount++;
265: }
266:
267: /**
268: * @param l
269: */
270: public void setWakeTime(long l) {
271: wakeTime = l;
272: }
273:
274: /**
275: * @return wakeTime
276: */
277: public long getWakeTime() {
278: return wakeTime;
279: }
280:
281: /**
282: * @return classKey, the 'identifier', for this queue.
283: */
284: public String getClassKey() {
285: return this .classKey;
286: }
287:
288: /**
289: * Clear isHeld to false
290: */
291: public void clearHeld() {
292: isHeld = false;
293: }
294:
295: /**
296: * Whether the queue is already in a lifecycle stage --
297: * such as ready, in-progress, snoozed -- and thus should
298: * not be redundantly inserted to readyClassQueues
299: *
300: * @return isHeld
301: */
302: public boolean isHeld() {
303: return isHeld;
304: }
305:
306: /**
307: * Set isHeld to true
308: */
309: public void setHeld() {
310: isHeld = true;
311: }
312:
313: /**
314: * Forgive the peek, allowing a subsequent peek to
315: * return a different item.
316: *
317: */
318: public void unpeek() {
319: peekItem = null;
320: }
321:
322: public final int compareTo(Object obj) {
323: if (this == obj) {
324: return 0; // for exact identity only
325: }
326: WorkQueue other = (WorkQueue) obj;
327: if (getWakeTime() > other.getWakeTime()) {
328: return 1;
329: }
330: if (getWakeTime() < other.getWakeTime()) {
331: return -1;
332: }
333: // at this point, the ordering is arbitrary, but still
334: // must be consistent/stable over time
335: return this .classKey.compareTo(other.getClassKey());
336: }
337:
338: /**
339: * Update the given CrawlURI, which should already be present. (This
340: * is not checked.) Equivalent to an enqueue without affecting the count.
341: *
342: * @param frontier Work queues manager.
343: * @param curi CrawlURI to update.
344: */
345: public void update(final WorkQueueFrontier frontier, CrawlURI curi) {
346: try {
347: insert(frontier, curi, true);
348: } catch (IOException e) {
349: //FIXME better exception handling
350: e.printStackTrace();
351: throw new RuntimeException(e);
352: }
353: }
354:
355: /**
356: * @return Returns the count.
357: */
358: public synchronized long getCount() {
359: return this .count;
360: }
361:
362: /**
363: * Insert the given curi, whether it is already present or not.
364: * @param frontier WorkQueueFrontier.
365: * @param curi CrawlURI to insert.
366: * @throws IOException
367: */
368: private void insert(final WorkQueueFrontier frontier,
369: CrawlURI curi, boolean overwriteIfPresent)
370: throws IOException {
371: insertItem(frontier, curi, overwriteIfPresent);
372: lastQueued = curi.toString();
373: }
374:
375: /**
376: * Insert the given curi, whether it is already present or not.
377: * Hook for subclasses.
378: *
379: * @param frontier WorkQueueFrontier.
380: * @param curi CrawlURI to insert.
381: * @throws IOException if there was a problem while inserting the item
382: */
383: protected abstract void insertItem(
384: final WorkQueueFrontier frontier, CrawlURI curi,
385: boolean expectedPresent) throws IOException;
386:
387: /**
388: * Delete URIs matching the given pattern from this queue.
389: * @param frontier WorkQueues manager.
390: * @param match the pattern to match
391: * @return count of deleted URIs
392: * @throws IOException if there was a problem while deleting
393: */
394: protected abstract long deleteMatchingFromQueue(
395: final WorkQueueFrontier frontier, final String match)
396: throws IOException;
397:
398: /**
399: * Removes the given item from the queue.
400: *
401: * This is only used to remove the first item in the queue,
402: * so it is not necessary to implement a random-access queue.
403: *
404: * @param frontier Work queues manager.
405: * @throws IOException if there was a problem while deleting the item
406: */
407: protected abstract void deleteItem(
408: final WorkQueueFrontier frontier, final CrawlURI item)
409: throws IOException;
410:
411: /**
412: * Returns first item from queue (does not delete)
413: *
414: * @return The peeked item, or null
415: * @throws IOException if there was a problem while peeking
416: */
417: protected abstract CrawlURI peekItem(
418: final WorkQueueFrontier frontier) throws IOException;
419:
420: /**
421: * Suspends this WorkQueue. Closes all connections to resources etc.
422: *
423: * @param frontier
424: * @throws IOException
425: */
426: protected void suspend(final WorkQueueFrontier frontier)
427: throws IOException {
428: }
429:
430: /**
431: * Resumes this WorkQueue. Eventually opens connections to resources etc.
432: *
433: * @param frontier
434: * @throws IOException
435: */
436: protected void resume(final WorkQueueFrontier frontier)
437: throws IOException {
438: }
439:
440: public void setActive(final WorkQueueFrontier frontier,
441: final boolean b) {
442: if (active != b) {
443: active = b;
444: try {
445: if (active) {
446: resume(frontier);
447: } else {
448: suspend(frontier);
449: }
450: } catch (IOException e) {
451: //FIXME better exception handling
452: e.printStackTrace();
453: throw new RuntimeException(e);
454: }
455: }
456: }
457:
458: //
459: // Reporter
460: //
461:
462: /* (non-Javadoc)
463: * @see org.archive.util.Reporter#getReports()
464: */
465: public String[] getReports() {
466: return new String[] {};
467: }
468:
469: /* (non-Javadoc)
470: * @see org.archive.util.Reporter#reportTo(java.io.Writer)
471: */
472: public void reportTo(PrintWriter writer) {
473: reportTo(null, writer);
474: }
475:
476: /* (non-Javadoc)
477: * @see org.archive.util.Reporter#singleLineReportTo(java.io.Writer)
478: */
479: public void singleLineReportTo(PrintWriter writer) {
480: // queue name
481: writer.print(classKey);
482: writer.print(" ");
483: // count of items
484: writer.print(Long.toString(count));
485: writer.print(" ");
486: // enqueue count
487: writer.print(Long.toString(enqueueCount));
488: writer.print(" ");
489: writer.print(sessionBalance);
490: writer.print(" ");
491: writer.print(lastCost);
492: writer.print("(");
493: writer.print(ArchiveUtils.doubleToString(
494: ((double) totalExpenditure / costCount), 1));
495: writer.print(")");
496: writer.print(" ");
497: // last dequeue time, if any, or '-'
498: if (lastDequeueTime != 0) {
499: writer.print(ArchiveUtils.getLog17Date(lastDequeueTime));
500: } else {
501: writer.print("-");
502: }
503: writer.print(" ");
504: // wake time if snoozed, or '-'
505: if (wakeTime != 0) {
506: writer.print(ArchiveUtils
507: .formatMillisecondsToConventional(wakeTime
508: - System.currentTimeMillis()));
509: } else {
510: writer.print("-");
511: }
512: writer.print(" ");
513: writer.print(Long.toString(totalExpenditure));
514: writer.print("/");
515: writer.print(Long.toString(totalBudget));
516: writer.print(" ");
517: writer.print(Long.toString(errorCount));
518: writer.print(" ");
519: writer.print(lastPeeked);
520: writer.print(" ");
521: writer.print(lastQueued);
522: writer.print("\n");
523: }
524:
525: /* (non-Javadoc)
526: * @see org.archive.util.Reporter#singleLineLegend()
527: */
528: public String singleLineLegend() {
529: return "queue currentSize totalEnqueues sessionBalance lastCost "
530: + "(averageCost) lastDequeueTime wakeTime "
531: + "totalSpend/totalBudget errorCount lastPeekUri lastQueuedUri";
532: }
533:
534: /* (non-Javadoc)
535: * @see org.archive.util.Reporter#singleLineReport()
536: */
537: public String singleLineReport() {
538: return ArchiveUtils.singleLineReport(this );
539: }
540:
541: /**
542: * @param writer
543: * @throws IOException
544: */
545: public void reportTo(String name, PrintWriter writer) {
546: // name is ignored: only one kind of report for now
547: writer.print("Queue ");
548: writer.print(classKey);
549: writer.print("\n");
550: writer.print(" ");
551: writer.print(Long.toString(count));
552: writer.print(" items");
553: if (wakeTime != 0) {
554: writer.print("\n wakes in: "
555: + ArchiveUtils
556: .formatMillisecondsToConventional(wakeTime
557: - System.currentTimeMillis()));
558: }
559: writer.print("\n last enqueued: ");
560: writer.print(lastQueued);
561: writer.print("\n last peeked: ");
562: writer.print(lastPeeked);
563: writer.print("\n");
564: writer.print(" total expended: ");
565: writer.print(Long.toString(totalExpenditure));
566: writer.print(" (total budget: ");
567: writer.print(Long.toString(totalBudget));
568: writer.print(")\n");
569: writer.print(" active balance: ");
570: writer.print(sessionBalance);
571: writer.print("\n last(avg) cost: ");
572: writer.print(lastCost);
573: writer.print("(");
574: writer.print(ArchiveUtils.doubleToString(
575: ((double) totalExpenditure / costCount), 1));
576: writer.print(")\n\n");
577: }
578:
579: public CrawlSubstats getSubstats() {
580: return substats;
581: }
582:
583: /**
584: * Set the retired status of this queue.
585: *
586: * @param b new value for retired status
587: */
588: public void setRetired(boolean b) {
589: this .retired = b;
590: }
591:
592: public boolean isRetired() {
593: return retired;
594: }
595: }
|