Catalog.py :  » XML » 4Suite » 4Suite-XML-1.0.2 » Ft » Xml » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » XML » 4Suite 
4Suite » 4Suite XML 1.0.2 » Ft » Xml » Catalog.py
########################################################################
# $Source: /var/local/cvsroot/4Suite/Ft/Xml/Catalog.py,v $ $Revision: 1.38 $ $Date: 2006/08/12 15:56:22 $
"""
Classes and functions that help implement OASIS XML and TR9401 Catalogs.
Resolution with Catalogs is handled via the Ft.Xml.InputSource module.

Based on a contribution to PyXML from Tarn Weisner Burton
<twburton@users.sf.net>. See
http://sourceforge.net/tracker/index.php?func=detail&aid=490069&group_id=6473&atid=306473

Copyright 2005 Fourthought, Inc. (USA).
Detailed license and copyright information: http://4suite.org/COPYRIGHT
Project home, documentation, distributions: http://4suite.org/
"""

import os, re, sys, warnings, cStringIO
from xml.sax import xmlreader

from Ft import FtWarning,GetConfigVar
from Ft.Lib import Uri,UriException,ImportUtil
from Ft.Xml import XML_NAMESPACE
from Ft.Xml.Lib.XmlString import IsXml

# Names exported by "from Ft.Xml.Catalog import *".
__all__ = ['Catalog', 'GetDefaultCatalog']

# Matches one entry of a TR 9401 (plain text) catalog: a keyword anchored at
# the start of a line (re.M) and matched case-insensitively (re.I), followed
# by one double-quoted string and, optionally, a second double-quoted string.
# Backslash-escaped characters are accepted inside the quotes.
#   group 1 - the keyword (BASE, CATALOG, DELEGATE, PUBLIC, SYSTEM,
#             OVERRIDE YES or OVERRIDE NO)
#   group 2 - the first quoted argument, without the quotes
#   group 3 - the second quoted argument, without the quotes; this group is
#             optional, so findall() reports it as '' when it is absent
#             (every findall() result is therefore a 3-tuple)
# NOTE(review): every alternative, including OVERRIDE YES/NO, has to be
# followed by a quoted string for the pattern to match -- confirm that this
# is what is wanted for OVERRIDE entries.
TR9401 = re.compile(r'^\s*(BASE|CATALOG|DELEGATE|PUBLIC|SYSTEM|OVERRIDE\s+YES|OVERRIDE\s+NO)\s+"((?:[^"\\]|\\.)*)"(?:\s+"((?:[^"\\]|\\.)*)")?', re.M | re.I)


# A percent-encoded octet; group 1 holds the two hexadecimal digits.  Only
# well-formed escapes are matched so that a stray '%' is passed through
# instead of making int(..., 16) fail.
_urn_hex_re = re.compile('%([0-9A-Fa-f]{2})')
# The characters that have a special transcription inside a publicid URN.
_urn_trans_re = re.compile('[+:;]')
# Transcription applied when unwrapping a publicid URN (RFC 3151 and the
# OASIS XML Catalogs section "Unwrapping URNs"):
#   "+" -> " ",  ":" -> "//",  ";" -> "::"
_urn_trans_map = {'+' : ' ',
                  ':' : '//',
                  ';' : '::',
                  }

