#!/usr/bin/env python
# -----------------------------------------------------------------------
# Copyright (C) 2003 Chris Ottrey.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MER-
# CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
# -----------------------------------------------------------------------
#
# This code is part of the pytvgrab project:
# http://pytvgrab.sourceforge.net
#
# -----------------------------------------------------------------------
# Subversion Information, do not edit
#
# $Rev: 291 $
# $LastChangedDate: 2004-11-21 23:37:37 +1100 (Sun, 21 Nov 2004) $
# $LastChangedRevision: 291 $
# $LastChangedBy: ottrey $
#
# $Log: $
#
import sys
import os
import urllib
import string
import copy
from cPickle import load,dump
import re
import output
from output import red,green,blue,turquoise,yellow,purple,bold,tabulate
import config_locale
from config_locale import toprint
import message
import urlutils
from grabexceptions import *
from prog import Prog,ProgException,Option
from datetime2 import Date,Time,DateTime,today,nowdt
import xmltv
from xmltv import XMLTV,Element
import unittest2
import i18n
# Set the class-wide DateTime.output_fmt to "YYYYMMDDhhmm <utc offset>".
# NOTE(review): presumably the timestamp layout used for the XMLTV output -- confirm.
DateTime.output_fmt='%Y%m%d%H%M %z'
class Grab(Prog):
    """Base class for writing a tv_grabber.

    The class wires the common command line options (output file, number
    of days, offset, verbosity, cache dir, start date, colours/titles) to
    handler methods and drives the grab loop in main()/grab_day().

    Attributes read here but NOT defined here, which a concrete grabber is
    expected to supply: ``url`` (mapping with a 'programmes' template),
    ``_pat_guide`` (object with an ``extract()`` method), and optionally
    ``__region_name__`` / ``__source_url__`` for the description.
    ``name`` and ``_time_start`` are also read; presumably Prog sets them.
    """

    __author__ = 'Chris Ottrey, ottrey at users dot sf dot net'
    __bugreport_url__ = 'http://sourceforge.net/tracker/?func=add&group_id=87433&atid=583152'
    __sf_mirror__ = 'belnet'  # preferred sourceforge mirror to download the source from,
                              # possibly to be used when creating RPMs?
    _url_date_fmt = None      # Set this in your grabber to be the date format
                              # required when grabbing dates from the source
    __lib_version__ = None    # Override this with the MAJOR.MINOR.* version
                              # of your tvgrab library dependency.
    __conf_version__ = None   # Override this with the config file version your grabber uses.

    # set this in your grabber; either hard code it or set it from the conf.
    channels = {}

    # set char set for your guide:
    page_charset = "utf-8"

    # NOTE: class-level dict, i.e. shared by every instance that does not
    # rebind it.
    statistics = {
        'channels': 0,
        'programs': 0,
        'info': 0,
    }

    # Compiled patterns whose matches clear_html() replaces with " ".
    re_clean = [
        ## Example:
        # re.compile( "<head *[^>]*>.*<\/head>", re.I | re.S | re.M ),
    ]

    # ( compiled pattern, replacement ) pairs applied by clear_html().
    re_replace = [
        ## Example:
        # ( re.compile( "( )" ), " " ),
    ]

    # Passed through to urlutils.get_urlparsed() by get_htmlstruct().
    parse_tags = ["html", "body", "table", "tr", "td", "a", "input"]
    parse_attrs = ["href", "colspan", "cols", "name", "value"]
    keep_empty_tags = ["input"]

    def __init__(self, argv):
        """Run the grabber for the command line ``argv``.

        SystemExit raised while running is swallowed; Ctrl-C prints a
        notice and exits with status 1; any other exception is reported
        as a bug, a traceback file is written, and the process exits
        with status 1.
        """
        try:
            self._init(argv)
        except SystemExit:
            pass
        except KeyboardInterrupt:
            sys.stderr.write(toprint(_("""\
User required program abort using Keyboard!
No xmltv guide was recorded!
""")))
            sys.exit(1)
        except Exception:
            # sys.exc_info() is used instead of the "except X, e" binding so
            # the statement parses on old and new interpreters alike.
            e = sys.exc_info()[1]
            # self.name is assigned outside this class, so it may not exist
            # yet if the failure happened early; 'tvgrab' is the same
            # fallback name dump_file() uses.
            prog_name = getattr(self, 'name', 'tvgrab')
            tb = '%s_%s.tb' % (prog_name, nowdt('%Y%m%d%H%M%S'))
            message.error(_("""\
You have encountered a bug!
%s
The details have been saved in the file:
%s
Please use it to submit a bug report to:
%s
""") % (yellow(str(e)), tb, bold(self.__bugreport_url__)))
            tb_f = open(tb, 'w')
            try:
                unittest2.print_exc_plus(file=tb_f)
            finally:
                # Close the file even if writing the traceback itself fails.
                tb_f.close()
            sys.exit(1)
    # __init__()

    def _init(self, argv):
        """Create the XMLTV document, run Prog.__init__ and write the result.

        After Prog.__init__ returns, the document is written to
        ``self.output`` and timing/statistics are logged.
        """
        self.xmltv = XMLTV(self.getDescription())

        # Default to today's date when no start date has been set yet.
        # NOTE(review): this runs before Prog.__init__, which presumably is
        # where a --startdate option would be parsed -- confirm.
        if not self.start_date:
            self.start_date = today(self._url_date_fmt)

        Prog.__init__(self, argv)

        if self.xmltv:
            self.xmltv.write(self.output)

        message.info(_('Started at %s') % self._time_start.strftime('%c'))
        message.info(_('Finished at %s') % nowdt('%c'))
        elapsed_time = self.getElapsedTime()
        message.info(_('Time taken %02d:%02d') %
                     (int(elapsed_time / 60), (elapsed_time % 60)))
        message.info(_("Statistics (DEPRECATING): channels=%(channels)d, programs=%(programs)d, information=%(info)d") % self.statistics)
    # _init()

    def getDescription(self):
        """Return 'TV Grab', extended with region and source URL if available."""
        result = _('TV Grab')
        try:
            result += ' ' + self.__region_name__
            result += '; ' + _('source') + ' - ' + self.__source_url__
        except Exception:
            # Best effort: both attributes are optional, keep whatever part
            # of the description could be built.
            pass
        return result
    # getDescription()

    # ---- Command line option handlers ---- #

    output = None

    def setOutput(self, a):
        """Option handler: remember ``a`` as the output destination."""
        self.output = a
    # setOutput()

    days = 7

    def setDays(self, a):
        """Option handler: set the number of days to grab.

        Raises ProgException if ``a`` cannot be converted to an int.
        """
        try:
            self.days = int(a)
        except (TypeError, ValueError):
            raise ProgException(_('Invalid value for days: %s') % repr(a))
    # setDays()

    offset = 0

    def setOffset(self, a):
        """Option handler: set the number of days to skip.

        Raises ProgException if ``a`` cannot be converted to an int.
        """
        try:
            self.offset = int(a)
        except (TypeError, ValueError):
            raise ProgException(_('Invalid value for offset: %s') % repr(a))
    # setOffset()

    message.verbose = message.verbose_level.INFO  # This is the default level for the grabber

    def setVerbose(self):
        """Option handler: raise the message verbosity by one level."""
        message.verbose = message.verbose + 1
    # setVerbose()

    def setQuiet(self):
        """Option handler: set the message verbosity to QUIET."""
        message.verbose = message.verbose_level.QUIET
    # setQuiet()

    urlutils.cache_dir = None

    def setCacheDir(self, a):
        """Option handler: make urlutils use ``a`` as its cache directory."""
        urlutils.cache_dir = a
    # setCacheDir()

    start_date = None

    def setStartDate(self, a):
        """Option handler: build the start Date from ``a``."""
        self.start_date = Date(a, output_fmt=self._url_date_fmt)
    # setStartDate()

    def noColor(self):
        """Option handler: switch off coloured output."""
        output.nocolor()
    # noColor()

    def noTitles(self):
        """Option handler: switch off xterm title changes."""
        output.notitles()
    # noTitles()

    # ---- Command line option handlers ---- #

    # ---- Command line options ---- #
    options = Prog.options + [
        Option(setOutput, 'o', 'output',
               _('output to %s rather than stdout') + ' ' +
               (_('(default: %s)') % "stdout"), 'FILE'),
        Option(setDays, 'n', 'days',
               _('get listing for the next %s days') + ' ' +
               (_('(default: %s)') % days), 'N'),
        Option(setOffset, 'f', 'offset',
               _('skip %s days of listings (from today)') + ' ' +
               (_('(default: %s)') % offset), 'N'),
        Option(setQuiet, 'q', 'quiet',
               _('suppress progress messages normally written to stderr')),
        Option(setVerbose, 'v', 'verbose',
               _('increase progress messages written to stderr '
                 '(NB. -vv for more!)')),
        Option(setCacheDir, 'C', 'cachedir',
               _('use cache dir %s') + ' ' +
               (_('(default: %s)') % "None"), 'DIR'),
        Option(setStartDate, 's', 'startdate',
               _('use %s as the starting date (use DD/MM/YYYY format)') + ' ' +
               # The closing parenthesis comes from the '(default: %s)'
               # template; the value must not add a second one.
               (_('(default: %s)') % "today's date"), 'STARTDATE'),
        Option(noColor, 'N', 'nocolor',
               _('do not use colored output.')),
        Option(noTitles, 'T', 'notitles',
               _('do not change xterm titles.')),
    ]
    # ---- Command line options ---- #

    def check_library(self):
        """Check to see if tvgrab library installed is the correct "release series" as required.

        Only the part of each version before the last '.' is compared
        (e.g. 0.4 for 0.4.0). On mismatch a message is written to stderr
        and the process exits with status 1. Nothing is checked when
        ``__lib_version__`` is unset.
        """
        def release_series(version):
            # Drop the last dotted component; a version without any dot is
            # compared as-is rather than losing its last character.
            cut = version.rfind('.')
            if cut < 0:
                return version
            return version[:cut]

        if self.__lib_version__:
            import __init__
            lib_version = __init__.__version__
            ver = release_series(lib_version)
            gver = release_series(self.__lib_version__)
            if ver != gver:
                sys.stderr.write(_("""\
Incorrect tvgrab library is installed.
You have version : %s
You require version : %s
It is available from:
http://sourceforge.net/project/showfiles.php?group_id=87433&package_id=101497
""") % (lib_version, self.__lib_version__))
                sys.exit(1)
    # check_library()

    def main(self):
        """Verify the library version, then grab every day in the range."""
        self.check_library()
        self.failures = 0  # counts the number of guide extraction failures
                           # to generate filenames for testcase failure files.
        for d in self.start_date.range(self.offset, self.offset + self.days):
            self.grab_day(d)
    # main()

    def get_source_params(self, d, channel=None):
        """Return the mapping substituted into the 'programmes' URL template.

        ``channel`` is optional so callers passing only the date keep
        working; grab_day() passes both.
        """
        result = {
            'datestr': d,
            'channel': channel,
        }
        return result
    # get_source_params()

    def extract_programmes(self, buf):
        """ This is the re2 method of programme extraction

        Returns ``self._pat_guide.extract(buf).programme``, or None when
        extraction fails for any reason.
        """
        result = None
        try:
            guide = self._pat_guide.extract(buf)
            result = guide.programme
        except Exception:
            # Best effort: the caller treats None as "nothing extracted"
            # and saves the page as a test case.
            pass
        return result
    # extract_programmes()

    def updateXmltv(self, date, channel, programmes):
        """ This is the re2 method of updating the xmltv

        Registers the channel and adds each programme, after adding
        ``date`` to its start and setting its channel id.
        """
        channel_id = self.channels[channel]
        self.xmltv.addChannel(channel_id, channel)
        if not isinstance(programmes, list):
            # Doh ... sometimes theres only 1 programme in the list
            programmes = [programmes]
        for programme in programmes:
            programme.start += date
            programme.channel = channel_id
            self.xmltv.addProgram(programme.start, programme.channel,
                                  programme.title, info=programme.info)
    # updateXmltv()

    def clear_html(self, contents, unicode_error="replace"):
        """
        Clear unwanted tags, attributes and whatever is defined as a
        compiled Regular Expression in re_clean and re_replace.
        Converts to unicode if page_charset is set, using unicode error policy
        defined by 'unicode_error'.
        """
        # unicode() refuses to decode text that already is unicode, so only
        # byte strings are converted.
        if self.page_charset and not isinstance(contents, unicode):
            contents = unicode(contents, self.page_charset, unicode_error)
        for clean in self.re_clean:
            contents = clean.sub(" ", contents)
        for pattern, replacement in self.re_replace:
            contents = pattern.sub(replacement, contents)
        return contents
    # clear_html()

    ###########################################################################
    ###########################################################################
    ##          Utility functions, change them to fit your needs            ##
    ###########################################################################
    ###########################################################################

    def get_htmlstruct(self, url):
        """Get HTML Struct for a given URL or dump struct and exit.

        Get the given URL, try to parse it, if fails dump the struct to a
        file and exit the program.
        """
        try:
            return urlutils.get_urlparsed(
                url,
                filter=self.clear_html,
                parse_tags=self.parse_tags,
                parse_attrs=self.parse_attrs,
                keep_empty_tags=self.keep_empty_tags)
        except ParseError:
            e = sys.exc_info()[1]
            message.exception(e)
            message.error(_("Aborting..."))
            self.dump_file(e.contents)
            sys.exit(-1)
    # get_htmlstruct()

    def dump_structure(self, html_struct, outfile=None):
        """Dump HTML structure to outfile.

        A shallow copy with ``verbose=1`` is stringified, so the caller's
        object is not modified. An empty structure only logs an error.
        """
        if html_struct:
            html_struct = copy.copy(html_struct)
            html_struct.verbose = 1
            self.dump_file(str(html_struct), outfile)
        else:
            message.error(_("HTML structure was empty!"))
    # dump_structure()

    def _as_bytes(self, text):
        """Return unicode ``text`` encoded with page_charset (utf-8 if unset).

        Anything that is not unicode is returned unchanged.
        """
        if isinstance(text, unicode):
            return text.encode(self.page_charset or "utf-8", "replace")
        return text
    # _as_bytes()

    def dump_file(self, contents, outfile=None):
        """Dump HTML to outfile.

        ``outfile`` may be a file name, an open file object, or anything
        else (including None), in which case a new temporary ``.html``
        file is created. The file is always closed afterwards and a
        warning naming it is logged.
        """
        if isinstance(outfile, (str, unicode)):
            path = outfile
            outfile = open(path, "w")
        elif outfile and isinstance(outfile, file):
            path = outfile.name
        else:
            import tempfile
            # Only the script's base name may go into the prefix: a path
            # separator in it would redirect the file outside the temp dir.
            name = os.path.basename(sys.argv[0]) or "tvgrab"
            # mkstemp creates the file atomically (mktemp is race-prone).
            fd, path = tempfile.mkstemp(".html", "%s-" % name)
            outfile = os.fdopen(fd, "w")
        try:
            outfile.write(self._as_bytes(contents))
        finally:
            outfile.close()
        message.warning(_("HTML contents dumped to file '%s'\n") % path)
    # dump_file()

    def get_id(self, name):
        """
        return an ID for xmltv based on the channel name or user chosen.

        Falls back to ``name`` itself when the lookup raises KeyError or
        TypeError. NOTE(review): get_chan_by_name() is not defined on this
        class; Grab_C in this file provides it.
        """
        try:
            return self.get_chan_by_name(name)['id']
        except (KeyError, TypeError):
            return name
    # get_id()

    def grab_day(self, d):
        """Fetch and record the programmes of every channel for day ``d``.

        When nothing can be extracted from a fetched page, the page is
        saved as ``<name>_fail_NN.html`` so it can be attached to a bug
        report, and the loop carries on with the next channel.
        """
        # XXX-GUSTAVO: I think this is broken!!!
        # XXX-ottrey - works fine for the re2 implementation of extract_programmes
        for channel in self.channels.keys():
            params = self.get_source_params(d, channel)
            url = self.url['programmes'] % params
            buf = urlutils.get_urlcontents(url, self.clear_html)
            if not buf:
                continue
            programmes = self.extract_programmes(buf)
            if programmes:
                message.moreinfo(_('Extracted programmes:\n%s') % programmes)
                self.updateXmltv(d, channel, programmes)
            else:
                message.error(_('Failed to extract programmes for:\n %s') % url)
                message.error(_('The grabber needs to be updated to handle this testcase.'))
                self.failures += 1
                failure_testcase = '%s_fail_%02d.html' % (self.name, self.failures)
                message.error(_('Saving testcase as %s.') % repr(failure_testcase))
                message.error(_("""\
Please use it to submit a bug report to:
%s""") % self.__bugreport_url__)
                f = open(failure_testcase, 'w')
                try:
                    # clear_html() may have turned the page into unicode.
                    f.write(self._as_bytes(buf))
                finally:
                    f.close()
                # now just keep trying .. then we get more failures to work with. ;-)
    # grab_day()
