001: // plasmaDHTTransfer.java
002: // ------------------------------
003: // part of YaCy
004: // (C) by Michael Peter Christen; mc@anomic.de
005: // first published on http://www.anomic.de
006: // Frankfurt, Germany, 2005, 2006
007: //
008: // This class was provided by Martin Thelian
009: //
010: // This program is free software; you can redistribute it and/or modify
011: // it under the terms of the GNU General Public License as published by
012: // the Free Software Foundation; either version 2 of the License, or
013: // (at your option) any later version.
014: //
015: // This program is distributed in the hope that it will be useful,
016: // but WITHOUT ANY WARRANTY; without even the implied warranty of
017: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
018: // GNU General Public License for more details.
019: //
020: // You should have received a copy of the GNU General Public License
021: // along with this program; if not, write to the Free Software
022: // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
023: //
024: // Using this software in any meaning (reading, learning, copying, compiling,
025: // running) means that you agree that the Author(s) is (are) not responsible
026: // for cost, loss of data or any harm that may be caused directly or indirectly
027: // by usage of this softare or this documentation. The usage of this software
028: // is on your own risk. The installation and usage (starting/running) of this
029: // software may allow other people or application to access your computer and
030: // any attached devices and is highly dependent on the configuration of the
031: // software which must be done by the user of the software; the author(s) is
032: // (are) also not responsible for proper configuration and usage of the
033: // software, even if provoked by documentation provided together with
034: // the software.
035: //
036: // Any changes to this file according to the GPL as documented in the file
037: // gpl.txt aside this file in the shipment you received can be done to the
038: // lines that follows this copyright notice here, but changes must not be
039: // done inside the copyright notive above. A re-distribution must contain
040: // the intact and unchanged copyright notice.
041: // Contributions and changes to the program code must be marked as such.
042:
043: package de.anomic.plasma;
044:
045: import java.util.HashMap;
046:
047: import de.anomic.server.logging.serverLog;
048: import de.anomic.yacy.yacyClient;
049: import de.anomic.yacy.yacyCore;
050: import de.anomic.yacy.yacySeed;
051:
052: public class plasmaDHTTransfer extends Thread {
053:
054: public static final int TRANSFER_MODE_DISTRIBUTION = 0;
055: public static final int TRANSFER_MODE_FLUSH = 1;
056:
057: // connection properties
058: private boolean gzipBody4Transfer = false;
059: private int timeout4Transfer = 60000;
060:
061: // status fields
062: private boolean stopped = false;
063: private long transferTime = 0;
064: private long payloadSize = 0;
065: private int transferStatus = plasmaDHTChunk.chunkStatus_UNDEFINED;
066: private String transferStatusMessage = "";
067:
068: // delivery destination
069: private yacySeed seed = null;
070:
071: // word chunk
072: plasmaDHTChunk dhtChunk;
073:
074: // other fields
075: private int maxRetry;
076: private int transferMode = TRANSFER_MODE_DISTRIBUTION;
077: serverLog log;
078:
079: public plasmaDHTTransfer(serverLog log, yacySeed destSeed,
080: plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout,
081: int retries) {
082: super (new ThreadGroup("TransferIndexThreadGroup"),
083: "TransferIndexWorker_" + destSeed.getName());
084: this .log = log;
085: this .gzipBody4Transfer = gzipBody;
086: this .timeout4Transfer = timeout;
087: this .dhtChunk = dhtChunk;
088: this .maxRetry = retries;
089: this .seed = destSeed;
090: }
091:
092: public void setTransferMode(int mode) {
093: this .transferMode = mode;
094: }
095:
096: public void run() {
097: try {
098: this .uploadIndex();
099: } catch (InterruptedException e) {
100: e.printStackTrace();
101: }
102: }
103:
104: private boolean isAborted() {
105: if (this .stopped || Thread.currentThread().isInterrupted()) {
106: this .transferStatus = plasmaDHTChunk.chunkStatus_INTERRUPTED;
107: this .transferStatusMessage = "aborted";
108: return true;
109: }
110: return false;
111: }
112:
113: public void stopIt() {
114: this .stopped = true;
115: }
116:
117: public long getTransferTime() {
118: return this .transferTime;
119: }
120:
121: public long getPayloadSize() {
122: return this .payloadSize;
123: }
124:
125: public int getStatus() {
126: return this .transferStatus;
127: }
128:
129: public String getStatusMessage() {
130: return this .transferStatusMessage;
131: }
132:
133: public yacySeed getSeed() {
134: return this .seed;
135: }
136:
137: public void uploadIndex() throws InterruptedException {
138:
139: /* loop until we
140: * - have successfully transfered the words list or
141: * - the retry counter limit was exceeded
142: */
143: this .transferStatus = plasmaDHTChunk.chunkStatus_RUNNING;
144: long retryCount = 0, start = System.currentTimeMillis();
145: while (true) {
146: // testing if we were aborted
147: if (this .isAborted())
148: return;
149:
150: // transfering seleted words to remote peer
151: this .transferStatusMessage = "Running: Transfering chunk to target "
152: + this .seed.hash + "/" + this .seed.getName();
153: HashMap<String, Object> result = yacyClient.transferIndex(
154: this .seed, this .dhtChunk.containers(),
155: this .dhtChunk.urlCacheMap(),
156: this .gzipBody4Transfer, this .timeout4Transfer);
157: String error = (String) result.get("result");
158: if (error == null) {
159: // words successfully transfered
160: this .transferTime = System.currentTimeMillis() - start;
161: this .payloadSize = ((Integer) result.get("payloadSize"))
162: .intValue();
163:
164: this .log
165: .logInfo("Index transfer of "
166: + this .dhtChunk.indexCount()
167: + " entries "
168: + this .dhtChunk.containerSize()
169: + " words ["
170: + this .dhtChunk.firstContainer()
171: .getWordHash()
172: + " .. "
173: + this .dhtChunk.lastContainer()
174: .getWordHash()
175: + "]"
176: + " and "
177: + this .dhtChunk.urlCacheMap().size()
178: + " URLs"
179: + " to peer "
180: + this .seed.getName()
181: + ":"
182: + this .seed.hash
183: + " in "
184: + (this .transferTime / 1000)
185: + " seconds successful ("
186: + (1000 * this .dhtChunk.indexCount() / (this .transferTime + 1))
187: + " words/s, " + this .payloadSize
188: + " Bytes)");
189:
190: // if the peer has set a pause time and we are in flush mode (index transfer)
191: // then we pause for a while now
192: if (this .transferMode == TRANSFER_MODE_FLUSH) {
193: long pause = getBusyTime(result);
194: if (pause != -1) {
195: this .transferStatusMessage = "Finished: Transfer of chunk to target "
196: + this .seed.hash
197: + "/"
198: + this .seed.getName()
199: + ". Pausing "
200: + pause + " ms.";
201: this .pause(pause);
202: }
203: } else {
204: this .transferStatusMessage = "Finished: Transfer of chunk to target "
205: + this .seed.hash
206: + "/"
207: + this .seed.getName();
208: }
209:
210: // transfer of chunk finished
211: this .transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE;
212: retryCount = 0;
213:
214: break;
215: }
216:
217: // inc retry counter
218: retryCount++;
219: if (this .isAborted())
220: return;
221:
222: boolean reconnectNeeded = false;
223: long pauseTime = 1;
224:
225: if (error.equals("busy")) {
226: // get pause time that was requested by the remote peer
227: pauseTime = getBusyTime(result);
228: if (pauseTime == -1)
229: pauseTime = 60000;
230:
231: this .transferStatusMessage = "Peer "
232: + this .seed.getName() + ":" + this .seed.hash
233: + " is busy. Waiting " + pauseTime + " ms.";
234: this .log.logInfo(this .transferStatusMessage);
235: } else {
236: this .transferStatusMessage = "Transfer to peer "
237: + this .seed.getName() + ":" + this .seed.hash
238: + " failed:'" + error
239: + "', Trying to reconnect ...";
240:
241: // force disconnection of peer
242: yacyCore.peerActions.peerDeparture(this .seed,
243: "DHT Transfer: " + this .transferStatusMessage);
244: this .log.logWarning(this .transferStatusMessage);
245:
246: // calculate pause time
247: pauseTime = retryCount * 10000;
248: reconnectNeeded = true;
249: }
250:
251: // if the retry counter limit was not exceeded we'll retry it in a few seconds
252: if (retryCount > this .maxRetry) {
253: this .transferStatusMessage = "Transfer aborted. Retry limit reached.";
254: this .transferStatus = plasmaDHTChunk.chunkStatus_FAILED;
255: return;
256: }
257:
258: // sleep for a while
259: this .pause(pauseTime);
260:
261: // reconnect to peer if needed
262: if (reconnectNeeded) {
263:
264: /* loop until
265: * - we have successfully done a peer ping or
266: * - the retry counter limit was exceeded
267: */
268: while (true) {
269: // testing if we were aborted ...
270: if (this .isAborted())
271: return;
272:
273: // doing a peer ping to the remote seed
274: int added = yacyClient.publishMySeed(this .seed
275: .getPublicAddress(), this .seed.hash);
276: if (added < 0) {
277: // inc. retry counter
278: retryCount++;
279: this .transferStatusMessage = "Disconnected peer: Peer ping failed. "
280: + ((retryCount > 5) ? "Transfer aborted."
281: : "Retry " + retryCount);
282: if (retryCount > this .maxRetry)
283: return;
284: this .pause(retryCount * 10000);
285: continue;
286: }
287:
288: yacyCore.seedDB.getConnected(this .seed.hash);
289: this .transferStatusMessage = "running";
290: break;
291: }
292: }
293: }
294: }
295:
296: @SuppressWarnings("unchecked")
297: private long getBusyTime(HashMap<String, Object> result) {
298: int pause = -1;
299: Object transferRWIResult = result.get("resultTransferRWI");
300: assert transferRWIResult instanceof HashMap;
301: if (transferRWIResult != null
302: && ((HashMap<String, String>) transferRWIResult)
303: .containsKey("pause")) {
304: String pauseStr = (String) ((HashMap<String, String>) transferRWIResult)
305: .get("pause");
306: try {
307: pause = Integer.valueOf(pauseStr).intValue();
308: } catch (NumberFormatException numEx) {
309: }
310: if (pause < 0)
311: pause = 5000;
312: else if (pause > 30000)
313: pause = 30000;
314: }
315: return pause;
316: }
317:
318: private void pause(long sleepTime) throws InterruptedException {
319: if (sleepTime == 0)
320: return;
321: long sleepCounter = sleepTime / 1000;
322: long sleepRest = sleepTime % 1000;
323: while (!this .isAborted() && sleepCounter > 0) {
324: sleepCounter--;
325: Thread.sleep(1000);
326: }
327: if (sleepRest > 0)
328: Thread.sleep(sleepRest);
329: }
330: }
|