# urlutils.py :  » Web-Services » python-xmltv » pytvgrab-lib-0.5.1 » lib » Python Open Source
#
# Home
# Python Open Source
# 1.3.1.2 Python
# 2.Ajax
# 3.Aspect Oriented
# 4.Blog
# 5.Build
# 6.Business Application
# 7.Chart Report
# 8.Content Management Systems
# 9.Cryptographic
# 10.Database
# 11.Development
# 12.Editor
# 13.Email
# 14.ERP
# 15.Game 2D 3D
# 16.GIS
# 17.GUI
# 18.IDE
# 19.Installer
# 20.IRC
# 21.Issue Tracker
# 22.Language Interface
# 23.Log
# 24.Math
# 25.Media Sound Audio
# 26.Mobile
# 27.Network
# 28.Parser
# 29.PDF
# 30.Project Management
# 31.RSS
# 32.Search
# 33.Security
# 34.Template Engines
# 35.Test
# 36.UML
# 37.USB Serial
# 38.Web Frameworks
# 39.Web Server
# 40.Web Services
# 41.Web Unit
# 42.Wiki
# 43.Windows
# 44.XML
# Python Open Source » Web Services » python xmltv
# python xmltv » pytvgrab lib 0.5.1 » lib » urlutils.py
#!/usr/bin/python
# -----------------------------------------------------------------------
# Copyright (C) 2003 Gustavo Sverzut Barbieri.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MER-
# CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
# -----------------------------------------------------------------------
#
# This code is part of the pytvgrab project:
#    http://pytvgrab.sourceforge.net
#
# -----------------------------------------------------------------------
# Subversion Information, do not edit
#
# $Rev: 246 $
# $LastChangedDate: 2004-10-14 03:16:23 +1000 (Thu, 14 Oct 2004) $
# $LastChangedRevision: 246 $
# $LastChangedBy: gustavo $
#
# $Log: $
#

from urllib import urlopen,urlencode
from customizedparser import CustomizedParser
from grabexceptions import ParseError
import message
import i18n
import os
import md5

cache_dir=None


class URL:
    """Provides an URL with GET and POST data.

    This class is an easy way to deal with an url and its associated GET
    and POST data.  It has methods to get the url (address + get, as you
    may use in browsers) with the getURL() method.   The string
    representation is done with the POST data as a GET variable '__POST'.

    It also provides an open() method so it's easier to send both data and
    get a file descriptor to the stream.

    The '%' operator interpolates the address and every string value of
    the GET and POST data with its right operand and returns a new URL;
    values that are not strings (e.g. numbers) are copied unchanged.

    Example:

    >>> u = URL( 'test.php',
                 { 'getdata': '%(getdata)s' },
                 { 'postdata': '%(postdata)s' } )
    >>> print u
    test.php?getdata=%25%28getdata%29s&__POST=postdata%3D%2525%2528postdata%2529s
    >>> u2 = u % { 'getdata': 'test1', 'postdata': 'test2' }
    >>> print u2
    test.php?getdata=test1&__POST=postdata%3Dtest2
    >>> print u2.getURL()
    test.php?getdata=test1
    >>> u2
    <URL address="test.php" get="getdata=test1" post="postdata=test2">

    """

    # Types __mod__() treats as text.  Built from literals so that both
    # plain and unicode strings are covered without naming a type that
    # only exists in some Python versions.
    _string_types = ( type( '' ), type( u'' ) )

    def __init__( self, url, get=None, post=None ):
        """Store the address and its GET / POST dictionaries.

        A missing (or empty) 'get' or 'post' is replaced by a new empty
        dictionary.
        """
        self.url  = url
        self.get  = get  or { }
        self.post = post or { }
    # __init__()


    def __str__( self ):
        """Return getURL() with the POST data appended as '__POST'.

        The POST data is url-encoded on its own first and the resulting
        string is then encoded again as the value of the '__POST'
        variable, so the whole request fits in a single string.
        """
        s = self.getURL()
        if self.post:
            post = self.__urlencode__( self.post )
            post = self.__urlencode__( { "__POST": post } )
            # getURL() only adds the '?' when there is GET data
            if not self.get:
                s += "?"
            else:
                s += "&"
            s += post
        return s
    # __str__()


    def __repr__( self ):
        """Return '<URL address=... get=... post=...>' with encoded data."""
        url  = self.url
        get  = self.__urlencode__( self.get )
        post = self.__urlencode__( self.post )
        return '<URL address="%s" get="%s" post="%s">' % ( url, get, post )
    # __repr__()


    def __mod__( self, p ):
        """Return a new URL with 'p' interpolated into address and data.

        The address and each string value of the GET and POST data are
        formatted with 'p' (value % p).  Keys are never changed, and this
        instance is left untouched.
        """
        return URL( self.url % p,
                    self._interpolate( self.get, p ),
                    self._interpolate( self.post, p ) )
    # __mod__()


    def _interpolate( self, data, p ):
        """Return a copy of 'data' with its string values formatted by 'p'.

        Values that are not strings are copied as they are: applying '%'
        to them would either raise TypeError or do arithmetic instead of
        formatting.
        """
        result = {}
        for k in data:
            value = data[ k ]
            if isinstance( value, self._string_types ):
                value = value % p
            result[ k ] = value
        return result
    # _interpolate()


    def __urlencode__( self, p ):
        """Encode the mapping 'p' as a query string using urlencode()."""
        return urlencode( p )
    # __urlencode__()


    def getURL( self ):
        """Return the address followed by '?' and the encoded GET data.

        Just the address is returned when there is no GET data.
        """
        s = self.url
        if self.get:
            s += "?" + self.__urlencode__( self.get )
        return s
    # getURL()


    def open( self, proxies=None ):
        """Open the url and return whatever urlopen() returns.

        getURL() is used as the address; the encoded POST data is passed
        as the request data (None when there is no POST data) together
        with 'proxies'.
        """
        u = self.getURL()
        p = None
        if self.post:
            p = self.__urlencode__( self.post )
        return urlopen( u, p, proxies )
    # open()
