0001: /*
0002: * This file is part of DrFTPD, Distributed FTP Daemon.
0003: *
0004: * DrFTPD is free software; you can redistribute it and/or modify
0005: * it under the terms of the GNU General Public License as published by
0006: * the Free Software Foundation; either version 2 of the License, or
0007: * (at your option) any later version.
0008: *
0009: * DrFTPD is distributed in the hope that it will be useful,
0010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0012: * GNU General Public License for more details.
0013: *
0014: * You should have received a copy of the GNU General Public License
0015: * along with DrFTPD; if not, write to the Free Software
0016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0017: */
0018: package org.drftpd.master;
0019:
0020: import java.beans.DefaultPersistenceDelegate;
0021: import java.beans.ExceptionListener;
0022: import java.beans.IntrospectionException;
0023: import java.beans.Introspector;
0024: import java.beans.PropertyDescriptor;
0025: import java.beans.XMLEncoder;
0026: import java.io.FileNotFoundException;
0027: import java.io.IOException;
0028: import java.io.ObjectInputStream;
0029: import java.io.ObjectOutputStream;
0030: import java.io.Serializable;
0031: import java.net.Socket;
0032: import java.net.SocketException;
0033: import java.net.SocketTimeoutException;
0034: import java.util.ArrayList;
0035: import java.util.Collection;
0036: import java.util.Collections;
0037: import java.util.EmptyStackException;
0038: import java.util.HashMap;
0039: import java.util.Hashtable;
0040: import java.util.Iterator;
0041: import java.util.LinkedList;
0042: import java.util.Properties;
0043: import java.util.Stack;
0044: import java.util.StringTokenizer;
0045:
0046: import net.sf.drftpd.DuplicateElementException;
0047: import net.sf.drftpd.FatalException;
0048: import net.sf.drftpd.SlaveUnavailableException;
0049: import net.sf.drftpd.event.SlaveEvent;
0050:
0051: import org.apache.log4j.Logger;
0052: import org.apache.oro.text.regex.MalformedPatternException;
0053: import org.drftpd.GlobalContext;
0054: import org.drftpd.LightSFVFile;
0055: import org.drftpd.dynamicdata.Key;
0056: import org.drftpd.dynamicdata.KeyNotFoundException;
0057: import org.drftpd.id3.ID3Tag;
0058: import org.drftpd.io.SafeFileOutputStream;
0059: import org.drftpd.remotefile.LinkedRemoteFileInterface;
0060: import org.drftpd.slave.ConnectInfo;
0061: import org.drftpd.slave.DiskStatus;
0062: import org.drftpd.slave.RemoteIOException;
0063: import org.drftpd.slave.SlaveStatus;
0064: import org.drftpd.slave.Transfer;
0065: import org.drftpd.slave.TransferIndex;
0066: import org.drftpd.slave.TransferStatus;
0067: import org.drftpd.slave.async.AsyncCommand;
0068: import org.drftpd.slave.async.AsyncCommandArgument;
0069: import org.drftpd.slave.async.AsyncResponse;
0070: import org.drftpd.slave.async.AsyncResponseChecksum;
0071: import org.drftpd.slave.async.AsyncResponseDIZFile;
0072: import org.drftpd.slave.async.AsyncResponseDiskStatus;
0073: import org.drftpd.slave.async.AsyncResponseException;
0074: import org.drftpd.slave.async.AsyncResponseID3Tag;
0075: import org.drftpd.slave.async.AsyncResponseMaxPath;
0076: import org.drftpd.slave.async.AsyncResponseRemerge;
0077: import org.drftpd.slave.async.AsyncResponseSFVFile;
0078: import org.drftpd.slave.async.AsyncResponseTransfer;
0079: import org.drftpd.slave.async.AsyncResponseTransferStatus;
0080: import org.drftpd.usermanager.Entity;
0081: import org.drftpd.usermanager.HostMask;
0082: import org.drftpd.usermanager.HostMaskCollection;
0083:
0084: /**
0085: * @author mog
0086: * @author zubov
0087: * @version $Id: RemoteSlave.java 1538 2006-12-14 20:55:37Z zubov $
0088: */
0089: public class RemoteSlave implements Runnable, Comparable<RemoteSlave>,
0090: Serializable, Entity {
0091: private static final long serialVersionUID = -6973935289361817125L;
0092:
0093: private final String[] transientFields = { "available",
0094: "lastDownloadSending", "lastUploadReceiving" };
0095:
0096: private static final Logger logger = Logger
0097: .getLogger(RemoteSlave.class);
0098:
0099: private transient boolean _isAvailable;
0100:
0101: protected transient int _errors;
0102:
0103: private transient GlobalContext _gctx;
0104:
0105: private transient long _lastDownloadSending = 0;
0106:
0107: protected transient long _lastNetworkError;
0108:
0109: private transient long _lastUploadReceiving = 0;
0110:
0111: private transient long _lastResponseReceived = System
0112: .currentTimeMillis();
0113:
0114: private transient long _lastCommandSent = System
0115: .currentTimeMillis();
0116:
0117: private transient int _maxPath;
0118:
0119: private transient String _name;
0120:
0121: private transient DiskStatus _status;
0122:
0123: private HostMaskCollection _ipMasks;
0124:
0125: private Properties _keysAndValues;
0126:
0127: private LinkedList<QueuedOperation> _renameQueue;
0128:
0129: private transient Stack<String> _indexPool;
0130:
0131: private transient HashMap<String, AsyncResponse> _indexWithCommands;
0132:
0133: private transient ObjectInputStream _sin;
0134:
0135: private transient Socket _socket;
0136:
0137: private transient ObjectOutputStream _sout;
0138:
0139: private transient HashMap<TransferIndex, RemoteTransfer> _transfers;
0140:
/**
 * Creates a slave with the given name, an empty property set, an empty
 * mask collection and an empty operation queue. No GlobalContext is
 * attached and nothing is written to disk.
 *
 * @param name the slave's name
 */
public RemoteSlave(String name) {
    this._name = name;
    this._ipMasks = new HostMaskCollection();
    this._renameQueue = new LinkedList<QueuedOperation>();
    this._keysAndValues = new Properties();
}
0147:
/**
 * Creates a slave bound to the given context and immediately persists it
 * through commit(). Used by everything including tests.
 *
 * @param name the slave's name
 * @param gctx the context later returned by getGlobalContext()
 */
public RemoteSlave(String name, GlobalContext gctx) {
    this(name);
    this._gctx = gctx;
    this.commit();
}
0156:
0157: public final static Hashtable rslavesToHashtable(Collection rslaves) {
0158: Hashtable<String, RemoteSlave> map = new Hashtable<String, RemoteSlave>(
0159: rslaves.size());
0160:
0161: for (Iterator iter = rslaves.iterator(); iter.hasNext();) {
0162: RemoteSlave rslave = (RemoteSlave) iter.next();
0163: map.put(rslave.getName(), rslave);
0164: }
0165:
0166: return map;
0167: }
0168:
0169: public void addMask(String mask) throws DuplicateElementException {
0170: _ipMasks.addMask(mask);
0171: commit();
0172: }
0173:
0174: /**
0175: * If X # of errors occur in Y amount of time, kick slave offline
0176: */
0177: public final void addNetworkError(SocketException e) {
0178: // set slave offline if too many network errors
0179: long errortimeout = Long.parseLong(getProperty("errortimeout",
0180: "60000")); // one
0181: // minute
0182:
0183: if (errortimeout <= 0) {
0184: errortimeout = 60000;
0185: }
0186:
0187: int maxerrors = Integer.parseInt(getProperty("maxerrors", "5"));
0188:
0189: if (maxerrors < 0) {
0190: maxerrors = 5;
0191: }
0192:
0193: _errors -= ((System.currentTimeMillis() - _lastNetworkError) / errortimeout);
0194:
0195: if (_errors < 0) {
0196: _errors = 0;
0197: }
0198:
0199: _errors++;
0200: _lastNetworkError = System.currentTimeMillis();
0201:
0202: if (_errors > maxerrors) {
0203: setOffline("Too many network errors - " + e.getMessage());
0204: logger.error("Too many network errors - " + e);
0205: }
0206: }
0207:
0208: protected void addQueueDelete(String fileName) {
0209: addQueueRename(fileName, null);
0210: }
0211:
0212: protected void addQueueRename(String fileName, String destName) {
0213: if (isOnline()) {
0214: throw new IllegalStateException(
0215: "Slave is online, you cannot queue an operation");
0216: }
0217: _renameQueue.add(new QueuedOperation(fileName, destName));
0218: commit();
0219: }
0220:
0221: public void setProperty(String name, String value) {
0222: synchronized (_keysAndValues) {
0223: _keysAndValues.setProperty(name, value);
0224: commit();
0225: }
0226: }
0227:
0228: public String getProperty(String name, String def) {
0229: synchronized (_keysAndValues) {
0230: return _keysAndValues.getProperty(name, def);
0231: }
0232: }
0233:
0234: public Properties getProperties() {
0235: synchronized (_keysAndValues) {
0236: return (Properties) _keysAndValues.clone();
0237: }
0238: }
0239:
0240: /**
0241: * Needed in order for this class to be a Bean
0242: */
0243: public void setProperties(Properties keysAndValues) {
0244: _keysAndValues = keysAndValues;
0245: }
0246:
0247: public void commit() {
0248: try {
0249:
0250: XMLEncoder out = new XMLEncoder(new SafeFileOutputStream(
0251: (getGlobalContext().getSlaveManager()
0252: .getSlaveFile(this .getName()))));
0253: out.setExceptionListener(new ExceptionListener() {
0254: public void exceptionThrown(Exception e) {
0255: logger.warn("", e);
0256: }
0257: });
0258: out.setPersistenceDelegate(Key.class,
0259: new DefaultPersistenceDelegate(new String[] {
0260: "owner", "key", "type" }));
0261: out.setPersistenceDelegate(HostMask.class,
0262: new DefaultPersistenceDelegate(
0263: new String[] { "mask" }));
0264: out.setPersistenceDelegate(RemoteSlave.class,
0265: new DefaultPersistenceDelegate(
0266: new String[] { "name" }));
0267: out.setPersistenceDelegate(QueuedOperation.class,
0268: new DefaultPersistenceDelegate(new String[] {
0269: "source", "destination" }));
0270: try {
0271: PropertyDescriptor[] pdArr = Introspector.getBeanInfo(
0272: RemoteSlave.class).getPropertyDescriptors();
0273: ArrayList<String> transientList = new ArrayList<String>();
0274: for (int x = 0; x < transientFields.length; x++) {
0275: transientList.add(transientFields[x]);
0276: }
0277: for (int x = 0; x < pdArr.length; x++) {
0278: if (transientList.contains(pdArr[x].getName())) {
0279: pdArr[x].setValue("transient", Boolean.TRUE);
0280: }
0281: }
0282: } catch (IntrospectionException e1) {
0283: logger.error("I don't know what to do here", e1);
0284: throw new RuntimeException(e1);
0285: }
0286: try {
0287: out.writeObject(this );
0288: } finally {
0289: out.close();
0290: }
0291:
0292: Logger.getLogger(RemoteSlave.class).debug(
0293: "wrote " + getName());
0294: } catch (IOException ex) {
0295: throw new RuntimeException("Error writing slavefile for "
0296: + this .getName() + ": " + ex.getMessage(), ex);
0297: }
0298: }
0299:
0300: public final int compareTo(RemoteSlave o) {
0301: return getName().compareTo(o.getName());
0302: }
0303:
0304: public final boolean equals(Object obj) {
0305: try {
0306: return ((RemoteSlave) obj).getName().equals(getName());
0307: } catch (NullPointerException e) {
0308: return false;
0309: }
0310: }
0311:
0312: public GlobalContext getGlobalContext() {
0313: return _gctx;
0314: }
0315:
0316: public final long getLastDownloadSending() {
0317: return _lastDownloadSending;
0318: }
0319:
0320: public final long getLastTransfer() {
0321: return Math.max(getLastDownloadSending(),
0322: getLastUploadReceiving());
0323: }
0324:
0325: public long getLastTransferForDirection(char dir) {
0326: if (dir == Transfer.TRANSFER_RECEIVING_UPLOAD) {
0327: return getLastUploadReceiving();
0328: } else if (dir == Transfer.TRANSFER_SENDING_DOWNLOAD) {
0329: return getLastDownloadSending();
0330: } else if (dir == Transfer.TRANSFER_THROUGHPUT) {
0331: return getLastTransfer();
0332: } else {
0333: throw new IllegalArgumentException();
0334: }
0335: }
0336:
0337: public final long getLastUploadReceiving() {
0338: return _lastUploadReceiving;
0339: }
0340:
0341: public HostMaskCollection getMasks() {
0342: return _ipMasks;
0343: }
0344:
0345: public void setMasks(HostMaskCollection masks) {
0346: _ipMasks = masks;
0347: }
0348:
0349: /**
0350: * Returns the name.
0351: */
0352: public String getName() {
0353: return _name;
0354: }
0355:
0356: /**
0357: * Returns the RemoteSlave's saved SlaveStatus, can return a status before
0358: * remerge() is completed
0359: */
0360: public synchronized SlaveStatus getSlaveStatus()
0361: throws SlaveUnavailableException {
0362: if ((_status == null) || !isOnline()) {
0363: throw new SlaveUnavailableException();
0364: }
0365: int throughputUp = 0;
0366: int throughputDown = 0;
0367: int transfersUp = 0;
0368: int transfersDown = 0;
0369: long bytesReceived;
0370: long bytesSent;
0371:
0372: synchronized (_transfers) {
0373: bytesReceived = getReceivedBytes();
0374: bytesSent = getSentBytes();
0375:
0376: for (Iterator i = _transfers.values().iterator(); i
0377: .hasNext();) {
0378: RemoteTransfer transfer = (RemoteTransfer) i.next();
0379: switch (transfer.getState()) {
0380: case Transfer.TRANSFER_RECEIVING_UPLOAD:
0381: throughputUp += transfer.getXferSpeed();
0382: bytesReceived += transfer.getTransfered();
0383: transfersUp += 1;
0384: break;
0385:
0386: case Transfer.TRANSFER_SENDING_DOWNLOAD:
0387: throughputDown += transfer.getXferSpeed();
0388: transfersDown += 1;
0389: bytesSent += transfer.getTransfered();
0390: break;
0391:
0392: case Transfer.TRANSFER_UNKNOWN:
0393: case Transfer.TRANSFER_THROUGHPUT:
0394: break;
0395:
0396: default:
0397: throw new FatalException(
0398: "unrecognized direction - "
0399: + transfer.getState() + " for "
0400: + transfer);
0401: }
0402: }
0403: }
0404:
0405: return new SlaveStatus(_status, bytesSent, bytesReceived,
0406: throughputUp, transfersUp, throughputDown,
0407: transfersDown);
0408: }
0409:
0410: public long getSentBytes() {
0411: return Long.parseLong(getProperty("bytesSent", "0"));
0412: }
0413:
0414: public long getReceivedBytes() {
0415: return Long.parseLong(getProperty("bytesReceived", "0"));
0416: }
0417:
0418: /**
0419: * Returns the RemoteSlave's stored SlaveStatus, will not return a status
0420: * before remerge() is completed
0421: */
0422: public synchronized SlaveStatus getSlaveStatusAvailable()
0423: throws SlaveUnavailableException {
0424: if (isAvailable()) {
0425: return getSlaveStatus();
0426: }
0427:
0428: throw new SlaveUnavailableException("Slave is not online");
0429: }
0430:
0431: public final int hashCode() {
0432: return getName().hashCode();
0433: }
0434:
0435: /**
0436: * Called when the slave connects
0437: */
0438: private void initializeSlaveAfterThreadIsRunning()
0439: throws IOException, SlaveUnavailableException {
0440: commit();
0441: processQueue();
0442:
0443: String maxPathIndex = issueMaxPathToSlave();
0444: _maxPath = fetchMaxPathFromIndex(maxPathIndex);
0445: logger.debug("maxpath was received");
0446:
0447: String remergeIndex = issueRemergeToSlave("/");
0448: fetchRemergeResponseFromIndex(remergeIndex);
0449: getGlobalContext().getSlaveManager().putRemergeQueue(
0450: new RemergeMessage(this ));
0451: setAvailable(true);
0452: logger.info("Slave added: '" + getName() + "' status: "
0453: + _status);
0454: getGlobalContext().dispatchFtpEvent(
0455: new SlaveEvent("ADDSLAVE", this ));
0456: }
0457:
0458: /**
0459: * @return true if the slave has synchronized its filelist since last
0460: * connect
0461: */
0462: public synchronized boolean isAvailable() {
0463: return _isAvailable;
0464: }
0465:
0466: public boolean isAvailablePing() {
0467: if (!isAvailable()) {
0468: return false;
0469: }
0470:
0471: try {
0472: String index = issuePingToSlave();
0473: fetchResponse(index);
0474: } catch (SlaveUnavailableException e) {
0475: setOffline(e);
0476: return false;
0477: } catch (RemoteIOException e) {
0478: setOffline("The slave encountered an IOException while running ping...this is almost not possible");
0479: return false;
0480: }
0481:
0482: return isAvailable();
0483: }
0484:
0485: public void processQueue() throws IOException,
0486: SlaveUnavailableException {
0487: //no for-each loop, needs iter.remove()
0488: for (Iterator<QueuedOperation> iter = _renameQueue.iterator(); iter
0489: .hasNext();) {
0490: QueuedOperation item = iter.next();
0491: String sourceFile = item.getSource();
0492: String destFile = item.getDestination();
0493:
0494: if (destFile == null) { // delete
0495: try {
0496: fetchResponse(issueDeleteToSlave(sourceFile),
0497: 300000);
0498: } catch (RemoteIOException e) {
0499: if (!(e.getCause() instanceof FileNotFoundException)) {
0500: throw (IOException) e.getCause();
0501: }
0502: } finally {
0503: iter.remove();
0504: commit();
0505: }
0506: } else { // rename
0507: String fileName = destFile.substring(destFile
0508: .lastIndexOf("/") + 1);
0509: String destDir = destFile.substring(0, destFile
0510: .lastIndexOf("/"));
0511: try {
0512: fetchResponse(issueRenameToSlave(sourceFile,
0513: destDir, fileName));
0514: } catch (RemoteIOException e) {
0515: if (!(e.getCause() instanceof FileNotFoundException)) {
0516: throw (IOException) e.getCause();
0517: }
0518: } finally {
0519: iter.remove();
0520: commit();
0521: }
0522: }
0523: }
0524: }
0525:
0526: /**
0527: * @return true if the mask was removed successfully
0528: */
0529: public final boolean removeMask(String mask) {
0530: boolean ret = _ipMasks.removeMask(mask);
0531:
0532: if (ret) {
0533: commit();
0534: }
0535:
0536: return ret;
0537: }
0538:
0539: public synchronized void setAvailable(boolean available) {
0540: _isAvailable = available;
0541: }
0542:
0543: public final void setLastDirection(char direction, long l) {
0544: switch (direction) {
0545: case Transfer.TRANSFER_RECEIVING_UPLOAD:
0546: setLastUploadReceiving(l);
0547:
0548: return;
0549:
0550: case Transfer.TRANSFER_SENDING_DOWNLOAD:
0551: setLastDownloadSending(l);
0552:
0553: return;
0554:
0555: default:
0556: throw new IllegalArgumentException();
0557: }
0558: }
0559:
0560: public final void setLastDownloadSending(long lastDownloadSending) {
0561: _lastDownloadSending = lastDownloadSending;
0562: }
0563:
0564: public final void setLastUploadReceiving(long lastUploadReceiving) {
0565: _lastUploadReceiving = lastUploadReceiving;
0566: }
0567:
0568: /**
0569: * Deletes files/directories and waits for the response
0570: * Meant to be used if you don't want to utilize asynchronization
0571: */
0572: public void simpleDelete(String path) {
0573: try {
0574: fetchResponse(issueDeleteToSlave(path), 300000);
0575: } catch (RemoteIOException e) {
0576: if (e.getCause() instanceof FileNotFoundException) {
0577: return;
0578: }
0579:
0580: setOffline("IOException deleting file, check logs for specific error");
0581: addQueueDelete(path);
0582: logger
0583: .error(
0584: "IOException deleting file, file will be deleted when slave comes online",
0585: e);
0586: } catch (SlaveUnavailableException e) {
0587: // Already offline and we ARE successful in deleting the file
0588: addQueueDelete(path);
0589: }
0590: }
0591:
0592: /**
0593: * Renames files/directories and waits for the response
0594: */
0595: public void simpleRename(String from, String toDirPath,
0596: String toName) {
0597: String simplePath = null;
0598: if (toDirPath.endsWith("/")) {
0599: simplePath = toDirPath + toName;
0600: } else {
0601: simplePath = toDirPath + "/" + toName;
0602: }
0603: try {
0604: fetchResponse(issueRenameToSlave(from, toDirPath, toName));
0605: } catch (RemoteIOException e) {
0606: setOffline(e);
0607: addQueueRename(from, simplePath);
0608: } catch (SlaveUnavailableException e) {
0609: addQueueRename(from, simplePath);
0610: }
0611: }
0612:
0613: public String toString() {
0614: return moreInfo();
0615: }
0616:
0617: public static String getSlaveNameFromObjectInput(
0618: ObjectInputStream in) throws IOException {
0619: try {
0620: return (String) in.readObject();
0621: } catch (ClassNotFoundException e) {
0622: throw new RuntimeException(e);
0623: }
0624: }
0625:
0626: public synchronized void connect(Socket socket,
0627: ObjectInputStream in, ObjectOutputStream out)
0628: throws IOException {
0629: _socket = socket;
0630: _sout = out;
0631: _sin = in;
0632: _indexPool = new Stack<String>();
0633:
0634: for (int i = 0; i < 256; i++) {
0635: String key = Integer.toHexString(i);
0636:
0637: if (key.length() < 2) {
0638: key = "0" + key;
0639: }
0640:
0641: _indexPool.push(key);
0642: }
0643:
0644: _indexWithCommands = new HashMap<String, AsyncResponse>();
0645: _transfers = new HashMap<TransferIndex, RemoteTransfer>();
0646: _errors = 0;
0647: _lastNetworkError = System.currentTimeMillis();
0648: start();
0649: class RemergeThread implements Runnable {
0650: public void run() {
0651: try {
0652: initializeSlaveAfterThreadIsRunning();
0653: } catch (IOException e) {
0654: setOffline(e);
0655: } catch (SlaveUnavailableException e) {
0656: setOffline(e);
0657: }
0658: }
0659: }
0660:
0661: new Thread(new RemergeThread(), "RemoteSlaveRemerge - "
0662: + getName()).start();
0663: }
0664:
0665: private void start() {
0666: Thread t = new Thread(this );
0667: t.setName("RemoteSlave - " + getName());
0668: t.start();
0669: }
0670:
0671: public long fetchChecksumFromIndex(String index)
0672: throws RemoteIOException, SlaveUnavailableException {
0673: return ((AsyncResponseChecksum) fetchResponse(index))
0674: .getChecksum();
0675: }
0676:
0677: public ID3Tag fetchID3TagFromIndex(String index)
0678: throws RemoteIOException, SlaveUnavailableException {
0679: return ((AsyncResponseID3Tag) fetchResponse(index)).getTag();
0680: }
0681:
0682: private synchronized String fetchIndex()
0683: throws SlaveUnavailableException {
0684: while (isOnline()) {
0685: try {
0686: return _indexPool.pop();
0687: } catch (EmptyStackException e) {
0688: logger
0689: .error("Too many commands sent, need to wait for the slave to process commands");
0690: }
0691:
0692: try {
0693: wait();
0694: } catch (InterruptedException e1) {
0695: }
0696: }
0697:
0698: throw new SlaveUnavailableException(
0699: "Slave was offline or went offline while fetching an index");
0700: }
0701:
0702: public int fetchMaxPathFromIndex(String maxPathIndex)
0703: throws SlaveUnavailableException {
0704: try {
0705: return ((AsyncResponseMaxPath) fetchResponse(maxPathIndex))
0706: .getMaxPath();
0707: } catch (RemoteIOException e) {
0708: throw new FatalException(
0709: "this is not possible, slave had an error processing maxpath...");
0710: }
0711: }
0712:
0713: /**
0714: * @see fetchResponse(String index, int wait)
0715: */
0716: public AsyncResponse fetchResponse(String index)
0717: throws SlaveUnavailableException, RemoteIOException {
0718: return fetchResponse(index, 60 * 1000);
0719: }
0720:
0721: /**
0722: * returns an AsyncResponse for that index and throws any exceptions thrown
0723: * on the Slave side
0724: */
0725: public synchronized AsyncResponse fetchResponse(String index,
0726: int wait) throws SlaveUnavailableException,
0727: RemoteIOException {
0728: long total = System.currentTimeMillis();
0729:
0730: while (isOnline() && !_indexWithCommands.containsKey(index)) {
0731: try {
0732: wait(1000);
0733:
0734: // will wait a maximum of 1000 milliseconds before waking up
0735: } catch (InterruptedException e) {
0736: }
0737:
0738: if ((wait != 0)
0739: && ((System.currentTimeMillis() - total) >= wait)) {
0740: setOffline("Slave has taken too long while waiting for reply "
0741: + index);
0742: }
0743: }
0744:
0745: if (!isOnline()) {
0746: throw new SlaveUnavailableException(
0747: "Slave went offline while processing command");
0748: }
0749:
0750: AsyncResponse rar = _indexWithCommands.remove(index);
0751: _indexPool.push(index);
0752: notifyAll();
0753:
0754: if (rar instanceof AsyncResponseException) {
0755: Throwable t = ((AsyncResponseException) rar).getThrowable();
0756:
0757: if (t instanceof IOException) {
0758: throw new RemoteIOException((IOException) t);
0759: }
0760:
0761: logger
0762: .error(
0763: "Exception on slave that is unable to be handled by the master",
0764: t);
0765: setOffline("Exception on slave that is unable to be handled by the master");
0766: throw new SlaveUnavailableException(
0767: "Exception on slave that is unable to be handled by the master");
0768: }
0769: return rar;
0770: }
0771:
0772: public String fetchDIZFileFromIndex(String index)
0773: throws RemoteIOException, SlaveUnavailableException {
0774: return ((AsyncResponseDIZFile) fetchResponse(index)).getDIZ();
0775: }
0776:
0777: public LightSFVFile fetchSFVFileFromIndex(String index)
0778: throws RemoteIOException, SlaveUnavailableException {
0779: return ((AsyncResponseSFVFile) fetchResponse(index)).getSFV();
0780: }
0781:
0782: public synchronized String getPASVIP()
0783: throws SlaveUnavailableException {
0784: if (!isOnline())
0785: throw new SlaveUnavailableException();
0786: return getProperty("pasv_addr", _socket.getInetAddress()
0787: .getHostAddress());
0788: }
0789:
0790: public int getPort() {
0791: return _socket.getPort();
0792: }
0793:
0794: public synchronized boolean isOnline() {
0795: return ((_socket != null) && _socket.isConnected());
0796: }
0797:
0798: /**
0799: *
0800: * @param string
0801: * @return
0802: * @throws SlaveUnavailableException
0803: */
0804: public String issueChecksumToSlave(String string)
0805: throws SlaveUnavailableException {
0806: String index = fetchIndex();
0807: sendCommand(new AsyncCommandArgument(index, "checksum", string));
0808:
0809: return index;
0810: }
0811:
0812: public String issueConnectToSlave(String ip, int port,
0813: boolean encryptedDataChannel, boolean useSSLClientHandshake)
0814: throws SlaveUnavailableException {
0815: String index = fetchIndex();
0816: sendCommand(new AsyncCommandArgument(index, "connect", ip + ":"
0817: + port + "," + encryptedDataChannel + ","
0818: + useSSLClientHandshake));
0819:
0820: return index;
0821: }
0822:
0823: /**
0824: * @return String index, needs to be used to fetch the response
0825: */
0826: public String issueDeleteToSlave(String sourceFile)
0827: throws SlaveUnavailableException {
0828: String index = fetchIndex();
0829: sendCommand(new AsyncCommandArgument(index, "delete",
0830: sourceFile));
0831:
0832: return index;
0833: }
0834:
0835: public String issueID3TagToSlave(String path)
0836: throws SlaveUnavailableException {
0837: String index = fetchIndex();
0838: sendCommand(new AsyncCommandArgument(index, "id3tag", path));
0839:
0840: return index;
0841: }
0842:
0843: public String issueListenToSlave(boolean isSecureTransfer,
0844: boolean useSSLClientMode) throws SlaveUnavailableException {
0845: String index = fetchIndex();
0846: sendCommand(new AsyncCommandArgument(index, "listen", ""
0847: + isSecureTransfer + ":" + useSSLClientMode));
0848:
0849: return index;
0850: }
0851:
0852: public String issueMaxPathToSlave()
0853: throws SlaveUnavailableException {
0854: String index = fetchIndex();
0855: sendCommand(new AsyncCommand(index, "maxpath"));
0856:
0857: return index;
0858: }
0859:
0860: private String issuePingToSlave() throws SlaveUnavailableException {
0861: String index = fetchIndex();
0862: sendCommand(new AsyncCommand(index, "ping"));
0863:
0864: return index;
0865: }
0866:
0867: public String issueReceiveToSlave(String name, char c,
0868: long position, TransferIndex tindex)
0869: throws SlaveUnavailableException {
0870: String index = fetchIndex();
0871: sendCommand(new AsyncCommandArgument(index, "receive", c + ","
0872: + position + "," + tindex + "," + name));
0873:
0874: return index;
0875: }
0876:
0877: public String issueRenameToSlave(String from, String toDirPath,
0878: String toName) throws SlaveUnavailableException {
0879: if (toDirPath.length() == 0) { // needed for files in root
0880: toDirPath = "/";
0881: }
0882: String index = fetchIndex();
0883: sendCommand(new AsyncCommandArgument(index, "rename", from
0884: + "," + toDirPath + "," + toName));
0885:
0886: return index;
0887: }
0888:
0889: public String issueDIZFileToSlave(LinkedRemoteFileInterface file)
0890: throws SlaveUnavailableException {
0891: String index = fetchIndex();
0892: AsyncCommand ac = new AsyncCommandArgument(index, "dizfile",
0893: file.getPath());
0894:
0895: sendCommand(ac);
0896: return index;
0897: }
0898:
0899: public String issueSFVFileToSlave(String path)
0900: throws SlaveUnavailableException {
0901: String index = fetchIndex();
0902: AsyncCommand ac = new AsyncCommandArgument(index, "sfvfile",
0903: path);
0904: sendCommand(ac);
0905:
0906: return index;
0907: }
0908:
0909: public String issueStatusToSlave() throws SlaveUnavailableException {
0910: String index = fetchIndex();
0911: sendCommand(new AsyncCommand(index, "status"));
0912:
0913: return index;
0914: }
0915:
0916: public String moreInfo() {
0917: try {
0918: return getName() + ":address=[" + getPASVIP() + "]port=["
0919: + Integer.toString(getPort()) + "]";
0920: } catch (SlaveUnavailableException e) {
0921: return getName() + ":offline";
0922: }
0923: }
0924:
0925: public void run() {
0926: logger.debug("Starting RemoteSlave for " + getName());
0927:
0928: try {
0929: String pingIndex = null;
0930: while (isOnline()) {
0931: AsyncResponse ar = null;
0932:
0933: try {
0934: ar = readAsyncResponse();
0935: _lastResponseReceived = System.currentTimeMillis();
0936: } catch (SlaveUnavailableException e3) {
0937: // no reason for slave thread to be running if the slave is
0938: // not online
0939: return;
0940: } catch (SocketTimeoutException e) {
0941: // handled below
0942: }
0943:
0944: if (pingIndex == null
0945: && ((getActualTimeout() / 2 < (System
0946: .currentTimeMillis() - _lastResponseReceived)) || (getActualTimeout() / 2 < (System
0947: .currentTimeMillis() - _lastCommandSent)))) {
0948: pingIndex = issuePingToSlave();
0949: } else if (getActualTimeout() < (System
0950: .currentTimeMillis() - _lastResponseReceived)) {
0951: setOffline("Slave seems to have gone offline, have not received a response in "
0952: + (System.currentTimeMillis() - _lastResponseReceived)
0953: + " milliseconds");
0954: throw new SlaveUnavailableException();
0955: }
0956:
0957: if (ar == null) {
0958: continue;
0959: }
0960:
0961: synchronized (this ) {
0962: if (!(ar instanceof AsyncResponseRemerge)
0963: && !(ar instanceof AsyncResponseTransferStatus)) {
0964: logger.debug("Received: " + ar);
0965: }
0966:
0967: if (ar instanceof AsyncResponseTransfer) {
0968: AsyncResponseTransfer art = (AsyncResponseTransfer) ar;
0969: addTransfer((art.getConnectInfo()
0970: .getTransferIndex()),
0971: new RemoteTransfer(
0972: art.getConnectInfo(), this ));
0973: }
0974:
0975: if (ar.getIndex().equals("Remerge")) {
0976: getGlobalContext()
0977: .getSlaveManager()
0978: .putRemergeQueue(
0979: new RemergeMessage(
0980: (AsyncResponseRemerge) ar,
0981: this ));
0982: } else if (ar.getIndex().equals("DiskStatus")) {
0983: _status = ((AsyncResponseDiskStatus) ar)
0984: .getDiskStatus();
0985: } else if (ar.getIndex().equals("TransferStatus")) {
0986: TransferStatus ats = ((AsyncResponseTransferStatus) ar)
0987: .getTransferStatus();
0988: RemoteTransfer rt = null;
0989:
0990: try {
0991: rt = getTransfer(ats.getTransferIndex());
0992: } catch (SlaveUnavailableException e1) {
0993:
0994: // no reason for slave thread to be running if the
0995: // slave is not online
0996: return;
0997: }
0998:
0999: rt.updateTransferStatus(ats);
1000:
1001: if (ats.isFinished()) {
1002: removeTransfer(ats.getTransferIndex());
1003: }
1004: } else {
1005: _indexWithCommands.put(ar.getIndex(), ar);
1006: if (pingIndex != null
1007: && pingIndex.equals(ar.getIndex())) {
1008: fetchResponse(pingIndex);
1009: pingIndex = null;
1010: } else {
1011: notifyAll();
1012: }
1013: }
1014: }
1015: }
1016: } catch (Throwable e) {
1017: setOffline("error: " + e.getMessage());
1018: logger.error("", e);
1019: }
1020: }
1021:
1022: private int getActualTimeout() {
1023: return Integer.parseInt(getProperty("timeout", Integer
1024: .toString(SlaveManager.actualTimeout)));
1025: }
1026:
1027: private synchronized void removeTransfer(TransferIndex transferIndex) {
1028: RemoteTransfer transfer = null;
1029: synchronized (_transfers) {
1030: transfer = _transfers.remove(transferIndex);
1031: }
1032: if (transfer == null) {
1033: throw new IllegalStateException("there is a bug in code");
1034: }
1035: if (transfer.getState() == Transfer.TRANSFER_RECEIVING_UPLOAD) {
1036: addReceivedBytes(transfer.getTransfered());
1037: } else if (transfer.getState() == Transfer.TRANSFER_SENDING_DOWNLOAD) {
1038: addSentBytes(transfer.getTransfered());
1039: } // else, we don't care
1040: }
1041:
1042: private void addSentBytes(long transfered) {
1043: addBytes("bytesSent", transfered);
1044: }
1045:
1046: private void addBytes(String field, long transfered) {
1047: setProperty(field, Long.toString(Long.parseLong(getProperty(
1048: field, "0"))
1049: + transfered));
1050: }
1051:
1052: private void addReceivedBytes(long transfered) {
1053: addBytes("bytesReceived", transfered);
1054: }
1055:
1056: public void setOffline(String reason) {
1057: logger.debug("setOffline() " + reason);
1058: setOfflineReal(reason);
1059: }
1060:
1061: private final synchronized void setOfflineReal(String reason) {
1062:
1063: if (_socket != null) {
1064: setProperty("lastOnline", Long.toString(System
1065: .currentTimeMillis()));
1066: try {
1067: _socket.close();
1068: } catch (IOException e) {
1069: }
1070: _socket = null;
1071: }
1072: _sin = null;
1073: _sout = null;
1074: _indexPool = null;
1075: _indexWithCommands = null;
1076: _transfers = null;
1077: _maxPath = 0;
1078: _status = null;
1079:
1080: if (_isAvailable) {
1081: getGlobalContext().dispatchFtpEvent(
1082: new SlaveEvent("DELSLAVE", reason, this ));
1083: }
1084:
1085: setAvailable(false);
1086: }
1087:
1088: public void setOffline(Throwable t) {
1089: logger.info("setOffline()", t);
1090:
1091: if (t.getMessage() == null) {
1092: setOfflineReal("No Message");
1093: } else {
1094: setOfflineReal(t.getMessage());
1095: }
1096: }
1097:
1098: /**
1099: * fetches the next AsyncResponse, if IOException is encountered, the slave
1100: * is setOffline() and the Exception is thrown
1101: *
1102: * @throws SlaveUnavailableException
1103: * @throws SocketTimeoutException
1104: */
1105: private AsyncResponse readAsyncResponse()
1106: throws SlaveUnavailableException, SocketTimeoutException {
1107: Object obj = null;
1108: while (true) {
1109: try {
1110: obj = _sin.readObject();
1111: } catch (ClassNotFoundException e) {
1112: logger.error("ClassNotFound reading AsyncResponse", e);
1113: setOffline("ClassNotFound reading AsyncResponse");
1114: throw new SlaveUnavailableException(
1115: "Slave is unavailable - Class Not Found");
1116: } catch (SocketTimeoutException e) {
1117: // don't want this to be caught by IOException below
1118: throw e;
1119: } catch (IOException e) {
1120: logger.error("IOException reading AsyncResponse", e);
1121: setOffline("IOException reading AsyncResponse");
1122: throw new SlaveUnavailableException(
1123: "Slave is unavailable - IOException");
1124: }
1125: if (obj != null) {
1126: if (obj instanceof AsyncResponse) {
1127: return (AsyncResponse) obj;
1128: }
1129: logger.error("Throwing away an unexpected class - "
1130: + obj.getClass().getName() + " - " + obj);
1131: }
1132: }
1133: }
1134:
1135: public void issueAbortToSlave(TransferIndex transferIndex,
1136: String reason) throws SlaveUnavailableException {
1137: if (reason == null) {
1138: reason = "null";
1139: }
1140: sendCommand(new AsyncCommandArgument("abort", "abort",
1141: transferIndex.toString() + "," + reason));
1142: }
1143:
1144: public ConnectInfo fetchTransferResponseFromIndex(String index)
1145: throws RemoteIOException, SlaveUnavailableException {
1146: AsyncResponseTransfer art = (AsyncResponseTransfer) fetchResponse(index);
1147:
1148: return art.getConnectInfo();
1149: }
1150:
1151: /**
1152: * Will not set a slave offline, it is the job of the calling thread to decide to do this
1153: */
1154: private synchronized void sendCommand(AsyncCommand rac)
1155: throws SlaveUnavailableException {
1156: if (rac == null) {
1157: throw new NullPointerException();
1158: }
1159:
1160: if (!isOnline()) {
1161: throw new SlaveUnavailableException();
1162: }
1163:
1164: try {
1165: _sout.writeObject(rac);
1166: _sout.flush();
1167: _sout.reset();
1168: } catch (IOException e) {
1169: logger.error("error in sendCommand()", e);
1170: throw new SlaveUnavailableException(
1171: "error sending command (exception already handled)",
1172: e);
1173: }
1174: _lastCommandSent = System.currentTimeMillis();
1175: }
1176:
1177: public String issueSendToSlave(String name, char c, long position,
1178: TransferIndex tindex) throws SlaveUnavailableException {
1179: String index = fetchIndex();
1180: sendCommand(new AsyncCommandArgument(index, "send", c + ","
1181: + position + "," + tindex + "," + name));
1182:
1183: return index;
1184: }
1185:
1186: public String issueRemergeToSlave(String path)
1187: throws SlaveUnavailableException {
1188: String index = fetchIndex();
1189: sendCommand(new AsyncCommandArgument(index, "remerge", path));
1190:
1191: return index;
1192: }
1193:
1194: public void fetchRemergeResponseFromIndex(String index)
1195: throws IOException, SlaveUnavailableException {
1196: try {
1197: fetchResponse(index, 0);
1198: } catch (RemoteIOException e) {
1199: throw (IOException) e.getCause();
1200: }
1201: }
1202:
1203: public boolean checkConnect(Socket socket)
1204: throws MalformedPatternException {
1205: return getMasks().check(socket);
1206: }
1207:
1208: public String getProperty(String key) {
1209: synchronized (_keysAndValues) {
1210: return _keysAndValues.getProperty(key);
1211: }
1212: }
1213:
1214: public synchronized void addTransfer(TransferIndex transferIndex,
1215: RemoteTransfer transfer) {
1216: if (!isOnline()) {
1217: return;
1218: }
1219:
1220: synchronized (_transfers) {
1221: _transfers.put(transferIndex, transfer);
1222: }
1223: }
1224:
1225: public synchronized RemoteTransfer getTransfer(
1226: TransferIndex transferIndex)
1227: throws SlaveUnavailableException {
1228: if (!isOnline()) {
1229: throw new SlaveUnavailableException("Slave is not online");
1230: }
1231:
1232: synchronized (_transfers) {
1233: RemoteTransfer ret = _transfers.get(transferIndex);
1234: if (ret == null)
1235: throw new FatalException(
1236: "there is a bug somewhere in code, tried to fetch a transfer index that doesn't exist - "
1237: + transferIndex);
1238: return ret;
1239: }
1240: }
1241:
1242: public synchronized Collection<RemoteTransfer> getTransfers()
1243: throws SlaveUnavailableException {
1244: if (!isOnline()) {
1245: throw new SlaveUnavailableException("Slave is not online");
1246: }
1247: synchronized (_transfers) {
1248: return Collections
1249: .unmodifiableCollection(new ArrayList<RemoteTransfer>(
1250: _transfers.values()));
1251: }
1252: }
1253:
1254: public boolean isMemberOf(String string) {
1255: StringTokenizer st = new StringTokenizer(getProperty(
1256: "keywords", ""), " ");
1257:
1258: while (st.hasMoreElements()) {
1259: if (st.nextToken().equals(string)) {
1260: return true;
1261: }
1262: }
1263:
1264: return false;
1265: }
1266:
1267: public void init(GlobalContext globalContext) {
1268: _gctx = globalContext;
1269: }
1270:
1271: public LinkedList<QueuedOperation> getRenameQueue() {
1272: return _renameQueue;
1273: }
1274:
1275: public void setRenameQueue(LinkedList<QueuedOperation> renameQueue) {
1276: _renameQueue = renameQueue;
1277: }
1278:
1279: public void shutdown() {
1280: try {
1281: sendCommand(new AsyncCommand("shutdown",
1282: "shutdown gracefully"));
1283: setOfflineReal("shutdown gracefully");
1284: } catch (SlaveUnavailableException e) {
1285: }
1286: }
1287:
1288: public long getLastTimeOnline() {
1289: if (isOnline()) {
1290: return System.currentTimeMillis();
1291: }
1292: String value = getProperty("lastOnline");
1293: // if (value == null) Slave has never been online
1294: return Long.parseLong(value == null ? "0" : value);
1295: }
1296:
1297: public String removeProperty(String key)
1298: throws KeyNotFoundException {
1299: synchronized (_keysAndValues) {
1300: if (getProperty(key) == null)
1301: throw new KeyNotFoundException();
1302: String value = (String) _keysAndValues.remove(key);
1303: commit();
1304: return value;
1305: }
1306: }
1307: }
|