xml_access_parser.py :  » Development » SnapLogic » snaplogic » server » auth » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » server » auth » xml_access_parser.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008-2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: xml_access_parser.py 6844 2009-03-18 01:05:10Z jbrendel $


from xml.sax import saxutils,handler
from xml.sax import make_parser
from xml.sax.handler import feature_namespaces,ErrorHandler
from xml.sax._exceptions import *
from StringIO import StringIO
from sets import Set
from snaplogic.common.snap_exceptions import SnapFormatError


class _DoubleKeyError(Exception):
    """
    A specialized exception we need only here in this module.

    It is used to indicate that a key appeared more than once while
    a content block was parsed into a dictionary. The string form of
    the exception is the offending key itself, so that callers can
    embed it directly in their own error messages.

    """
    def __init__(self, keyname):
        """
        Remember the name of the key that was seen twice.

        @param keyname:     The key that appeared more than once.
        @type  keyname:     string

        """
        # Populate the standard 'args' as well, so that repr(), logging
        # and pickling of the exception do not lose the key name.
        Exception.__init__(self, keyname)
        self.__keyname = keyname

    def __str__(self):
        """
        Return the offending key name.

        @return:    The key that appeared more than once.
        @rtype:     string

        """
        return self.__keyname


class _BuildAccessConfig(handler.ContentHandler):
    """
    A specialized SAX handler for our XML access config files.

    This prepares data structures, which are independent of the
    XML, and which are usable by our auth module to prepare fast
    lookup tables for permissions when individual requests arrive
    later on.

    Therefore, this class 'knows' our syntax, as documented in
    the XmlAccessParser class (see below).

    A key emphasis of this class is to provide good sanity checking
    and error reporting on the input file. The SAX parser is moderately
    helpful with this. On one hand it keeps us updated on the current
    location in the file (when it encounters an element opening or
    closing tag). On the other hand, it doesn't help you at all if you
    process a larger text chunk as element content. In that case, we
    provide a detailed error message, but can only give the location
    of the element-closing tag.

    The results of the parse are available in these attributes:

        _users_list:    Dictionary, user name -> list of group names.
        _groups_list:   Dictionary, group name -> description string or,
                        once a user was assigned to the group, a tuple
                        of (description, list of user names).
        _roles_list:    Dictionary, role name -> list of permissions.
        _rules_list:    Dictionary, location name -> tuple of
                        (list of rule dictionaries, description).

    """
    # These are the permissions that we know about
    __known_permissions  = Set([ 'read', 'write', 'execute' ])

    # These elements may only appear once...
    __single_elements    = [ 'AccessConfig', "Users", "Groups", "UserGroups", "Roles", "ACLs" ]
    # ... and these can appear more than once
    __multi_elements     = [ "Location" ]

    __elements = __single_elements + __multi_elements

    # Groups that may be used in rules without being declared in <Groups>.
    __implied_groups     = [ 'public', 'known' ]


    def __init__(self, user_list):
        """
        Initialize the handler.

        @param user_list:   Names of all known users. Each of them starts
                            out with an empty list of groups.
        @type  user_list:   list

        """
        handler.ContentHandler.__init__(self)

        # Some stuff related to XML parsing. The content of the current
        # element is collected as a list of text chunks, which is joined
        # once the end of the element is reached.
        self.__locator          = None
        self.__in_content       = False
        self.__content          = None

        # The actual information we want to extract
        self._users_list        = {}
        for u in user_list:
            self._users_list[u] = []    # These are the groups per user. We haven't parsed that portion yet,
                                        # so it's just pre-defined as an empty list for now.
        self._groups_list       = {}
        self._roles_list        = {}
        self._rules_list        = {}

        # A list in which we keep track of the elements we
        # have seen already and which may only appear once.
        self.__seen_elements    = []

        # We need to double check that the root element of
        # the XML document is the correct one. That test
        # only needs to take place once, for the very first
        # element. Here is a flag to help with that.
        self.__first_elem       = True

        # Stores the attributes we see at the start of the
        # element, so that they are available to us at the
        # end, when we do the actual processing. Indexed by
        # the name of the element that we encounter.
        self.__attr_cache       = {}

        # Locations can only be inside of the ACLs element.
        # We use this as flag to keep track of where we are.
        self.__in_acls_element  = False

        # We have parsing functions for the different types of elements
        self.__content_elements = { "Users"      : self.__parse_users_content,
                                    "Groups"     : self.__parse_groups_content,
                                    "UserGroups" : self.__parse_usergroups_content,
                                    "Roles"      : self.__parse_roles_content,
                                    "Location"   : self.__parse_location_content,
                                  }


    def __get_normalized_lines(self, text):
        """
        Take a text buffer of multiple lines and normalize it.

        This means that superfluous whitespace and comment lines (those
        starting with '#') are removed, but that non-empty lines are
        retained as individual lines.

        @param text:    Input text buffer.
        @type  text:    string

        @return:        List of normalized lines.
        @rtype:         list

        """
        normalized = []
        for raw in text.split("\n"):
            # Collapses every run of whitespace into a single blank and
            # removes leading and trailing whitespace at the same time.
            line = ' '.join(raw.split())
            if line and not line.startswith('#'):
                normalized.append(line)
        return normalized


    def startElement(self, name, attr):
        """
        Process the beginning of a new XML element.

        For different elements we need to do different things.
        Some may only occur once in the config file, others
        multiple times, but only inside of certain other elements.
        This is enforced here.

        @param name:    Name of element.
        @type  name:    string

        @param attr:    Any attributes that the element may have.
        @type  attr:    L{xml.sax.xmlreader.AttributesImpl}

        @raise SAXParseException:   If the element is unknown, is not
                                    allowed at this place or appears
                                    more often than permitted.

        """
        if name not in self.__elements:
            raise SAXParseException("Unknown element <%s>" % name, None, self.__locator)

        # The elements with free-text content cannot contain other
        # elements. Without this check, the inner element would clobber
        # the content buffer of the outer one, which then fails with an
        # unhelpful AttributeError once the outer element is closed.
        if self.__in_content:
            raise SAXParseException("Element <%s> cannot be nested inside the content of another element." % name,
                                    None, self.__locator)

        if self.__first_elem:
            if name != "AccessConfig":
                raise SAXParseException("Must have <AccessConfig> as root element.", None, self.__locator)
            self.__first_elem = False

        elif name == "ACLs":
            # NOTE(review): <ACLs> and <AccessConfig> are listed as single
            # elements, but a repeated occurrence is not rejected here. This
            # is left as is, since enforcing it now could break config files
            # that are accepted today.
            self.__in_acls_element = True

        elif name in self.__content_elements:
            if name in self.__single_elements:
                if name in self.__seen_elements:
                    raise SAXParseException("Only one <%s> element allowed in config file." % name, None, self.__locator)
                self.__seen_elements.append(name)
            self.__in_content = True
            self.__content    = []

        # The attributes that are passed in here may be important when
        # we finally get around to process the contents of the element.
        # We do this processing once we have reached the end of the
        # element, so we need to store the attributes somewhere. Assuming
        # that elements of the same name cannot be nested, we can use
        # this simple cache here, to get back to the attributes for the
        # element once we reach its end.
        self.__attr_cache[name] = attr


    def characters(self, ch):
        """
        Assemble the content buffer.

        Textual content of an XML element causes the SAX parser
        to call this function here, one piece of text at a time.
        We don't know if this is going to be by character, word,
        line or whatever. Text outside of the elements with free-text
        content is ignored.

        @param ch:  Some more text.
        @type  ch:  string

        """
        if self.__in_content:
            self.__content.append(ch)


    def endElement(self, name):
        """
        Process the end of an XML element.

        Some special processing for some elements. However, most of
        them just have their registered handler function called.
        (in the __content_elements dictionary). The content buffer
        is cleaned and passed into the handler function.

        @param name:    Name of element.
        @type  name:    string

        """
        if name == "ACLs":
            self.__in_acls_element = False
        elif name in self.__content_elements:
            lines = self.__get_normalized_lines("".join(self.__content))
            self.__content    = None
            self.__in_content = False
            self.__content_elements[name](lines)

        # Don't need the attributes in the cache anymore
        self.__attr_cache.pop(name, None)


    def setDocumentLocator(self, locator):
        """
        Update current location in the document.

        By providing this function, we give the SAX parser the chance
        to keep us up to date on where in the document it currently
        is.

        @param locator:     Location information.
        @type  locator:     L{xml.sax.expatreader.ExpatLocator}

        """
        self.__locator = locator


    def __parse_content(self, lines, duplicate_msg=None):
        """
        Parse a content block and return dictionary with content.

        We have several elements in the config file, which are made up
        of free-text information blocks (pieces of data, not organized
        via XML). For example, the users and groups are defined that way,
        just consisting of a name followed by a description, all on one
        line:

                foo This is the group with all foo-users.
                bar And this is another group.

        This function here takes such a content block and parses it
        into a dictionary, keyed by the first word and containing the
        remainder of each line as values:

            { "foo" : "This is the group with all foo-users.",
              "bar" : "And this is another group."
            }

        This will throw an exception if one of the keys (first words of
        each line) appears more than once.

        @param lines:           List of (normalized) lines in the content block.
        @type  lines:           list

        @param duplicate_msg:   Error message with a single '%s' placeholder
                                for the key name. If given, a duplicate key
                                is reported as SAXParseException with this
                                message at the current document location.
        @type  duplicate_msg:   string

        @return:                Dictionary of content lines.
        @rtype:                 dict

        @raise SAXParseException:   For a duplicate key, if duplicate_msg
                                    was specified.
        @raise _DoubleKeyError:     For a duplicate key, if no duplicate_msg
                                    was specified.

        """
        d = {}
        for l in lines:
            elems = l.split(" ", 1)
            name  = elems[0]
            if name in d:
                if duplicate_msg is None:
                    raise _DoubleKeyError(name)
                raise SAXParseException(duplicate_msg % name, None, self.__locator)
            if len(elems) > 1:
                d[name] = elems[1]
            else:
                d[name] = ""

        return d


    def __parse_users_content(self, lines):
        """
        Parse content block of <Users> element.

        @param lines:   List of lines in the content block.
        @type  lines:   list

        """
        #
        # Users are now defined via the user/password file. We only keep
        # this 'parse' function here so that we can merely ignore the
        # content of the <Users> element.
        #
        return


    def __parse_groups_content(self, lines):
        """
        Parse content block of <Groups> element.

        Each line consists of the group name, followed by an optional
        description. The result replaces the current groups dictionary.

        @param lines:   List of lines in the content block.
        @type  lines:   list

        @raise SAXParseException:   If a group is specified more than once.

        """
        self._groups_list = self.__parse_content(
                                lines, "Group '%s' is specified more than once in <Groups> block.")


    def __parse_usergroups_content(self, lines):
        """
        Parse content block of <UserGroups> element.

        Each line consists of a user name, followed by the names of the
        groups to which the user belongs.

        @param lines:   List of lines in the content block.
        @type  lines:   list

        @raise SAXParseException:   If a user appears more than once, is
                                    not a known user, has no groups, or
                                    refers to an unknown group.

        """
        d = self.__parse_content(lines, "User '%s' is specified more than once in <UserGroups> block.")

        # Take the user/groups information and update our user list with groups,
        # and our group list with the users. In the process, the values in the
        # groups dictionary are changed from strings (containing only the
        # description) to tuples that contain the description and a list of
        # users.
        for user in d:
            if user not in self._users_list:
                raise SAXParseException("User '%s' is specified in <UserGroups> block, but not in user/password file." % user, None, self.__locator)
            groups = d[user].split()
            if not groups:
                raise SAXParseException("No groups specified for user '%s' in <UserGroups> block." % user, None, self.__locator)
            group_list = self._users_list[user]
            for group in groups:
                if group not in self._groups_list:
                    # Looks like we don't even have that group...
                    raise SAXParseException("Group '%s' is used in <UserGroups> block for user '%s', but is not specified in <Groups> block." %
                                                        (group, user), None, self.__locator)
                group_list.append(group)

                # The entry is either still the plain description string, or
                # already the (description, users) tuple. This needs to be an
                # explicit type check: Merely attempting to unpack the value
                # 'succeeds' for any description that is exactly two
                # characters long.
                entry = self._groups_list[group]
                if isinstance(entry, tuple):
                    (gdesc, ulist) = entry
                else:
                    gdesc = entry
                    ulist = []

                # Add the current user to the list and write the
                # updated tuple back into the groups list.
                ulist.append(user)
                self._groups_list[group] = (gdesc, ulist)


    def __parse_roles_content(self, lines):
        """
        Parse content block of <Roles> element.

        Each line consists of the role name, followed by the permissions
        that make up this role. Afterwards, the roles dictionary maps each
        role name to the list of its permissions.

        @param lines:   List of lines in the content block.
        @type  lines:   list

        @raise SAXParseException:   If a role appears more than once, has
                                    no permissions or contains unknown
                                    permissions.

        """
        #
        # Note that roles are being phased out. They are read and supported
        # for reasons of backwards compatibility. However, when the server
        # writes the ACL config file, it will not write roles out anymore.
        # Instead, roles are translated on the fly to their permission set.
        self._roles_list = self.__parse_content(
                                lines, "Role '%s' is specified more than once in <Roles> block.")

        # Need to convert the permissions string into a list,
        # and check for sanity of the specified permissions.
        for role in self._roles_list:
            pset = Set(self._roles_list[role].split())
            if not pset:
                raise SAXParseException("Empty permissions set for role %s in <Roles> block." % role, None, self.__locator)
            # There shouldn't be any permissions that we don't know about already.
            unknown = pset.difference(self.__known_permissions)
            if unknown:
                # Sorted, so that the error message is deterministic.
                dl = list(unknown)
                dl.sort()
                raise SAXParseException("Unknown permission(s) %s is specified in <Roles> block." % dl, None, self.__locator)
            # Only the value of an existing key is replaced, which is
            # fine while iterating over the dictionary.
            self._roles_list[role] = list(pset)


    def __parse_rule(self, line):
        """
        Parse and sanity check a single rule of a <Location> block.

        A rule looks like this:

            allow|deny  user|group  <principals...>  [role <roles...> | permission <permissions...>]

        Everything is converted to lower case. Roles are translated to
        their permissions right away.

        @param line:    A single, normalized rule line.
        @type  line:    string

        @return:        Dictionary with the keys 'verb', 'principal_type'
                        and 'principal' (a list). If 'role' or 'permission'
                        was specified in the rule, the dictionary also
                        contains the key 'permissions' (a list).
        @rtype:         dict

        @raise SAXParseException:   If the rule is malformed or refers to
                                    unknown users, groups, roles or
                                    permissions.

        """
        words = line.split()
        if len(words) < 3:
            raise SAXParseException("Malformed rule '%s' in <Location> block." % line, None, self.__locator)

        # Checking the sanity of the verb and principal first...
        verb = words[0].lower()
        if verb not in [ 'allow', 'deny' ]:
            raise SAXParseException("Rule '%s...' does not start with 'allow' or 'deny' in <Location> block." % line[:6], None, self.__locator)
        principal = words[1].lower()
        if principal not in [ 'user', 'group' ]:
            raise SAXParseException("Expected 'user' or 'group' in rule '%s...' in <Location> block. Instead got: %s" %
                                        (line[:6], principal), None, self.__locator)

        rule = { 'verb'           : verb,
                 'principal_type' : principal,
                 'principal'      : [],
               }

        # Until we have seen 'role' or 'permission', all words are principals.
        in_roles       = False
        in_permissions = False
        for (index, word) in enumerate(words[2:]):
            word = word.lower()
            if word in [ 'role', 'permission' ]:
                if index == 0:
                    raise SAXParseException("Malformed rule '%s' in <Location> block. No principal was specified." % line, None, self.__locator)
                if verb == 'deny':
                    raise SAXParseException("Malformed rule '%s' in <Location> block. 'deny' does not accept 'role' or 'permission'." % line, None, self.__locator)
                if in_roles or in_permissions:
                    raise SAXParseException("Malformed rule '%s' in <Location> block. 'role' and 'permission' can only appear once." % line, None, self.__locator)
                rule['permissions'] = []
                if word == 'role':
                    in_roles = True
                else:
                    in_permissions = True
            elif in_roles:
                if word not in self._roles_list:
                    raise SAXParseException("Rule '%s' in <Location> block contains unknown role '%s'." % (line, word), None, self.__locator)
                # Roles are translated on the fly to their permissions set
                rule['permissions'].extend(self._roles_list[word])
            elif in_permissions:
                if word not in self.__known_permissions:
                    raise SAXParseException("Rule '%s' in <Location> block contains unknown permission '%s'." % (line, word), None, self.__locator)
                rule['permissions'].append(word)
            elif principal == 'user':
                if word not in self._users_list:
                    raise SAXParseException("Rule '%s' in <Location> block contains unknown user '%s'." % (line, word), None, self.__locator)
                rule['principal'].append(word)
            else:
                if word not in self._groups_list  and  word not in self.__implied_groups:
                    raise SAXParseException("Rule '%s' in <Location> block contains unknown group '%s'." % (line, word), None, self.__locator)
                rule['principal'].append(word)

        # An 'allow' rule has to grant something. This also catches rules in
        # which 'role' or 'permission' is not followed by anything.
        if verb == 'allow'  and  not rule.get('permissions'):
            raise SAXParseException("'allow' rule '%s' in <Location> block does not contain permission or roles." % line, None, self.__locator)

        return rule


    def __parse_location_content(self, lines):
        """
        Parse content block of <Location> element.

        Each line is a rule (see __parse_rule()). The rules are stored
        under the 'name' attribute of the element, together with the
        optional 'description' attribute.

        @param lines:   List of lines in the content block.
        @type  lines:   list

        @raise SAXParseException:   If the element is not inside of the
                                    <ACLs> element, lacks the 'name'
                                    attribute or contains a bad rule.

        """
        if not self.__in_acls_element:
            raise SAXParseException("<Location> element can only appear within <ACLs> block.", None, self.__locator)

        rule_list = []
        for line in lines:
            rule_list.append(self.__parse_rule(line))

        # The path component to which these rules apply was
        # specified as a 'name' attribute for the Location element.
        try:
            path = self.__attr_cache["Location"]['name']
        except KeyError:
            raise SAXParseException("<Location> block requires attribute 'name'.", None, self.__locator)
        try:
            description = self.__attr_cache["Location"]['description']
        except KeyError:
            description = None

        self._rules_list[path] = (rule_list, description)


