"""
$Id: mythtvutil.py,v 1.21 2007/04/07 12:51:13 frooby Exp $
Copyright (C) 2005 Tom Warkentin <tom@ixionstudios.com>
A utility library for python scripts written for the Xbox Media Center (XBMC).
"""
from xml.dom import minidom,Node
import codecs
import dircache
import os
import singleton
import smb
import sre
import stat
import string
import time
import traceback
import xbmc
import xbmcgui
import datetime
# Debug categories.  Each category is a single bit so that several of them
# can be OR-ed together into one debug mask.
DEBUG_NONE = 0
DEBUG_MISC = 1 << 0
DEBUG_GUI = 1 << 1
DEBUG_DATABASE = 1 << 2
DEBUG_MYTH = 1 << 3
DEBUG_SKIN = 1 << 4

# Mask with every category switched on.
DEBUG_ALL = (
    DEBUG_MISC
    | DEBUG_GUI
    | DEBUG_DATABASE
    | DEBUG_MYTH
    | DEBUG_SKIN
)

# NOTE: a "global" statement at module scope has no effect; the three
# statements below only mark the names as module-level state.

# Debug mask currently in force.
global __gDebugLevel
__gDebugLevel = 0

# No active code in this module reads this flag.
global clearedLog
clearedLog = False

# Declared here but never assigned at module level.
global fh
def debugGetLevel():
    """
    Return the debug mask currently stored in the module-level
    __gDebugLevel variable.
    """
    currentLevel = __gDebugLevel
    return currentLevel
def debugSetLevel(level):
    """
    Store a new debug mask and record the change in the log.

    level -- integer mask built from the DEBUG_* constants; 0 disables
             debug output.

    The log files are rotated through initializeLog() on every call, before
    the announcement line is written.
    """
    global __gDebugLevel
    __gDebugLevel = level
    initializeLog()
    if level != 0:
        announcement = ">> Debug level set to %d <<" % level
    else:
        announcement = ">> Debugging OFF <<"
    # DEBUG_ALL is always written by debug(), whatever the current mask is.
    debug(DEBUG_ALL, announcement)
def debugOff():
    """
    Switch debug output off by setting the debug mask to 0.
    """
    debugSetLevel(0)
def initializeLog():
    """
    Rotate the log files in the working directory.

    An existing xbmcmythtv.old.log is removed, then xbmcmythtv.log is
    renamed to xbmcmythtv.old.log.  Both steps are best effort: an OSError
    (for example because the file does not exist yet) is ignored.  Only
    OSError is suppressed, so unrelated failures and KeyboardInterrupt are
    no longer hidden.
    """
    # NOTE: a one-shot guard based on the module flag "clearedLog" used to
    # surround this code but is disabled, so every call rotates the logs.
    try:
        os.remove(getcwd() + os.sep + "xbmcmythtv.old.log")
    except OSError:
        pass
    try:
        logDir = getcwd() + os.sep
        os.rename(logDir + "xbmcmythtv.log", logDir + "xbmcmythtv.old.log")
    except OSError:
        pass
def writeLog(strng):
    """
    Append one line to xbmcmythtv.log in the working directory.

    strng -- text to log.

    Each line has the form "<timestamp> [<free memory>] <strng>", where the
    free-memory figure comes from xbmc.getFreeMem() and is logged to help
    track down where memory is being used.
    """
    fh = open(getcwd() + os.sep + "xbmcmythtv.log", 'a')
    try:
        fh.write("%s [%s] %s\n" % (
            datetime.datetime.now(), xbmc.getFreeMem(), strng))
    finally:
        # Close the handle even when formatting or writing raises.
        fh.close()
def debug(level, strng):
    """
    Write strng to the log when its category is enabled.

    level -- DEBUG_* category (or mask) the message belongs to.
    strng -- text handed to writeLog().

    The message is written when level shares at least one bit with the
    current debug mask, or when level is exactly DEBUG_ALL (such messages
    are written regardless of the mask).
    """
    if not (__gDebugLevel & level) and level != DEBUG_ALL:
        return
    writeLog(strng)
