# -----------------------------------------------------------------------
# Copyright (C) 2003 Gustavo Sverzut Barbieri.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MER-
# CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
# -----------------------------------------------------------------------
# This code is part of the pytvgrab project:
# http://pytvgrab.sourceforge.net
# -----------------------------------------------------------------------
# Subversion Information, do not edit
# $Rev: 246 $
# $LastChangedDate: 2004-10-14 03:16:23 +1000 (Thu, 14 Oct 2004) $
# $LastChangedRevision: 246 $
# $LastChangedBy: gustavo $
# $Log: $
from urllib import urlopen,urlencode
from customizedparser import CustomizedParser
from grabexceptions import ParseError
import message
import i18n
import os
import md5
class URL:
    """Provides an URL with GET and POST data.

    This class is an easy way to deal with an url and its associated GET
    and POST data. It provides the getURL() method to get the url
    (address + get, as you may use in browsers). The string representation
    is done with the POST data as a GET variable '__POST'.

    It also provides an open() method so it's easier to send both data and
    get a file descriptor to the stream.

    >>> u = URL( 'test.php',
    ...          { 'getdata': '%(getdata)s' },
    ...          { 'postdata': '%(postdata)s' } )
    >>> print u
    test.php?getdata=%25%28getdata%29s&__POST=postdata%3D%2525%2528postdata%2529s
    >>> u2 = u % { 'getdata': 'test1', 'postdata': 'test2' }
    >>> print u2
    test.php?getdata=test1&__POST=postdata%3Dtest2
    >>> print u2.getURL()
    test.php?getdata=test1
    >>> u2
    <URL address="test.php" get="getdata=test1" post="postdata=test2">
    """

    # Types that take part in '%' interpolation: str and unicode on
    # Python 2 (both entries are plain str on Python 3).
    _string_types = (type(''), type(u''))

    def __init__(self, url, get=None, post=None):
        """Store the address and its GET/POST dicts (None means no data)."""
        self.url = url
        self.get = get or {}
        self.post = post or {}
    # __init__()

    def __str__(self):
        """Return getURL() plus the encoded POST data as variable '__POST'."""
        s = self.getURL()
        if self.post:
            post = self.__urlencode__(self.post)
            post = self.__urlencode__({"__POST": post})
            # getURL() already emitted "?" when there is GET data, so the
            # extra variable is joined with "&" in that case.
            if not self.get:
                s += "?"
            else:
                s += "&"
            s += post
        return s
    # __str__()

    def __repr__(self):
        """Return '<URL address=... get=... post=...>' with encoded data."""
        url = self.url
        get = self.__urlencode__(self.get)
        post = self.__urlencode__(self.post)
        return '<URL address="%s" get="%s" post="%s">' % (url, get, post)
    # __repr__()

    def __mod__(self, p):
        """Return a new URL with 'p' interpolated into address, GET and POST.

        'p' is applied with the '%' operator to the address and to every
        string value of the GET and POST dicts. Non-string values (such
        as integers) are copied unchanged instead of raising TypeError.
        """
        url = self.url % p
        get = self._interpolate(self.get, p)
        post = self._interpolate(self.post, p)
        return URL(url, get, post)
    # __mod__()

    def _interpolate(self, data, p):
        """Return a copy of dict 'data' with 'p' applied to string values."""
        result = {}
        for k in (data or {}):
            value = data[k]
            if isinstance(value, self._string_types):
                value = value % p
            result[k] = value
        return result
    # _interpolate()

    def __urlencode__(self, p):
        """Encode dict 'p' as a query string using urlencode()."""
        return urlencode(p)
    # __urlencode__()

    def getURL(self):
        """Return the address followed by '?' and the GET data, if any."""
        s = self.url
        if self.get:
            s += "?" + self.__urlencode__(self.get)
        return s
    # getURL()

    def open(self, proxies=None):
        """Open the url with urlopen() and return the resulting stream.

        The encoded POST dict is sent as request data when there is any;
        otherwise None is passed. 'proxies' is handed to urlopen() as is.
        """
        u = self.getURL()
        p = None
        if self.post:
            p = self.__urlencode__(self.post)
        return urlopen(u, p, proxies)
    # open()
def cache_hash(url):
    """Map url to the path of its cache file.

    The file name is "tv_grab-CACHE-", the MD5 hex digest of str( url )
    and the ".html" suffix; the file lives inside the module-level
    cache_dir.
    """
    digest = md5.new(str(url)).hexdigest()
    filename = "tv_grab-CACHE-" + digest + '.html'
    return os.path.join(cache_dir, filename)
