Source Code Cross Referenced for RemoteSlave.java in  » Net » DrFTPD » org » drftpd » master » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » DrFTPD » org.drftpd.master 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * This file is part of DrFTPD, Distributed FTP Daemon.
0003:         *
0004:         * DrFTPD is free software; you can redistribute it and/or modify
0005:         * it under the terms of the GNU General Public License as published by
0006:         * the Free Software Foundation; either version 2 of the License, or
0007:         * (at your option) any later version.
0008:         *
0009:         * DrFTPD is distributed in the hope that it will be useful,
0010:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0011:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0012:         * GNU General Public License for more details.
0013:         *
0014:         * You should have received a copy of the GNU General Public License
0015:         * along with DrFTPD; if not, write to the Free Software
0016:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
0017:         */
0018:        package org.drftpd.master;
0019:
0020:        import java.beans.DefaultPersistenceDelegate;
0021:        import java.beans.ExceptionListener;
0022:        import java.beans.IntrospectionException;
0023:        import java.beans.Introspector;
0024:        import java.beans.PropertyDescriptor;
0025:        import java.beans.XMLEncoder;
0026:        import java.io.FileNotFoundException;
0027:        import java.io.IOException;
0028:        import java.io.ObjectInputStream;
0029:        import java.io.ObjectOutputStream;
0030:        import java.io.Serializable;
0031:        import java.net.Socket;
0032:        import java.net.SocketException;
0033:        import java.net.SocketTimeoutException;
0034:        import java.util.ArrayList;
0035:        import java.util.Collection;
0036:        import java.util.Collections;
0037:        import java.util.EmptyStackException;
0038:        import java.util.HashMap;
0039:        import java.util.Hashtable;
0040:        import java.util.Iterator;
0041:        import java.util.LinkedList;
0042:        import java.util.Properties;
0043:        import java.util.Stack;
0044:        import java.util.StringTokenizer;
0045:
0046:        import net.sf.drftpd.DuplicateElementException;
0047:        import net.sf.drftpd.FatalException;
0048:        import net.sf.drftpd.SlaveUnavailableException;
0049:        import net.sf.drftpd.event.SlaveEvent;
0050:
0051:        import org.apache.log4j.Logger;
0052:        import org.apache.oro.text.regex.MalformedPatternException;
0053:        import org.drftpd.GlobalContext;
0054:        import org.drftpd.LightSFVFile;
0055:        import org.drftpd.dynamicdata.Key;
0056:        import org.drftpd.dynamicdata.KeyNotFoundException;
0057:        import org.drftpd.id3.ID3Tag;
0058:        import org.drftpd.io.SafeFileOutputStream;
0059:        import org.drftpd.remotefile.LinkedRemoteFileInterface;
0060:        import org.drftpd.slave.ConnectInfo;
0061:        import org.drftpd.slave.DiskStatus;
0062:        import org.drftpd.slave.RemoteIOException;
0063:        import org.drftpd.slave.SlaveStatus;
0064:        import org.drftpd.slave.Transfer;
0065:        import org.drftpd.slave.TransferIndex;
0066:        import org.drftpd.slave.TransferStatus;
0067:        import org.drftpd.slave.async.AsyncCommand;
0068:        import org.drftpd.slave.async.AsyncCommandArgument;
0069:        import org.drftpd.slave.async.AsyncResponse;
0070:        import org.drftpd.slave.async.AsyncResponseChecksum;
0071:        import org.drftpd.slave.async.AsyncResponseDIZFile;
0072:        import org.drftpd.slave.async.AsyncResponseDiskStatus;
0073:        import org.drftpd.slave.async.AsyncResponseException;
0074:        import org.drftpd.slave.async.AsyncResponseID3Tag;
0075:        import org.drftpd.slave.async.AsyncResponseMaxPath;
0076:        import org.drftpd.slave.async.AsyncResponseRemerge;
0077:        import org.drftpd.slave.async.AsyncResponseSFVFile;
0078:        import org.drftpd.slave.async.AsyncResponseTransfer;
0079:        import org.drftpd.slave.async.AsyncResponseTransferStatus;
0080:        import org.drftpd.usermanager.Entity;
0081:        import org.drftpd.usermanager.HostMask;
0082:        import org.drftpd.usermanager.HostMaskCollection;
0083:
0084:        /**
0085:         * @author mog
0086:         * @author zubov
0087:         * @version $Id: RemoteSlave.java 1538 2006-12-14 20:55:37Z zubov $
0088:         */
0089:        public class RemoteSlave implements  Runnable, Comparable<RemoteSlave>,
0090:                Serializable, Entity {
0091:            private static final long serialVersionUID = -6973935289361817125L;
0092:
0093:            private final String[] transientFields = { "available",
0094:                    "lastDownloadSending", "lastUploadReceiving" };
0095:
0096:            private static final Logger logger = Logger
0097:                    .getLogger(RemoteSlave.class);
0098:
0099:            private transient boolean _isAvailable;
0100:
0101:            protected transient int _errors;
0102:
0103:            private transient GlobalContext _gctx;
0104:
0105:            private transient long _lastDownloadSending = 0;
0106:
0107:            protected transient long _lastNetworkError;
0108:
0109:            private transient long _lastUploadReceiving = 0;
0110:
0111:            private transient long _lastResponseReceived = System
0112:                    .currentTimeMillis();
0113:
0114:            private transient long _lastCommandSent = System
0115:                    .currentTimeMillis();
0116:
0117:            private transient int _maxPath;
0118:
0119:            private transient String _name;
0120:
0121:            private transient DiskStatus _status;
0122:
0123:            private HostMaskCollection _ipMasks;
0124:
0125:            private Properties _keysAndValues;
0126:
0127:            private LinkedList<QueuedOperation> _renameQueue;
0128:
0129:            private transient Stack<String> _indexPool;
0130:
0131:            private transient HashMap<String, AsyncResponse> _indexWithCommands;
0132:
0133:            private transient ObjectInputStream _sin;
0134:
0135:            private transient Socket _socket;
0136:
0137:            private transient ObjectOutputStream _sout;
0138:
0139:            private transient HashMap<TransferIndex, RemoteTransfer> _transfers;
0140:
/**
 * Creates a slave entry called {@code name} with an empty property set,
 * an empty host-mask collection and an empty queue of pending
 * operations. No global context is attached and nothing is persisted.
 */
public RemoteSlave(String name) {
    this._renameQueue = new LinkedList<QueuedOperation>();
    this._ipMasks = new HostMaskCollection();
    this._keysAndValues = new Properties();
    this._name = name;
}
0147:
/**
 * Builds the slave through {@link #RemoteSlave(String)}, attaches the
 * given global context and then persists the new entry by calling
 * {@link #commit()}. According to the original note this constructor is
 * used by everything, including tests.
 */
public RemoteSlave(String name, GlobalContext gctx) {
    this(name);
    this._gctx = gctx;
    this.commit();
}
0156:
0157:            public final static Hashtable rslavesToHashtable(Collection rslaves) {
0158:                Hashtable<String, RemoteSlave> map = new Hashtable<String, RemoteSlave>(
0159:                        rslaves.size());
0160:
0161:                for (Iterator iter = rslaves.iterator(); iter.hasNext();) {
0162:                    RemoteSlave rslave = (RemoteSlave) iter.next();
0163:                    map.put(rslave.getName(), rslave);
0164:                }
0165:
0166:                return map;
0167:            }
0168:
0169:            public void addMask(String mask) throws DuplicateElementException {
0170:                _ipMasks.addMask(mask);
0171:                commit();
0172:            }
0173:
0174:            /**
0175:             * If X # of errors occur in Y amount of time, kick slave offline
0176:             */
0177:            public final void addNetworkError(SocketException e) {
0178:                // set slave offline if too many network errors
0179:                long errortimeout = Long.parseLong(getProperty("errortimeout",
0180:                        "60000")); // one
0181:                // minute
0182:
0183:                if (errortimeout <= 0) {
0184:                    errortimeout = 60000;
0185:                }
0186:
0187:                int maxerrors = Integer.parseInt(getProperty("maxerrors", "5"));
0188:
0189:                if (maxerrors < 0) {
0190:                    maxerrors = 5;
0191:                }
0192:
0193:                _errors -= ((System.currentTimeMillis() - _lastNetworkError) / errortimeout);
0194:
0195:                if (_errors < 0) {
0196:                    _errors = 0;
0197:                }
0198:
0199:                _errors++;
0200:                _lastNetworkError = System.currentTimeMillis();
0201:
0202:                if (_errors > maxerrors) {
0203:                    setOffline("Too many network errors - " + e.getMessage());
0204:                    logger.error("Too many network errors - " + e);
0205:                }
0206:            }
0207:
0208:            protected void addQueueDelete(String fileName) {
0209:                addQueueRename(fileName, null);
0210:            }
0211:
0212:            protected void addQueueRename(String fileName, String destName) {
0213:                if (isOnline()) {
0214:                    throw new IllegalStateException(
0215:                            "Slave is online, you cannot queue an operation");
0216:                }
0217:                _renameQueue.add(new QueuedOperation(fileName, destName));
0218:                commit();
0219:            }
0220:
0221:            public void setProperty(String name, String value) {
0222:                synchronized (_keysAndValues) {
0223:                    _keysAndValues.setProperty(name, value);
0224:                    commit();
0225:                }
0226:            }
0227:
0228:            public String getProperty(String name, String def) {
0229:                synchronized (_keysAndValues) {
0230:                    return _keysAndValues.getProperty(name, def);
0231:                }
0232:            }
0233:
0234:            public Properties getProperties() {
0235:                synchronized (_keysAndValues) {
0236:                    return (Properties) _keysAndValues.clone();
0237:                }
0238:            }
0239:
0240:            /** 
0241:             * Needed in order for this class to be a Bean
0242:             */
0243:            public void setProperties(Properties keysAndValues) {
0244:                _keysAndValues = keysAndValues;
0245:            }
0246:
0247:            public void commit() {
0248:                try {
0249:
0250:                    XMLEncoder out = new XMLEncoder(new SafeFileOutputStream(
0251:                            (getGlobalContext().getSlaveManager()
0252:                                    .getSlaveFile(this .getName()))));
0253:                    out.setExceptionListener(new ExceptionListener() {
0254:                        public void exceptionThrown(Exception e) {
0255:                            logger.warn("", e);
0256:                        }
0257:                    });
0258:                    out.setPersistenceDelegate(Key.class,
0259:                            new DefaultPersistenceDelegate(new String[] {
0260:                                    "owner", "key", "type" }));
0261:                    out.setPersistenceDelegate(HostMask.class,
0262:                            new DefaultPersistenceDelegate(
0263:                                    new String[] { "mask" }));
0264:                    out.setPersistenceDelegate(RemoteSlave.class,
0265:                            new DefaultPersistenceDelegate(
0266:                                    new String[] { "name" }));
0267:                    out.setPersistenceDelegate(QueuedOperation.class,
0268:                            new DefaultPersistenceDelegate(new String[] {
0269:                                    "source", "destination" }));
0270:                    try {
0271:                        PropertyDescriptor[] pdArr = Introspector.getBeanInfo(
0272:                                RemoteSlave.class).getPropertyDescriptors();
0273:                        ArrayList<String> transientList = new ArrayList<String>();
0274:                        for (int x = 0; x < transientFields.length; x++) {
0275:                            transientList.add(transientFields[x]);
0276:                        }
0277:                        for (int x = 0; x < pdArr.length; x++) {
0278:                            if (transientList.contains(pdArr[x].getName())) {
0279:                                pdArr[x].setValue("transient", Boolean.TRUE);
0280:                            }
0281:                        }
0282:                    } catch (IntrospectionException e1) {
0283:                        logger.error("I don't know what to do here", e1);
0284:                        throw new RuntimeException(e1);
0285:                    }
0286:                    try {
0287:                        out.writeObject(this );
0288:                    } finally {
0289:                        out.close();
0290:                    }
0291:
0292:                    Logger.getLogger(RemoteSlave.class).debug(
0293:                            "wrote " + getName());
0294:                } catch (IOException ex) {
0295:                    throw new RuntimeException("Error writing slavefile for "
0296:                            + this .getName() + ": " + ex.getMessage(), ex);
0297:                }
0298:            }
0299:
0300:            public final int compareTo(RemoteSlave o) {
0301:                return getName().compareTo(o.getName());
0302:            }
0303:
0304:            public final boolean equals(Object obj) {
0305:                try {
0306:                    return ((RemoteSlave) obj).getName().equals(getName());
0307:                } catch (NullPointerException e) {
0308:                    return false;
0309:                }
0310:            }
0311:
0312:            public GlobalContext getGlobalContext() {
0313:                return _gctx;
0314:            }
0315:
0316:            public final long getLastDownloadSending() {
0317:                return _lastDownloadSending;
0318:            }
0319:
0320:            public final long getLastTransfer() {
0321:                return Math.max(getLastDownloadSending(),
0322:                        getLastUploadReceiving());
0323:            }
0324:
0325:            public long getLastTransferForDirection(char dir) {
0326:                if (dir == Transfer.TRANSFER_RECEIVING_UPLOAD) {
0327:                    return getLastUploadReceiving();
0328:                } else if (dir == Transfer.TRANSFER_SENDING_DOWNLOAD) {
0329:                    return getLastDownloadSending();
0330:                } else if (dir == Transfer.TRANSFER_THROUGHPUT) {
0331:                    return getLastTransfer();
0332:                } else {
0333:                    throw new IllegalArgumentException();
0334:                }
0335:            }
0336:
0337:            public final long getLastUploadReceiving() {
0338:                return _lastUploadReceiving;
0339:            }
0340:
0341:            public HostMaskCollection getMasks() {
0342:                return _ipMasks;
0343:            }
0344:
0345:            public void setMasks(HostMaskCollection masks) {
0346:                _ipMasks = masks;
0347:            }
0348:
0349:            /**
0350:             * Returns the name.
0351:             */
0352:            public String getName() {
0353:                return _name;
0354:            }
0355:
0356:            /**
0357:             * Returns the RemoteSlave's saved SlaveStatus, can return a status before
0358:             * remerge() is completed
0359:             */
0360:            public synchronized SlaveStatus getSlaveStatus()
0361:                    throws SlaveUnavailableException {
0362:                if ((_status == null) || !isOnline()) {
0363:                    throw new SlaveUnavailableException();
0364:                }
0365:                int throughputUp = 0;
0366:                int throughputDown = 0;
0367:                int transfersUp = 0;
0368:                int transfersDown = 0;
0369:                long bytesReceived;
0370:                long bytesSent;
0371:
0372:                synchronized (_transfers) {
0373:                    bytesReceived = getReceivedBytes();
0374:                    bytesSent = getSentBytes();
0375:
0376:                    for (Iterator i = _transfers.values().iterator(); i
0377:                            .hasNext();) {
0378:                        RemoteTransfer transfer = (RemoteTransfer) i.next();
0379:                        switch (transfer.getState()) {
0380:                        case Transfer.TRANSFER_RECEIVING_UPLOAD:
0381:                            throughputUp += transfer.getXferSpeed();
0382:                            bytesReceived += transfer.getTransfered();
0383:                            transfersUp += 1;
0384:                            break;
0385:
0386:                        case Transfer.TRANSFER_SENDING_DOWNLOAD:
0387:                            throughputDown += transfer.getXferSpeed();
0388:                            transfersDown += 1;
0389:                            bytesSent += transfer.getTransfered();
0390:                            break;
0391:
0392:                        case Transfer.TRANSFER_UNKNOWN:
0393:                        case Transfer.TRANSFER_THROUGHPUT:
0394:                            break;
0395:
0396:                        default:
0397:                            throw new FatalException(
0398:                                    "unrecognized direction - "
0399:                                            + transfer.getState() + " for "
0400:                                            + transfer);
0401:                        }
0402:                    }
0403:                }
0404:
0405:                return new SlaveStatus(_status, bytesSent, bytesReceived,
0406:                        throughputUp, transfersUp, throughputDown,
0407:                        transfersDown);
0408:            }
0409:
0410:            public long getSentBytes() {
0411:                return Long.parseLong(getProperty("bytesSent", "0"));
0412:            }
0413:
0414:            public long getReceivedBytes() {
0415:                return Long.parseLong(getProperty("bytesReceived", "0"));
0416:            }
0417:
0418:            /**
0419:             * Returns the RemoteSlave's stored SlaveStatus, will not return a status
0420:             * before remerge() is completed
0421:             */
0422:            public synchronized SlaveStatus getSlaveStatusAvailable()
0423:                    throws SlaveUnavailableException {
0424:                if (isAvailable()) {
0425:                    return getSlaveStatus();
0426:                }
0427:
0428:                throw new SlaveUnavailableException("Slave is not online");
0429:            }
0430:
0431:            public final int hashCode() {
0432:                return getName().hashCode();
0433:            }
0434:
0435:            /**
0436:             * Called when the slave connects
0437:             */
0438:            private void initializeSlaveAfterThreadIsRunning()
0439:                    throws IOException, SlaveUnavailableException {
0440:                commit();
0441:                processQueue();
0442:
0443:                String maxPathIndex = issueMaxPathToSlave();
0444:                _maxPath = fetchMaxPathFromIndex(maxPathIndex);
0445:                logger.debug("maxpath was received");
0446:
0447:                String remergeIndex = issueRemergeToSlave("/");
0448:                fetchRemergeResponseFromIndex(remergeIndex);
0449:                getGlobalContext().getSlaveManager().putRemergeQueue(
0450:                        new RemergeMessage(this ));
0451:                setAvailable(true);
0452:                logger.info("Slave added: '" + getName() + "' status: "
0453:                        + _status);
0454:                getGlobalContext().dispatchFtpEvent(
0455:                        new SlaveEvent("ADDSLAVE", this ));
0456:            }
0457:
0458:            /**
0459:             * @return true if the slave has synchronized its filelist since last
0460:             *                 connect
0461:             */
0462:            public synchronized boolean isAvailable() {
0463:                return _isAvailable;
0464:            }
0465:
0466:            public boolean isAvailablePing() {
0467:                if (!isAvailable()) {
0468:                    return false;
0469:                }
0470:
0471:                try {
0472:                    String index = issuePingToSlave();
0473:                    fetchResponse(index);
0474:                } catch (SlaveUnavailableException e) {
0475:                    setOffline(e);
0476:                    return false;
0477:                } catch (RemoteIOException e) {
0478:                    setOffline("The slave encountered an IOException while running ping...this is almost not possible");
0479:                    return false;
0480:                }
0481:
0482:                return isAvailable();
0483:            }
0484:
0485:            public void processQueue() throws IOException,
0486:                    SlaveUnavailableException {
0487:                //no for-each loop, needs iter.remove()
0488:                for (Iterator<QueuedOperation> iter = _renameQueue.iterator(); iter
0489:                        .hasNext();) {
0490:                    QueuedOperation item = iter.next();
0491:                    String sourceFile = item.getSource();
0492:                    String destFile = item.getDestination();
0493:
0494:                    if (destFile == null) { // delete
0495:                        try {
0496:                            fetchResponse(issueDeleteToSlave(sourceFile),
0497:                                    300000);
0498:                        } catch (RemoteIOException e) {
0499:                            if (!(e.getCause() instanceof  FileNotFoundException)) {
0500:                                throw (IOException) e.getCause();
0501:                            }
0502:                        } finally {
0503:                            iter.remove();
0504:                            commit();
0505:                        }
0506:                    } else { // rename
0507:                        String fileName = destFile.substring(destFile
0508:                                .lastIndexOf("/") + 1);
0509:                        String destDir = destFile.substring(0, destFile
0510:                                .lastIndexOf("/"));
0511:                        try {
0512:                            fetchResponse(issueRenameToSlave(sourceFile,
0513:                                    destDir, fileName));
0514:                        } catch (RemoteIOException e) {
0515:                            if (!(e.getCause() instanceof  FileNotFoundException)) {
0516:                                throw (IOException) e.getCause();
0517:                            }
0518:                        } finally {
0519:                            iter.remove();
0520:                            commit();
0521:                        }
0522:                    }
0523:                }
0524:            }
0525:
0526:            /**
0527:             * @return true if the mask was removed successfully
0528:             */
0529:            public final boolean removeMask(String mask) {
0530:                boolean ret = _ipMasks.removeMask(mask);
0531:
0532:                if (ret) {
0533:                    commit();
0534:                }
0535:
0536:                return ret;
0537:            }
0538:
0539:            public synchronized void setAvailable(boolean available) {
0540:                _isAvailable = available;
0541:            }
0542:
0543:            public final void setLastDirection(char direction, long l) {
0544:                switch (direction) {
0545:                case Transfer.TRANSFER_RECEIVING_UPLOAD:
0546:                    setLastUploadReceiving(l);
0547:
0548:                    return;
0549:
0550:                case Transfer.TRANSFER_SENDING_DOWNLOAD:
0551:                    setLastDownloadSending(l);
0552:
0553:                    return;
0554:
0555:                default:
0556:                    throw new IllegalArgumentException();
0557:                }
0558:            }
0559:
0560:            public final void setLastDownloadSending(long lastDownloadSending) {
0561:                _lastDownloadSending = lastDownloadSending;
0562:            }
0563:
0564:            public final void setLastUploadReceiving(long lastUploadReceiving) {
0565:                _lastUploadReceiving = lastUploadReceiving;
0566:            }
0567:
0568:            /**
0569:             * Deletes files/directories and waits for the response
0570:             * Meant to be used if you don't want to utilize asynchronization
0571:             */
0572:            public void simpleDelete(String path) {
0573:                try {
0574:                    fetchResponse(issueDeleteToSlave(path), 300000);
0575:                } catch (RemoteIOException e) {
0576:                    if (e.getCause() instanceof  FileNotFoundException) {
0577:                        return;
0578:                    }
0579:
0580:                    setOffline("IOException deleting file, check logs for specific error");
0581:                    addQueueDelete(path);
0582:                    logger
0583:                            .error(
0584:                                    "IOException deleting file, file will be deleted when slave comes online",
0585:                                    e);
0586:                } catch (SlaveUnavailableException e) {
0587:                    // Already offline and we ARE successful in deleting the file
0588:                    addQueueDelete(path);
0589:                }
0590:            }
0591:
0592:            /**
0593:             * Renames files/directories and waits for the response
0594:             */
0595:            public void simpleRename(String from, String toDirPath,
0596:                    String toName) {
0597:                String simplePath = null;
0598:                if (toDirPath.endsWith("/")) {
0599:                    simplePath = toDirPath + toName;
0600:                } else {
0601:                    simplePath = toDirPath + "/" + toName;
0602:                }
0603:                try {
0604:                    fetchResponse(issueRenameToSlave(from, toDirPath, toName));
0605:                } catch (RemoteIOException e) {
0606:                    setOffline(e);
0607:                    addQueueRename(from, simplePath);
0608:                } catch (SlaveUnavailableException e) {
0609:                    addQueueRename(from, simplePath);
0610:                }
0611:            }
0612:
0613:            public String toString() {
0614:                return moreInfo();
0615:            }
0616:
0617:            public static String getSlaveNameFromObjectInput(
0618:                    ObjectInputStream in) throws IOException {
0619:                try {
0620:                    return (String) in.readObject();
0621:                } catch (ClassNotFoundException e) {
0622:                    throw new RuntimeException(e);
0623:                }
0624:            }
0625:
0626:            public synchronized void connect(Socket socket,
0627:                    ObjectInputStream in, ObjectOutputStream out)
0628:                    throws IOException {
0629:                _socket = socket;
0630:                _sout = out;
0631:                _sin = in;
0632:                _indexPool = new Stack<String>();
0633:
0634:                for (int i = 0; i < 256; i++) {
0635:                    String key = Integer.toHexString(i);
0636:
0637:                    if (key.length() < 2) {
0638:                        key = "0" + key;
0639:                    }
0640:
0641:                    _indexPool.push(key);
0642:                }
0643:
0644:                _indexWithCommands = new HashMap<String, AsyncResponse>();
0645:                _transfers = new HashMap<TransferIndex, RemoteTransfer>();
0646:                _errors = 0;
0647:                _lastNetworkError = System.currentTimeMillis();
0648:                start();
0649:                class RemergeThread implements  Runnable {
0650:                    public void run() {
0651:                        try {
0652:                            initializeSlaveAfterThreadIsRunning();
0653:                        } catch (IOException e) {
0654:                            setOffline(e);
0655:                        } catch (SlaveUnavailableException e) {
0656:                            setOffline(e);
0657:                        }
0658:                    }
0659:                }
0660:
0661:                new Thread(new RemergeThread(), "RemoteSlaveRemerge - "
0662:                        + getName()).start();
0663:            }
0664:
0665:            private void start() {
0666:                Thread t = new Thread(this );
0667:                t.setName("RemoteSlave - " + getName());
0668:                t.start();
0669:            }
0670:
0671:            public long fetchChecksumFromIndex(String index)
0672:                    throws RemoteIOException, SlaveUnavailableException {
0673:                return ((AsyncResponseChecksum) fetchResponse(index))
0674:                        .getChecksum();
0675:            }
0676:
0677:            public ID3Tag fetchID3TagFromIndex(String index)
0678:                    throws RemoteIOException, SlaveUnavailableException {
0679:                return ((AsyncResponseID3Tag) fetchResponse(index)).getTag();
0680:            }
0681:
0682:            private synchronized String fetchIndex()
0683:                    throws SlaveUnavailableException {
0684:                while (isOnline()) {
0685:                    try {
0686:                        return _indexPool.pop();
0687:                    } catch (EmptyStackException e) {
0688:                        logger
0689:                                .error("Too many commands sent, need to wait for the slave to process commands");
0690:                    }
0691:
0692:                    try {
0693:                        wait();
0694:                    } catch (InterruptedException e1) {
0695:                    }
0696:                }
0697:
0698:                throw new SlaveUnavailableException(
0699:                        "Slave was offline or went offline while fetching an index");
0700:            }
0701:
0702:            public int fetchMaxPathFromIndex(String maxPathIndex)
0703:                    throws SlaveUnavailableException {
0704:                try {
0705:                    return ((AsyncResponseMaxPath) fetchResponse(maxPathIndex))
0706:                            .getMaxPath();
0707:                } catch (RemoteIOException e) {
0708:                    throw new FatalException(
0709:                            "this is not possible, slave had an error processing maxpath...");
0710:                }
0711:            }
0712:
0713:            /**
0714:             * @see fetchResponse(String index, int wait)
0715:             */
0716:            public AsyncResponse fetchResponse(String index)
0717:                    throws SlaveUnavailableException, RemoteIOException {
0718:                return fetchResponse(index, 60 * 1000);
0719:            }
0720:
0721:            /**
0722:             * returns an AsyncResponse for that index and throws any exceptions thrown
0723:             * on the Slave side
0724:             */
0725:            public synchronized AsyncResponse fetchResponse(String index,
0726:                    int wait) throws SlaveUnavailableException,
0727:                    RemoteIOException {
0728:                long total = System.currentTimeMillis();
0729:
0730:                while (isOnline() && !_indexWithCommands.containsKey(index)) {
0731:                    try {
0732:                        wait(1000);
0733:
0734:                        // will wait a maximum of 1000 milliseconds before waking up
0735:                    } catch (InterruptedException e) {
0736:                    }
0737:
0738:                    if ((wait != 0)
0739:                            && ((System.currentTimeMillis() - total) >= wait)) {
0740:                        setOffline("Slave has taken too long while waiting for reply "
0741:                                + index);
0742:                    }
0743:                }
0744:
0745:                if (!isOnline()) {
0746:                    throw new SlaveUnavailableException(
0747:                            "Slave went offline while processing command");
0748:                }
0749:
0750:                AsyncResponse rar = _indexWithCommands.remove(index);
0751:                _indexPool.push(index);
0752:                notifyAll();
0753:
0754:                if (rar instanceof  AsyncResponseException) {
0755:                    Throwable t = ((AsyncResponseException) rar).getThrowable();
0756:
0757:                    if (t instanceof  IOException) {
0758:                        throw new RemoteIOException((IOException) t);
0759:                    }
0760:
0761:                    logger
0762:                            .error(
0763:                                    "Exception on slave that is unable to be handled by the master",
0764:                                    t);
0765:                    setOffline("Exception on slave that is unable to be handled by the master");
0766:                    throw new SlaveUnavailableException(
0767:                            "Exception on slave that is unable to be handled by the master");
0768:                }
0769:                return rar;
0770:            }
0771:
0772:            public String fetchDIZFileFromIndex(String index)
0773:                    throws RemoteIOException, SlaveUnavailableException {
0774:                return ((AsyncResponseDIZFile) fetchResponse(index)).getDIZ();
0775:            }
0776:
0777:            public LightSFVFile fetchSFVFileFromIndex(String index)
0778:                    throws RemoteIOException, SlaveUnavailableException {
0779:                return ((AsyncResponseSFVFile) fetchResponse(index)).getSFV();
0780:            }
0781:
0782:            public synchronized String getPASVIP()
0783:                    throws SlaveUnavailableException {
0784:                if (!isOnline())
0785:                    throw new SlaveUnavailableException();
0786:                return getProperty("pasv_addr", _socket.getInetAddress()
0787:                        .getHostAddress());
0788:            }
0789:
0790:            public int getPort() {
0791:                return _socket.getPort();
0792:            }
0793:
0794:            public synchronized boolean isOnline() {
0795:                return ((_socket != null) && _socket.isConnected());
0796:            }
0797:
0798:            /**
0799:             * 
0800:             * @param string
0801:             * @return
0802:             * @throws SlaveUnavailableException
0803:             */
0804:            public String issueChecksumToSlave(String string)
0805:                    throws SlaveUnavailableException {
0806:                String index = fetchIndex();
0807:                sendCommand(new AsyncCommandArgument(index, "checksum", string));
0808:
0809:                return index;
0810:            }
0811:
0812:            public String issueConnectToSlave(String ip, int port,
0813:                    boolean encryptedDataChannel, boolean useSSLClientHandshake)
0814:                    throws SlaveUnavailableException {
0815:                String index = fetchIndex();
0816:                sendCommand(new AsyncCommandArgument(index, "connect", ip + ":"
0817:                        + port + "," + encryptedDataChannel + ","
0818:                        + useSSLClientHandshake));
0819:
0820:                return index;
0821:            }
0822:
0823:            /**
0824:             * @return String index, needs to be used to fetch the response
0825:             */
0826:            public String issueDeleteToSlave(String sourceFile)
0827:                    throws SlaveUnavailableException {
0828:                String index = fetchIndex();
0829:                sendCommand(new AsyncCommandArgument(index, "delete",
0830:                        sourceFile));
0831:
0832:                return index;
0833:            }
0834:
0835:            public String issueID3TagToSlave(String path)
0836:                    throws SlaveUnavailableException {
0837:                String index = fetchIndex();
0838:                sendCommand(new AsyncCommandArgument(index, "id3tag", path));
0839:
0840:                return index;
0841:            }
0842:
0843:            public String issueListenToSlave(boolean isSecureTransfer,
0844:                    boolean useSSLClientMode) throws SlaveUnavailableException {
0845:                String index = fetchIndex();
0846:                sendCommand(new AsyncCommandArgument(index, "listen", ""
0847:                        + isSecureTransfer + ":" + useSSLClientMode));
0848:
0849:                return index;
0850:            }
0851:
0852:            public String issueMaxPathToSlave()
0853:                    throws SlaveUnavailableException {
0854:                String index = fetchIndex();
0855:                sendCommand(new AsyncCommand(index, "maxpath"));
0856:
0857:                return index;
0858:            }
0859:
0860:            private String issuePingToSlave() throws SlaveUnavailableException {
0861:                String index = fetchIndex();
0862:                sendCommand(new AsyncCommand(index, "ping"));
0863:
0864:                return index;
0865:            }
0866:
0867:            public String issueReceiveToSlave(String name, char c,
0868:                    long position, TransferIndex tindex)
0869:                    throws SlaveUnavailableException {
0870:                String index = fetchIndex();
0871:                sendCommand(new AsyncCommandArgument(index, "receive", c + ","
0872:                        + position + "," + tindex + "," + name));
0873:
0874:                return index;
0875:            }
0876:
0877:            public String issueRenameToSlave(String from, String toDirPath,
0878:                    String toName) throws SlaveUnavailableException {
0879:                if (toDirPath.length() == 0) { // needed for files in root
0880:                    toDirPath = "/";
0881:                }
0882:                String index = fetchIndex();
0883:                sendCommand(new AsyncCommandArgument(index, "rename", from
0884:                        + "," + toDirPath + "," + toName));
0885:
0886:                return index;
0887:            }
0888:
0889:            public String issueDIZFileToSlave(LinkedRemoteFileInterface file)
0890:                    throws SlaveUnavailableException {
0891:                String index = fetchIndex();
0892:                AsyncCommand ac = new AsyncCommandArgument(index, "dizfile",
0893:                        file.getPath());
0894:
0895:                sendCommand(ac);
0896:                return index;
0897:            }
0898:
0899:            public String issueSFVFileToSlave(String path)
0900:                    throws SlaveUnavailableException {
0901:                String index = fetchIndex();
0902:                AsyncCommand ac = new AsyncCommandArgument(index, "sfvfile",
0903:                        path);
0904:                sendCommand(ac);
0905:
0906:                return index;
0907:            }
0908:
0909:            public String issueStatusToSlave() throws SlaveUnavailableException {
0910:                String index = fetchIndex();
0911:                sendCommand(new AsyncCommand(index, "status"));
0912:
0913:                return index;
0914:            }
0915:
0916:            public String moreInfo() {
0917:                try {
0918:                    return getName() + ":address=[" + getPASVIP() + "]port=["
0919:                            + Integer.toString(getPort()) + "]";
0920:                } catch (SlaveUnavailableException e) {
0921:                    return getName() + ":offline";
0922:                }
0923:            }
0924:
0925:            public void run() {
0926:                logger.debug("Starting RemoteSlave for " + getName());
0927:
0928:                try {
0929:                    String pingIndex = null;
0930:                    while (isOnline()) {
0931:                        AsyncResponse ar = null;
0932:
0933:                        try {
0934:                            ar = readAsyncResponse();
0935:                            _lastResponseReceived = System.currentTimeMillis();
0936:                        } catch (SlaveUnavailableException e3) {
0937:                            // no reason for slave thread to be running if the slave is
0938:                            // not online
0939:                            return;
0940:                        } catch (SocketTimeoutException e) {
0941:                            // handled below
0942:                        }
0943:
0944:                        if (pingIndex == null
0945:                                && ((getActualTimeout() / 2 < (System
0946:                                        .currentTimeMillis() - _lastResponseReceived)) || (getActualTimeout() / 2 < (System
0947:                                        .currentTimeMillis() - _lastCommandSent)))) {
0948:                            pingIndex = issuePingToSlave();
0949:                        } else if (getActualTimeout() < (System
0950:                                .currentTimeMillis() - _lastResponseReceived)) {
0951:                            setOffline("Slave seems to have gone offline, have not received a response in "
0952:                                    + (System.currentTimeMillis() - _lastResponseReceived)
0953:                                    + " milliseconds");
0954:                            throw new SlaveUnavailableException();
0955:                        }
0956:
0957:                        if (ar == null) {
0958:                            continue;
0959:                        }
0960:
0961:                        synchronized (this ) {
0962:                            if (!(ar instanceof  AsyncResponseRemerge)
0963:                                    && !(ar instanceof  AsyncResponseTransferStatus)) {
0964:                                logger.debug("Received: " + ar);
0965:                            }
0966:
0967:                            if (ar instanceof  AsyncResponseTransfer) {
0968:                                AsyncResponseTransfer art = (AsyncResponseTransfer) ar;
0969:                                addTransfer((art.getConnectInfo()
0970:                                        .getTransferIndex()),
0971:                                        new RemoteTransfer(
0972:                                                art.getConnectInfo(), this ));
0973:                            }
0974:
0975:                            if (ar.getIndex().equals("Remerge")) {
0976:                                getGlobalContext()
0977:                                        .getSlaveManager()
0978:                                        .putRemergeQueue(
0979:                                                new RemergeMessage(
0980:                                                        (AsyncResponseRemerge) ar,
0981:                                                        this ));
0982:                            } else if (ar.getIndex().equals("DiskStatus")) {
0983:                                _status = ((AsyncResponseDiskStatus) ar)
0984:                                        .getDiskStatus();
0985:                            } else if (ar.getIndex().equals("TransferStatus")) {
0986:                                TransferStatus ats = ((AsyncResponseTransferStatus) ar)
0987:                                        .getTransferStatus();
0988:                                RemoteTransfer rt = null;
0989:
0990:                                try {
0991:                                    rt = getTransfer(ats.getTransferIndex());
0992:                                } catch (SlaveUnavailableException e1) {
0993:
0994:                                    // no reason for slave thread to be running if the
0995:                                    // slave is not online
0996:                                    return;
0997:                                }
0998:
0999:                                rt.updateTransferStatus(ats);
1000:
1001:                                if (ats.isFinished()) {
1002:                                    removeTransfer(ats.getTransferIndex());
1003:                                }
1004:                            } else {
1005:                                _indexWithCommands.put(ar.getIndex(), ar);
1006:                                if (pingIndex != null
1007:                                        && pingIndex.equals(ar.getIndex())) {
1008:                                    fetchResponse(pingIndex);
1009:                                    pingIndex = null;
1010:                                } else {
1011:                                    notifyAll();
1012:                                }
1013:                            }
1014:                        }
1015:                    }
1016:                } catch (Throwable e) {
1017:                    setOffline("error: " + e.getMessage());
1018:                    logger.error("", e);
1019:                }
1020:            }
1021:
1022:            private int getActualTimeout() {
1023:                return Integer.parseInt(getProperty("timeout", Integer
1024:                        .toString(SlaveManager.actualTimeout)));
1025:            }
1026:
1027:            private synchronized void removeTransfer(TransferIndex transferIndex) {
1028:                RemoteTransfer transfer = null;
1029:                synchronized (_transfers) {
1030:                    transfer = _transfers.remove(transferIndex);
1031:                }
1032:                if (transfer == null) {
1033:                    throw new IllegalStateException("there is a bug in code");
1034:                }
1035:                if (transfer.getState() == Transfer.TRANSFER_RECEIVING_UPLOAD) {
1036:                    addReceivedBytes(transfer.getTransfered());
1037:                } else if (transfer.getState() == Transfer.TRANSFER_SENDING_DOWNLOAD) {
1038:                    addSentBytes(transfer.getTransfered());
1039:                } // else, we don't care
1040:            }
1041:
1042:            private void addSentBytes(long transfered) {
1043:                addBytes("bytesSent", transfered);
1044:            }
1045:
1046:            private void addBytes(String field, long transfered) {
1047:                setProperty(field, Long.toString(Long.parseLong(getProperty(
1048:                        field, "0"))
1049:                        + transfered));
1050:            }
1051:
1052:            private void addReceivedBytes(long transfered) {
1053:                addBytes("bytesReceived", transfered);
1054:            }
1055:
1056:            public void setOffline(String reason) {
1057:                logger.debug("setOffline() " + reason);
1058:                setOfflineReal(reason);
1059:            }
1060:
1061:            private final synchronized void setOfflineReal(String reason) {
1062:
1063:                if (_socket != null) {
1064:                    setProperty("lastOnline", Long.toString(System
1065:                            .currentTimeMillis()));
1066:                    try {
1067:                        _socket.close();
1068:                    } catch (IOException e) {
1069:                    }
1070:                    _socket = null;
1071:                }
1072:                _sin = null;
1073:                _sout = null;
1074:                _indexPool = null;
1075:                _indexWithCommands = null;
1076:                _transfers = null;
1077:                _maxPath = 0;
1078:                _status = null;
1079:
1080:                if (_isAvailable) {
1081:                    getGlobalContext().dispatchFtpEvent(
1082:                            new SlaveEvent("DELSLAVE", reason, this ));
1083:                }
1084:
1085:                setAvailable(false);
1086:            }
1087:
1088:            public void setOffline(Throwable t) {
1089:                logger.info("setOffline()", t);
1090:
1091:                if (t.getMessage() == null) {
1092:                    setOfflineReal("No Message");
1093:                } else {
1094:                    setOfflineReal(t.getMessage());
1095:                }
1096:            }
1097:
1098:            /**
1099:             * fetches the next AsyncResponse, if IOException is encountered, the slave
1100:             * is setOffline() and the Exception is thrown
1101:             * 
1102:             * @throws SlaveUnavailableException
1103:             * @throws SocketTimeoutException 
1104:             */
1105:            private AsyncResponse readAsyncResponse()
1106:                    throws SlaveUnavailableException, SocketTimeoutException {
1107:                Object obj = null;
1108:                while (true) {
1109:                    try {
1110:                        obj = _sin.readObject();
1111:                    } catch (ClassNotFoundException e) {
1112:                        logger.error("ClassNotFound reading AsyncResponse", e);
1113:                        setOffline("ClassNotFound reading AsyncResponse");
1114:                        throw new SlaveUnavailableException(
1115:                                "Slave is unavailable - Class Not Found");
1116:                    } catch (SocketTimeoutException e) {
1117:                        // don't want this to be caught by IOException below
1118:                        throw e;
1119:                    } catch (IOException e) {
1120:                        logger.error("IOException reading AsyncResponse", e);
1121:                        setOffline("IOException reading AsyncResponse");
1122:                        throw new SlaveUnavailableException(
1123:                                "Slave is unavailable - IOException");
1124:                    }
1125:                    if (obj != null) {
1126:                        if (obj instanceof  AsyncResponse) {
1127:                            return (AsyncResponse) obj;
1128:                        }
1129:                        logger.error("Throwing away an unexpected class - "
1130:                                + obj.getClass().getName() + " - " + obj);
1131:                    }
1132:                }
1133:            }
1134:
1135:            public void issueAbortToSlave(TransferIndex transferIndex,
1136:                    String reason) throws SlaveUnavailableException {
1137:                if (reason == null) {
1138:                    reason = "null";
1139:                }
1140:                sendCommand(new AsyncCommandArgument("abort", "abort",
1141:                        transferIndex.toString() + "," + reason));
1142:            }
1143:
1144:            public ConnectInfo fetchTransferResponseFromIndex(String index)
1145:                    throws RemoteIOException, SlaveUnavailableException {
1146:                AsyncResponseTransfer art = (AsyncResponseTransfer) fetchResponse(index);
1147:
1148:                return art.getConnectInfo();
1149:            }
1150:
1151:            /**
1152:             * Will not set a slave offline, it is the job of the calling thread to decide to do this
1153:             */
1154:            private synchronized void sendCommand(AsyncCommand rac)
1155:                    throws SlaveUnavailableException {
1156:                if (rac == null) {
1157:                    throw new NullPointerException();
1158:                }
1159:
1160:                if (!isOnline()) {
1161:                    throw new SlaveUnavailableException();
1162:                }
1163:
1164:                try {
1165:                    _sout.writeObject(rac);
1166:                    _sout.flush();
1167:                    _sout.reset();
1168:                } catch (IOException e) {
1169:                    logger.error("error in sendCommand()", e);
1170:                    throw new SlaveUnavailableException(
1171:                            "error sending command (exception already handled)",
1172:                            e);
1173:                }
1174:                _lastCommandSent = System.currentTimeMillis();
1175:            }
1176:
1177:            public String issueSendToSlave(String name, char c, long position,
1178:                    TransferIndex tindex) throws SlaveUnavailableException {
1179:                String index = fetchIndex();
1180:                sendCommand(new AsyncCommandArgument(index, "send", c + ","
1181:                        + position + "," + tindex + "," + name));
1182:
1183:                return index;
1184:            }
1185:
1186:            public String issueRemergeToSlave(String path)
1187:                    throws SlaveUnavailableException {
1188:                String index = fetchIndex();
1189:                sendCommand(new AsyncCommandArgument(index, "remerge", path));
1190:
1191:                return index;
1192:            }
1193:
1194:            public void fetchRemergeResponseFromIndex(String index)
1195:                    throws IOException, SlaveUnavailableException {
1196:                try {
1197:                    fetchResponse(index, 0);
1198:                } catch (RemoteIOException e) {
1199:                    throw (IOException) e.getCause();
1200:                }
1201:            }
1202:
1203:            public boolean checkConnect(Socket socket)
1204:                    throws MalformedPatternException {
1205:                return getMasks().check(socket);
1206:            }
1207:
1208:            public String getProperty(String key) {
1209:                synchronized (_keysAndValues) {
1210:                    return _keysAndValues.getProperty(key);
1211:                }
1212:            }
1213:
1214:            public synchronized void addTransfer(TransferIndex transferIndex,
1215:                    RemoteTransfer transfer) {
1216:                if (!isOnline()) {
1217:                    return;
1218:                }
1219:
1220:                synchronized (_transfers) {
1221:                    _transfers.put(transferIndex, transfer);
1222:                }
1223:            }
1224:
1225:            public synchronized RemoteTransfer getTransfer(
1226:                    TransferIndex transferIndex)
1227:                    throws SlaveUnavailableException {
1228:                if (!isOnline()) {
1229:                    throw new SlaveUnavailableException("Slave is not online");
1230:                }
1231:
1232:                synchronized (_transfers) {
1233:                    RemoteTransfer ret = _transfers.get(transferIndex);
1234:                    if (ret == null)
1235:                        throw new FatalException(
1236:                                "there is a bug somewhere in code, tried to fetch a transfer index that doesn't exist - "
1237:                                        + transferIndex);
1238:                    return ret;
1239:                }
1240:            }
1241:
1242:            public synchronized Collection<RemoteTransfer> getTransfers()
1243:                    throws SlaveUnavailableException {
1244:                if (!isOnline()) {
1245:                    throw new SlaveUnavailableException("Slave is not online");
1246:                }
1247:                synchronized (_transfers) {
1248:                    return Collections
1249:                            .unmodifiableCollection(new ArrayList<RemoteTransfer>(
1250:                                    _transfers.values()));
1251:                }
1252:            }
1253:
1254:            public boolean isMemberOf(String string) {
1255:                StringTokenizer st = new StringTokenizer(getProperty(
1256:                        "keywords", ""), " ");
1257:
1258:                while (st.hasMoreElements()) {
1259:                    if (st.nextToken().equals(string)) {
1260:                        return true;
1261:                    }
1262:                }
1263:
1264:                return false;
1265:            }
1266:
1267:            public void init(GlobalContext globalContext) {
1268:                _gctx = globalContext;
1269:            }
1270:
1271:            public LinkedList<QueuedOperation> getRenameQueue() {
1272:                return _renameQueue;
1273:            }
1274:
1275:            public void setRenameQueue(LinkedList<QueuedOperation> renameQueue) {
1276:                _renameQueue = renameQueue;
1277:            }
1278:
1279:            public void shutdown() {
1280:                try {
1281:                    sendCommand(new AsyncCommand("shutdown",
1282:                            "shutdown gracefully"));
1283:                    setOfflineReal("shutdown gracefully");
1284:                } catch (SlaveUnavailableException e) {
1285:                }
1286:            }
1287:
1288:            public long getLastTimeOnline() {
1289:                if (isOnline()) {
1290:                    return System.currentTimeMillis();
1291:                }
1292:                String value = getProperty("lastOnline");
1293:                // if (value == null) Slave has never been online
1294:                return Long.parseLong(value == null ? "0" : value);
1295:            }
1296:
1297:            public String removeProperty(String key)
1298:                    throws KeyNotFoundException {
1299:                synchronized (_keysAndValues) {
1300:                    if (getProperty(key) == null)
1301:                        throw new KeyNotFoundException();
1302:                    String value = (String) _keysAndValues.remove(key);
1303:                    commit();
1304:                    return value;
1305:                }
1306:            }
1307:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.