def copySMBFile(remotePath, localPath):
    """
    Copy a file from an SMB share to a local path.

    remotePath -- smb:// URL in a form accepted by parseSMBPath().
    localPath  -- destination handed to the SMB client's copyToLocal().

    Returns 0 on success.  Returns -1 when remotePath is not an SMB path or
    when the SMB client raises smb.SessionError during the copy.  Any other
    exception raised by the copy is printed with traceback.print_exc() and
    re-raised.
    """
    remoteInfo = parseSMBPath(remotePath)
    if not remoteInfo:
        # not a SMB path
        return -1
    user, password, host, service, dirPath, fileName = remoteInfo

    # This file never imports "mythtv" at module level, so the reference
    # below used to fail with NameError.  Imported at function scope to
    # bring the name in without a module-level import.
    # NOTE(review): assumes a project module "mythtv" that provides
    # NMBService is importable -- confirm.
    import mythtv

    nameService = singleton.getInstance(mythtv.NMBService)
    hostIp = nameService.gethostipbyname(host)
    remote = smb.SMB(host, hostIp)
    if remote.is_login_required():
        remote.login(user, password)
    try:
        remote.copyToLocal(service, "%s%s" % (dirPath, fileName), localPath)
    except smb.SessionError:
        # Every session error is reported through the -1 return code.  The
        # error pair (1, 2) is "open file failed to retrieve preview
        # screenshot"; no session error is handled differently.
        return -1
    except Exception:
        traceback.print_exc()
        raise
    return 0
# Cache of resolved file locations, keyed by bare file name.
# (The "global" statement is a no-op at module scope.)
global __gFileLocations
__gFileLocations = {}
def findMediaFile(filename):
    """
    Locate a media file belonging to the skin.

    The "media" sub-directory of getSkinDir() is tried first, then
    <cwd>/skin/shared/media.  A successful lookup is remembered in
    __gFileLocations so later calls for the same name skip the disk checks.

    Returns the full path, or None when the file is in neither place.
    """
    debug(DEBUG_SKIN, "> mythtvutil.findMediaFile()")
    if filename in __gFileLocations:
        located = __gFileLocations[filename]
    else:
        located = None
        skinCandidate = getSkinDir() + "media" + os.sep + filename
        if os.path.exists(skinCandidate):
            located = skinCandidate
        else:
            sharedCandidate = os.sep.join(
                [getcwd(), "skin", "shared", "media", filename])
            if os.path.exists(sharedCandidate):
                located = sharedCandidate
        if located:
            # only hits are cached; a miss is looked up again next time
            __gFileLocations[filename] = located
    debug(DEBUG_SKIN, "< mythtvutil.findMediaFile() => [%s]" % located)
    return located
def findSkinFile(filename, width, height):
    """
    Locate a skin file appropriate for the given screen resolution.

    Resolution    Sub-directories tried, in order
    ==========    ===============================
    1920x1080     1080i, 720p, ntsc16x9, pal16x9, ntsc, pal
    1280x720      720p, ntsc16x9, pal16x9, ntsc, pal
    720x480       ntsc16x9, pal16x9, ntsc, pal
    otherwise     pal, ntsc

    The sub-directories are searched under getSkinDir() first and then,
    when that is a different directory, under <cwd>/skin/shared/.  The
    first existing file wins and is remembered in __gFileLocations under
    its bare file name (the resolution is not part of the cache key).

    Returns the full path of the file.
    Raises Exception when the file is not found anywhere.
    """
    debug(DEBUG_SKIN, "> mythtvutil.findSkinFile()")
    if filename in __gFileLocations:
        located = __gFileLocations[filename]
    else:
        # figure out which directories to check
        dirsByResolution = {
            (1920, 1080): ['1080i', '720p', 'ntsc16x9', 'pal16x9', 'ntsc', 'pal'],
            (1280, 720): ['720p', 'ntsc16x9', 'pal16x9', 'ntsc', 'pal'],
            (720, 480): ['ntsc16x9', 'pal16x9', 'ntsc', 'pal'],
        }
        subDirs = dirsByResolution.get((width, height), ['pal', 'ntsc'])

        mainSkinDir = getSkinDir()
        sharedSkinDir = getcwd() + os.sep + "skin" + os.sep + "shared" + os.sep

        # 'custom' skin first, then the 'shared' tree, which uses the same
        # directory layout
        searchRoots = []
        debug(DEBUG_SKIN, "Checking %s exists" % mainSkinDir)
        if os.path.exists(mainSkinDir):
            searchRoots.append(mainSkinDir)
        if mainSkinDir != sharedSkinDir:
            searchRoots.append(sharedSkinDir)

        # stop at the first directory that contains the file
        located = None
        for root in searchRoots:
            for subDir in subDirs:
                candidate = root + subDir + os.sep + filename
                debug(DEBUG_SKIN, "skinDir=[%s]" % candidate)
                if os.path.exists(candidate):
                    located = candidate
                    break
            if located:
                break

        if not located:
            raise Exception(
                "Unable to find skin file '%s' in subdirs of '%s'." %
                (filename, getSkinDir()))
        __gFileLocations[filename] = located
    debug(DEBUG_SKIN, "< mythtvutil.findSkinFile()")
    return located
