# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: xml_access_parser.py 6844 2009-03-18 01:05:10Z jbrendel $
from xml.sax import saxutils,handler
from xml.sax import make_parser
from xml.sax.handler import feature_namespaces,ErrorHandler
from xml.sax._exceptions import *
from StringIO import StringIO
from sets import Set
from snaplogic.common.snap_exceptions import SnapFormatError
class _DoubleKeyError(Exception):
    """
    A specialized exception we need only here in this module.

    It is used to indicate a double key in the dictionary. The string
    representation of the exception is the offending key name.

    """

    def __init__(self, keyname):
        """
        Remember the name of the key that was seen twice.

        @param keyname: Name of the duplicated key.
        @type  keyname: string

        """
        # Initialise the base class as well, so that 'args', repr() and
        # pickling of the exception all carry the key name.
        Exception.__init__(self, keyname)
        self.__keyname = keyname

    def __str__(self):
        """
        Return the name of the duplicated key.

        @return: Name of the duplicated key.
        @rtype:  string

        """
        return self.__keyname
class _BuildAccessConfig(handler.ContentHandler):
    """
    A specialized SAX handler for our XML access config files.

    This prepares data structures, which are independent of the
    XML, and which are usable by our auth module to prepare fast
    lookup tables for permissions when individual requests arrive
    later on.

    Therefore, this class 'knows' our syntax, as documented in
    the XmlAccessParser class (see below).

    A key emphasis of this class is to provide good sanity checking
    and error reporting on the input file. The SAX parser is moderately
    helpful with this. On one hand it keeps us updated on the current
    location in the file (when it encounters an element opening or
    closing tag). On the other hand, it doesn't help you at all if you
    process a larger text chunk as element content. In that case, we
    provide a detailed error message, but can only give the location
    of the element-closing tag.

    After parsing, the results are available in these attributes:

        _users_list:  Dictionary, user name -> list of group names.
        _groups_list: Dictionary, group name -> description string, or a
                      (description, list-of-users) tuple once the group
                      was referenced in the <UserGroups> block.
        _roles_list:  Dictionary, role name -> list of permissions.
        _rules_list:  Dictionary, location name -> (list of rule
                      dictionaries, description or None).

    """

    # These are the permissions that we know about
    __known_permissions = frozenset(['read', 'write', 'execute'])

    # These elements may only appear once...
    __single_elements = ['AccessConfig', "Users", "Groups", "UserGroups", "Roles", "ACLs"]

    # ... and these can appear more than once
    __multi_elements = ["Location"]

    __elements = __single_elements + __multi_elements

    # Groups that may be used in rules without being listed in <Groups>
    __implied_groups = ['public', 'known']

    def __init__(self, user_list):
        """
        Initialize the handler.

        @param user_list: Names of all known users. Each of them starts
                          out with an empty list of groups.
        @type  user_list: list

        """
        handler.ContentHandler.__init__(self)

        # Some stuff related to XML parsing. The content of an element
        # is collected as a list of text chunks, joined at element end.
        self.__locator = None
        self.__in_content = False
        self.__content = None

        # The actual information we want to extract. The groups per
        # user are filled in once the <UserGroups> block is parsed.
        self._users_list = dict((u, []) for u in user_list)
        self._groups_list = {}
        self._roles_list = {}
        self._rules_list = {}

        # A list in which we keep track of the elements we
        # have seen already and which may only appear once.
        self.__seen_elements = []

        # We need to double check that the root element of
        # the XML document is the correct one. That test
        # only needs to take place once, for the very first
        # element. Here is a flag to help with that.
        self.__first_elem = True

        # Stores the attributes we see at the start of the
        # element, so that they are available to us at the
        # end, when we do the actual processing. Indexed by
        # the name of the element that we encounter.
        self.__attr_cache = {}

        # Locations can only be inside of the ACLs element.
        # We use this as flag to keep track of where we are.
        self.__in_acls_element = False

        # We have parsing functions for the different types of elements
        self.__content_elements = {
            "Users": self.__parse_users_content,
            "Groups": self.__parse_groups_content,
            "UserGroups": self.__parse_usergroups_content,
            "Roles": self.__parse_roles_content,
            "Location": self.__parse_location_content,
        }

    def __error(self, message):
        """
        Create a parse exception for the current document location.

        @param message: Error message.
        @type  message: string

        @return: Exception object, ready to be raised.
        @rtype:  L{SAXParseException}

        """
        return SAXParseException(message, None, self.__locator)

    def __get_normalized_lines(self, buffer):
        """
        Take a text buffer of multiple lines and normalize it.

        This means that superfluous whitespace and comment lines are
        removed, but that non-empty lines are retained as individual
        lines.

        @param buffer: Input text buffer.
        @type  buffer: string

        @return: List of normalized lines.
        @rtype:  list

        """
        normalized = []
        for raw_line in buffer.split("\n"):
            words = raw_line.split()
            # Skip empty lines and lines whose first character is '#'
            if words and not words[0].startswith('#'):
                normalized.append(' '.join(words))
        return normalized

    def startElement(self, name, attr):
        """
        Process the beginning of a new XML element.

        For different elements we need to do different things.
        Some may only occur once in the config file, others
        multiple times, but only inside of certain other elements.
        This is enforced here.

        @param name: Name of element.
        @type  name: string

        @param attr: Any attributes that the element may have.
        @type  attr: L{xml.sax.xmlreader.AttributesImpl}

        @raise SAXParseException: For unknown elements, a wrong root
                                  element, repeated single-use elements
                                  or nested content elements.

        """
        if name not in self.__elements:
            raise self.__error("Unknown element <%s>" % name)

        if self.__first_elem:
            if name != "AccessConfig":
                raise self.__error("Must have <AccessConfig> as root element.")
            self.__first_elem = False
        elif name == "ACLs":
            self.__in_acls_element = True
        elif name in self.__content_elements:
            # We keep only a single content buffer, so a content element
            # inside of another one cannot be processed. Report that
            # properly, rather than failing later on the lost buffer.
            if self.__in_content:
                raise self.__error("<%s> element cannot be nested inside another content block." % name)
            self.__in_content = True
            self.__content = []

        if name in self.__single_elements:
            if name in self.__seen_elements:
                raise self.__error("Only one <%s> element allowed in config file." % name)
            self.__seen_elements.append(name)

        # The attributes that are passed in here may be important when
        # we finally get around to process the contents of the element.
        # We do this processing once we have reached the end of the
        # element, so we need to store the attributes somewhere. Since
        # elements of the same name cannot be nested, we can use
        # this simple cache here, to get back to the attributes for the
        # element once we reach its end.
        self.__attr_cache[name] = attr

    def characters(self, ch):
        """
        Assemble the content buffer.

        Textual content of an XML element causes the SAX parser
        to call this function here, one piece of text at a time.
        We don't know if this is going to be by character, word,
        line or whatever.

        @param ch: Some more text.
        @type  ch: string

        """
        if self.__in_content:
            self.__content.append(ch)

    def endElement(self, name):
        """
        Process the end of an XML element.

        Some special processing for some elements. However, most of
        them just have their registered handler function called.
        (in the __content_elements dictionary). The content buffer
        is cleaned and passed into the handler function.

        @param name: Name of element.
        @type  name: string

        """
        if name == "ACLs":
            self.__in_acls_element = False
        elif name in self.__content_elements:
            lines = self.__get_normalized_lines("".join(self.__content))
            self.__content = None
            self.__in_content = False
            self.__content_elements[name](lines)

        # Don't need the attributes in the cache anymore
        self.__attr_cache.pop(name, None)

    def setDocumentLocator(self, locator):
        """
        Update current location in the document.

        By providing this function, we give the SAX parser the chance
        to keep us up to date on where in the document it currently
        is.

        @param locator: Location information.
        @type  locator: L{xml.sax.expatreader.ExpatLocator}

        """
        self.__locator = locator

    def __parse_content(self, lines):
        """
        Parse a content block and return dictionary with content.

        We have several elements in the config file, which are made up
        of free-text information blocks (pieces of data, not organized
        via XML). For example, the users and groups are defined that way,
        just consisting of a name followed by a description, all on one
        line:

            foo    This is the group with all foo-users.
            bar    And this is another group.

        This function here takes such a content block and parses it
        into a dictionary, keyed by the first word and containing the
        remainder of each line as values:

            { "foo" : "This is the group with all foo-users.",
              "bar" : "And this is another group."
            }

        A line that consists of only one word gets the empty string
        as value.

        @param lines: List of (normalized) lines in the content block.
        @type  lines: list

        @return: Dictionary of content lines.
        @rtype:  dict

        @raise _DoubleKeyError: If one of the keys (first words of each
                                line) appears more than once.

        """
        d = {}
        for line in lines:
            name, _, description = line.partition(" ")
            if name in d:
                raise _DoubleKeyError(name)
            d[name] = description
        return d

    def __parse_users_content(self, lines):
        """
        Parse content block of <Users> element.

        Users are now defined via the user/password file. We only keep
        this 'parse' function here so that we can merely ignore the
        content of the <Users> element.

        @param lines: List of lines in the content block.
        @type  lines: list

        """
        return

    def __parse_groups_content(self, lines):
        """
        Parse content block of <Groups> element.

        Each line holds a group name, optionally followed by a
        description. The result is stored in self._groups_list.

        @param lines: List of lines in the content block.
        @type  lines: list

        @raise SAXParseException: If a group is specified more than once.

        """
        try:
            self._groups_list = self.__parse_content(lines)
        except _DoubleKeyError as e:
            raise self.__error("Group '%s' is specified more than once in <Groups> block." % str(e))

    def __parse_usergroups_content(self, lines):
        """
        Parse content block of <UserGroups> element.

        Each line holds a user name, followed by the names of the
        groups to which this user belongs.

        We take the user/groups information and update our user list
        with groups, and our group list with the users. In the process,
        the values in the groups dictionary are changed from strings
        (containing only the description) to tuples that contain the
        description and a list of users.

        @param lines: List of lines in the content block.
        @type  lines: list

        @raise SAXParseException: If a user appears more than once, is
                                  not a known user, has no groups, or
                                  uses a group that was not specified
                                  in the <Groups> block.

        """
        try:
            d = self.__parse_content(lines)
        except _DoubleKeyError as e:
            raise self.__error("User '%s' is specified more than once in <UserGroups> block." % str(e))

        for user in d:
            if user not in self._users_list:
                raise self.__error("User '%s' is specified in <UserGroups> block, but not in user/password file." % user)

            group_list = self._users_list[user]
            groups = d[user].split()
            if not groups:
                raise self.__error("No groups specified for user '%s' in <UserGroups> block." % user)

            for group in groups:
                if group not in self._groups_list:
                    # Looks like we don't even have that group...
                    raise self.__error("Group '%s' is used in <UserGroups> block for user '%s', but is not specified in <Groups> block." %
                                       (group, user))
                group_list.append(group)

                # The group either still holds just its description
                # string, or already the (description, users) tuple.
                # This needs an explicit type check: merely trying to
                # unpack the value would also 'succeed' for a
                # description string of exactly two characters.
                entry = self._groups_list[group]
                if isinstance(entry, tuple):
                    (gdesc, ulist) = entry
                else:
                    gdesc = entry
                    ulist = []

                # Add the current user to the list and write the
                # updated tuple back into the groups list.
                ulist.append(user)
                self._groups_list[group] = (gdesc, ulist)

    def __parse_roles_content(self, lines):
        """
        Parse content block of <Roles> element.

        Each line holds a role name, followed by the permissions
        of that role. The result is stored in self._roles_list, with
        the (sorted) list of permissions as value for each role.

        Note that roles are being phased out. They are read and supported
        for reasons of backwards compatibility. However, when the server
        writes the ACL config file, it will not write roles out anymore.
        Instead, roles are translated on the fly to their permission set.

        @param lines: List of lines in the content block.
        @type  lines: list

        @raise SAXParseException: If a role appears more than once, has
                                  no permissions, or uses permissions
                                  that we do not know about.

        """
        try:
            raw_roles = self.__parse_content(lines)
        except _DoubleKeyError as e:
            raise self.__error("Role '%s' is specified more than once in <Roles> block." % str(e))
        self._roles_list = raw_roles

        # Need to convert the permissions string into a list,
        # and check for sanity of the specified permissions.
        for (role, permissions) in list(raw_roles.items()):
            pset = set(permissions.split())
            if not pset:
                raise self.__error("Empty permissions set for role %s in <Roles> block." % role)

            # There shouldn't be any permissions that we don't know about already.
            unknown = sorted(pset.difference(self.__known_permissions))
            if unknown:
                raise self.__error("Unknown permission(s) %s is specified in <Roles> block." % unknown)

            # Sorted, so that the outcome does not depend on set ordering
            self._roles_list[role] = sorted(pset)

    def __parse_rule(self, line):
        """
        Parse and sanity check a single rule line of a <Location> block.

        A rule has the form:

            allow|deny  user|group  <principal>...  [role <role>... | permission <permission>...]

        Roles are translated on the fly to their permissions set.

        @param line: A single (normalized) rule line.
        @type  line: string

        @return: Dictionary with the keys 'verb', 'principal_type' and
                 'principal', plus 'permissions' for 'allow' rules.
        @rtype:  dict

        @raise SAXParseException: If the rule is malformed.

        """
        words = line.split()
        if len(words) < 3:
            raise self.__error("Malformed rule '%s' in <Location> block." % line)

        # Checking the sanity of the verb and principal first...
        verb = words[0].lower()
        if verb not in ('allow', 'deny'):
            raise self.__error("Rule '%s...' does not start with 'allow' or 'deny' in <Location> block." % line[:6])

        principal = words[1].lower()
        if principal not in ('user', 'group'):
            raise self.__error("Expected 'user' or 'group' in rule '%s...' in <Location> block. Instead got: %s" %
                               (line[:6], principal))

        rule = {
            'verb': verb,
            'principal_type': principal,
            'principal': [],
        }

        # The remaining words are first the principals, optionally
        # followed by a single 'role' or 'permission' keyword, which
        # switches over to the list of roles or permissions.
        section = 'principals'
        for (index, word) in enumerate(words[2:]):
            # NOTE(review): names are lower-cased before they are looked
            # up, while the user, group and role tables keep the case
            # found in their definitions. Mixed-case names can therefore
            # never match here - confirm whether that is intended.
            word = word.lower()

            if word in ('role', 'permission'):
                if index == 0:
                    raise self.__error("Malformed rule '%s' in <Location> block. No principal was specified." % line)
                if verb == 'deny':
                    raise self.__error("Malformed rule '%s' in <Location> block. 'deny' does not accept 'role' or 'permission'." % line)
                if section != 'principals':
                    raise self.__error("Malformed rule '%s' in <Location> block. 'role' and 'permission' can only appear once." % line)
                rule['permissions'] = []
                if word == 'role':
                    section = 'roles'
                else:
                    section = 'permissions'

            elif section == 'principals':
                if principal == 'user':
                    if word not in self._users_list:
                        raise self.__error("Rule '%s' in <Location> block contains unknown user '%s'." % (line, word))
                elif word not in self._groups_list and word not in self.__implied_groups:
                    raise self.__error("Rule '%s' in <Location> block contains unknown group '%s'." % (line, word))
                rule['principal'].append(word)

            elif section == 'roles':
                if word not in self._roles_list:
                    raise self.__error("Rule '%s' in <Location> block contains unknown role '%s'." % (line, word))
                # Roles are translated on the fly to their permissions set
                rule['permissions'].extend(self._roles_list[word])

            else:
                if word not in self.__known_permissions:
                    raise self.__error("Rule '%s' in <Location> block contains unknown permission '%s'." % (line, word))
                rule['permissions'].append(word)

        # An 'allow' rule has to grant something. This also catches a
        # 'role' or 'permission' keyword that is not followed by anything.
        if verb == 'allow' and not rule.get('permissions'):
            raise self.__error("'allow' rule '%s' in <Location> block does not contain permission or roles." % line)

        return rule

    def __parse_location_content(self, lines):
        """
        Parse content block of <Location> element.

        Each line of the block is a rule. The rules are stored in
        self._rules_list under the 'name' attribute of the element,
        as a tuple of the list of rules and the value of the optional
        'description' attribute (None if it is missing).

        @param lines: List of lines in the content block.
        @type  lines: list

        @raise SAXParseException: If the element is not inside of the
                                  <ACLs> block, has no 'name' attribute
                                  or contains a malformed rule.

        """
        if not self.__in_acls_element:
            raise self.__error("<Location> element can only appear within <ACLs> block.")

        rule_list = [self.__parse_rule(line) for line in lines]

        # The path component to which these rules apply was
        # specified as a 'name' attribute for the Location element.
        try:
            attrs = self.__attr_cache["Location"]
            path = attrs['name']
        except KeyError:
            raise self.__error("<Location> block requires attribute 'name'.")

        description = attrs.get('description', None)

        self._rules_list[path] = (rule_list, description)
