001: // plasmaDHTChunk.java
002: // ------------------------------
003: // part of YaCy
004: // (C) by Michael Peter Christen; mc@anomic.de
005: // first published on http://www.anomic.de
006: // Frankfurt, Germany, 2006
007: // created: 18.02.2006
008: //
009: // $LastChangedDate: 2008-01-30 00:15:43 +0000 (Mi, 30 Jan 2008) $
010: // $LastChangedRevision: 4419 $
011: // $LastChangedBy: orbiter $
012: //
013: // This program is free software; you can redistribute it and/or modify
014: // it under the terms of the GNU General Public License as published by
015: // the Free Software Foundation; either version 2 of the License, or
016: // (at your option) any later version.
017: //
018: // This program is distributed in the hope that it will be useful,
019: // but WITHOUT ANY WARRANTY; without even the implied warranty of
020: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
021: // GNU General Public License for more details.
022: //
023: // You should have received a copy of the GNU General Public License
024: // along with this program; if not, write to the Free Software
025: // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
026: //
027: // Using this software in any meaning (reading, learning, copying, compiling,
028: // running) means that you agree that the Author(s) is (are) not responsible
029: // for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this software or this documentation. The usage of this software
031: // is on your own risk. The installation and usage (starting/running) of this
032: // software may allow other people or application to access your computer and
033: // any attached devices and is highly dependent on the configuration of the
034: // software which must be done by the user of the software; the author(s) is
035: // (are) also not responsible for proper configuration and usage of the
036: // software, even if provoked by documentation provided together with
037: // the software.
038: //
039: // Any changes to this file according to the GPL as documented in the file
040: // gpl.txt aside this file in the shipment you received can be done to the
041: // lines that follows this copyright notice here, but changes must not be
// done inside the copyright notice above. A re-distribution must contain
043: // the intact and unchanged copyright notice.
044: // Contributions and changes to the program code must be marked as such.
045:
046: package de.anomic.plasma;
047:
048: import java.util.ArrayList;
049: import java.util.HashMap;
050: import java.util.HashSet;
051: import java.util.Iterator;
052:
053: import de.anomic.index.indexContainer;
054: import de.anomic.index.indexRWIEntry;
055: import de.anomic.index.indexRWIRowEntry;
056: import de.anomic.index.indexURLEntry;
057: import de.anomic.kelondro.kelondroBase64Order;
058: import de.anomic.kelondro.kelondroException;
059: import de.anomic.server.serverCodings;
060: import de.anomic.server.logging.serverLog;
061: import de.anomic.yacy.yacyCore;
062: import de.anomic.yacy.yacyDHTAction;
063: import de.anomic.yacy.yacySeedDB;
064:
065: public class plasmaDHTChunk {
066:
067: public static final int chunkStatus_UNDEFINED = -1;
068: public static final int chunkStatus_FAILED = 0;
069: public static final int chunkStatus_FILLED = 1;
070: public static final int chunkStatus_RUNNING = 2;
071: public static final int chunkStatus_INTERRUPTED = 3;
072: public static final int chunkStatus_COMPLETE = 4;
073:
074: public static final int peerRedundancy = 3;
075:
076: private plasmaWordIndex wordIndex;
077: private serverLog log;
078:
079: private int status = chunkStatus_UNDEFINED;
080: private String startPointHash = "AAAAAAAAAAAA";
081: private indexContainer[] indexContainers = null;
082: private HashMap<String, indexURLEntry> urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
083: private int idxCount;
084:
085: private long selectionStartTime = 0;
086: private long selectionEndTime = 0;
087:
088: private int transferFailedCounter = 0;
089:
090: public indexContainer firstContainer() {
091: return indexContainers[0];
092: }
093:
094: public indexContainer lastContainer() {
095: return indexContainers[indexContainers.length - 1];
096: }
097:
098: public indexContainer[] containers() {
099: return indexContainers;
100: }
101:
102: public int containerSize() {
103: return indexContainers.length;
104: }
105:
106: public int indexCount() {
107: return this .idxCount;
108: }
109:
110: private int indexCounter() {
111: int c = 0;
112: for (int i = 0; i < indexContainers.length; i++) {
113: c += indexContainers[i].size();
114: }
115: return c;
116: }
117:
118: public HashMap<String, indexURLEntry> urlCacheMap() {
119: return urlCache;
120: }
121:
122: public void setStatus(int newStatus) {
123: this .status = newStatus;
124: }
125:
126: public int getStatus() {
127: return this .status;
128: }
129:
130: public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex,
131: int minCount, int maxCount, int maxtime) {
132: try {
133: this .log = log;
134: this .wordIndex = wordIndex;
135: this .startPointHash = selectTransferStart();
136: log
137: .logFine("Selected hash "
138: + this .startPointHash
139: + " as start point for index distribution, distance = "
140: + yacyDHTAction
141: .dhtDistance(yacyCore.seedDB
142: .mySeed().hash,
143: this .startPointHash));
144: selectTransferContainers(this .startPointHash, minCount,
145: maxCount, maxtime);
146:
147: // count the indexes, can be smaller as expected
148: this .idxCount = indexCounter();
149: if (this .idxCount < minCount) {
150: log.logFine("Too few (" + this .idxCount
151: + ") indexes selected for transfer.");
152: this .status = chunkStatus_FAILED;
153: }
154: } catch (InterruptedException e) {
155: this .status = chunkStatus_INTERRUPTED;
156: }
157: }
158:
159: public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex,
160: int minCount, int maxCount, int maxtime, String startHash) {
161: try {
162: this .log = log;
163: this .wordIndex = wordIndex;
164: log
165: .logFine("Demanded hash "
166: + startHash
167: + " as start point for index distribution, distance = "
168: + yacyDHTAction
169: .dhtDistance(yacyCore.seedDB
170: .mySeed().hash,
171: this .startPointHash));
172: selectTransferContainers(startHash, minCount, maxCount,
173: maxtime);
174:
175: // count the indexes, can be smaller as expected
176: this .idxCount = indexCounter();
177: if (this .idxCount < minCount) {
178: log.logFine("Too few (" + this .idxCount
179: + ") indexes selected for transfer.");
180: this .status = chunkStatus_FAILED;
181: }
182: } catch (InterruptedException e) {
183: this .status = chunkStatus_INTERRUPTED;
184: }
185: }
186:
187: private String selectTransferStart() {
188: String startPointHash;
189: // first try to select with increasing probality a good start point
190: double minimumDistance = ((double) peerRedundancy)
191: / ((double) yacyCore.seedDB.sizeConnected());
192: double d, bestDistance = 0.0;
193: String bestHash = null;
194: for (int i = yacyCore.seedDB.sizeConnected() / 8; i > 0; i--) {
195: startPointHash = kelondroBase64Order.enhancedCoder.encode(
196: serverCodings.encodeMD5Raw(Long.toString(i
197: + System.currentTimeMillis()))).substring(
198: 2, 2 + yacySeedDB.commonHashLength);
199: d = yacyDHTAction.dhtDistance(
200: yacyCore.seedDB.mySeed().hash, startPointHash);
201: if (d > (minimumDistance + ((double) i / (double) 10))) {
202: return startPointHash;
203: }
204: if (d > bestDistance) {
205: bestDistance = d;
206: bestHash = startPointHash;
207: }
208: }
209: // if that fails, take simply the best start point
210: if (bestHash == null) {
211: return yacyCore.seedDB.mySeed().hash.substring(0, 11) + "z";
212: } else {
213: return bestHash;
214: }
215: }
216:
217: private void selectTransferContainers(String hash, int mincount,
218: int maxcount, int maxtime) throws InterruptedException {
219: try {
220: this .selectionStartTime = System.currentTimeMillis();
221: int refcountRAM = selectTransferContainersResource(hash,
222: true, maxcount, maxtime);
223: if (refcountRAM >= mincount) {
224: log.logFine("DHT selection from RAM: " + refcountRAM
225: + " entries");
226: return;
227: }
228: int refcountFile = selectTransferContainersResource(hash,
229: false, maxcount, maxtime);
230: log.logFine("DHT selection from FILE: " + refcountFile
231: + " entries, RAM provided only " + refcountRAM
232: + " entries");
233: return;
234: } finally {
235: this .selectionEndTime = System.currentTimeMillis();
236: }
237: }
238:
239: private int selectTransferContainersResource(String hash,
240: boolean ram, int maxcount, int maxtime)
241: throws InterruptedException {
242: // if (maxcount > 500) { maxcount = 500; } // flooding & OOM reduce
243: // the hash is a start hash from where the indexes are picked
244: final ArrayList<indexContainer> tmpContainers = new ArrayList<indexContainer>(
245: maxcount);
246: try {
247: final Iterator<indexContainer> indexContainerIterator = wordIndex
248: .indexContainerSet(hash, ram, true, maxcount)
249: .iterator();
250: indexContainer container;
251: Iterator<indexRWIRowEntry> urlIter;
252: indexRWIRowEntry iEntry;
253: indexURLEntry lurl;
254: int refcount = 0;
255: int wholesize;
256:
257: urlCache = new HashMap<String, indexURLEntry>();
258: final double maximumDistance = ((double) peerRedundancy * 2)
259: / ((double) yacyCore.seedDB.sizeConnected());
260: final long timeout = (maxtime < 0) ? Long.MAX_VALUE
261: : System.currentTimeMillis() + maxtime;
262: while ((maxcount > refcount)
263: && (indexContainerIterator.hasNext())
264: && ((container = (indexContainer) indexContainerIterator
265: .next()) != null)
266: && (container.size() > 0)
267: && ((tmpContainers.size() == 0) || (yacyDHTAction
268: .dhtDistance(container.getWordHash(),
269: tmpContainers.get(0).getWordHash()) < maximumDistance))
270: && (System.currentTimeMillis() < timeout)) {
271: // check for interruption
272: if (Thread.currentThread().isInterrupted())
273: throw new InterruptedException(
274: "Shutdown in progress");
275:
276: // make an on-the-fly entity and insert values
277: int notBoundCounter = 0;
278: try {
279: wholesize = container.size();
280: urlIter = container.entries();
281: // iterate over indexes to fetch url entries and store them in the urlCache
282: while ((urlIter.hasNext()) && (maxcount > refcount)
283: && (System.currentTimeMillis() < timeout)) {
284: // CPU & IO reduce
285: // try { Thread.sleep(50); } catch (InterruptedException e) { }
286:
287: iEntry = urlIter.next();
288: if ((iEntry == null)
289: || (iEntry.urlHash() == null)) {
290: urlIter.remove();
291: continue;
292: }
293: lurl = wordIndex.loadedURL.load(iEntry
294: .urlHash(), iEntry, 0);
295: if ((lurl == null) || (lurl.comp() == null)
296: || (lurl.comp().url() == null)) {
297: //yacyCore.log.logFine("DEBUG selectTransferContainersResource: not-bound url hash '" + iEntry.urlHash() + "' for word hash " + container.getWordHash());
298: notBoundCounter++;
299: urlIter.remove();
300: wordIndex.removeEntry(container
301: .getWordHash(), iEntry.urlHash());
302: } else {
303: urlCache.put(iEntry.urlHash(), lurl);
304: //yacyCore.log.logFine("DEBUG selectTransferContainersResource: added url hash '" + iEntry.urlHash() + "' to urlCache for word hash " + container.getWordHash());
305: refcount++;
306: }
307: }
308:
309: // remove all remaining; we have enough
310: while (urlIter.hasNext()) {
311: iEntry = urlIter.next();
312: urlIter.remove();
313: }
314:
315: // use whats left
316: log.logFine("Selected partial index ("
317: + container.size() + " from " + wholesize
318: + " URLs, " + notBoundCounter
319: + " not bound) for word "
320: + container.getWordHash());
321: tmpContainers.add(container);
322: } catch (kelondroException e) {
323: log.logSevere(
324: "plasmaWordIndexDistribution/2: deleted DB for word "
325: + container.getWordHash(), e);
326: wordIndex.deleteContainer(container.getWordHash());
327: }
328: }
329: // create result
330: indexContainers = (indexContainer[]) tmpContainers
331: .toArray(new indexContainer[tmpContainers.size()]);
332: //[C[16GwGuFzwffp] has 1 entries, C[16hGKMAl0w97] has 9 entries, C[17A8cDPF6SfG] has 9 entries, C[17Kdj__WWnUy] has 1 entries, C[1
333: if ((indexContainers == null)
334: || (indexContainers.length == 0)) {
335: log
336: .logFine("No index available for index transfer, hash start-point "
337: + startPointHash);
338: this .status = chunkStatus_FAILED;
339: return 0;
340: }
341:
342: this .status = chunkStatus_FILLED;
343: return refcount;
344: } catch (kelondroException e) {
345: log.logSevere("selectTransferIndexes database corrupted: "
346: + e.getMessage(), e);
347: indexContainers = new indexContainer[0];
348: urlCache = new HashMap<String, indexURLEntry>();
349: this .status = chunkStatus_FAILED;
350: return 0;
351: }
352: }
353:
354: public synchronized String deleteTransferIndexes() {
355: Iterator<indexRWIRowEntry> urlIter;
356: indexRWIEntry iEntry;
357: HashSet<String> urlHashes;
358: String count = "0";
359:
360: for (int i = 0; i < this .indexContainers.length; i++) {
361: // delete entries separately
362: if (this .indexContainers[i] == null) {
363: log.logFine("Deletion of partial index #" + i
364: + " not possible, entry is null");
365: continue;
366: }
367: int c = this .indexContainers[i].size();
368: urlHashes = new HashSet<String>(this .indexContainers[i]
369: .size());
370: urlIter = this .indexContainers[i].entries();
371: while (urlIter.hasNext()) {
372: iEntry = urlIter.next();
373: urlHashes.add(iEntry.urlHash());
374: }
375: String wordHash = indexContainers[i].getWordHash();
376: count = wordIndex.removeEntriesExpl(this .indexContainers[i]
377: .getWordHash(), urlHashes);
378: if (log.isFine())
379: log.logFine("Deleted partial index (" + c
380: + " URLs) for word " + wordHash + "; "
381: + this .wordIndex.indexSize(wordHash)
382: + " entries left");
383: this .indexContainers[i] = null;
384: }
385: return count;
386: }
387:
388: public long getSelectionTime() {
389: if (this .selectionStartTime == 0 || this .selectionEndTime == 0)
390: return -1;
391: return this .selectionEndTime - this .selectionStartTime;
392: }
393:
394: public void incTransferFailedCounter() {
395: this .transferFailedCounter++;
396: }
397:
398: public int getTransferFailedCounter() {
399: return transferFailedCounter;
400: }
401: }
|