001: /*
002: * This file is part of DrFTPD, Distributed FTP Daemon.
003: *
004: * DrFTPD is free software; you can redistribute it and/or modify it under the
005: * terms of the GNU General Public License as published by the Free Software
006: * Foundation; either version 2 of the License, or (at your option) any later
007: * version.
008: *
009: * DrFTPD is distributed in the hope that it will be useful, but WITHOUT ANY
010: * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
011: * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
012: *
013: * You should have received a copy of the GNU General Public License along with
014: * DrFTPD; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
015: * Suite 330, Boston, MA 02111-1307 USA
016: */
017: package net.sf.drftpd.mirroring;
018:
019: import java.io.FileNotFoundException;
020: import java.net.SocketException;
021: import java.util.Collection;
022: import java.util.Collections;
023: import java.util.HashSet;
024: import java.util.Iterator;
025: import java.util.Set;
026:
027: import net.sf.drftpd.FileExistsException;
028: import net.sf.drftpd.NoAvailableSlaveException;
029: import net.sf.drftpd.SlaveUnavailableException;
030:
031: import org.apache.log4j.Logger;
032: import org.drftpd.master.RemoteSlave;
033: import org.drftpd.remotefile.LinkedRemoteFileInterface;
034: import org.drftpd.slave.RemoteIOException;
035:
036: /**
037: * @author zubov
038: * @author mog
039: * @version $Id: Job.java 1482 2006-06-16 04:50:07Z fr0w $
040: */
041: public class Job {
042: private static long jobIndexCount = 1;
043: private static final Logger logger = Logger.getLogger(Job.class);
044: private Set<RemoteSlave> _destSlaves;
045: private LinkedRemoteFileInterface _file;
046: private int _priority;
047: private SlaveTransfer _slaveTransfer;
048: private long _timeCreated;
049: private long _timeSpent;
050: private int _transferNum;
051: private long _index;
052: private boolean _onlyCountOnlineSlaves;
053:
054: public Job(LinkedRemoteFileInterface file,
055: Collection<RemoteSlave> destSlaves, int priority,
056: int transferNum) {
057: this (file, destSlaves, priority, transferNum, false);
058: }
059:
060: public Job(LinkedRemoteFileInterface file,
061: Collection<RemoteSlave> destSlaves, int priority,
062: int transferNum, boolean onlyCountOnlineSlaves) {
063: _index = jobIndexCount++;
064: _destSlaves = new HashSet<RemoteSlave>(destSlaves);
065: _file = file;
066: _priority = priority;
067: _timeCreated = System.currentTimeMillis();
068: _timeSpent = 0;
069: _transferNum = transferNum;
070: _slaveTransfer = null;
071: _onlyCountOnlineSlaves = onlyCountOnlineSlaves;
072: if (_transferNum > destSlaves.size()) {
073: throw new IllegalArgumentException(
074: "transferNum cannot be greater than destSlaves.size()");
075: }
076:
077: if (_transferNum <= 0) {
078: throw new IllegalArgumentException(
079: "transferNum must be greater than 0");
080: }
081: }
082:
083: public void addTimeSpent(long time) {
084: _timeSpent += time;
085: }
086:
087: /**
088: * Returns an unmodifiable List of slaves that can be used with.
089: * {@see net.sf.drftpd.master.SlaveManagerImpl#getASlave(Collection, char, FtpConfig)}
090: */
091: public Set<RemoteSlave> getDestinationSlaves() {
092: if (_onlyCountOnlineSlaves) {
093: HashSet<RemoteSlave> onlineDestinationSlaves = new HashSet<RemoteSlave>();
094: for (RemoteSlave rslave : new HashSet<RemoteSlave>(
095: _destSlaves)) {
096: if (rslave.isAvailable()) {
097: onlineDestinationSlaves.add(rslave);
098: }
099: }
100: return onlineDestinationSlaves;
101: }
102: return Collections.unmodifiableSet(_destSlaves);
103: }
104:
105: /**
106: * Returns the file for this job. This file is used to tell the slaves what
107: * file to transfer & receive.
108: */
109: public LinkedRemoteFileInterface getFile() {
110: return _file;
111: }
112:
113: /**
114: * Returns the priority of this job.
115: */
116: public int getPriority() {
117: return _priority;
118: }
119:
120: public synchronized long getProgress() {
121: if (!isTransferring()) {
122: throw new IllegalStateException(this
123: + " is not transferring");
124: }
125:
126: return _slaveTransfer.getTransfered();
127: }
128:
129: public synchronized long getSpeed() {
130: if (!isTransferring()) {
131: throw new IllegalStateException(this
132: + " is not transferring");
133: }
134:
135: return _slaveTransfer.getXferSpeed();
136: }
137:
138: public synchronized RemoteSlave getSourceSlave() {
139: if (!isTransferring()) {
140: throw new IllegalStateException(this
141: + " is not transferring");
142: }
143: return _slaveTransfer.getSourceSlave();
144: }
145:
146: /**
147: * This is the time that the job was created
148: */
149: public long getTimeCreated() {
150: return _timeCreated;
151: }
152:
153: /**
154: * This is the amount of time spent processing this job
155: */
156: public long getTimeSpent() {
157: return _timeSpent;
158: }
159:
160: /**
161: * returns true if this job has nothing more to send
162: */
163: public boolean isDone() {
164: return _transferNum < 1;
165: }
166:
167: public boolean isTransferring() {
168: return _slaveTransfer != null;
169: }
170:
171: private String outputDestinationSlaves() {
172: String toReturn = "";
173:
174: for (RemoteSlave rslave : new HashSet<RemoteSlave>(_destSlaves)) {
175: toReturn = toReturn + rslave.getName() + ",";
176: }
177: if (!toReturn.equals("")) {
178: return toReturn.substring(0, toReturn.length() - 1);
179: }
180: return null;
181: }
182:
183: private synchronized void reset() {
184: if (_slaveTransfer != null) {
185: _slaveTransfer.abort("Resetting slave2slave Transfer");
186: _slaveTransfer = null;
187: }
188: }
189:
190: public synchronized void sentToSlave(RemoteSlave slave) {
191: if (_destSlaves.remove(slave)) {
192: _transferNum--;
193: } else {
194: throw new IllegalArgumentException("Slave "
195: + slave.getName()
196: + " does not exist as a destination slave for job "
197: + this );
198: }
199:
200: if (_destSlaves.isEmpty() && (_transferNum > 0)) {
201: throw new IllegalStateException(
202: "Job cannot have a destSlaveSet of size 0 with transferNum > 0");
203: }
204: }
205:
206: public void setDone() {
207: _transferNum = 0;
208: }
209:
210: public String toString() {
211: return "Job[index=" + _index + "][file=" + getFile().getPath()
212: + ",dest=[" + outputDestinationSlaves()
213: + "],transferNum=" + _transferNum + ",priority="
214: + getPriority() + "]";
215: }
216:
217: /**
218: * Returns true if transfer was completed successfully
219: *
220: * @param checkCRC
221: * @param sourceSlave
222: * @param destSlave
223: * @return
224: */
225:
226: public void transfer(boolean checkCRC, boolean secureTransfer,
227: RemoteSlave sourceSlave, RemoteSlave destSlave) {
228: synchronized (this ) {
229: if (_slaveTransfer != null) {
230: throw new IllegalStateException(
231: "Job is already transferring");
232: }
233: if (getFile().getSlaves().contains(destSlave)) {
234: throw new IllegalStateException(
235: "File already exists on target slave");
236: }
237: _slaveTransfer = new SlaveTransfer(getFile(), sourceSlave,
238: destSlave, secureTransfer);
239: }
240:
241: logger.info("Sending " + getFile().getName() + " from "
242: + sourceSlave.getName() + " to " + destSlave.getName());
243: long startTime = System.currentTimeMillis();
244: try {
245: boolean crcMatch = _slaveTransfer.transfer();
246: if (crcMatch || !checkCRC) {
247: logSuccess();
248: } else {
249: destSlave.simpleDelete(getFile().getPath());
250: logger.debug("CRC did not match for " + getFile()
251: + " when sending from " + sourceSlave.getName()
252: + " to " + destSlave.getName());
253: }
254: } catch (DestinationSlaveException e) {
255: if (e.getCause() instanceof FileExistsException) {
256: logger.debug("Caught FileExistsException in sending "
257: + getFile().getName() + " from "
258: + sourceSlave.getName() + " to "
259: + destSlave.getName(), e);
260: long remoteChecksum = 0;
261: long localChecksum = 0;
262:
263: try {
264: String index = destSlave
265: .issueChecksumToSlave(getFile().getPath());
266: remoteChecksum = destSlave
267: .fetchChecksumFromIndex(index);
268: } catch (SlaveUnavailableException e2) {
269: logger.debug("SlaveUnavailableException from ", e2);
270: destSlave.simpleDelete(getFile().getPath());
271: return;
272: } catch (RemoteIOException e3) {
273: logger.debug("RemoteIOException from ", e3);
274: destSlave.simpleDelete(getFile().getPath());
275: return;
276: }
277:
278: try {
279: localChecksum = getFile().getCheckSum();
280: } catch (NoAvailableSlaveException e4) {
281: // File exists locally, but I can't verify it's checksum
282: // Accept the new one since there's no cached checksum
283: logger
284: .debug("Accepting file because there's no local checksum");
285: // successful transfer
286: getFile().setCheckSum(remoteChecksum);
287: logSuccess();
288: return;
289: }
290: if (remoteChecksum == localChecksum) {
291: logger
292: .debug("Accepting file because the crc's match");
293: // successful transfer
294: logSuccess();
295: } else {
296: logger
297: .debug("Checksum did not match, removing offending file");
298: destSlave.simpleDelete(getFile().getPath());
299: }
300: return;
301: } else {
302: logger
303: .error(
304: "Error on slave during slave2slave transfer",
305: e);
306: }
307: destSlave.simpleDelete(getFile().getPath());
308: } catch (SourceSlaveException e) {
309: if (e.getCause() instanceof FileNotFoundException) {
310: logger.warn("Caught FileNotFoundException in sending "
311: + getFile().getName() + " from "
312: + sourceSlave.getName() + " to "
313: + destSlave.getName(), e);
314: getFile().removeSlave(sourceSlave);
315: return;
316: } else {
317: logger
318: .error(
319: "Error on slave during slave2slave transfer",
320: e);
321: }
322: } catch (SlaveException e) {
323: throw new RuntimeException(
324: "SlaveException was not of type DestinationSlaveException or SourceSlaveException");
325: } finally {
326: addTimeSpent(System.currentTimeMillis() - startTime);
327: reset();
328: }
329: }
330:
331: private void logSuccess() {
332: getFile().addSlave(getDestinationSlave());
333: logger.debug("Sent file " + getFile().getName() + " from "
334: + getSourceSlave().getName() + " to "
335: + getDestinationSlave().getName());
336: sentToSlave(getDestinationSlave());
337: }
338:
339: public synchronized RemoteSlave getDestinationSlave() {
340: if (!isTransferring()) {
341: throw new IllegalStateException(this
342: + " is not transferring");
343: }
344: return _slaveTransfer.getDestinationSlave();
345: }
346:
347: /* (non-Javadoc)
348: * @see java.lang.Object#equals(java.lang.Object)
349: */
350: public boolean equals(Object arg0) {
351: if (arg0 instanceof Job) {
352: return _file == ((Job) arg0)._file;
353: }
354: return super .equals(arg0);
355: }
356:
357: public long getIndex() {
358: return _index;
359: }
360: }
|