// plasmaSearchEvent.java
// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 10.10.2005 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

package de.anomic.plasma;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import de.anomic.index.indexContainer;
import de.anomic.index.indexRWIEntry;
import de.anomic.index.indexURLEntry;
import de.anomic.kelondro.kelondroBitfield;
import de.anomic.kelondro.kelondroMSetTools;
import de.anomic.plasma.plasmaSnippetCache.MediaSnippet;
import de.anomic.server.serverProfiling;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacyDHTAction;
import de.anomic.yacy.yacySearch;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacyURL;

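// A plasmaSearchEvent represents one search request over its whole lifetime:
// it starts the local and (if requested) remote search threads, collects and
// ranks their results, fetches snippets through worker threads, and is cached
// by query id so that follow-up requests (e.g. result paging) can re-use the
// already collected results.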
public final class plasmaSearchEvent {

    public static final String COLLECTION = "collection";
    public static final String JOIN = "join";
    public static final String PRESORT = "presort";
    public static final String URLFETCH = "urlfetch";
    public static final String NORMALIZING = "normalizing";

    public static int workerThreadCount = 10;
    public static String lastEventID = "";
    private static HashMap<String, plasmaSearchEvent> lastEvents = new HashMap<String, plasmaSearchEvent>(); // a cache for objects of this class: re-use old search requests
    public static final long eventLifetime = 600000; // the time an event will stay in the cache, 10 minutes
    private static final int max_results_preparation = 200;

    private long eventTime;
    private plasmaSearchQuery query;
    private plasmaWordIndex wordIndex;
    private plasmaSearchRankingProcess rankedCache; // ordered search results, grows dynamically as all the query threads enrich this container
    private Map<String, TreeMap<String, String>> rcAbstracts; // cache for index abstracts; word:TreeMap mapping where the embedded TreeMap is a urlhash:peerlist relation
    private yacySearch[] primarySearchThreads, secondarySearchThreads;
    private Thread localSearchThread;
    private TreeMap<String, String> preselectedPeerHashes;
    //private Object[] references;
    public TreeMap<String, String> IAResults;
    public TreeMap<String, Integer> IACount;
    public String IAmaxcounthash, IAneardhthash;
    private resultWorker[] workerThreads;
    private ArrayList<ResultEntry> resultList;
    //private int resultListLock; // a pointer that shows that all elements below this pointer are fixed and may not be changed again
    private HashMap<String, String> failedURLs; // a mapping from a urlhash to a fail reason string
    TreeSet<String> snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets
    private long urlRetrievalAllTime;
    private long snippetComputationAllTime;

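    // The constructor wires up the whole search: it prepares the snippet-fetch
    // word set, starts either a global (DHT/cluster) or a purely local search,
    // optionally computes index abstracts, and then either deploys snippet
    // worker threads or fills the result list directly.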
    @SuppressWarnings("unchecked")
    private plasmaSearchEvent(plasmaSearchQuery query,
                              plasmaWordIndex wordIndex,
                              TreeMap<String, String> preselectedPeerHashes,
                              boolean generateAbstracts) {
        this.eventTime = System.currentTimeMillis(); // for lifetime check
        this.wordIndex = wordIndex;
        this.query = query;
        this.rcAbstracts = (query.queryHashes.size() > 1) ? new TreeMap<String, TreeMap<String, String>>() : null; // generate abstracts only for combined searches
        this.primarySearchThreads = null;
        this.secondarySearchThreads = null;
        this.preselectedPeerHashes = preselectedPeerHashes;
        this.IAResults = new TreeMap<String, String>();
        this.IACount = new TreeMap<String, Integer>();
        this.IAmaxcounthash = null;
        this.IAneardhthash = null;
        this.urlRetrievalAllTime = 0;
        this.snippetComputationAllTime = 0;
        this.workerThreads = null;
        this.localSearchThread = null;
        this.resultList = new ArrayList<ResultEntry>(10); // this is the result set which is filled up with search results, enriched with snippets
        //this.resultListLock = 0; // no locked elements until now
        this.failedURLs = new HashMap<String, String>(); // maps url hashes that a worker thread tried to process but failed on to a fail reason string

        // snippets do not need to match the complete query hashes,
        // only the query minus the stopwords which had not been used for the search
        final TreeSet<String> filtered = kelondroMSetTools.joinConstructive(query.queryHashes, plasmaSwitchboard.stopwords);
        this.snippetFetchWordHashes = (TreeSet<String>) query.queryHashes.clone();
        if ((filtered != null) && (filtered.size() > 0)) {
            kelondroMSetTools.excludeDestructive(this.snippetFetchWordHashes, plasmaSwitchboard.stopwords);
        }

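        // now branch: a global search contacts remote peers (and runs the local
        // search concurrently in a background thread), while a local search
        // queries only the own index synchronously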
        long start = System.currentTimeMillis();
        if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ||
            (query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) {
            // do a global search
            this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation);

            int fetchpeers = 30;

            // the results of the remote fetches are accumulated in the rankedCache
            long timer = System.currentTimeMillis();
            serverLog.logFine("SEARCH_EVENT", "STARTING " + fetchpeers + " THREADS TO CATCH EACH " + query.displayResults() + " URLs");
            this.primarySearchThreads = yacySearch.primaryRemoteSearches(
                    plasmaSearchQuery.hashSet2hashString(query.queryHashes),
                    plasmaSearchQuery.hashSet2hashString(query.excludeHashes),
                    "",
                    query.prefer,
                    query.urlMask,
                    query.displayResults(),
                    query.maxDistance,
                    wordIndex,
                    rankedCache,
                    rcAbstracts,
                    fetchpeers,
                    plasmaSwitchboard.urlBlacklist,
                    query.ranking,
                    query.constraint,
                    (query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ? null : preselectedPeerHashes);
            serverProfiling.update("SEARCH", new plasmaProfiling.searchEvent(query.id(true), "remote search thread start", this.primarySearchThreads.length, System.currentTimeMillis() - timer));

            // meanwhile do a local search
            localSearchThread = new localSearchProcess();
            localSearchThread.start();

            // finished triggering the search
            serverLog.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads.length + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
        } else {
            // do a local search
            this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation);
            this.rankedCache.execQuery();
            //plasmaWordIndex.Finding finding = wordIndex.retrieveURLs(query, false, 2, ranking, process);

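            // index abstracts are compact per-word digests of the index (a word
            // hash mapped to the url hashes it contains); remote peers deliver
            // the same kind of digests into rcAbstracts, which are later joined
            // in prepareSecondarySearch() to find urls that several peers know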
            if (generateAbstracts) {
                // compute index abstracts
                long timer = System.currentTimeMillis();
                Iterator<Map.Entry<String, indexContainer>> ci = this.rankedCache.searchContainerMaps()[0].entrySet().iterator();
                Map.Entry<String, indexContainer> entry;
                int maxcount = -1;
                double mindhtdistance = 1.1, d; // start above the maximum possible dht distance (at most 1.0), so the first word always wins
                String wordhash;
                while (ci.hasNext()) {
                    entry = ci.next();
                    wordhash = entry.getKey();
                    indexContainer container = entry.getValue();
                    assert (container.getWordHash().equals(wordhash));
                    if (container.size() > maxcount) {
                        IAmaxcounthash = wordhash;
                        maxcount = container.size();
                    }
                    d = yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed().hash, wordhash);
                    if (d < mindhtdistance) {
                        // remember the word hash that is closest to our dht position
                        mindhtdistance = d;
                        IAneardhthash = wordhash;
                    }
                    IACount.put(wordhash, Integer.valueOf(container.size()));
                    IAResults.put(wordhash, indexContainer.compressIndex(container, null, 1000).toString());
                }
                serverProfiling.update("SEARCH", new plasmaProfiling.searchEvent(query.id(true), "abstract generation", this.rankedCache.searchContainerMaps()[0].size(), System.currentTimeMillis() - timer));
            }

        }

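        // with online snippet fetch, result preparation is delegated to
        // concurrent worker threads that load pages and verify snippets;
        // without it, the result vector is filled synchronously right here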
        if (query.onlineSnippetFetch) {
            // start worker threads to fetch urls and snippets
            this.workerThreads = new resultWorker[workerThreadCount];
            for (int i = 0; i < workerThreadCount; i++) {
                this.workerThreads[i] = new resultWorker(i, 10000);
                this.workerThreads[i].start();
            }
        } else {
            // prepare result vector directly without worker threads
            long timer = System.currentTimeMillis();
            indexURLEntry uentry;
            ResultEntry resultEntry;
            yacyURL url;
            synchronized (rankedCache) {
                while ((rankedCache.size() > 0) &&
                       ((uentry = rankedCache.bestURL(true)) != null) &&
                       (resultList.size() < query.neededResults())) {
                    url = uentry.comp().url();
                    if (url == null) continue;
                    //System.out.println("***DEBUG*** SEARCH RESULT URL=" + url.toNormalform(false, false));

                    resultEntry = obtainResultEntry(uentry, (snippetComputationAllTime < 300) ? 1 : 0);
                    if (resultEntry == null) continue; // the entry had some problems, cannot be used
                    urlRetrievalAllTime += resultEntry.dbRetrievalTime;
                    snippetComputationAllTime += resultEntry.snippetComputationTime;

                    // place the result into the result vector
                    synchronized (resultList) {
                        resultList.add(resultEntry);
                    }

                    // add references; the enclosing block already holds the lock on rankedCache
                    rankedCache.addReferences(resultEntry);
                }
            }
            serverProfiling.update("SEARCH", new plasmaProfiling.searchEvent(query.id(true), "offline snippet fetch", resultList.size(), System.currentTimeMillis() - timer));
        }

        // clean up events
        cleanupEvents(false);

        // store this search to a cache so it can be re-used
        lastEvents.put(query.id(false), this);
        lastEventID = query.id(false);
    }


    private class localSearchProcess extends Thread {

        public localSearchProcess() {
        }

        public void run() {
            // do a local search:
            // sort the local containers and truncate them to a limited count,
            // so that subsequent sorting together with the global results stays fast
            synchronized (rankedCache) {
                rankedCache.execQuery();
            }
        }
    }

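    // cleanupEvents(false) expires only events older than eventLifetime and is
    // called whenever a new event is constructed; passing true flushes the
    // whole cache (presumably intended for shutdown or memory pressure)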
    public static void cleanupEvents(boolean all) {
        // remove old events in the event cache
        Iterator<plasmaSearchEvent> i = lastEvents.values().iterator();
        plasmaSearchEvent cleanEvent;
        while (i.hasNext()) {
            cleanEvent = i.next();
            if ((all) || (cleanEvent.eventTime + eventLifetime < System.currentTimeMillis())) {
                // execute deletion of failed words;
                // work on a copy of the hash set so the query object itself is not modified
                Set<String> removeWords = new TreeSet<String>(cleanEvent.query.queryHashes);
                removeWords.addAll(cleanEvent.query.excludeHashes);
                cleanEvent.wordIndex.removeEntriesMultiple(removeWords, cleanEvent.failedURLs.keySet());
                serverLog.logInfo("SearchEvents", "cleaning up event " + cleanEvent.query.id(true) + ", removed " + cleanEvent.failedURLs.size() + " URL references on " + removeWords.size() + " words");

                // remove the event
                i.remove();
            }
        }
    }

    private ResultEntry obtainResultEntry(indexURLEntry page, int snippetFetchMode) {

        // a search result entry needs some work to produce a ResultEntry:
        // - check if the url entry exists in the LURL-db
        // - check exclusions, constraints, masks, media-domains
        // - load the snippet (to see if the page exists) and check if the snippet contains a searched word

        // snippet fetching can have 3 modes:
        // 0 - do not fetch snippets
        // 1 - fetch snippets offline only
        // 2 - online snippet fetch

        // load only urls if there was not yet a root url of that hash
        // find the url entry

        long startTime = System.currentTimeMillis();
        indexURLEntry.Components comp = page.comp();
        String pagetitle = comp.dc_title().toLowerCase();
        if (comp.url() == null) {
            registerFailure(page.hash(), "url corrupted (null)");
            return null; // rare case where the url is corrupted
        }
        String pageurl = comp.url().toString().toLowerCase();
        String pageauthor = comp.dc_creator().toLowerCase();
        long dbRetrievalTime = System.currentTimeMillis() - startTime;

        // check exclusion
        if ((plasmaSearchQuery.matches(pagetitle, query.excludeHashes)) ||
            (plasmaSearchQuery.matches(pageurl, query.excludeHashes)) ||
            (plasmaSearchQuery.matches(pageauthor, query.excludeHashes))) {
            return null;
        }

        // check url mask
        if (!(pageurl.matches(query.urlMask))) {
            return null;
        }

        // check constraints
        if ((query.constraint != null) &&
            (query.constraint.get(plasmaCondenser.flag_cat_indexof)) &&
            (!(comp.dc_title().startsWith("Index of")))) {
            final Iterator<String> wi = query.queryHashes.iterator();
            while (wi.hasNext()) wordIndex.removeEntry(wi.next(), page.hash());
            registerFailure(page.hash(), "index-of constraint not fulfilled");
            return null;
        }

        if ((query.contentdom == plasmaSearchQuery.CONTENTDOM_AUDIO) && (page.laudio() == 0)) {
            registerFailure(page.hash(), "contentdom-audio constraint not fulfilled");
            return null;
        }
        if ((query.contentdom == plasmaSearchQuery.CONTENTDOM_VIDEO) && (page.lvideo() == 0)) {
            registerFailure(page.hash(), "contentdom-video constraint not fulfilled");
            return null;
        }
        if ((query.contentdom == plasmaSearchQuery.CONTENTDOM_IMAGE) && (page.limage() == 0)) {
            registerFailure(page.hash(), "contentdom-image constraint not fulfilled");
            return null;
        }
        if ((query.contentdom == plasmaSearchQuery.CONTENTDOM_APP) && (page.lapp() == 0)) {
            registerFailure(page.hash(), "contentdom-app constraint not fulfilled");
            return null;
        }

        if (snippetFetchMode == 0) {
            return new ResultEntry(page, wordIndex, null, null, dbRetrievalTime, 0); // result without snippet
        }

        // load snippet
        if (query.contentdom == plasmaSearchQuery.CONTENTDOM_TEXT) {
            // attach text snippet
            startTime = System.currentTimeMillis();
            plasmaSnippetCache.TextSnippet snippet = plasmaSnippetCache.retrieveTextSnippet(
                    comp,
                    snippetFetchWordHashes,
                    (snippetFetchMode == 2),
                    ((query.constraint != null) && (query.constraint.get(plasmaCondenser.flag_cat_indexof))),
                    180, 3000,
                    (snippetFetchMode == 2) ? Integer.MAX_VALUE : 100000);
            long snippetComputationTime = System.currentTimeMillis() - startTime;
            serverLog.logInfo("SEARCH_EVENT", "text snippet load time for " + comp.url() + ": " + snippetComputationTime + ", " +
                    ((snippet.getErrorCode() < 11) ? "snippet found" : ("no snippet found (" + snippet.getError() + ")")));

            if (snippet.getErrorCode() < 11) {
                // we loaded the file and found the snippet
                return new ResultEntry(page, wordIndex, snippet, null, dbRetrievalTime, snippetComputationTime); // result with snippet attached
            } else if (snippetFetchMode == 1) {
                // we did not demand online loading, therefore a failure does not mean that the missing snippet causes a rejection of this result;
                // this may happen during a remote search, because snippet loading is omitted to retrieve results faster
                return new ResultEntry(page, wordIndex, null, null, dbRetrievalTime, snippetComputationTime); // result without snippet
            } else {
                // problems with snippet fetch
                registerFailure(page.hash(), "no text snippet for URL " + comp.url());
                plasmaSnippetCache.failConsequences(snippet, query.id(false));
                return null;
            }
        } else {
            // attach media information
            startTime = System.currentTimeMillis();
            ArrayList<MediaSnippet> mediaSnippets = plasmaSnippetCache.retrieveMediaSnippets(comp.url(), snippetFetchWordHashes, query.contentdom, (snippetFetchMode == 2), 6000);
            long snippetComputationTime = System.currentTimeMillis() - startTime;
            serverLog.logInfo("SEARCH_EVENT", "media snippet load time for " + comp.url() + ": " + snippetComputationTime);

            if ((mediaSnippets != null) && (mediaSnippets.size() > 0)) {
                // found media snippets, return entry
                return new ResultEntry(page, wordIndex, null, mediaSnippets, dbRetrievalTime, snippetComputationTime);
            } else if (snippetFetchMode == 1) {
                return new ResultEntry(page, wordIndex, null, null, dbRetrievalTime, snippetComputationTime);
            } else {
                // problems with snippet fetch
                registerFailure(page.hash(), "no media snippet for URL " + comp.url());
                return null;
            }
        }
        // finished, no more actions possible here
    }

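    // a worker counts as alive only if its thread is running and it has shown
    // a life sign within the last 3 seconds; workers stuck longer than that
    // (e.g. in a slow snippet download) are ignored by the waiting loops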
    private boolean anyWorkerAlive() {
        if (this.workerThreads == null) return false;
        for (int i = 0; i < workerThreadCount; i++) {
            if ((this.workerThreads[i] != null) &&
                (this.workerThreads[i].isAlive()) &&
                (this.workerThreads[i].busytime() < 3000)) return true;
        }
        return false;
    }

    private boolean anyRemoteSearchAlive() {
        // check primary search threads
        if ((this.primarySearchThreads != null) && (this.primarySearchThreads.length != 0)) {
            for (int i = 0; i < this.primarySearchThreads.length; i++) {
                if ((this.primarySearchThreads[i] != null) && (this.primarySearchThreads[i].isAlive())) return true;
            }
        }
        // maybe a secondary search thread is alive, check this
        if ((this.secondarySearchThreads != null) && (this.secondarySearchThreads.length != 0)) {
            for (int i = 0; i < this.secondarySearchThreads.length; i++) {
                if ((this.secondarySearchThreads[i] != null) && (this.secondarySearchThreads[i].isAlive())) return true;
            }
        }
        return false;
    }

    private int countFinishedRemoteSearch() {
        int count = 0;
        // check only primary search threads
        if ((this.primarySearchThreads != null) && (this.primarySearchThreads.length != 0)) {
            for (int i = 0; i < this.primarySearchThreads.length; i++) {
                if ((this.primarySearchThreads[i] == null) || (!(this.primarySearchThreads[i].isAlive()))) count++;
            }
        }
        return count;
    }

    public plasmaSearchQuery getQuery() {
        return query;
    }

    public yacySearch[] getPrimarySearchThreads() {
        return primarySearchThreads;
    }

    public yacySearch[] getSecondarySearchThreads() {
        return secondarySearchThreads;
    }

    public plasmaSearchRankingProcess getRankingResult() {
        return this.rankedCache;
    }

    public long getURLRetrievalTime() {
        return this.urlRetrievalAllTime;
    }

    public long getSnippetComputationTime() {
        return this.snippetComputationAllTime;
    }

    public static plasmaSearchEvent getEvent(String eventID) {
        synchronized (lastEvents) {
            return lastEvents.get(eventID);
        }
    }

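    // factory method: returns the cached event for this query if one exists
    // (refreshing its lifetime and query offset), otherwise constructs a new
    // one; it also restarts the snippet workers when a re-used event still
    // lacks results that the ranking cache could provide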
    public static plasmaSearchEvent getEvent(plasmaSearchQuery query,
                                             plasmaSearchRankingProfile ranking,
                                             plasmaWordIndex wordIndex,
                                             TreeMap<String, String> preselectedPeerHashes,
                                             boolean generateAbstracts) {
        synchronized (lastEvents) {
            plasmaSearchEvent event = lastEvents.get(query.id(false));
            if (event == null) {
                event = new plasmaSearchEvent(query, wordIndex, preselectedPeerHashes, generateAbstracts);
            } else {
                // renew the event time for this event, so it is not deleted too early on the next cleanup
                event.eventTime = System.currentTimeMillis();
                // replace the query, because this contains the current result offset
                event.query = query;
            }

            // if worker threads had been alive, but did not succeed, start them again to fetch missing links
            if ((query.onlineSnippetFetch) &&
                (!event.anyWorkerAlive()) &&
                (event.resultList.size() < query.neededResults() + 10) &&
                ((event.getRankingResult().getLocalResourceSize() + event.getRankingResult().getRemoteResourceSize()) > event.resultList.size())) {
                // set new timeout
                event.eventTime = System.currentTimeMillis();
                // start worker threads to fetch urls and snippets
                event.workerThreads = new resultWorker[workerThreadCount];
                for (int i = 0; i < workerThreadCount; i++) {
                    event.workerThreads[i] = event.deployWorker(i, 10000);
                }
            }

            return event;
        }

    }

    private resultWorker deployWorker(int id, long lifetime) {
        resultWorker worker = new resultWorker(id, lifetime);
        worker.start();
        return worker;
    }

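    // each result worker repeatedly takes the best-ranked url from the
    // rankedCache, resolves it to a ResultEntry (including snippet fetch) and
    // appends it to the result list, until enough results were collected, the
    // cache is exhausted with no remote search running, or its lifetime expires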
    private class resultWorker extends Thread {

        private long timeout; // the date until which this thread should try to work
        private long lastLifeSign; // when the run() loop was executed the last time
        private int id;

        public resultWorker(int id, long maxlifetime) {
            this.id = id;
            this.lastLifeSign = System.currentTimeMillis();
            this.timeout = System.currentTimeMillis() + Math.max(1000, maxlifetime);
            //this.sleeptime = Math.min(300, maxlifetime / 10 * id);
        }

        public void run() {

            // start fetching urls and snippets
            indexURLEntry page;
            while (System.currentTimeMillis() < this.timeout) {
                this.lastLifeSign = System.currentTimeMillis();

                if (resultList.size() >= query.neededResults() /*+ query.displayResults()*/) break; // we have enough

                // get next entry
                page = rankedCache.bestURL(true);
                if (page == null) {
                    if (!anyRemoteSearchAlive()) break; // we cannot expect more results
                    // if we did not get another entry, sleep some time and try again
                    try { Thread.sleep(100); } catch (InterruptedException e1) {}
                    continue;
                }
                if (anyResultWith(page.hash())) continue;
                if (anyFailureWith(page.hash())) continue;

                // try secondary search
                prepareSecondarySearch(); // will be executed only once

                ResultEntry resultEntry = obtainResultEntry(page, 2);
                if (resultEntry == null) continue; // the entry had some problems, cannot be used
                urlRetrievalAllTime += resultEntry.dbRetrievalTime;
                snippetComputationAllTime += resultEntry.snippetComputationTime;
                //System.out.println("+++DEBUG-resultWorker+++ fetched " + resultEntry.urlstring());

                // place the result into the result vector, unless another worker already added the same url
                boolean doubled = false;
                synchronized (resultList) {
                    for (int i = 0; i < resultList.size(); i++) {
                        if (resultList.get(i).urlcomps.url().hash().equals(resultEntry.urlcomps.url().hash())) {
                            doubled = true;
                            break;
                        }
                    }
                    if (!doubled) {
                        resultList.add(resultEntry);
                    }
                }

                // add references
                if (!doubled) {
                    synchronized (rankedCache) {
                        rankedCache.addReferences(resultEntry);
                    }
                }
                //System.out.println("DEBUG SNIPPET_LOADING: thread " + id + " got " + resultEntry.url());
            }
            serverLog.logInfo("SEARCH", "resultWorker thread " + id + " terminated");
        }

        private boolean anyResultWith(String urlhash) {
            for (int i = 0; i < resultList.size(); i++) {
                if (resultList.get(i).urlentry.hash().equals(urlhash)) return true;
            }
            return false;
        }

        private boolean anyFailureWith(String urlhash) {
            return (failedURLs.get(urlhash) != null);
        }

        public long busytime() {
            return System.currentTimeMillis() - this.lastLifeSign;
        }
    }

    private void registerFailure(String urlhash, String reason) {
        this.failedURLs.put(urlhash, reason);
        serverLog.logInfo("search", "sorted out hash " + urlhash + " during search: " + reason);
    }

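    // oneResult() is the accessor used to display the search results one by
    // one: it blocks (with short sleeps) until the result at position 'item'
    // has been produced, or until no worker can deliver it anymore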
    public ResultEntry oneResult(int item) {
        // first sleep a while to give the accumulation threads a chance to work
        if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ||
            (query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) {
            // this is a search using remote search threads; the local search thread was also started as a background process
            if ((localSearchThread != null) && (localSearchThread.isAlive())) {
                // in case that the local search takes longer than some other remote search requests,
                // do some sleeps to give the local process a chance to contribute
                try { Thread.sleep(200); } catch (InterruptedException e) {}
            }
            // now wait until as many remote worker threads have finished as the number of results we want to display
            while ((this.primarySearchThreads != null) &&
                   (this.primarySearchThreads.length > item) &&
                   (anyWorkerAlive()) &&
                   ((this.resultList.size() <= item) || (countFinishedRemoteSearch() <= item))) {
                try { Thread.sleep(100); } catch (InterruptedException e) {}
            }
            // finally wait until enough results have been produced by the snippet fetch process
            while ((anyWorkerAlive()) && (this.resultList.size() <= item)) {
                try { Thread.sleep(100); } catch (InterruptedException e) {}
            }
        } else {
            // we did a local search. If we arrive here, the local search process has finished
            // and the only things we need to wait for are snippets from the snippet fetch processes
            while ((anyWorkerAlive()) && (this.resultList.size() <= item)) {
                try { Thread.sleep(100); } catch (InterruptedException e) {}
            }
        }

        // finally, if there is something, return the result
        synchronized (this.resultList) {
            // check if we have enough entries
            if (this.resultList.size() <= item) return null;

            // fetch the best entry from the resultList, not necessarily the entry at the item position;
            // whenever a specific entry was switched into another position and returned here,
            // a moving pointer is set to mark that item position as not changeable
            // (post-ranking is currently disabled: bestpick is always item while postRankingFavourite is commented out)
            int bestpick = item; //postRankingFavourite(item);
            if (bestpick != item) {
                // switch the elements
                ResultEntry buf = this.resultList.get(bestpick);
                serverLog.logInfo("SEARCH_POSTRANKING", "preferring [" + bestpick + "] " + buf.urlstring() +
                        " over [" + item + "] " + this.resultList.get(item).urlstring());
                this.resultList.set(bestpick, this.resultList.get(item));
                this.resultList.set(item, buf);
            }

            //this.resultListLock = item; // lock the element; be prepared to return it
            return this.resultList.get(item);
        }
    }

    public ArrayList<ResultEntry> completeResults(long waitingtime) {
        long timeout = System.currentTimeMillis() + waitingtime;
        while ((this.resultList.size() < query.neededResults()) &&
               (anyWorkerAlive()) &&
               (System.currentTimeMillis() < timeout)) {
            try { Thread.sleep(100); } catch (InterruptedException e) {}
            //System.out.println("+++DEBUG-completeResults+++ sleeping " + 200);
        }
        return this.resultList;
    }

    boolean secondarySearchStarted = false;

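    // secondary search: once all peers have delivered their index abstracts,
    // the abstracts are joined to find urls that are known to more than one
    // peer for all search words; those peers are then asked again, restricted
    // to exactly these urls, to fetch entries the primary search has missed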
    private void prepareSecondarySearch() {
        if (secondarySearchStarted) return; // don't do this twice

        if ((rcAbstracts == null) || (rcAbstracts.size() != query.queryHashes.size())) return; // secondary search not possible (yet)
        this.secondarySearchStarted = true;

        /*
        // catch up index abstracts and join them; then call peers again to submit their urls
        System.out.println("DEBUG-INDEXABSTRACT: " + rcAbstracts.size() + " word references caught, " + query.queryHashes.size() + " needed");

        Iterator i = rcAbstracts.entrySet().iterator();
        Map.Entry entry;
        while (i.hasNext()) {
            entry = (Map.Entry) i.next();
            System.out.println("DEBUG-INDEXABSTRACT: hash " + (String) entry.getKey() + ": " + ((query.queryHashes.contains((String) entry.getKey())) ? "NEEDED" : "NOT NEEDED") + "; " + ((TreeMap) entry.getValue()).size() + " entries");
        }
        */
        TreeMap<String, String> abstractJoin = (rcAbstracts.size() == query.queryHashes.size()) ?
                kelondroMSetTools.joinConstructive(rcAbstracts.values(), true) : new TreeMap<String, String>();
        if (abstractJoin.size() == 0) {
            //System.out.println("DEBUG-INDEXABSTRACT: no success using index abstracts from remote peers");
        } else {
            //System.out.println("DEBUG-INDEXABSTRACT: index abstracts delivered " + abstractJoin.size() + " additional results for secondary search");
            // generate query for secondary search
            TreeMap<String, String> secondarySearchURLs = new TreeMap<String, String>(); // a (peerhash:urlhash-liststring) mapping
            Iterator<Map.Entry<String, String>> i1 = abstractJoin.entrySet().iterator();
            Map.Entry<String, String> entry1;
            String url, urls, peer, peers;
            String mypeerhash = yacyCore.seedDB.mySeed().hash;
            boolean mypeerinvolved = false;
            int mypeercount;
            while (i1.hasNext()) {
                entry1 = i1.next();
                url = entry1.getKey();
                peers = entry1.getValue();
                //System.out.println("DEBUG-INDEXABSTRACT: url " + url + ": from peers " + peers);
                mypeercount = 0;
                // peer hashes are fixed-width 12-character strings, so the peer list is sliced in steps of 12
                for (int j = 0; j < peers.length(); j = j + 12) {
                    peer = peers.substring(j, j + 12);
                    if ((peer.equals(mypeerhash)) && (mypeercount++ > 1)) continue;
                    //if (peers.indexOf(peer) < j) continue; // avoid doubles that may appear in the abstractJoin
                    urls = secondarySearchURLs.get(peer);
                    urls = (urls == null) ? url : urls + url;
                    secondarySearchURLs.put(peer, urls);
                }
                if (mypeercount == 1) mypeerinvolved = true;
            }

            // compute words for the secondary search and start the secondary searches
            i1 = secondarySearchURLs.entrySet().iterator();
            String words;
            secondarySearchThreads = new yacySearch[(mypeerinvolved) ? secondarySearchURLs.size() - 1 : secondarySearchURLs.size()];
            int c = 0;
            while (i1.hasNext()) {
                entry1 = i1.next();
                peer = entry1.getKey();
                if (peer.equals(mypeerhash)) continue; // we don't need to ask ourselves
                urls = entry1.getValue();
                words = wordsFromPeer(peer, urls);
                //System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " has urls: " + urls);
                //System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " from words: " + words);
                secondarySearchThreads[c++] = yacySearch.secondaryRemoteSearch(
                        words, "", urls, wordIndex, this.rankedCache, peer,
                        plasmaSwitchboard.urlBlacklist, query.ranking, query.constraint,
                        preselectedPeerHashes);
            }
        }
    }

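    // collects all search words for which the given peer is listed in the
    // abstract of every one of the given urls; like the peer lists, url hashes
    // are fixed-width 12-character strings, so both lists are sliced in steps of 12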
    private String wordsFromPeer(String peerhash, String urls) {
        Map.Entry<String, TreeMap<String, String>> entry;
        String word, peerlist, url, wordlist = "";
        TreeMap<String, String> urlPeerlist;
        int p;
        boolean hasURL;
        synchronized (rcAbstracts) {
            Iterator<Map.Entry<String, TreeMap<String, String>>> i = rcAbstracts.entrySet().iterator();
            while (i.hasNext()) {
                entry = i.next();
                word = entry.getKey();
                urlPeerlist = entry.getValue();
                hasURL = true;
                for (int j = 0; j < urls.length(); j = j + 12) {
                    url = urls.substring(j, j + 12);
                    peerlist = urlPeerlist.get(url);
                    p = (peerlist == null) ? -1 : peerlist.indexOf(peerhash);
                    if ((p < 0) || (p % 12 != 0)) {
                        hasURL = false;
                        break;
                    }
                }
                if (hasURL) wordlist += word;
            }
        }
        return wordlist;
    }

    public void remove(String urlhash) {
        // removes the url hash reference from the last search result
        /*indexRWIEntry e =*/ this.rankedCache.remove(urlhash);
        //assert e != null;
    }

    public Set<String> references(int count) {
        // returns a set of words that are computed as a toplist
        return this.rankedCache.getReferences(count);
    }

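    // ResultEntry wraps one url entry together with its snippets and timing
    // statistics; for urls on the pseudo-domain *.yacyh (files shared by other
    // peers) it rewrites the url to the peer's current public address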
    public static class ResultEntry {
        // payload objects
        private indexURLEntry urlentry;
        private indexURLEntry.Components urlcomps; // buffer for components
        private String alternative_urlstring;
        private String alternative_urlname;
        private plasmaSnippetCache.TextSnippet textSnippet;
        private ArrayList<plasmaSnippetCache.MediaSnippet> mediaSnippets;

        // statistic objects
        public long dbRetrievalTime, snippetComputationTime;

        public ResultEntry(indexURLEntry urlentry,
                           plasmaWordIndex wordIndex,
                           plasmaSnippetCache.TextSnippet textSnippet,
                           ArrayList<plasmaSnippetCache.MediaSnippet> mediaSnippets,
                           long dbRetrievalTime, long snippetComputationTime) {
            this.urlentry = urlentry;
            this.urlcomps = urlentry.comp();
            this.alternative_urlstring = null;
            this.alternative_urlname = null;
            this.textSnippet = textSnippet;
            this.mediaSnippets = mediaSnippets;
            this.dbRetrievalTime = dbRetrievalTime;
            this.snippetComputationTime = snippetComputationTime;
            String host = urlcomps.url().getHost();
            if ((host != null) && (host.endsWith(".yacyh"))) { // null guard added: urls without a host part cannot be yacyh addresses
                // translate host into current IP
                int p = host.indexOf(".");
                String hash = yacySeed.hexHash2b64Hash(host.substring(p + 1, host.length() - 6));
                yacySeed seed = yacyCore.seedDB.getConnected(hash);
                String filename = urlcomps.url().getFile();
                String address = null;
                if ((seed == null) || ((address = seed.getPublicAddress()) == null)) {
                    // seed is not known from here
                    try {
                        wordIndex.removeWordReferences(
                                plasmaCondenser.getWords(
                                        ("yacyshare " + filename.replace('?', ' ') + " " + urlcomps.dc_title()).getBytes(),
                                        "UTF-8").keySet(),
                                urlentry.hash());
                        wordIndex.loadedURL.remove(urlentry.hash()); // clean up
                        throw new RuntimeException("index void");
                    } catch (UnsupportedEncodingException e) {
                        throw new RuntimeException("parser failed: " + e.getMessage());
                    }
                }
                alternative_urlstring = "http://" + address + "/" + host.substring(0, p) + filename;
                alternative_urlname = "http://share." + seed.getName() + ".yacy" + filename;
                if ((p = alternative_urlname.indexOf("?")) > 0) alternative_urlname = alternative_urlname.substring(0, p);
            }
        }


        public String hash() {
            return urlentry.hash();
        }

        public yacyURL url() {
            return urlcomps.url();
        }

        public kelondroBitfield flags() {
            return urlentry.flags();
        }

        public String urlstring() {
            return (alternative_urlstring == null) ? urlcomps.url().toNormalform(false, true) : alternative_urlstring;
        }

        public String urlname() {
            return (alternative_urlname == null) ? urlcomps.url().toNormalform(false, true) : alternative_urlname;
        }

        public String title() {
            return urlcomps.dc_title();
        }

        public plasmaSnippetCache.TextSnippet textSnippet() {
            return this.textSnippet;
        }

        public ArrayList<plasmaSnippetCache.MediaSnippet> mediaSnippets() {
            return this.mediaSnippets;
        }

        public Date modified() {
            return urlentry.moddate();
        }

        public int filesize() {
            return urlentry.size();
        }

        public int limage() {
            return urlentry.limage();
        }

        public int laudio() {
            return urlentry.laudio();
        }

        public int lvideo() {
            return urlentry.lvideo();
        }

        public int lapp() {
            return urlentry.lapp();
        }

        public indexRWIEntry word() {
            return urlentry.word();
        }

        public boolean hasTextSnippet() {
            return (this.textSnippet != null) && (this.textSnippet.getErrorCode() < 11);
        }

        public boolean hasMediaSnippets() {
            return (this.mediaSnippets != null) && (this.mediaSnippets.size() > 0);
        }

        public String resource() {
            // generate the transport resource
            if ((textSnippet != null) && (textSnippet.exists())) {
                return urlentry.toString(textSnippet.getLineRaw());
            } else {
                return urlentry.toString();
            }
        }
    }
}