Source Code Cross Referenced for Slave.java in  » Net » DrFTPD » org » drftpd » slave » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » DrFTPD » org.drftpd.slave 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * This file is part of DrFTPD, Distributed FTP Daemon.
003:         *
004:         * DrFTPD is free software; you can redistribute it and/or modify
005:         * it under the terms of the GNU General Public License as published by
006:         * the Free Software Foundation; either version 2 of the License, or
007:         * (at your option) any later version.
008:         *
009:         * DrFTPD is distributed in the hope that it will be useful,
010:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
011:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012:         * GNU General Public License for more details.
013:         *
014:         * You should have received a copy of the GNU General Public License
015:         * along with DrFTPD; if not, write to the Free Software
016:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
017:         */
018:        package org.drftpd.slave;
019:
020:        import java.io.BufferedInputStream;
021:        import java.io.BufferedReader;
022:        import java.io.EOFException;
023:        import java.io.FileInputStream;
024:        import java.io.FileNotFoundException;
025:        import java.io.FileReader;
026:        import java.io.IOException;
027:        import java.io.ObjectInputStream;
028:        import java.io.ObjectOutputStream;
029:        import java.net.InetAddress;
030:        import java.net.InetSocketAddress;
031:        import java.net.Socket;
032:        import java.net.SocketTimeoutException;
033:        import java.net.UnknownHostException;
034:        import java.util.ArrayList;
035:        import java.util.Collection;
036:        import java.util.HashMap;
037:        import java.util.HashSet;
038:        import java.util.Iterator;
039:        import java.util.Properties;
040:        import java.util.Set;
041:        import java.util.zip.CRC32;
042:        import java.util.zip.CheckedInputStream;
043:        import java.util.zip.ZipEntry;
044:        import java.util.zip.ZipInputStream;
045:
046:        import javax.net.ssl.SSLContext;
047:        import javax.net.ssl.SSLSocket;
048:
049:        import net.sf.drftpd.FileExistsException;
050:        import net.sf.drftpd.util.PortRange;
051:
052:        import org.apache.log4j.BasicConfigurator;
053:        import org.apache.log4j.Logger;
054:        import org.drftpd.ActiveConnection;
055:        import org.drftpd.LightSFVFile;
056:        import org.drftpd.PassiveConnection;
057:        import org.drftpd.PropertyHelper;
058:        import org.drftpd.SSLGetContext;
059:        import org.drftpd.id3.ID3Tag;
060:        import org.drftpd.id3.MP3File;
061:        import org.drftpd.master.QueuedOperation;
062:        import org.drftpd.remotefile.CaseInsensitiveHashtable;
063:        import org.drftpd.remotefile.LightRemoteFile;
064:        import org.drftpd.slave.async.AsyncCommand;
065:        import org.drftpd.slave.async.AsyncCommandArgument;
066:        import org.drftpd.slave.async.AsyncResponse;
067:        import org.drftpd.slave.async.AsyncResponseChecksum;
068:        import org.drftpd.slave.async.AsyncResponseDIZFile;
069:        import org.drftpd.slave.async.AsyncResponseDiskStatus;
070:        import org.drftpd.slave.async.AsyncResponseException;
071:        import org.drftpd.slave.async.AsyncResponseID3Tag;
072:        import org.drftpd.slave.async.AsyncResponseMaxPath;
073:        import org.drftpd.slave.async.AsyncResponseRemerge;
074:        import org.drftpd.slave.async.AsyncResponseSFVFile;
075:        import org.drftpd.slave.async.AsyncResponseTransfer;
076:        import org.drftpd.slave.async.AsyncResponseTransferStatus;
077:
078:        import se.mog.io.File;
079:        import se.mog.io.PermissionDeniedException;
080:
081:        import com.Ostermiller.util.StringTokenizer;
082:
083:        /**
084:         * @author mog
085:         * @author zubov
086:         * @version $Id: Slave.java 1703 2007-04-08 18:19:23Z tdsoul $
087:         */
088:        public class Slave {
089:            public static final boolean isWin32 = System.getProperty("os.name")
090:                    .startsWith("Windows");
091:            private static final Logger logger = Logger.getLogger(Slave.class);
092:            private static final int socketTimeout = 10000; // 10 seconds, for Socket
093:            protected static final int actualTimeout = 60000; // one minute, evaluated on a SocketTimeout
094:            public static final String VERSION = "DrFTPD 2.0.5";
095:            private int _bufferSize;
096:            private SSLContext _ctx;
097:            private boolean _downloadChecksums;
098:            private RootCollection _roots;
099:            private Socket _s;
100:            private ObjectInputStream _sin;
101:            private ObjectOutputStream _sout;
102:            private HashMap _transfers;
103:            private boolean _uploadChecksums;
104:            private PortRange _portRange;
105:            private Set _renameQueue = null;
106:            private int _timeout;
107:            private boolean _sslMaster;
108:
109:            protected Slave() {
110:
111:            }
112:
113:            public Slave(Properties p) throws IOException {
114:                InetSocketAddress addr = new InetSocketAddress(PropertyHelper
115:                        .getProperty(p, "master.host"), Integer
116:                        .parseInt(PropertyHelper.getProperty(p,
117:                                "master.bindport")));
118:                _sslMaster = p.getProperty("slave.masterSSL", "false")
119:                        .equalsIgnoreCase("true");
120:
121:                // Whatever interface the slave uses to connect to the master, is the 
122:                // interface that the master will report to clients requesting PASV transfers 
123:                // from this slave, unless pasv_addr is set on the master for this slave
124:                logger.info("Connecting to master at " + addr);
125:
126:                String slavename = PropertyHelper.getProperty(p, "slave.name");
127:
128:                if (isWin32) {
129:                    _renameQueue = new HashSet();
130:                }
131:
132:                try {
133:                    _ctx = SSLGetContext.getSSLContext();
134:                } catch (Exception e) {
135:                    logger.warn("Error loading SSLContext", e);
136:                }
137:
138:                if (_sslMaster) {
139:                    _s = _ctx.getSocketFactory().createSocket();
140:                } else {
141:                    _s = new Socket();
142:                }
143:
144:                try {
145:                    _timeout = Integer.parseInt(PropertyHelper.getProperty(p,
146:                            "slave.timeout"));
147:                } catch (NullPointerException e) {
148:                    _timeout = actualTimeout;
149:                }
150:                _s.setSoTimeout(socketTimeout);
151:                _s.connect(addr);
152:                if (_s instanceof  SSLSocket) {
153:                    ((SSLSocket) _s).setUseClientMode(true);
154:                    ((SSLSocket) _s).startHandshake();
155:                }
156:                _sout = new ObjectOutputStream(_s.getOutputStream());
157:                _sin = new ObjectInputStream(_s.getInputStream());
158:
159:                //TODO sendReply()
160:                _sout.writeObject(slavename);
161:                _sout.flush();
162:                _sout.reset();
163:
164:                _uploadChecksums = p.getProperty("enableuploadchecksums",
165:                        "true").equals("true");
166:                _downloadChecksums = p.getProperty("enabledownloadchecksums",
167:                        "true").equals("true");
168:                _bufferSize = Integer
169:                        .parseInt(p.getProperty("bufferSize", "0"));
170:                _roots = getDefaultRootBasket(p);
171:                _transfers = new HashMap();
172:
173:                try {
174:                    int minport = Integer.parseInt(p
175:                            .getProperty("slave.portfrom"));
176:                    int maxport = Integer.parseInt(p
177:                            .getProperty("slave.portto"));
178:                    _portRange = new PortRange(minport, maxport,
179:                            getBufferSize());
180:                } catch (NumberFormatException e) {
181:                    _portRange = new PortRange(getBufferSize());
182:                }
183:            }
184:
185:            public static RootCollection getDefaultRootBasket(Properties cfg)
186:                    throws IOException {
187:                RootCollection roots;
188:
189:                // START: RootBasket
190:                //long defaultMinSpaceFree = Bytes.parseBytes(cfg.getProperty(
191:                //             "slave.minspacefree", "50mb"));
192:                ArrayList rootStrings = new ArrayList();
193:
194:                for (int i = 1; true; i++) {
195:                    String rootString = cfg.getProperty("slave.root." + i);
196:
197:                    if (rootString == null) {
198:                        break;
199:                    }
200:
201:                    logger.info("slave.root." + i + ": " + rootString);
202:
203:                    /*
204:                     * long minSpaceFree;
205:                     *
206:                     * try { minSpaceFree = Long.parseLong(cfg.getProperty("slave.root." +
207:                     * i + ".minspacefree")); } catch (NumberFormatException ex) {
208:                     * minSpaceFree = defaultMinSpaceFree; }
209:                     *
210:                     * int priority;
211:                     *
212:                     * try { priority = Integer.parseInt(cfg.getProperty("slave.root." +
213:                     * i + ".priority")); } catch (NumberFormatException ex) { priority =
214:                     * 0; }
215:                     */
216:                    rootStrings.add(new Root(rootString));
217:                }
218:
219:                roots = new RootCollection(rootStrings);
220:
221:                // END: RootBasket
222:                System.gc();
223:
224:                return roots;
225:            }
226:
227:            public static void main(String[] args) throws Exception {
228:                BasicConfigurator.configure();
229:                System.out
230:                        .println("DrFTPD Slave starting, further logging will be done through log4j");
231:
232:                Properties p = new Properties();
233:                p.load(new FileInputStream("slave.conf"));
234:
235:                Slave s = new Slave(p);
236:                if (isWin32) {
237:                    s.startFileLockThread();
238:                }
239:                try {
240:                    s.sendResponse(new AsyncResponseDiskStatus(s
241:                            .getDiskStatus()));
242:                } catch (Throwable t) {
243:                    logger
244:                            .fatal("Error, check config on master for this slave");
245:                }
246:                s.listenForCommands();
247:            }
248:
249:            public class FileLockRunnable implements  Runnable {
250:
251:                public void run() {
252:                    while (true) {
253:                        synchronized (_transfers) {
254:                            try {
255:                                _transfers.wait(300000);
256:                            } catch (InterruptedException e) {
257:                            }
258:                            synchronized (_renameQueue) {
259:                                for (Iterator iter = _renameQueue.iterator(); iter
260:                                        .hasNext();) {
261:                                    QueuedOperation qo = (QueuedOperation) iter
262:                                            .next();
263:                                    if (qo.getDestination() == null) { // delete
264:                                        try {
265:                                            delete(qo.getSource());
266:                                            // delete successfull
267:                                            iter.remove();
268:                                        } catch (PermissionDeniedException e) {
269:                                            // keep it in the queue
270:                                        } catch (FileNotFoundException e) {
271:                                            iter.remove();
272:                                        } catch (IOException e) {
273:                                            throw new RuntimeException(
274:                                                    "Win32 stinks", e);
275:                                        }
276:                                    } else { // rename
277:                                        String fileName = qo
278:                                                .getDestination()
279:                                                .substring(
280:                                                        qo.getDestination()
281:                                                                .lastIndexOf(
282:                                                                        "/") + 1);
283:                                        String destDir = qo.getDestination()
284:                                                .substring(
285:                                                        0,
286:                                                        qo.getDestination()
287:                                                                .lastIndexOf(
288:                                                                        "/"));
289:                                        try {
290:                                            rename(qo.getSource(), destDir,
291:                                                    fileName);
292:                                            // rename successfull
293:                                            iter.remove();
294:                                        } catch (PermissionDeniedException e) {
295:                                            // keep it in the queue
296:                                        } catch (FileNotFoundException e) {
297:                                            iter.remove();
298:                                        } catch (IOException e) {
299:                                            throw new RuntimeException(
300:                                                    "Win32 stinks", e);
301:                                        }
302:                                    }
303:                                }
304:                            }
305:                        }
306:                    }
307:                }
308:            }
309:
310:            private void startFileLockThread() {
311:                Thread t = new Thread(new FileLockRunnable());
312:                t.setName("FileLockThread");
313:                t.start();
314:            }
315:
316:            public void addTransfer(Transfer transfer) {
317:                synchronized (_transfers) {
318:                    _transfers.put(transfer.getTransferIndex(), transfer);
319:                }
320:            }
321:
322:            public long checkSum(String path) throws IOException {
323:                logger.debug("Checksumming: " + path);
324:
325:                CheckedInputStream in = null;
326:
327:                try {
328:                    CRC32 crc32 = new CRC32();
329:                    in = new CheckedInputStream(new FileInputStream(_roots
330:                            .getFile(path)), crc32);
331:
332:                    byte[] buf = new byte[4096];
333:
334:                    while (in.read(buf) != -1) {
335:                    }
336:
337:                    return crc32.getValue();
338:                } finally {
339:                    if (in != null) {
340:                        in.close();
341:                    }
342:                }
343:            }
344:
345:            public void delete(String path) throws IOException {
346:                // now deletes files as well as directories, recursive!
347:                Collection files = null;
348:                try {
349:                    files = _roots.getMultipleRootsForFile(path);
350:                } catch (FileNotFoundException e) {
351:                    // all is good, it's already gone
352:                    return;
353:                }
354:
355:                for (Iterator iter = files.iterator(); iter.hasNext();) {
356:                    Root root = (Root) iter.next();
357:                    File file = root.getFile(path);
358:
359:                    if (!file.exists()) {
360:                        iter.remove();
361:                        continue;
362:                        // should never occur
363:                    }
364:
365:                    if (file.isDirectory()) {
366:                        if (!file.deleteRecursive()) {
367:                            throw new PermissionDeniedException(
368:                                    "delete failed on " + path);
369:                        }
370:                        logger.info("DELETEDIR: " + path);
371:                    } else if (file.isFile()) {
372:                        File dir = new File(file.getParentFile());
373:                        logger.info("DELETE: " + path);
374:                        file.delete();
375:
376:                        String[] dirList = dir.list();
377:
378:                        while ((dirList != null) && (dirList.length == 0)) {
379:                            if (dir.getPath().length() <= root.getPath()
380:                                    .length()) {
381:                                break;
382:                            }
383:
384:                            java.io.File tmpFile = dir.getParentFile();
385:
386:                            dir.delete();
387:                            logger.info("rmdir: " + dir.getPath());
388:
389:                            if (tmpFile == null) {
390:                                break;
391:                            }
392:                            dir = new File(tmpFile);
393:
394:                            dirList = dir.list();
395:                        }
396:                    }
397:                }
398:            }
399:
400:            public int getBufferSize() {
401:                return _bufferSize;
402:            }
403:
404:            public boolean getDownloadChecksums() {
405:                return _downloadChecksums;
406:            }
407:
408:            public ID3Tag getID3v1Tag(String path) throws IOException {
409:                String absPath = _roots.getFile(path).getAbsolutePath();
410:                logger.warn("Extracting ID3Tag info from: " + absPath);
411:
412:                MP3File mp3 = null;
413:                try {
414:                    mp3 = new MP3File(absPath, "r");
415:
416:                    if (!mp3.hasID3v1Tag) {
417:                        mp3.close();
418:                        throw new IOException("No id3tag found for " + absPath);
419:                    }
420:
421:                    ID3Tag id3tag = mp3.readID3v1Tag();
422:                    mp3.close();
423:
424:                    return id3tag;
425:                } catch (FileNotFoundException e) {
426:                    logger.warn("FileNotFoundException: ", e);
427:                } catch (IOException e) {
428:                    logger.warn("IOException: ", e);
429:                } finally {
430:                    if (mp3 != null)
431:                        mp3.close();
432:                }
433:
434:                return null;
435:            }
436:
437:            public RootCollection getRoots() {
438:                return _roots;
439:            }
440:
441:            private LightSFVFile getSFVFile(String path) throws IOException {
442:                return new LightSFVFile(new BufferedReader(new FileReader(
443:                        _roots.getFile(path))));
444:            }
445:
446:            private String getDIZFile(String path) throws IOException {
447:                ZipEntry zipEntry = null;
448:                ZipInputStream zipInput = null;
449:                byte[] buf = new byte[20 * 1024];
450:                int numRd;
451:                try {
452:
453:                    zipInput = new ZipInputStream(new BufferedInputStream(
454:                            new FileInputStream(_roots.getFile(path))));
455:
456:                    // Access a list of all of the files in the zip archive
457:                    while ((zipEntry = zipInput.getNextEntry()) != null) {
458:                        // Is this entry a DIZ file?
459:                        if (zipEntry.getName().toLowerCase().endsWith(".diz")) {
460:                            // Read 20 KBytes from the DIZ file, hopefully this
461:                            // will be the entire file.
462:                            numRd = zipInput.read(buf, 0, 20 * 1024);
463:
464:                            if (numRd > 0) {
465:                                return new String(buf, 0, numRd);
466:                            } else {
467:                                throw new FileNotFoundException(
468:                                        "0 bytes read from .zip file - " + path);
469:                            }
470:
471:                        }
472:                    }
473:                } catch (Throwable t) {
474:                    logger.error("Error extracting .diz from zipfile", t);
475:                } finally {
476:                    try {
477:                        if (zipInput != null) {
478:                            zipInput.close();
479:                        }
480:                    } catch (IOException e) {
481:                    }
482:                }
483:                throw new FileNotFoundException("No diz entry in - " + path);
484:            }
485:
486:            // public LinkedRemoteFile getSlaveRoot() throws IOException {
487:            // return Slave.getDefaultRoot(_roots);
488:            // }
489:
490:            public DiskStatus getDiskStatus() {
491:                return new DiskStatus(_roots.getTotalDiskSpaceAvailable(),
492:                        _roots.getTotalDiskSpaceCapacity());
493:            }
494:
495:            public Transfer getTransfer(TransferIndex index) {
496:                synchronized (_transfers) {
497:                    return (Transfer) _transfers.get(index);
498:                }
499:            }
500:
501:            public boolean getUploadChecksums() {
502:                return _uploadChecksums;
503:            }
504:
505:            private AsyncResponse handleChecksum(AsyncCommandArgument ac) {
506:                try {
507:                    return new AsyncResponseChecksum(ac.getIndex(), checkSum(ac
508:                            .getArgs()));
509:                } catch (IOException e) {
510:                    return new AsyncResponseException(ac.getIndex(), e);
511:                }
512:            }
513:
514:            private AsyncResponse handleCommand(AsyncCommand ac) {
515:                if (ac.getName().equals("remerge")) {
516:                    return handleRemerge((AsyncCommandArgument) ac);
517:                }
518:
519:                if (ac.getName().equals("checksum")) {
520:                    return handleChecksum((AsyncCommandArgument) ac);
521:                }
522:
523:                if (ac.getName().equals("connect")) {
524:                    return handleConnect((AsyncCommandArgument) ac);
525:                }
526:
527:                if (ac.getName().equals("delete")) {
528:                    return handleDelete((AsyncCommandArgument) ac);
529:                }
530:
531:                if (ac.getName().equals("id3tag")) {
532:                    return handleID3Tag((AsyncCommandArgument) ac);
533:                }
534:
535:                if (ac.getName().equals("listen")) {
536:                    return handleListen((AsyncCommandArgument) ac);
537:                }
538:
539:                if (ac.getName().equals("maxpath")) {
540:                    return handleMaxpath(ac);
541:                }
542:
543:                if (ac.getName().equals("ping")) {
544:                    return handlePing(ac);
545:                }
546:
547:                if (ac.getName().equals("receive")) {
548:                    return handleReceive((AsyncCommandArgument) ac);
549:                }
550:
551:                if (ac.getName().equals("rename")) {
552:                    return handleRename((AsyncCommandArgument) ac);
553:                }
554:
555:                if (ac.getName().equals("sfvfile")) {
556:                    return handleSfvFile((AsyncCommandArgument) ac);
557:                }
558:
559:                if (ac.getName().equals("dizfile")) {
560:                    return handleDIZFile((AsyncCommandArgument) ac);
561:                }
562:
563:                if (ac.getName().equals("send")) {
564:                    return handleSend((AsyncCommandArgument) ac);
565:                }
566:
567:                if (ac.getName().equals("abort")) {
568:                    handleAbort((AsyncCommandArgument) ac);
569:
570:                    return null;
571:                }
572:
573:                if (ac.getIndex().equals("shutdown")) {
574:                    logger.info("The master has requested that I shutdown");
575:                    System.exit(0);
576:                }
577:
578:                if (ac.getIndex().equals("error")) {
579:                    System.err.println("error - " + ac);
580:                    System.exit(0);
581:                }
582:
583:                return new AsyncResponseException(ac.getIndex(), new Exception(
584:                        ac.getName() + " - Operation Not Supported"));
585:            }
586:
587:            private void handleAbort(AsyncCommandArgument aca) {
588:                String[] args = aca.getArgs().split(",");
589:                TransferIndex ti = new TransferIndex(Integer.parseInt(args[0]));
590:
591:                if (!_transfers.containsKey(ti)) {
592:                    return;
593:                }
594:
595:                Transfer t = (Transfer) _transfers.get(ti);
596:                t.abort(args[1]);
597:            }
598:
599:            private AsyncResponse handleConnect(AsyncCommandArgument ac) {
600:                String[] data = ac.getArgs().split(",");
601:                String[] data2 = data[0].split(":");
602:                boolean encrypted = data[1].equals("true");
603:                boolean useSSLClientHandshake = data[2].equals("true");
604:                InetAddress address;
605:
606:                try {
607:                    address = InetAddress.getByName(data2[0]);
608:                } catch (UnknownHostException e1) {
609:                    return new AsyncResponseException(ac.getIndex(), e1);
610:                }
611:
612:                int port = Integer.parseInt(data2[1]);
613:                Transfer t = new Transfer(new ActiveConnection(encrypted ? _ctx
614:                        : null, new InetSocketAddress(address, port),
615:                        useSSLClientHandshake), this , new TransferIndex());
616:                addTransfer(t);
617:
618:                return new AsyncResponseTransfer(ac.getIndex(),
619:                        new ConnectInfo(port, t.getTransferIndex(), t
620:                                .getTransferStatus()));
621:            }
622:
623:            private AsyncResponse handleDelete(AsyncCommandArgument ac) {
624:                try {
625:                    try {
626:                        delete(mapPathToRenameQueue(ac.getArgs()));
627:                    } catch (PermissionDeniedException e) {
628:                        if (isWin32) {
629:                            synchronized (_renameQueue) {
630:                                _renameQueue.add(new QueuedOperation(ac
631:                                        .getArgs(), null));
632:                            }
633:                        } else {
634:                            throw e;
635:                        }
636:                    }
637:                    sendResponse(new AsyncResponseDiskStatus(getDiskStatus()));
638:                    return new AsyncResponse(ac.getIndex());
639:                } catch (IOException e) {
640:                    return new AsyncResponseException(ac.getIndex(), e);
641:                }
642:            }
643:
644:            private AsyncResponse handleID3Tag(AsyncCommandArgument ac) {
645:                try {
646:                    return new AsyncResponseID3Tag(ac.getIndex(),
647:                            getID3v1Tag(mapPathToRenameQueue(ac.getArgs())));
648:                } catch (IOException e) {
649:                    return new AsyncResponseException(ac.getIndex(), e);
650:                }
651:            }
652:
653:            private AsyncResponse handleListen(AsyncCommandArgument ac) {
654:                String[] data = ac.getArgs().split(":");
655:                boolean encrypted = data[0].equals("true");
656:                boolean useSSLClientMode = data[1].equals("true");
657:                PassiveConnection c = null;
658:
659:                try {
660:                    c = new PassiveConnection(encrypted ? _ctx : null,
661:                            _portRange, useSSLClientMode);
662:                } catch (IOException e) {
663:                    return new AsyncResponseException(ac.getIndex(), e);
664:                }
665:
666:                Transfer t = new Transfer(c, this , new TransferIndex());
667:                addTransfer(t);
668:
669:                return new AsyncResponseTransfer(ac.getIndex(),
670:                        new ConnectInfo(c.getLocalPort(), t.getTransferIndex(),
671:                                t.getTransferStatus()));
672:            }
673:
674:            private AsyncResponse handleMaxpath(AsyncCommand ac) {
675:                return new AsyncResponseMaxPath(ac.getIndex(), isWin32 ? 255
676:                        : Integer.MAX_VALUE);
677:            }
678:
679:            private AsyncResponse handlePing(AsyncCommand ac) {
680:                return new AsyncResponse(ac.getIndex());
681:            }
682:
683:            private AsyncResponse handleReceive(AsyncCommandArgument ac) {
684:                StringTokenizer st = new StringTokenizer(ac.getArgs(), ",");
685:                char type = st.nextToken().charAt(0);
686:                long position = Long.parseLong(st.nextToken());
687:                TransferIndex transferIndex = new TransferIndex(Integer
688:                        .parseInt(st.nextToken()));
689:                String path = mapPathToRenameQueue(st.nextToken());
690:                String fileName = path.substring(path.lastIndexOf("/") + 1);
691:                String dirName = path.substring(0, path.lastIndexOf("/"));
692:                Transfer t = getTransfer(transferIndex);
693:                sendResponse(new AsyncResponse(ac.getIndex())); // return calling thread
694:                // on master
695:                try {
696:                    return new AsyncResponseTransferStatus(t.receiveFile(
697:                            dirName, type, fileName, position));
698:                } catch (IOException e) {
699:                    return (new AsyncResponseTransferStatus(new TransferStatus(
700:                            transferIndex, e)));
701:                }
702:            }
703:
704:            private AsyncResponse handleRemerge(AsyncCommandArgument ac) {
705:                try {
706:                    handleRemergeRecursive(new FileRemoteFile(_roots));
707:
708:                    return new AsyncResponse(ac.getIndex());
709:                } catch (Throwable e) {
710:                    logger.error("Exception during merging", e);
711:
712:                    return new AsyncResponseException(ac.getIndex(), e);
713:                }
714:            }
715:
716:            private void handleRemergeRecursive(FileRemoteFile dir) {
717:                //sendResponse(new AsyncResponseRemerge(file.getPath(),
718:                // file.getFiles()));
719:                CaseInsensitiveHashtable mergeFiles = new CaseInsensitiveHashtable();
720:
721:                Collection files = dir.getFiles();
722:
723:                for (Iterator iter = files.iterator(); iter.hasNext();) {
724:                    FileRemoteFile file = (FileRemoteFile) iter.next();
725:
726:                    // need to send directories and files
727:                    mergeFiles.put(file.getName(), new LightRemoteFile(file));
728:
729:                    //keep only dirs for recursiveness
730:                    if (!file.isDirectory()) {
731:                        iter.remove();
732:                    }
733:                }
734:
735:                sendResponse(new AsyncResponseRemerge(dir.getPath(), mergeFiles));
736:
737:                for (Iterator iter = files.iterator(); iter.hasNext();) {
738:                    FileRemoteFile file = (FileRemoteFile) iter.next();
739:                    handleRemergeRecursive(file);
740:                }
741:            }
742:
743:            private AsyncResponse handleRename(AsyncCommandArgument ac) {
744:                StringTokenizer st = new StringTokenizer(ac.getArgs(), ",");
745:                String from = mapPathToRenameQueue(st.nextToken());
746:                String toDir = st.nextToken();
747:                String toFile = st.nextToken();
748:
749:                try {
750:                    try {
751:                        rename(from, toDir, toFile);
752:                    } catch (PermissionDeniedException e) {
753:                        if (isWin32) {
754:                            String simplePath = null;
755:                            if (toDir.endsWith("/")) {
756:                                simplePath = toDir + toFile;
757:                            } else {
758:                                simplePath = toDir + "/" + toFile;
759:                            }
760:                            synchronized (_renameQueue) {
761:                                _renameQueue.add(new QueuedOperation(from,
762:                                        simplePath));
763:                            }
764:                        } else {
765:                            throw e;
766:                        }
767:                    }
768:
769:                    return new AsyncResponse(ac.getIndex());
770:                } catch (IOException e) {
771:                    return new AsyncResponseException(ac.getIndex(), e);
772:                }
773:            }
774:
775:            private AsyncResponse handleSend(AsyncCommandArgument ac) {
776:                StringTokenizer st = new StringTokenizer(ac.getArgs(), ",");
777:                char type = st.nextToken().charAt(0);
778:                long position = Long.parseLong(st.nextToken());
779:                TransferIndex transferIndex = new TransferIndex(Integer
780:                        .parseInt(st.nextToken()));
781:                String path = mapPathToRenameQueue(st.nextToken());
782:                Transfer t = getTransfer(transferIndex);
783:                sendResponse(new AsyncResponse(ac.getIndex())); // return
784:
785:                // calling thread on master
786:                try {
787:                    return new AsyncResponseTransferStatus(t.sendFile(path,
788:                            type, position));
789:                } catch (IOException e) {
790:                    return new AsyncResponseTransferStatus(new TransferStatus(t
791:                            .getTransferIndex(), e));
792:                }
793:            }
794:
795:            private AsyncResponse handleSfvFile(AsyncCommandArgument ac) {
796:                try {
797:                    return new AsyncResponseSFVFile(ac.getIndex(),
798:                            getSFVFile(mapPathToRenameQueue(ac.getArgs())));
799:                } catch (IOException e) {
800:                    return new AsyncResponseException(ac.getIndex(), e);
801:                }
802:            }
803:
804:            private AsyncResponse handleDIZFile(AsyncCommandArgument ac) {
805:                try {
806:                    return new AsyncResponseDIZFile(ac.getIndex(),
807:                            getDIZFile(ac.getArgs()));
808:                } catch (IOException e) {
809:                    return new AsyncResponseException(ac.getIndex(), e);
810:                }
811:            }
812:
813:            private void listenForCommands() throws IOException {
814:                long lastCommandReceived = System.currentTimeMillis();
815:                while (true) {
816:                    AsyncCommand ac = null;
817:
818:                    try {
819:                        ac = (AsyncCommand) _sin.readObject();
820:
821:                        if (ac == null) {
822:                            continue;
823:                        }
824:                        lastCommandReceived = System.currentTimeMillis();
825:                    } catch (ClassNotFoundException e) {
826:                        throw new RuntimeException(e);
827:                    } catch (EOFException e) {
828:                        logger
829:                                .debug("Lost connection to the master, may have been kicked offline");
830:                        return;
831:                    } catch (SocketTimeoutException e) {
832:                        // if no communication for slave.timeout (_timeout) time, than
833:                        // connection to the master is dead or there is a configuration
834:                        // error
835:                        if (_timeout < (System.currentTimeMillis() - lastCommandReceived)) {
836:                            logger
837:                                    .error("Slave is going offline as it hasn't received any communication from the master in "
838:                                            + (System.currentTimeMillis() - lastCommandReceived)
839:                                            + " milliseconds");
840:                            throw new RuntimeException(e);
841:                        }
842:                        continue;
843:                    }
844:
845:                    logger.debug("Slave fetched " + ac);
846:                    class AsyncCommandHandler implements  Runnable {
847:                        private AsyncCommand _command = null;
848:
849:                        public AsyncCommandHandler(AsyncCommand command) {
850:                            _command = command;
851:                        }
852:
853:                        public void run() {
854:                            try {
855:                                sendResponse(handleCommand(_command));
856:                            } catch (Throwable e) {
857:                                sendResponse(new AsyncResponseException(
858:                                        _command.getIndex(), e));
859:                            }
860:                        }
861:                    }
862:                    Thread t = new Thread(new AsyncCommandHandler(ac));
863:                    t.setName("AsyncCommandHandler - " + ac.getClass());
864:                    t.start();
865:                }
866:            }
867:
868:            public String mapPathToRenameQueue(String path) {
869:                if (!isWin32) { // there is no renameQueue
870:                    return path;
871:                }
872:                synchronized (_renameQueue) {
873:                    for (Iterator iter = _renameQueue.iterator(); iter
874:                            .hasNext();) {
875:                        QueuedOperation qo = (QueuedOperation) iter.next();
876:                        if (qo.getDestination() == null) {
877:                            continue;
878:                        }
879:                        if (qo.getDestination().equals(path)) {
880:                            return qo.getSource();
881:                        }
882:                    }
883:                    return path;
884:                }
885:            }
886:
887:            public void removeTransfer(Transfer transfer) {
888:                synchronized (_transfers) {
889:                    if (_transfers.remove(transfer.getTransferIndex()) == null) {
890:                        throw new IllegalStateException();
891:                    }
892:                    _transfers.notifyAll();
893:                }
894:            }
895:
896:            public void rename(String from, String toDirPath, String toName)
897:                    throws IOException {
898:                for (Iterator iter = _roots.iterator(); iter.hasNext();) {
899:                    Root root = (Root) iter.next();
900:
901:                    File fromfile = root.getFile(from);
902:
903:                    if (!fromfile.exists()) {
904:                        continue;
905:                    }
906:
907:                    File toDir = root.getFile(toDirPath);
908:                    toDir.mkdirs();
909:
910:                    File tofile = new File(toDir.getPath() + File.separator
911:                            + toName);
912:
913:                    //!win32 == true on linux
914:                    //!win32 && equalsignore == true on win32
915:                    if (tofile.exists()
916:                            && !(isWin32 && fromfile.getName()
917:                                    .equalsIgnoreCase(toName))) {
918:                        throw new FileExistsException("cannot rename from "
919:                                + fromfile + " to " + tofile
920:                                + ", destination exists");
921:                    }
922:
923:                    if (!fromfile.renameTo(tofile)) {
924:                        throw new PermissionDeniedException("renameTo("
925:                                + fromfile + ", " + tofile + ") failed");
926:                    }
927:                }
928:            }
929:
930:            protected synchronized void sendResponse(AsyncResponse response) {
931:                if (response == null) {
932:                    // handler doesn't return anything or it sends reply on it's own
933:                    // (threaded for example)
934:                    return;
935:                }
936:
937:                try {
938:                    _sout.writeObject(response);
939:                    _sout.flush();
940:                    _sout.reset();
941:                    if (!(response instanceof  AsyncResponseTransferStatus)) {
942:                        logger.debug("Slave wrote response - " + response);
943:                    }
944:
945:                    if (response instanceof  AsyncResponseException) {
946:                        logger.debug("", ((AsyncResponseException) response)
947:                                .getThrowable());
948:                    }
949:                } catch (IOException e) {
950:                    throw new RuntimeException(e);
951:                }
952:            }
953:
954:            /**
955:             * @return The current list of Transfer objects
956:             */
957:            public ArrayList getTransfers() {
958:                synchronized (_transfers) {
959:                    return new ArrayList(_transfers.values());
960:                }
961:            }
962:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.