001: // plasmaCrawlBalancer.java
002: // -----------------------
003: // part of YaCy
004: // (C) by Michael Peter Christen; mc@anomic.de
005: // first published on http://www.anomic.de
006: // Frankfurt, Germany, 2005
007: // created: 24.09.2005
008: //
009: // This program is free software; you can redistribute it and/or modify
010: // it under the terms of the GNU General Public License as published by
011: // the Free Software Foundation; either version 2 of the License, or
012: // (at your option) any later version.
013: //
014: // This program is distributed in the hope that it will be useful,
015: // but WITHOUT ANY WARRANTY; without even the implied warranty of
016: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: // GNU General Public License for more details.
018: //
019: // You should have received a copy of the GNU General Public License
020: // along with this program; if not, write to the Free Software
021: // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: //
023: // Using this software in any meaning (reading, learning, copying, compiling,
024: // running) means that you agree that the Author(s) is (are) not responsible
025: // for cost, loss of data or any harm that may be caused directly or indirectly
026: // by usage of this software or this documentation. The usage of this software
027: // is on your own risk. The installation and usage (starting/running) of this
028: // software may allow other people or application to access your computer and
029: // any attached devices and is highly dependent on the configuration of the
030: // software which must be done by the user of the software; the author(s) is
031: // (are) also not responsible for proper configuration and usage of the
032: // software, even if provoked by documentation provided together with
033: // the software.
034: //
035: // Any changes to this file according to the GPL as documented in the file
036: // gpl.txt aside this file in the shipment you received can be done to the
037: // lines that follow this copyright notice here, but changes must not be
038: // done inside the copyright notice above. A re-distribution must contain
039: // the intact and unchanged copyright notice.
040: // Contributions and changes to the program code must be marked as such.
041:
042: package de.anomic.plasma;
043:
044: import java.io.File;
045: import java.io.IOException;
046: import java.util.ArrayList;
047: import java.util.Collections;
048: import java.util.HashMap;
049: import java.util.Iterator;
050: import java.util.LinkedList;
051: import java.util.Map;
052: import java.util.TreeMap;
053:
054: import de.anomic.kelondro.kelondroAbstractRecords;
055: import de.anomic.kelondro.kelondroBase64Order;
056: import de.anomic.kelondro.kelondroEcoTable;
057: import de.anomic.kelondro.kelondroIndex;
058: import de.anomic.kelondro.kelondroRow;
059: import de.anomic.kelondro.kelondroStack;
060: import de.anomic.server.logging.serverLog;
061: import de.anomic.yacy.yacySeedDB;
062:
063: public class plasmaCrawlBalancer {
064:
065: private static final String stackSuffix = "9.stack";
066: private static final String indexSuffix = "9.db";
067: private static final int EcoFSBufferSize = 200;
068:
069: // a shared domainAccess map for all balancers
070: private static final Map<String, domaccess> domainAccess = Collections
071: .synchronizedMap(new HashMap<String, domaccess>());
072:
073: // definition of payload for fileStack
074: private static final kelondroRow stackrow = new kelondroRow(
075: "byte[] urlhash-" + yacySeedDB.commonHashLength,
076: kelondroBase64Order.enhancedCoder, 0);
077:
078: // class variables
079: private ArrayList<String> urlRAMStack; // a list that is flushed first
080: private kelondroStack urlFileStack; // a file with url hashes
081: private kelondroIndex urlFileIndex;
082: private HashMap<String, LinkedList<String>> domainStacks; // a map from domain name part to Lists with url hashs
083: private File cacheStacksPath;
084: private String stackname;
085: private boolean top; // to alternate between top and bottom of the file stack
086: private boolean fullram;
087:
088: public static class domaccess {
089: long time;
090: int count;
091:
092: public domaccess() {
093: this .time = System.currentTimeMillis();
094: this .count = 0;
095: }
096:
097: public void update() {
098: this .time = System.currentTimeMillis();
099: this .count++;
100: }
101:
102: public long time() {
103: return this .time;
104: }
105:
106: public int count() {
107: return this .count;
108: }
109: }
110:
111: public plasmaCrawlBalancer(File cachePath, String stackname,
112: boolean fullram) {
113: this .cacheStacksPath = cachePath;
114: this .stackname = stackname;
115: File stackFile = new File(cachePath, stackname + stackSuffix);
116: this .urlFileStack = kelondroStack.open(stackFile, stackrow);
117: this .domainStacks = new HashMap<String, LinkedList<String>>();
118: this .urlRAMStack = new ArrayList<String>();
119: this .top = true;
120: this .fullram = fullram;
121:
122: // create a stack for newly entered entries
123: if (!(cachePath.exists()))
124: cachePath.mkdir(); // make the path
125: openFileIndex();
126: }
127:
128: public synchronized void close() {
129: while (domainStacksNotEmpty())
130: flushOnceDomStacks(0, true); // flush to ram, because the ram flush is optimized
131: try {
132: flushAllRamStack();
133: } catch (IOException e) {
134: }
135: if (urlFileIndex != null) {
136: urlFileIndex.close();
137: urlFileIndex = null;
138: }
139: if (urlFileStack != null) {
140: urlFileStack.close();
141: urlFileStack = null;
142: }
143: }
144:
145: public void finalize() {
146: if (urlFileStack != null)
147: close();
148: }
149:
150: public synchronized void clear() {
151: urlFileStack = kelondroStack.reset(urlFileStack);
152: domainStacks.clear();
153: urlRAMStack.clear();
154: resetFileIndex();
155: }
156:
157: private void openFileIndex() {
158: cacheStacksPath.mkdirs();
159: urlFileIndex = new kelondroEcoTable(new File(cacheStacksPath,
160: stackname + indexSuffix), plasmaCrawlEntry.rowdef,
161: (fullram) ? kelondroEcoTable.tailCacheUsageAuto
162: : kelondroEcoTable.tailCacheDenyUsage,
163: EcoFSBufferSize, 0);
164: }
165:
166: private void resetFileIndex() {
167: if (urlFileIndex != null) {
168: urlFileIndex.close();
169: urlFileIndex = null;
170: new File(cacheStacksPath, stackname + indexSuffix).delete();
171: }
172: openFileIndex();
173: }
174:
175: public synchronized plasmaCrawlEntry get(String urlhash)
176: throws IOException {
177: kelondroRow.Entry entry = urlFileIndex.get(urlhash.getBytes());
178: if (entry == null)
179: return null;
180: return new plasmaCrawlEntry(entry);
181: }
182:
183: public synchronized int removeAllByProfileHandle(
184: String profileHandle) throws IOException {
185: // removes all entries with a specific profile hash.
186: // this may last some time
187: // returns number of deletions
188:
189: // first find a list of url hashes that shall be deleted
190: Iterator<kelondroRow.Entry> i = urlFileIndex.rows(true, null);
191: ArrayList<String> urlHashes = new ArrayList<String>();
192: kelondroRow.Entry rowEntry;
193: plasmaCrawlEntry crawlEntry;
194: while (i.hasNext()) {
195: rowEntry = (kelondroRow.Entry) i.next();
196: crawlEntry = new plasmaCrawlEntry(rowEntry);
197: if (crawlEntry.profileHandle().equals(profileHandle)) {
198: urlHashes.add(crawlEntry.url().hash());
199: }
200: }
201:
202: // then delete all these urls from the queues and the file index
203: Iterator<String> j = urlHashes.iterator();
204: while (j.hasNext())
205: this .remove(j.next());
206: return urlHashes.size();
207: }
208:
209: public synchronized plasmaCrawlEntry remove(String urlhash)
210: throws IOException {
211: // this method is only here, because so many import/export methods need it
212: // and it was implemented in the previous architecture
213: // however, usage is not recommended
214: int s = urlFileIndex.size();
215: kelondroRow.Entry entry = urlFileIndex.remove(urlhash
216: .getBytes(), false);
217: if (entry == null)
218: return null;
219: assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = "
220: + urlFileIndex.size() + ", s = " + s;
221:
222: // now delete that thing also from the queues
223:
224: // iterate through the RAM stack
225: Iterator<String> i = urlRAMStack.iterator();
226: String h;
227: while (i.hasNext()) {
228: h = (String) i.next();
229: if (h.equals(urlhash)) {
230: i.remove();
231: return new plasmaCrawlEntry(entry);
232: }
233: }
234:
235: // iterate through the file stack
236: // in general this is a bad idea. But this can only be avoided by avoidance of this method
237: Iterator<kelondroRow.Entry> j = urlFileStack
238: .stackIterator(true);
239: while (j.hasNext()) {
240: h = new String(j.next().getColBytes(0));
241: if (h.equals(urlhash)) {
242: j.remove();
243: return new plasmaCrawlEntry(entry);
244: }
245: }
246:
247: if (kelondroAbstractRecords.debugmode) {
248: serverLog.logWarning("PLASMA BALANCER",
249: "remove: not found urlhash " + urlhash + " in "
250: + stackname);
251: }
252: return new plasmaCrawlEntry(entry);
253: }
254:
255: public synchronized boolean has(String urlhash) {
256: try {
257: return urlFileIndex.has(urlhash.getBytes());
258: } catch (IOException e) {
259: e.printStackTrace();
260: return false;
261: }
262: }
263:
264: public boolean notEmpty() {
265: // alternative method to the property size() > 0
266: // this is better because it may avoid synchronized access to domain stack summarization
267: return urlRAMStack.size() > 0 || urlFileStack.size() > 0
268: || domainStacksNotEmpty();
269: }
270:
271: public int size() {
272: int componentsize = urlFileStack.size() + urlRAMStack.size()
273: + sizeDomainStacks();
274: if (componentsize != urlFileIndex.size()) {
275: // here is urlIndexFile.size() always smaller. why?
276: if (kelondroAbstractRecords.debugmode) {
277: serverLog.logWarning("PLASMA BALANCER",
278: "size operation wrong in " + stackname
279: + " - componentsize = " + componentsize
280: + ", urlFileIndex.size() = "
281: + urlFileIndex.size());
282: }
283: if ((componentsize == 0) && (urlFileIndex.size() > 0)) {
284: resetFileIndex();
285: }
286: }
287: return componentsize;
288: }
289:
290: private boolean domainStacksNotEmpty() {
291: if (domainStacks == null)
292: return false;
293: synchronized (domainStacks) {
294: Iterator<LinkedList<String>> i = domainStacks.values()
295: .iterator();
296: while (i.hasNext()) {
297: if (i.next().size() > 0)
298: return true;
299: }
300: }
301: return false;
302: }
303:
304: private int sizeDomainStacks() {
305: if (domainStacks == null)
306: return 0;
307: int sum = 0;
308: synchronized (domainStacks) {
309: Iterator<LinkedList<String>> i = domainStacks.values()
310: .iterator();
311: while (i.hasNext())
312: sum += i.next().size();
313: }
314: return sum;
315: }
316:
317: private void flushOnceDomStacks(int minimumleft, boolean ram) {
318: // takes one entry from every domain stack and puts it on the ram or file stack
319: // the minimumleft value is a limit for the number of entries that should be left
320: if (domainStacks.size() == 0)
321: return;
322: synchronized (domainStacks) {
323: Iterator<Map.Entry<String, LinkedList<String>>> i = domainStacks
324: .entrySet().iterator();
325: Map.Entry<String, LinkedList<String>> entry;
326: LinkedList<String> list;
327: while (i.hasNext()) {
328: entry = i.next();
329: list = entry.getValue();
330: if (list.size() > minimumleft) {
331: if (ram) {
332: urlRAMStack.add(list.removeFirst());
333: } else
334: try {
335: urlFileStack
336: .push(urlFileStack
337: .row()
338: .newEntry(
339: new byte[][] { ((String) list
340: .removeFirst())
341: .getBytes() }));
342: } catch (IOException e) {
343: e.printStackTrace();
344: }
345: }
346: if (list.size() == 0)
347: i.remove();
348: }
349: }
350: }
351:
352: private void flushAllRamStack() throws IOException {
353: // this flushes only the ramStack to the fileStack, but does not flush the domainStacks
354: for (int i = 0; i < urlRAMStack.size() / 2; i++) {
355: urlFileStack.push(urlFileStack.row().newEntry(
356: new byte[][] { ((String) urlRAMStack.get(i))
357: .getBytes() }));
358: urlFileStack.push(urlFileStack.row().newEntry(
359: new byte[][] { ((String) urlRAMStack
360: .get(urlRAMStack.size() - i - 1))
361: .getBytes() }));
362: }
363: if (urlRAMStack.size() % 2 == 1)
364: urlFileStack
365: .push(urlFileStack.row().newEntry(
366: new byte[][] { ((String) urlRAMStack
367: .get(urlRAMStack.size() / 2))
368: .getBytes() }));
369: }
370:
371: public synchronized void push(plasmaCrawlEntry entry)
372: throws IOException {
373: assert entry != null;
374: if (urlFileIndex.has(entry.url().hash().getBytes())) {
375: serverLog.logWarning("PLASMA BALANCER",
376: "double-check has failed for urlhash "
377: + entry.url().hash() + " in " + stackname
378: + " - fixed");
379: return;
380: }
381:
382: // extend domain stack
383: String dom = entry.url().hash().substring(6);
384: LinkedList<String> domainList = domainStacks.get(dom);
385: if (domainList == null) {
386: // create new list
387: domainList = new LinkedList<String>();
388: synchronized (domainStacks) {
389: domainList.add(entry.url().hash());
390: domainStacks.put(dom, domainList);
391: }
392: } else {
393: // extend existent domain list
394: domainList.addLast(entry.url().hash());
395: }
396:
397: // add to index
398: urlFileIndex.put(entry.toRow());
399:
400: // check size of domainStacks and flush
401: if ((domainStacks.size() > 20) || (sizeDomainStacks() > 1000)) {
402: flushOnceDomStacks(1, urlRAMStack.size() < 100); // when the ram stack is small, flush it there
403: }
404: }
405:
406: public synchronized plasmaCrawlEntry pop(long minimumLocalDelta,
407: long minimumGlobalDelta, long maximumAge)
408: throws IOException {
409: // returns an url-hash from the stack and ensures minimum delta times
410: // we have 3 sources to choose from: the ramStack, the domainStacks and the fileStack
411:
412: String result = null; // the result
413:
414: // 1st: check ramStack
415: if (urlRAMStack.size() > 0) {
416: result = (String) urlRAMStack.remove(0);
417: }
418:
419: // 2nd-a: check domainStacks for latest arrivals
420: if ((result == null) && (domainStacks.size() > 0))
421: synchronized (domainStacks) {
422: // we select specific domains that have not been used for a long time
423: // i.e. 60 seconds. Latest arrivals that have not yet been crawled
424: // fit also in that scheme
425: Iterator<Map.Entry<String, LinkedList<String>>> i = domainStacks
426: .entrySet().iterator();
427: Map.Entry<String, LinkedList<String>> entry;
428: String domhash;
429: long delta, maxdelta = 0;
430: String maxhash = null;
431: LinkedList<String> domlist;
432: while (i.hasNext()) {
433: entry = i.next();
434: domhash = (String) entry.getKey();
435: delta = lastAccessDelta(domhash);
436: if (delta == Integer.MAX_VALUE) {
437: // a brand new domain - we take it
438: domlist = entry.getValue();
439: result = (String) domlist.removeFirst();
440: if (domlist.size() == 0)
441: i.remove();
442: break;
443: }
444: if (delta > maxdelta) {
445: maxdelta = delta;
446: maxhash = domhash;
447: }
448: }
449: if (maxdelta > maximumAge) {
450: // success - we found an entry from a domain that has not been used for a long time
451: domlist = domainStacks.get(maxhash);
452: result = (String) domlist.removeFirst();
453: if (domlist.size() == 0)
454: domainStacks.remove(maxhash);
455: }
456: }
457:
458: // 2nd-b: check domainStacks for best match between stack size and retrieval time
459: if ((result == null) && (domainStacks.size() > 0))
460: synchronized (domainStacks) {
461: // we order all domains by the number of entries per domain
462: // then we iterate through these domains in descending entry order
463: // and that that one, that has a delta > minimumDelta
464: Iterator<Map.Entry<String, LinkedList<String>>> i = domainStacks
465: .entrySet().iterator();
466: Map.Entry<String, LinkedList<String>> entry;
467: String domhash;
468: LinkedList<String> domlist;
469: TreeMap<Integer, String> hitlist = new TreeMap<Integer, String>();
470: int count = 0;
471: // first collect information about sizes of the domain lists
472: while (i.hasNext()) {
473: entry = i.next();
474: domhash = entry.getKey();
475: domlist = entry.getValue();
476: hitlist.put(new Integer(domlist.size() * 100
477: + count++), domhash);
478: }
479:
480: // now iterate in descending order an fetch that one,
481: // that is acceptable by the minimumDelta constraint
482: long delta;
483: String maxhash = null;
484: while (hitlist.size() > 0) {
485: domhash = (String) hitlist
486: .remove(hitlist.lastKey());
487: if (maxhash == null)
488: maxhash = domhash; // remember first entry
489: delta = lastAccessDelta(domhash);
490: if (delta > minimumGlobalDelta) {
491: domlist = domainStacks.get(domhash);
492: result = (String) domlist.removeFirst();
493: if (domlist.size() == 0)
494: domainStacks.remove(domhash);
495: break;
496: }
497: }
498:
499: // if we did yet not choose any entry, we simply take that one with the most entries
500: if ((result == null) && (maxhash != null)) {
501: domlist = domainStacks.get(maxhash);
502: result = (String) domlist.removeFirst();
503: if (domlist.size() == 0)
504: domainStacks.remove(maxhash);
505: }
506: }
507:
508: // 3rd: take entry from file
509: if ((result == null) && (urlFileStack.size() > 0)) {
510: kelondroRow.Entry nextentry = (top) ? urlFileStack.top()
511: : urlFileStack.bot();
512: if (nextentry == null) {
513: // emergency case: this means that something with the stack organization is wrong
514: // the file appears to be broken. We kill the file.
515: kelondroStack.reset(urlFileStack);
516: serverLog
517: .logSevere("PLASMA BALANCER",
518: "get() failed to fetch entry from file stack. reset stack file.");
519: } else {
520: String nexthash = new String(nextentry.getColBytes(0));
521:
522: // check if the time after retrieval of last hash from same
523: // domain is not shorter than the minimumDelta
524: long delta = lastAccessDelta(nexthash);
525: if (delta > minimumGlobalDelta) {
526: // the entry is fine
527: result = new String((top) ? urlFileStack.pop()
528: .getColBytes(0) : urlFileStack.pot()
529: .getColBytes(0));
530: } else {
531: // try other entry
532: result = new String((top) ? urlFileStack.pot()
533: .getColBytes(0) : urlFileStack.pop()
534: .getColBytes(0));
535: delta = lastAccessDelta(result);
536: }
537: }
538: top = !top; // alternate top/bottom
539: }
540:
541: // check case where we did not found anything
542: if (result == null) {
543: serverLog.logSevere("PLASMA BALANCER",
544: "get() was not able to find a valid urlhash - total size = "
545: + size() + ", fileStack.size() = "
546: + urlFileStack.size()
547: + ", ramStack.size() = "
548: + urlRAMStack.size()
549: + ", domainStacks.size() = "
550: + domainStacks.size());
551: return null;
552: }
553:
554: // finally: check minimumDelta and if necessary force a sleep
555: long delta = lastAccessDelta(result);
556: assert delta >= 0 : "delta = " + delta;
557: int s = urlFileIndex.size();
558: kelondroRow.Entry rowEntry = urlFileIndex.remove(result
559: .getBytes(), false);
560: assert (rowEntry == null) || (urlFileIndex.size() + 1 == s) : "urlFileIndex.size() = "
561: + urlFileIndex.size()
562: + ", s = "
563: + s
564: + ", result = "
565: + result;
566: if (rowEntry == null) {
567: serverLog
568: .logSevere(
569: "PLASMA BALANCER",
570: "get() found a valid urlhash, but failed to fetch the corresponding url entry - total size = "
571: + size()
572: + ", fileStack.size() = "
573: + urlFileStack.size()
574: + ", ramStack.size() = "
575: + urlRAMStack.size()
576: + ", domainStacks.size() = "
577: + domainStacks.size());
578: return null;
579: } else {
580: assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = "
581: + urlFileIndex.size()
582: + ", s = "
583: + s
584: + ", result = " + result;
585: }
586: plasmaCrawlEntry crawlEntry = new plasmaCrawlEntry(rowEntry);
587: long minimumDelta = (crawlEntry.url().isLocal()) ? minimumLocalDelta
588: : minimumGlobalDelta;
589: plasmaCrawlRobotsTxt.Entry robotsEntry = plasmaSwitchboard.robots
590: .getEntry(crawlEntry.url().getHost());
591: Integer hostDelay = (robotsEntry == null) ? null : robotsEntry
592: .getCrawlDelay();
593: long genericDelta = ((robotsEntry == null) || (hostDelay == null)) ? minimumDelta
594: : Math.max(minimumDelta, hostDelay.intValue() * 1000);
595: genericDelta = Math.min(10000, genericDelta); // prevent that ta robots file can stop our indexer completely
596: if (delta < genericDelta) {
597: // force a busy waiting here
598: // in best case, this should never happen if the balancer works propertly
599: // this is only to protect against the worst case, where the crawler could
600: // behave in a DoS-manner
601: long sleeptime = genericDelta - delta;
602: try {
603: synchronized (this ) {
604: this .wait(sleeptime);
605: }
606: } catch (InterruptedException e) {
607: }
608: }
609:
610: // update statistical data
611: domaccess lastAccess = domainAccess.get(result.substring(6));
612: if (lastAccess == null)
613: lastAccess = new domaccess();
614: else
615: lastAccess.update();
616: domainAccess.put(result.substring(6), lastAccess);
617:
618: return crawlEntry;
619: }
620:
621: private long lastAccessDelta(String hash) {
622: assert hash != null;
623: domaccess lastAccess = domainAccess
624: .get((hash.length() > 6) ? hash.substring(6) : hash);
625: if (lastAccess == null)
626: return Long.MAX_VALUE; // never accessed
627: return System.currentTimeMillis() - lastAccess.time();
628: }
629:
630: public synchronized plasmaCrawlEntry top(int dist)
631: throws IOException {
632: // if we need to flush anything, then flush the domain stack first,
633: // to avoid that new urls get hidden by old entries from the file stack
634: if (urlRAMStack == null)
635: return null;
636: while ((domainStacksNotEmpty()) && (urlRAMStack.size() <= dist)) {
637: // flush only that much as we need to display
638: flushOnceDomStacks(0, true);
639: }
640: while ((urlFileStack != null) && (urlRAMStack.size() <= dist)
641: && (urlFileStack.size() > 0)) {
642: // flush some entries from disc to ram stack
643: try {
644: kelondroRow.Entry t = urlFileStack.pop();
645: if (t == null)
646: break;
647: urlRAMStack.add(new String(t.getColBytes(0)));
648: } catch (IOException e) {
649: break;
650: }
651: }
652: if (dist >= urlRAMStack.size())
653: return null;
654: String urlhash = (String) urlRAMStack.get(dist);
655: kelondroRow.Entry entry = urlFileIndex.get(urlhash.getBytes());
656: if (entry == null) {
657: if (kelondroAbstractRecords.debugmode)
658: serverLog.logWarning("PLASMA BALANCER",
659: "no entry in index for urlhash " + urlhash);
660: return null;
661: }
662: return new plasmaCrawlEntry(entry);
663: }
664:
665: public synchronized Iterator<plasmaCrawlEntry> iterator()
666: throws IOException {
667: return new EntryIterator();
668: }
669:
670: private class EntryIterator implements Iterator<plasmaCrawlEntry> {
671:
672: private Iterator<kelondroRow.Entry> rowIterator;
673:
674: public EntryIterator() throws IOException {
675: rowIterator = urlFileIndex.rows(true, null);
676: }
677:
678: public boolean hasNext() {
679: return (rowIterator == null) ? false : rowIterator
680: .hasNext();
681: }
682:
683: public plasmaCrawlEntry next() {
684: kelondroRow.Entry entry = (kelondroRow.Entry) rowIterator
685: .next();
686: try {
687: return (entry == null) ? null : new plasmaCrawlEntry(
688: entry);
689: } catch (IOException e) {
690: rowIterator = null;
691: return null;
692: }
693: }
694:
695: public void remove() {
696: if (rowIterator != null)
697: rowIterator.remove();
698: }
699:
700: }
701:
702: }
|