# Grab
class Grab_C(Grab):
    """Base class for writing a configurable tv_grabber.

    Adds a pickled configuration file holding a 'channels' mapping
    (channel name -> dict with 'name', 'url', 'id', 'grab_status') plus a
    'conf_version', and the command line options to maintain it.
    Concrete grabbers must implement get_channels().
    """

    # Class-level defaults so both flags are readable even before
    # __init__ has assigned them (read_conf() runs early and may end up
    # in get_conf_channels()).
    ran_config = False
    reloading = False

    def __init__(self, argv):
        """Locate and read the configuration, then run Grab.__init__.

        The default file is ``<script name>.conf``, placed in ``~/.xmltv``
        on posix systems and in the filesystem root otherwise. The script
        name comes from sys.argv[0], not from ``argv``.
        NOTE(review): the configuration is read before Grab.__init__ runs,
        which presumably is where a config-file option would be parsed --
        confirm.
        """
        fname = "%s.conf" % os.path.basename(sys.argv[0])
        if os.name == "posix":
            # expanduser() honours $HOME but, unlike os.environ["HOME"],
            # does not raise when the variable is missing.
            f = os.path.join(os.path.expanduser("~"), ".xmltv", fname)
        else:
            f = os.path.join(os.path.sep, fname)
        self.set_config_file(f)
        self.ran_config = False
        self.read_conf()
        self.reloading = False
        Grab.__init__(self, argv)
    # __init__()

    def get_channels(self):
        """Returns a list of tuples representing the channel:
           ( 'channel name', 'url', 'id' )
        Where ID is optional, but must be returned if configured by user.
        """
        raise NotImplementedError(_("Not Implemented Yet") + ": " +
                                  _("get_channels(). Each Grab_C grabber "
                                    "must implement this method."))
    # get_channels()

    def read_conf(self):
        """Load ``self.conf`` from the config file, or create it interactively.

        An existing file whose 'conf_version' differs from
        ``__conf_version__`` is rejected with an error and exit status 1.
        When no file exists the user is asked for its location, config()
        is run and the result is written.
        """
        self.conf = {}
        if os.path.isfile(self.config_file):
            f = open(self.config_file)
            try:
                # SECURITY NOTE: unpickling executes whatever the file
                # describes; only load config files you trust.
                self.conf = load(f)
            finally:
                f.close()
            conf_version = 0
            if isinstance(self.conf, dict):
                conf_version = self.conf.get('conf_version', 0)
            if conf_version != self.__conf_version__:
                message.error(_("""\
You have an old version of the config file.
%(config_file)s needs regenerating for this grabber to work.
Please remove it and try again.""") % {'config_file': repr(self.config_file)})
                sys.exit(1)
        else:
            question = _('Enter location of config file (default=%s): ') % repr(self.config_file)
            sys.stdout.write(toprint(question))
            answer = sys.stdin.readline().strip()
            if answer:
                self.config_file = answer
            self.config()
            self.conf['conf_version'] = self.__conf_version__
            self.ran_config = True
            self.write_conf()
    # read_conf()

    def ask(self, question, valid_answers=None, default=None,
            case_sensitive=False):
        """Ask a question of the user and return result.

        ``valid_answers`` may be a sequence of accepted answers or a dict
        mapping each answer to a shortcut; with a dict, a typed shortcut
        is translated back to its answer. The question is repeated until
        an accepted answer is given. An empty reply yields ``default``
        when one is set. Replies are lower-cased unless
        ``case_sensitive`` is true.
        """
        def ask_it():
            q = question + ' \t'
            if valid_answers:
                q += '['
                if isinstance(valid_answers, dict):
                    va = []
                    for answer, shortcut in valid_answers.iteritems():
                        va.append('%s(%s)' % (answer, shortcut))
                else:
                    va = valid_answers
                q += ', '.join(va)
                if default:
                    # Colour yes/no; any other default is shown as given
                    # instead of being mislabelled as "no".
                    if default == _('yes'):
                        shown_default = green(default)
                    elif default == _('no'):
                        shown_default = red(default)
                    else:
                        shown_default = default
                    q += bold(_(' (default=%s)')) % shown_default
                q += ']'
            # end if valid_answers
            q += ' '
            answer = raw_input(toprint(q)).strip()
            if not case_sensitive:
                answer = answer.lower()
            if not answer and default:
                return default
            return answer
        # ask_it()

        if not valid_answers:
            return ask_it()

        if isinstance(valid_answers, dict):
            accepted = list(valid_answers.keys()) + list(valid_answers.values())
        else:
            accepted = valid_answers
        answer = None
        while answer not in accepted:
            answer = ask_it()
        if isinstance(valid_answers, dict):
            # lookup key
            for k, v in valid_answers.iteritems():
                if v == answer:
                    answer = k
                    # Stop here so the translated answer cannot be matched
                    # against another shortcut.
                    break
        return answer
    # ask()

    def write_conf(self):
        """Pickle ``self.conf`` to the config file, creating its directory."""
        conf_dir = os.path.dirname(self.config_file)
        if conf_dir and not os.path.isdir(conf_dir):
            # makedirs also creates missing parent directories.
            os.makedirs(conf_dir)
        f = open(self.config_file, 'w')
        try:
            dump(self.conf, f)
        finally:
            f.close()
    # write_conf()

    def config(self):
        """
        Handles making a config file (mainly asking if a channel is wanted or not)

        Channels are presented in name order. Answering 'all' or 'none'
        applies that choice to every remaining channel without asking.
        """
        channels = self.get_conf_channels()
        pname = os.path.basename(sys.argv[0])
        output.xtermTitle(_("configuring %s") % pname)

        valid_answers = {_('yes'): _('y'),
                         _('no'): _('n'),
                         _('all'): _('a'),
                         _('none'): _('o')}
        cs = channels.keys()
        cs.sort()
        answer = None
        for cname in cs:
            channel = self.get_chan_by_name(cname)
            question = u" " + _("Add channel %s?") % blue(cname)
            if answer == _('all'):
                print(toprint("%s %s" % (question, green(_('yes')))))
            elif answer == _('none'):
                print(toprint("%s %s" % (question, red(_('no')))))
            else:
                if channel['grab_status'] == 'no':
                    default = _('no')
                else:
                    default = _('yes')
                answer = self.ask(question, valid_answers, default)
            if answer in [_('yes'), _('all')]:
                grab_status = 'yes'
            else:
                grab_status = 'no'
            channel['grab_status'] = grab_status
        # for cname in cs
    # config()

    def configure(self):
        """Option handler: run config() unless it already ran, then exit."""
        if not self.ran_config:
            self.config()
            self.ran_config = True
            self.write_conf()
        sys.exit()
    # configure()

    def set_config_file(self, a):
        """Option handler: use ``a`` as the config file path."""
        self.config_file = a
    # set_config_file()

    def change_channel(self, id, grab_status):
        """Set ``grab_status`` on the channel with this id, save and exit.

        An unknown id is reported as an error.
        """
        channel = self.get_chan_by_id(id)
        if channel:
            channel['grab_status'] = grab_status
        else:
            message.error(_("Channel ID '%s' doesn't exists!") % id)
        self.write_conf()
        sys.exit()
    # change_channel()

    def add_channel(self, id):
        """Option handler: mark the channel with this id to be grabbed."""
        self.change_channel(id, grab_status='yes')
    # add_channel()

    def del_channel(self, id):
        """Option handler: mark the channel with this id as not grabbed."""
        self.change_channel(id, grab_status='no')
    # del_channel()

    def get_chan_by_id(self, id):
        """Get channel config structure by its id (None if not found)."""
        channels = self.get_conf_channels()
        for c in channels.itervalues():
            if c["id"] == id:
                return c
        return None
    # get_chan_by_id()

    def get_chan_by_name(self, name):
        """Get channel config structure by its name (None if not found)."""
        channels = self.get_conf_channels()
        return channels.get(name, None)
    # get_chan_by_name()

    def list_channels(self):
        """Option handler: print a table of all channels, then exit.

        Raises Exception for a grab_status other than 'yes' or 'no'.
        """
        channels = self.get_conf_channels().values()
        channels.sort()
        rows = []
        for channel in channels:
            # Colour a copy: the terminal escape codes must not end up in
            # the configuration held in self.conf.
            row = channel.copy()
            s = channel['grab_status']
            if s == 'no':
                row['grab_status'] = red(_(s))
            elif s == 'yes':
                row['grab_status'] = green(_(s))
            else:
                raise Exception('unknown grab_status')
            rows.append(row)
        heading = [('grab_status', _('GRAB')), ('id', _('ID')), ('name', _('NAME'))]
        print(tabulate(heading, rows))
        sys.exit()
    # list_channels()

    def change_channel_id(self, a):
        """Option handler: rename a channel id; ``a`` is 'old_id:new_id'.

        Raises ProgException when ``a`` does not have exactly that form
        (the same exception the numeric option handlers raise for bad
        values). Unknown old ids and already used new ids are reported as
        errors. The configuration is saved and the program exits.
        """
        try:
            old_id, new_id = a.split(':')
        except ValueError:
            raise ProgException(_('Invalid value for channel ID change: %s') % repr(a))
        channel = self.get_chan_by_id(old_id)
        if channel:
            check_new_id = self.get_chan_by_id(new_id)
            if check_new_id:
                message.error(_("Channel ID '%s' was already present with channel "
                                "name: %s") % (new_id, check_new_id["name"]))
            else:
                channel["id"] = new_id
        else:
            # Same report change_channel() gives for an unknown id.
            message.error(_("Channel ID '%s' doesn't exists!") % old_id)
        self.write_conf()
        sys.exit()
    # change_channel_id()

    def change_all_channels_grab_status(self, grab_status):
        """Set ``grab_status`` on every channel, save and exit."""
        channels = self.get_conf_channels()
        for channel in channels.itervalues():
            channel['grab_status'] = grab_status
        self.write_conf()
        sys.exit()
    # change_all_channels_grab_status()

    def add_all_channels(self):
        """Option handler: mark every channel to be grabbed."""
        self.change_all_channels_grab_status(grab_status='yes')
    # add_all_channels()

    def del_all_channels(self):
        """Option handler: mark every channel as not grabbed."""
        self.change_all_channels_grab_status(grab_status='no')
    # del_all_channels()

    def get_conf_channels(self):
        """Return the configured channels mapping, loading it if needed.

        The channels are (re)loaded from the source when the 'channels'
        key is missing, or when it is empty and no reload has been
        started yet.
        """
        # Parentheses spell out how "or"/"and" bind here: the ``reloading``
        # guard only applies to the "empty" case.
        if "channels" not in self.conf or \
           (not self.conf["channels"] and not self.reloading):
            self._reload_channels()
        return self.conf["channels"]
    # get_conf_channels()

    def _reload_channels(self):
        """Rebuild conf['channels'] from get_channels() and save the config.

        get_channels() must return tuples representing the channel:
          ( 'channel name', 'channel url', 'channel id' )
        where 'channel id' is optional and if omitted get_id( 'channel name' )
        will be used. Every channel starts with grab_status 'yes'.

        NOTE: ``self.reloading`` is set here and not cleared again by this
        method.
        """
        self.reloading = True
        message.info('reloading channels from source...')
        self.conf['channels'] = {}
        grab_status = 'yes'
        # Assemble everything first and publish afterwards, so lookups made
        # by get_id() while assembling still see an empty mapping.
        assembled = {}
        for c in self.get_channels():
            if len(c) == 2:
                cname, curl = c
                cid = self.get_id(cname)
            else:
                cname, curl, cid = c
            assembled[cname] = {
                'name': cname,
                'url': curl,
                'id': cid,
                'grab_status': grab_status}
        self.conf['channels'].update(assembled)
        self.write_conf()
    # _reload_channels()

    def reload_channels(self):
        """Option handler: reload the channels from the source, then exit."""
        self._reload_channels()
        sys.exit()
    # reload_channels()

    xmltv.TOFFSET_MINUTES = 0

    def set_toffset_minutes(self, a):
        """Option handler: set xmltv.TOFFSET_MINUTES from ``a``.

        Raises ProgException if ``a`` cannot be converted to an int.
        """
        try:
            xmltv.TOFFSET_MINUTES = int(a)
        except (TypeError, ValueError):
            raise ProgException(_('Invalid value for toffset: %s') % repr(a))
    # set_toffset_minutes()

    xmltv.OUTPUT_ENCODING = 'UTF-8'

    def set_xml_encoding(self, a):
        """Option handler: set xmltv.OUTPUT_ENCODING to ``a``."""
        xmltv.OUTPUT_ENCODING = a
    # set_xml_encoding()

    # Copy of the inherited list with the configuration options spliced in
    # at positions 2..10 and two more options added at the end.
    options = Grab.options + []
    options[2:2] = [
        Option(configure,
               None, 'configure', _('prompt for configuration items')),
        Option(set_config_file,
               'c', 'config-file', _('use %s as config file'), 'FILE'),
        Option(add_channel,
               'a', 'add', _('add channel with %s'), 'ID'),
        Option(del_channel,
               'd', 'del', _('remove channel with %s'), 'ID'),
        Option(list_channels,
               'l', 'list', _('list channels')),
        Option(change_channel_id,
               'g', 'change', _('change channel ID from %s'), 'old_id:new_id'),
        Option(add_all_channels,
               'A', 'addall', _('add all channels')),
        Option(del_all_channels,
               'D', 'delall', _('remove all channels')),
        Option(reload_channels,
               'R', 'reload', _('reload all channels from source')),
    ]
    options.append(
        Option(set_toffset_minutes, 't', 'toffset',
               _('%s to timeshift all start and end times'), _('MINUTES')),
    )
    options.append(
        Option(set_xml_encoding, None, 'xml_encoding',
               _('%%s to be used in the xml output (default: %(encoding)s)') % {'encoding': xmltv.OUTPUT_ENCODING},
               _('ENCODING')),
    )
