001: /* ARHostQueueList.java
002: *
003: * Created on Sep 13, 2004
004: *
005: * Copyright (C) 2004 Kristinn Sigurðsson.
006: *
007: * This file is part of the Heritrix web crawler (crawler.archive.org).
008: *
009: * Heritrix is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU Lesser Public License as published by
011: * the Free Software Foundation; either version 2.1 of the License, or
012: * any later version.
013: *
014: * Heritrix is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: * GNU Lesser Public License for more details.
018: *
019: * You should have received a copy of the GNU Lesser Public License
020: * along with Heritrix; if not, write to the Free Software
021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: */
023: package org.archive.crawler.frontier;
024:
025: import java.io.IOException;
026: import java.io.PrintWriter;
027: import java.util.HashMap;
028: import java.util.Iterator;
029: import java.util.TreeSet;
030: import java.util.logging.Level;
031: import java.util.logging.Logger;
032:
033: import org.archive.util.ArchiveUtils;
034: import org.archive.util.Reporter;
035:
036: import com.sleepycat.bind.EntryBinding;
037: import com.sleepycat.bind.serial.StoredClassCatalog;
038: import com.sleepycat.bind.tuple.IntegerBinding;
039: import com.sleepycat.bind.tuple.StringBinding;
040: import com.sleepycat.je.Cursor;
041: import com.sleepycat.je.Database;
042: import com.sleepycat.je.DatabaseConfig;
043: import com.sleepycat.je.DatabaseEntry;
044: import com.sleepycat.je.DatabaseException;
045: import com.sleepycat.je.Environment;
046: import com.sleepycat.je.LockMode;
047: import com.sleepycat.je.OperationStatus;
048:
049: /**
050: * Maintains an ordered list of {@link AdaptiveRevisitHostQueue}s used by a
051: * Frontier.
052: * <p>
053: * The list is ordered by the
054: * {@link AdaptiveRevisitHostQueue#getNextReadyTime() AdaptiveRevisitHostQueue.getNextReadyTime()},
055: * smallest value at the top of the list and then on in ascending order.
056: * <p>
057: * The list will maintain a list of hostnames in a separate DB. On creation a
058: * list will try to open the DB at a specified location. If it already exists
059: * the list will create HQs for all the hostnames in the list, discarding
060: * those that turn out to be empty.
061: * <p>
062: * Any BDB DatabaseException will be converted to an IOException by public
063: * methods. This includes preserving the original stacktrace, in favor of the
064: * one created for the IOException, so that the true source of the exception
065: * is not lost.
066: *
067: * @author Kristinn Sigurdsson
068: */
069: public class AdaptiveRevisitQueueList implements Reporter {
070:
071: // TODO: Handle HQs becoming empty.
072:
073: /** The Environment for the BerkleyDB databases in the HQs */
074: private final Environment env;
075: private final StoredClassCatalog catalog;
076: /** Contains host names for all HQs. Name is key, valence is value */
077: private Database hostNamesDB;
078: private EntryBinding keyBinding;
079: private EntryBinding valueBinding;
080:
081: /** A hash table of all the HQs (wrapped), keyed by hostName */
082: private HashMap<String, AdaptiveRevisitHostQueueWrapper> hostQueues;
083: /** Contains the hostQueues (wrapped) sorted by their time of next fetch */
084: private TreeSet<AdaptiveRevisitHostQueueWrapper> sortedHostQueues;
085: /** Logger */
086: private static final Logger logger = Logger
087: .getLogger(AdaptiveRevisitQueueList.class.getName());
088:
089: public AdaptiveRevisitQueueList(Environment env,
090: StoredClassCatalog catalog) throws IOException {
091: Cursor cursor = null;
092: try {
093: this .env = env;
094: this .catalog = catalog;
095: keyBinding = new StringBinding();
096: valueBinding = new IntegerBinding();
097:
098: // Then initialize other data
099: hostQueues = new HashMap<String, AdaptiveRevisitHostQueueWrapper>();
100: sortedHostQueues = new TreeSet<AdaptiveRevisitHostQueueWrapper>();
101:
102: // Open the hostNamesDB
103: DatabaseConfig dbConfig = new DatabaseConfig();
104: dbConfig.setTransactional(false);
105: dbConfig.setAllowCreate(true);
106: hostNamesDB = env.openDatabase(null, "hostNames", dbConfig);
107:
108: // Read any existing hostNames and create relevant HQs
109: cursor = hostNamesDB.openCursor(null, null);
110: DatabaseEntry keyEntry = new DatabaseEntry();
111: DatabaseEntry dataEntry = new DatabaseEntry();
112: OperationStatus opStatus = cursor.getFirst(keyEntry,
113: dataEntry, LockMode.DEFAULT);
114: while (opStatus == OperationStatus.SUCCESS) {
115: // Got one!
116: String hostName = (String) keyBinding
117: .entryToObject(keyEntry);
118: int valence = ((Integer) valueBinding
119: .entryToObject(dataEntry)).intValue();
120: opStatus = cursor.getNext(keyEntry, dataEntry,
121: LockMode.DEFAULT);
122: // Create HQ
123: createHQ(hostName, valence);
124: // TODO: If the hq is empty, then it can be discarded
125: // immediately
126: }
127: } catch (DatabaseException e) {
128: throw convertDbException(e);
129: } finally {
130: if (cursor != null) {
131: try {
132: cursor.close();
133: } catch (DatabaseException e) {
134: throw convertDbException(e);
135: }
136: }
137: }
138: }
139:
140: private IOException convertDbException(Exception e) {
141: IOException e2 = new IOException(e.getMessage());
142: e2.setStackTrace(e.getStackTrace());
143: return e2;
144: }
145:
146: /**
147: * Get an AdaptiveRevisitHostQueue for the specified host.
148: * <p>
149: * If one does not already exist, null is returned
150: *
151: * @param hostName The host's name
152: * @return an AdaptiveRevisitHostQueue for the specified host
153: */
154: public AdaptiveRevisitHostQueue getHQ(String hostName) {
155: AdaptiveRevisitHostQueueWrapper wrapper = ((AdaptiveRevisitHostQueueWrapper) hostQueues
156: .get(hostName));
157: if (wrapper != null) {
158: return wrapper.hq;
159: }
160: return null;
161: }
162:
163: /**
164: * Creates a new AdaptiveRevisitHostQueue.
165: * <p>
166: * If a HQ already existed for the specified hostName, the existing HQ
167: * is returned as it is. It's existing valence will <i>not</i> be updated
168: * to reflect a different valence.
169: *
170: * @param hostName
171: * @param valence number of simultaneous connections allowed to this host
172: * @return the newly created HQ
173: * @throws IOException
174: */
175: public AdaptiveRevisitHostQueue createHQ(String hostName,
176: int valence) throws IOException {
177: AdaptiveRevisitHostQueueWrapper hqw = hostQueues.get(hostName);
178: if (hqw != null) {
179: return hqw.hq;
180: }
181: AdaptiveRevisitHostQueue hq;
182: // Ok, the HQ does not already exist. (Had to make sure)
183: // Create it, save it and return it.
184: hq = new AdaptiveRevisitHostQueue(hostName, env, catalog,
185: valence);
186: hq.setOwner(this );
187:
188: try {
189: DatabaseEntry keyEntry = new DatabaseEntry();
190: DatabaseEntry dataEntry = new DatabaseEntry();
191: keyBinding.objectToEntry(hostName, keyEntry);
192: valueBinding.objectToEntry(new Integer(valence), dataEntry);
193: hostNamesDB.put(null, keyEntry, dataEntry);
194: AdaptiveRevisitHostQueueWrapper tmp = new AdaptiveRevisitHostQueueWrapper(
195: hq);
196: hostQueues.put(hostName, tmp);
197: sortedHostQueues.add(tmp);
198: return hq;
199: } catch (DatabaseException e) {
200: throw convertDbException(e);
201: }
202: }
203:
204: public AdaptiveRevisitHostQueue getTopHQ() {
205: AdaptiveRevisitHostQueueWrapper wrapper = (AdaptiveRevisitHostQueueWrapper) sortedHostQueues
206: .first();
207: return wrapper.hq;
208: }
209:
210: /**
211: * Returns the number of URIs in all the HQs in this list
212: * @return the number of URIs in all the HQs in this list
213: */
214: public long getSize() {
215: long size = 0;
216: for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
217: AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper) it
218: .next()).hq;
219: size += hq.getSize();
220: }
221: return size;
222: }
223:
224: /**
225: * Returns the average depth of all the HQs in this list
226: * @return the average depth of all the HQs in this list (rounded down)
227: */
228: public long getAverageDepth() {
229: long size = getSize();
230: return size / hostQueues.size();
231: }
232:
233: /**
234: * Returns the size of the largest (deepest) queue.
235: * @return the size of the largest (deepest) queue.
236: */
237: public long getDeepestQueueSize() {
238: long size = 0;
239: for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
240: AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper) it
241: .next()).hq;
242: if (hq.getSize() > size) {
243: size = hq.getSize();
244: }
245: }
246: return size;
247: }
248:
249: /**
250: * Returns the congestion ratio.
251: * <p>
252: * The congestion ratio is equal to the total number of queues divided
253: * by the number of queues currently being processed or are snozzed (i.e.
254: * not ready). A congestion ratio of 1 indicates no congestion.
255: * @return the congestion ratio
256: */
257: public float getCongestionRatio() {
258: int readyQueues = 0;
259: for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
260: AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper) it
261: .next()).hq;
262: if (hq.getState() == AdaptiveRevisitHostQueue.HQSTATE_READY) {
263: readyQueues++;
264: }
265: }
266: int totalQueues = hostQueues.size();
267:
268: return (float) (totalQueues) / (totalQueues - readyQueues);
269: }
270:
271: /**
272: * This method reorders the host queues. Method is only called by the
273: * AdaptiveRevisitHostQueue that it 'owns' when their reported time of next
274: * ready is being updated.
275: *
276: * @param hq The calling HQ
277: */
278: protected void reorder(AdaptiveRevisitHostQueue hq) {
279: // Find the wrapper
280: AdaptiveRevisitHostQueueWrapper wrapper = (AdaptiveRevisitHostQueueWrapper) hostQueues
281: .get(hq.getHostName());
282:
283: long newTime = hq.getNextReadyTime();
284:
285: if (newTime != wrapper.nextReadyTime) {
286: // Ok, the time has changed, move the queue around.
287: if (logger.isLoggable(Level.FINER)) {
288: logger.finer("reorder(" + hq.getHostName() + ") was "
289: + wrapper.nextReadyTime);
290: }
291: // Remove it from the sorted list
292: sortedHostQueues.remove(wrapper);
293: // Update the time on the ref.
294: wrapper.nextReadyTime = newTime;
295: if (logger.isLoggable(Level.FINER)) {
296: logger.finer("reorder(" + hq.getHostName() + ") is "
297: + wrapper.nextReadyTime);
298: }
299: // Readd to the list
300: sortedHostQueues.add(wrapper);
301: }
302: }
303:
304: /**
305: * The total number of URIs queued in all the HQs belonging to this list.
306: *
307: * @return total number of URIs queued in all the HQs belonging to this list.
308: */
309: public long getUriCount() {
310: Iterator it = hostQueues.keySet().iterator();
311: long count = 0;
312: while (it.hasNext()) {
313: AdaptiveRevisitHostQueueWrapper wrapper = (AdaptiveRevisitHostQueueWrapper) it
314: .next();
315: count += wrapper.hq.getSize();
316: }
317: return count;
318: }
319:
320: /**
321: * This class wraps an AdaptiveRevisitHostQueue with a fixed value for next
322: * ready time.
323: * This is done to facilitate sorting by making sure that the value does
324: * not change while the HQ is in the sorted list. With this wrapper, it
325: * is possible to remove it from the sorted list, then update the time of
326: * next ready, and then readd (resort) it.
327: *
328: * @author Kristinn Sigurdsson
329: */
330: private class AdaptiveRevisitHostQueueWrapper implements Comparable {
331: long nextReadyTime;
332: AdaptiveRevisitHostQueue hq;
333:
334: public AdaptiveRevisitHostQueueWrapper(
335: AdaptiveRevisitHostQueue hq) {
336: nextReadyTime = hq.getNextReadyTime();
337: this .hq = hq;
338: }
339:
340: /**
341: * Compares the this ARHQWrapper to the supplied one.
342: *
343: * @param obj the HQ to compare to. If this object is not an instance
344: * of ARHostQueueWrapper then the method will throw a
345: * ClassCastException.
346: * @return a value less than 0 if this HQWrappers time of next ready
347: * value is less than the argument HQWrappers's; and a value
348: * greater than 0 if this value is greater. If the time of
349: * next ready is equal, the hostName strings will be compared
350: * and that result returned.
351: */
352: public int compareTo(Object obj) {
353: AdaptiveRevisitHostQueueWrapper comp = (AdaptiveRevisitHostQueueWrapper) obj;
354:
355: long compTime = comp.nextReadyTime;
356:
357: if (nextReadyTime > compTime) {
358: return 1;
359: } else if (nextReadyTime < compTime) {
360: return -1;
361: } else {
362: // Equal time. Use hostnames
363: return hq.getHostName()
364: .compareTo(comp.hq.getHostName());
365: }
366: }
367: }
368:
369: /**
370: * Closes all HQs and the Environment.
371: */
372: public void close() {
373: Iterator it = sortedHostQueues.iterator();
374: while (it.hasNext()) {
375: AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper) it
376: .next()).hq;
377: try {
378: hq.close();
379: } catch (IOException e) {
380: logger.severe("IOException while closing "
381: + hq.getHostName() + "\n" + e.getMessage());
382: }
383: }
384: try {
385: hostNamesDB.close();
386: } catch (DatabaseException e) {
387: logger.severe("IOException while closing hostNamesDB"
388: + "\n" + e.getMessage());
389: }
390: }
391:
392: //
393: // Reporter implementation
394: //
395:
396: public String[] getReports() {
397: // none but default for now
398: return new String[] {};
399: }
400:
401: /* (non-Javadoc)
402: * @see org.archive.util.Reporter#singleLineReport()
403: */
404: public String singleLineReport() {
405: return ArchiveUtils.singleLineReport(this );
406: }
407:
408: /* (non-Javadoc)
409: * @see org.archive.util.Reporter#reportTo(java.io.Writer)
410: */
411: public void reportTo(PrintWriter writer) {
412: Iterator it = sortedHostQueues.iterator();
413: while (it.hasNext()) {
414: AdaptiveRevisitHostQueueWrapper wrapper = (AdaptiveRevisitHostQueueWrapper) it
415: .next();
416:
417: writer.print(wrapper.hq.report(10));
418: writer.print("\n\n");
419: }
420: }
421:
422: public void reportTo(String name, PrintWriter writer) {
423: if (name == null || hostQueues.containsKey(name) == false) {
424: reportTo(writer);
425: } else {
426: AdaptiveRevisitHostQueueWrapper wrapper = (AdaptiveRevisitHostQueueWrapper) hostQueues
427: .get(name);
428:
429: writer.print(wrapper.hq.report(0));
430: writer.print("\n\n");
431: }
432: }
433:
434: public void singleLineReportTo(PrintWriter writer) {
435: Iterator it = sortedHostQueues.iterator();
436: int total = 0;
437: int ready = 0;
438: int snoozed = 0;
439: int empty = 0;
440: int busy = 0;
441: while (it.hasNext()) {
442: AdaptiveRevisitHostQueueWrapper wrapper = (AdaptiveRevisitHostQueueWrapper) it
443: .next();
444: total++;
445: switch (wrapper.hq.getState()) {
446: case AdaptiveRevisitHostQueue.HQSTATE_BUSY:
447: busy++;
448: break;
449: case AdaptiveRevisitHostQueue.HQSTATE_EMPTY:
450: empty++;
451: break;
452: case AdaptiveRevisitHostQueue.HQSTATE_READY:
453: ready++;
454: break;
455: case AdaptiveRevisitHostQueue.HQSTATE_SNOOZED:
456: snoozed++;
457: break;
458: }
459: }
460: writer.print(total + " queues: " + ready + " ready, " + snoozed
461: + " snoozed, " + busy + " busy, and " + empty
462: + " empty");
463: }
464:
465: /* (non-Javadoc)
466: * @see org.archive.util.Reporter#singleLineLegend()
467: */
468: public String singleLineLegend() {
469: return "total ready snoozed busy empty";
470: }
471:
472: }
|