ModelValidation.py :  » Database » Modeling-Framework » Modeling-0.9 » Modeling » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » Modeling Framework 
Modeling Framework » Modeling 0.9 » Modeling » ModelValidation.py
# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sébastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------


"""
ModelValidation

  Validate a model

  CVS information

    $Id: ModelValidation.py 945 2004-10-18 20:31:45Z sbigaret $
  
"""

# Bare revision number: the slice drops the leading '$Revision: ' and the
# trailing ' $' of the expanded version-control keyword.
__version__='$Revision: 945 $'[11:-2]

import string, types

# Levels attached to validation messages. __str__() lists the levels of an
# object in ascending numeric order.
# NOTE: NOT_SUPPORTED is 0, hence falsy -- never test a level for truth,
# compare it to None instead.
NOT_SUPPORTED=0
ERROR=1
WARNING=2
INFO=3
DEBUG=4

# Title displayed for each level in the textual report
MSGS_FOR_LEVELS={ NOT_SUPPORTED: 'Not supported feature(s)',
                  ERROR: 'Error(s)',
                  WARNING: 'Warning(s)',
                  INFO: 'Info',
                  DEBUG: 'Debug',
                  }
# All valid levels
LEVELS=tuple(MSGS_FOR_LEVELS.keys())

class ModelValidationException(Exception):
  """
  Accumulates validation messages, grouped by object then by level.

  Messages are kept in a dictionary shaped like::

    { object: { level: [msg, ...] } }

  Messages whose level is one of the ignored levels are silently dropped.
  Call finalize() at the end of a validation process to raise the exception
  if (and only if) something was recorded.
  """

  def __init__(self, object=None, level=None, msg=None, ignore_levels=None):
    """
    Initializes the exception, possibly with supplied object, level and msg

    Parameters:

      object -- the object causing the error

      level -- valid levels are DEBUG, INFO, WARNING, ERROR, NOT_SUPPORTED,
               defined by the module

      msg -- informational message

      ignore_levels -- a level, or a list/tuple of levels, to ignore.
                       Default is (DEBUG,); None and empty sequences select
                       the default.

    Raises ValueError if 'ignore_levels' or 'level' holds an invalid level,
    or if 'object' is supplied without both 'level' and 'msg'.

    """
    # Always bind the dictionary first: it must exist even when the initial
    # message is dropped because its level is ignored.
    self._dict={}

    # Ignore levels. A scalar is wrapped *before* the emptiness test, so
    # that NOT_SUPPORTED (which is 0) is not mistaken for "not supplied".
    if ignore_levels is not None \
       and not isinstance(ignore_levels, (tuple, list)):
      ignore_levels=(ignore_levels,)
    if not ignore_levels:
      self._ignored_levels=(DEBUG,)
    else:
      for ignored in ignore_levels:
        if ignored not in LEVELS:
          raise ValueError('Parameter ignored_levels has invalid level(s)')
      # private copy: later changes to the caller's list have no effect
      self._ignored_levels=tuple(ignore_levels)

    # Initial message, if any
    if object is not None:
      if level is None or msg is None:
        raise ValueError('level AND msg cannot be None when object is not None')
      if level not in LEVELS:
        raise ValueError('Invalid level')
      if level not in self._ignored_levels:
        self._dict[object]={level: [msg]}

  def aggregateError(self, object, level, msgs):
    """
    Adds the supplied message(s) to the exception's dictionary

    Parameters:

      object -- the object the messages relate to

      level -- the messages' level; nothing is recorded for an ignored level

      msgs -- either a list of messages, or a single message (anything that
              is not a list is handled as one message)

    """
    if level in self._ignored_levels:
      return

    if not isinstance(msgs, list):
      msgs=[msgs]
    # The messages are copied into a list owned by this exception, so that
    # the caller's list is neither stored nor modified afterwards.
    self._dict.setdefault(object, {}).setdefault(level, []).extend(msgs)

  def aggregateException(self, aModelValidationError):
    """
    Concatenates the supplied ModelValidationException to the current
    one. Messages whose level is ignored by the receiver are not copied.

    """
    for object, levels in aModelValidationError.errorsDict().items():
      for level, msgs in levels.items():
        self.aggregateError(object, level, msgs)

  def errorObjects(self):
    "Returns the list of objects for which errors were detected"
    return list(self._dict.keys())

  def errorsDict(self):
    "Returns the error dictionary (the live one, not a copy)"
    return self._dict
  errors=errorsDict

  def has_errors(self, for_level=None):
    """
    Tells whether messages were recorded.

    Parameters:

      for_level -- when None (the default) any recorded message counts;
                   otherwise only the objects having an entry for that
                   level are considered

    Returns a true value if something was recorded, a false value otherwise
    """
    # 'is None', not a truth test: NOT_SUPPORTED equals 0
    if for_level is None:
      return not not self._dict
    if for_level in self._ignored_levels:
      return 0
    for levels in self._dict.values():
      if for_level in levels:
        return 1
    return 0

  def levels_of_errors(self):
    """
    Returns the list of levels for which at least one object has an entry;
    each level appears once.
    """
    _res=[]
    for levels in self._dict.values():
      for level in levels.keys():
        if level not in _res: _res.append(level)
    return _res

  def __str__(self):
    "Returns a string representation of the exception"
    parts=[]
    for object, levels_msgs in self._dict.items():
      header="Object: %s"%(object,)
      parts.append(header+'\n')
      parts.append(len(header)*'-'+'\n')   # underline the header
      levels=list(levels_msgs.keys())
      levels.sort()
      for level in levels:
        parts.append(" * %s:\n"%MSGS_FOR_LEVELS[level])
        for msg in levels_msgs[level]:
          parts.append("   - %s\n"%(msg,))
      parts.append('\n')
    return ''.join(parts)

  def __call__(self):
    "Returns the receiver's dictionary"
    return self._dict

  def finalize(self):
    """
    Finalize a validation process. If the exception has a non-empty errorsDict
    it raises, otherwise simply return
    """
    if self._dict: raise self
    return


