ImportUtil.py :  » XML » 4Suite » 4Suite-XML-1.0.2 » Ft » Lib » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » XML » 4Suite 
4Suite » 4Suite XML 1.0.2 » Ft » Lib » ImportUtil.py
########################################################################
# $Header: /var/local/cvsroot/4Suite/Ft/Lib/ImportUtil.py,v 1.1.2.2 2006/12/08 22:50:53 jkloth Exp $
"""
Utilites for working with Python PEP 302 import hooks.

Copyright 2006 Fourthought, Inc. (USA).
Detailed license and copyright information: http://4suite.org/COPYRIGHT
Project home, documentation, distributions: http://4suite.org/
"""

from __future__ import generators
import os
import sys
import imp
import time
import types
import cStringIO
try:
    from zipimport import zipimporter
except ImportError:
    # placeholder for Python 2.2 to simplify code paths
    class zipimporter(object): pass

__all__ = [
    # Module Utilities
    'FindLoader', 'FindImporter', 'GetImporter', 'IterModules',
    'GetLastModified', 'GetSearchPath',
    # Resource Utilities
    'OsPathToResource', 'NormalizeResource', 'GetResourceFilename',
    'GetResourceString', 'GetResourceStream', 'GetResourceLastModified',
    ]

# Indicate that the use of "special" names is handled in a "zip-safe" way.
__zipsafe__ = True

IMP_SEARCH_ORDER = [ desc[0] for desc in imp.get_suffixes() ]

# ZIP imports always search for .pyc AND .pyo, but reverse their order
# depending on the optimzation flag (-O).
ZIP_SEARCH_ORDER = [ '.py', '.pyc', '.pyo']
if not __debug__:
    ZIP_SEARCH_ORDER.remove('.pyc')
    ZIP_SEARCH_ORDER.append('.pyc')

try:
    # New in Python 2.5 (or setuptools 0.7+)
    from pkgutil import ImpImporter,ImpLoader,iter_importers,get_loader,\
         find_loader, iter_modules, get_importer
