/*
* Copyright (C) 2007 Eskil Bylund
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using DCSharp.Backend.Connections;
using DCSharp.Backend.Objects;
using DCSharp.Logging;
using FileListingDCSharp.Xml.FileList.FileListing;
namespace DCSharp.Backend.Managers{
public class Downloader : IDisposable
{
private static Logger log = LogManager.GetLogger("Downloader");
private ConnectionManager connectionManager;
private DownloadManager downloadManager;
private Listener listener;
private const int TimerInterval = 5000;
private Dictionary<User, UserConnection> activeConnections;
private Dictionary<User, DownloadFileInfo> pendingConnections;
private Dictionary<User, DownloadFileInfo> priorityDownloads;
private Dictionary<Priority, List<DownloadFileInfo>> downloads;
private Timer downloadTimer;
private object timerLock;
private object listLock;
#region Constructors
public Downloader(ConnectionManager connectionManager,
DownloadManager downloadManager, Listener listener)
{
if (connectionManager == null)
{
throw new ArgumentNullException("connectionManager");
}
if (downloadManager == null)
{
throw new ArgumentNullException("downloadManager");
}
if (listener == null)
{
throw new ArgumentNullException("listener");
}
this.connectionManager = connectionManager;
this.downloadManager = downloadManager;
this.listener = listener;
activeConnections = new Dictionary<User, UserConnection>();
pendingConnections = new Dictionary<User, DownloadFileInfo>();
priorityDownloads = new Dictionary<User, DownloadFileInfo>();
downloads = new Dictionary<Priority, List<DownloadFileInfo>>();
foreach (int priority in Enum.GetValues(typeof(Priority)))
{
downloads.Add((Priority)priority, new List<DownloadFileInfo>());
}
timerLock = new object();
listLock = new object();
// Load the downlaods
lock (downloadManager.SyncRoot)
{
foreach (DownloadFileInfo download in downloadManager)
{
Priority priority = download.Priority;
downloads[priority].Add(download);
}
}
// Events
connectionManager.ConnectionAdded += OnConnectionAdded;
connectionManager.ConnectionRemoved += OnConnectionRemoved;
downloadManager.DownloadAdded += OnDownloadAdded;
downloadManager.DownloadRemoved += OnDownloadRemoved;
listener.UserAdded += OnUserAdded;
}
#endregion
#region Methods
public virtual void Dispose()
{
connectionManager.ConnectionAdded -= OnConnectionAdded;
connectionManager.ConnectionRemoved -= OnConnectionRemoved;
downloadManager.DownloadAdded -= OnDownloadAdded;
downloadManager.DownloadRemoved -= OnDownloadRemoved;
listener.UserAdded -= OnUserAdded;
}
public void SetPriority(DownloadFileInfo download, Priority priority)
{
lock (listLock)
{
downloads[download.Priority].Remove(download);
downloads[priority].Add(download);
download.Priority = priority;
}
}
public void SetNextDownload(User user, DownloadFileInfo download,
bool forceStart)
{
if (user == null)
{
throw new ArgumentNullException("user");
}
if (download == null)
{
throw new ArgumentNullException("download");
}
if (!downloadManager.Contains(download))
{
return;
}
lock (listLock)
{
log.Info("Prioritizing " + download.Name + " from " + user);
UserConnection connection = GetDownloadConnection(user);
if (connection != null && connection.Download == download)
{
// Already downloading the file from this user
return;
}
if (pendingConnections.ContainsKey(user))
{
// A connection request has been made. Replace the download
// so that another connection request can be made for it.
pendingConnections[user] = download;
}
priorityDownloads[user] = download;
if (forceStart && !connectionManager.UserIsBusy(user))
{
if (connection != null && connection.Download != download)
{
// Abort the active download from this user
connection.AbortTransfer();
}
connection = download.UserConnection;
if (download.Active && connection != null)
{
// Abort the download to get the file from this user instead
connection.AbortTransfer();
}
}
Start(download, user);
}
StartTimer();
}
private void StartDownloads()
{
lock (listLock)
{
// Start high priority downloads
foreach (KeyValuePair<User, DownloadFileInfo> pair in priorityDownloads)
{
Start(pair.Value, pair.Key);
}
// Start downloads by priority
foreach (int priority in Enum.GetValues(typeof(Priority)))
{
downloads[(Priority)priority].ForEach(Start);
}
}
}
private void Start(DownloadFileInfo download)
{
Start(download, null);
}
/// <summary>
/// Start downloading from an existing connection or requests a
/// connection to one of the online users
/// </summary>
/// <remarks>Lock on listLock before calling.</remarks>
private void Start(DownloadFileInfo download, User targetUser)
{
if (download == null)
{
throw new ArgumentNullException("download");
}
if (download.Active || pendingConnections.ContainsValue(download))
{
return;
}
if (targetUser != null &&
(!connectionManager.CanRequest(targetUser) ||
pendingConnections.ContainsKey(targetUser) ||
activeConnections.ContainsKey(targetUser)))
{
return;
}
foreach (SourceInfo source in download.SourceInfos)
{
User user = source.User;
if (!source.Available || (targetUser != null && user != targetUser))
{
continue;
}
// Check for an existing connection
UserConnection connection = GetDownloadConnection(user);
if (connection != null && !connection.Downloading)
{
TryDownloadFile(connection, download, source);
return;
}
// Request a new connection
if (source.User.IsOnline &&
connectionManager.CanRequest(user) &&
!pendingConnections.ContainsKey(user) &&
!activeConnections.ContainsKey(user))
{
log.Info("Requesting connection to " + user);
pendingConnections.Add(user, download);
connectionManager.RequestConnection(user, delegate
{
// Connection request failed
lock (listLock)
{
log.Info("Connection request to " + user + " failed");
pendingConnections.Remove(user);
}
StartTimer();
});
return;
}
}
return;
}
#region Download using an existing connection
private void DownloadUsingConnection(UserConnection connection)
{
DownloadFileInfo next = null;
lock (listLock)
{
// Check the high priority downloads
if (priorityDownloads.TryGetValue(connection.RemoteIdentity.User,
out next))
{
priorityDownloads.Remove(connection.RemoteIdentity.User);
}
if (next != null && !next.Active)
{
foreach (SourceInfo source in next.SourceInfos)
{
if (source.User == connection.RemoteIdentity.User &&
source.Available)
{
TryDownloadFile(connection, next, source);
return;
}
}
}
// Get the first download with a source from this user
foreach (int priority in Enum.GetValues(typeof(Priority)))
{
foreach (DownloadFileInfo download in downloads[(Priority)priority])
{
if (download.Active)
{
continue;
}
foreach (SourceInfo source in download.SourceInfos)
{
if (source.User == connection.RemoteIdentity.User &&
source.Available)
{
TryDownloadFile(connection, download, source);
return;
}
}
}
}
}
}
private void TryDownloadFile(UserConnection connection,
DownloadFileInfo download, SourceInfo source)
{
try
{
connection.DownloadFile(download, source);
}
catch (Exception e)
{
log.Error("Error downloading file", e, download, source);
}
}
#endregion
private UserConnection GetDownloadConnection(User user)
{
if (activeConnections.ContainsKey(user))
{
return activeConnections[user];
}
return null;
}
#region Timer
private void StartTimer()
{
lock (timerLock)
{
if (downloadTimer == null)
{
downloadTimer = new Timer(OnTimerElapsed, null, TimerInterval,
Timeout.Infinite);
}
}
}
private void OnTimerElapsed(object obj)
{
StartDownloads();
downloadTimer.Dispose();
downloadTimer = null;
}
#endregion
#region Event handlers
private void OnUserAdded(object obj, EventArgs args)
{
StartTimer();
}
private void OnConnectionAdded(object obj, ConnectionEventArgs args)
{
UserConnection connection = (UserConnection)args.Connection;
connection.DirectionChanged += OnDirectionChanged;
}
private void OnConnectionRemoved(object obj, ConnectionEventArgs args)
{
UserConnection connection = (UserConnection)args.Connection;
connection.DirectionChanged -= OnDirectionChanged;
connection.TransferCompleted -= OnTransferCompleted;
connection.TransferAborted -= OnTransferAborted;
lock (listLock)
{
UserConnection downloadConnection = GetDownloadConnection(
connection.RemoteIdentity.User);
if (connection == downloadConnection)
{
activeConnections.Remove(connection.RemoteIdentity.User);
}
pendingConnections.Remove(connection.RemoteIdentity.User);
}
StartTimer();
}
private void OnDirectionChanged(object obj, EventArgs args)
{
UserConnection connection = (UserConnection)obj;
if (connection.Direction == TransferDirection.Down)
{
// The connection is ready for downloading.
lock (listLock)
{
// Only one connection from each user is used for downloading.
if (activeConnections.ContainsKey(connection.RemoteIdentity.User))
{
return;
}
activeConnections.Add(connection.RemoteIdentity.User, connection);
pendingConnections.Remove(connection.RemoteIdentity.User);
}
connection.TransferCompleted += OnTransferCompleted;
connection.TransferAborted += OnTransferAborted;
DownloadUsingConnection(connection);
}
else
{
lock (listLock)
{
if (pendingConnections.ContainsKey(connection.RemoteIdentity.User))
{
// The requested connection is used for uploading,
// request another connection.
pendingConnections.Remove(connection.RemoteIdentity.User);
StartTimer();
}
}
}
}
private void OnTransferCompleted(object obj, TransferEventArgs args)
{
UserConnection connection = (UserConnection)obj;
DownloadFileInfo download = (DownloadFileInfo)args.Transfer;
if (download.IsFileList)
{
try
{
string file = Path.Combine(download.Target, download.Name);
using (FileStream stream = new FileStream(file, FileMode.Open,
FileAccess.Read, FileShare.Read))
{
FileListing list = FileListing.GrabFileListing(stream,
DCSharp.Extras.Util.Compression.BZip2,
connection.RemoteIdentity.User.Uid);
Runtime.Listener.EmitReceivedFileListing(list,
connection.RemoteIdentity);
}
File.Delete(file);
}
catch (Exception e)
{
// TODO: Notify the user
log.Error("Error opening file list", e, download);
}
}
downloadManager.Remove(download);
DownloadUsingConnection(connection);
}
private void OnTransferAborted(object obj, TransferErrorEventArgs args)
{
UserConnection connection = (UserConnection)obj;
if (args.Error == TransferError.NoSlots)
{
connectionManager.HoldConnection(connection.RemoteIdentity.User);
}
else if (args.Error == TransferError.NotAvailable)
{
// Currently handled by the protocol
}
else if (args.Error != TransferError.Aborted)
{
log.Error("Transfer error: " + args.Error + " - " + args.Message);
}
}
private void OnDownloadAdded(object obj, DownloadEventArgs args)
{
DownloadFileInfo download = args.Download;
Priority priority = download.Priority;
lock (listLock)
{
downloads[priority].Add(download);
if (download.IsFileList)
{
Start(download);
}
else
{
StartTimer();
}
}
}
private void OnDownloadRemoved(object obj, DownloadEventArgs args)
{
DownloadFileInfo download = args.Download;
lock (listLock)
{
foreach (KeyValuePair<Priority, List<DownloadFileInfo>> pair in
downloads)
{
pair.Value.Remove(download);
}
foreach (KeyValuePair<User, DownloadFileInfo> pair in priorityDownloads)
{
if (pair.Value == download)
{
priorityDownloads.Remove(pair.Key);
break;
}
}
}
UserConnection connection = download.UserConnection;
if (download.Active && connection != null)
{
connection.AbortTransfer();
}
}
#endregion
#endregion
}
}
|