def loadFile(fileName, width, height):
    """
    Read a skin file and expand its "#include" lines recursively.

    fileName      -- path of the file to read.
    width, height -- screen resolution, passed to findSkinFile() when an
                     include names a bare skin file.

    A line of the form "#include <target>" is replaced by the expanded
    content of the target, which is resolved as follows:
      * "<word>.xml" (no directory part) -- looked up with findSkinFile();
      * starts with "%SKINDIR%"          -- the marker is replaced by
                                            getSkinDir();
      * anything else                    -- treated as a path relative to
                                            the directory of fileName, after
                                            converting "/" and "\\" to
                                            os.sep.
    All other lines are copied unchanged.

    Returns the expanded text as one string.  Errors from opening a file or
    from findSkinFile() propagate to the caller.  Circular includes are not
    detected.
    """
    debug(
        DEBUG_SKIN,
        "> mythtvutil.loadFile( fileName=[%s], width=[%d], height=[%d] )" %
        (fileName, width, height))

    # Read the whole file and close it straight away: the handle is then
    # released even when an include fails to resolve, and no handle stays
    # open while nested includes are being loaded.
    f = open(fileName)
    try:
        lines = f.readlines()
    finally:
        f.close()

    parts = []
    for line in lines:
        m = sre.match(r'^#include (.*)$', line)
        if not m:
            parts.append(line)
            continue
        incFile = m.group(1)
        if sre.match(r'^\w+\.xml$', incFile):
            # need to find skin file
            incFile = findSkinFile(incFile, width, height)
        elif sre.match(r'\%SKINDIR\%', incFile):
            incFile = incFile.replace("%SKINDIR%", getSkinDir())
        else:
            # convert path separators to proper path separator - just in case
            incFile = incFile.replace("/", os.sep).replace("\\", os.sep)
            # assume relative path provided
            incFile = os.path.dirname(fileName) + os.sep + incFile
        parts.append(loadFile(incFile, width, height))

    debug(DEBUG_SKIN, "< mythtvutil.loadFile( fileName=[%s] )" % fileName)
    return "".join(parts)
def loadSkin(skinName, width, height):
    """
    Load the named skin file for the given resolution.

    The file is located with findSkinFile() and read with loadFile(), which
    expands include statements until none are left.

    Returns a string containing the skin XML with all includes resolved.
    Failures are not turned into a return value: exceptions raised by
    findSkinFile() or loadFile() propagate to the caller.
    """
    debug(DEBUG_SKIN, "> mythtvutil.loadSkin()")
    skinXml = loadFile(findSkinFile(skinName, width, height), width, height)
    debug(DEBUG_SKIN, "< mythtvutil.loadSkin()")
    return skinXml
def getcwd():
    """
    Return the current working directory with every ';' removed.

    os.getcwd() in XBMC returns a path with ';' appended, so this wrapper
    is a temporary workaround.  Once os.getcwd() no longer does that, the
    function can be dropped in favour of importing getcwd directly.
    """
    rawDir = os.getcwd()
    return "".join(rawDir.split(";"))
def getLocalizedString(id):
    """
    Return the localized string registered under the given id.

    The lookup goes through the shared LocalizedStrings instance, so the
    XBMC strings.xml file does not have to be modified to allocate strings
    (which reduces the work required to upgrade XBMC).
    """
    catalog = singleton.getInstance(LocalizedStrings)
    return catalog.getString(id)