except ImportError:
    import marshal, inspect, re

    MODULE_TYPE_INFO = {}
    for suffix, mode, module_type in imp.get_suffixes():
        MODULE_TYPE_INFO[module_type] = (suffix, mode)

    class ImpImporter:
        """PEP 302 Importer that wraps Python's "classic" import algorithm

        ImpImporter(dirname) produces a PEP 302 importer that searches that
        directory.  ImpImporter(None) produces a PEP 302 importer that
        searches the current sys.path, plus any modules that are frozen or
        built-in.

        Note that ImpImporter does not currently support being used by
        placement on sys.meta_path.
        """

        def __init__(self, path=None):
            if path is not None:
                if not path:
                    path = '.'
                elif not os.path.isdir(path):
                    raise ImportError("not a directory")
            self.path = path

        def __repr__(self):
            return "<ImpImporter object %r>" % self.path

        def find_module(self, fullname, path=None):
            # Note: we ignore 'path' argument since it is only used via
            # meta_path
            subname = fullname.split(".")[-1]
            if subname != fullname and self.path is None:
                return None
            if self.path is None:
                path = None
            else:
                path = [os.path.realpath(self.path)]
            try:
                file, filename, etc = imp.find_module(subname, path)
            except ImportError:
                return None
            return ImpLoader(fullname, file, filename, etc)

        def iter_modules(self, prefix=''):
            if self.path is None or not os.path.isdir(self.path):
                return

            yielded = {}

            filenames = os.listdir(self.path)
            filenames.sort()  # handle packages before same-named modules

            for fn in filenames:
                modname = inspect.getmodulename(fn)
                if modname == '__init__' or modname in yielded:
                    continue

                path = os.path.join(self.path, fn)
                ispkg = False

                if not modname and os.path.isdir(path) and '.' not in fn:
                    modname = fn
                    for fn in os.listdir(path):
                        subname = inspect.getmodulename(fn)
                        if subname == '__init__':
                            ispkg = True
                            break
                    else:
                        continue    # not a package

                if modname and '.' not in modname:
                    yielded[modname] = 1
                    yield prefix + modname, ispkg
            return

    class ImpLoader:
        """PEP 302 Loader that wraps Python's "classic" import algorithm
        """
        code = None
        source = None

        def __init__(self, fullname, file, filename, etc):
            self.file = file
            self.filename = filename
            self.fullname = fullname
            self.etc = etc

        def load_module(self, fullname):
            self._reopen()
            try:
                mod = imp.load_module(fullname, self.file, self.filename,
                                      self.etc)
            finally:
                self.file.close()
            # Note: we don't set __loader__ because we want the module to
            # look normal; i.e. this is just a wrapper for standard import
            # machinery
            return mod

        def get_data(self, pathname):
            f = open(pathname, 'rb')
            try:
                data = f.read()
            finally:
                f.close()
            return data

        def _reopen(self):
            if self.file and self.file.closed:
                suffix, mode, module_type = self.etc
                self.file = open(self.filename, mode)
            return

        def _fix_name(self, fullname):
            if fullname is None:
                fullname = self.fullname
            elif fullname != self.fullname:
                raise ImportError("Loader for module %s cannot handle "
                                  "module %s" % (self.fullname, fullname))
            return fullname

        def _get_package_loader(self):
            return ImpImporter(self.filename).find_module('__init__')

        def is_package(self, fullname):
            fullname = self._fix_name(fullname)
            return self.etc[2] == imp.PKG_DIRECTORY

        def get_code(self, fullname=None):
            fullname = self._fix_name(fullname)
            if self.code is None:
                module_type = self.etc[2]
                if module_type == imp.PY_SOURCE:
                    source = self.get_source(fullname)
                    self.code = compile(source, self.filename, 'exec')
                elif module_type == imp.PY_COMPILED:
                    self._reopen()
                    try:
                        magic = self.file.read(4)
                        if magic == imp.get_magic():
                            timestamp = self.file.read(4)
                            self.code = marshal.load(self.file)
                    finally:
                        self.file.close()
                elif module_type == imp.PKG_DIRECTORY:
                    self.code = self._get_package_loader().get_code()
            return self.code

        def get_source(self, fullname=None):
            fullname = self._fix_name(fullname)
            if self.source is None:
                module_type = self.etc[2]
                if module_type == imp.PY_SOURCE:
                    self._reopen()
                    try:
                        self.source = self.file.read()
                    finally:
                        self.file.close()
                elif module_type == imp.PY_COMPILED:
                    suffix, mode = MODULE_TYPE_INFO[imp.PY_COMPILED]
                    filename = os.path.splitext(self.filename)[0] + suffix
                    if os.path.exists(filename):
                        f = open(filename, mode)
                        try:
                            self.source = f.read()
                        finally:
                            f.close()
                elif module_type == imp.PKG_DIRECTORY:
                    self.source = self._get_package_loader().get_source()
            return self.source

        def get_filename(self, fullname=None):
            fullname = self._fix_name(fullname)
            module_type = self.etc[2]
            if module_type == imp.PKG_DIRECTORY:
                return self._get_package_loader().get_filename()
            elif module_type in MODULE_TYPE_INFO:
                return self.filename
            return None

    def get_importer(path_item):
        """Retrieve a PEP 302 importer for the given path item
        """
        if sys.version < '2.3':
            importer = None
        else:
            try:
                importer = sys.path_importer_cache[path_item]
            except KeyError:
                for path_hook in sys.path_hooks:
                    try:
                        importer = path_hook(path_item)
                        break
                    except ImportError:
                        pass
                else:
                    importer = None
                sys.path_importer_cache.setdefault(path_item, importer)

        # The boolean values are used for caching valid and invalid
        # file paths for the built-in import machinery
        if importer in (None, True, False):
            try:
                importer = ImpImporter(path_item)
            except ImportError:
                importer = None
        return importer

    def iter_importers(fullname=''):
        if '.' in fullname:
            # Get the containing package's __path__
            pkg = '.'.join(fullname.split('.')[:-1])
            if pkg not in sys.modules:
                __import__(pkg)
            path = sys.modules[pkg].__path__
        else:
            # sys.meta_path is available in Python 2.3+
            for importer in getattr(sys, 'meta_path', []):
                yield importer
            path = sys.path
        for item in path:
            yield get_importer(item)
        if '.' not in fullname:
            yield ImpImporter()

    def get_loader(module_or_name):
        """Get a PEP 302 "loader" object for module_or_name

        If the module or package is accessible via the normal import
        mechanism, a wrapper around the relevant part of that machinery
        is returned.  Returns None if the module cannot be found or imported.
        If the named module is not already imported, its containing package
        (if any) is imported, in order to establish the package __path__.
        """
        if module_or_name in sys.modules:
            module_or_name = sys.modules[module_or_name]
        if isinstance(module_or_name, types.ModuleType):
            module = module_or_name
            loader = getattr(module, '__loader__', None)
            if loader is not None:
                return loader
            fullname = module.__name__
        else:
            fullname = module_or_name
        return find_loader(fullname)

    def find_loader(fullname):
        """Find a PEP 302 "loader" object for fullname

        If fullname contains dots, path must be the containing package's
        __path__. Returns None if the module cannot be found or imported.
        """
        for importer in iter_importers(fullname):
            loader = importer.find_module(fullname)
            if loader is not None:
                return loader
        return None

    def iter_zipimport_modules(importer, prefix):
        # make the path components regex safe
        sep = os.sep.replace('\\', '\\\\')
        path = prefix.replace(os.sep, sep)
        # using "non-greedy" matching in case a suffix is not just an
        # extension (like module.so for dlopen imports)
        modname = '[a-zA-Z_][a-zA-Z0-9_]*?'
        pkginit = sep + '__init__'
        suffix = '|'.join([ desc[0] for desc in imp.get_suffixes() ])
        suffix = suffix.replace('.', '\\.')
        pattern = '^%s(%s)(%s)?(%s)$' % (path, modname, pkginit, suffix)
        submodule_match = re.compile(pattern).match
        yielded = {}
        dirlist = list(importer._files)
        dirlist.sort()
        for fn in dirlist:
            match = submodule_match(fn)
            if match is not None:
                modname, pkginit, suffix = match.groups()
                if pkginit:
                    ispkg = True
                elif modname == '__init__':
                    continue
                else:
                    ispkg = False
                if modname not in yielded:
                    yielded[modname] = True
                    yield modname, ispkg
        return

    def iter_modules(path=None, prefix=''):
        """Yield submodule names+loaders for path or sys.path"""
        if path is None:
            importers = iter_importers()
        else:
            importers = map(get_importer, path)

        yielded = {}
        for importer in importers:
            if hasattr(importer, 'iter_modules'):
                modules = importer.iter_modules(prefix)
            elif isinstance(importer, zipimporter):
                modules = iter_zipimport_modules(importer, prefix)
            else:
                modules = []
            for name, ispkg in modules:
                if name not in yielded:
                    yielded[name] = 1
                    yield importer, name, ispkg
        return

