001: /*
002: * This file is part of DrFTPD, Distributed FTP Daemon.
003: *
004: * DrFTPD is free software; you can redistribute it and/or modify
005: * it under the terms of the GNU General Public License as published by
006: * the Free Software Foundation; either version 2 of the License, or
007: * (at your option) any later version.
008: *
009: * DrFTPD is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: * GNU General Public License for more details.
013: *
014: * You should have received a copy of the GNU General Public License
015: * along with DrFTPD; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */
018: package org.drftpd.slave;
019:
020: import java.io.EOFException;
021: import java.io.FileInputStream;
022: import java.io.IOException;
023: import java.io.ObjectInputStream;
024: import java.io.ObjectOutputStream;
025: import java.net.InetAddress;
026: import java.net.InetSocketAddress;
027: import java.net.Socket;
028: import java.util.HashMap;
029: import java.util.Properties;
030:
031: import net.sf.drftpd.util.PortRange;
032:
033: import org.apache.log4j.BasicConfigurator;
034: import org.apache.log4j.Logger;
035: import org.drftpd.PropertyHelper;
036: import org.drftpd.SSLGetContext;
037: import org.drftpd.remotefile.CaseInsensitiveHashtable;
038: import org.drftpd.remotefile.LightRemoteFile;
039: import org.drftpd.slave.async.AsyncCommand;
040: import org.drftpd.slave.async.AsyncCommandArgument;
041: import org.drftpd.slave.async.AsyncResponse;
042: import org.drftpd.slave.async.AsyncResponseDiskStatus;
043: import org.drftpd.slave.async.AsyncResponseException;
044: import org.drftpd.slave.async.AsyncResponseMaxPath;
045: import org.drftpd.slave.async.AsyncResponseRemerge;
046: import org.drftpd.slave.async.AsyncResponseTransferStatus;
047:
048: /**
049: * @author zubov
050: * @version $Id$
051: */
052: public class TestSlave extends Slave {
053: private Socket _s;
054: private ObjectInputStream _sin;
055: private ObjectOutputStream _sout;
056: private static final Logger logger = Logger
057: .getLogger(TestSlave.class);
058:
059: public TestSlave(Properties p) throws IOException {
060: InetSocketAddress addr = new InetSocketAddress(PropertyHelper
061: .getProperty(p, "master.host"), Integer
062: .parseInt(PropertyHelper.getProperty(p,
063: "master.bindport")));
064: logger.info("Connecting to master at " + addr);
065:
066: String slavename = PropertyHelper.getProperty(p, "slave.name");
067:
068: _s = new Socket();
069: _s.connect(addr);
070:
071: _sout = new ObjectOutputStream(_s.getOutputStream());
072: _sin = new ObjectInputStream(_s.getInputStream());
073:
074: // TODO sendReply()
075: _sout.writeObject(slavename);
076: _sout.flush();
077:
078: }
079:
080: protected synchronized void sendResponse(AsyncResponse response) {
081: if (response == null) {
082: // handler doesn't return anything or it sends reply on it's own
083: // (threaded for example)
084: return;
085: }
086:
087: try {
088: _sout.writeObject(response);
089: _sout.flush();
090: if (!(response instanceof AsyncResponseTransferStatus)) {
091: logger.debug("Slave wrote response - " + response);
092: }
093:
094: if (response instanceof AsyncResponseException) {
095: logger.debug("", ((AsyncResponseException) response)
096: .getThrowable());
097: }
098: } catch (IOException e) {
099: throw new RuntimeException(e);
100: }
101: }
102:
103: public static void main(String[] args) throws Exception {
104: BasicConfigurator.configure();
105: System.out
106: .println("DrFTPD Slave starting, further logging will be done through log4j");
107:
108: Properties p = new Properties();
109: p.put("master.host", "localhost");
110: p.put("master.bindport", "1099");
111: p.put("slave.name", "testslave");
112:
113: TestSlave s = new TestSlave(p);
114: s
115: .sendResponse(new AsyncResponseDiskStatus(
116: new DiskStatus(0, 0)));
117: s.listenForCommands();
118: }
119:
120: private void listenForCommands() throws IOException {
121: while (true) {
122: AsyncCommand ac;
123:
124: try {
125: ac = (AsyncCommand) _sin.readObject();
126:
127: if (ac == null) {
128: continue;
129: }
130: } catch (ClassNotFoundException e) {
131: throw new RuntimeException(e);
132: } catch (EOFException e) {
133: logger.debug("Master shutdown or went offline");
134: return;
135: }
136:
137: logger.debug("Slave fetched " + ac);
138: class AsyncCommandHandler implements Runnable {
139: private AsyncCommand _command = null;
140:
141: public AsyncCommandHandler(AsyncCommand command) {
142: _command = command;
143: }
144:
145: public void run() {
146: try {
147: sendResponse(handleCommand(_command));
148: } catch (Throwable e) {
149: sendResponse(new AsyncResponseException(
150: _command.getIndex(), e));
151: }
152: }
153: }
154: new Thread(new AsyncCommandHandler(ac)).start();
155: }
156: }
157:
158: private AsyncResponse handleCommand(AsyncCommand ac) {
159: if (ac.getName().equals("remerge")) {
160: CaseInsensitiveHashtable mergeFiles = new CaseInsensitiveHashtable();
161: mergeFiles.put("ps2dvd", new LightRemoteFile("ps2dvd",
162: System.currentTimeMillis()));
163: sendResponse(new AsyncResponseRemerge("", mergeFiles));
164: mergeFiles.clear();
165: mergeFiles.put("c4testsagain", new LightRemoteFile(
166: "c4testsagain", System.currentTimeMillis(), 100));
167: sendResponse(new AsyncResponseRemerge("\\ps2dvd",
168: mergeFiles));
169: return new AsyncResponse(ac.getIndex());
170: }
171:
172: if (ac.getName().equals("delete")) {
173: return new AsyncResponse(ac.getIndex());
174: }
175:
176: if (ac.getName().equals("maxpath")) {
177: return new AsyncResponseMaxPath(ac.getIndex(), 255);
178: }
179:
180: if (ac.getName().equals("ping")) {
181: return new AsyncResponse(ac.getIndex());
182: }
183:
184: if (ac.getName().equals("rename")) {
185: return new AsyncResponse(ac.getIndex());
186: }
187:
188: if (ac.getName().equals("error")) {
189: throw new RuntimeException("error - " + ac);
190: }
191:
192: return new AsyncResponseException(ac.getIndex(), new Exception(
193: ac.getName() + " - Operation Not Supported"));
194: }
195:
196: }
|