def getSkinDir():
    """
    Return the path of the script's skin directory, with a trailing
    separator, e.g.:
        ./skin/Project Mayhem/
    Note: This is different than what xbmc.getSkinDir() returns.

    The directory <cwd>/skin/<lower-cased xbmc skin name>/ is used when it
    exists; otherwise <cwd>/skin/shared/ is used.  The chosen path is also
    stored in the module-level variable skinDir.

    Raises Exception when the shared fallback does not exist either.
    """
    global skinDir
    skinDir = os.sep.join([getcwd(), "skin", xbmc.getSkinDir().lower(), ""])
    try:
        os.stat(skinDir)
        return skinDir
    except OSError:
        debug( DEBUG_SKIN, "*** WARNING: Skin Directory for %s NOT found under xbmcmythtv/skins - Using default skin" % str(xbmc.getSkinDir()) )

    # fall back to the shared skin
    skinDir = os.sep.join([getcwd(), "skin", "shared", ""])
    try:
        os.stat(skinDir)
    except OSError:
        raise Exception("Default skin fallback failed.")
    return skinDir
def initialize():
    """
    Prepare the utility module for use.  Call once at startup, before any
    other function of this module.

    Resets the file-location cache to an empty dictionary and writes a
    start-up marker to the log.  The log is not rotated here (the call to
    initializeLog() is disabled).
    """
    global __gFileLocations
    __gFileLocations = {}
    writeLog(">> XBMCMythTV Started <<")
def parseSMBPath(path):
    """
    Split an smb:// URL into its components.

    Two forms are recognised:
        smb://user:password@host/service/dir/file
        smb://host/service/dir/file

    Returns the tuple (user, password, host, service, dirPath, fileName);
    user and password are empty strings for the second form.  dirPath
    keeps its leading and trailing '/'.  Returns None when path matches
    neither form.
    """
    withLogin = sre.match(
        r'^smb://(\w+):([^@]+)@([^/]+)/([^/]+)(/?.*?)([^/]*)$', path)
    if withLogin:
        return withLogin.groups()

    anonymous = sre.match(r'^smb://([^/]+)/([^/]+)(/?.*?)([^/]*)$', path)
    if not anonymous:
        return None
    # pad the missing credentials so both forms yield six fields
    return ('', '') + anonymous.groups()
class FileCache(object):
    """
    Abstract base class for a directory-backed file cache.

    Subclasses must provide:
        buildPath(obj)          -- path of the cached file for obj
        storeFile(obj, theHost) -- fetch obj into the cache and return the
                                   path of the stored file

    Creating an instance makes sure the cache directory exists and removes
    cache entries older than numDays days.
    """

    def __init__(self, path, numDays=120):
        """
        path    -- cache directory; created if missing (its parent must
                   already exist because os.mkdir() is used).
        numDays -- age limit in days for cached files; values <= 0 disable
                   the clean-up.
        """
        self.cachePath = path
        self.numDays = numDays
        self.checkOrCreateDir(self.cachePath)
        self.deleteOldFiles(self.cachePath, self.numDays)

    def buildPath(self, obj):
        """Return the cache path for obj.  Must be overridden."""
        raise NotImplementedError("Abstract base class")

    def checkOrCreateDir(self, path):
        """Create the directory path when it does not exist yet."""
        debug(DEBUG_MISC, "> mythtvutil.FileCache.checkOrCreateDir()")
        if not os.path.exists(path):
            os.mkdir(path)
        debug(DEBUG_MISC, "< mythtvutil.FileCache.checkOrCreateDir()")

    def deleteOldFiles(self, path, numDays=120):
        """
        Remove entries of path whose st_ctime is more than numDays days in
        the past.  Nothing is removed when numDays <= 0.  An OSError on an
        individual entry is printed and the scan continues.
        """
        debug(DEBUG_MISC, "> mythtvutil.FileCache.deleteOldFiles()")
        if numDays > 0:
            cutoff = time.time() - numDays * (60 * 60 * 24)
            for name in dircache.listdir(path):
                filePath = path + os.sep + name
                try:
                    # stat.ST_CTIME is index 9 of the stat tuple
                    if os.stat(filePath)[stat.ST_CTIME] < cutoff:
                        os.remove(filePath)
                        debug(DEBUG_MISC, "delete file [%s]" % filePath)
                except OSError:
                    traceback.print_exc()
        debug(DEBUG_MISC, "< mythtvutil.FileCache.deleteOldFiles()")

    def findFile(self, obj, theHost):
        """
        Return the cache path for obj, fetching the file first when it is
        not cached yet.

        obj     -- object identifying the file; interpreted by buildPath()
                   and storeFile().
        theHost -- passed through to storeFile() when a fetch is needed.

        Returns what buildPath() returned when that is false or the file
        already exists; otherwise the result of storeFile().
        """
        debug(DEBUG_MISC, "> mythtvutil.FileCache.findFile( obj=[%s])" % obj)
        cachedPath = self.buildPath(obj)
        if cachedPath and not os.path.exists(cachedPath):
            cachedPath = self.storeFile(obj, theHost)
        debug(DEBUG_MISC,
              "< mythtvutil.FileCache.findFile() => [%s]" % cachedPath)
        return cachedPath

    def storeFile(self, obj, theHost=None):
        """
        Fetch obj into the cache and return its path.  Must be overridden.

        theHost is accepted (optional) because findFile() passes it; the
        one-argument call form keeps working.
        """
        raise NotImplementedError("Abstract base class")