try:
    from pkg_resources import get_provider,resource_filename
except ImportError:
    class DefaultProvider:
        """Resource provider for "classic" loaders"""
        def __init__(self, module):
            self.loader = getattr(module, '__loader__', None)
            self.module_path = os.path.dirname(module.__file__)

        def get_resource_filename(self, manager, resource_name):
            return self._fn(self.module_path, resource_name)

        def get_resource_stream(self, manager, resource_name):
            return open(self._fn(self.module_path, resource_name), 'rb')

        def get_resource_string(self, manager, resource_name):
            stream = self.get_resource_stream(manager, resource_name)
            try:
                return stream.read()
            finally:
                stream.close()

        def has_resource(self, resource_name):
            return self._has(self._fn(self.module_path, resource_name))

        def resource_isdir(self, resource_name):
            return self._isdir(self._fn(self.module_path, resource_name))

        def resource_listdir(self, resource_name):
            return self._listdir(self._fn(self.module_path, resource_name))

        def _fn(self, base, resource_name):
            return os.path.join(base, *resource_name.split('/'))

        def _has(self, pathname):
            return os.path.exists(pathname)

        def _isdir(self, pathname):
            return os.path.isdir(pathname)

        def _listdir(self, pathname):
            return os.listdir(pathname)

    class ZipProvider(DefaultProvider):
        """Resource provider for ZIP loaders"""

        _dirindex = None

        def __init__(self, module):
            DefaultProvider.__init__(self, module)
            self.zipinfo = self.loader._files
            self.zip_pre = self.loader.archive + os.sep

        def get_resource_filename(self, manager, resource_name):
            raise NotImplementedError("not supported by ZIP loaders")

        def get_resource_stream(self, manager, resource_name):
            data = self.get_resource_string(manager, resource_name)
            return cStringIO.StringIO(data)

        def get_resource_string(self, manager, resource_name):
            pathname = self._fn(self.module_path, resource_name)
            return self.loader.get_data(pathname)

        def _zipinfo_name(self, pathname):
            # Convert a virtual filename (full path to file) into a zipfile
            # subpath usable with the zipimport directory cache for our
            # target archive.
            if pathname.startswith(self.zip_pre):
                return pathname[len(self.zip_pre):]
            raise ValueError("%s not in %s" % (pathname, self.zip_pre))

        def _build_index(self):
            self._dirindex = index = {}
            for path in self.zipinfo:
                parts = path.split(os.sep)
                while parts:
                    parent = os.sep.join(parts[:-1])
                    if parent in index:
                        index[parent].append(parts[-1])
                        break
                    else:
                        index[parent] = [parts.pop()]
            return index

        def _has(self, pathname):
            arcname = self._zipinfo_name(fspath)
            return (arcname in self.zipinfo or
                    arcname in (self._dirindex or self._build_index()))

        def _isdir(self, pathname):
            arcname = self._zipinfo_name(pathname)
            return arcname in (self._dirindex or self._build_index())

        def _listdir(self, pathname):
            arcname = self._zipinfo_name(pathname)
            if arcname in (self._dirindex or self._build_index()):
                return self._dirindex[arcname][:]
            return []

    def get_provider(fullname):
        if fullname not in sys.modules:
            __import__(fullname)
        module = sys.modules[fullname]
        loader = getattr(module, '__loader__', None)
        if loader is None:
            provider = DefaultProvider(module)
        elif isinstance(loader, zipimporter):
            provider = ZipProvider(module)
        else:
            raise NotImplementedError('unsupported loader type: %s' % loader)
        return provider

    _resource_manager = None
