001: /*
002: * This file is part of DrFTPD, Distributed FTP Daemon.
003: *
004: * DrFTPD is free software; you can redistribute it and/or modify
005: * it under the terms of the GNU General Public License as published by
006: * the Free Software Foundation; either version 2 of the License, or
007: * (at your option) any later version.
008: *
009: * DrFTPD is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: * GNU General Public License for more details.
013: *
014: * You should have received a copy of the GNU General Public License
015: * along with DrFTPD; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */
018: package org.drftpd.slave;
019:
020: import java.io.FileInputStream;
021: import java.io.FileNotFoundException;
022: import java.io.FileOutputStream;
023: import java.io.IOException;
024: import java.io.InputStream;
025: import java.io.OutputStream;
026: import java.net.InetSocketAddress;
027: import java.net.Socket;
028: import java.util.Iterator;
029: import java.util.zip.CRC32;
030: import java.util.zip.CheckedInputStream;
031: import java.util.zip.CheckedOutputStream;
032:
033: import net.sf.drftpd.FileExistsException;
034: import net.sf.drftpd.ObjectNotFoundException;
035:
036: import org.apache.log4j.Logger;
037: import org.drftpd.PassiveConnection;
038: import org.drftpd.io.AddAsciiOutputStream;
039: import org.drftpd.slave.async.AsyncResponseDiskStatus;
040: import org.drftpd.slave.async.AsyncResponseTransferStatus;
041:
042: import se.mog.io.File;
043:
044: /**
045: * @author zubov
046: * @version $Id: Transfer.java 1593 2007-01-31 22:56:52Z zubov $
047: */
048: public class Transfer {
049: private static final Logger logger = Logger
050: .getLogger(Transfer.class);
051: private String _abortReason = null;
052: private CRC32 _checksum = null;
053: private Connection _conn;
054: private char _direction;
055: private long _finished = 0;
056: private InputStream _in;
057: private char _mode = 'I';
058: private OutputStream _out;
059: private Slave _slave;
060: private Socket _sock;
061: private long _started = 0;
062: private long _transfered = 0;
063: private TransferIndex _transferIndex;
064: public static final char TRANSFER_RECEIVING_UPLOAD = 'R';
065: public static final char TRANSFER_SENDING_DOWNLOAD = 'S';
066: public static final char TRANSFER_THROUGHPUT = 'A';
067: public static final char TRANSFER_UNKNOWN = 'U';
068: private File _file = null;
069: private String _pathForUpload = null;
070:
071: /**
072: * Start undefined transfer.
073: */
074: public Transfer(Connection conn, Slave slave,
075: TransferIndex transferIndex) {
076: if (conn == null) {
077: throw new RuntimeException();
078: }
079:
080: if (slave == null) {
081: throw new RuntimeException();
082: }
083:
084: if (transferIndex == null) {
085: throw new RuntimeException();
086: }
087:
088: _slave = slave;
089: _conn = conn;
090: synchronized (this ) {
091: _direction = Transfer.TRANSFER_UNKNOWN;
092: }
093: _transferIndex = transferIndex;
094: }
095:
096: public int hashCode() {
097: return _transferIndex.hashCode();
098: }
099:
100: public synchronized void abort(String reason) {
101: try {
102: _abortReason = reason;
103:
104: } finally {
105: if (_conn != null) {
106: _conn.abort();
107: }
108: if (_direction == Transfer.TRANSFER_RECEIVING_UPLOAD) {
109: if (_file != null) {
110: _file.delete();
111: }
112: }
113: if (_sock != null) {
114: try {
115: _sock.close();
116: } catch (IOException e) {
117: }
118: }
119: if (_out != null) {
120: try {
121: _out.close();
122: } catch (IOException e) {
123: }
124: }
125: if (_in != null) {
126: try {
127: _in.close();
128: } catch (IOException e) {
129: }
130: }
131: }
132: }
133:
134: public long getChecksum() {
135: if (_checksum == null) {
136: return 0;
137: }
138:
139: return _checksum.getValue();
140: }
141:
142: public long getElapsed() {
143: if (_finished == 0) {
144: return System.currentTimeMillis() - _started;
145: }
146:
147: return _finished - _started;
148: }
149:
150: public int getLocalPort() {
151: if (_conn instanceof PassiveConnection) {
152: return ((PassiveConnection) _conn).getLocalPort();
153: }
154:
155: throw new IllegalStateException(
156: "getLocalPort() called on a non-passive transfer");
157: }
158:
159: public char getState() {
160: return _direction;
161: }
162:
163: public TransferStatus getTransferStatus() {
164: return new TransferStatus(getElapsed(), getTransfered(),
165: getChecksum(), isFinished(), getTransferIndex());
166: }
167:
168: public boolean isFinished() {
169: return (_finished != 0 || _abortReason != null);
170: }
171:
172: public long getTransfered() {
173: return _transfered;
174: }
175:
176: public TransferIndex getTransferIndex() {
177: return _transferIndex;
178: }
179:
180: private Transfer getUploadForPath(String path)
181: throws ObjectNotFoundException {
182: for (Iterator iter = _slave.getTransfers().iterator(); iter
183: .hasNext();) {
184: Transfer transfer = (Transfer) iter.next();
185: synchronized (transfer) {
186: if (!transfer.isReceivingUploading()) {
187: continue;
188: }
189: if (transfer._pathForUpload.equalsIgnoreCase(path)) {
190: return transfer;
191: }
192: }
193: }
194: throw new ObjectNotFoundException("Transfer not found");
195: }
196:
197: public int getXferSpeed() {
198: long elapsed = getElapsed();
199:
200: if (_transfered == 0) {
201: return 0;
202: }
203:
204: if (elapsed == 0) {
205: return 0;
206: }
207:
208: return (int) (_transfered / ((float) elapsed / (float) 1000));
209: }
210:
211: public boolean isReceivingUploading() {
212: return _direction == Transfer.TRANSFER_RECEIVING_UPLOAD;
213: }
214:
215: public boolean isSendingUploading() {
216: return _direction == Transfer.TRANSFER_SENDING_DOWNLOAD;
217: }
218:
219: public TransferStatus receiveFile(String dirname, char mode,
220: String filename, long offset) throws IOException {
221: _pathForUpload = dirname + File.separator + filename;
222: try {
223: _slave.getRoots().getFile(_pathForUpload);
224: throw new FileExistsException("File " + dirname
225: + File.separatorChar + filename + " exists");
226: } catch (FileNotFoundException ex) {
227: }
228: String root = _slave.getRoots().getARootFileDir(dirname)
229: .getPath();
230:
231: try {
232: _out = new FileOutputStream(_file = new File(root
233: + File.separator + filename));
234:
235: if (_slave.getUploadChecksums()) {
236: _checksum = new CRC32();
237: _out = new CheckedOutputStream(_out, _checksum);
238: }
239: accept();
240:
241: _in = _sock.getInputStream();
242: synchronized (this ) {
243: _direction = Transfer.TRANSFER_RECEIVING_UPLOAD;
244: }
245:
246: System.out.println(dirname + "/" + filename);
247: transfer(null);
248: _slave.sendResponse(new AsyncResponseDiskStatus(_slave
249: .getDiskStatus()));
250: return getTransferStatus();
251: } catch (IOException e) {
252: // TODO really delete on IOException ?
253: // _slave.delete(root + File.separator + filename);
254: throw e; // so the Master can still handle the exception
255: } finally {
256: if (_sock != null) {
257: try {
258: _sock.close();
259: } catch (IOException e) {
260: }
261: }
262: if (_out != null) {
263: try {
264: _out.close();
265: } catch (IOException e) {
266: }
267: }
268: if (_in != null) {
269: try {
270: _in.close();
271: } catch (IOException e) {
272: }
273: }
274: }
275: }
276:
277: public TransferStatus sendFile(String path, char type,
278: long resumePosition) throws IOException {
279: try {
280:
281: _in = new FileInputStream(_file = new File(_slave
282: .getRoots().getFile(path)));
283:
284: if (_slave.getDownloadChecksums()) {
285: _checksum = new CRC32();
286: _in = new CheckedInputStream(_in, _checksum);
287: }
288:
289: _in.skip(resumePosition);
290: accept();
291: if (_slave.getBufferSize() > 0) {
292: _sock.setSendBufferSize(_slave.getBufferSize());
293: }
294:
295: _out = _sock.getOutputStream();
296: synchronized (this ) {
297: _direction = Transfer.TRANSFER_SENDING_DOWNLOAD;
298: }
299:
300: System.out.println("DL:" + path);
301: try {
302: transfer(getUploadForPath(path));
303: } catch (ObjectNotFoundException e) {
304: transfer(null);
305: }
306: return getTransferStatus();
307: } finally {
308: if (_sock != null) {
309: try {
310: _sock.close();
311: } catch (IOException e) {
312: }
313: }
314: if (_out != null) {
315: try {
316: _out.close();
317: } catch (IOException e) {
318: }
319: }
320: if (_in != null) {
321: try {
322: _in.close();
323: } catch (IOException e) {
324: }
325: }
326: }
327: }
328:
329: private void accept() throws IOException {
330: _sock = _conn.connect(_slave.getBufferSize());
331:
332: _conn = null;
333: }
334:
335: /**
336: * Call sock.connect() and start sending.
337: *
338: * Read about buffers here:
339: * http://groups.google.com/groups?hl=sv&lr=&ie=UTF-8&oe=UTF-8&threadm=9eomqe%24rtr%241%40to-gate.itd.utech.de&rnum=22&prev=/groups%3Fq%3Dtcp%2Bgood%2Bbuffer%2Bsize%26start%3D20%26hl%3Dsv%26lr%3D%26ie%3DUTF-8%26oe%3DUTF-8%26selm%3D9eomqe%2524rtr%25241%2540to-gate.itd.utech.de%26rnum%3D22
340: *
341: * Quote: Short answer is: if memory is not limited make your buffer big;
342: * TCP will flow control itself and only use what it needs.
343: *
344: * Longer answer: for optimal throughput (assuming TCP is not flow
345: * controlling itself for other reasons) you want your buffer size to at
346: * least be
347: *
348: * channel bandwidth * channel round-trip-delay.
349: *
350: * So on a long slow link, if you can get 100K bps throughput, but your
351: * delay -s 8 seconds, you want:
352: *
353: * 100Kbps * / bits-per-byte * 8 seconds = 100 Kbytes
354: *
355: * That way TCP can keep transmitting data for 8 seconds before it would
356: * have to stop and wait for an ack (to clear space in the buffer for new
357: * data so it can put new TX data in there and on the line). (The idea is to
358: * get the ack before you have to stop transmitting.)
359: */
360: private void transfer(Transfer associatedUpload) throws IOException {
361: try {
362: _started = System.currentTimeMillis();
363: if (_mode == 'A') {
364: _out = new AddAsciiOutputStream(_out);
365: }
366:
367: byte[] buff = new byte[Math.max(_slave.getBufferSize(),
368: 65535)];
369: int count;
370: long currentTime = System.currentTimeMillis();
371:
372: try {
373: while (true) {
374: if (_abortReason != null) {
375: throw new TransferFailedException(
376: "Transfer was aborted - "
377: + _abortReason,
378: getTransferStatus());
379: }
380: count = _in.read(buff);
381: if (count == -1) {
382: if (associatedUpload == null) {
383: break; // done transferring
384: }
385: if (associatedUpload.getTransferStatus()
386: .isFinished()) {
387: break; // done transferring
388: }
389: try {
390: Thread.sleep(500);
391: } catch (InterruptedException e) {
392: }
393: continue; // waiting for upload to catch up
394: }
395: // count != -1
396: if ((System.currentTimeMillis() - currentTime) >= 1000) {
397: TransferStatus ts = getTransferStatus();
398: if (ts.isFinished()) {
399: throw new TransferFailedException(
400: "Transfer was aborted - "
401: + _abortReason, ts);
402: }
403: _slave
404: .sendResponse(new AsyncResponseTransferStatus(
405: ts));
406: currentTime = System.currentTimeMillis();
407: }
408: _transfered += count;
409: _out.write(buff, 0, count);
410: }
411:
412: _out.flush();
413: } catch (IOException e) {
414: throw new TransferFailedException(e,
415: getTransferStatus());
416: }
417: } finally {
418: _finished = System.currentTimeMillis();
419: _slave.removeTransfer(this ); // transfers are added in setting up the transfer, issueListenToSlave()/issueConnectToSlave()
420: }
421: }
422: }
|