XUpdate.py :  » XML » 4Suite » 4Suite-XML-1.0.2 » Ft » Xml » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » XML » 4Suite 
4Suite » 4Suite XML 1.0.2 » Ft » Xml » XUpdate.py
########################################################################
# $Header: /var/local/cvsroot/4Suite/Ft/Xml/XUpdate.py,v 1.58 2005/08/02 22:42:59 mbrown Exp $
"""
XUpdate request processing

XUpdate is specified (poorly) at
http://xmldb-org.sourceforge.net/xupdate/xupdate-wd.html

Copyright 2005 Fourthought, Inc. (USA).
Detailed license and copyright information: http://4suite.org/COPYRIGHT
Project home, documentation, distributions: http://4suite.org/
"""

__all__ = ['XUPDATE_NS', 'XUpdateException', 'g_errorMessages',
           'StringWriter', 'SUPPORTED_VERSIONS',
           'Processor', 'Reader', 'ApplyXUpdate']

XUPDATE_NS = 'http://www.xmldb.org/xupdate'

import string
from xml.dom import Node

from Ft import FtException
from Ft import TranslateMessage
from Ft.Xml import EMPTY_NAMESPACE,XML_NAMESPACE,Domlette
from Ft.Xml.Lib.XmlString import SplitQName,XmlStrStrip,IsXmlSpace
from Ft.Xml.XPath import g_parser
from Ft.Xml.XPath import XPathTypes
from Ft.Xml.XPath import Context,Conversions
from Ft.Xml.Xslt import NullWriter,DomWriter,AttributeValueTemplate
from Ft.Xml.Xslt.CopyOfElement import CopyNode

class XUpdateException(FtException):
    """
    Exception class for errors specific to XUpdate processing
    """
    SYNTAX_ERROR = 1
    UNRECOGNIZED_INSTRUCTION = 2
    NO_VERSION = 10
    NO_SELECT = 11
    NO_TEST = 12
    INVALID_SELECT = 13
    UNSUPPORTED_VERSION = 14
    INVALID_DOM_NODE = 100
    UNKNOWN_NODE_TYPE = 101

    def __init__(self, errorCode, *args, **kwargs):
        FtException.__init__(self, errorCode, g_errorMessages, args, **kwargs)


g_errorMessages = {
    XUpdateException.SYNTAX_ERROR: _('Syntax error in expression %(expr)r: %(err)s'),
    XUpdateException.UNRECOGNIZED_INSTRUCTION: _('Unrecognized instruction in XUpdate namespace: %(name)r'),
    XUpdateException.NO_VERSION: _('Missing required version attribute'),
    XUpdateException.NO_SELECT: _('Missing required select attribute'),
    XUpdateException.NO_TEST: _('Missing required "test" attribute'),
    XUpdateException.INVALID_SELECT: _('select expression "%(expr)s" must evaluate to a non-empty node-set'),
    XUpdateException.UNSUPPORTED_VERSION: _('XUpdate version %(version)s unsupported by this implementation'),
    XUpdateException.INVALID_DOM_NODE: _('Invalid DOM node %(node)r'),
    XUpdateException.UNKNOWN_NODE_TYPE: _('Unknown node type %(nodetype)r'),
    }

SUPPORTED_VERSIONS = ('1.0',)

class StringWriter(NullWriter.NullWriter):
    def __init__(self):
        self._result = []

    def getResult(self):
        return u''.join(self._result)

    def text(self, data):
        self._result.append(data)
        return


