Source Code Cross Referenced for WorkQueue.java in  » Web-Crawler » heritrix » org » archive » crawler » frontier » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Crawler » heritrix » org.archive.crawler.frontier 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.archive.crawler.frontier;
002:
003:        import java.io.IOException;
004:        import java.io.PrintWriter;
005:        import java.io.Serializable;
006:        import java.util.logging.Level;
007:        import java.util.logging.Logger;
008:
009:        import org.archive.crawler.datamodel.CrawlSubstats;
010:        import org.archive.crawler.datamodel.CrawlURI;
011:        import org.archive.crawler.framework.Frontier;
012:        import org.archive.util.ArchiveUtils;
013:        import org.archive.util.Reporter;
014:
015:        /**
016:         * A single queue of related URIs to visit, grouped by a classKey
017:         * (typically "hostname:port" or similar) 
018:         * 
019:         * @author gojomo
020:         * @author Christian Kohlschuetter 
021:         */
022:        public abstract class WorkQueue implements  Frontier.FrontierGroup,
023:                Comparable, Serializable, Reporter {
024:            private static final Logger logger = Logger
025:                    .getLogger(WorkQueue.class.getName());
026:
027:            /** The classKey */
028:            protected final String classKey;
029:
030:            private boolean active = true;
031:
032:            /** Total number of stored items */
033:            private long count = 0;
034:
035:            /** Total number of items ever enqueued */
036:            private long enqueueCount = 0;
037:
038:            /** Whether queue is already in lifecycle stage */
039:            private boolean isHeld = false;
040:
041:            /** Time to wake, if snoozed */
042:            private long wakeTime = 0;
043:
044:            /** Running 'budget' indicating whether queue should stay active */
045:            private int sessionBalance = 0;
046:
047:            /** Cost of the last item to be charged against queue */
048:            private int lastCost = 0;
049:
050:            /** Total number of items charged against queue; with totalExpenditure
051:             * can be used to calculate 'average cost'. */
052:            private long costCount = 0;
053:
054:            /** Running tally of total expenditures on this queue */
055:            private long totalExpenditure = 0;
056:
057:            /** Total to spend on this queue over its lifetime */
058:            private long totalBudget = 0;
059:
060:            /** The next item to be returned */
061:            private CrawlURI peekItem = null;
062:
063:            /** Last URI enqueued */
064:            private String lastQueued;
065:
066:            /** Last URI peeked */
067:            private String lastPeeked;
068:
069:            /** time of last dequeue (disposition of some URI) **/
070:            private long lastDequeueTime;
071:
072:            /** count of errors encountered */
073:            private long errorCount = 0;
074:
075:            /** Substats for all CrawlURIs in this group */
076:            protected CrawlSubstats substats = new CrawlSubstats();
077:
078:            private boolean retired;
079:
080:            public WorkQueue(final String pClassKey) {
081:                this .classKey = pClassKey;
082:            }
083:
084:            /**
085:             * Delete URIs matching the given pattern from this queue. 
086:             * @param frontier
087:             * @param match
088:             * @return count of deleted URIs
089:             */
090:            public long deleteMatching(final WorkQueueFrontier frontier,
091:                    String match) {
092:                try {
093:                    final long deleteCount = deleteMatchingFromQueue(frontier,
094:                            match);
095:                    this .count -= deleteCount;
096:                    return deleteCount;
097:                } catch (IOException e) {
098:                    //FIXME better exception handling
099:                    e.printStackTrace();
100:                    throw new RuntimeException(e);
101:                }
102:            }
103:
104:            /**
105:             * Add the given CrawlURI, noting its addition in running count. (It
106:             * should not already be present.)
107:             * 
108:             * @param frontier Work queues manager.
109:             * @param curi CrawlURI to insert.
110:             */
111:            public synchronized void enqueue(final WorkQueueFrontier frontier,
112:                    CrawlURI curi) {
113:                try {
114:                    insert(frontier, curi, false);
115:                } catch (IOException e) {
116:                    //FIXME better exception handling
117:                    e.printStackTrace();
118:                    throw new RuntimeException(e);
119:                }
120:                count++;
121:                enqueueCount++;
122:            }
123:
124:            /**
125:             * Return the topmost queue item -- and remember it,
126:             * such that even later higher-priority inserts don't
127:             * change it. 
128:             * 
129:             * TODO: evaluate if this is really necessary
130:             * @param frontier Work queues manager
131:             * 
132:             * @return topmost queue item, or null
133:             */
134:            public CrawlURI peek(final WorkQueueFrontier frontier) {
135:                if (peekItem == null && count > 0) {
136:                    try {
137:                        peekItem = peekItem(frontier);
138:                    } catch (IOException e) {
139:                        //FIXME better exception handling
140:                        logger.log(Level.SEVERE, "peek failure", e);
141:                        e.printStackTrace();
142:                        // throw new RuntimeException(e);
143:                    }
144:                    if (peekItem != null) {
145:                        lastPeeked = peekItem.toString();
146:                    }
147:                }
148:                return peekItem;
149:            }
150:
151:            /**
152:             * Remove the peekItem from the queue and adjusts the count.
153:             * 
154:             * @param frontier  Work queues manager.
155:             */
156:            public synchronized void dequeue(final WorkQueueFrontier frontier) {
157:                try {
158:                    deleteItem(frontier, peekItem);
159:                } catch (IOException e) {
160:                    //FIXME better exception handling
161:                    e.printStackTrace();
162:                    throw new RuntimeException(e);
163:                }
164:                unpeek();
165:                count--;
166:                lastDequeueTime = System.currentTimeMillis();
167:            }
168:
169:            /**
170:             * Set the session 'activity budget balance' to the given value
171:             * 
172:             * @param balance to use
173:             */
174:            public void setSessionBalance(int balance) {
175:                this .sessionBalance = balance;
176:            }
177:
178:            /**
179:             * Return current session 'activity budget balance' 
180:             * 
181:             * @return session balance
182:             */
183:            public int getSessionBalance() {
184:                return this .sessionBalance;
185:            }
186:
187:            /**
188:             * Set the total expenditure level allowable before queue is 
189:             * considered inherently 'over-budget'. 
190:             * 
191:             * @param budget
192:             */
193:            public void setTotalBudget(long budget) {
194:                this .totalBudget = budget;
195:            }
196:
197:            /**
198:             * Check whether queue has temporarily or permanently exceeded
199:             * its budget. 
200:             * 
201:             * @return true if queue is over its set budget(s)
202:             */
203:            public boolean isOverBudget() {
204:                // check whether running balance is depleted 
205:                // or totalExpenditure exceeds totalBudget
206:                return this .sessionBalance <= 0
207:                        || (this .totalBudget >= 0 && this .totalExpenditure > this .totalBudget);
208:            }
209:
210:            /**
211:             * Return the tally of all expenditures on this queue
212:             * 
213:             * @return total amount expended on this queue
214:             */
215:            public long getTotalExpenditure() {
216:                return totalExpenditure;
217:            }
218:
219:            /**
220:             * Increase the internal running budget to be used before 
221:             * deactivating the queue
222:             * 
223:             * @param amount amount to increment
224:             * @return updated budget value
225:             */
226:            public int incrementSessionBalance(int amount) {
227:                this .sessionBalance = this .sessionBalance + amount;
228:                return this .sessionBalance;
229:            }
230:
231:            /**
232:             * Decrease the internal running budget by the given amount. 
233:             * @param amount tp decrement
234:             * @return updated budget value
235:             */
236:            public int expend(int amount) {
237:                this .sessionBalance = this .sessionBalance - amount;
238:                this .totalExpenditure = this .totalExpenditure + amount;
239:                this .lastCost = amount;
240:                this .costCount++;
241:                return this .sessionBalance;
242:            }
243:
244:            /**
245:             * A URI should not have been charged against queue (eg
246:             * it was disregarded); return the amount expended 
247:             * @param amount to return
248:             * @return updated budget value
249:             */
250:            public int refund(int amount) {
251:                this .sessionBalance = this .sessionBalance + amount;
252:                this .totalExpenditure = this .totalExpenditure - amount;
253:                this .costCount--;
254:                return this .sessionBalance;
255:            }
256:
257:            /**
258:             * Note an error and assess an extra penalty. 
259:             * @param penalty additional amount to deduct
260:             */
261:            public void noteError(int penalty) {
262:                this .sessionBalance = this .sessionBalance - penalty;
263:                this .totalExpenditure = this .totalExpenditure + penalty;
264:                errorCount++;
265:            }
266:
267:            /**
268:             * @param l
269:             */
270:            public void setWakeTime(long l) {
271:                wakeTime = l;
272:            }
273:
274:            /**
275:             * @return wakeTime
276:             */
277:            public long getWakeTime() {
278:                return wakeTime;
279:            }
280:
281:            /**
282:             * @return classKey, the 'identifier', for this queue.
283:             */
284:            public String getClassKey() {
285:                return this .classKey;
286:            }
287:
288:            /**
289:             * Clear isHeld to false
290:             */
291:            public void clearHeld() {
292:                isHeld = false;
293:            }
294:
295:            /**
296:             * Whether the queue is already in a lifecycle stage --
297:             * such as ready, in-progress, snoozed -- and thus should
298:             * not be redundantly inserted to readyClassQueues
299:             * 
300:             * @return isHeld
301:             */
302:            public boolean isHeld() {
303:                return isHeld;
304:            }
305:
306:            /**
307:             * Set isHeld to true
308:             */
309:            public void setHeld() {
310:                isHeld = true;
311:            }
312:
313:            /**
314:             * Forgive the peek, allowing a subsequent peek to 
315:             * return a different item. 
316:             * 
317:             */
318:            public void unpeek() {
319:                peekItem = null;
320:            }
321:
322:            public final int compareTo(Object obj) {
323:                if (this  == obj) {
324:                    return 0; // for exact identity only
325:                }
326:                WorkQueue other = (WorkQueue) obj;
327:                if (getWakeTime() > other.getWakeTime()) {
328:                    return 1;
329:                }
330:                if (getWakeTime() < other.getWakeTime()) {
331:                    return -1;
332:                }
333:                // at this point, the ordering is arbitrary, but still
334:                // must be consistent/stable over time
335:                return this .classKey.compareTo(other.getClassKey());
336:            }
337:
338:            /**
339:             * Update the given CrawlURI, which should already be present. (This
340:             * is not checked.) Equivalent to an enqueue without affecting the count.
341:             * 
342:             * @param frontier Work queues manager.
343:             * @param curi CrawlURI to update.
344:             */
345:            public void update(final WorkQueueFrontier frontier, CrawlURI curi) {
346:                try {
347:                    insert(frontier, curi, true);
348:                } catch (IOException e) {
349:                    //FIXME better exception handling
350:                    e.printStackTrace();
351:                    throw new RuntimeException(e);
352:                }
353:            }
354:
355:            /**
356:             * @return Returns the count.
357:             */
358:            public synchronized long getCount() {
359:                return this .count;
360:            }
361:
362:            /**
363:             * Insert the given curi, whether it is already present or not. 
364:             * @param frontier WorkQueueFrontier.
365:             * @param curi CrawlURI to insert.
366:             * @throws IOException
367:             */
368:            private void insert(final WorkQueueFrontier frontier,
369:                    CrawlURI curi, boolean overwriteIfPresent)
370:                    throws IOException {
371:                insertItem(frontier, curi, overwriteIfPresent);
372:                lastQueued = curi.toString();
373:            }
374:
375:            /**
376:             * Insert the given curi, whether it is already present or not.
377:             * Hook for subclasses. 
378:             * 
379:             * @param frontier WorkQueueFrontier.
380:             * @param curi CrawlURI to insert.
381:             * @throws IOException  if there was a problem while inserting the item
382:             */
383:            protected abstract void insertItem(
384:                    final WorkQueueFrontier frontier, CrawlURI curi,
385:                    boolean expectedPresent) throws IOException;
386:
387:            /**
388:             * Delete URIs matching the given pattern from this queue. 
389:             * @param frontier WorkQueues manager.
390:             * @param match  the pattern to match
391:             * @return count of deleted URIs
392:             * @throws IOException  if there was a problem while deleting
393:             */
394:            protected abstract long deleteMatchingFromQueue(
395:                    final WorkQueueFrontier frontier, final String match)
396:                    throws IOException;
397:
398:            /**
399:             * Removes the given item from the queue.
400:             * 
401:             * This is only used to remove the first item in the queue,
402:             * so it is not necessary to implement a random-access queue.
403:             * 
404:             * @param frontier  Work queues manager.
405:             * @throws IOException  if there was a problem while deleting the item
406:             */
407:            protected abstract void deleteItem(
408:                    final WorkQueueFrontier frontier, final CrawlURI item)
409:                    throws IOException;
410:
411:            /**
412:             * Returns first item from queue (does not delete)
413:             * 
414:             * @return The peeked item, or null
415:             * @throws IOException  if there was a problem while peeking
416:             */
417:            protected abstract CrawlURI peekItem(
418:                    final WorkQueueFrontier frontier) throws IOException;
419:
420:            /**
421:             * Suspends this WorkQueue. Closes all connections to resources etc.
422:             * 
423:             * @param frontier
424:             * @throws IOException
425:             */
426:            protected void suspend(final WorkQueueFrontier frontier)
427:                    throws IOException {
428:            }
429:
430:            /**
431:             * Resumes this WorkQueue. Eventually opens connections to resources etc.
432:             * 
433:             * @param frontier
434:             * @throws IOException
435:             */
436:            protected void resume(final WorkQueueFrontier frontier)
437:                    throws IOException {
438:            }
439:
440:            public void setActive(final WorkQueueFrontier frontier,
441:                    final boolean b) {
442:                if (active != b) {
443:                    active = b;
444:                    try {
445:                        if (active) {
446:                            resume(frontier);
447:                        } else {
448:                            suspend(frontier);
449:                        }
450:                    } catch (IOException e) {
451:                        //FIXME better exception handling
452:                        e.printStackTrace();
453:                        throw new RuntimeException(e);
454:                    }
455:                }
456:            }
457:
458:            // 
459:            // Reporter
460:            //
461:
462:            /* (non-Javadoc)
463:             * @see org.archive.util.Reporter#getReports()
464:             */
465:            public String[] getReports() {
466:                return new String[] {};
467:            }
468:
469:            /* (non-Javadoc)
470:             * @see org.archive.util.Reporter#reportTo(java.io.Writer)
471:             */
472:            public void reportTo(PrintWriter writer) {
473:                reportTo(null, writer);
474:            }
475:
476:            /* (non-Javadoc)
477:             * @see org.archive.util.Reporter#singleLineReportTo(java.io.Writer)
478:             */
479:            public void singleLineReportTo(PrintWriter writer) {
480:                // queue name
481:                writer.print(classKey);
482:                writer.print(" ");
483:                // count of items
484:                writer.print(Long.toString(count));
485:                writer.print(" ");
486:                // enqueue count
487:                writer.print(Long.toString(enqueueCount));
488:                writer.print(" ");
489:                writer.print(sessionBalance);
490:                writer.print(" ");
491:                writer.print(lastCost);
492:                writer.print("(");
493:                writer.print(ArchiveUtils.doubleToString(
494:                        ((double) totalExpenditure / costCount), 1));
495:                writer.print(")");
496:                writer.print(" ");
497:                // last dequeue time, if any, or '-'
498:                if (lastDequeueTime != 0) {
499:                    writer.print(ArchiveUtils.getLog17Date(lastDequeueTime));
500:                } else {
501:                    writer.print("-");
502:                }
503:                writer.print(" ");
504:                // wake time if snoozed, or '-'
505:                if (wakeTime != 0) {
506:                    writer.print(ArchiveUtils
507:                            .formatMillisecondsToConventional(wakeTime
508:                                    - System.currentTimeMillis()));
509:                } else {
510:                    writer.print("-");
511:                }
512:                writer.print(" ");
513:                writer.print(Long.toString(totalExpenditure));
514:                writer.print("/");
515:                writer.print(Long.toString(totalBudget));
516:                writer.print(" ");
517:                writer.print(Long.toString(errorCount));
518:                writer.print(" ");
519:                writer.print(lastPeeked);
520:                writer.print(" ");
521:                writer.print(lastQueued);
522:                writer.print("\n");
523:            }
524:
525:            /* (non-Javadoc)
526:             * @see org.archive.util.Reporter#singleLineLegend()
527:             */
528:            public String singleLineLegend() {
529:                return "queue currentSize totalEnqueues sessionBalance lastCost "
530:                        + "(averageCost) lastDequeueTime wakeTime "
531:                        + "totalSpend/totalBudget errorCount lastPeekUri lastQueuedUri";
532:            }
533:
534:            /* (non-Javadoc)
535:             * @see org.archive.util.Reporter#singleLineReport()
536:             */
537:            public String singleLineReport() {
538:                return ArchiveUtils.singleLineReport(this );
539:            }
540:
541:            /**
542:             * @param writer
543:             * @throws IOException
544:             */
545:            public void reportTo(String name, PrintWriter writer) {
546:                // name is ignored: only one kind of report for now
547:                writer.print("Queue ");
548:                writer.print(classKey);
549:                writer.print("\n");
550:                writer.print("  ");
551:                writer.print(Long.toString(count));
552:                writer.print(" items");
553:                if (wakeTime != 0) {
554:                    writer.print("\n   wakes in: "
555:                            + ArchiveUtils
556:                                    .formatMillisecondsToConventional(wakeTime
557:                                            - System.currentTimeMillis()));
558:                }
559:                writer.print("\n    last enqueued: ");
560:                writer.print(lastQueued);
561:                writer.print("\n      last peeked: ");
562:                writer.print(lastPeeked);
563:                writer.print("\n");
564:                writer.print("   total expended: ");
565:                writer.print(Long.toString(totalExpenditure));
566:                writer.print(" (total budget: ");
567:                writer.print(Long.toString(totalBudget));
568:                writer.print(")\n");
569:                writer.print("   active balance: ");
570:                writer.print(sessionBalance);
571:                writer.print("\n   last(avg) cost: ");
572:                writer.print(lastCost);
573:                writer.print("(");
574:                writer.print(ArchiveUtils.doubleToString(
575:                        ((double) totalExpenditure / costCount), 1));
576:                writer.print(")\n\n");
577:            }
578:
579:            public CrawlSubstats getSubstats() {
580:                return substats;
581:            }
582:
583:            /**
584:             * Set the retired status of this queue.
585:             * 
586:             * @param b new value for retired status
587:             */
588:            public void setRetired(boolean b) {
589:                this .retired = b;
590:            }
591:
592:            public boolean isRetired() {
593:                return retired;
594:            }
595:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.