########################################################################
# $Header: /var/local/cvsroot/4Suite/Ft/Xml/Xslt/XmlWriter.py,v 1.28 2005/10/19 15:02:59 uogbuji Exp $
"""
XML writer for XSLT output
Copyright 2005 Fourthought, Inc. (USA).
Detailed license and copyright information: http://4suite.org/COPYRIGHT
Project home, documentation, distributions: http://4suite.org/
"""
from Ft.Xml import XML_NAMESPACE,EMPTY_NAMESPACE,EMPTY_PREFIX,XMLNS_NAMESPACE
from Ft.Xml.Lib import XmlPrinter,XmlPrettyPrinter
from Ft.Xml.Lib.XmlString import XmlStrStrip,SplitQName
from Ft.Xml.Xslt import NullWriter,XsltException,Error
# Stem of the namespace prefixes this module invents when a requested prefix
# cannot be used as given.  XmlWriter.GENERATED_PREFIX appends a "%s"
# placeholder to it, which changePrefix() fills with a numeric suffix
# (producing e.g. "org.4suite.4xslt.ns0").
DEFAULT_GENERATED_PREFIX = u"org.4suite.4xslt.ns"
class XmlWriter(NullWriter.NullWriter):
    """
    Takes events such as those generated by an XSLT processor and
    invokes a serializer to produce XML.

    The start tag of an element is not sent to the printer when
    startElement() is called.  It is held back (in _elementName,
    _elementUri and _attributes) so that attributes and namespace
    declarations can still be added, and is flushed by
    _completeLastElement() as soon as any other event arrives.
    """

    # Template for generated namespace prefixes; the trailing "%s" is
    # replaced by a numeric suffix in changePrefix().
    GENERATED_PREFIX = DEFAULT_GENERATED_PREFIX + "%s"

    def __init__(self, outputParams, stream):
        """
        outputParams - instance of Ft.Xml.Xslt.OutputParameters.OutputParameters
        stream - a stream that takes a byte stream (not a unicode object)
        """
        NullWriter.NullWriter.__init__(self, outputParams)
        self._stream = stream
        # Becomes True once the first startElement() has been seen
        self._inFirstElement = False
        # Pending (not yet serialized) start tag, if any
        self._elementName = None
        self._elementUri = None
        self._attributes = {}
        # Stack of prefix -> namespace mappings; one entry is pushed per
        # open element on top of this outermost scope.
        self._namespaces = [{'': EMPTY_NAMESPACE,
                             'xml': XML_NAMESPACE,
                             'xmlns': XMLNS_NAMESPACE}]
        # Created in startDocument()
        self._printer = None

    def getStream(self):
        """
        Return the output stream without any encoding wrappers.
        """
        return self._stream

    def _completeLastElement(self):
        """
        Send the pending start tag (if there is one) to the printer,
        declaring only the namespaces that were added or changed relative
        to the enclosing scope, then reset the pending-element state.
        """
        if not self._elementName:
            return

        # Create the mapping of new namespaces for the printer
        enclosing = self._namespaces[-2]
        namespaces = {}
        for prefix, namespace in self._namespaces[-1].items():
            if namespace != enclosing.get(prefix):
                # either added or changed; a false namespace value is
                # handed to the printer as the empty string
                namespaces[prefix] = namespace or u''

        self._printer.startElement(self._elementUri, self._elementName,
                                   namespaces, self._attributes)

        self._elementName = self._elementUri = None
        self._attributes.clear()

    def startDocument(self):
        """
        Fill in default output parameters, create the printer (pretty
        printing when the 'indent' parameter is set) and emit the optional
        byte order mark and XML declaration.
        """
        self._outputParams.setDefault('version', '1.0')
        self._outputParams.setDefault('encoding', 'UTF-8')
        self._outputParams.setDefault('indent', 0)
        self._outputParams.setDefault('mediaType', 'text/xml')

        if self._outputParams.indent:
            printer = XmlPrettyPrinter.XmlPrettyPrinter
        else:
            printer = XmlPrinter.XmlPrinter
        self._printer = printer(self._stream,
                                self._outputParams.encoding.encode('ascii'))

        self._cdataSectionElements = self._outputParams.cdataSectionElements

        if self._outputParams.utfbom:
            # Probably bad if the stream is not UTF-8 (or UTF-7).
            # Up to the user to not do that.
            # NOTE(review): this writes a unicode object to a stream that
            # the constructor documents as taking bytes -- confirm that
            # the streams used with 'utfbom' accept it.
            self._stream.write(u'\uFEFF')

        if not self._outputParams.omitXmlDeclaration:
            self._printer.startDocument(
                self._outputParams.version.encode('ascii'),
                self._outputParams.standalone)

    def endDocument(self):
        """
        Flush any pending start tag and tell the printer the document
        is finished.
        """
        self._completeLastElement()
        self._printer.endDocument()

    def text(self, text, escapeOutput=True):
        """
        Write character data.

        text - the character data
        escapeOutput - when false, the printer is asked to disable
                       output escaping for this text
        """
        self._completeLastElement()
        self._printer.text(text, not escapeOutput)

    def attribute(self, name, value, namespace=EMPTY_NAMESPACE):
        """
        add an attribute to an element

        name - the qualified name of the attribute
        value - the attribute value: must be Unicode
        namespace - must be Unicode or Ft.Xml.EMPTY_NAMESPACE (the default)

        Raises XsltException if there is no pending start tag to attach
        the attribute to.

        Strives for "sanity".  For brilliant definition thereof, c.f. Joe English
        http://lists.xml.org/archives/xml-dev/200204/msg00170.html
        Uses terminology from that article
        See also discussions starting
        http://lists.fourthought.com/pipermail/4suite-dev/2003-March/001294.html
        http://lists.fourthought.com/pipermail/4suite-dev/2003-March/001283.html

        Note: attribute output is computed as invoked.
        This means that the ugly case

        attribute(u"foo", u"bar", "http://some-ns/")
        attribute(u"x:foo", u"baz", "http://some-ns/")

        will result in the ugly
          xmlns:org.4suite.4xslt.ns0="http://some-ns/"
          org.4suite.4xslt.ns0:foo="baz"

        The user can easily correct this by reversing the
        order of the calls
        """
        if not self._elementName:
            if self._inFirstElement:
                raise XsltException(Error.ATTRIBUTE_ADDED_TOO_LATE)
            raise XsltException(Error.ATTRIBUTE_ADDED_TO_NON_ELEMENT)

        (prefix, local) = SplitQName(name)
        if namespace != EMPTY_NAMESPACE:
            # A namespaced attribute is always given a prefix
            # (forcePrefix); adopt the recomputed name if the prefix had
            # to be changed.
            new_name = self._updateNamespace(prefix, namespace, local,
                                             forcePrefix=1)
            if new_name:
                name = new_name
        else:
            # No namespace: drop whatever prefix was supplied
            name = local

        # A later attribute with the same final name replaces this one
        self._attributes[name] = value

    def _updateNamespace(self, prefix, namespace, local=u'', forcePrefix=0):
        """
        Updates namespace mappings at the current scope
        given requested prefix, a namespace, and an optional local name

        May decide not to use the given prefix for a variety
        of reasons, and if given a local name, it will compute and return
        a new node name which can be assigned to any node whose name is
        affected by such changes.  If forcePrefix==1 and prefix==EMPTY_PREFIX,
        it will always change the prefix.

        Returns the new node name if the prefix was changed, otherwise None.

        The general approach is as follows:

        * If the new namespace/prefix combo is unique in the scope, add
          it as is.

        * If the prefix is new, but the namespace already present, avoid
          psychosis by reusing the existing namespace (even if it means
          putting a formerly prefixed node into defaulted namespace form).
          Note that this can
          cause effective non-conformance in some cases because the XSLT
          spec says that all namespace nodes must be copied to the result tree
          (even if this causes psychosis).  There is no mandate that all
          ns nodes must be manifested as matching NS Decls in the serialization,
          but if the output is to result tree fragment, the required ns nodes
          will simply disappear.

        * If the prefix exists, but with a different namespace, generate
          a new (and probably rather ugly) prefix.
        """
        scope = self._namespaces[-1]
        prefix_changed = False

        if forcePrefix and prefix == EMPTY_PREFIX:
            prefix = self.changePrefix(namespace, forbidEmpty=1)
            prefix_changed = True
        elif prefix not in scope:
            if prefix != EMPTY_PREFIX and namespace in scope.values():
                # Namespace already declared under another prefix
                prefix = self.changePrefix(namespace)
                prefix_changed = True
            else:
                scope[prefix] = namespace
        elif scope[prefix] != namespace:
            # An existing prefix/namespace pair that doesn't match what
            # we're trying to use.  Generate a new prefix.
            prefix = self.changePrefix(namespace)
            prefix_changed = True

        if not prefix_changed:
            return None

        # We changed the prefix, create a new nodeName
        if prefix:
            return prefix + ':' + local
        return local

    def changePrefix(self, namespace, forbidEmpty=False):
        """
        Choose a prefix for namespace in the current scope, record the
        binding there and return the prefix.

        An existing binding for the namespace is reused when there is
        one; otherwise a prefix built from GENERATED_PREFIX is used.  If
        forbidEmpty is true, an existing binding to EMPTY_PREFIX is not
        reused.
        """
        scope = self._namespaces[-1]

        # First use a generated prefix, which might be by reuse of a
        # previously generated prefix
        suffix = 0
        while True:
            prefix = self.GENERATED_PREFIX % suffix
            if prefix not in scope or scope.get(prefix) == namespace:
                # Found a new or existing and usable namespace declaration
                break
            suffix += 1

        # Now see if there is an existing, non-generated prefix we can
        # use instead
        # FIXME: a reverse ns hash would make this more efficient
        bound_prefixes = [p for (p, n) in scope.items() if n == namespace]
        if bound_prefixes:
            orig_prefix = bound_prefixes[0]
            if not (forbidEmpty and orig_prefix == EMPTY_PREFIX):
                # Remove the generated prefix, if it was there before
                if prefix in scope:
                    del scope[prefix]
                prefix = orig_prefix

        scope[prefix] = namespace
        return prefix

    def matchesGeneratedPrefix(self, prefix):
        """
        Return True if prefix begins with the fixed part of
        GENERATED_PREFIX (the template without its trailing "%s"),
        False otherwise, including for an empty or None prefix.
        """
        if not prefix:
            return False
        # Strip the two-character "%s" placeholder before comparing;
        # comparing against the full template could never match.
        stem = self.GENERATED_PREFIX[:-2]
        return prefix.startswith(stem)

    def namespace(self, prefix, namespace):
        """
        Request a namespace binding in the current scope.  The prefix
        actually used may differ; see _updateNamespace().
        """
        self._updateNamespace(prefix, namespace)

    def processingInstruction(self, target, data):
        """
        Write a processing instruction; the data is passed through
        XmlStrStrip() first.
        """
        self._completeLastElement()
        # I don't think this is correct per Canonical XML 1.0, but we
        # have a testcase explicitly for WS in data.
        # (http://www.w3.org/TR/xml-c14n#Example-OutsideDoc)
        self._printer.processingInstruction(target, XmlStrStrip(data))

    def comment(self, body):
        """
        Write a comment with the given body.
        """
        self._completeLastElement()
        self._printer.comment(body)

    def startElement(self, tagName, namespace=EMPTY_NAMESPACE, extraNss=None):
        """
        Begin an element.  The start tag stays pending until the next
        event so that attributes and namespaces can be added to it.

        tagName - the qualified name of the element
        namespace - the element's namespace
        extraNss - optional prefix -> namespace mapping of additional
                   bindings; bindings already in scope take precedence
        """
        self._completeLastElement()

        if not self._inFirstElement:
            # The doctype is handed to the printer just before the first
            # element, named after that element
            self._printer.doctype(tagName, self._outputParams.doctypePublic,
                                  self._outputParams.doctypeSystem)
            self._inFirstElement = True

        self._elementName = tagName
        self._elementUri = namespace
        (prefix, local) = SplitQName(tagName)

        # Update in-scope namespaces
        if extraNss:
            namespaces = extraNss.copy()
            namespaces.update(self._namespaces[-1])
        else:
            namespaces = self._namespaces[-1].copy()
        # The element's own prefix binding overrides everything else
        namespaces[prefix] = namespace
        self._namespaces.append(namespaces)

    def endElement(self, tagName, namespace=EMPTY_NAMESPACE):
        """
        Close the element and discard its namespace scope.
        """
        self._completeLastElement()
        self._printer.endElement(namespace, tagName)
        del self._namespaces[-1]