class Processor:
    def __init__(self, reader=None):
        self.writers = [NullWriter.NullWriter(None)]
        return

    #needed by CopyNode
    def output(self):
        """Returns the current output writer"""
        return self.writers[-1]

    def pushDomResult(self, ownerDocument):
        self.writers.append(DomWriter.DomWriter(ownerDocument))
        return

    def pushStringResult(self):
        self.writers.append(StringWriter())
        return

    def popResult(self):
        return self.writers.pop().getResult()

    def execute(self, node, xupdate, variables=None, processorNss=None):
        if variables is None:
            variables = {}
        if processorNss is None:
            processorNss = {}

        context = Context.Context(node, varBindings=variables,
                                  processorNss=processorNss)
        self.visit(context, xupdate, 0)
        return node

    def visit(self, context, node, preserveSpace):
        try:
            node_type = node.nodeType
        except AttributeError:
            raise XUpdateException(XUpdateException.INVALID_DOM_NODE,
                                   node=node)

        try:
            visit = self._dispatch_node[node_type]
        except KeyError:
            # unknown node type, try and get a "pretty" name for the error
            node_types = {}
            for name in dir(Node):
                if name.endswith('_NODE'):
                    node_types[getattr(Node, name)] = name
            node_type = node_types.get(node_type, node_type)
            raise XUpdateException(XUpdateException.INVALID_DOM_NODE,
                                   nodetype=node_type)
        else:
            visit(self, context, node, preserveSpace)
        return

    # -- NodeType Dispatch ---------------------------------------------
    _dispatch_node = {}

    def _visit_document(self, context, node, preserveSpace):
        element = node.documentElement
        if element is not None and \
               element.namespaceURI == XUPDATE_NS and \
               element.localName == 'modifications':
            version = element.getAttributeNS(EMPTY_NAMESPACE, 'version')
            if not version:
                raise XUpdateException(XUpdateException.NO_VERSION)
            if version not in SUPPORTED_VERSIONS:
                raise XUpdateException(XUpdateException.UNSUPPORTED_VERSION,
                                       version=version)
            for node in element.childNodes:
                self.visit(context, node, preserveSpace)
        return
    _dispatch_node[Node.DOCUMENT_NODE] = _visit_document

    def _visit_text(self, context, node, preserveSpace):
        if preserveSpace or not IsXmlSpace(node.data):
            self.writers[-1].text(node.data)
        return
    _dispatch_node[Node.TEXT_NODE] = _visit_text

    def _visit_element(self, context, element, preserveSpace):
        xml_space = element.getAttributeNS(XML_NAMESPACE, 'space')
        if xml_space == 'preserve':
            preserveSpace = 1
        elif xml_space == 'default':
            preserveSpace = 0
        # else, no change

        if element.namespaceURI != XUPDATE_NS:
            self.writers[-1].startElement(element.nodeName,
                                          element.namespaceURI)

            # Process the attributes
            for attr in element.attributes.values():
                self.writers[-1].attribute(attr.nodeName, attr.value,
                                           attr.namespaceURI)

            # Now the children
            for node in element.childNodes:
                self.visit(context, node, preserveSpace)

            self.writers[-1].endElement(element.nodeName,
                                        element.namespaceURI)
        else:
            try:
                visit = self._dispatch_xupdate[element.localName]
            except KeyError:
                # unknown/unsupported local-name
                raise XUpdateException(
                    XUpdateException.UNRECOGNIZED_INSTRUCTION,
                    name=element.localName)
            else:
                visit(self, context, element, preserveSpace)
        return
    _dispatch_node[Node.ELEMENT_NODE] = _visit_element

    def _visit_ignorable(self, context, node, preserveSpace):
        return
    _dispatch_node[Node.COMMENT_NODE] = _visit_ignorable
    _dispatch_node[Node.PROCESSING_INSTRUCTION_NODE] = _visit_ignorable

    # -- XUpdate Dispatch ----------------------------------------------
    _dispatch_xupdate = {}

    ### Commands ###

    def _xu_remove(self, context, element, preserveSpace):
        select = element.getAttributeNS(EMPTY_NAMESPACE, u'select')
        if not select:
            raise XUpdateException(XUpdateException.NO_SELECT)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)
        nodeset = self.evaluateExpression(context, select)
        if nodeset:
            refnode = nodeset[0]
            if refnode.nodeType == Node.ATTRIBUTE_NODE:
                parent = refnode.ownerElement
                parent.removeAttributeNode(refnode)
            else:
                parent = refnode.parentNode
                if parent is None:
                    parent = refnode.ownerDocument
                parent.removeChild(nodeset[0])
        context.processorNss = oldNss
        return
    _dispatch_xupdate['remove'] = _xu_remove

    def _xu_append(self, context, element, preserveSpace):
        select = element.getAttributeNS(EMPTY_NAMESPACE, u'select')
        if not select:
            raise XUpdateException(XUpdateException.NO_SELECT)

        child = element.getAttributeNS(EMPTY_NAMESPACE, u'child') or u'last()'

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)

        nodeset = self.evaluateExpression(context, select)
        if not isinstance(nodeset, Types.NodesetType) or not nodeset:
            # Error if not a node-set or empty node-set
            raise XUpdateException(XUpdateException.INVALID_SELECT,
                                   expr=select)

        for refnode in nodeset:
            self.pushDomResult(refnode.ownerDocument)
            # A wrapper element is used in case attributes are being added
            wrapper_localName = 'wrapper'
            wrapper_namespace = EMPTY_NAMESPACE
            try:
                self.writers[-1].startElement(wrapper_localName,
                                              wrapper_namespace)
                for node in element.childNodes:
                    self.visit(context, node, preserveSpace)
            finally:
                self.writers[-1].endElement(wrapper_localName,
                                            wrapper_namespace)
                result = self.popResult()

            size = len(refnode.childNodes)
            con = Context.Context(refnode, 1, size,
                                  processorNss={'xupdate': XUPDATE_NS})
            # Python lists is 0-indexed counting, node-sets 1-indexed
            position = self.evaluateExpression(con, child)
            position = int(Conversions.NumberValue(position))

            wrapper = result.childNodes[0]
            if wrapper.attributes and hasattr(refnode, 'setAttributeNodeNS'):
                for attr in wrapper.attributes.values():
                    refnode.setAttributeNodeNS(attr)

            # we operate on a shallow copy of the child nodes here to avoid
            # modifying the membership of the sequence we're interating over.
            for node in tuple(wrapper.childNodes):
                if position >= size:
                    refnode.appendChild(node)
                else:
                    refnode.insertBefore(node, refnode.childNodes[position])

        context.processorNss = oldNss
        return
    _dispatch_xupdate['append'] = _xu_append

    def _xu_insert(self, context, element, preserveSpace):
        select = element.getAttributeNS(EMPTY_NAMESPACE, u'select')
        if not select:
            raise XUpdateException(XUpdateException.NO_SELECT)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)

        nodeset = self.evaluateExpression(context, select)
        if not nodeset:
            raise XUpdateException(XUpdateException.INVALID_SELECT,
                                   expr=select)

        for refnode in nodeset:
            self.pushDomResult(refnode.ownerDocument)
            try:
                for child in element.childNodes:
                    self.visit(context, child, preserveSpace)
            finally:
                result = self.popResult()

            if element.localName == 'insert-before':
                refnode.parentNode.insertBefore(result, refnode)
            elif element.localName == 'insert-after':
                # if arg 2 is None, insertBefore behaves like appendChild
                refnode.parentNode.insertBefore(result, refnode.nextSibling)
        context.processorNss = oldNss
        return
    _dispatch_xupdate['insert-after'] = _xu_insert
    _dispatch_xupdate['insert-before'] = _xu_insert

    def _xu_update(self, context, element, preserveSpace):
        select = element.getAttributeNS(EMPTY_NAMESPACE, u'select')
        if not select:
            raise XUpdateException(XUpdateException.NO_SELECT)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)

        nodeset = self.evaluateExpression(context, select)
        if not nodeset:
            raise XUpdateException(XUpdateException.INVALID_SELECT,
                                   expr=select)
        refnode = nodeset[0]

        if refnode.nodeType == Node.ATTRIBUTE_NODE:
            self.pushStringResult()
            try:
                for child in element.childNodes:
                    self.visit(context, child, preserveSpace)
            finally:
                result = self.popResult()
            refnode.value = result
        else:
            self.pushDomResult(refnode.ownerDocument)
            try:
                for child in element.childNodes:
                    self.visit(context, child, preserveSpace)
            finally:
                result = self.popResult()

            while refnode.firstChild:
                refnode.removeChild(refnode.firstChild)

            refnode.appendChild(result)

        context.processorNss = oldNss
        return
    _dispatch_xupdate['update'] = _xu_update

    def _xu_rename(self, context, element, preserveSpace):
        select = element.getAttributeNS(EMPTY_NAMESPACE, u'select')
        if not select:
            raise XUpdateException(XUpdateException.NO_SELECT)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)

        nodeset = self.evaluateExpression(context, select)
        if not nodeset:
            raise XUpdateException(XUpdateException.INVALID_SELECT,
                                   expr=select)

        new_name = XmlStrStrip(element.firstChild.data)
        (prefix, local) = SplitQName(new_name)
        if prefix:
            namespace = context.processorNss[prefix]
        else:
            namespace = EMPTY_NAMESPACE

        for refnode in nodeset:
            if refnode.nodeType == Node.ATTRIBUTE_NODE:
                parent = refnode.ownerElement
                parent.removeAttributeNode(refnode)
                parent.setAttributeNS(namespace, new_name, refnode.value)
            else:
                parent = refnode.parentNode
                if parent is None:
                    parent = refnode.ownerDocument
                new_elem = refnode.ownerDocument.createElementNS(namespace, new_name)
                parent.replaceChild(new_elem, refnode)
                # Copy any existing attributes to the newly created element
                if refnode.attributes:
                    for attr in refnode.attributes.values():
                        new_elem.setAttributeNodeNS(attr)
                # Now copy any children as well
                while refnode.firstChild:
                    new_elem.appendChild(refnode.firstChild)

        context.processorNss = oldNss
        return
    _dispatch_xupdate['rename'] = _xu_rename

    # Conditional statements are not part of the XUpdate spec,
    # though it has provisions for them because the spec is
    # not so much use without them
    # xupdate:if is a common-sense 4Suite extension
    def _xu_if(self, context, element, preserveSpace):
        test = element.getAttributeNS(EMPTY_NAMESPACE, u'test')
        if not test:
            raise XUpdateException(XUpdateException.NO_TEST)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)

        result = self.evaluateExpression(context, test)
        if Conversions.BooleanValue(result):
            for node in element.childNodes:
                self.visit(context, node, preserveSpace)

        context.processorNss = oldNss
        return
    _dispatch_xupdate['if'] = _xu_if

    def _xu_variable(self, context, element, preserveSpace):
        name = element.getAttributeNS(EMPTY_NAMESPACE, 'name')
        if not name:
            raise XUpdateException(XUpdateException.NO_NAME)

        select = element.getAttributeNS(EMPTY_NAMESPACE, u'select')
        if select:
            oldNss = context.processorNss
            context.processorNss = Domlette.GetAllNs(element)
            result = self.evaluateExpression(context, select)
            context.processorNss = oldNss
        else:
            result = Conversions.StringValue(element)

        (prefix, local) = SplitQName(name)
        if prefix:
            namespace = context.processorNss[prefix]
        else:
            namespace = EMPTY_NAMESPACE

        context.varBindings[(namespace, local)] = result
        return
    _dispatch_xupdate['variable'] = _xu_variable

    ### Instructions ###

    def _xu_element(self, context, element, preserveSpace):
        name = element.getAttributeNS(EMPTY_NAMESPACE, 'name')
        if not name:
            raise XUpdateException(XUpdateException.NO_NAME)
        _name = self.parseAVT(name)

        namespace = element.getAttributeNS(EMPTY_NAMESPACE, 'namespace')
        _namespace = self.parseAVT(namespace)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)
        name = _name.evaluate(context)

        namespace = _namespace and _namespace.evaluate(context)

        (prefix, local) = SplitQName(name)
        if not namespace:
            if prefix:
                namespace = context.processorNss[prefix]
            else:
                namespace = EMPTY_NAMESPACE

        self.writers[-1].startElement(name, namespace)
        for child in element.childNodes:
            self.visit(context, child, preserveSpace)
        self.writers[-1].endElement(name, namespace)
        context.processorNss = oldNss
        return
    _dispatch_xupdate['element'] = _xu_element

    def _xu_attribute(self, context, node, preserveSpace):
        name = node.getAttributeNS(EMPTY_NAMESPACE, 'name')
        if not name:
            raise XUpdateException(XUpdateException.NO_NAME)
        _name = self.parseAVT(name)

        namespace = node.getAttributeNS(EMPTY_NAMESPACE, 'namespace')
        _namespace = self.parseAVT(namespace)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(node)
        name = _name.evaluate(context)
        namespace = _namespace and _namespace.evaluate(context)

        (prefix, local) = SplitQName(name)
        if not namespace:
            if prefix:
                namespace = context.processorNss[prefix]
            else:
                namespace = EMPTY_NAMESPACE
        self.pushStringResult()
        try:
            for child in node.childNodes:
                self.visit(context, child, preserveSpace)
        finally:
            result = self.popResult()

        self.writers[-1].attribute(name, result, namespace)
        context.processorNss = oldNss
        return
    _dispatch_xupdate['attribute'] = _xu_attribute

    def _xu_text(self, context, element, preserveSpace):
        self.pushStringResult()
        try:
            for node in element.childNodes:
                self.visit(context, node, 1)
        finally:
            result = self.popResult()

        self.writers[-1].text(result)
        return
    _dispatch_xupdate['text'] = _xu_text

    def _xu_processing_instruction(self, context, element, preserveSpace):
        name = element.getAttributeNS(EMPTY_NAMESPACE, 'name')
        if not name:
            raise XUpdateException(XUpdateException.NO_NAME)
        _name = self.parseAVT(name)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(node)
        name = _name.evaluate(context)

        self.pushStringResult()
        try:
            for node in element.childNodes:
                self.visit(context, node, preserveSpace)
        finally:
            result = self.popResult()

        self.writers[-1].processingInstruction(name, result)
        context.processorNss = oldNss
        return
    _dispatch_xupdate['processing-instruction'] = _xu_processing_instruction

    def _xu_comment(self, context, element, preserveSpace):
        self.pushStringResult()
        try:
            for node in element.childNodes:
                self.visit(context, node, preserveSpace)
        finally:
            result = self.popResult()

        self.writers[-1].comment(result)
        return
    _dispatch_xupdate['comment'] = _xu_comment

    def _xu_value_of(self, context, element, preserveSpace):
        select = element.getAttributeNS(EMPTY_NAMESPACE, u'select')
        if not select:
            raise XUpdateException(XUpdateException.NO_SELECT)

        oldNss = context.processorNss
        context.processorNss = Domlette.GetAllNs(element)

        result = self.evaluateExpression(context, select)
        if isinstance(result, Types.NodesetType):
            for node in result:
                # should be OK to pass self as processor;
                # CopyNode only needs to access its .writers[-1]
                CopyNode(self, node)
        else:
            # a string, number or boolean
            if not isinstance(result, Types.StringType):
                result = Conversions.StringValue(result)
            self.writers[-1].text(result)
        context.processorNss = oldNss
        return
    _dispatch_xupdate['value-of'] = _xu_value_of

    def evaluateExpression(self, context, expression):
        try:
            parsed_expr = parser.new().parse(expression)
        except SyntaxError, e:
            raise XUpdateException(XUpdateException.SYNTAX_ERROR,
                                   expr=expression, err=str(e))
        else:
            return parsed_expr.evaluate(context)

    def parseAVT(self, avt):
        if avt is None: return None
        return AttributeValueTemplate.AttributeValueTemplate(avt)


class Reader(Domlette.NonvalidatingReaderBase):
    """
    A reader of XUpdate documents. Must contain a fromSrc() method
    that takes an Ft.Xml.InputSource and returns a Domlette document.
    It does not need to detect XUpdate syntax errors.
    """
    fromSrc = Domlette.NonvalidatingReaderBase.parse

# -- XUpdate user API -------------------------------------------------

def ApplyXUpdate(doc, xup):
    """
    Takes 2 InputSources, one for the source document and one for the
    XUpdate instructions.  It returns a DOM node representing the result
    of applying the XUpdate to the source document (the document is
    modified in-place).
    """
    reader = Domlette.NonvalidatingReader
    xureader = Reader()
    processor = Processor()
    source = reader.parse(doc)
    xupdate = xureader.fromSrc(xup)
    processor.execute(source, xupdate)
    #The source has been updated in place
    return source


def ApplyXupdate(doc, xup):
    """
    Deprecated. Use ApplyXUpdate (only the name changed).
    """
    import warnings
    warnings.warn('Deprecated function ApplyXupdate called. Please'
                  ' use ApplyXUpdate (with a capital "U") instead.',
                  DeprecationWarning, 2)
    return ApplyXUpdate(doc, xup)

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.