# URL



def cache_hash( url ):
    """Return the cache file name that belongs to url.

    The name is built from the MD5 hex digest of str( url ), wrapped in
    a fixed prefix and the '.html' suffix, and placed inside the module
    level cache_dir.
    """
    digest = md5.new( str( url ) ).hexdigest()
    filename = "tv_grab-CACHE-" + digest + '.html'
    return os.path.join( cache_dir, filename )
# cache_hash()


def cache_geturl( url ):
    """Return a file descriptor for URL based on cache.

    If the file was not previously cached, cache it.

    'url' is either an address string or an object that provides
    getURL() and open() (such as URL).  The name of the cache file comes
    from cache_hash().

    On a hit the existing cache file is opened for reading and returned.
    On a miss the url is fetched, its contents are written to the cache
    file (missing directories are created) and that file is returned,
    rewound to its start.

    Raises IOError if the fetched contents are empty or only whitespace;
    nothing is written to the cache in that case.
    """
    path = cache_hash( url )

    # Printable form of the address; only used in the log messages.
    try:
        purl = url.getURL()
    except Exception:
        # plain strings (or anything without a usable getURL())
        purl = url

    if os.path.isfile( path ):
        # hit
        message.moreinfo( _( 'getting cached "%s" (local file is "%s")' ) %
                          ( purl, path ) )
        return open( path )

    # miss
    d = os.path.dirname( path )
    if not os.path.isdir( d ):
        os.makedirs( d )

    if hasattr( url, "open" ) and callable( url.open ):
        u = url.open()
    else:
        u = urlopen( url )

    # Close the remote stream whatever happens next, so it is not leaked
    # when reading fails or the contents turn out to be empty.
    try:
        buf = u.read()
    finally:
        u.close()

    if not buf.strip():
        raise IOError( _( 'URL contents were empty' ) )

    message.moreinfo( _( 'getting and caching "%s" as local file "%s"' ) %
                      ( purl, path ) )
    f = open( path, "w+b" )
    f.write( buf )
    f.flush()
    # hand the file back positioned at its first byte, ready to be read
    f.seek( 0, 0 )
    return f
# cache_geturl()