def UnwrapUrn(urn):
    """
    Normalize a URN and, if it is in the publicid namespace, "unwrap" it
    into the public identifier that it represents.

    urn - the identifier to examine; may be None or an empty string, in
          which case it is returned untouched.

    Returns a 2-tuple (unwrapped, value):
      unwrapped - True only if urn started with 'urn:publicid:' (after
                  normalization) and was converted to a public identifier
      value     - the public identifier if unwrapped, otherwise the
                  (possibly normalized) input

    Normalization applies to anything starting with 'urn:' in any case:
    the 'urn' prefix and the namespace identifier are lowercased and the
    hex digits of percent-escapes are uppercased.
    """
    unwrapped = False
    if urn:
        # normalize URN
        if urn.lower()[:4] == 'urn:':
            # make the first 2 components lowercase
            parts = urn.split(':', 2)
            parts[:2] = [ x.lower() for x in parts[:2] ]
            urn = ':'.join(parts)
            # make hex codes uppercase
            urn = _urn_hex_re.sub(lambda m: '%' + m.group(1).upper(), urn)

        # "unwrap" publicid URN
        if urn[:13] == 'urn:publicid:':
            urn = urn[13:]
            # The literal '+', ':' and ';' must be transcribed before the
            # percent-escapes are decoded; otherwise an escaped '%3A' would
            # be decoded to ':' and then wrongly turned into '//'.
            urn = _urn_trans_re.sub(lambda m: _urn_trans_map[m.group()], urn)
            urn = _urn_hex_re.sub(lambda m: chr(int(m.group(1), 16)), urn)
            unwrapped = True

    return (unwrapped, urn)


