# Written by Bram Cohen
# see LICENSE.txt for license information
from CurrentRateMeasure import Measure
from random import shuffle
from time import time
from bitfield import Bitfield
class SingleDownload:
def __init__(self, downloader, connection):
self.downloader = downloader
self.connection = connection
self.choked = True
self.interested = False
self.active_requests = []
self.measure = Measure(downloader.max_rate_period)
self.have = Bitfield(downloader.numpieces)
self.last = 0
self.example_interest = None
def disconnected(self):
self.downloader.downloads.remove(self)
for i in xrange(len(self.have)):
if self.have[i]:
self.downloader.picker.lost_have(i)
self._letgo()
def _letgo(self):
if not self.active_requests:
return
if self.downloader.storage.is_endgame():
self.active_requests = []
return
lost = []
for index, begin, length in self.active_requests:
self.downloader.storage.request_lost(index, begin, length)
if index not in lost:
lost.append(index)
self.active_requests = []
ds = [d for d in self.downloader.downloads if not d.choked]
shuffle(ds)
for d in ds:
d._request_more(lost)
for d in self.downloader.downloads:
if d.choked and not d.interested:
for l in lost:
if d.have[l] and self.downloader.storage.do_I_have_requests(l):
d.interested = True
d.connection.send_interested()
break
def got_choke(self):
if not self.choked:
self.choked = True
self._letgo()
def got_unchoke(self):
if self.choked:
self.choked = False
if self.interested:
self._request_more()
def is_choked(self):
return self.choked
def is_interested(self):
return self.interested
def got_piece(self, index, begin, piece):
try:
self.active_requests.remove((index, begin, len(piece)))
except ValueError:
return False
if self.downloader.storage.is_endgame():
self.downloader.all_requests.remove((index, begin, len(piece)))
self.last = time()
self.measure.update_rate(len(piece))
self.downloader.measurefunc(len(piece))
self.downloader.downmeasure.update_rate(len(piece))
if not self.downloader.storage.piece_came_in(index, begin, piece):
if self.downloader.storage.is_endgame():
while self.downloader.storage.do_I_have_requests(index):
nb, nl = self.downloader.storage.new_request(index)
self.downloader.all_requests.append((index, nb, nl))
for d in self.downloader.downloads:
d.fix_download_endgame()
return False
self.downloader.picker.bump(index)
ds = [d for d in self.downloader.downloads if not d.choked]
shuffle(ds)
for d in ds:
d._request_more([index])
return False
if self.downloader.storage.do_I_have(index):
self.downloader.picker.complete(index)
if self.downloader.storage.is_endgame():
for d in self.downloader.downloads:
if d is not self and d.interested:
if d.choked:
d.fix_download_endgame()
else:
try:
d.active_requests.remove((index, begin, len(piece)))
except ValueError:
continue
d.connection.send_cancel(index, begin, len(piece))
d.fix_download_endgame()
self._request_more()
if self.downloader.picker.am_I_complete():
for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]:
d.connection.close()
return self.downloader.storage.do_I_have(index)
def _want(self, index):
return self.have[index] and self.downloader.storage.do_I_have_requests(index)
def _request_more(self, indices = None):
assert not self.choked
if len(self.active_requests) == self.downloader.backlog:
return
if self.downloader.storage.is_endgame():
self.fix_download_endgame()
return
lost_interests = []
while len(self.active_requests) < self.downloader.backlog:
if indices is None:
interest = self.downloader.picker.next(self._want, self.have.numfalse == 0)
else:
interest = None
for i in indices:
if self.have[i] and self.downloader.storage.do_I_have_requests(i):
interest = i
break
if interest is None:
break
if not self.interested:
self.interested = True
self.connection.send_interested()
self.example_interest = interest
begin, length = self.downloader.storage.new_request(interest)
self.downloader.picker.requested(interest, self.have.numfalse == 0)
self.active_requests.append((interest, begin, length))
self.connection.send_request(interest, begin, length)
if not self.downloader.storage.do_I_have_requests(interest):
lost_interests.append(interest)
if not self.active_requests and self.interested:
self.interested = False
self.connection.send_not_interested()
if lost_interests:
for d in self.downloader.downloads:
if d.active_requests or not d.interested:
continue
if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
continue
for lost in lost_interests:
if d.have[lost]:
break
else:
continue
interest = self.downloader.picker.next(d._want, d.have.numfalse == 0)
if interest is None:
d.interested = False
d.connection.send_not_interested()
else:
d.example_interest = interest
if self.downloader.storage.is_endgame():
self.downloader.all_requests = []
for d in self.downloader.downloads:
self.downloader.all_requests.extend(d.active_requests)
for d in self.downloader.downloads:
d.fix_download_endgame()
def fix_download_endgame(self):
want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests]
if self.interested and not self.active_requests and not want:
self.interested = False
self.connection.send_not_interested()
return
if not self.interested and want:
self.interested = True
self.connection.send_interested()
if self.choked:
return
shuffle(want)
del want[self.downloader.backlog - len(self.active_requests):]
self.active_requests.extend(want)
for piece, begin, length in want:
self.connection.send_request(piece, begin, length)
def got_have(self, index):
if self.have[index]:
return
self.have[index] = True
self.downloader.picker.got_have(index)
if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
self.connection.close()
return
if self.downloader.storage.is_endgame():
self.fix_download_endgame()
elif self.downloader.storage.do_I_have_requests(index):
if not self.choked:
self._request_more([index])
else:
if not self.interested:
self.interested = True
self.connection.send_interested()
def got_have_bitfield(self, have):
self.have = have
for i in xrange(len(self.have)):
if self.have[i]:
self.downloader.picker.got_have(i)
if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
self.connection.close()
return
if self.downloader.storage.is_endgame():
for piece, begin, length in self.downloader.all_requests:
if self.have[piece]:
self.interested = True
self.connection.send_interested()
return
for i in xrange(len(self.have)):
if self.have[i] and self.downloader.storage.do_I_have_requests(i):
self.interested = True
self.connection.send_interested()
return
def get_rate(self):
return self.measure.get_rate()
def is_snubbed(self):
return time() - self.last > self.downloader.snub_time
class Downloader:
def __init__(self, storage, picker, backlog, max_rate_period, numpieces,
downmeasure, snub_time, measurefunc = lambda x: None):
self.storage = storage
self.picker = picker
self.backlog = backlog
self.max_rate_period = max_rate_period
self.downmeasure = downmeasure
self.numpieces = numpieces
self.snub_time = snub_time
self.measurefunc = measurefunc
self.downloads = []
def make_download(self, connection):
self.downloads.append(SingleDownload(self, connection))
return self.downloads[-1]
class DummyPicker:
def __init__(self, num, r):
self.stuff = range(num)
self.r = r
def next(self, wantfunc, seed):
for i in self.stuff:
if wantfunc(i):
return i
return None
def lost_have(self, pos):
self.r.append('lost have')
def got_have(self, pos):
self.r.append('got have')
def requested(self, pos, seed):
self.r.append('requested')
def complete(self, pos):
self.stuff.remove(pos)
self.r.append('complete')
def am_I_complete(self):
return False
def bump(self, i):
pass
class DummyStorage:
def __init__(self, remaining, have_endgame = False, numpieces = 1):
self.remaining = remaining
self.active = [[] for i in xrange(numpieces)]
self.endgame = False
self.have_endgame = have_endgame
def do_I_have_requests(self, index):
return self.remaining[index] != []
def request_lost(self, index, begin, length):
x = (begin, length)
self.active[index].remove(x)
self.remaining[index].append(x)
self.remaining[index].sort()
def piece_came_in(self, index, begin, piece):
self.active[index].remove((begin, len(piece)))
return True
def do_I_have(self, index):
return (self.remaining[index] == [] and
self.active[index] == [])
def new_request(self, index):
x = self.remaining[index].pop()
for i in self.remaining:
if i:
break
else:
self.endgame = True
self.active[index].append(x)
self.active[index].sort()
return x
def is_endgame(self):
return self.have_endgame and self.endgame
class DummyConnection:
def __init__(self, events):
self.events = events
def send_interested(self):
self.events.append('interested')
def send_not_interested(self):
self.events.append('not interested')
def send_request(self, index, begin, length):
self.events.append(('request', index, begin, length))
def send_cancel(self, index, begin, length):
self.events.append(('cancel', index, begin, length))
def test_stops_at_backlog():
ds = DummyStorage([[(0, 2), (2, 2), (4, 2), (6, 2)]])
events = []
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
sd = d.make_download(DummyConnection(events))
assert events == []
assert ds.remaining == [[(0, 2), (2, 2), (4, 2), (6, 2)]]
assert ds.active == [[]]
sd.got_have_bitfield(Bitfield(1, chr(0x80)))
assert events == ['got have', 'interested']
del events[:]
assert ds.remaining == [[(0, 2), (2, 2), (4, 2), (6, 2)]]
assert ds.active == [[]]
sd.got_unchoke()
assert events == ['requested', ('request', 0, 6, 2), 'requested', ('request', 0, 4, 2)]
del events[:]
assert ds.remaining == [[(0, 2), (2, 2)]]
assert ds.active == [[(4, 2), (6, 2)]]
sd.got_piece(0, 4, 'ab')
assert events == ['requested', ('request', 0, 2, 2)]
del events[:]
assert ds.remaining == [[(0, 2)]]
assert ds.active == [[(2, 2), (6, 2)]]
def test_got_have_single():
ds = DummyStorage([[(0, 2)]])
events = []
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
sd = d.make_download(DummyConnection(events))
assert events == []
assert ds.remaining == [[(0, 2)]]
assert ds.active == [[]]
sd.got_unchoke()
assert events == []
assert ds.remaining == [[(0, 2)]]
assert ds.active == [[]]
sd.got_have(0)
assert events == ['got have', 'interested', 'requested', ('request', 0, 0, 2)]
del events[:]
assert ds.remaining == [[]]
assert ds.active == [[(0, 2)]]
sd.disconnected()
assert events == ['lost have']
def test_choke_clears_active():
ds = DummyStorage([[(0, 2)]])
events = []
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
sd1 = d.make_download(DummyConnection(events))
sd2 = d.make_download(DummyConnection(events))
assert events == []
assert ds.remaining == [[(0, 2)]]
assert ds.active == [[]]
sd1.got_unchoke()
sd1.got_have(0)
assert events == ['got have', 'interested', 'requested', ('request', 0, 0, 2)]
del events[:]
assert ds.remaining == [[]]
assert ds.active == [[(0, 2)]]
sd2.got_unchoke()
sd2.got_have(0)
assert events == ['got have']
del events[:]
assert ds.remaining == [[]]
assert ds.active == [[(0, 2)]]
sd1.got_choke()
assert events == ['interested', 'requested', ('request', 0, 0, 2), 'not interested']
del events[:]
assert ds.remaining == [[]]
assert ds.active == [[(0, 2)]]
sd2.got_piece(0, 0, 'ab')
assert events == ['complete', 'not interested']
del events[:]
assert ds.remaining == [[]]
assert ds.active == [[]]
def test_endgame():
ds = DummyStorage([[(0, 2)], [(0, 2)], [(0, 2)]], True, 3)
events = []
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 10, 15, 3, Measure(15), 10)
ev1 = []
ev2 = []
ev3 = []
ev4 = []
sd1 = d.make_download(DummyConnection(ev1))
sd2 = d.make_download(DummyConnection(ev2))
sd3 = d.make_download(DummyConnection(ev3))
sd1.got_unchoke()
sd1.got_have(0)
assert ev1 == ['interested', ('request', 0, 0, 2)]
del ev1[:]
sd2.got_unchoke()
sd2.got_have(0)
sd2.got_have(1)
assert ev2 == ['interested', ('request', 1, 0, 2)]
del ev2[:]
sd3.got_unchoke()
sd3.got_have(0)
sd3.got_have(1)
sd3.got_have(2)
assert (ev3 == ['interested', ('request', 2, 0, 2), ('request', 0, 0, 2), ('request', 1, 0, 2)] or
ev3 == ['interested', ('request', 2, 0, 2), ('request', 1, 0, 2), ('request', 0, 0, 2)])
del ev3[:]
assert ev2 == [('request', 0, 0, 2)]
del ev2[:]
sd2.got_piece(0, 0, 'ab')
assert ev1 == [('cancel', 0, 0, 2), 'not interested']
del ev1[:]
assert ev2 == []
assert ev3 == [('cancel', 0, 0, 2)]
del ev3[:]
sd3.got_choke()
assert ev1 == []
assert ev2 == []
assert ev3 == []
sd3.got_unchoke()
assert (ev3 == [('request', 2, 0, 2), ('request', 1, 0, 2)] or
ev3 == [('request', 1, 0, 2), ('request', 2, 0, 2)])
del ev3[:]
assert ev1 == []
assert ev2 == []
sd4 = d.make_download(DummyConnection(ev4))
sd4.got_have_bitfield([True, True, True])
assert ev4 == ['interested']
del ev4[:]
sd4.got_unchoke()
assert (ev4 == [('request', 2, 0, 2), ('request', 1, 0, 2)] or
ev4 == [('request', 1, 0, 2), ('request', 2, 0, 2)])
assert ev1 == []
assert ev2 == []
assert ev3 == []
def test_stops_at_backlog_endgame():
ds = DummyStorage([[(2, 2), (0, 2)], [(2, 2), (0, 2)], [(0, 2)]], True, 3)
events = []
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 3, 15, 3, Measure(15), 10)
ev1 = []
ev2 = []
ev3 = []
sd1 = d.make_download(DummyConnection(ev1))
sd2 = d.make_download(DummyConnection(ev2))
sd3 = d.make_download(DummyConnection(ev3))
sd1.got_unchoke()
sd1.got_have(0)
assert ev1 == ['interested', ('request', 0, 0, 2), ('request', 0, 2, 2)]
del ev1[:]
sd2.got_unchoke()
sd2.got_have(0)
assert ev2 == []
sd2.got_have(1)
assert ev2 == ['interested', ('request', 1, 0, 2), ('request', 1, 2, 2)]
del ev2[:]
sd3.got_unchoke()
sd3.got_have(2)
assert (ev2 == [('request', 0, 0, 2)] or
ev2 == [('request', 0, 2, 2)])
n = ev2[0][2]
del ev2[:]
sd1.got_piece(0, n, 'ab')
assert ev1 == []
assert ev2 == [('cancel', 0, n, 2), ('request', 0, 2-n, 2)]
|