else:
    # pkg_resources (aka setuptools) installed.; the resource_filename
    # top-level name is actually the bound method of the global
    # ResourceManager (at least that is what the PkgResources docs say).
    _resource_manager = resource_filename.im_self
    del resource_filename

GetImporter = get_importer
FindLoader = find_loader
IterModules = iter_modules

def FindImporter(fullname):
    """Find a PEP 302 "loader" object for fullname

    If fullname contains dots, path must be the containing package's
    __path__. Returns None if the module cannot be found or imported.
    """
    for importer in iter_importers(fullname):
        if importer.find_module(fullname) is not None:
            return importer
    return None

def GetLastModified(fullname):
    """
    Returns the last modified timestamp for the given module.
    """
    loader = get_loader(fullname)
    if hasattr(loader, 'get_filename'):
        suffixes = IMP_SEARCH_ORDER
    elif isinstance(loader, zipimporter):
        suffixes = ZIP_SEARCH_ORDER
    else:
        raise NotImplementedError("unsupported loader %s" % laoder)

    barename = '/' + fullname.replace('.', '/')
    if loader.is_package(fullname):
        barename += '/__init__'
    for suffix in suffixes:
        resource = barename + suffix
        try:
            timestamp = GetResourceLastModified(fullname, resource)
        except EnvironmentError:
            timestamp = 0
        else:
            break
    return timestamp

