001: /*
002: * This file is part of DrFTPD, Distributed FTP Daemon.
003: *
004: * DrFTPD is free software; you can redistribute it and/or modify
005: * it under the terms of the GNU General Public License as published by
006: * the Free Software Foundation; either version 2 of the License, or
007: * (at your option) any later version.
008: *
009: * DrFTPD is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: * GNU General Public License for more details.
013: *
014: * You should have received a copy of the GNU General Public License
015: * along with DrFTPD; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */
018: package org.drftpd.master;
019:
020: import java.beans.XMLDecoder;
021: import java.io.File;
022: import java.io.FileInputStream;
023: import java.io.FileNotFoundException;
024: import java.io.IOException;
025: import java.io.ObjectInputStream;
026: import java.io.ObjectOutputStream;
027: import java.io.PrintWriter;
028: import java.net.ServerSocket;
029: import java.net.Socket;
030: import java.util.ArrayList;
031: import java.util.Collection;
032: import java.util.Collections;
033: import java.util.HashMap;
034: import java.util.HashSet;
035: import java.util.Iterator;
036: import java.util.List;
037: import java.util.ListIterator;
038: import java.util.Properties;
039: import java.util.Set;
040: import java.util.concurrent.BlockingQueue;
041: import java.util.concurrent.LinkedBlockingQueue;
042:
043: import javax.net.ssl.SSLSocket;
044:
045: import net.sf.drftpd.FatalException;
046: import net.sf.drftpd.NoAvailableSlaveException;
047: import net.sf.drftpd.ObjectNotFoundException;
048: import net.sf.drftpd.SlaveUnavailableException;
049: import net.sf.drftpd.master.SlaveFileException;
050:
051: import org.apache.log4j.Logger;
052: import org.drftpd.GlobalContext;
053: import org.drftpd.PropertyHelper;
054: import org.drftpd.SSLGetContext;
055: import org.drftpd.io.SafeFileOutputStream;
056: import org.drftpd.remotefile.LinkedRemoteFile;
057: import org.drftpd.remotefile.LinkedRemoteFileInterface;
058: import org.drftpd.remotefile.MLSTSerialize;
059: import org.drftpd.slave.RemoteIOException;
060: import org.drftpd.slave.SlaveStatus;
061: import org.drftpd.slave.async.AsyncCommandArgument;
062: import org.drftpd.usermanager.UserFileException;
063:
064: /**
065: * @author mog
066: * @version $Id: SlaveManager.java 1562 2007-01-05 12:37:07Z zubov $
067: */
068: public class SlaveManager implements Runnable {
/** Logger shared by every SlaveManager instance. */
private static final Logger logger = Logger
        .getLogger(SlaveManager.class.getName());

/** Relative directory holding one "&lt;slavename&gt;.xml" file per slave (see getSlaveFile). */
private static final String slavePath = "slaves/";

/** File handle for {@link #slavePath}; loadSlaves() creates the directory when it is missing. */
private static final File slavePathFile = new File(slavePath);

/** Read timeout in milliseconds applied with setSoTimeout to each accepted slave socket in run(). */
private static final int socketTimeout = 10000; // 10 seconds, for Socket

/** Timeout in milliseconds; not read in this class. NOTE(review): presumably consumed by RemoteSlave - confirm. */
protected static final int actualTimeout = 60000; // one minute, evaluated on a SocketTimeout

/** Global context handed to the constructor; read through getGlobalContext(), which rejects null. */
protected GlobalContext _gctx;

/** Slaves known to this manager; re-sorted by loadSlaves() and addSlave() after insertions. */
protected List<RemoteSlave> _rslaves = new ArrayList<RemoteSlave>();

/** Port the slave listener binds to, parsed from the "master.bindport" property. */
private int _port;

/** Listening socket for incoming slave connections; created in run(). */
protected ServerSocket _serverSocket;

/** Pending remerge messages; filled by putRemergeQueue() and drained by RemergeThread. */
private LinkedBlockingQueue<RemergeMessage> _remergeQueue = new LinkedBlockingQueue<RemergeMessage>();

/** Worker draining {@link #_remergeQueue}; (re)started by putRemergeQueue() when absent or dead. */
private RemergeThread _remergeThread;

/** True when "master.slaveSSL" equals "true" (case-insensitive); run() then listens on an SSL server socket. */
private boolean _sslSlaves;
093:
094: public SlaveManager(Properties p, GlobalContext gctx)
095: throws SlaveFileException {
096: this ();
097: _gctx = gctx;
098: _port = Integer.parseInt(PropertyHelper.getProperty(p,
099: "master.bindport"));
100: _sslSlaves = p.getProperty("master.slaveSSL", "false")
101: .equalsIgnoreCase("true");
102: loadSlaves();
103: }
104:
105: /**
106: * For JUnit tests
107: */
108: public SlaveManager() {
109: }
110:
111: private void loadSlaves() throws SlaveFileException {
112: if (!slavePathFile.exists() && !slavePathFile.mkdirs()) {
113: throw new SlaveFileException(new IOException(
114: "Error creating directories: " + slavePathFile));
115: }
116:
117: String[] slavepaths = slavePathFile.list();
118:
119: for (int i = 0; i < slavepaths.length; i++) {
120: String slavepath = slavepaths[i];
121:
122: if (!slavepath.endsWith(".xml")) {
123: continue;
124: }
125:
126: String slavename = slavepath.substring(0, slavepath
127: .length()
128: - ".xml".length());
129:
130: try {
131: getSlaveByNameUnchecked(slavename);
132: } catch (ObjectNotFoundException e) {
133: throw new SlaveFileException(e);
134: }
135:
136: // throws IOException
137: }
138:
139: Collections.sort(_rslaves);
140: }
141:
142: public void newSlave(String slavename) {
143: addSlave(new RemoteSlave(slavename, getGlobalContext()));
144: }
145:
146: public void addSlave(RemoteSlave rslave) {
147: _rslaves.add(rslave);
148: Collections.sort(_rslaves);
149: }
150:
151: private RemoteSlave getSlaveByNameUnchecked(String slavename)
152: throws ObjectNotFoundException {
153: if (slavename == null) {
154: throw new NullPointerException();
155: }
156:
157: RemoteSlave rslave = null;
158:
159: try {
160: XMLDecoder in = new XMLDecoder(new FileInputStream(
161: getSlaveFile(slavename)));
162:
163: rslave = (RemoteSlave) in.readObject();
164: in.close();
165: rslave.init(getGlobalContext());
166:
167: if (rslave.getName().equals(slavename)) {
168: _rslaves.add(rslave);
169: return rslave;
170: }
171: logger
172: .warn(
173: "Tried to lookup a slave with the same name, different case",
174: new Throwable());
175: throw new ObjectNotFoundException();
176: } catch (FileNotFoundException e) {
177: throw new ObjectNotFoundException(e);
178: } catch (Exception e) {
179: throw new FatalException("Error loading " + slavename, e);
180: }
181: }
182:
183: protected File getSlaveFile(String slavename) {
184: return new File(slavePath + slavename + ".xml");
185: }
186:
187: protected void addShutdownHook() {
188: //add shutdown hook last
189: Runtime.getRuntime().addShutdownHook(new Thread() {
190: public void run() {
191: logger.info("Running shutdown hook");
192: for (RemoteSlave rslave : _rslaves) {
193: rslave.shutdown();
194: }
195: saveFilelist();
196:
197: try {
198: getGlobalContext().getConnectionManager()
199: .getGlobalContext().getUserManager()
200: .saveAll();
201: } catch (UserFileException e) {
202: logger.warn("", e);
203: }
204: }
205: });
206: }
207:
208: public void delSlave(String slaveName) {
209: RemoteSlave rslave = null;
210:
211: try {
212: rslave = getRemoteSlave(slaveName);
213: getSlaveFile(rslave.getName()).delete();
214: rslave.setOffline("Slave has been deleted");
215: _rslaves.remove(rslave);
216: getGlobalContext().getRoot().unmergeDir(rslave);
217: } catch (ObjectNotFoundException e) {
218: throw new IllegalArgumentException("Slave not found");
219: }
220: }
221:
222: public HashSet<RemoteSlave> findSlavesBySpace(int numOfSlaves,
223: Set exemptSlaves, boolean ascending) {
224: Collection<RemoteSlave> slaveList = getSlaves();
225: HashMap<Long, RemoteSlave> map = new HashMap<Long, RemoteSlave>();
226:
227: for (Iterator<RemoteSlave> iter = slaveList.iterator(); iter
228: .hasNext();) {
229: RemoteSlave rslave = iter.next();
230:
231: if (exemptSlaves.contains(rslave)) {
232: continue;
233: }
234:
235: Long size;
236:
237: try {
238: size = new Long(rslave.getSlaveStatusAvailable()
239: .getDiskSpaceAvailable());
240: } catch (SlaveUnavailableException e) {
241: continue;
242: }
243:
244: map.put(size, rslave);
245: }
246:
247: ArrayList sorted = new ArrayList(map.keySet());
248:
249: if (ascending) {
250: Collections.sort(sorted);
251: } else {
252: Collections.sort(sorted, Collections.reverseOrder());
253: }
254:
255: HashSet<RemoteSlave> returnMe = new HashSet<RemoteSlave>();
256:
257: for (ListIterator iter = sorted.listIterator(); iter.hasNext();) {
258: if (iter.nextIndex() == numOfSlaves) {
259: break;
260: }
261:
262: Long key = (Long) iter.next();
263: RemoteSlave rslave = (RemoteSlave) map.get(key);
264: returnMe.add(rslave);
265: }
266:
267: return returnMe;
268: }
269:
270: public RemoteSlave findSmallestFreeSlave() {
271: Collection slaveList = getGlobalContext()
272: .getConnectionManager().getGlobalContext()
273: .getSlaveManager().getSlaves();
274: long smallSize = Integer.MAX_VALUE;
275: RemoteSlave smallSlave = null;
276:
277: for (Iterator iter = slaveList.iterator(); iter.hasNext();) {
278: RemoteSlave rslave = (RemoteSlave) iter.next();
279: long size = Integer.MAX_VALUE;
280:
281: try {
282: size = rslave.getSlaveStatusAvailable()
283: .getDiskSpaceAvailable();
284: } catch (SlaveUnavailableException e) {
285: continue;
286: }
287:
288: if (size < smallSize) {
289: smallSize = size;
290: smallSlave = rslave;
291: }
292: }
293:
294: return smallSlave;
295: }
296:
297: /**
298: * Not cached at all since RemoteSlave objects cache their SlaveStatus
299: */
300: public SlaveStatus getAllStatus() {
301: SlaveStatus allStatus = new SlaveStatus();
302:
303: for (Iterator iter = getSlaves().iterator(); iter.hasNext();) {
304: RemoteSlave rslave = (RemoteSlave) iter.next();
305:
306: try {
307: allStatus = allStatus.append(rslave
308: .getSlaveStatusAvailable());
309: } catch (SlaveUnavailableException e) {
310: //slave is offline, continue
311: }
312: }
313:
314: return allStatus;
315: }
316:
317: public HashMap getAllStatusArray() {
318: //SlaveStatus[] ret = new SlaveStatus[getSlaves().size()];
319: HashMap ret = new HashMap(getSlaves().size());
320:
321: for (Iterator<RemoteSlave> iter = getSlaves().iterator(); iter
322: .hasNext();) {
323: RemoteSlave rslave = iter.next();
324:
325: try {
326: ret.put(rslave.getName(), rslave.getSlaveStatus());
327: } catch (SlaveUnavailableException e) {
328: ret.put(rslave.getName(), (Object) null);
329: }
330: }
331:
332: return ret;
333: }
334:
335: // private Random rand = new Random();
336: // public RemoteSlave getASlave() {
337: // ArrayList retSlaves = new ArrayList();
338: // for (Iterator iter = this.rslaves.iterator(); iter.hasNext();) {
339: // RemoteSlave rslave = (RemoteSlave) iter.next();
340: // if (!rslave.isAvailable())
341: // continue;
342: // retSlaves.add(rslave);
343: // }
344: //
345: // int num = rand.nextInt(retSlaves.size());
346: // logger.fine(
347: // "Slave "
348: // + num
349: // + " selected out of "
350: // + retSlaves.size()
351: // + " available slaves");
352: // return (RemoteSlave) retSlaves.get(num);
353: // }
354: /**
355: * Returns a modifiable list of available RemoteSlave's
356: */
357: public Collection<RemoteSlave> getAvailableSlaves()
358: throws NoAvailableSlaveException {
359: ArrayList<RemoteSlave> availableSlaves = new ArrayList<RemoteSlave>();
360:
361: for (Iterator<RemoteSlave> iter = getSlaves().iterator(); iter
362: .hasNext();) {
363: RemoteSlave rslave = iter.next();
364:
365: if (!rslave.isAvailable()) {
366: continue;
367: }
368:
369: availableSlaves.add(rslave);
370: }
371:
372: if (availableSlaves.isEmpty()) {
373: throw new NoAvailableSlaveException("No slaves online");
374: }
375:
376: return availableSlaves;
377: }
378:
379: public GlobalContext getGlobalContext() {
380: if (_gctx == null) {
381: throw new NullPointerException();
382: }
383:
384: return _gctx;
385: }
386:
387: public RemoteSlave getRemoteSlave(String s)
388: throws ObjectNotFoundException {
389: for (Iterator<RemoteSlave> iter = getSlaves().iterator(); iter
390: .hasNext();) {
391: RemoteSlave rslave = iter.next();
392:
393: if (rslave.getName().equals(s)) {
394: return rslave;
395: }
396: }
397:
398: return getSlaveByNameUnchecked(s);
399: }
400:
401: public List<RemoteSlave> getSlaves() {
402: if (_rslaves == null) {
403: throw new NullPointerException();
404: }
405:
406: return Collections.unmodifiableList(_rslaves);
407: }
408:
409: /**
410: * Returns true if one or more slaves are online, false otherwise.
411: *
412: * @return true if one or more slaves are online, false otherwise.
413: */
414: public boolean hasAvailableSlaves() {
415: for (Iterator<RemoteSlave> iter = _rslaves.iterator(); iter
416: .hasNext();) {
417: if (iter.next().isAvailable()) {
418: return true;
419: }
420: }
421: return false;
422: }
423:
424: public void saveFilelist() {
425: try {
426: PrintWriter out = new PrintWriter(new SafeFileOutputStream(
427: "files.mlst"));
428:
429: try {
430: MLSTSerialize.serialize(getGlobalContext()
431: .getConnectionManager().getGlobalContext()
432: .getRoot(), out);
433: } finally {
434: out.close();
435: logger.info("Done saving filelist");
436: }
437: } catch (IOException e) {
438: logger.warn("Error saving files.mlst", e);
439: }
440: }
441:
442: public void run() {
443: try {
444: if (_sslSlaves) {
445: _serverSocket = SSLGetContext.getSSLContext()
446: .getServerSocketFactory().createServerSocket(
447: _port);
448: } else {
449: _serverSocket = new ServerSocket(_port);
450: }
451: //_serverSocket.setReuseAddress(true);
452: logger.info("Listening for slaves on port " + _port);
453: } catch (Exception e) {
454: throw new FatalException(e);
455: }
456:
457: Socket socket = null;
458:
459: while (true) {
460: RemoteSlave rslave = null;
461: ObjectInputStream in = null;
462: ObjectOutputStream out = null;
463:
464: try {
465: socket = _serverSocket.accept();
466: socket.setSoTimeout(socketTimeout);
467: if (socket instanceof SSLSocket) {
468: ((SSLSocket) socket).setUseClientMode(false);
469: ((SSLSocket) socket).startHandshake();
470: }
471: logger.debug("Slave connected from "
472: + socket.getRemoteSocketAddress());
473:
474: in = new ObjectInputStream(socket.getInputStream());
475: out = new ObjectOutputStream(socket.getOutputStream());
476:
477: String slavename = RemoteSlave
478: .getSlaveNameFromObjectInput(in);
479:
480: try {
481: rslave = getRemoteSlave(slavename);
482: } catch (ObjectNotFoundException e) {
483: out
484: .writeObject(new AsyncCommandArgument(
485: "error",
486: "error",
487: slavename
488: + " does not exist, use \"site addslave\""));
489: logger.info("Slave " + slavename
490: + " does not exist, use \"site addslave\"");
491: socket.close();
492: continue;
493: }
494:
495: if (rslave.isOnline()) {
496: out.writeObject(new AsyncCommandArgument("",
497: "error", "Already online"));
498: out.flush();
499: socket.close();
500: throw new IOException("Already online");
501: }
502: } catch (Exception e) {
503: if (socket != null) {
504: try {
505: socket.close();
506: } catch (IOException e1) {
507: }
508: }
509:
510: logger.error("", e);
511:
512: continue;
513: }
514:
515: try {
516: if (!rslave.checkConnect(socket)) {
517: out.writeObject(new AsyncCommandArgument("",
518: "error", socket.getInetAddress()
519: + " is not a valid mask for "
520: + rslave.getName()));
521: logger.error(socket.getInetAddress()
522: + " is not a valid ip for "
523: + rslave.getName());
524: socket.close();
525:
526: continue;
527: }
528:
529: rslave.connect(socket, in, out);
530: } catch (Exception e) {
531: rslave.setOffline(e);
532: logger.error(e);
533: } catch (Throwable t) {
534: logger.error("FATAL: Throwable in SalveManager loop");
535: }
536: }
537: }
538:
539: public BlockingQueue<RemergeMessage> getRemergeQueue() {
540: return _remergeQueue;
541: }
542:
543: /**
544: * @param message
545: */
546: public void putRemergeQueue(RemergeMessage message) {
547: try {
548: _remergeQueue.put(message);
549: } catch (InterruptedException e) {
550: throw new RuntimeException(e);
551: }
552: if (_remergeThread == null || !_remergeThread.isAlive()) {
553: _remergeThread = new RemergeThread(getGlobalContext());
554: _remergeThread.start();
555: }
556: }
557:
558: /**
559: * Cancels all transfers in directory
560: */
561: public void cancelTransfersInDirectory(LinkedRemoteFileInterface dir) {
562: if (!dir.isDirectory()) {
563: throw new IllegalArgumentException(dir
564: + " is not a directory");
565: }
566: for (RemoteSlave rs : getSlaves()) {
567: try {
568: for (RemoteTransfer rt : rs.getTransfers()) {
569: String path = rt.getPathNull();
570: if (path != null) {
571: if (path.startsWith(dir.getPath())) {
572: rt.abort("Directory is nuked");
573: }
574: }
575: }
576: } catch (SlaveUnavailableException ignore) {
577: }
578: }
579: }
580:
581: /**
582: * Accepts files and directories and does the physical deletes asynchronously
583: * Waits for a response and handles errors on each slave
584: * Use RemoteSlave.simpleDelete(path) if you want to just delete one file
585: * @param file
586: */
587: public void deleteOnAllSlaves(LinkedRemoteFile file) {
588: HashMap<RemoteSlave, String> slaveMap = new HashMap<RemoteSlave, String>();
589: List<RemoteSlave> slaves = null;
590: if (file.isFile()) {
591: slaves = file.getSlaves();
592: } else {
593: slaves = new ArrayList<RemoteSlave>(_rslaves);
594: }
595: for (RemoteSlave rslave : slaves) {
596: String index = null;
597: try {
598: index = rslave.issueDeleteToSlave(file.getPath());
599: slaveMap.put(rslave, index);
600: } catch (SlaveUnavailableException e) {
601: rslave.addQueueDelete(file.getPath());
602: }
603: }
604: for (RemoteSlave rslave : slaveMap.keySet()) {
605: String index = slaveMap.get(rslave);
606: try {
607: rslave.fetchResponse(index, 300000);
608: } catch (SlaveUnavailableException e) {
609: rslave.addQueueDelete(file.getPath());
610: } catch (RemoteIOException e) {
611: if (e.getCause() instanceof FileNotFoundException) {
612: continue;
613: }
614: rslave
615: .setOffline("IOException deleting file, check logs for specific error");
616: rslave.addQueueDelete(file.getPath());
617: logger
618: .error(
619: "IOException deleting file, file will be deleted when slave comes online",
620: e);
621: rslave.addQueueDelete(file.getPath());
622: }
623: }
624: }
625:
626: public void renameOnAllSlaves(String fromPath, String toDirPath,
627: String toName) {
628: synchronized (this ) {
629: for (RemoteSlave rslave : _rslaves) {
630: rslave.simpleRename(fromPath, toDirPath, toName);
631: }
632: }
633: }
634: }
635:
636: class RemergeThread extends Thread {
637: private static final Logger logger = Logger
638: .getLogger(RemergeThread.class);
639:
640: private GlobalContext _gctx;
641:
642: public RemergeThread(GlobalContext gctx) {
643: super ("RemergeThread");
644: _gctx = gctx;
645: }
646:
647: public void run() {
648: while (true) {
649: RemergeMessage msg;
650: try {
651: msg = getGlobalContext().getSlaveManager()
652: .getRemergeQueue().take();
653: } catch (InterruptedException e) {
654: logger.info("", e);
655: continue;
656: }
657:
658: if (msg.isCompleted()) {
659: continue;
660: }
661:
662: LinkedRemoteFileInterface lrf;
663:
664: try {
665: lrf = getGlobalContext().getRoot().lookupFile(
666: msg.getDirectory());
667: } catch (FileNotFoundException e1) {
668: lrf = getGlobalContext().getRoot().createDirectories(
669: msg.getDirectory());
670: }
671:
672: try {
673: lrf.remerge(msg.getFiles(), msg.getRslave());
674: } catch (IOException e2) {
675: logger.error("IOException during remerge", e2);
676: msg.getRslave()
677: .setOffline("IOException during remerge");
678: }
679: }
680: }
681:
682: private GlobalContext getGlobalContext() {
683: return _gctx;
684: }
685: }
|