class XmlAccessParser(object):
    """
    Parses an XML config file for our special access information.

    Any provider of access information needs to implement the following
    interface:

        get_users_and_groups():

            This returns a dictionary, with user as key and a list
            of groups as values.

        get_rules():

            This returns a dictionary with URI path as key and the
            rules for that path as value. Each rule is a dictionary.
            See the docstring for the get_rules() function for more
            information.

    Please note: An AccessParser class must take care of all syntax and
    semantic checking.

    """

    def __init__(self, fname, user_list):
        """
        Initialize the XmlAccessParser class.

        Since we are using SAX here, we create a SAX parser and pass it
        our own handler class. The config file is read and parsed right
        here, in the constructor.

        @param fname:       File name of snap-access config file.
        @type  fname:       string

        @param user_list:   List of user names.
        @type  user_list:   list

        @raise SnapFormatError: If the config file could not be parsed.

        """
        # Create a parser
        parser = make_parser()

        # Tell the parser we are not interested in XML namespaces
        parser.setFeature(feature_namespaces, 0)

        # Create the handler
        self.__dh = _BuildAccessConfig(user_list)

        # Tell the parser to use our handler
        parser.setContentHandler(self.__dh)

        # We strip comment lines out, since we want to allow them
        # in places where XML would normally not like to see them.
        # Each comment line is replaced by an empty line, which keeps
        # the number of lines in the buffer the same as in the file.
        buf = StringIO()
        with open(fname, "r") as f:
            for l in f:
                if l.strip().startswith('#'):
                    l = "\n"
                buf.write(l)
        buf.seek(0)

        # The version of the config file with the comments stripped out
        # is now in a StringIO buffer, and we use that one for the actual
        # parsing.
        try:
            parser.parse(buf)
        except SAXParseException as e:
            raise SnapFormatError(str(e))

        # An empty set of users, groups or rules is not treated as
        # an error here.

    def get_users_and_groups(self):
        """
        Return dictionary with user as key and list of groups as values.

        @return:    User/groups dictionary.
        @rtype:     dict

        """
        return self.__dh._users_list

    def get_rules(self):
        """
        Return a dictionary with the rules for URIs.

        The URI path (the 'name' of a location in the config file) is
        the index. Each value is a tuple, consisting of the list of
        rules for that path and the description of the location (None
        if no description was given).

        Each rule is a dictionary, and each element in the dictionary
        describes a rule element. For example:

            { "verb"           : "allow",
              "principal_type" : "user",
              "principal"      : [ 'mike', 'chris' ],
              "permissions"    : [ "read", "execute" ]
            }

        Roles that were used in a rule have already been translated to
        their permissions, so there is no separate element for them.

        A 'deny' rule does not have any permissions attached, meaning
        that the 'permissions' element is missing in that case.

        @return:    Lookup table between URI and its rules.
        @rtype:     dict

        """
        return self.__dh._rules_list
|