001: /*
002: * This file is part of DrFTPD, Distributed FTP Daemon.
003: *
004: * DrFTPD is free software; you can redistribute it and/or modify
005: * it under the terms of the GNU General Public License as published by
006: * the Free Software Foundation; either version 2 of the License, or
007: * (at your option) any later version.
008: *
009: * DrFTPD is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: * GNU General Public License for more details.
013: *
014: * You should have received a copy of the GNU General Public License
015: * along with DrFTPD; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */
018: package org.drftpd.slave;
019:
020: import java.io.BufferedInputStream;
021: import java.io.BufferedReader;
022: import java.io.EOFException;
023: import java.io.FileInputStream;
024: import java.io.FileNotFoundException;
025: import java.io.FileReader;
026: import java.io.IOException;
027: import java.io.ObjectInputStream;
028: import java.io.ObjectOutputStream;
029: import java.net.InetAddress;
030: import java.net.InetSocketAddress;
031: import java.net.Socket;
032: import java.net.SocketTimeoutException;
033: import java.net.UnknownHostException;
034: import java.util.ArrayList;
035: import java.util.Collection;
036: import java.util.HashMap;
037: import java.util.HashSet;
038: import java.util.Iterator;
039: import java.util.Properties;
040: import java.util.Set;
041: import java.util.zip.CRC32;
042: import java.util.zip.CheckedInputStream;
043: import java.util.zip.ZipEntry;
044: import java.util.zip.ZipInputStream;
045:
046: import javax.net.ssl.SSLContext;
047: import javax.net.ssl.SSLSocket;
048:
049: import net.sf.drftpd.FileExistsException;
050: import net.sf.drftpd.util.PortRange;
051:
052: import org.apache.log4j.BasicConfigurator;
053: import org.apache.log4j.Logger;
054: import org.drftpd.ActiveConnection;
055: import org.drftpd.LightSFVFile;
056: import org.drftpd.PassiveConnection;
057: import org.drftpd.PropertyHelper;
058: import org.drftpd.SSLGetContext;
059: import org.drftpd.id3.ID3Tag;
060: import org.drftpd.id3.MP3File;
061: import org.drftpd.master.QueuedOperation;
062: import org.drftpd.remotefile.CaseInsensitiveHashtable;
063: import org.drftpd.remotefile.LightRemoteFile;
064: import org.drftpd.slave.async.AsyncCommand;
065: import org.drftpd.slave.async.AsyncCommandArgument;
066: import org.drftpd.slave.async.AsyncResponse;
067: import org.drftpd.slave.async.AsyncResponseChecksum;
068: import org.drftpd.slave.async.AsyncResponseDIZFile;
069: import org.drftpd.slave.async.AsyncResponseDiskStatus;
070: import org.drftpd.slave.async.AsyncResponseException;
071: import org.drftpd.slave.async.AsyncResponseID3Tag;
072: import org.drftpd.slave.async.AsyncResponseMaxPath;
073: import org.drftpd.slave.async.AsyncResponseRemerge;
074: import org.drftpd.slave.async.AsyncResponseSFVFile;
075: import org.drftpd.slave.async.AsyncResponseTransfer;
076: import org.drftpd.slave.async.AsyncResponseTransferStatus;
077:
078: import se.mog.io.File;
079: import se.mog.io.PermissionDeniedException;
080:
081: import com.Ostermiller.util.StringTokenizer;
082:
083: /**
084: * @author mog
085: * @author zubov
086: * @version $Id: Slave.java 1703 2007-04-08 18:19:23Z tdsoul $
087: */
088: public class Slave {
/** True when the "os.name" system property starts with "Windows". */
public static final boolean isWin32 = System.getProperty("os.name")
        .startsWith("Windows");
private static final Logger logger = Logger.getLogger(Slave.class);
private static final int socketTimeout = 10000; // 10 seconds, for Socket
protected static final int actualTimeout = 60000; // one minute, evaluated on a SocketTimeout
public static final String VERSION = "DrFTPD 2.0.5";
// Value of the "bufferSize" property (defaults to 0); also handed to PortRange.
private int _bufferSize;
// SSL context; used for the master socket when _sslMaster is set and passed
// to data connections that are requested as encrypted. May be null if loading failed.
private SSLContext _ctx;
// "enabledownloadchecksums" property, true unless set to something other than "true".
private boolean _downloadChecksums;
// Storage roots built from the slave.root.N properties.
private RootCollection _roots;
// Control connection to the master and its object streams.
private Socket _s;
private ObjectInputStream _sin;
private ObjectOutputStream _sout;
// Active transfers keyed by their transfer index; also used as a monitor
// (removeTransfer notifies, FileLockRunnable waits).
private HashMap _transfers;
// "enableuploadchecksums" property, true unless set to something other than "true".
private boolean _uploadChecksums;
// Port range for passive connections (slave.portfrom / slave.portto).
private PortRange _portRange;
// Pending delete/rename operations; only allocated when isWin32 is true.
private Set _renameQueue = null;
// Milliseconds without master traffic before listenForCommands gives up.
private int _timeout;
// "slave.masterSSL" property: whether the master connection uses SSL.
private boolean _sslMaster;
108:
/**
 * Creates a slave without opening a connection or initialising any field.
 * Visible to subclasses and same-package code only.
 */
protected Slave() {
}
112:
113: public Slave(Properties p) throws IOException {
114: InetSocketAddress addr = new InetSocketAddress(PropertyHelper
115: .getProperty(p, "master.host"), Integer
116: .parseInt(PropertyHelper.getProperty(p,
117: "master.bindport")));
118: _sslMaster = p.getProperty("slave.masterSSL", "false")
119: .equalsIgnoreCase("true");
120:
121: // Whatever interface the slave uses to connect to the master, is the
122: // interface that the master will report to clients requesting PASV transfers
123: // from this slave, unless pasv_addr is set on the master for this slave
124: logger.info("Connecting to master at " + addr);
125:
126: String slavename = PropertyHelper.getProperty(p, "slave.name");
127:
128: if (isWin32) {
129: _renameQueue = new HashSet();
130: }
131:
132: try {
133: _ctx = SSLGetContext.getSSLContext();
134: } catch (Exception e) {
135: logger.warn("Error loading SSLContext", e);
136: }
137:
138: if (_sslMaster) {
139: _s = _ctx.getSocketFactory().createSocket();
140: } else {
141: _s = new Socket();
142: }
143:
144: try {
145: _timeout = Integer.parseInt(PropertyHelper.getProperty(p,
146: "slave.timeout"));
147: } catch (NullPointerException e) {
148: _timeout = actualTimeout;
149: }
150: _s.setSoTimeout(socketTimeout);
151: _s.connect(addr);
152: if (_s instanceof SSLSocket) {
153: ((SSLSocket) _s).setUseClientMode(true);
154: ((SSLSocket) _s).startHandshake();
155: }
156: _sout = new ObjectOutputStream(_s.getOutputStream());
157: _sin = new ObjectInputStream(_s.getInputStream());
158:
159: //TODO sendReply()
160: _sout.writeObject(slavename);
161: _sout.flush();
162: _sout.reset();
163:
164: _uploadChecksums = p.getProperty("enableuploadchecksums",
165: "true").equals("true");
166: _downloadChecksums = p.getProperty("enabledownloadchecksums",
167: "true").equals("true");
168: _bufferSize = Integer
169: .parseInt(p.getProperty("bufferSize", "0"));
170: _roots = getDefaultRootBasket(p);
171: _transfers = new HashMap();
172:
173: try {
174: int minport = Integer.parseInt(p
175: .getProperty("slave.portfrom"));
176: int maxport = Integer.parseInt(p
177: .getProperty("slave.portto"));
178: _portRange = new PortRange(minport, maxport,
179: getBufferSize());
180: } catch (NumberFormatException e) {
181: _portRange = new PortRange(getBufferSize());
182: }
183: }
184:
185: public static RootCollection getDefaultRootBasket(Properties cfg)
186: throws IOException {
187: RootCollection roots;
188:
189: // START: RootBasket
190: //long defaultMinSpaceFree = Bytes.parseBytes(cfg.getProperty(
191: // "slave.minspacefree", "50mb"));
192: ArrayList rootStrings = new ArrayList();
193:
194: for (int i = 1; true; i++) {
195: String rootString = cfg.getProperty("slave.root." + i);
196:
197: if (rootString == null) {
198: break;
199: }
200:
201: logger.info("slave.root." + i + ": " + rootString);
202:
203: /*
204: * long minSpaceFree;
205: *
206: * try { minSpaceFree = Long.parseLong(cfg.getProperty("slave.root." +
207: * i + ".minspacefree")); } catch (NumberFormatException ex) {
208: * minSpaceFree = defaultMinSpaceFree; }
209: *
210: * int priority;
211: *
212: * try { priority = Integer.parseInt(cfg.getProperty("slave.root." +
213: * i + ".priority")); } catch (NumberFormatException ex) { priority =
214: * 0; }
215: */
216: rootStrings.add(new Root(rootString));
217: }
218:
219: roots = new RootCollection(rootStrings);
220:
221: // END: RootBasket
222: System.gc();
223:
224: return roots;
225: }
226:
227: public static void main(String[] args) throws Exception {
228: BasicConfigurator.configure();
229: System.out
230: .println("DrFTPD Slave starting, further logging will be done through log4j");
231:
232: Properties p = new Properties();
233: p.load(new FileInputStream("slave.conf"));
234:
235: Slave s = new Slave(p);
236: if (isWin32) {
237: s.startFileLockThread();
238: }
239: try {
240: s.sendResponse(new AsyncResponseDiskStatus(s
241: .getDiskStatus()));
242: } catch (Throwable t) {
243: logger
244: .fatal("Error, check config on master for this slave");
245: }
246: s.listenForCommands();
247: }
248:
249: public class FileLockRunnable implements Runnable {
250:
251: public void run() {
252: while (true) {
253: synchronized (_transfers) {
254: try {
255: _transfers.wait(300000);
256: } catch (InterruptedException e) {
257: }
258: synchronized (_renameQueue) {
259: for (Iterator iter = _renameQueue.iterator(); iter
260: .hasNext();) {
261: QueuedOperation qo = (QueuedOperation) iter
262: .next();
263: if (qo.getDestination() == null) { // delete
264: try {
265: delete(qo.getSource());
266: // delete successfull
267: iter.remove();
268: } catch (PermissionDeniedException e) {
269: // keep it in the queue
270: } catch (FileNotFoundException e) {
271: iter.remove();
272: } catch (IOException e) {
273: throw new RuntimeException(
274: "Win32 stinks", e);
275: }
276: } else { // rename
277: String fileName = qo
278: .getDestination()
279: .substring(
280: qo.getDestination()
281: .lastIndexOf(
282: "/") + 1);
283: String destDir = qo.getDestination()
284: .substring(
285: 0,
286: qo.getDestination()
287: .lastIndexOf(
288: "/"));
289: try {
290: rename(qo.getSource(), destDir,
291: fileName);
292: // rename successfull
293: iter.remove();
294: } catch (PermissionDeniedException e) {
295: // keep it in the queue
296: } catch (FileNotFoundException e) {
297: iter.remove();
298: } catch (IOException e) {
299: throw new RuntimeException(
300: "Win32 stinks", e);
301: }
302: }
303: }
304: }
305: }
306: }
307: }
308: }
309:
310: private void startFileLockThread() {
311: Thread t = new Thread(new FileLockRunnable());
312: t.setName("FileLockThread");
313: t.start();
314: }
315:
316: public void addTransfer(Transfer transfer) {
317: synchronized (_transfers) {
318: _transfers.put(transfer.getTransferIndex(), transfer);
319: }
320: }
321:
322: public long checkSum(String path) throws IOException {
323: logger.debug("Checksumming: " + path);
324:
325: CheckedInputStream in = null;
326:
327: try {
328: CRC32 crc32 = new CRC32();
329: in = new CheckedInputStream(new FileInputStream(_roots
330: .getFile(path)), crc32);
331:
332: byte[] buf = new byte[4096];
333:
334: while (in.read(buf) != -1) {
335: }
336:
337: return crc32.getValue();
338: } finally {
339: if (in != null) {
340: in.close();
341: }
342: }
343: }
344:
345: public void delete(String path) throws IOException {
346: // now deletes files as well as directories, recursive!
347: Collection files = null;
348: try {
349: files = _roots.getMultipleRootsForFile(path);
350: } catch (FileNotFoundException e) {
351: // all is good, it's already gone
352: return;
353: }
354:
355: for (Iterator iter = files.iterator(); iter.hasNext();) {
356: Root root = (Root) iter.next();
357: File file = root.getFile(path);
358:
359: if (!file.exists()) {
360: iter.remove();
361: continue;
362: // should never occur
363: }
364:
365: if (file.isDirectory()) {
366: if (!file.deleteRecursive()) {
367: throw new PermissionDeniedException(
368: "delete failed on " + path);
369: }
370: logger.info("DELETEDIR: " + path);
371: } else if (file.isFile()) {
372: File dir = new File(file.getParentFile());
373: logger.info("DELETE: " + path);
374: file.delete();
375:
376: String[] dirList = dir.list();
377:
378: while ((dirList != null) && (dirList.length == 0)) {
379: if (dir.getPath().length() <= root.getPath()
380: .length()) {
381: break;
382: }
383:
384: java.io.File tmpFile = dir.getParentFile();
385:
386: dir.delete();
387: logger.info("rmdir: " + dir.getPath());
388:
389: if (tmpFile == null) {
390: break;
391: }
392: dir = new File(tmpFile);
393:
394: dirList = dir.list();
395: }
396: }
397: }
398: }
399:
400: public int getBufferSize() {
401: return _bufferSize;
402: }
403:
404: public boolean getDownloadChecksums() {
405: return _downloadChecksums;
406: }
407:
408: public ID3Tag getID3v1Tag(String path) throws IOException {
409: String absPath = _roots.getFile(path).getAbsolutePath();
410: logger.warn("Extracting ID3Tag info from: " + absPath);
411:
412: MP3File mp3 = null;
413: try {
414: mp3 = new MP3File(absPath, "r");
415:
416: if (!mp3.hasID3v1Tag) {
417: mp3.close();
418: throw new IOException("No id3tag found for " + absPath);
419: }
420:
421: ID3Tag id3tag = mp3.readID3v1Tag();
422: mp3.close();
423:
424: return id3tag;
425: } catch (FileNotFoundException e) {
426: logger.warn("FileNotFoundException: ", e);
427: } catch (IOException e) {
428: logger.warn("IOException: ", e);
429: } finally {
430: if (mp3 != null)
431: mp3.close();
432: }
433:
434: return null;
435: }
436:
437: public RootCollection getRoots() {
438: return _roots;
439: }
440:
441: private LightSFVFile getSFVFile(String path) throws IOException {
442: return new LightSFVFile(new BufferedReader(new FileReader(
443: _roots.getFile(path))));
444: }
445:
446: private String getDIZFile(String path) throws IOException {
447: ZipEntry zipEntry = null;
448: ZipInputStream zipInput = null;
449: byte[] buf = new byte[20 * 1024];
450: int numRd;
451: try {
452:
453: zipInput = new ZipInputStream(new BufferedInputStream(
454: new FileInputStream(_roots.getFile(path))));
455:
456: // Access a list of all of the files in the zip archive
457: while ((zipEntry = zipInput.getNextEntry()) != null) {
458: // Is this entry a DIZ file?
459: if (zipEntry.getName().toLowerCase().endsWith(".diz")) {
460: // Read 20 KBytes from the DIZ file, hopefully this
461: // will be the entire file.
462: numRd = zipInput.read(buf, 0, 20 * 1024);
463:
464: if (numRd > 0) {
465: return new String(buf, 0, numRd);
466: } else {
467: throw new FileNotFoundException(
468: "0 bytes read from .zip file - " + path);
469: }
470:
471: }
472: }
473: } catch (Throwable t) {
474: logger.error("Error extracting .diz from zipfile", t);
475: } finally {
476: try {
477: if (zipInput != null) {
478: zipInput.close();
479: }
480: } catch (IOException e) {
481: }
482: }
483: throw new FileNotFoundException("No diz entry in - " + path);
484: }
485:
486: // public LinkedRemoteFile getSlaveRoot() throws IOException {
487: // return Slave.getDefaultRoot(_roots);
488: // }
489:
490: public DiskStatus getDiskStatus() {
491: return new DiskStatus(_roots.getTotalDiskSpaceAvailable(),
492: _roots.getTotalDiskSpaceCapacity());
493: }
494:
495: public Transfer getTransfer(TransferIndex index) {
496: synchronized (_transfers) {
497: return (Transfer) _transfers.get(index);
498: }
499: }
500:
501: public boolean getUploadChecksums() {
502: return _uploadChecksums;
503: }
504:
505: private AsyncResponse handleChecksum(AsyncCommandArgument ac) {
506: try {
507: return new AsyncResponseChecksum(ac.getIndex(), checkSum(ac
508: .getArgs()));
509: } catch (IOException e) {
510: return new AsyncResponseException(ac.getIndex(), e);
511: }
512: }
513:
514: private AsyncResponse handleCommand(AsyncCommand ac) {
515: if (ac.getName().equals("remerge")) {
516: return handleRemerge((AsyncCommandArgument) ac);
517: }
518:
519: if (ac.getName().equals("checksum")) {
520: return handleChecksum((AsyncCommandArgument) ac);
521: }
522:
523: if (ac.getName().equals("connect")) {
524: return handleConnect((AsyncCommandArgument) ac);
525: }
526:
527: if (ac.getName().equals("delete")) {
528: return handleDelete((AsyncCommandArgument) ac);
529: }
530:
531: if (ac.getName().equals("id3tag")) {
532: return handleID3Tag((AsyncCommandArgument) ac);
533: }
534:
535: if (ac.getName().equals("listen")) {
536: return handleListen((AsyncCommandArgument) ac);
537: }
538:
539: if (ac.getName().equals("maxpath")) {
540: return handleMaxpath(ac);
541: }
542:
543: if (ac.getName().equals("ping")) {
544: return handlePing(ac);
545: }
546:
547: if (ac.getName().equals("receive")) {
548: return handleReceive((AsyncCommandArgument) ac);
549: }
550:
551: if (ac.getName().equals("rename")) {
552: return handleRename((AsyncCommandArgument) ac);
553: }
554:
555: if (ac.getName().equals("sfvfile")) {
556: return handleSfvFile((AsyncCommandArgument) ac);
557: }
558:
559: if (ac.getName().equals("dizfile")) {
560: return handleDIZFile((AsyncCommandArgument) ac);
561: }
562:
563: if (ac.getName().equals("send")) {
564: return handleSend((AsyncCommandArgument) ac);
565: }
566:
567: if (ac.getName().equals("abort")) {
568: handleAbort((AsyncCommandArgument) ac);
569:
570: return null;
571: }
572:
573: if (ac.getIndex().equals("shutdown")) {
574: logger.info("The master has requested that I shutdown");
575: System.exit(0);
576: }
577:
578: if (ac.getIndex().equals("error")) {
579: System.err.println("error - " + ac);
580: System.exit(0);
581: }
582:
583: return new AsyncResponseException(ac.getIndex(), new Exception(
584: ac.getName() + " - Operation Not Supported"));
585: }
586:
587: private void handleAbort(AsyncCommandArgument aca) {
588: String[] args = aca.getArgs().split(",");
589: TransferIndex ti = new TransferIndex(Integer.parseInt(args[0]));
590:
591: if (!_transfers.containsKey(ti)) {
592: return;
593: }
594:
595: Transfer t = (Transfer) _transfers.get(ti);
596: t.abort(args[1]);
597: }
598:
599: private AsyncResponse handleConnect(AsyncCommandArgument ac) {
600: String[] data = ac.getArgs().split(",");
601: String[] data2 = data[0].split(":");
602: boolean encrypted = data[1].equals("true");
603: boolean useSSLClientHandshake = data[2].equals("true");
604: InetAddress address;
605:
606: try {
607: address = InetAddress.getByName(data2[0]);
608: } catch (UnknownHostException e1) {
609: return new AsyncResponseException(ac.getIndex(), e1);
610: }
611:
612: int port = Integer.parseInt(data2[1]);
613: Transfer t = new Transfer(new ActiveConnection(encrypted ? _ctx
614: : null, new InetSocketAddress(address, port),
615: useSSLClientHandshake), this , new TransferIndex());
616: addTransfer(t);
617:
618: return new AsyncResponseTransfer(ac.getIndex(),
619: new ConnectInfo(port, t.getTransferIndex(), t
620: .getTransferStatus()));
621: }
622:
623: private AsyncResponse handleDelete(AsyncCommandArgument ac) {
624: try {
625: try {
626: delete(mapPathToRenameQueue(ac.getArgs()));
627: } catch (PermissionDeniedException e) {
628: if (isWin32) {
629: synchronized (_renameQueue) {
630: _renameQueue.add(new QueuedOperation(ac
631: .getArgs(), null));
632: }
633: } else {
634: throw e;
635: }
636: }
637: sendResponse(new AsyncResponseDiskStatus(getDiskStatus()));
638: return new AsyncResponse(ac.getIndex());
639: } catch (IOException e) {
640: return new AsyncResponseException(ac.getIndex(), e);
641: }
642: }
643:
644: private AsyncResponse handleID3Tag(AsyncCommandArgument ac) {
645: try {
646: return new AsyncResponseID3Tag(ac.getIndex(),
647: getID3v1Tag(mapPathToRenameQueue(ac.getArgs())));
648: } catch (IOException e) {
649: return new AsyncResponseException(ac.getIndex(), e);
650: }
651: }
652:
653: private AsyncResponse handleListen(AsyncCommandArgument ac) {
654: String[] data = ac.getArgs().split(":");
655: boolean encrypted = data[0].equals("true");
656: boolean useSSLClientMode = data[1].equals("true");
657: PassiveConnection c = null;
658:
659: try {
660: c = new PassiveConnection(encrypted ? _ctx : null,
661: _portRange, useSSLClientMode);
662: } catch (IOException e) {
663: return new AsyncResponseException(ac.getIndex(), e);
664: }
665:
666: Transfer t = new Transfer(c, this , new TransferIndex());
667: addTransfer(t);
668:
669: return new AsyncResponseTransfer(ac.getIndex(),
670: new ConnectInfo(c.getLocalPort(), t.getTransferIndex(),
671: t.getTransferStatus()));
672: }
673:
674: private AsyncResponse handleMaxpath(AsyncCommand ac) {
675: return new AsyncResponseMaxPath(ac.getIndex(), isWin32 ? 255
676: : Integer.MAX_VALUE);
677: }
678:
679: private AsyncResponse handlePing(AsyncCommand ac) {
680: return new AsyncResponse(ac.getIndex());
681: }
682:
683: private AsyncResponse handleReceive(AsyncCommandArgument ac) {
684: StringTokenizer st = new StringTokenizer(ac.getArgs(), ",");
685: char type = st.nextToken().charAt(0);
686: long position = Long.parseLong(st.nextToken());
687: TransferIndex transferIndex = new TransferIndex(Integer
688: .parseInt(st.nextToken()));
689: String path = mapPathToRenameQueue(st.nextToken());
690: String fileName = path.substring(path.lastIndexOf("/") + 1);
691: String dirName = path.substring(0, path.lastIndexOf("/"));
692: Transfer t = getTransfer(transferIndex);
693: sendResponse(new AsyncResponse(ac.getIndex())); // return calling thread
694: // on master
695: try {
696: return new AsyncResponseTransferStatus(t.receiveFile(
697: dirName, type, fileName, position));
698: } catch (IOException e) {
699: return (new AsyncResponseTransferStatus(new TransferStatus(
700: transferIndex, e)));
701: }
702: }
703:
704: private AsyncResponse handleRemerge(AsyncCommandArgument ac) {
705: try {
706: handleRemergeRecursive(new FileRemoteFile(_roots));
707:
708: return new AsyncResponse(ac.getIndex());
709: } catch (Throwable e) {
710: logger.error("Exception during merging", e);
711:
712: return new AsyncResponseException(ac.getIndex(), e);
713: }
714: }
715:
716: private void handleRemergeRecursive(FileRemoteFile dir) {
717: //sendResponse(new AsyncResponseRemerge(file.getPath(),
718: // file.getFiles()));
719: CaseInsensitiveHashtable mergeFiles = new CaseInsensitiveHashtable();
720:
721: Collection files = dir.getFiles();
722:
723: for (Iterator iter = files.iterator(); iter.hasNext();) {
724: FileRemoteFile file = (FileRemoteFile) iter.next();
725:
726: // need to send directories and files
727: mergeFiles.put(file.getName(), new LightRemoteFile(file));
728:
729: //keep only dirs for recursiveness
730: if (!file.isDirectory()) {
731: iter.remove();
732: }
733: }
734:
735: sendResponse(new AsyncResponseRemerge(dir.getPath(), mergeFiles));
736:
737: for (Iterator iter = files.iterator(); iter.hasNext();) {
738: FileRemoteFile file = (FileRemoteFile) iter.next();
739: handleRemergeRecursive(file);
740: }
741: }
742:
743: private AsyncResponse handleRename(AsyncCommandArgument ac) {
744: StringTokenizer st = new StringTokenizer(ac.getArgs(), ",");
745: String from = mapPathToRenameQueue(st.nextToken());
746: String toDir = st.nextToken();
747: String toFile = st.nextToken();
748:
749: try {
750: try {
751: rename(from, toDir, toFile);
752: } catch (PermissionDeniedException e) {
753: if (isWin32) {
754: String simplePath = null;
755: if (toDir.endsWith("/")) {
756: simplePath = toDir + toFile;
757: } else {
758: simplePath = toDir + "/" + toFile;
759: }
760: synchronized (_renameQueue) {
761: _renameQueue.add(new QueuedOperation(from,
762: simplePath));
763: }
764: } else {
765: throw e;
766: }
767: }
768:
769: return new AsyncResponse(ac.getIndex());
770: } catch (IOException e) {
771: return new AsyncResponseException(ac.getIndex(), e);
772: }
773: }
774:
775: private AsyncResponse handleSend(AsyncCommandArgument ac) {
776: StringTokenizer st = new StringTokenizer(ac.getArgs(), ",");
777: char type = st.nextToken().charAt(0);
778: long position = Long.parseLong(st.nextToken());
779: TransferIndex transferIndex = new TransferIndex(Integer
780: .parseInt(st.nextToken()));
781: String path = mapPathToRenameQueue(st.nextToken());
782: Transfer t = getTransfer(transferIndex);
783: sendResponse(new AsyncResponse(ac.getIndex())); // return
784:
785: // calling thread on master
786: try {
787: return new AsyncResponseTransferStatus(t.sendFile(path,
788: type, position));
789: } catch (IOException e) {
790: return new AsyncResponseTransferStatus(new TransferStatus(t
791: .getTransferIndex(), e));
792: }
793: }
794:
795: private AsyncResponse handleSfvFile(AsyncCommandArgument ac) {
796: try {
797: return new AsyncResponseSFVFile(ac.getIndex(),
798: getSFVFile(mapPathToRenameQueue(ac.getArgs())));
799: } catch (IOException e) {
800: return new AsyncResponseException(ac.getIndex(), e);
801: }
802: }
803:
804: private AsyncResponse handleDIZFile(AsyncCommandArgument ac) {
805: try {
806: return new AsyncResponseDIZFile(ac.getIndex(),
807: getDIZFile(ac.getArgs()));
808: } catch (IOException e) {
809: return new AsyncResponseException(ac.getIndex(), e);
810: }
811: }
812:
813: private void listenForCommands() throws IOException {
814: long lastCommandReceived = System.currentTimeMillis();
815: while (true) {
816: AsyncCommand ac = null;
817:
818: try {
819: ac = (AsyncCommand) _sin.readObject();
820:
821: if (ac == null) {
822: continue;
823: }
824: lastCommandReceived = System.currentTimeMillis();
825: } catch (ClassNotFoundException e) {
826: throw new RuntimeException(e);
827: } catch (EOFException e) {
828: logger
829: .debug("Lost connection to the master, may have been kicked offline");
830: return;
831: } catch (SocketTimeoutException e) {
832: // if no communication for slave.timeout (_timeout) time, than
833: // connection to the master is dead or there is a configuration
834: // error
835: if (_timeout < (System.currentTimeMillis() - lastCommandReceived)) {
836: logger
837: .error("Slave is going offline as it hasn't received any communication from the master in "
838: + (System.currentTimeMillis() - lastCommandReceived)
839: + " milliseconds");
840: throw new RuntimeException(e);
841: }
842: continue;
843: }
844:
845: logger.debug("Slave fetched " + ac);
846: class AsyncCommandHandler implements Runnable {
847: private AsyncCommand _command = null;
848:
849: public AsyncCommandHandler(AsyncCommand command) {
850: _command = command;
851: }
852:
853: public void run() {
854: try {
855: sendResponse(handleCommand(_command));
856: } catch (Throwable e) {
857: sendResponse(new AsyncResponseException(
858: _command.getIndex(), e));
859: }
860: }
861: }
862: Thread t = new Thread(new AsyncCommandHandler(ac));
863: t.setName("AsyncCommandHandler - " + ac.getClass());
864: t.start();
865: }
866: }
867:
868: public String mapPathToRenameQueue(String path) {
869: if (!isWin32) { // there is no renameQueue
870: return path;
871: }
872: synchronized (_renameQueue) {
873: for (Iterator iter = _renameQueue.iterator(); iter
874: .hasNext();) {
875: QueuedOperation qo = (QueuedOperation) iter.next();
876: if (qo.getDestination() == null) {
877: continue;
878: }
879: if (qo.getDestination().equals(path)) {
880: return qo.getSource();
881: }
882: }
883: return path;
884: }
885: }
886:
887: public void removeTransfer(Transfer transfer) {
888: synchronized (_transfers) {
889: if (_transfers.remove(transfer.getTransferIndex()) == null) {
890: throw new IllegalStateException();
891: }
892: _transfers.notifyAll();
893: }
894: }
895:
896: public void rename(String from, String toDirPath, String toName)
897: throws IOException {
898: for (Iterator iter = _roots.iterator(); iter.hasNext();) {
899: Root root = (Root) iter.next();
900:
901: File fromfile = root.getFile(from);
902:
903: if (!fromfile.exists()) {
904: continue;
905: }
906:
907: File toDir = root.getFile(toDirPath);
908: toDir.mkdirs();
909:
910: File tofile = new File(toDir.getPath() + File.separator
911: + toName);
912:
913: //!win32 == true on linux
914: //!win32 && equalsignore == true on win32
915: if (tofile.exists()
916: && !(isWin32 && fromfile.getName()
917: .equalsIgnoreCase(toName))) {
918: throw new FileExistsException("cannot rename from "
919: + fromfile + " to " + tofile
920: + ", destination exists");
921: }
922:
923: if (!fromfile.renameTo(tofile)) {
924: throw new PermissionDeniedException("renameTo("
925: + fromfile + ", " + tofile + ") failed");
926: }
927: }
928: }
929:
930: protected synchronized void sendResponse(AsyncResponse response) {
931: if (response == null) {
932: // handler doesn't return anything or it sends reply on it's own
933: // (threaded for example)
934: return;
935: }
936:
937: try {
938: _sout.writeObject(response);
939: _sout.flush();
940: _sout.reset();
941: if (!(response instanceof AsyncResponseTransferStatus)) {
942: logger.debug("Slave wrote response - " + response);
943: }
944:
945: if (response instanceof AsyncResponseException) {
946: logger.debug("", ((AsyncResponseException) response)
947: .getThrowable());
948: }
949: } catch (IOException e) {
950: throw new RuntimeException(e);
951: }
952: }
953:
954: /**
955: * @return The current list of Transfer objects
956: */
957: public ArrayList getTransfers() {
958: synchronized (_transfers) {
959: return new ArrayList(_transfers.values());
960: }
961: }
962: }
|