# Written by Bram Cohen
# see LICENSE.txt for license information
# edited by Owen Williams to support more args
from zurllib import urlopen
from urlparse import urljoin
from btformats import check_message
from Choker import Choker
from Storage import Storage
from StorageWrapper import StorageWrapper
from Uploader import Upload
from Downloader import Downloader
from Connecter import Connecter
from Encrypter import Encoder
from RawServer import RawServer
from Rerequester import Rerequester
from DownloaderFeedback import DownloaderFeedback
from RateMeasure import RateMeasure
from CurrentRateMeasure import Measure
from PiecePicker import PiecePicker
from bencode import bencode,bdecode
from __init__ import version
from binascii import b2a_hex
from sha import sha
from os import path,makedirs
from parseargs import parseargs,formatDefinitions
from socket import error as socketerror
from random import seed
from threading import Thread,Event
from time import time
try:
from os import getpid
except ImportError:
def getpid():
return 1
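# Each entry is (option name, default value, help text); the list is fed to
# parseargs() for parsing and to formatDefinitions() for the usage message.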
defaults = [
('max_uploads', 7,
"the maximum number of uploads to allow at once."),
('keepalive_interval', 120.0,
'number of seconds to pause between sending keepalives'),
('download_slice_size', 2 ** 14,
"How many bytes to query for per request."),
('request_backlog', 5,
"how many requests to keep in a single pipe at once."),
('max_message_length', 2 ** 23,
"maximum length prefix encoding you'll accept over the wire - larger values get the connection dropped."),
('ip', '',
"ip to report you have to the tracker."),
('minport', 6881, 'minimum port to listen on, counts up if unavailable'),
('maxport', 6999, 'maximum port to listen on'),
('responsefile', '',
'file the server response was stored in, alternative to url'),
('url', '',
'url to get file from, alternative to responsefile'),
('saveas', '',
'local file name to save the file as, null indicates query user'),
('timeout', 300.0,
        'seconds to wait before closing a socket on which nothing has been received'),
('timeout_check_interval', 60.0,
'time to wait between checking if any connections have timed out'),
('max_slice_length', 2 ** 17,
"maximum length slice to send to peers, larger requests are ignored"),
('max_rate_period', 20.0,
"maximum amount of time to guess the current rate estimate represents"),
('bind', '',
'ip to bind to locally'),
('upload_rate_fudge', 5.0,
'time equivalent of writing to kernel-level TCP buffer, for rate adjustment'),
('display_interval', .5,
'time between updates of displayed information'),
('rerequest_interval', 5 * 60,
'time to wait between requesting more peers'),
('min_peers', 20,
        'minimum number of peers; if fewer are connected, the tracker is asked for more'),
('http_timeout', 60,
'number of seconds to wait before assuming that an http connection has timed out'),
('max_initiate', 35,
'number of peers at which to stop initiating new connections'),
('max_allow_in', 55,
'maximum number of connections to allow, after this new incoming connections will be immediately closed'),
('check_hashes', 1,
'whether to check hashes on disk'),
('max_upload_rate', 0,
'maximum kB/s to upload at, 0 means no limit'),
('snub_time', 30.0,
"seconds to wait for data to come in over a connection before assuming it's semi-permanently choked"),
('spew', 0,
"whether to display diagnostic info to stdout"),
('rarest_first_cutoff', 4,
"number of downloads at which to switch from random to rarest first"),
('min_uploads', 4,
"the number of uploads to fill out to with extra optimistic unchokes"),
('report_hash_failures', 0,
"whether to inform the user that hash failures occur. They're non-fatal."),
]
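# download() drives a single torrent download.  The caller supplies callbacks:
# filefunc chooses where to save, statusfunc and errorfunc report progress and
# errors, finfunc fires on completion, and doneflag is set to request shutdown.
# pathFunc, paramfunc and spewflag are optional hooks for the UI.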
def download(params, filefunc, statusfunc, finfunc, errorfunc, doneflag, cols, pathFunc = None, paramfunc = None, spewflag = Event()):
if len(params) == 0:
errorfunc('arguments are -\n' + formatDefinitions(defaults, cols))
return
try:
config, args = parseargs(params, defaults) #OWEN
if args:
            if config['responsefile'] or config['url']:
                raise ValueError, 'must have responsefile as arg or parameter, not both'
if path.isfile(args[0]):
config['responsefile'] = args[0]
else:
config['url'] = args[0]
if (config['responsefile'] == '') == (config['url'] == ''):
raise ValueError, 'need responsefile or url'
except ValueError, e:
errorfunc('error: ' + str(e) + '\nrun with no args for parameter explanations')
return
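    # Fetch the metainfo: read the local .torrent file if one was given,
    # otherwise download it from the url.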
try:
if config['responsefile'] != '':
h = open(config['responsefile'], 'rb')
else:
h = urlopen(config['url'])
response = h.read()
h.close()
except IOError, e:
if config['responsefile'] != '' and config['responsefile'].find('Temporary Internet Files') != -1:
errorfunc('BitTorrent was passed a filename that doesn\'t exist. ' +
'Either clear your Temporary Internet Files or right-click the link ' +
'and save the .torrent to disk first.')
else:
errorfunc('problem getting response info - ' + str(e))
return
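    # Decode the metainfo and sanity-check its structure.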
try:
response = bdecode(response)
check_message(response)
except ValueError, e:
errorfunc("got bad file info - " + str(e))
return
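    # Work out where the data goes: single-file torrents map to one file,
    # multi-file torrents to a directory tree, created here if necessary.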
try:
def make(f, forcedir = False):
if not forcedir:
f = path.split(f)[0]
if f != '' and not path.exists(f):
makedirs(f)
info = response['info']
if info.has_key('length'):
file_length = info['length']
file = filefunc(info['name'], file_length, config['saveas'], False)
if file is None:
return
make(file)
files = [(file, file_length)]
else:
file_length = 0
for x in info['files']:
file_length += x['length']
file = filefunc(info['name'], file_length, config['saveas'], True)
if file is None:
return
# if this path exists, and no files from the info dict exist, we assume it's a new download and
# the user wants to create a new directory with the default name
existing = 0
if path.exists(file):
for x in info['files']:
if path.exists(path.join(file, x['path'][0])):
existing = 1
if not existing:
file = path.join(file, info['name'])
make(file, True)
# alert the UI to any possible change in path
if pathFunc != None:
pathFunc(file)
files = []
for x in info['files']:
n = file
for i in x['path']:
n = path.join(n, i)
files.append((n, x['length']))
make(n)
except OSError, e:
errorfunc("Couldn't allocate dir - " + str(e))
return
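    # finflag is set once the download completes; ann is a one-element list
    # that later holds the tracker announce function so finished() can use it.
    # The peer id is 'M' plus the dashed version, padded to 8 characters, then
    # hex digits hashed from the current time and process id.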
finflag = Event()
ann = [None]
myid = 'M' + version.replace('.', '-')
myid = myid + ('-' * (8 - len(myid))) + b2a_hex(sha(repr(time()) + ' ' + str(getpid())).digest()[-6:])
seed(myid)
pieces = [info['pieces'][x:x+20] for x in xrange(0,
len(info['pieces']), 20)]
def failed(reason, errorfunc = errorfunc, doneflag = doneflag):
doneflag.set()
if reason is not None:
errorfunc(reason)
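    # RawServer is the event loop; the components below schedule work on it
    # through add_task, and it shuts down when doneflag is set.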
rawserver = RawServer(doneflag, config['timeout_check_interval'], config['timeout'], errorfunc = errorfunc, maxconnects = config['max_allow_in'])
try:
try:
storage = Storage(files, open, path.exists, path.getsize)
except IOError, e:
errorfunc('trouble accessing files - ' + str(e))
return
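        # finished() runs when the last piece passes its hash check: mark the
        # files read-only, announce completion to the tracker, and notify the UI.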
def finished(finfunc = finfunc, finflag = finflag,
ann = ann, storage = storage, errorfunc = errorfunc):
finflag.set()
try:
storage.set_readonly()
except (IOError, OSError), e:
errorfunc('trouble setting readonly at end - ' + str(e))
if ann[0] is not None:
ann[0](1)
finfunc()
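        # data_flunked() runs when a piece fails its hash check; rm[0] is
        # filled in later with ratemeasure.data_rejected so the time-left
        # estimate accounts for data that has to be downloaded again.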
rm = [None]
def data_flunked(amount, rm = rm, errorfunc = errorfunc, report_hash_failures = config['report_hash_failures']):
if rm[0] is not None:
rm[0](amount)
if report_hash_failures:
errorfunc('a piece failed hash check, re-downloading it')
storagewrapper = StorageWrapper(storage,
config['download_slice_size'], pieces,
info['piece length'], finished, failed,
statusfunc, doneflag, config['check_hashes'], data_flunked)
except ValueError, e:
failed('bad data - ' + str(e))
except IOError, e:
failed('IOError - ' + str(e))
if doneflag.isSet():
return
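    # Bind to the first free port in [minport, maxport].  The assignment to e
    # below just presets the error message for the case where the port range
    # is empty and the loop body never runs.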
e = 'maxport less than minport - no ports to check'
for listen_port in xrange(config['minport'], config['maxport'] + 1):
try:
rawserver.bind(listen_port, config['bind'])
break
except socketerror, e:
pass
else:
errorfunc("Couldn't listen - " + str(e))
return
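    # Wire up the core machinery: the choker decides which peers are unchoked,
    # the Measure objects track upload and download rates, the piece picker
    # chooses what to request, and downloader/connecter/encoder implement the
    # peer wire protocol.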
choker = Choker(config['max_uploads'], rawserver.add_task, finflag.isSet,
config['min_uploads'])
upmeasure = Measure(config['max_rate_period'],
config['upload_rate_fudge'])
downmeasure = Measure(config['max_rate_period'])
def make_upload(connection, choker = choker,
storagewrapper = storagewrapper,
max_slice_length = config['max_slice_length'],
max_rate_period = config['max_rate_period'],
fudge = config['upload_rate_fudge']):
return Upload(connection, choker, storagewrapper,
max_slice_length, max_rate_period, fudge)
ratemeasure = RateMeasure(storagewrapper.get_amount_left())
rm[0] = ratemeasure.data_rejected
picker = PiecePicker(len(pieces), config['rarest_first_cutoff'])
for i in xrange(len(pieces)):
if storagewrapper.do_I_have(i):
picker.complete(i)
downloader = Downloader(storagewrapper, picker,
config['request_backlog'], config['max_rate_period'],
len(pieces), downmeasure, config['snub_time'],
ratemeasure.data_came_in)
connecter = Connecter(make_upload, downloader, choker,
len(pieces), upmeasure, config['max_upload_rate'] * 1024, rawserver.add_task)
infohash = sha(bencode(info)).digest()
encoder = Encoder(connecter, rawserver,
myid, config['max_message_length'], rawserver.add_task,
config['keepalive_interval'], infohash, config['max_initiate'])
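    # The Rerequester handles tracker communication: periodic announces, plus
    # extra requests when the peer count falls below min_peers.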
rerequest = Rerequester(response['announce'], config['rerequest_interval'],
rawserver.add_task, connecter.how_many_connections,
config['min_peers'], encoder.start_connection,
rawserver.add_task, storagewrapper.get_amount_left,
upmeasure.get_total, downmeasure.get_total, listen_port,
config['ip'], myid, infohash, config['http_timeout'], errorfunc,
config['max_initiate'], doneflag, upmeasure.get_rate, downmeasure.get_rate,
encoder.ever_got_incoming)
if config['spew']:
spewflag.set()
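    # DownloaderFeedback periodically pushes rates, totals and time estimates
    # to the UI through statusfunc (with per-peer detail when spewflag is set).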
DownloaderFeedback(choker, rawserver.add_task, statusfunc,
upmeasure.get_rate, downmeasure.get_rate,
upmeasure.get_total, downmeasure.get_total, ratemeasure.get_time_left,
ratemeasure.get_size_left, file_length, finflag,
config['display_interval'], spewflag)
# useful info and functions for the UI
if paramfunc:
paramfunc({ 'max_upload_rate' : connecter.change_max_upload_rate, # change_max_upload_rate(<int bytes/sec>)
'max_uploads': choker.change_max_uploads, # change_max_uploads(<int max uploads>)
'listen_port' : listen_port, # int
'peer_id' : myid, # string
'info_hash' : infohash, # string
'start_connection' : encoder._start_connection # start_connection((<string ip>, <int port>), <peer id>)
})
statusfunc({"activity" : 'connecting to peers'})
ann[0] = rerequest.announce
rerequest.begin()
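    # Main loop: listen_forever() returns once doneflag is set, after which
    # the files are closed and a final 'stopped' announce goes to the tracker.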
rawserver.listen_forever(encoder)
storage.close()
rerequest.announce(2)