class CdataSectionXmlWriter(XmlWriter):
    """
    XmlWriter variant that emits character data as CDATA sections when
    it appears directly inside an element listed in the output
    parameters' cdataSectionElements.

    Text for such elements is queued and written as one CDATA section
    when the next non-text event arrives.
    """

    def __init__(self, outputParams, stream):
        """
        outputParams - instance of Ft.Xml.Xslt.OutputParameters.OutputParameters
        stream - a stream that takes a byte stream (not a unicode object)
        """
        XmlWriter.__init__(self, outputParams, stream)
        self._cdataSectionElements = self._outputParams.cdataSectionElements
        # One flag per open element (plus a false entry for the document
        # level): is the innermost element a CDATA-section element?
        self._useCdataSection = [0]
        # Text chunks waiting to be written as a single CDATA section
        self._buffer = []

    def _completeLastElement(self):
        """
        Flush the pending start tag, then write any queued text as a
        CDATA section.
        """
        XmlWriter._completeLastElement(self)
        if not self._useCdataSection[-1] or not self._buffer:
            return
        # Write out queued text
        queued = ''.join(self._buffer)
        self._printer.cdataSection(queued)
        self._buffer = []

    def startElement(self, tagName, namespace=EMPTY_NAMESPACE, extraNss=None):
        """
        Begin an element and record whether its text content is to be
        written as CDATA sections.
        """
        XmlWriter.startElement(self, tagName, namespace, extraNss)
        (_prefix, local_name) = SplitQName(tagName)
        # Elements are identified by (namespace, local name) pairs
        wants_cdata = (namespace, local_name) in self._cdataSectionElements
        self._useCdataSection.append(wants_cdata)

    def endElement(self, tagName, namespace=EMPTY_NAMESPACE):
        """
        Close the element and drop its CDATA flag.
        """
        XmlWriter.endElement(self, tagName, namespace)
        self._useCdataSection.pop()

    def text(self, text, escapeOutput=True):
        """
        Queue the text when inside a CDATA-section element; otherwise
        write it through the normal XmlWriter path.
        """
        if not self._useCdataSection[-1]:
            XmlWriter.text(self, text, escapeOutput)
            return
        # CDATA Sections don't escape, so no need to save flag
        self._buffer.append(text)
|