import os
import sys
import imp
from glob import glob
__all__ = ['PackageLoader']
class PackageLoader:
def __init__(self, verbose=False, infunc=False):
""" Manages loading packages.
"""
if infunc:
_level = 2
else:
_level = 1
self.parent_frame = frame = sys._getframe(_level)
self.parent_name = eval('__name__',frame.f_globals,frame.f_locals)
parent_path = eval('__path__',frame.f_globals,frame.f_locals)
if isinstance(parent_path, str):
parent_path = [parent_path]
self.parent_path = parent_path
if '__all__' not in frame.f_locals:
exec('__all__ = []',frame.f_globals,frame.f_locals)
self.parent_export_names = eval('__all__',frame.f_globals,frame.f_locals)
self.info_modules = {}
self.imported_packages = []
self.verbose = None
def _get_info_files(self, package_dir, parent_path, parent_package=None):
""" Return list of (package name,info.py file) from parent_path subdirectories.
"""
from glob import glob
files = glob(os.path.join(parent_path,package_dir,'info.py'))
for info_file in glob(os.path.join(parent_path,package_dir,'info.pyc')):
if info_file[:-1] not in files:
files.append(info_file)
info_files = []
for info_file in files:
package_name = os.path.dirname(info_file[len(parent_path)+1:])\
.replace(os.sep,'.')
if parent_package:
package_name = parent_package + '.' + package_name
info_files.append((package_name,info_file))
info_files.extend(self._get_info_files('*',
os.path.dirname(info_file),
package_name))
return info_files
def _init_info_modules(self, packages=None):
"""Initialize info_modules = {<package_name>: <package info.py module>}.
"""
import imp
info_files = []
info_modules = self.info_modules
if packages is None:
for path in self.parent_path:
info_files.extend(self._get_info_files('*',path))
else:
for package_name in packages:
package_dir = os.path.join(*package_name.split('.'))
for path in self.parent_path:
names_files = self._get_info_files(package_dir, path)
if names_files:
info_files.extend(names_files)
break
else:
try:
exec 'import %s.info as info' % (package_name)
info_modules[package_name] = info
except ImportError, msg:
self.warn('No scipy-style subpackage %r found in %s. '\
'Ignoring: %s'\
% (package_name,':'.join(self.parent_path), msg))
for package_name,info_file in info_files:
if package_name in info_modules:
continue
fullname = self.parent_name +'.'+ package_name
if info_file[-1]=='c':
filedescriptor = ('.pyc','rb',2)
else:
filedescriptor = ('.py','U',1)
try:
info_module = imp.load_module(fullname+'.info',
open(info_file,filedescriptor[1]),
info_file,
filedescriptor)
except Exception,msg:
self.error(msg)
info_module = None
if info_module is None or getattr(info_module,'ignore',False):
info_modules.pop(package_name,None)
else:
self._init_info_modules(getattr(info_module,'depends',[]))
info_modules[package_name] = info_module
return
def _get_sorted_names(self):
""" Return package names sorted in the order as they should be
imported due to dependence relations between packages.
"""
depend_dict = {}
for name,info_module in self.info_modules.items():
depend_dict[name] = getattr(info_module,'depends',[])
package_names = []
for name in depend_dict.keys():
if not depend_dict[name]:
package_names.append(name)
del depend_dict[name]
while depend_dict:
for name, lst in depend_dict.items():
new_lst = [n for n in lst if n in depend_dict]
if not new_lst:
package_names.append(name)
del depend_dict[name]
else:
depend_dict[name] = new_lst
return package_names
def __call__(self,*packages, **options):
"""Load one or more packages into parent package top-level namespace.
This function is intended to shorten the need to import many
subpackages, say of scipy, constantly with statements such as
import scipy.linalg, scipy.fftpack, scipy.etc...
Instead, you can say:
import scipy
scipy.pkgload('linalg','fftpack',...)
or
scipy.pkgload()
to load all of them in one call.
If a name which doesn't exist in scipy's namespace is
given, a warning is shown.
Parameters
----------
*packges : arg-tuple
the names (one or more strings) of all the modules one
wishes to load into the top-level namespace.
verbose= : integer
verbosity level [default: -1].
verbose=-1 will suspend also warnings.
force= : bool
when True, force reloading loaded packages [default: False].
postpone= : bool
when True, don't load packages [default: False]
"""
frame = self.parent_frame
self.info_modules = {}
if options.get('force',False):
self.imported_packages = []
self.verbose = verbose = options.get('verbose',-1)
postpone = options.get('postpone',None)
self._init_info_modules(packages or None)
self.log('Imports to %r namespace\n----------------------------'\
% self.parent_name)
for package_name in self._get_sorted_names():
if package_name in self.imported_packages:
continue
info_module = self.info_modules[package_name]
global_symbols = getattr(info_module,'global_symbols',[])
postpone_import = getattr(info_module,'postpone_import',False)
if (postpone and not global_symbols) \
or (postpone_import and postpone is not None):
self.log('__all__.append(%r)' % (package_name))
if '.' not in package_name:
self.parent_export_names.append(package_name)
continue
old_object = frame.f_locals.get(package_name,None)
cmdstr = 'import '+package_name
if self._execcmd(cmdstr):
continue
self.imported_packages.append(package_name)
if verbose!=-1:
new_object = frame.f_locals.get(package_name)
if old_object is not None and old_object is not new_object:
self.warn('Overwriting %s=%s (was %s)' \
% (package_name,self._obj2repr(new_object),
self._obj2repr(old_object)))
if '.' not in package_name:
self.parent_export_names.append(package_name)
for symbol in global_symbols:
if symbol=='*':
symbols = eval('getattr(%s,"__all__",None)'\
% (package_name),
frame.f_globals,frame.f_locals)
if symbols is None:
symbols = eval('dir(%s)' % (package_name),
frame.f_globals,frame.f_locals)
symbols = filter(lambda s:not s.startswith('_'),symbols)
else:
symbols = [symbol]
if verbose!=-1:
old_objects = {}
for s in symbols:
if s in frame.f_locals:
old_objects[s] = frame.f_locals[s]
cmdstr = 'from '+package_name+' import '+symbol
if self._execcmd(cmdstr):
continue
if verbose!=-1:
for s,old_object in old_objects.items():
new_object = frame.f_locals[s]
if new_object is not old_object:
self.warn('Overwriting %s=%s (was %s)' \
% (s,self._obj2repr(new_object),
self._obj2repr(old_object)))
if symbol=='*':
self.parent_export_names.extend(symbols)
else:
self.parent_export_names.append(symbol)
return
def _execcmd(self,cmdstr):
""" Execute command in parent_frame."""
frame = self.parent_frame
try:
exec (cmdstr, frame.f_globals,frame.f_locals)
except Exception,msg:
self.error('%s -> failed: %s' % (cmdstr,msg))
return True
else:
self.log('%s -> success' % (cmdstr))
return
def _obj2repr(self,obj):
""" Return repr(obj) with"""
module = getattr(obj,'__module__',None)
file = getattr(obj,'__file__',None)
if module is not None:
return repr(obj) + ' from ' + module
if file is not None:
return repr(obj) + ' from ' + file
return repr(obj)
def log(self,mess):
if self.verbose>1:
print >> sys.stderr, str(mess)
def warn(self,mess):
if self.verbose>=0:
print >> sys.stderr, str(mess)
def error(self,mess):
if self.verbose!=-1:
print >> sys.stderr, str(mess)
def _get_doc_title(self, info_module):
""" Get the title from a package info.py file.
"""
title = getattr(info_module,'__doc_title__',None)
if title is not None:
return title
title = getattr(info_module,'__doc__',None)
if title is not None:
title = title.lstrip().split('\n',1)[0]
return title
return '* Not Available *'
def _format_titles(self,titles,colsep='---'):
display_window_width = 70 # How to determine the correct value in runtime??
lengths = [len(name)-name.find('.')-1 for (name,title) in titles]+[0]
max_length = max(lengths)
lines = []
for (name,title) in titles:
name = name[name.find('.')+1:]
w = max_length - len(name)
words = title.split()
line = '%s%s %s' % (name,w*' ',colsep)
tab = len(line) * ' '
while words:
word = words.pop(0)
if len(line)+len(word)>display_window_width:
lines.append(line)
line = tab
line += ' ' + word
else:
lines.append(line)
return '\n'.join(lines)
def get_pkgdocs(self):
""" Return documentation summary of subpackages.
"""
import sys
self.info_modules = {}
self._init_info_modules(None)
titles = []
symbols = []
for package_name, info_module in self.info_modules.items():
global_symbols = getattr(info_module,'global_symbols',[])
fullname = self.parent_name +'.'+ package_name
note = ''
if fullname not in sys.modules:
note = ' [*]'
titles.append((fullname,self._get_doc_title(info_module) + note))
if global_symbols:
symbols.append((package_name,', '.join(global_symbols)))
retstr = self._format_titles(titles) +\
'\n [*] - using a package requires explicit import (see pkgload)'
if symbols:
retstr += """\n\nGlobal symbols from subpackages"""\
"""\n-------------------------------\n""" +\
self._format_titles(symbols,'-->')
return retstr
class PackageLoaderDebug(PackageLoader):
def _execcmd(self,cmdstr):
""" Execute command in parent_frame."""
frame = self.parent_frame
print 'Executing',`cmdstr`,'...',
sys.stdout.flush()
exec (cmdstr, frame.f_globals,frame.f_locals)
print 'ok'
sys.stdout.flush()
return
if int(os.environ.get('NUMPY_IMPORT_DEBUG','0')):
PackageLoader = PackageLoaderDebug
|