class Catalog:
    """
    Reads and provides access to a catalog, providing mappings of public
    and system IDs to URIs, etc.

    It is implemented as a SAX ContentHandler and is able to read
    OASIS TR 9401 Catalogs <http://www.oasis-open.org/specs/a401.htm>
    and OASIS XML Catalogs <http://www.oasis-open.org/committees/entity/spec.html>
    """
    def __init__(self, uri, quiet=True):
        self.systemIds = {}
        self.publicIds = {}
        self.uris = {}
        self.publicDelegates = []
        self.systemDelegates = []
        self.uriDelegates = []
        self.systemRewrites = []
        self.uriRewrites = []
        self.catalogs = []
        self.uri = uri
        self.quiet = quiet

        if not Uri.IsAbsolute(uri):
            # Using a relative URI here makes it hard to reliably
            # locate the catalog. Also, if the catalog doesn't set
            # its own base URI with xml:base, then we won't be able
            # to resolve relative URI references within the catalog.
            # So we should warn that this situation is undesirable.
            warnings.warn("Catalog URI '%s' is not absolute.",
                          FtWarning, 2)

        stream = Uri.BASIC_RESOLVER.resolve(uri)
        data = stream.read()
        stream.close()

        if IsXml(data):
            # cannot be a TR 9401 document, assume an XML Catalog
            self._parseXmlCat(data)
        else:
            # cannot be an XML Catalog, assume a TR 9401 file
            self._parseTr9401(data)

        # longest match first
        self.publicDelegates.sort()
        self.publicDelegates.reverse()
        self.systemDelegates.sort()
        self.systemDelegates.reverse()
        self.uriDelegates.sort()
        self.uriDelegates.reverse()
        self.systemRewrites.sort()
        self.systemRewrites.reverse()
        self.uriRewrites.sort()
        self.uriRewrites.reverse()

        if not quiet:
            sys.stderr.write('Catalog contents:\n')
            for key in self.__dict__.keys():
                sys.stderr.write('  %s = %r\n' % (key, self.__dict__[key]))
            sys.stderr.flush()
        return

    def resolveEntity(self, publicId, systemId):
        """
        Return the applicable URI.

        If an external identifier (PUBLIC or SYSTEM) entry exists in the
        Catalog for the identifier(s) specified, return the mapped value.

        External identifiers identify the external subset, entities, and
        notations of an XML document.
        """
        unwrapped, publicId = UnwrapUrn(publicId)
        unwrapped, systemId = UnwrapUrn(systemId)
        # If the system identifier is a URN in the publicid namespace, it is
        # converted into a public identifier by "unwrapping" the URN.
        if unwrapped:
            # 1. No public identifier was provided. Resolution continues as if
            # the public identifier constructed by unwrapping the URN was
            # supplied as the original public identifier and no system
            # identifier was provided.
            if not publicId:
                publicId = systemId
                systemId = None
            # 2. The normalized public identifier provided is lexically
            # identical to the public identifier constructed by unwrapping
            # the URN. Resolution continues as if the system identifier had
            # not been supplied.
            elif publicId == systemId:
                systemId = None
            # 3. The normalized public identifier provided is different from
            # the public identifier constructed by unwrapping the URN. This
            # is an error. Applications may recover from this error by
            # discarding the system identifier and proceeding with the
            # original public identifier.
            else:
                warnings.warn("publicId %r does not match the unwrapped "
                              "systemId %r" % (publicId, systemId),
                              FtWarning, 2)
                systemId = None

        # Resolution follows the steps listed below, proceeding to each
        # subsequent step if and only if no other action is indicated.
        #
        # 1. Resolution begins in the first catalog entry file in the
        # current catalog entry file list.

        if systemId is not None:
            # 2. If a system identifier is provided, and at least one matching
            # system entry exists, the (absolutized) value of the uri
            # attribute of the first matching system entry is returned.
            if systemId in self.systemIds:
                return self.systemIds[systemId]

            # 3. If a system identifier is provided, and at least one matching
            # rewriteSystem entry exists, rewriting is performed.
            #
            # Rewriting removes the matching prefix and replaces it with the
            # rewrite prefix identified by the matching rewriteSystem entry.
            # The rewritten string is returned.
            for length, start, rewrite in self.systemRewrites:
                if start == systemId[:length]:
                    return rewrite + systemId[length:]

            # 4. If a system identifier is provided, and one or more
            # delegateSystem entries match, delegation is performed.
            #
            # If delegation is to be performed, a new catalog entry file list
            # is generated from the set of all matching delegateSystem
            # entries. The (absolutized) value of the catalog attribute of
            # each matching delegateSystem entry is inserted into the new
            # catalog entry file list such that the delegate entry with the
            # longest matching systemIdStartString is first on the list, the
            # entry with the second longest match is second, etc.
            #
            # These are the only catalog entry files on the list, the current
            # list is not considered for the purpose of delegation. If
            # delegation fails to find a match, resolution for this entity
            # does not resume with the current list. (A subsequent resolution
            # attempt for a different entity begins with the original list; in
            # other words the catalog entry file list used for delegation is
            # distinct and unrelated to the "normal" catalog entry file list.)
            #
            # Catalog resolution restarts using exclusively the catalog entry
            # files in this new list and the given system identifier; any
            # originally given public identifier is ignored during the
            # remainder of the resolution of this external identifier: return
            # to step 1.
            attempted = False
            for length, start, catalog in self.systemDelegates:
                if start == systemId[:length]:
                    attempted = True
                    result = catalog.resolveEntity(publicId, systemId)
                    if result:
                        return result
            if attempted:
                # delegation attempted but failed, resolution aborted
                return

        if publicId is not None:
            # 5. If a public identifier is provided, and at least one matching
            # public entry exists, the (absolutized) value of the uri
            # attribute of the first matching public entry is returned. If a
            # system identifier is also provided as part of the input to this
            # catalog lookup, only public entries that occur where the prefer
            # setting is public are considered for matching.
            if publicId in self.publicIds:
                uri, prefer = self.publicIds[publicId]
                if systemId is None or prefer:
                    return uri

            # 6. If a public identifier is provided, and one or more
            # delegatePublic entries match, delegation is performed. If a
            # system identifier is also provided as part of the input to this
            # catalog lookup, only delegatePublic entries that occur where
            # the prefer setting is public are considered for matching.
            #
            # See #4 above for details on delegation.
            attempted = False
            for length, start, catalog, prefer in self.publicDelegates:
                if (systemId is None or prefer) and start == publicId[:length]:
                    attempted = True
                    result = catalog.resolveEntity(publicId, systemId)
                    if result:
                        return result
            if attempted:
                # delegation attempted but failed, resolution aborted
                return

        # 7. If the current catalog entry file contains one or more
        # nextCatalog entries, the catalog entry files referenced by each
        # nextCatalog entry's "catalog" attribute are inserted, in the order
        # that they appear in this catalog entry file, onto the current
        # catalog entry file list, immediately after the current catalog
        # entry file.
        #
        # 8. If there are one or more catalog entry files remaining on the
        # current catalog entry file list, load the next catalog entry file
        # and continue resolution efforts: return to step 2.
        for catalog in self.catalogs:
            result = catalog.resolveEntity(publicId, systemId)
            if result:
                return result

        # 9. Indicate to the calling application that no match was found.
        return


    def resolveURI(self, uri):
        """
        Return the applicable URI.

        If a URI entry exists in the Catalog for the URI specified, return
        the mapped value.

        URI references, for example namespace names, stylesheets, included
        files, graphics, and hypertext references, simply identify other
        resources.
        """
        # If the URI reference is a URN in the publicid namespace
        # ([RFC 3151]), it is converted into a public identifier by
        # "unwrapping" the URN (Section 6.4). Resolution continues by
        # following the semantics of external identifier resolution
        # (Section 7.1) as if the public identifier constructed by
        # unwrapping the URN had been provided and no system identifier had
        # been provided.
        unwrapped, publicId = UnwrapUrn(uri)
        if unwrapped:
            return self.resolveEntity(publicId, None)

        # Resolution of a generic URI reference follows the steps listed
        # below, proceeding to each subsequent step if and only if no other
        # action is indicated.

        # 1. Resolution begins in the first catalog entry file in the
        # current catalog list.

        # 2. If at least one matching uri entry exists, the (absolutized)
        # value of the uri attribute of the first matching uri entry is
        # returned.
        if uri in self.uris:
            return self.uris[uri]

        # 3. If at least one matching rewriteURI entry exists, rewriting is
        # performed.
        #
        # Rewriting removes the matching prefix and replaces it with the
        # rewrite prefix identified by the matching rewriteURI entry. The
        # rewritten string is returned.
        for length, start, rewrite in self.uriRewrites:
            if start == uri[:length]:
                return rewrite + uri[length:]

        # 4. If one or more delegateURI entries match, delegation is performed.
        #
        # If delegation is to be performed, a new catalog entry file list is
        # generated from the set of all matching delegateURI entries. The
        # (absolutized) value of the catalog attribute of each matching
        # delegateURI entry is inserted into the new catalog entry file list
        # such that the delegate entry with the longest matching
        # uriStartString is first on the list, the entry with the second
        # longest match is second, etc.
        #
        # These are the only catalog entry files on the list, the current list
        # is not considered for the purpose of delegation. If delegation fails
        # to find a match, resolution for this entity does not resume with the
        # current list. (A subsequent resolution attempt for a different
        # entity begins with the original list; in other words the catalog
        # entry file list used for delegation is distinct and unrelated to the
        # "normal" catalog entry file list.)
        #
        # Catalog resolution restarts using exclusively the catalog entry
        # files in this new list and the given URI reference: return to step 1.
        attempted = False
        for length, start, catalog in self.uriDelegates:
            if start == uri[:length]:
                attempted = True
                result = catalog.resolveURI(uri)
                if result:
                    return result
        if attempted:
            # delegation attempted but failed, resolution aborted
            return

        # 5. If the current catalog entry file contains one or more
        # nextCatalog entries, the catalog entry files referenced by each
        # nextCatalog entry's "catalog" attribute are inserted, in the order
        # that they appear in this catalog entry file, onto the current
        # catalog entry file list, immediately after the current catalog
        # entry file.
        #
        # 6. If there are one or more catalog entry files remaining on the
        # current catalog entry file list, load the next catalog entry file
        # and continue resolution efforts: return to step 2.
        for catalog in self.catalogs:
            result = catalog.resolveURI(uri)
            if result:
                return result

        # 7. Indicate to the calling application that no match was found.
        return

    def _parseXmlCat(self, data):
        """
        Parse an XML Catalog, as specified in
        http://www.oasis-open.org/committees/entity/spec-2001-08-06.html.
        Partially implemented.
        """
        self.prefer_public = [True]
        self.base = [self.uri]

        # Since we have the catalog data already, parse it.
        source = xmlreader.InputSource(self.uri)
        source.setByteStream(cStringIO.StringIO(data))

        from Ft.Xml.Sax import CreateParser
        p = CreateParser()
        p.setFeature(
          'http://xml.org/sax/features/external-parameter-entities', False)
        p.setContentHandler(self)
        p.parse(source)

        # are these explicit dels needed?
        del self.prefer_public
        del self.base
        return

    def _parseTr9401(self, data):
        """
        Parse a TR9401 Catalog, as specified in
        <http://www.oasis-open.org/specs/a401.htm>.
        Partially implemented.
        """
        prefer_public = True
        base = self.uri
        for cmd in TR9401.findall(data):
            token = cmd[0].upper()
            if token == 'PUBLIC':
                if len(cmd) == 3:
                    self.publicIds[cmd[1]] = (Uri.Absolutize(cmd[2], base), prefer_public)
            elif token == 'SYSTEM':
                if len(cmd) == 3:
                    self.systemIds[cmd[1]] = Uri.Absolutize(cmd[2], base)
            elif token == 'BASE':
                base = cmd[1]
            elif token[:8] == 'OVERRIDE':
                prefer_public = token[8:].strip() == 'YES'
            elif token == 'DELEGATE':
                if len(cmd) == 3:
                    self.publicDelegates[cmd[1]] = Uri.Absolutize(cmd[2], base)
            elif token == 'CATALOG':
                if len(cmd) == 2:
                    catalog = Catalog(Uri.Absolutize(cmd[1], base), self.quiet)
                    self.catalogs.append(catalog)
        return

    # methods used by the XML parser

    def startElementNS(self, (namespace, name), qualifiedName, attrs):
        """
        Handle an element start event for the XML parser.
        This is a SAX ContentHandler method.
        """
        # update current base URI
        base = self.base[-1]
        if name not in ('rewriteSystem', 'rewriteURI'):
            base = attrs.get((XML_NAMESPACE, 'base'), base)
        self.base.append(base)

        if name == 'public':
            # a publicId lookup
            if self.__ensure_attrs(name, attrs, 'publicId', 'uri'):
                # save the state of prefer_public also
                publicId = attrs[(None, 'publicId')]
                uri = Uri.Absolutize(attrs[(None, 'uri')], base)
                self.publicIds[publicId] = (uri, self.prefer_public[-1])
        elif name == 'system':
            # a systemId lookup
            if self.__ensure_attrs(name, attrs, 'systemId', 'uri'):
                systemId = attrs[(None, 'systemId')]
                uri = Uri.Absolutize(attrs[(None, 'uri')], base)
                self.systemIds[systemId] = uri
        elif name == 'uri':
            # a URI lookup
            if self.__ensure_attrs(name, attrs, 'name', 'uri'):
                name = attrs[(None, 'name')]
                uri = Uri.Absolutize(attrs[(None, 'uri')], base)
                self.uris[name] = uri
        elif name == 'rewriteURI':
            # a URI rewrite
            if self.__ensure_attrs(name, attrs, 'uriStartString', 'rewritePrefix'):
                startString = attrs[(None, 'uriStartString')]
                rewritePrefix = Uri.Absolutize(attrs[(None, 'rewritePrefix')],
                                               base)
                rewriteRule = (len(startString), startString, rewritePrefix)
                self.uriRewrites.append(rewriteRule)
        elif name == 'rewriteSystem':
            # a systemId rewrite
            if self.__ensure_attrs(name, attrs, 'systemIdStartString', 'rewritePrefix'):
                startString = attrs[(None, 'systemIdStartString')]
                rewritePrefix = Uri.Absolutize(attrs[(None, 'rewritePrefix')],
                                               base)
                rewriteRule = (len(startString), startString, rewritePrefix)
                self.systemRewrites.append(rewriteRule)
        elif name == 'delegateSystem':
            # delegate systemId to specific catalog
            if self.__ensure_attrs(name, attrs, 'systemIdStartString', 'catalog '):
                startString = attrs[(None, 'systemIdStartString')]
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                delegate = Catalog(catalog, self.quiet)
                delegateRule = (len(startString), startString, delegate)
                self.systemDelegates.append(delegateRule)

        elif name == 'delegatePublic':
            # delegate publicId to specific catalog
            if self.__ensure_attrs(name, attrs, 'publicIdStartString', 'catalog '):
                # save the state of prefer_public also
                startString = attrs[(None, 'publicIdStartString')]
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                delegate = Catalog(catalog, self.quiet)
                delegateRule = (len(startString), startString, delegate,
                                self.prefer_public[-1])
                self.publicDelegates.append(delegateRule)
        elif name == 'delegateURI':
            # delegate URI to specific catalog
            if self.__ensure_attrs(name, attrs, 'uriStartString', 'catalog '):
                startString = attrs[(None, 'uriStartString')]
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                delegate = Catalog(catalog, self.quiet)
                delegateRule = (len(startString), startString, delegate)
                self.uriDelegates.append(delegateRule)
        elif name == 'nextCatalog':
            # the next catalog in a chain
            if self.__ensure_attrs(name, attrs, 'catalog'):
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                self.catalogs.append(Catalog(catalog, self.quiet))
        elif name in ('catalog', 'group'):
            # look for prefer attribute and update the stack
            prefer = self.prefer_public[-1] and 'public' or 'system'
            prefer = attrs.get((None, 'prefer'), prefer) == 'public'
            self.prefer_public.append(prefer)
        return

    def __ensure_attrs(self, name, attrs, *attr_names):
        """
        Ensure that the right attributes exist just in case the parser
        is a non-validating one.
        """
        for attr_name in attr_names:
            #if not attr_name in attrs:
            if not attrs.has_key((None, attr_name)):
                if not self.quiet:
                    print '%s: Malformed %s element, missing %s attribute' % (self.uri, name, attr_name)
                return False
        return True


    def endElementNS(self, (namespace, name), qualifiedName):
        """
        Handle an element end event for the XML parser.
        This is a SAX ContentHandler method.
        """
        self.base.pop()
        if name in ('catalog', 'group'):
            # pop the stack
            self.prefer_public.pop()
        return