def GetSearchPath(fullname):
    loader = get_loader(fullname)
    if loader.is_package(fullname):
        if fullname in sys.modules:
            package = sys.modules[fullname]
        else:
            package = loader.load_module(fullname)
        return package.__path__
    return None

# -- Resource Handling ------------------------------------------------

def OsPathToResource(pathname):
    components = []
    for component in pathname.split(os.sep):
        if component == '..':
            del components[-1:]
        elif component not in ('', '.'):
            components.append(component)
    resource = '/'.join(components)
    if pathname.startswith(os.sep):
        resource = '/' + resource
    return resource

def NormalizeResource(package, resource):
    # normalize the resource pathname
    # Note, posixpath is not used as it doesn't remove leading '..'s
    components = []
    for component in resource.split('/'):
        if component == '..':
            del components[-1:]
        elif component not in ('', '.'):
            components.append(component)
    absolute = resource.startswith('/')
    resource = '/'.join(components)
    provider = get_provider(package)
    if absolute:
        # Find the provider for the distribution directory
        module_path = provider.module_path
        packages = package.split('.')
        if not get_loader(package).is_package(package):
            del packages[-1]
        for module in packages:
            module_path = os.path.dirname(module_path)
        provider.module_path = module_path
    return (provider, resource)

def GetResourceFilename(package, resource):
    """Returns a true filesystem name for the specified resource.
    """
    provider, resource = NormalizeResource(package, resource)
    return provider.get_resource_filename(_resource_manager, resource)

def GetResourceString(package, resource):
    """Return a string containing the contents of the specified resource.

    If the pathname is absolute it is retrieved starting at the path of
    the importer for 'fullname'.  Otherwise, it is retrieved relative
    to the module within the loader.
    """
    provider, resource = NormalizeResource(package, resource)
    return provider.get_resource_string(_resource_manager, resource)

def GetResourceStream(package, resource):
    """Return a readable stream for specified resource"""
    provider, resource = NormalizeResource(package, resource)
    return provider.get_resource_stream(_resource_manager, resource)

def GetResourceLastModified(package, resource):
    """Return a timestamp indicating the last-modified time of the
    specified resource.  Raises IOError is the pathname cannot be found
    from the loader for 'fullname'.
    """
    provider, resource = NormalizeResource(package, resource)
    if isinstance(provider.loader, zipimporter):
        if not resource:
            # it is the archive itself
            timestamp = os.stat(provider.module_path).st_mtime
        else:
            filename = provider._fn(provider.module_path, resource)
            zipinfo_name = provider._zipinfo_name(filename)
            try:
                dostime, dosdate = provider.zipinfo[zipinfo_name][5:7]
            except:
                import errno
                errorcode = errno.ENOENT
                raise IOError(errorcode, os.strerror(errorcode), zipinfo_name)
            timestamp = time.mktime((
                ((dosdate >> 9)  & 0x7f) + 1980, # tm_year
                ((dosdate >> 5)  & 0x0f) - 1,    # tm_mon
                ((dosdate >> 0)  & 0x1f),        # tm_mday
                ((dostime >> 11) & 0x1f),        # tm_hour
                ((dostime >> 5)  & 0x3f),        # tm_min
                ((dostime >> 0)  & 0x1f) * 2,    # tm_secs
                0, 0, -1))
    else:
        filename = provider.get_resource_filename(_resource_manager, resource)
        timestamp = os.stat(filename).st_mtime
    return timestamp
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.