/* BdbMultipleWorkQueues
 *
 * Created on Dec 24, 2004
 *
 * Copyright (C) 2004 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.archive.crawler.frontier;

import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.FrontierMarker;
import org.archive.util.ArchiveUtils;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.util.RuntimeExceptionWrapper;

/**
 * A BerkeleyDB-database-backed structure for holding ordered
 * groupings of CrawlURIs. Reading the groupings from specific
 * per-grouping (per-classKey/per-Host) starting points allows
 * this to act as a collection of independent queues.
 *
 * <p>For how the bdb keys are made, see {@link #calculateInsertKey(CrawlURI)}.
 *
 * <p>TODO: refactor, improve naming.
 *
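 * <p>Rough usage sketch (illustrative only; the real orchestration lives in
 * this package's frontier and work-queue classes, and the variables
 * <code>env</code>, <code>classCatalog</code> and <code>curi</code> are
 * assumed to exist):
 * <pre>{@code
 * BdbMultipleWorkQueues queues =
 *     new BdbMultipleWorkQueues(env, classCatalog, false);
 * byte[] origin = BdbMultipleWorkQueues.calculateOriginKey(curi.getClassKey());
 * queues.addCap(origin);                // zero-length entry marking queue head
 * queues.put(curi, false);              // enqueue under its classKey prefix
 * DatabaseEntry headKey = new DatabaseEntry(origin);
 * CrawlURI next = queues.get(headKey);  // first real item of that queue
 * queues.delete(next);                  // remove once finished
 * queues.sync();                        // flush deferred writes to disk
 * queues.close();
 * }</pre>
 *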
 * @author gojomo
 */
public class BdbMultipleWorkQueues {
    private static final long serialVersionUID = ArchiveUtils
        .classnameBasedUID(BdbMultipleWorkQueues.class, 1);

    private static final Logger LOGGER = Logger
        .getLogger(BdbMultipleWorkQueues.class.getName());

    /** Database holding all pending URIs, grouped in virtual queues */
    private Database pendingUrisDB = null;

    /** Supporting bdb serialization of CrawlURIs */
    private RecyclingSerialBinding crawlUriBinding;

    /**
     * Create the multi queue in the given environment.
     *
     * @param env bdb environment to use
     * @param classCatalog Class catalog to use.
     * @param recycle True if we are to reuse any existing db content.
     * @throws DatabaseException
     */
    public BdbMultipleWorkQueues(Environment env,
            StoredClassCatalog classCatalog, final boolean recycle)
            throws DatabaseException {
        // Open the database. Create it if it does not already exist.
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        if (!recycle) {
            try {
                env.truncateDatabase(null, "pending", false);
            } catch (DatabaseNotFoundException e) {
                // Ignored
            }
        }
        // Make database deferred write: URLs that are added then removed
        // before a page-out is required need never cause disk IO.
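        // With deferred write, content is not guaranteed to be on disk until
        // sync() is called on this database (see sync() below).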
        dbConfig.setDeferredWrite(true);

        this.pendingUrisDB = env
            .openDatabase(null, "pending", dbConfig);
        crawlUriBinding = new RecyclingSerialBinding(classCatalog,
            CrawlURI.class);
    }

    /**
     * Delete all CrawlURIs matching the given expression.
     *
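     * <p>Illustrative call (assumed: {@code queues} is this instance and
     * {@code classKey} names the target queue, whose head key is its
     * origin key):
     * <pre>{@code
     * long gone = queues.deleteMatchingFromQueue(".*\\.jpg", classKey,
     *     new DatabaseEntry(BdbMultipleWorkQueues.calculateOriginKey(classKey)));
     * }</pre>
     *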
     * @param match Regular expression that URIs to delete must match.
     * @param queue Name (classKey) of the queue being scanned.
     * @param headKey Key of the queue's head, at which the scan begins.
     * @return count of deleted items
     * @throws DatabaseException
     */
    public long deleteMatchingFromQueue(String match, String queue,
            DatabaseEntry headKey) throws DatabaseException {
        long deletedCount = 0;
        Pattern pattern = Pattern.compile(match);
        DatabaseEntry key = headKey;
        DatabaseEntry value = new DatabaseEntry();
        Cursor cursor = null;
        try {
            cursor = pendingUrisDB.openCursor(null, null);
            OperationStatus result = cursor.getSearchKeyRange(headKey,
                value, null);

            while (result == OperationStatus.SUCCESS) {
                if (value.getData().length > 0) {
                    CrawlURI curi = (CrawlURI) crawlUriBinding
                        .entryToObject(value);
                    if (!curi.getClassKey().equals(queue)) {
                        // rolled into next queue; finished with this queue
                        break;
                    }
                    if (pattern.matcher(curi.toString()).matches()) {
                        cursor.delete();
                        deletedCount++;
                    }
                }
                result = cursor.getNext(key, value, null);
            }
        } finally {
            if (cursor != null) {
                cursor.close();
            }
        }

        return deletedCount;
    }

    /**
     * @param m marker
     * @param maxMatches Maximum number of matches to return.
     * @return list of matches starting from marker position
     * @throws DatabaseException
     */
    public List getFrom(FrontierMarker m, int maxMatches)
            throws DatabaseException {
        int matches = 0;
        int tries = 0;
        ArrayList<CrawlURI> results = new ArrayList<CrawlURI>(
            maxMatches);
        BdbFrontierMarker marker = (BdbFrontierMarker) m;

        DatabaseEntry key = marker.getStartKey();
        DatabaseEntry value = new DatabaseEntry();

        if (key != null) {
            Cursor cursor = null;
            OperationStatus result = null;
            try {
                cursor = pendingUrisDB.openCursor(null, null);
                result = cursor.getSearchKey(key, value, null);

                while (matches < maxMatches
                        && result == OperationStatus.SUCCESS) {
                    if (value.getData().length > 0) {
                        CrawlURI curi = (CrawlURI) crawlUriBinding
                            .entryToObject(value);
                        if (marker.accepts(curi)) {
                            results.add(curi);
                            matches++;
                        }
                        tries++;
                    }
                    result = cursor.getNext(key, value, null);
                }
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            if (result != OperationStatus.SUCCESS) {
                // end of scan
                marker.setStartKey(null);
            }
        }
        return results;
    }

    /**
     * Get a marker for beginning a scan over all contents.
     *
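     * <p>A minimal sketch of the intended use (assumed; the regular
     * expression and batch size are illustrative, and {@code queues} is
     * this instance):
     * <pre>{@code
     * FrontierMarker marker = queues.getInitialMarker(".*example.*");
     * List matches = queues.getFrom(marker, 100); // up to 100 accepted URIs
     * boolean exhausted = !marker.hasNext();      // false while scan may continue
     * }</pre>
     *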
     * @param regexpr Regular expression items must match to be returned.
     * @return a marker pointing to the first item
     */
    public FrontierMarker getInitialMarker(String regexpr) {
        try {
            return new BdbFrontierMarker(getFirstKey(), regexpr);
        } catch (DatabaseException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * @return the key to the first item in the database
     * @throws DatabaseException
     */
    protected DatabaseEntry getFirstKey() throws DatabaseException {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry value = new DatabaseEntry();
        Cursor cursor = pendingUrisDB.openCursor(null, null);
        OperationStatus status = cursor.getNext(key, value, null);
        cursor.close();
        if (status == OperationStatus.SUCCESS) {
            return key;
        }
        return null;
    }

    /**
     * Get the next nearest item after the given key. Relies on
     * external discipline (the caller consults the queue's count of how
     * many items it holds) to avoid asking for something from a
     * range where there are no associated items;
     * otherwise it could return the first item of the next 'queue' by mistake.
     *
     * <p>TODO: hold within a queue's range
     *
     * @param headKey Key prefix that demarks the beginning of the range
     * in <code>pendingUrisDB</code> we're interested in.
     * @return CrawlURI at the head of the given range, or null on error.
     * @throws DatabaseException
     */
    public CrawlURI get(DatabaseEntry headKey) throws DatabaseException {
        DatabaseEntry result = new DatabaseEntry();

        // From Linda Lee of sleepycat:
        // "You want to check the status returned from Cursor.getSearchKeyRange
        // to make sure that you have OperationStatus.SUCCESS. In that case,
        // you have found a valid data record, and result.getData()
        // (called by internally by the binding code, in this case) will be
        // non-null. The other possible status return is
        // OperationStatus.NOTFOUND, in which case no data record matched
        // the criteria. "
        OperationStatus status = getNextNearestItem(headKey, result);
        CrawlURI retVal = null;
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("See '1219854 NPE je-2.0 "
                + "entryToObject...'. OperationStatus "
                + " was not SUCCESS: " + status + ", headKey "
                + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
            return null;
        }
        try {
            retVal = (CrawlURI) crawlUriBinding.entryToObject(result);
        } catch (RuntimeExceptionWrapper rw) {
            LOGGER.log(Level.SEVERE,
                "expected object missing in queue "
                    + BdbWorkQueue.getPrefixClassKey(headKey.getData()),
                rw);
            return null;
        }
        retVal.setHolderKey(headKey);
        return retVal;
    }

    protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
            DatabaseEntry result) throws DatabaseException {
        Cursor cursor = null;
        OperationStatus status;
        try {
            cursor = this.pendingUrisDB.openCursor(null, null);
            // get cap; headKey at this point should always point to
            // a queue-beginning cap entry (zero-length value)
            status = cursor.getSearchKey(headKey, result, null);
            if (status != OperationStatus.SUCCESS
                    || result.getData().length > 0) {
                // cap missing
                throw new DatabaseException("bdb queue cap missing");
            }
            // get next item (real first item of queue)
            status = cursor.getNext(headKey, result, null);
        } finally {
            if (cursor != null) {
                cursor.close();
            }
        }
        return status;
    }

    /**
     * Put the given CrawlURI in at the appropriate place.
     *
     * @param curi CrawlURI to insert.
     * @param overwriteIfPresent True to overwrite any existing entry
     * stored under the same key.
     * @throws DatabaseException
     */
    public void put(CrawlURI curi, boolean overwriteIfPresent)
            throws DatabaseException {
        DatabaseEntry insertKey = (DatabaseEntry) curi.getHolderKey();
        if (insertKey == null) {
            insertKey = calculateInsertKey(curi);
            curi.setHolderKey(insertKey);
        }
        DatabaseEntry value = new DatabaseEntry();
        crawlUriBinding.objectToEntry(curi, value);
        // Output tally on avg. size if level is FINE or greater.
        if (LOGGER.isLoggable(Level.FINE)) {
            tallyAverageEntrySize(curi, value);
        }
        OperationStatus status;
        if (overwriteIfPresent) {
            status = pendingUrisDB.put(null, insertKey, value);
        } else {
            status = pendingUrisDB.putNoOverwrite(null, insertKey,
                value);
        }
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("failed; " + status + " " + curi);
        }
    }

    private long entryCount = 0;
    private long entrySizeSum = 0;
    private int largestEntry = 0;

    /**
     * Log average size of database entry.
     * @param curi CrawlURI this entry is for.
     * @param value Database entry value.
     */
    private synchronized void tallyAverageEntrySize(CrawlURI curi,
            DatabaseEntry value) {
        entryCount++;
        int length = value.getData().length;
        entrySizeSum += length;
        int avg = (int) (entrySizeSum / entryCount);
        if (entryCount % 1000 == 0) {
            LOGGER.fine("Average entry size at " + entryCount + ": "
                + avg);
        }
        if (length > largestEntry) {
            largestEntry = length;
            LOGGER.fine("Largest entry: " + length + " " + curi);
            if (length > (2 * avg)) {
                LOGGER.fine("excessive?");
            }
        }
    }

    /**
     * Calculate the 'origin' key for a virtual queue of items
     * with the given classKey. This origin key will be a
     * prefix of the keys for all items in the queue.
     *
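     * <p>Layout, as built below: the UTF-8 bytes of the classKey followed by
     * a single zero byte, i.e. <code>[classKey bytes][0x00]</code>. This is
     * presumably the key at which {@link #addCap(byte[])} places the queue's
     * zero-length cap entry.
     *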
     * @param classKey String key to derive origin byte key from
     * @return a byte array key
     */
    static byte[] calculateOriginKey(String classKey) {
        byte[] classKeyBytes = null;
        int len = 0;
        try {
            classKeyBytes = classKey.getBytes("UTF-8");
            len = classKeyBytes.length;
        } catch (UnsupportedEncodingException e) {
            // should be impossible; all JVMs must support UTF-8
            e.printStackTrace();
        }
        byte[] keyData = new byte[len + 1];
        System.arraycopy(classKeyBytes, 0, keyData, 0, len);
        keyData[len] = 0;
        return keyData;
    }

    /**
     * Calculate the insertKey that places a CrawlURI in the
     * desired spot. First bytes are always classKey (usu. host)
     * based -- ensuring grouping by host -- terminated by a zero
     * byte. Then 8 bytes of data ensuring desired ordering
     * within that 'queue' are used. The first byte of these 8 is
     * priority -- allowing 'immediate' and 'soon' items to
     * sort above regular. Next 1 byte is 'cost'. Last 6 bytes
     * are ordinal serial number, ensuring earlier-discovered
     * URIs sort before later.
     *
     * NOTE: Dangers here are:
     * (1) priorities or costs over 2^7 (signed byte comparison)
     * (2) ordinals over 2^48
     *
     * Package access & static for testing purposes.
     *
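     * <p>Resulting key layout (a sketch of what the code below builds):
     * <pre>
     *   [classKey UTF-8 bytes][0x00][priority:1][cost:1][ordinal:6]
     * </pre>
     * The trailing eight bytes are packed into one long and written out so
     * the priority byte comes first, letting byte-lexicographic key order
     * follow the priority/cost/ordinal ordering described above.
     *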
     * @param curi
     * @return a DatabaseEntry key for the CrawlURI
     */
    static DatabaseEntry calculateInsertKey(CrawlURI curi) {
        byte[] classKeyBytes = null;
        int len = 0;
        try {
            classKeyBytes = curi.getClassKey().getBytes("UTF-8");
            len = classKeyBytes.length;
        } catch (UnsupportedEncodingException e) {
            // should be impossible; all JVMs must support UTF-8
            e.printStackTrace();
        }
        byte[] keyData = new byte[len + 9];
        System.arraycopy(classKeyBytes, 0, keyData, 0, len);
        keyData[len] = 0;
        long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL;
        ordinalPlus = ((long) curi.getSchedulingDirective() << 56)
            | ordinalPlus;
        ordinalPlus = ((((long) curi.getHolderCost()) & 0xFFL) << 48)
            | ordinalPlus;
        ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len + 1);
        return new DatabaseEntry(keyData);
    }

    /**
     * Delete the given CrawlURI from persistent store. Requires
     * the key under which it was stored be available.
     *
     * @param item
     * @throws DatabaseException
     */
    public void delete(CrawlURI item) throws DatabaseException {
        OperationStatus status;
        status = pendingUrisDB.delete(null, (DatabaseEntry) item
            .getHolderKey());
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("expected item not present: "
                + item
                + "("
                + (new BigInteger(((DatabaseEntry) item
                    .getHolderKey()).getData())).toString(16)
                + ")");
        }
    }

    /**
     * Method used by BdbFrontier during checkpointing.
     * <p>The backing bdbje database has been marked deferred write, so we
     * save on writes to disk. This means there is no guarantee disk will
     * have what is in memory unless sync is called on this database
     * (calling sync on the bdbje Environment is not sufficient).
     * <p>Package access only because only Frontiers of this package would
     * ever need access.
     * @see <a href="http://www.sleepycat.com/jedocs/GettingStartedGuide/DB.html">Deferred Write Databases</a>
     */
    void sync() {
        if (this.pendingUrisDB == null) {
            return;
        }
        try {
            this.pendingUrisDB.sync();
        } catch (DatabaseException e) {
            e.printStackTrace();
        }
    }

    /**
     * Clean up: close the underlying database.
     */
    public void close() {
        try {
            this.pendingUrisDB.close();
        } catch (DatabaseException e) {
            e.printStackTrace();
        }
    }

    /**
     * Marker for remembering a position within the BdbMultipleWorkQueues.
     *
     * @author gojomo
     */
    public class BdbFrontierMarker implements FrontierMarker {
        DatabaseEntry startKey;
        Pattern pattern;
        int nextItemNumber;

        /**
         * Create a marker pointed at the given start location.
         *
         * @param startKey Key at which iteration should begin.
         * @param regexpr Regular expression items must match to be accepted.
         */
        public BdbFrontierMarker(DatabaseEntry startKey, String regexpr) {
            this.startKey = startKey;
            pattern = Pattern.compile(regexpr);
            nextItemNumber = 1;
        }

        /**
         * @param curi
         * @return whether the marker accepts the given CrawlURI
         */
        public boolean accepts(CrawlURI curi) {
            boolean retVal = pattern.matcher(curi.toString()).matches();
            if (retVal) {
                nextItemNumber++;
            }
            return retVal;
        }

        /**
         * @param key position for marker
         */
        public void setStartKey(DatabaseEntry key) {
            startKey = key;
        }

        /**
         * @return startKey
         */
        public DatabaseEntry getStartKey() {
            return startKey;
        }

        /* (non-Javadoc)
         * @see org.archive.crawler.framework.FrontierMarker#getMatchExpression()
         */
        public String getMatchExpression() {
            return pattern.pattern();
        }

        /* (non-Javadoc)
         * @see org.archive.crawler.framework.FrontierMarker#getNextItemNumber()
         */
        public long getNextItemNumber() {
            return nextItemNumber;
        }

        /* (non-Javadoc)
         * @see org.archive.crawler.framework.FrontierMarker#hasNext()
         */
        public boolean hasNext() {
            // as long as a startKey remains set, assume more items may follow
            return startKey != null;
        }
    }

    /**
     * Add a dummy 'cap' entry at the given insertion key. Prevents
     * 'seeks' to queue heads from holding a lock on the last item of
     * the 'preceding' queue. See:
     * http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
     *
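     * <p>{@link #getNextNearestItem(DatabaseEntry, DatabaseEntry)} expects to
     * find this zero-length cap entry at the supplied headKey and steps past
     * it to reach the queue's first real item.
     *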
     * @param origin key at which to insert the cap
     */
    public void addCap(byte[] origin) {
        try {
            pendingUrisDB.put(null, new DatabaseEntry(origin),
                new DatabaseEntry(new byte[0]));
        } catch (DatabaseException e) {
            throw new RuntimeException(e);
        }
    }
}