def cache_clearurl( url ):
    """Forget the cached copy of url.

    Deletes the file cache_hash() names for url.  Nothing happens when
    that file does not exist.
    """
    cached = cache_hash( url )
    if not os.path.isfile( cached ):
        # nothing cached for this url
        return
    os.remove( cached )
# cache_clearurl()



def get_urlcontents( url, filter=None, nretries=3 ):
    """
    Return the contents of the given url.

    'url' is either an address string or an object that provides
    getURL() and open() (such as URL).  When the module level cache_dir
    is set the data is obtained through cache_geturl(), otherwise it is
    fetched directly.

    An attempt that raises IOError, or that yields nothing but
    whitespace, is logged and repeated until 'nretries' attempts have
    been made.  When every attempt failed an error is reported through
    message.error() and '' is returned.

    If you want to filter the contents, please provide the function as
    the 'filter' parameter: it is called with the fetched string and its
    result is returned instead.  It is not called when nothing was
    fetched.
    """
    # Local import: only needed to read the active exception in a way
    # that parses on every Python version.
    import sys

    contents = ''
    while nretries > 0:
        try:
            gurl = None
            try:
                if cache_dir:
                    gurl = cache_geturl( url )
                else:
                    # printable form of the address, for the log only
                    try:
                        purl = url.getURL()
                    except Exception:
                        purl = url
                    message.moreinfo( 'getting %s' % purl )

                    if hasattr( url, "open" ) and callable( url.open ):
                        gurl = url.open()
                    else:
                        gurl = urlopen( url )

                contents = gurl.read()
            finally:
                # release the stream even when the attempt failed
                if gurl is not None:
                    gurl.close()

            if not contents.strip():
                raise IOError( _( 'URL contents were empty' ) )

            message.moreinfo( _( 'got %s' ) % url )
            break
        except IOError:
            e = sys.exc_info()[ 1 ]
            # do not hand back the leftovers of a failed attempt
            contents = ''
            message.warning( _( "get_urlcontents: IOError: %s" ) % e )
            message.moreinfo( _( "get_urlcontents: Retry to fetch: %s" ) %
                              url )
            nretries -= 1

    # The loop is only left with attempts remaining through 'break', so
    # a count that is not positive here means no attempt succeeded.
    if nretries <= 0:
        message.error( _( "get_urlcontents: Error fetching %s" ) % url )

    if filter and contents:
        contents = filter( contents )

    return contents
# get_urlcontents()


def get_htmlstructure( contents, parse_tags=None,
                       parse_attrs=None, must_close_tags=None,
                       keep_empty_tags=None, verbose=0 ):
    """
    parse the html and return its associated Tag structure.
    You may choose what tags to parse providing a list of tag names
    in 'parse_tags' and a list of attributes, providing a list of
    attribute names in 'parse_attrs'.
    """
    parser = CustomizedParser( parse_tags, parse_attrs,
                               must_close_tags, keep_empty_tags,
                               verbose )
    structure = None
    try:
        parser.feed( contents )
        structure = parser.get_structure()
        parser.close()
        return structure
    except Exception, e:
        message.exception( e ) 
# get_htmlstructure()


def get_urlparsed( url, filter=None, parse_tags=None,
                   parse_attrs=None, must_close_tags=None,
                   keep_empty_tags=None, retries=3, verbose=0):
    """
    Return the Tag structure of the document at URL.

    The document is fetched with get_urlcontents( url, filter, retries )
    and parsed with get_htmlstructure(), which receives 'parse_tags',
    'parse_attrs', 'must_close_tags', 'keep_empty_tags' and 'verbose'.
    See those functions for the meaning of the arguments.

    Fetching and parsing is repeated up to 'retries' times while the
    parser gives no structure back.  After each failure the cached copy
    of the url is removed (when cache_dir is set) so the next attempt
    does not read the same bad data again.

    Raises ParseError when no attempt produced a structure.  The data of
    the last attempt is stored in the exception's 'contents' attribute;
    it is None when no attempt was made ('retries' not positive).
    """
    # Bound up front so the ParseError below can always be built.
    c = None
    r = retries
    while r > 0:
        c = get_urlcontents( url, filter, retries )
        # Forward the caller's parser options rather than dropping them.
        struct = get_htmlstructure( c, parse_tags, parse_attrs,
                                    must_close_tags=must_close_tags,
                                    keep_empty_tags=keep_empty_tags,
                                    verbose=verbose )
        if struct:
            return struct

        r -= 1
        if cache_dir:
            # don't keep bogus cache
            cache_clearurl( url )
    # while r > 0

    pe = ParseError( _( "%s could not be correctly parsed." ) % url )
    pe.contents = c
    raise pe