##############################################################################

##
## Model
##
def validateModel(model, errors):
  """
  Validates a whole model: first the model's own settings, then each of the
  entities returned by model.entities().

  Parameters:

    model -- the model to validate

    errors -- the object collecting the messages; it is handed down
              unchanged to the functions doing the actual checks

  """
  validateModel_internals(model, errors)
  for entity in model.entities():
    validateEntity(entity, errors)
    
def validateModel_internals(model, errors):
  """
  Checks the settings of the model itself (not its entities).

  Reported, by level:

    ERROR -- adaptorName or packageName is not defined

    WARNING -- adaptorName is not one of 'Postgresql', 'MySQL', 'SQLite',
               or no concrete adaptor can be found for it

  Parameters:

    model -- the model to check

    errors -- object receiving the messages through aggregateError()

  """
  def report(level, messages):
    # an empty list of messages is not reported at all
    if messages:
      errors.aggregateError('Model %s'%model.name(), level, messages)

  adaptor_name=model.adaptorName()

  # ERROR: mandatory settings
  missing=[]
  if not adaptor_name:
    missing.append('adaptorName is not defined')
  if not model.packageName():
    missing.append('packageName is not defined')
  report(ERROR, missing)

  # WARNING: only relevant when an adaptor name is set
  suspicious=[]
  if adaptor_name:
    if adaptor_name not in ('Postgresql', 'MySQL', 'SQLite'):
      suspicious.append("adaptorName '%s' is not one of the framework's, "\
                        "maybe one of yours"%adaptor_name)
    # try to find the concrete adaptor
    if not adaptorForModel(model):
      suspicious.append("Couldnt find concrete Adaptor for '%s'"%adaptor_name)
  report(WARNING, suspicious)

    
##
## Entity
##
def validateEntity(entity, errors):
  """
  Validates an entity: first the entity's own settings, then each of its
  attributes and each of its relationships.

  Parameters:

    entity -- the entity to validate

    errors -- the object collecting the messages; it is handed down
              unchanged to the functions doing the actual checks

  """
  validateEntity_internals(entity, errors)
  for attr in entity.attributes():
    validateAttribute(attr, errors)
  for rel in entity.relationships():
    validateRelationship(rel, errors)


