001: /* BdbWorkQueue
002: *
003: * Created on Dec 24, 2004
004: *
005: * Copyright (C) 2004 Internet Archive.
006: *
007: * This file is part of the Heritrix web crawler (crawler.archive.org).
008: *
009: * Heritrix is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU Lesser Public License as published by
011: * the Free Software Foundation; either version 2.1 of the License, or
012: * any later version.
013: *
014: * Heritrix is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: * GNU Lesser Public License for more details.
018: *
019: * You should have received a copy of the GNU Lesser Public License
020: * along with Heritrix; if not, write to the Free Software
021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: */
023: package org.archive.crawler.frontier;
024:
025: import java.io.IOException;
026: import java.io.Serializable;
027: import java.io.UnsupportedEncodingException;
028: import java.util.logging.Level;
029: import java.util.logging.Logger;
030:
031: import org.archive.crawler.datamodel.CrawlURI;
032: import org.archive.util.ArchiveUtils;
033: import org.archive.util.IoUtils;
034:
035: import com.sleepycat.je.DatabaseEntry;
036: import com.sleepycat.je.DatabaseException;
037:
038: /**
039: * One independent queue of items with the same 'classKey' (eg host).
040: * @author gojomo
041: */
042: public class BdbWorkQueue extends WorkQueue implements Comparable,
043: Serializable {
044: private static Logger LOGGER = Logger.getLogger(BdbWorkQueue.class
045: .getName());
046:
047: // be robust against trivial implementation changes
048: private static final long serialVersionUID = ArchiveUtils
049: .classnameBasedUID(BdbWorkQueue.class, 1);
050:
051: /**
052: * All items in this queue have this same 'origin'
053: * prefix to their keys.
054: */
055: private byte[] origin;
056:
057: /**
058: * Create a virtual queue inside the given BdbMultipleWorkQueues
059: *
060: * @param classKey
061: */
062: public BdbWorkQueue(String classKey, BdbFrontier frontier) {
063: super (classKey);
064: this .origin = BdbMultipleWorkQueues
065: .calculateOriginKey(classKey);
066: if (LOGGER.isLoggable(Level.FINE)) {
067: LOGGER
068: .fine(getPrefixClassKey(this .origin) + " "
069: + classKey);
070: }
071: // add the queue-front 'cap' entry; see...
072: // http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
073: frontier.getWorkQueues().addCap(origin);
074: }
075:
076: protected long deleteMatchingFromQueue(
077: final WorkQueueFrontier frontier, final String match)
078: throws IOException {
079: try {
080: final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
081: .getWorkQueues();
082: return queues.deleteMatchingFromQueue(match, classKey,
083: new DatabaseEntry(origin));
084: } catch (DatabaseException e) {
085: throw IoUtils.wrapAsIOException(e);
086: }
087: }
088:
089: protected void deleteItem(final WorkQueueFrontier frontier,
090: final CrawlURI peekItem) throws IOException {
091: try {
092: final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
093: .getWorkQueues();
094: queues.delete(peekItem);
095: } catch (DatabaseException e) {
096: e.printStackTrace();
097: throw IoUtils.wrapAsIOException(e);
098: }
099: }
100:
101: protected CrawlURI peekItem(final WorkQueueFrontier frontier)
102: throws IOException {
103: final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
104: .getWorkQueues();
105: DatabaseEntry key = new DatabaseEntry(origin);
106: CrawlURI curi = null;
107: int tries = 1;
108: while (true) {
109: try {
110: curi = queues.get(key);
111: } catch (DatabaseException e) {
112: LOGGER.log(Level.SEVERE, "peekItem failure; retrying",
113: e);
114: }
115:
116: // ensure CrawlURI, if any, came from acceptable range:
117: if (!ArchiveUtils.startsWith(key.getData(), origin)) {
118: LOGGER.severe("inconsistency: " + classKey + "("
119: + getPrefixClassKey(origin) + ") with "
120: + getCount() + " items gave " + curi + "("
121: + getPrefixClassKey(key.getData()));
122: // clear curi to allow retry
123: curi = null;
124: // reset key to original origin for retry
125: key.setData(origin);
126: }
127:
128: if (curi != null) {
129: // success
130: break;
131: }
132:
133: if (tries > 3) {
134: LOGGER.severe("no item where expected in queue "
135: + classKey);
136: break;
137: }
138: tries++;
139: LOGGER.severe("Trying get #" + Integer.toString(tries)
140: + " in queue " + classKey + " with " + getCount()
141: + " items using key "
142: + getPrefixClassKey(key.getData()));
143: }
144:
145: return curi;
146: }
147:
148: protected void insertItem(final WorkQueueFrontier frontier,
149: final CrawlURI curi, boolean overwriteIfPresent)
150: throws IOException {
151: try {
152: final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
153: .getWorkQueues();
154: queues.put(curi, overwriteIfPresent);
155: if (LOGGER.isLoggable(Level.FINE)) {
156: LOGGER.fine("Inserted into "
157: + getPrefixClassKey(this .origin) + " (count "
158: + Long.toString(getCount()) + "): "
159: + curi.toString());
160: }
161: } catch (DatabaseException e) {
162: throw IoUtils.wrapAsIOException(e);
163: }
164: }
165:
166: /**
167: * @param byteArray Byte array to get hex string of.
168: * @return Hex string of passed in byte array (Used logging
169: * key-prefixes).
170: */
171: protected static String getPrefixClassKey(final byte[] byteArray) {
172: int zeroIndex = 0;
173: while (byteArray[zeroIndex] != 0) {
174: zeroIndex++;
175: }
176: try {
177: return new String(byteArray, 0, zeroIndex, "UTF-8");
178: } catch (UnsupportedEncodingException e) {
179: // should be impossible; UTF-8 always available
180: e.printStackTrace();
181: return e.getMessage();
182: }
183: }
184: }
|