class XmlAccessParser(object):
    """
    Parses an XML config file for our special access information.

    Any provider of access information needs to implement the following
    interface:

        get_users_and_groups():
            This returns a dictionary, with user as key and a list
            of groups as values.

        get_rules():
            This returns a dictionary with URI path as key and a list
            of rules as value. Each rule is a dictionary. See the docstring
            for the get_rules() function for more information.

    Please note: An AccessParser class must take care of all syntax and
    semantic checking.

    """
    def __init__(self, fname, user_list):
        """
        Initialize the XmlAccessParser class.

        Since we are using SAX here, we create a SAX parser and pass it
        our own handler class.

        @param fname:       File name of snap-access config file.
        @type  fname:       string

        @param user_list:   List of user names.
        @type  user_list:   list

        """
        # Create a parser
        parser = make_parser()

        # Tell the parser we are not interested in XML namespaces
        parser.setFeature(feature_namespaces, 0)

        # Create the handler
        self.__dh = _BuildAccessConfig(user_list)

        # Tell the parser to use our handler
        parser.setContentHandler(self.__dh)

        # Parse the input
        f   = open(fname, "r")
        buf = StringIO()
        for l in f:
            # We strip comment lines out, since we want to allow them
            # in places where XML would normally not like to see them.
            if l.strip().startswith('#'):
                l = "\n"
            buf.write(l)
        f.close()
        buf.seek(0)
        # The version of the config file with the comments stripped out
        # is now in a StringIO buffer, and we use that one for the actual
        # parsing.
        try:
            parser.parse(buf)
        except SAXParseException, e:
            raise SnapFormatError(str(e))

        """
        if not self.__dh._users_list:
            raise SnapFormatError("No users were specified.")
            
        if not self.__dh._groups_list:
            raise SnapFormatError("No groups were specified.")

        if not self.__dh._rules_list:
            raise SnapFormatError("No rules were specified.")
        """


    def get_users_and_groups(self):
        """
        Return dictionary with user as key and list of groups as values.

        @return:    User/groups dictionary.
        @rtype:     dict

        """
        return self.__dh._users_list

                
    def get_rules(self):
        """
        Return a dictionary with rules for URIs, the URI as index and
        a list of rules as values. Each rule is a dictionary, and each
        element in the dictionary describes a rule element. For example:

            { "verb"           : "allow",
              "principal_type" : "user",
              "principal"      : [ 'mike', 'chris' ],
              "roles"          : None,
              "permissions"    : [ "operator", "user" ]
            }

        Alternatively, the 'permissions' may be 'None' and the 'roles'
        have a list of role names as value.
           
        A 'deny' role does not have any roles or permissions attached,
        meaning that these elements are missing in that case.

        @return:    Lookup table between URI and list of rules.
        @rtype:     dict

        """
        return self.__dh._rules_list
  
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.