def validateEntity_internals(entity, errors):
  """
  Checks the settings of the entity itself (not its attributes or
  relationships), then checks it against its parent entity.

  Reported, by level:

    NOT_SUPPORTED -- read-only entity, compound primary key, abstract entity

    ERROR -- moduleName, className, externalName or primary key missing;
             a primary key being a class property whose default value
             is not 0

    WARNING -- a primary key is a class property, or is not required

    DEBUG -- typeName is not defined

  Parameters:

    entity -- the entity to check

    errors -- object receiving the messages through aggregateError()

  """
  def report(level, messages):
    # an empty list of messages is not reported at all
    if messages:
      errors.aggregateError('Entity %s'%entity.name(), level, messages)

  # NOT SUPPORTED
  unsupported=[]
  if entity.isReadOnly():
    unsupported.append('Read-only entities are not supported yet')
  if len(entity.primaryKeyAttributes())>1:
    unsupported.append('Compound primary key are not supported yet')
  if entity.isAbstract():
    unsupported.append('Abstract entities are not supported yet')
  report(NOT_SUPPORTED, unsupported)

  # TBD 'should ``else:'' when abstract entities are supported

  # ERROR: settings that must have a value, checked in this order
  required=((entity.moduleName, 'module is not defined'),
            (entity.className, 'className is not defined'),
            (entity.externalName, 'externalName is not defined'),
            (entity.primaryKeyAttributes, 'no primary key defined'))
  problems=[]
  for getter, text in required:
    if not getter():
      problems.append(text)
  # no-op when there is no primary key
  for pk in entity.primaryKeyAttributes():
    if pk.isClassProperty() and pk.defaultValue()!=0:
      problems.append("PK '%s' is a class property: its default value must be 0 (int zero)"%pk.name())
  report(ERROR, problems)

  validateEntityAgainstParentEntity(entity, errors)

  # WARNING
  discouraged=[]
  for pk in entity.primaryKeyAttributes():
    if pk.isClassProperty():
      discouraged.append("Primary key '%s' is also marked as ``class property'' "\
                         "--this is strongly discouraged"\
                         %pk.name())
    if not pk.isRequired():
      # reported on the spot, i.e. before the messages collected above
      errors.aggregateError('Entity %s'%entity.name(), WARNING,
                            "Primary key '%s' should be mandatory"%pk.name())
  report(WARNING, discouraged)

  # DEBUG
  details=[]
  if not entity.typeName():
    details.append('typeName is not defined')
  report(DEBUG, details)


def validateEntityAgainstParentEntity(entity, errors):
  """
  Verifies that the entity defines every attribute and every relationship
  listed by its direct parent entity; each missing one is reported as an
  ERROR. Nothing is done when the entity has no parent entity.

  Parameters:

    entity -- the entity to check

    errors -- object receiving the messages through aggregateError()

  """
  parent=entity.parentEntity()
  if not parent:
    return

  missing=[]
  for parent_attr in parent.attributes():
    attr_name=parent_attr.name()
    if not entity.attributeNamed(attr_name):
      missing.append("doesn't define its parent entity's attribute '%s'"
                     %attr_name)
  for parent_rel in parent.relationships():
    rel_name=parent_rel.name()
    if not entity.relationshipNamed(rel_name):
      missing.append("doesn't define its parent entity's relationship '%s'"
                     %rel_name)

  if missing:
    errors.aggregateError('Entity %s'%entity.name(), ERROR, missing)


