// plasmaDHTFlush.java
// ------------------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de
// Frankfurt, Germany, 2005, 2006
//
// This Class was written by Martin Thelian
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this software or this documentation. The usage of this software
// is at your own risk. The installation and usage (starting/running) of this
// software may allow other people or applications to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follow this copyright notice here, but changes must not be
// done inside the copyright notice above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.

package de.anomic.plasma;

import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeed;

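/**
 * Background thread that flushes the complete local reverse word index (RWI)
 * to a single target peer. The index is transferred in consecutive chunks
 * selected in word-hash order; each chunk is handed over to a
 * {@link plasmaDHTTransfer} worker, and the chunk size is adapted to the
 * observed selection and transfer times. Optionally the transferred entries
 * are deleted from the local index afterwards.
 *
 * A minimal usage sketch (how log, sb and targetSeed are obtained is an
 * assumption here and depends on the surrounding YaCy code):
 *
 *   plasmaDHTFlush flush = new plasmaDHTFlush(log, sb.wordIndex, targetSeed,
 *           true,    // delete transferred entries from the local index
 *           true,    // gzip-compress the transfer body
 *           60000);  // transfer timeout in milliseconds
 *   flush.start();
 *   // ... later, to abort and wait for termination:
 *   flush.stopIt(true);
 */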
public class plasmaDHTFlush extends Thread {
    private yacySeed seed = null;
    private boolean delete = false;
    private boolean finished = false;
    private boolean gzipBody4Transfer = false;
    private int timeout4Transfer = 60000;
    private int transferedEntryCount = 0;
    private long transferedBytes = 0;
    private int transferedContainerCount = 0;
    private String status = "Running";
    private String oldStartingPointHash = "AAAAAAAAAAAA";
    private String startPointHash = "AAAAAAAAAAAA";
    private int initialWordsDBSize = 0;
    private int chunkSize = 500;
    private final long startingTime = System.currentTimeMillis();
    private final plasmaSwitchboard sb;
    private plasmaDHTTransfer worker = null;
    private serverLog log;
    private plasmaWordIndex wordIndex;

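    /**
     * @param log logger used for progress and error messages
     * @param wordIndex the local word index from which chunks are selected
     * @param seed the target peer that should receive the index
     * @param delete if true, transferred entries are deleted from the local index
     * @param gzipBody if true, the transfer body is gzip-compressed
     * @param timeout transfer timeout in milliseconds
     */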
    public plasmaDHTFlush(serverLog log, plasmaWordIndex wordIndex,
            yacySeed seed, boolean delete, boolean gzipBody, int timeout) {
        super(new ThreadGroup("TransferIndexThreadGroup"),
                "TransferIndex_" + seed.getName());
        this.log = log;
        this.wordIndex = wordIndex;
        this.seed = seed;
        this.delete = delete;
        this.sb = plasmaSwitchboard.getSwitchboard();
        this.initialWordsDBSize = this.sb.wordIndex.size();
        this.gzipBody4Transfer = gzipBody;
        this.timeout4Transfer = timeout;
        //this.maxOpenFiles4Transfer = (int) sb.getConfigLong("indexTransfer.maxOpenFiles",800);
    }

    public void run() {
        this.performTransferWholeIndex();
    }

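    /**
     * Signals this thread (and a possibly running transfer worker) to stop.
     *
     * @param wait if true, block until this thread has terminated
     */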
    public void stopIt(boolean wait) throws InterruptedException {
        this.finished = true;
        if (this.worker != null)
            this.worker.stopIt();
        if (wait)
            this.join();
    }

    public boolean isFinished() {
        return this.finished;
    }

    public boolean deleteIndex() {
        return this.delete;
    }

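    /**
     * @return a pair of {current chunk size, index count of the chunk the
     *         worker is currently transferring}; while no worker is running,
     *         the initial chunk size of 500 is reported as the second value
     */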
    public int[] getIndexCount() {
        plasmaDHTTransfer workerThread = this.worker;
        if (workerThread != null) {
            return new int[] { this.chunkSize, workerThread.dhtChunk.indexCount() };
        }
        return new int[] { this.chunkSize, 500 };
    }

    public int getTransferedEntryCount() {
        return this.transferedEntryCount;
    }

    public int getTransferedContainerCount() {
        return this.transferedContainerCount;
    }

    public long getTransferedBytes() {
        return this.transferedBytes;
    }

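    /**
     * @return the percentage of word containers transferred so far, relative
     *         to the size of the local word index at the start of the flush
     */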
    public float getTransferedContainerPercent() {
        long currentWordsDBSize = this.sb.wordIndex.size();
        if (this.initialWordsDBSize == 0)
            return 100;
        else if (currentWordsDBSize >= this.initialWordsDBSize)
            return 0;
        //else return (float) ((initialWordsDBSize-currentWordsDBSize)/(initialWordsDBSize/100));
        else
            return (this.transferedContainerCount * 100f / this.initialWordsDBSize);
    }

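    /**
     * @return the average transfer speed in index entries per second since
     *         this thread was started
     */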
    public int getTransferedEntrySpeed() {
        long transferTime = System.currentTimeMillis() - this.startingTime;
        if (transferTime <= 0)
            transferTime = 1;
        return (int) ((1000 * this.transferedEntryCount) / transferTime);
    }

    public yacySeed getSeed() {
        return this.seed;
    }

    public String[] getStatus() {
        plasmaDHTTransfer workerThread = this.worker;
        if (workerThread != null) {
            return new String[] { this.status, workerThread.getStatusMessage() };
        }
        return new String[] { this.status, "Not running" };
    }

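    /**
     * @return two word-hash ranges: the range covered by the overall flush so
     *         far and the range of the chunk currently being transferred (or a
     *         placeholder range while no worker is running)
     */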
    public String[] getRange() {
        plasmaDHTTransfer workerThread = this.worker;
        if (workerThread != null) {
            return new String[] {
                    "[" + this.oldStartingPointHash + ".." + this.startPointHash + "]",
                    "[" + workerThread.dhtChunk.firstContainer().getWordHash() + ".."
                        + workerThread.dhtChunk.lastContainer().getWordHash() + "]" };
        }
        return new String[] {
                "[" + this.oldStartingPointHash + ".." + this.startPointHash + "]",
                "[------------..------------]" };
    }

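    /**
     * Main transfer loop: starting at word hash "AAAAAAAAAAAA", repeatedly
     * selects a chunk of the local word index, waits for the previous
     * transfer worker to finish, and hands the new chunk to a fresh
     * {@link plasmaDHTTransfer} worker until the whole index has been
     * transferred, an error occurs, or the thread is stopped.
     */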
    public void performTransferWholeIndex() {
        plasmaDHTChunk newDHTChunk = null, oldDHTChunk = null;
        try {
            // initial starting point of the index transfer is "AAAAAAAAAAAA"
            this.log.logFine("Selected hash " + this.startPointHash
                    + " as start point for index distribution of whole index");

            /* Loop until we have
             * - finished transfer of whole index
             * - detected a server shutdown or user interruption
             * - detected a failure
             */
            long iteration = 0;

            while (!this.finished && !Thread.currentThread().isInterrupted()) {
                iteration++;
                oldDHTChunk = newDHTChunk;

                // selecting the next chunk of words to transfer
                this.status = "Running: Selecting chunk " + iteration;
                newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex,
                        this.chunkSize / 3 * 2, this.chunkSize, -1, this.startPointHash);

                /* If we haven't selected a word chunk this could be because of
                 * a) no words are left in the index
                 * b) max open file limit was exceeded
                 */
                if (nothingSelected(newDHTChunk)) {
                    if (this.sb.wordIndex.size() > 0 && this.delete) {
                        // if there are still words in the index we try it again now
                        this.startPointHash = "AAAAAAAAAAAA";
                    } else {
                        // otherwise we can end the transfer now
                        this.log.logFine("No index available for index transfer, hash start-point "
                                + this.startPointHash);
                        this.status = "Finished. " + iteration + " chunks transferred.";
                        this.finished = true;
                    }
                } else {
                    // getting the start point for the next DHT selection
                    this.oldStartingPointHash = this.startPointHash;
                    this.startPointHash = newDHTChunk.lastContainer().getWordHash(); // DHT targets must have greater hashes

                    this.log.logInfo("Index selection of " + newDHTChunk.indexCount() + " words ["
                            + newDHTChunk.firstContainer().getWordHash() + " .. "
                            + newDHTChunk.lastContainer().getWordHash() + "]"
                            + " in " + (newDHTChunk.getSelectionTime() / 1000) + " seconds ("
                            + (1000 * newDHTChunk.indexCount() / (newDHTChunk.getSelectionTime() + 1))
                            + " words/s)");
                }

                // query the status of the old worker thread
                if (this.worker != null) {
                    this.status = "Finished: Selecting chunk " + iteration;
                    this.worker.join();
                    if (this.worker.getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE) {
                        // if the transfer failed we abort the index transfer now
                        this.status = "Aborted because of Transfer error:\n"
                                + this.worker.dhtChunk.getStatus();

                        // abort index transfer
                        return;
                    }

                    // calculating the new transfer size
                    this.calculateNewChunkSize();

                    // counting transferred containers / entries
                    this.transferedEntryCount += oldDHTChunk.indexCount();
                    this.transferedContainerCount += oldDHTChunk.containerSize();
                    this.transferedBytes += this.worker.getPayloadSize();

                    this.worker = null;

                    // deleting transferred words from the local index
                    if (this.delete) {
                        this.status = "Running: Deleting chunk " + iteration;
                        String urlReferences = oldDHTChunk.deleteTransferIndexes();
                        this.log.logFine("Deleted from " + oldDHTChunk.containerSize()
                                + " transferred RWIs locally " + urlReferences + " URL references");
                    }
                    oldDHTChunk = null;
                }

                // handover the chunk to a transfer worker
                if ((newDHTChunk.containerSize() > 0)
                        || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) {
                    this.worker = new plasmaDHTTransfer(this.log, this.seed, newDHTChunk,
                            this.gzipBody4Transfer, this.timeout4Transfer, 5);
                    this.worker.setTransferMode(plasmaDHTTransfer.TRANSFER_MODE_FLUSH);
                    this.worker.start();
                }
            }

            // if the index still contains entries at this point, the transfer
            // was aborted by the user or by a server shutdown
            if (this.sb.wordIndex.size() > 0)
                this.status = "aborted";
        } catch (Exception e) {
            this.status = "Error: " + e.getMessage();
            this.log.logWarning("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash
                    + " failed:'" + e.getMessage() + "'", e);
        } finally {
            if (this.worker != null) {
                this.worker.stopIt();
                try {
                    this.worker.join();
                } catch (Exception e) {
                    // ignore interruption while waiting for the worker to terminate
                }
            }
        }
    }

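    /**
     * Adapts the chunk size for the next transfer: the chunk is grown when
     * selecting it was faster than transferring it, and shrunk when the
     * transfer took longer than a minute or when selection was slower than
     * the transfer.
     */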
    private void calculateNewChunkSize() {
        // getting the transferred chunk size
        this.chunkSize = this.worker.dhtChunk.indexCount();

        // getting the chunk selection time
        long selectionTime = this.worker.dhtChunk.getSelectionTime();

        // getting the chunk transfer time
        long transferTime = this.worker.getTransferTime();

        // calculating the new chunk size
        if (transferTime > 60 * 1000 && this.chunkSize > 200) {
            this.chunkSize -= 100;
        } else if (selectionTime < transferTime) {
            this.chunkSize += 100;
        } else if (selectionTime > transferTime && this.chunkSize > 200) {
            this.chunkSize -= 100;
        }
    }

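    /**
     * @return true if the given chunk is unusable: no chunk was created, it
     *         contains no word containers, or its selection failed
     */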
    private static boolean nothingSelected(plasmaDHTChunk newDHTChunk) {
        return (newDHTChunk == null)
                || (newDHTChunk.containerSize() == 0)
                || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED);
    }
}