#! /usr/bin/python
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
#
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
#
# changes:
# 2004-08-26 fl unified http callback
# 2004-10-09 fl factored out gzip_consumer support
# 2005-07-08 mbp experimental support for keepalive connections
#
# Copyright (c) 2001-2004 by Fredrik Lundh. All rights reserved.
#
"""async/pipelined http client
Use
===
Users of this library pass in URLs they want to see, and consumer
objects that will receive the results at some point in the future.
Any number of requests may be queued up, and more may be added while
the download is in progress.
Requests can be both superscalar and superpipelined. That is to say,
for each server there can be multiple sockets open, and each socket
may have more than one request in flight.
Design
======
There is a single DownloadManager, and a connection object for each
open socket.
Request/consumer pairs are maintained in queues. Each connection has
a list of transmitted requests whose response has not yet been
received. There is also a per-server list of requests that have not
yet been submitted.
When a connection is ready to transmit a new request, it takes one
from the unsubmitted list, sends the request, and adds the request to
its unfulfilled list. This should happen when the connection has
space for more transmissions or when a new request is added by the
user. If the connection terminates with unfulfilled requests they are
put back onto the unsubmitted list, to be retried elsewhere.
Because responses come back precisely in order, the connection always
knows what it should expect next: the response for the next
unfulfilled request.
"""
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
# with a hostname does a remote DNS resolution, which is pretty sucky.
# Shouldn't there be a cache in glibc? We should probably cache the
# address in, say, the DownloadManager.
# TODO: A default consumer operation that writes the received data
# into a file; by default the file is named the same as the last
# component of the URL.
# TODO: A utility function that is given a list of URLs, and downloads
# them all parallel/pipelined. If any fail, it raises an exception
# (and discards the rest), or perhaps can be told to continue anyhow.
# The content is written into temporary files. It returns a list of
# readable file objects.
# TODO: If we try pipelined or keepalive and the connection drops out,
# then retry the request on a new connection; eventually we should perhaps
# learn that a given host or network just won't allow keepalive.
import asyncore
import socket, string, time, sys
import StringIO
import mimetools, urlparse, urllib
import logging
# Module-wide logging.  The root logger is configured at import time to
# write DEBUG and above to a scratch file, which is truncated on each load
# (filemode 'w').
logger = logging.getLogger('bzr.http_client')
logging.basicConfig(
    filename='/tmp/http_client.log',
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(message)s')
# Short aliases used throughout the rest of the module.
debug, info, error = logger.debug, logger.info, logger.error
class CloseConnection(Exception):
    """Close connection.

    Request handlers can raise this exception to indicate that the
    connection should be closed.
    """


class Redirect(CloseConnection):
    """Redirect connection.

    Request handlers can raise this exception to indicate that a new
    request should be issued.

    location
        The value given to the constructor (presumably the address to
        request instead -- nothing in this module reads it yet).
    """

    def __init__(self, location):
        # Initialise the base exception explicitly so that args and str()
        # carry the location; on interpreters with old-style exception
        # classes they are otherwise left unset.
        CloseConnection.__init__(self, location)
        self.location = location
class DownloadManager(object):
    """Handles pipelined/overlapped downloads.

    Pass in a series of URLs with consumers to receive the response.
    This object will spread the requests over however many sockets
    seem useful.

    queued_requests
        List of (url, consumer) pairs not yet taken by any channel.
    channels
        Channels created by this manager that have not reported closing.
    try_pipelined
        If true, a busy channel may be asked to take further requests.
    try_keepalive
        If true, channels ask the server to keep the connection open.
    max_channels
        Upper bound on the number of channels open at once.
    host, port
        Address every new channel connects to.
    """

    def __init__(self, host='127.0.0.1', port=8000):
        """Create an idle manager.

        host, port
            Where new channels connect.  The defaults are the forwarded
            local address this module was previously hard-wired to.
        """
        self.queued_requests = []
        self.channels = []
        self.try_pipelined = False
        self.try_keepalive = False
        self.max_channels = 5
        self.host = host
        self.port = port

    def enqueue(self, url, consumer):
        """Queue url for download; consumer will receive the response."""
        self.queued_requests.append((url, consumer))
        self._wake_up_channel()

    def _channel_closed(self, channel):
        """Called by the channel when its socket closes."""
        try:
            self.channels.remove(channel)
        except ValueError:
            # The channel was already forgotten (closed more than once);
            # a second notification must not blow up the event loop.
            pass
        if self.queued_requests:
            # might recreate one
            self._wake_up_channel()

    def _make_channel(self):
        """Return a new HttpChannel connected to (self.host, self.port)."""
        return HttpChannel(self.host, self.port, self)

    def _wake_up_channel(self):
        """Try to wake up one channel to send the newly-added request.

        There may be more than one request pending, and this may cause
        more than one channel to take requests.  That's OK; some of
        them may be frustrated.
        """
        from random import choice
        # first, wake up any idle channels
        woke_idle = False
        # Iterate over a snapshot: take_one() can end up closing a channel,
        # which removes it from self.channels while we are looping.
        for ch in list(self.channels):
            if ch not in self.channels:
                continue
            if not ch.sent_requests:
                ch.take_one()
                woke_idle = True
        if woke_idle:
            debug("woke existing idle channel(s)")
            return
        if len(self.channels) < self.max_channels:
            newch = self._make_channel()
            self.channels.append(newch)
            newch.take_one()
            debug("created new channel")
            return
        # The emptiness test guards choice() when max_channels is 0.
        if self.try_pipelined and self.channels:
            # ask existing channels to take it
            debug("woke busy channel")
            choice(self.channels).take_one()
        # otherwise the request stays queued until a channel asks for work

    def run(self):
        """Run the asyncore event loop.

        asyncore.loop() returns once no channels remain in asyncore's
        socket map.
        """
        asyncore.loop()
class Response(object):
    """Holds in-flight response.

    A plain attribute container: the class defines nothing itself.
    _parse_response_http10() fills in status, headers, content_type,
    content_length, transfer_encoding, content_encoding,
    connection_reply and content_remaining.
    """
def _parse_response_http10(header):
    """Parse the header block of an HTTP/1.0 response.

    header
        The status line followed by the header lines, without the blank
        line that terminates them.

    Returns a Response with status, headers, content_type,
    content_length, transfer_encoding, content_encoding,
    connection_reply and content_remaining set.

    Raises NotImplementedError for anything this client cannot handle
    yet: a transfer-encoding, a missing/malformed status or one other
    than 200, or a content-length that is missing or not positive.
    """
    from cStringIO import StringIO
    fp = StringIO(header)
    r = Response()
    # [version, code, reason]; drop the line ending so it does not end up
    # glued to the last field
    r.status = fp.readline().rstrip("\r\n").split(" ", 2)
    r.headers = mimetools.Message(fp)
    # we can only(?) expect to do keepalive if we got either a
    # content-length or chunked encoding; otherwise there's no way to know
    # when the content ends apart from through the connection close
    r.content_type = r.headers.get("content-type")
    try:
        r.content_length = int(r.headers.get("content-length"))
    except (ValueError, TypeError):
        r.content_length = None
    debug("seen content length of %r" % r.content_length)
    r.transfer_encoding = r.headers.get("transfer-encoding")
    r.content_encoding = r.headers.get("content-encoding")
    r.connection_reply = r.headers.get("connection")
    # TODO: pass status code to consumer?
    if r.transfer_encoding:
        raise NotImplementedError()
    try:
        status_code = int(r.status[1])
    except (IndexError, ValueError):
        # malformed status line: report it like any other unsupported
        # response instead of leaking an IndexError/ValueError
        status_code = None
    if status_code != 200:
        debug("can't handle response status %r" % r.status)
        raise NotImplementedError()
    # A missing length cannot be streamed without reading to close; a zero
    # or negative one would leave the reader with nothing it can ever
    # finish consuming.
    if r.content_length is None or r.content_length <= 0:
        raise NotImplementedError()
    r.content_remaining = r.content_length
    return r
class HttpChannel(asyncore.dispatcher_with_send):
    """One http socket, pipelining if possible.

    manager
        The DownloadManager that owns this channel; requests are popped
        from its queued_requests list and it is told when the channel
        closes.
    sent_requests
        (url, consumer) pairs already transmitted whose response has not
        been completely received, oldest first.
    response
        Parsed header of the response currently being received, or None
        while waiting for the next header.
    data
        Bytes received from the socket but not yet consumed.
    """
    # asynchronous http client
    user_agent = "http_client.py 1.3ka (based on effbot)"
    # proxy settings from the environment; not consulted by this class yet
    proxies = urllib.getproxies()

    def __init__(self, ip_host, ip_port, manager):
        """Create the socket and start connecting to (ip_host, ip_port)."""
        asyncore.dispatcher_with_send.__init__(self)
        self.manager = manager
        # if a response header has been seen, this holds it
        self.response = None
        self.data = ""
        self.chunk_size = None
        self.timestamp = time.time()
        # sent_requests holds (url, consumer)
        self.sent_requests = []
        self._outbuf = ''
        self._shut_down = False
        # All state must exist before connect(): asyncore may report the
        # connection immediately, which runs handle_connect() -> take_one().
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        debug('connecting...')
        self.connect((ip_host, ip_port))

    def __repr__(self):
        try:
            name = self.getsockname()
        except (socket.error, AttributeError):
            # socket already closed or never created
            name = None
        return 'HttpChannel(local_port=%r)' % (name,)

    def is_idle(self):
        """Return true if no transmitted request is awaiting a response."""
        return (not self.sent_requests)

    def handle_connect(self):
        debug("connected")
        self.take_one()

    def take_one(self):
        """Accept one request from the manager if possible."""
        if self._shut_down:
            return
        if self.manager.try_pipelined:
            if len(self.sent_requests) > 4:
                return
        else:
            if len(self.sent_requests) > 0:
                return
        queue = self.manager.queued_requests
        if not queue:
            return
        url, consumer = queue.pop(0)
        debug('request accepted by channel')
        # TODO: If the socket's not writable (tx buffer full), don't take.
        self._push_request_http10(url, consumer)

    def _push_request_http10(self, url, consumer):
        """Send a request, and add it to the outstanding queue."""
        # TODO: check the url requested is appropriate for this connection
        # TODO: If there are too many requests outstanding or (less likely) the
        # connection fails, queue it for later use.
        # TODO: Keep track of requests that have been sent but not yet fulfilled,
        # because we might need to retransmit them if the connection fails. (Or
        # should the caller do that?)
        request = self._form_request_http10(url)
        debug('send request for %s from %r' % (url, self))
        # dispatcher_with_send handles buffering the data until it can
        # be written, and hooks handle_write.
        self.send(request)
        self.sent_requests.append((url, consumer))

    def _form_request_http10(self, url):
        """Return the text of an HTTP/1.0 GET request for url."""
        # TODO: get right vhost name
        request = [
            "GET %s HTTP/1.0" % (url,),
            "Host: www.bazaar-ng.org",
            ]
        if self.manager.try_keepalive or self.manager.try_pipelined:
            request.extend([
                "Keep-Alive: 60",
                "Connection: keep-alive",
                ])
        # make sure to include a user agent
        for header in request:
            if header.lower().startswith("user-agent:"):
                break
        else:
            request.append("User-Agent: %s" % self.user_agent)
        return "\r\n".join(request) + "\r\n\r\n"

    def handle_read(self):
        """Consume incoming data and pass response bodies to consumers.

        Responses arrive in the order the requests were sent, so the data
        always belongs to sent_requests[0].
        """
        data = self.recv(2048)
        if data:
            debug('got %d bytes from socket' % len(data))
            self.data = self.data + data
        else:
            debug('server closed connection')
        while self.data:
            if not self.sent_requests:
                # Nobody is waiting for these bytes; drop them rather than
                # fail with an IndexError below.
                debug('discarding %d unexpected bytes' % len(self.data))
                self.data = ""
                return
            url, consumer = self.sent_requests[0]
            if not self.response:
                # do not have a full response header yet
                # check if we've seen a full header
                debug('getting header for %s' % url)
                parts = self.data.split("\r\n\r\n", 1)
                if len(parts) <= 1:
                    return
                header, self.data = parts
                self.response = _parse_response_http10(header)
                self.content_remaining = self.response.content_length
                if self.content_remaining < 0:
                    # a negative length can never be satisfied and would
                    # make this loop spin forever
                    raise NotImplementedError()
                if not self.data:
                    return
            # we now know how many (more) content bytes we have, and how much
            # is in the data buffer.  there are two main possibilities:
            # too much data, and some must be left behind containing the next
            # response headers, or too little, or possibly just right
            want = self.content_remaining
            if want > 0:
                got_data = self.data[:want]
                self.data = self.data[want:]
                assert got_data
                self.content_remaining -= len(got_data)
                debug('pass back %d bytes of %s, %d remain'
                      % (len(got_data),
                         url,
                         self.content_remaining))
                # Hand over only this response's body bytes, not the raw
                # recv() chunk, which may also hold headers or the start
                # of the next response.
                consumer.feed(got_data)
            if self.content_remaining == 0:
                del self.sent_requests[0]
                debug('content complete')
                consumer.content_complete()
                # header values are case-insensitive tokens
                reply = (self.response.connection_reply or '').strip().lower()
                # reset and try to get the next response header
                self.response = None
                if reply == 'close':
                    debug('server requested close')
                    self._shutdown()
                elif not self.manager.try_keepalive:
                    debug('no keepalive for this socket')
                    self._shutdown()
                else:
                    debug("ready for next header...")
                    self.take_one()

    def handle_close(self):
        debug('async told us of close on %r' % self)
        # if there are outstanding requests should probably reopen and
        # retransmit, but if we're not making any progress then give up
        self._shutdown()

    def _shutdown(self):
        """Notify the manager and close the socket, at most once."""
        if self._shut_down:
            return
        self._shut_down = True
        self.manager._channel_closed(self)
        self.close()
class DummyConsumer:
    """Example consumer that saves a download into a directory.

    url
        The URL being fetched; its last '/'-separated component names the
        output file.
    pb
        Progress object whose update() is called on completion, and whose
        last_cnt and last_total attributes are read.
    download_dir
        Directory the file is written into; created on first use if it
        does not exist.
    """

    def __init__(self, url, pb, download_dir='/tmp/download/'):
        self.url = url
        self.outf = None
        self._pb = pb
        self.download_dir = download_dir

    def feed(self, data):
        """Append data to the output file, opening it on the first call."""
        if not self.outf:
            import os
            # last path component; the whole url if it contains no '/'
            base = self.url.rsplit('/', 1)[-1]
            if not os.path.isdir(self.download_dir):
                os.makedirs(self.download_dir)
            self.outf = open(os.path.join(self.download_dir, base), 'wb')
        self.outf.write(data)

    def error(self, err_info):
        """Log and print the exception triple err_info, then exit(1)."""
        import traceback
        error('error reported to consumer')
        traceback.print_exception(err_info[0], err_info[1], err_info[2])
        sys.exit(1)

    def content_complete(self):
        """Close the output file and advance the progress display."""
        info('content complete from %s' % self.url)
        # outf is still None if no data was ever fed
        if self.outf is not None:
            self.outf.close()
            self.outf = None
        # using last_cnt is cheating
        self._pb.update('downloading inventory',
                        self._pb.last_cnt+1,
                        self._pb.last_total)
if __name__ == "__main__":
    # Demo driver: queue the gzipped inventory of every revision in a
    # local bzr branch, then run the event loop.
    logging.basicConfig(level=logging.DEBUG)
    manager = DownloadManager()
    from bzrlib.branch import Branch
    from bzrlib.progress import ProgressBar
    progress = ProgressBar()
    history = Branch('/home/mbp/work/bzr').revision_history()
    progress.update('downloading inventories', 0, len(history))
    store_url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/'
    for revision_id in history:
        inventory_url = store_url + revision_id + '.gz'
        manager.enqueue(inventory_url,
                        DummyConsumer(inventory_url, progress))
    manager.run()
# for url in ['http://www.bazaar-ng.org/',
# 'http://www.bazaar-ng.org/tutorial.html',
# 'http://www.bazaar-ng.org/download.html',
# 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
# ]:
|