##
## Attribute
##
def validateAttribute(attribute, errors):
  """
  Checks a single attribute and records the problems found in 'errors'.

  Reported, by level:

    NOT_SUPPORTED -- a 'definition' is set

    ERROR -- columnName or externalType is not set; externalType is not
             known to the model's concrete adaptor; NUMERIC without both
             precision and scale; CHAR/VARCHAR without width (for adaptors
             other than Postgresql and SQLite); INTEGER with a width,
             a precision or a scale

    WARNING -- the concrete adaptor cannot be found (SQL types are then left
               unchecked), or a DATETIME type is used with Postgresql

    INFO -- CHAR/VARCHAR without width with Postgresql or SQLite

  Parameters:

    attribute -- the attribute to check

    errors -- object receiving the messages through aggregateError()

  """
  label='Attribute %s.%s'%(attribute.entity().name(), attribute.name())

  # NOT_SUPPORTED
  if attribute.definition():
    errors.aggregateError(label, NOT_SUPPORTED,
                          ["definition is set but neither flattened nor "\
                           "derived attributes are supported yet"])

  # ERROR
  msgs=[]
  if not attribute.columnName():
    msgs.append("columnName is not set")
  if not attribute.externalType():
    # reported once only
    msgs.append("externalType is not set")
  else:
    # check SQL TYPE
    model=attribute.entity().model()
    concreteAdaptor=adaptorForModel(model)
    concreteAdaptorName=model.adaptorName()
    if not concreteAdaptor:
      errors.aggregateError(label, WARNING,
                            "Can't find concrete adaptor: can't check SQL types")
    else:
      # a real list, so that it is displayed the same way whatever keys()
      # returns
      validSQLTypes=list(concreteAdaptor.expressionClass()().\
                         valueTypeForExternalTypeMapping().keys())
      extType=attribute.externalType().lower()
      if extType not in validSQLTypes:
        msgs.append("Invalid SQL type: '%s' (valid for this adaptor: %s)"%\
                    (attribute.externalType(),validSQLTypes))
      elif extType=='numeric':
        if not attribute.precision() or not attribute.scale():
          msgs.append("NUMERIC type requires both 'precision' AND 'scale'")
      elif extType in ('varchar', 'char'):
        if not attribute.width():
          if concreteAdaptorName not in ('Postgresql', 'SQLite'):
            msgs.append("CHAR/VARCHAR type requires 'width' to be set")
          else: # INFO
            errors.aggregateError(label, INFO,
                                  "ANSI SQL requires that a (VAR)CHAR has "\
                                  "its width set (but %s accepts it)"
                                  %concreteAdaptorName)
      elif extType=='integer':
        # all three settings named in the message are checked
        if attribute.width() or attribute.precision() or attribute.scale():
          msgs.append("INTEGER type does not support width, precision or scale")
      # last, check for postgresql and datetime (done whether or not the
      # type is valid for the adaptor)
      if extType[:8]=='datetime' and concreteAdaptorName=='Postgresql':
        errors.aggregateError(label, WARNING,
                              "DATETIME is not supported anymore in postgresql-server v7.3 and higher")
  if msgs:
    errors.aggregateError(label, ERROR, msgs)
  
##
## Relationship
##
def validateRelationship(relationship, errors):
  """
  Validates a relationship with the function matching its kind: simple
  relationships (relationship.isSimple() is true) and flattened ones are
  checked separately. Returns whatever the selected function returns.

  Parameters:

    relationship -- the relationship to validate

    errors -- the object collecting the messages, handed down unchanged

  """
  if relationship.isSimple():
    validator=validateSimpleRelationship
  else:
    validator=validateFlattenedRelationship
  return validator(relationship, errors)


def validateSimpleRelationship(relationship, errors):
  """
  Validates a simple relationship; all the checks are delegated to
  validateSimpleRelationship_internals().

  Parameters:

    relationship -- the relationship to validate

    errors -- the object collecting the messages, handed down unchanged

  """
  validateSimpleRelationship_internals(relationship, errors)
  