class LocalizedStrings(object):
    """
    Table of the script's own localized strings, loaded from
    <cwd>/language/<language>/strings.xml and stored already encoded for
    the GUI character set.
    """

    def __init__(self):
        # id (int) -> encoded string
        self.strings = {}
        self.loadStrings()

    def _lastText(self, element, tagName):
        """
        Return the value of the last text node directly below the first
        <tagName> element found under element, or None when that element
        has no text node.  Raises IndexError when element contains no
        <tagName> element at all.
        """
        text = None
        for child in element.getElementsByTagName(tagName)[0].childNodes:
            if child.nodeType == Node.TEXT_NODE:
                text = child.nodeValue
        return text

    def loadStrings(self):
        """
        Fill self.strings from the strings.xml of the current language.

        The GUI character set is read from the shared XBMCLangInfo
        instance (setting 'language.charsets.gui') and every value is
        encoded with it.  When no strings.xml exists for the language
        reported by xbmc.getLanguage(), the "english" file is used.
        Entries without an id or with a negative id are skipped; an entry
        whose <value> has no text is stored as an empty string.
        """
        # Determine codec for GUI so that loaded messages can be encoded for
        # GUI immediately
        langInfo = singleton.getInstance(XBMCLangInfo)
        language = langInfo.getSetting('language.charsets.gui')
        debug(DEBUG_MISC, "language=[%s]" % language)
        encode = codecs.getencoder(language)

        # Build language string file name.  If the file does not exist,
        # default to 'english'.
        languageDir = getcwd() + os.sep + 'language' + os.sep
        lang = xbmc.getLanguage().lower()
        langFile = languageDir + lang + os.sep + 'strings.xml'
        if not os.path.exists(langFile) and lang != "english":
            langFile = languageDir + "english" + os.sep + "strings.xml"
        debug(DEBUG_SKIN, "langFile=[%s]" % langFile)

        dom = minidom.parse(langFile)
        try:
            for entry in dom.getElementsByTagName("string"):
                # assume one and only one id node and one value node
                idText = self._lastText(entry, "id")
                strValue = self._lastText(entry, "value")

                # only add it if an id has been specified
                if idText is None:
                    continue
                strId = int(idText)
                if strId < 0:
                    continue
                if strValue is None:
                    # an empty <value/> has no text node; encoding None
                    # would raise and abort the whole load
                    strValue = u""
                debug(DEBUG_SKIN, "adding strId=[%s]" % strId)
                (val, num) = encode(strValue)
                self.strings[strId] = val
        finally:
            # free up dom tree, also when an entry is malformed
            dom.unlink()

    def getString(self, id):
        """
        Return the string stored under id, or "<Undefined>" when there is
        no such id.
        """
        debug(DEBUG_SKIN, "> LocalizedStrings.getString( %d )" % (id))
        retStr = self.strings.get(id, "<Undefined>")
        debug(DEBUG_SKIN,
              "< LocalizedStrings.getString( %d ) => [%s]" % (id, retStr))
        return retStr