# cache_hash()
def cache_geturl(url):
    """Return an open file with the contents of url, going through the cache.

    If the url was previously cached, its cache file is opened and
    returned. Otherwise the url is downloaded, written to the cache file
    (creating the cache directory when needed) and that file is returned,
    positioned at its start.

    'url' may be an object providing getURL() and open() (such as URL) or
    a plain address that urlopen() accepts.

    Raises IOError when the downloaded contents are empty; nothing is
    written to the cache in that case.
    """
    path = cache_hash(url)

    # Address shown in the log messages: objects that know how to render
    # themselves are asked to, anything else is printed as given.
    if hasattr(url, "getURL") and callable(url.getURL):
        purl = url.getURL()
    else:
        purl = url

    if os.path.isfile(path):
        # hit
        message.moreinfo(_('getting cached "%s" (local file is "%s")') %
                         (purl, path))
        # Binary mode, so a hit yields the same bytes as the "w+b" file
        # returned on a miss.
        return open(path, "rb")

    # miss
    d = os.path.dirname(path)
    if d and not os.path.isdir(d):
        os.makedirs(d)

    # Open exactly one stream: the object's own open() when it has one,
    # plain urlopen() otherwise.
    if hasattr(url, "open") and callable(url.open):
        u = url.open()
    else:
        u = urlopen(url)
    try:
        buf = u.read()
    finally:
        # the remote stream is not needed once its data is in memory
        u.close()

    if not buf.strip():
        raise IOError(_('URL contents were empty'))

    message.moreinfo(_('getting and caching "%s" as local file "%s"') %
                     (purl, path))
    f = open(path, "w+b")
    f.write(buf)
    f.seek(0, 0)  # rewind so the caller reads what was just written
    return f
# cache_geturl()
def cache_clearurl(url):
    """Drop the cached copy of url, if there is one.

    Deletes the cache file that cache_hash() assigns to url. Nothing
    happens when no such file exists.
    """
    cached = cache_hash(url)
    if not os.path.isfile(cached):
        return
    os.remove(cached)
# cache_clearurl()
def get_urlcontents(url, filter=None, nretries=3):
    """Return the contents of the given url.

    The url is read through cache_geturl() when the module-level
    cache_dir is set, and fetched directly otherwise. An IOError (which
    includes empty contents) is logged and the fetch is tried again, up
    to 'nretries' attempts in total; the exception is not propagated.

    If you want to filter the contents, please provide the function as
    the 'filter' parameter: it is called with the contents and its return
    value is returned instead. It is only called for non-empty contents.

    'url' may be an object providing getURL() and open() (such as URL) or
    a plain address that urlopen() accepts.
    """
    contents = ''
    while nretries > 0:
        gurl = None
        try:
            try:
                if cache_dir:
                    gurl = cache_geturl(url)
                else:
                    if hasattr(url, "getURL") and callable(url.getURL):
                        purl = url.getURL()
                    else:
                        purl = url
                    message.moreinfo('getting %s' % purl)
                    # Open exactly one stream: the object's own open()
                    # when it has one, plain urlopen() otherwise.
                    if hasattr(url, "open") and callable(url.open):
                        gurl = url.open()
                    else:
                        gurl = urlopen(url)
                contents = gurl.read()
            finally:
                # release the stream whether or not the read succeeded
                if gurl is not None:
                    gurl.close()
            if not contents.strip():
                raise IOError(_('URL contents were empty'))
            message.moreinfo(_('got %s') % url)
            # success: stop retrying
            break
        except IOError as e:
            message.warning(_("get_urlcontents: IOError: %s") % e)
            message.moreinfo(_("get_urlcontents: Retry to fetch: %s") %
                             url)
            nretries -= 1
            if nretries == 0:
                message.error(_("get_urlcontents: Error fetching %s") % url)

    if filter and contents:
        contents = filter(contents)
    return contents
# get_urlcontents()
def get_htmlstructure(contents, parse_tags=None,
                      parse_attrs=None, must_close_tags=None,
                      keep_empty_tags=None, verbose=0):
    """Parse the html and return its associated Tag structure.

    You may choose what tags to parse providing a list of tag names
    in 'parse_tags' and a list of attributes, providing a list of
    attribute names in 'parse_attrs'. 'must_close_tags',
    'keep_empty_tags' and 'verbose' are handed to CustomizedParser
    unchanged.

    Returns the value of the parser's get_structure(), or None when
    parsing raised an exception; the exception is reported through
    message.exception() and not propagated.
    """
    parser = CustomizedParser(parse_tags, parse_attrs,
                              must_close_tags, keep_empty_tags,
                              verbose)
    try:
        parser.feed(contents)
        return parser.get_structure()
    except Exception as e:
        message.exception(e)
        return None