def GetDefaultCatalog(basename='default.cat'):
    """
    Load the default catalog file(s).
    """
    quiet = 'XML_DEBUG_CATALOG' not in os.environ

    uris = []
    # original 4Suite XML Catalog support
    if 'XML_CATALOGS' in os.environ:
        # os.pathsep seperated list of pathnames
        for path in os.environ['XML_CATALOGS'].split(os.pathsep):
            uris.append(Uri.OsPathToUri(path))

    # libxml2 XML Catalog support
    if 'XML_CATALOG_FILES' in os.environ:
        # whitespace-separated list of pathnames or URLs (ick!)
        for path in os.environ['XML_CATALOG_FILES'].split():
            # if its already not already an URL, make it one
            if not Uri.IsAbsolute(path):
                uris.append(Uri.OsPathToUri(path))
            else:
                uris.append(path)

    # add the default 4Suite catalog
    pathname = os.path.join(GetConfigVar('DATADIR'), basename)
    if GetConfigVar('RESOURCEBUNDLE'):
        resource = ImportUtil.OsPathToResource(pathname)
        uri = Uri.ResourceToUri('Ft.Xml', resource)
    else:
        uri = Uri.OsPathToUri(pathname)
    uris.append(uri)

    if not quiet:
        prefix = "Catalog URIs:"
        for uri in uris:
            sys.stderr.write('%s %s\n' % (prefix, uri))
            prefix = " "*len(prefix)

    catalog = None
    for uri in uris:
        if not quiet:
            sys.stderr.write('Reading %s\n' % uri)
            sys.stderr.flush()
        try:
            # FIXME: Use dict merging rather than this inefficient cascading
            if catalog is None:
                if not quiet:
                    sys.stderr.write('Creating catalog from %s\n' % uri)
                    sys.stderr.flush()
                catalog = Catalog(uri, quiet)
            else:
                if not quiet:
                    sys.stderr.write('Appending %s\n' % uri)
                    sys.stderr.flush()
                catalog.catalogs.append(Catalog(uri, quiet))
        except UriException, e:
            warnings.warn("Catalog resource (%s) disabled: %s" % (uri,
                                                                  e.message),
                          FtWarning)

    if not quiet:
        sys.stderr.write('Done. Result is %r\n' % catalog)
        sys.stderr.flush()

    return catalog
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.