class XBMCSettings(object):
    """
    Read-only access to values stored in an XML settings file.

    The file is parsed once with minidom; values are looked up by a
    dot-separated tag path and cached per path.
    """

    def __init__(self, filePath=None):
        """
        filePath -- XML file to load; when empty or None, loadSettings()
                    picks a default location.
        """
        debug(DEBUG_MISC, "> mythtvutil.XBMCSettings [%d]" % id(self))
        self.domTree = None
        # tag path -> value already looked up
        self.tagsSeen = {}
        self.loadSettings(filePath)
        debug(DEBUG_MISC, "< mythtvutil.XBMCSettings [%d]" % id(self))

    def getSetting(self, tag):
        """
        Return the text stored under a dot-separated tag path such as
        "settings.LookAndFeel.Language".

        Each path component is resolved with getElementsByTagName() on the
        node found for the previous component and the first match is
        taken, so a component matches any descendant, not only a direct
        child.  The result is the concatenation of the node values of the
        final element's children; children without a node value (nested
        elements) are skipped.

        Raises IndexError when a path component is not found.
        """
        debug(DEBUG_MISC,
              "> mythtvutil.XBMCSettings.getSetting( tag=[%s] )" % (tag))
        if tag in self.tagsSeen:
            value = self.tagsSeen[tag]
        else:
            node = self.domTree
            for search in tag.split('.'):
                debug(DEBUG_MISC, "search=[%s]" % search)
                node = node.getElementsByTagName(search)[0]
            # nodeValue is None for element children; concatenating it
            # would raise TypeError
            value = "".join(
                [child.nodeValue for child in node.childNodes
                 if child.nodeValue is not None])
            debug(DEBUG_MISC, "value=[%s]" % value)
            self.tagsSeen[tag] = value
        debug(DEBUG_MISC,
              "< mythtvutil.XBMCSettings.getSetting( tag=[%s] )" % (tag))
        return value

    def loadSettings(self, filePath):
        """
        Parse filePath into self.domTree.

        When filePath is empty or None, a default is used:
        ./test/settings.xml when os.name is not "nt", otherwise
        E:/TDATA/0face008/settings.xml (joined with os.sep).
        """
        debug(DEBUG_MISC,
              "> mythtvutil.XBMCSettings.loadSettings( filePath=[%s] )" %
              filePath)
        if not filePath:
            if os.name != "nt":
                filePath = os.sep.join(['.', 'test', 'settings.xml'])
            else:
                filePath = os.sep.join(
                    ['E:', 'TDATA', '0face008', 'settings.xml'])
        self.domTree = minidom.parse(filePath)
        debug(DEBUG_MISC,
              "< mythtvutil.XBMCSettings.loadSettings( filePath=[%s] )" %
              filePath)
class XBMCLangInfo(XBMCSettings):
    """
    XBMCSettings reader bound to the langinfo.xml of the language reported
    by xbmc.getLanguage().
    """

    def __init__(self):
        """
        Work out the location of langinfo.xml and load it.

        When os.name is not "nt" the file is
        ./test/language/<language>/langinfo.xml.  Otherwise the part of
        the working directory from "scripts" onwards is replaced by
        language/<language>/langinfo.xml.
        """
        language = xbmc.getLanguage()
        if os.name != "nt":
            filePath = os.sep.join(
                ['.', 'test', 'language', language, 'langinfo.xml'])
        else:
            relPath = os.sep.join(["language", language, "langinfo.xml"])
            # Use a callable replacement so relPath is inserted literally.
            # Passed as a replacement string, its backslashes would be
            # parsed as escapes (e.g. "\n", "\f" or "\g" in front of a
            # lower-case language name) and corrupt the path.
            filePath = sre.sub('scripts.*$', lambda match: relPath, getcwd())
        XBMCSettings.__init__(self, filePath)
if __name__ == "__main__":
    # Self-test, run only when the module is executed directly.
    import unittest
    import xbmcgui

    debugSetLevel(DEBUG_MISC | DEBUG_GUI | DEBUG_MYTH)
    #mythtvutil.DEBUG_ALL )
    initialize()

    class PublicInterfaceTest(unittest.TestCase):
        """Checks of the module's public helpers against fixed values."""

        def testLocalizedString(self):
            # string id 0 is expected to be the application name
            self.failIf(getLocalizedString(0) != 'Myth TV')

        def testXBMCSettings(self):
            settings = singleton.getInstance(XBMCSettings)
            self.failIf(
                settings.getSetting("settings.LookAndFeel.Language") != 'english')

        def testXBMCLangInfo(self):
            langInfo = singleton.getInstance(XBMCLangInfo)
            self.failIf(
                langInfo.getSetting("language.charsets.gui") != 'CP1252')

    #xbmcgui.Dialog().ok( "Info", "You might want to run mythtvmain.py instead." )
    unittest.main()
|