# Grab_C
# -------------- Unit Tests -------------- #
# Prefer the unittest2 module when it can be imported and fall back to the
# standard unittest otherwise; using_unittest2 records which one is bound
# to the name ``unittest``.
using_unittest2 = False
try:
    import unittest2 as unittest
    using_unittest2 = True
except ImportError:
    # Only a missing module should trigger the fallback; any other error
    # is a real problem and must not be hidden.
    import unittest
class Grab_UnitTest(unittest.TestCase):
    """Checks that Grab picks up name, days and offset from its argv."""

    def setUp(self):
        """Build four grabbers with different -n (days) / -f (offset) options."""
        # Keep the grabbers quiet while the tests run.
        message.verbose = message.verbose_level.ERROR

        class T(Grab):
            pass

        # os.devnull names the null device on every platform where it is
        # available; the old hard-coded values remain as a fallback for
        # interpreters that lack it.
        devnull = getattr(os, 'devnull', None)
        if devnull is None:
            if os.name == 'posix':
                devnull = '/dev/null'
            else:
                devnull = 'devnull'

        self.t1 = T(['t1', '-o%s' % devnull, '-n1'])
        self.t2 = T(['t2', '-o%s' % devnull, '-f2'])
        self.t3 = T(['t3', '-o%s' % devnull, '-n3', '-f4'])
        self.t4 = T(['t4', '-o%s' % devnull, '-f5', '-n6'])
    # setUp()

    # assertEqual is used rather than a bare assert so the checks still run
    # when Python is started with -O.
    def test01(self): self.assertEqual(self.t1.name, 't1')
    def test02(self): self.assertEqual(self.t1.args, [])
    def test03(self): self.assertEqual(self.t1.days, 1)

    def test04(self): self.assertEqual(self.t2.name, 't2')
    def test05(self): self.assertEqual(self.t2.args, [])
    def test06(self): self.assertEqual(self.t2.offset, 2)

    def test07(self): self.assertEqual(self.t3.name, 't3')
    def test08(self): self.assertEqual(self.t3.args, [])
    def test09(self): self.assertEqual(self.t3.days, 3)
    def test10(self): self.assertEqual(self.t3.offset, 4)

    def test11(self): self.assertEqual(self.t4.name, 't4')
    def test12(self): self.assertEqual(self.t4.args, [])
    def test13(self): self.assertEqual(self.t4.days, 6)
    def test14(self): self.assertEqual(self.t4.offset, 5)

    #def test15(self):
    #    print Grab_C.ask(None, 'What do you want?' )
    #    print Grab_C.ask(None, 'a or b?', ( 'a', 'b' ) )
    #    print Grab_C.ask(None, '[A/b]', ( 'a', 'b' ), 'a' )
    #    print Grab_C.ask(None, 'Add channel ONE?', ( 'yes', 'no', 'all', 'none' ), 'yes' )
    #    print Grab_C.ask(None, 'Shortcuts?', { 'yes': 'y', 'no': 'n', 'all': 'a', 'none': 'o' }, 'yes' )
    ## test15()
# Run the tests when executed as a script, or whenever unittest2 was the
# module bound to ``unittest``.
if __name__ == '__main__' or using_unittest2:
    unittest.main()
# -------------- Unit Tests -------------- #
|