001: /*
002: * This file is part of DrFTPD, Distributed FTP Daemon.
003: *
004: * DrFTPD is free software; you can redistribute it and/or modify it under the
005: * terms of the GNU General Public License as published by the Free Software
006: * Foundation; either version 2 of the License, or (at your option) any later
007: * version.
008: *
009: * DrFTPD is distributed in the hope that it will be useful, but WITHOUT ANY
010: * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
011: * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
012: *
013: * You should have received a copy of the GNU General Public License along with
014: * DrFTPD; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
015: * Suite 330, Boston, MA 02111-1307 USA
016: */
017: package net.sf.drftpd.mirroring;
018:
019: import java.io.FileInputStream;
020: import java.io.IOException;
021: import java.util.ArrayList;
022: import java.util.Collection;
023: import java.util.Collections;
024: import java.util.HashSet;
025: import java.util.Iterator;
026: import java.util.List;
027: import java.util.Properties;
028: import java.util.Set;
029: import java.util.TimerTask;
030: import java.util.TreeSet;
031:
032: import net.sf.drftpd.NoAvailableSlaveException;
033:
034: import org.apache.log4j.Logger;
035: import org.drftpd.GlobalContext;
036: import org.drftpd.PropertyHelper;
037: import org.drftpd.master.RemoteSlave;
038:
039: /**
040: * @author zubov
041: * @version $Id: JobManager.java 1482 2006-06-16 04:50:07Z fr0w $
042: */
043: public class JobManager {
044: private static final Logger logger = Logger
045: .getLogger(JobManager.class);
046:
047: private boolean _isStopped = false;
048:
049: private Set<Job> _queuedJobSet;
050:
051: private boolean _useCRC;
052:
053: private boolean _useSSL;
054:
055: private long _sleepSeconds;
056:
057: private GlobalContext _gctx;
058:
059: private TimerTask _runJob = null;
060:
061: /**
062: * Keeps track of all jobs and controls them
063: */
064: public JobManager(GlobalContext gctx) {
065: _gctx = gctx;
066: _queuedJobSet = new TreeSet<Job>(new JobComparator());
067: reload();
068: }
069:
070: public synchronized void addJobsToQueue(Collection<Job> jobs) {
071: ArrayList<Job> jobs2 = new ArrayList<Job>(jobs);
072: for (Iterator jobiter = jobs2.iterator(); jobiter.hasNext();) {
073: Job job = (Job) jobiter.next();
074: Collection<RemoteSlave> slaves = job.getFile().getSlaves();
075:
076: for (Iterator<RemoteSlave> iter = slaves.iterator(); iter
077: .hasNext();) {
078: RemoteSlave slave = iter.next();
079:
080: if (job.getDestinationSlaves().contains(slave)) {
081: job.sentToSlave(slave);
082: }
083: }
084: if (job.isDone()) {
085: jobiter.remove();
086: }
087: }
088: _queuedJobSet.addAll(jobs2);
089: }
090:
091: public synchronized void addJobToQueue(Job job) {
092: addJobsToQueue(Collections.singletonList(job));
093: }
094:
095: /**
096: * Gets all jobs.
097: */
098: public synchronized Set<Job> getAllJobsFromQueue() {
099: return Collections.unmodifiableSet(_queuedJobSet);
100: }
101:
102: public synchronized Job getNextJob(Set<RemoteSlave> busySlaves,
103: Set skipJobs) {
104: for (Iterator iter = _queuedJobSet.iterator(); iter.hasNext();) {
105: Job tempJob = (Job) iter.next();
106:
107: if (tempJob.isDone()) {
108: iter.remove();
109:
110: continue;
111: }
112:
113: if (tempJob.isTransferring()) {
114: continue;
115: }
116:
117: if (skipJobs.contains(tempJob)) {
118: continue;
119: }
120:
121: Collection availableSlaves = null;
122:
123: try {
124: availableSlaves = tempJob.getFile()
125: .getAvailableSlaves();
126: } catch (NoAvailableSlaveException e) {
127: if (tempJob.getFile().isDeleted()) {
128: tempJob.setDone();
129: iter.remove();
130: }
131:
132: continue; // can't transfer what isn't online
133: }
134:
135: if (!busySlaves.containsAll(availableSlaves)) {
136: return tempJob;
137: }
138: }
139:
140: return null;
141: }
142:
143: public boolean isStopped() {
144: return _isStopped;
145: }
146:
147: public void processJob() {
148: Job job = null;
149: RemoteSlave sourceSlave = null;
150: RemoteSlave destSlave = null;
151:
152: Collection<RemoteSlave> availableSlaves;
153: try {
154: availableSlaves = getGlobalContext().getSlaveManager()
155: .getAvailableSlaves();
156: } catch (NoAvailableSlaveException e1) {
157: return; // can't transfer with no slaves
158: }
159:
160: Set<RemoteSlave> busySlavesDown = new HashSet<RemoteSlave>();
161: Set<Job> skipJobs = new HashSet<Job>();
162:
163: synchronized (this ) {
164: while (!busySlavesDown.containsAll(availableSlaves)) {
165: job = getNextJob(busySlavesDown, skipJobs);
166:
167: if (job == null) {
168: return;
169: }
170:
171: // logger.debug("looking up slave for job " + job);
172: try {
173: sourceSlave = getGlobalContext()
174: .getSlaveSelectionManager()
175: .getASlaveForJobDownload(job);
176: } catch (NoAvailableSlaveException e) {
177: busySlavesDown.addAll(job.getFile().getSlaves());
178: continue;
179: }
180:
181: if (sourceSlave == null) {
182: logger
183: .debug("Unable to find a suitable job for transfer");
184: return;
185: }
186:
187: availableSlaves.removeAll(job.getFile().getSlaves());
188: try {
189: destSlave = getGlobalContext()
190: .getSlaveSelectionManager()
191: .getASlaveForJobUpload(job, sourceSlave);
192:
193: break; // we have a source slave and a destination
194: // slave,
195:
196: // transfer!
197: } catch (NoAvailableSlaveException e) {
198: // job was ready to be sent, but it had no slave that was
199: // ready to accept it
200: skipJobs.add(job);
201:
202: continue;
203: }
204: }
205: // sourceSlave will always be null if destSlave is null
206: if (destSlave == null /*|| sourceSlave == null*/) {
207: // all slaves are offline or busy
208: return;
209: }
210: }
211:
212: // job is not deleted, we are ready to process
213:
214: job.transfer(useCRC(), useSecureTransfers(), sourceSlave,
215: destSlave);
216: if (job.isDone()) {
217: logger.debug("Job is finished, removing job "
218: + job.getFile());
219: removeJobFromQueue(job);
220: }
221: }
222:
223: private GlobalContext getGlobalContext() {
224: return _gctx;
225: }
226:
227: public void reload() {
228: Properties p = new Properties();
229: FileInputStream fis = null;
230:
231: try {
232: fis = new FileInputStream("conf/jobmanager.conf");
233: p.load(fis);
234: } catch (IOException e) {
235: logger
236: .warn("conf/jobmanager.conf missing, using default values");
237: // defaults
238: _useCRC = true;
239: _useSSL = false;
240: _sleepSeconds = 10000; // 10 seconds
241: return;
242: } finally {
243: if (fis != null) {
244: try {
245: fis.close();
246: } catch (IOException e) {
247: logger
248: .error(
249: "Could not close the FileInputStream of conf/jobmanager.conf",
250: e);
251: }
252: fis = null;
253: }
254: }
255:
256: _useCRC = p.getProperty("useCRC", "true").equals("true");
257: _useSSL = p.getProperty("useSSLTransfers", "true").equals(
258: "true");
259: _sleepSeconds = 1000 * Integer.parseInt(PropertyHelper
260: .getProperty(p, "sleepSeconds"));
261: if (_runJob != null) {
262: _runJob.cancel();
263: }
264: _runJob = new TimerTask() {
265: public void run() {
266: if (_isStopped) {
267: return;
268: }
269: new JobTransferThread(getJobManager()).start();
270: }
271: };
272: getGlobalContext().getTimer().schedule(_runJob, 0,
273: _sleepSeconds);
274: }
275:
276: public synchronized void removeJobFromQueue(Job job) {
277: _queuedJobSet.remove(job);
278: }
279:
280: public void startJobs() {
281: _isStopped = false;
282: }
283:
284: private JobManager getJobManager() {
285: return this ;
286: }
287:
288: public void stopJob(Job job) {
289: removeJobFromQueue(job);
290: job.setDone();
291: }
292:
293: public void stopJobs() {
294: _isStopped = true;
295: }
296:
297: private boolean useCRC() {
298: return _useCRC;
299: }
300:
301: private boolean useSecureTransfers() {
302: return _useSSL;
303: }
304: }
|