# get_htmlstructure()
def get_urlparsed(url, filter=None, parse_tags=None,
                  parse_attrs=None, must_close_tags=None,
                  keep_empty_tags=None, retries=3, verbose=0):
    """Return the Tag structure of the document at URL.

    You can specify a 'filter' function (see get_urlcontents()) and
    a list of tags ('parse_tags') and attributes ('parse_attrs') to
    parse (see get_htmlstructure()). 'must_close_tags',
    'keep_empty_tags' and 'verbose' are forwarded to
    get_htmlstructure() as well.

    The document is fetched and parsed up to 'retries' times; each fetch
    is itself given 'retries' attempts by get_urlcontents(). After a
    failed parse the cached copy of the url is removed (when cache_dir is
    set) so the next attempt downloads it again.

    Raises ParseError when no attempt produced a structure. The last
    fetched contents are attached to the exception as 'contents'.
    """
    c = ''  # stays defined for the ParseError below even if retries <= 0
    r = retries
    while r > 0:
        c = get_urlcontents(url, filter, retries)
        struct = get_htmlstructure(c, parse_tags, parse_attrs,
                                   must_close_tags, keep_empty_tags,
                                   verbose=verbose)
        if struct:
            return struct
        r -= 1
        if cache_dir:
            # don't keep bogus cache
            cache_clearurl(url)
    # while r > 0

    pe = ParseError(_("%s could not be correctly parsed.") % url)
    pe.contents = c
    raise pe
# get_urlparsed()
# -------------- Unit Tests -------------- #
import unittest2 as unittest
import unittest
class urlutils_UnitTest(unittest.TestCase):
    """Minimal test case: checks only that the test machinery runs."""

    def test01(self):
        v = 1
        # assertEqual (unlike a bare assert) is not stripped under -O and
        # matches the style used by URL_Test.
        self.assertEqual(v, 1)
class URL_Test(unittest.TestCase):
    """Checks the string forms, getURL() and '%' interpolation of URL."""

    def test__repr__(self):
        expectations = [
            (URL('test.php'),
             '<URL address="test.php" get="" post="">'),
            (URL('test.php', {"a": 1}),
             '<URL address="test.php" get="a=1" post="">'),
            (URL('test.php', {"a": 1}, {"b": 2}),
             '<URL address="test.php" get="a=1" post="b=2">'),
        ]
        for subject, wanted in expectations:
            self.assertEqual(repr(subject), wanted)
    # test__repr__()

    def test__str__(self):
        expectations = [
            (URL('test.php'), 'test.php'),
            (URL('test.php', {"a": 1}), 'test.php?a=1'),
            (URL('test.php', {"a": 1}, {"b": 2}),
             'test.php?a=1&__POST=b%3D2'),
            (URL('test.php', post={"b": 2}), 'test.php?__POST=b%3D2'),
        ]
        for subject, wanted in expectations:
            self.assertEqual(str(subject), wanted)
    # test__str__()

    def testgetURL(self):
        # POST data must never show up in getURL()
        expectations = [
            (URL('test.php'), 'test.php'),
            (URL('test.php', {"a": 1}), 'test.php?a=1'),
            (URL('test.php', {"a": 1}, {"b": 2}), 'test.php?a=1'),
        ]
        for subject, wanted in expectations:
            self.assertEqual(subject.getURL(), wanted)
    # testgetURL()

    def test__mod__(self):
        values = {"getdata": "123", "postdata": "456", "url": "test"}

        plain = URL("test.php") % values
        self.assertEqual(str(plain), "test.php")

        address_only = URL("%(url)s.php3") % values
        self.assertEqual(str(address_only), "test.php3")

        everything = URL("%(url)s.php4",
                         {"gd": "%(getdata)s"},
                         {"pd": "%(postdata)s"}) % values
        self.assertEqual(str(everything),
                         "test.php4?gd=123&__POST=pd%3D456")
    # test__mod__()
# URL_Test
if using_unittest2 or __name__ == '__main__':
# -------------- Unit Tests -------------- #