def validateSimpleRelationship_internals(relationship, errors):
  """
  Checks a simple relationship and records the problems found in 'errors'.

  Reported, by level:

    NOT_SUPPORTED -- more than one join; many-to-many or one-to-one
                     relationship (judged from the inverse relationship)

    ERROR -- no join; source or destination attribute of the first join not
             set; optional to-one relationship whose source attribute is
             required

    WARNING -- questionable choice of key attributes; mandatory to-one
               relationship whose source attribute is not required

  The checks involving the join's attributes are skipped as soon as a
  problem was detected on the joins themselves.

  Parameters:

    relationship -- the relationship to check

    errors -- object receiving the messages through aggregateError()

  """
  def report(level, messages):
    # an empty list of messages is not reported at all
    if messages:
      errors.aggregateError('Relationship %s.%s'%(relationship.entity().name(),
                                                  relationship.name()),
                            level, messages)

  def key_warnings(source, destination):
    # warnings about the attributes playing the 'source' and 'destination'
    # roles
    found=[]
    if source in source.entity().primaryKeyAttributes():
      found.append('Source attribute is a primary key: are you sure?')
    elif source.isClassProperty():
      found.append("Foreign key '%s.%s' is also marked as a class property"\
                   "--this is strongly discouraged"%(source.entity().name(),
                                                     source.name()))
    if destination not in destination.entity().primaryKeyAttributes():
      found.append('Destination attribute is not a primary key: are you sure?')
    return found

  joins=relationship.joins()
  # turns false once the join cannot be examined any further
  joinIsUsable=1

  # NOT_SUPPORTED
  unsupported=[]
  if joins and len(joins)>1:
    unsupported.append('Multiple joins are not supported yet')
    joinIsUsable=0

  inverse=relationship.inverseRelationship()
  if inverse:
    if relationship.isToMany() and inverse.isToMany():
      unsupported.append("Many-to-many relationships are not supported yet "\
                         "(involved relationships: this one and its inverse: %s.%s)"\
                         %(inverse.entity().name(), inverse.name()))
    if relationship.isToOne() and inverse.isToOne():
      unsupported.append("one-to-one relationships are not supported yet: one of "\
                         "these should be changed to a ``toMany'' relationship "\
                         "(involved relationships: this one and its inverse: %s.%s)"\
                         %(inverse.entity().name(), inverse.name()))
  report(NOT_SUPPORTED, unsupported)

  ## Error
  problems=[]
  if not joins:
    joinIsUsable=0
    problems.append('Relationship has no join')

  if joinIsUsable:
    source=joins[0].sourceAttribute()
    destination=joins[0].destinationAttribute()
    if not (source and destination):
      problems.append('Source AND destination attributes should be set')
      joinIsUsable=0

  if joinIsUsable and relationship.isToOne():
    if not relationship.isMandatory() and source.isRequired():
      problems.append("relationship is not mandatory but source attribute "\
                      "%s.%s is required"\
                      %(source.entity().name(), source.name()))
  report(ERROR, problems)

  ## Warning
  doubts=[]
  if joinIsUsable:
    if relationship.isToOne():
      doubts.extend(key_warnings(source, destination))
      if relationship.isMandatory() and not source.isRequired():
        doubts.append("relationship is mandatory  but source attribute "\
                      "%s.%s is not required"\
                      %(source.entity().name(), source.name()))
    else:
      # to-many: the roles of the two attributes are swapped
      doubts.extend(key_warnings(destination, source))
  report(WARNING, doubts)

def validateFlattenedRelationship(relationship, errors):
  """
  Validates a flattened relationship; all the checks are delegated to
  validateFlattenedRelationship_internals().

  Parameters:

    relationship -- the relationship to validate

    errors -- the object collecting the messages, handed down unchanged

  """
  validateFlattenedRelationship_internals(relationship, errors)

  
def validateFlattenedRelationship_internals(relationship, errors):
  """
  Unconditionally reports the relationship as NOT_SUPPORTED: no further
  check is made on flattened relationships.

  Parameters:

    relationship -- the flattened relationship

    errors -- object receiving the message through aggregateError()

  """
  label='Relationship %s.%s'%(relationship.entity().name(),
                              relationship.name())
  errors.aggregateError(label, NOT_SUPPORTED,
                        'Flattened relationships are not supported yet')

##
## misc.
##
def adaptorForModel(model):
  """
  Returns what Adaptor.adaptorWithName() gives for the model's adaptorName,
  or None if that call raises AdaptorImportError or ValueError.
  """
  # imported here rather than at module level, as in the original code
  import Adaptor
  from Adaptor import AdaptorImportError
  try:
    return Adaptor.adaptorWithName(model.adaptorName())
  except (AdaptorImportError, ValueError):
    # best effort: callers handle a missing adaptor themselves
    return None
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.