# get_urlparsed()


# --------------  Unit Tests  -------------- #
# Use the unittest2 package when it can be imported, the standard
# unittest module otherwise.  using_unittest2 records which one is bound
# to the name 'unittest'.
using_unittest2 = False
try:
    import unittest2 as unittest
    using_unittest2 = True
except ImportError:
    # Only a missing package selects the fallback; any other failure is
    # left to propagate instead of being silently hidden.
    import unittest

class urlutils_UnitTest( unittest.TestCase ):
    """Minimal test case holding one check that always passes."""

    def test01( self ):
        """Compare the constant 1 with itself."""
        value = 1
        self.assertEqual( value, 1 )
    # test01()



class URL_Test( unittest.TestCase ):
    """Checks repr(), str(), getURL() and the '%' operator of URL."""

    def test__repr__( self ):
        """repr() shows the address plus the encoded GET and POST data."""
        cases = [
            ( URL( 'test.php' ),
              '<URL address="test.php" get="" post="">' ),
            ( URL( 'test.php', { "a":1 } ),
              '<URL address="test.php" get="a=1" post="">' ),
            ( URL( 'test.php', { "a":1 }, { "b":2 } ),
              '<URL address="test.php" get="a=1" post="b=2">' ),
        ]
        for url, expected in cases:
            self.assertEqual( repr( url ), expected )
    # test__repr__()


    def test__str__( self ):
        """str() appends the POST data as the '__POST' variable."""
        cases = [
            ( URL( 'test.php' ),
              'test.php' ),
            ( URL( 'test.php', { "a":1 } ),
              'test.php?a=1' ),
            ( URL( 'test.php', { "a":1 }, { "b":2 } ),
              'test.php?a=1&__POST=b%3D2' ),
            ( URL( 'test.php', post={ "b":2 } ),
              'test.php?__POST=b%3D2' ),
        ]
        for url, expected in cases:
            self.assertEqual( str( url ), expected )
    # test__str__()


    def testgetURL( self ):
        """getURL() contains the GET data and never the POST data."""
        cases = [
            ( URL( 'test.php' ),
              'test.php' ),
            ( URL( 'test.php', { "a":1 } ),
              'test.php?a=1' ),
            ( URL( 'test.php', { "a":1 }, { "b":2 } ),
              'test.php?a=1' ),
        ]
        for url, expected in cases:
            self.assertEqual( url.getURL(), expected )
    # testgetURL()


    def test__mod__( self ):
        """'%' fills the address and the GET / POST values."""
        values = { "getdata": "123", "postdata": "456", "url": "test" }

        plain = URL( "test.php" ) % values
        self.assertEqual( str( plain ), "test.php" )

        address_only = URL( "%(url)s.php3" ) % values
        self.assertEqual( str( address_only ), "test.php3" )

        everything = URL( "%(url)s.php4",
                          { "gd": "%(getdata)s" },
                          { "pd": "%(postdata)s" } ) % values
        self.assertEqual( str( everything ),
                          "test.php4?gd=123&__POST=pd%3D456" )
    # test__mod__()
# URL_Test

# Start the tests when this file is executed as a script.
# NOTE(review): because of the using_unittest2 term the tests are also
# started when the module is merely imported on a system where unittest2
# is installed -- confirm that this is intended.
if __name__ == '__main__' or using_unittest2:
    unittest.main()
# --------------  Unit Tests  -------------- #
# www.java2java.com | Contact Us
# Copyright 2009 - 12 Demo Source and Support. All rights reserved.
# All other trademarks are property of their respective owners.