# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sébastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
"""
ModelValidation
Validate a model
CVS information
$Id: ModelValidation.py 945 2004-10-18 20:31:45Z sbigaret $
"""
# Bare revision number: the slice strips the leading '$Revision: ' and the
# trailing ' $' from the keyword string, leaving '945' here.
__version__='$Revision: 945 $'[11:-2]
import string, types
# Levels attached to validation messages, numbered from 0 to 4.
NOT_SUPPORTED, ERROR, WARNING, INFO, DEBUG = range(5)

# Heading displayed for each level when messages are printed.
MSGS_FOR_LEVELS = {
    NOT_SUPPORTED: 'Not supported feature(s)',
    ERROR: 'Error(s)',
    WARNING: 'Warning(s)',
    INFO: 'Info',
    DEBUG: 'Debug',
}

# Every valid level; iterating a dictionary yields its keys.
LEVELS = tuple(MSGS_FOR_LEVELS)
class ModelValidationException(Exception):
    """
    Accumulates the messages produced while validating a model.

    Messages are kept in a dictionary shaped like
    ``{object: {level: [msg, ...]}}``. Messages whose level is one of the
    ignored levels are silently dropped.
    """

    def __init__(self, object=None, level=None, msg=None, ignore_levels=None):
        """
        Initializes the exception, possibly with supplied object, level and msg

        Parameters:

          object -- the object causing the error

          level -- valid levels are DEBUG, INFO, WARNING, ERROR, NOT_SUPPORTED,
            defined by the module

          msg -- informational message

          ignore_levels -- a level, or a list/tuple of levels, to ignore.
            Default is (DEBUG,); None or an empty sequence selects the default.

        Raises ValueError if a level is invalid, or if 'object' is supplied
        without both 'level' and 'msg'.
        """
        # Ignore levels. The test is made on None/empty sequences rather than
        # on truthiness: NOT_SUPPORTED equals 0 and must be accepted as a
        # (scalar) level to ignore.
        is_sequence = isinstance(ignore_levels, (tuple, list))
        if ignore_levels is None or (is_sequence and not ignore_levels):
            self._ignored_levels = (DEBUG,)
        else:
            if not is_sequence:
                ignore_levels = (ignore_levels,)
            for ignored in ignore_levels:
                if ignored not in LEVELS:
                    raise ValueError(
                        'Parameter ignored_levels has invalid level(s)')
            self._ignored_levels = ignore_levels

        # Always start with a dictionary, so that the exception stays usable
        # even when the initial message is dropped because of its level.
        self._dict = {}
        if object is not None:
            if level is None or msg is None:
                raise ValueError(
                    'level AND msg cannot be None when object is not None')
            if level not in LEVELS:
                raise ValueError('Invalid level')
            if level not in self._ignored_levels:
                self._dict[object] = {level: [msg]}

    def aggregateError(self, object, level, msgs):
        """
        Adds the supplied error type in the exception's dictionary

        Parameters:

          object -- the object the message(s) relate to

          level -- the messages' level; nothing is recorded if it is ignored

          msgs -- a single message, or a list of messages. Anything which is
            not a list is recorded as one message.
        """
        if level in self._ignored_levels:
            return
        if not isinstance(msgs, list):
            msgs = [msgs]
        # extend() copies the items: the caller's list is never stored, hence
        # never modified by a later aggregation.
        per_level = self._dict.setdefault(object, {})
        per_level.setdefault(level, []).extend(msgs)

    def aggregateException(self, aModelValidationError):
        """
        Concatenates the supplied ModelValidationException to the current
        one. Messages whose level is ignored by the receiver are not copied.
        """
        collected = aModelValidationError.errorsDict()
        for obj, per_level in list(collected.items()):
            for level, msgs in list(per_level.items()):
                # aggregateError() takes care of the ignored levels
                self.aggregateError(obj, level, msgs)

    def errorObjects(self):
        "Returns the list of objects for which errors were detected"
        return list(self._dict.keys())

    def errorsDict(self):
        "Returns the error dictionary"
        return self._dict

    errors = errorsDict

    def has_errors(self, for_level=None):
        """
        Tells whether messages were recorded.

        Parameters:

          for_level -- if None (the default), any recorded message counts;
            otherwise only the messages recorded at that level do.

        Returns a true value if a matching message exists, a false value
        otherwise. The result is always false for an ignored level.
        """
        # 'is None' and not 'not for_level': NOT_SUPPORTED equals 0
        if for_level is None:
            return not not self._dict
        if for_level in self._ignored_levels:
            return 0
        for per_level in self._dict.values():
            if for_level in per_level:
                return 1
        return 0

    def levels_of_errors(self):
        """
        Returns the list of the distinct levels for which a message was
        recorded, in the order in which they are found.
        """
        found = []
        for per_level in self._dict.values():
            for level in per_level.keys():
                if level not in found:
                    found.append(level)
        return found

    def __str__(self):
        "Returns a string representation of the exception"
        chunks = []
        for obj, per_level in self._dict.items():
            # (obj,) keeps the formatting correct when obj is itself a tuple
            header = "Object: %s\n" % (obj,)
            chunks.append(header)
            chunks.append((len(header) - 1) * '-' + '\n')
            levels = list(per_level.keys())
            levels.sort()
            for level in levels:
                chunks.append(" * %s:\n" % MSGS_FOR_LEVELS[level])
                for msg in per_level[level]:
                    chunks.append(" - %s\n" % (msg,))
            chunks.append('\n')
        return ''.join(chunks)

    def __call__(self):
        "Returns the receiver's dictionary"
        return self._dict

    def finalize(self):
        """
        Finalize a validation process. If the exception has a non-empty
        errorsDict it raises itself, otherwise it simply returns
        """
        if self._dict:
            raise self
        return
##############################################################################
##
## Model
##
def validateModel(model, errors):
    """
    Validates the model itself, then each entity it returns from entities().

    Parameters:

      model -- the model to validate

      errors -- the object receiving the messages; it is handed over to
        validateModel_internals() and validateEntity()
    """
    validateModel_internals(model, errors)
    for anEntity in model.entities():
        validateEntity(anEntity, errors)
def validateModel_internals(model, errors):
    """
    Checks the model's own properties: adaptorName and packageName.

    Parameters:

      model -- the model to validate

      errors -- the object receiving the messages through its
        aggregateError() method
    """
    adaptorName = model.adaptorName()

    # ERROR: both names are required
    missing = []
    if not adaptorName:
        missing.append('adaptorName is not defined')
    if not model.packageName():
        missing.append('packageName is not defined')
    if missing:
        errors.aggregateError('Model %s' % model.name(), ERROR, missing)

    # WARNING: only relevant when an adaptor name was supplied
    doubts = []
    if adaptorName:
        if adaptorName not in ('Postgresql', 'MySQL', 'SQLite'):
            doubts.append("adaptorName '%s' is not one of the framework's, "
                          "maybe one of yours" % adaptorName)
        # try to find the concrete adaptor
        if not adaptorForModel(model):
            doubts.append("Couldnt find concrete Adaptor for '%s'"
                          % adaptorName)
    if doubts:
        errors.aggregateError('Model %s' % model.name(), WARNING, doubts)
##
## Entity
##
def validateEntity(entity, errors):
    """
    Validates the entity itself, then its attributes, then its relationships.

    Parameters:

      entity -- the entity to validate

      errors -- the object receiving the messages; it is handed over to the
        validation functions called here
    """
    validateEntity_internals(entity, errors)
    for anAttribute in entity.attributes():
        validateAttribute(anAttribute, errors)
    for aRelationship in entity.relationships():
        validateRelationship(aRelationship, errors)
def validateEntity_internals(entity, errors):
    """
    Checks the entity's own properties, level after level: NOT_SUPPORTED,
    ERROR (which also triggers validateEntityAgainstParentEntity()), WARNING
    and DEBUG.

    Parameters:

      entity -- the entity to validate

      errors -- the object receiving the messages through its
        aggregateError() method
    """
    primaryKeys = entity.primaryKeyAttributes()

    # NOT SUPPORTED
    unsupported = []
    if entity.isReadOnly():
        unsupported.append('Read-only entities are not supported yet')
    if len(primaryKeys) > 1:
        unsupported.append('Compound primary key are not supported yet')
    if entity.isAbstract():
        unsupported.append('Abstract entities are not supported yet')
    if unsupported:
        errors.aggregateError('Entity %s' % entity.name(),
                              NOT_SUPPORTED, unsupported)
    # TBD: what follows should go in an ``else:'' when abstract entities are
    # supported

    # ERROR
    failures = []
    for value, complaint in ((entity.moduleName(), 'module is not defined'),
                             (entity.className(), 'className is not defined'),
                             (entity.externalName(),
                              'externalName is not defined')):
        if not value:
            failures.append(complaint)
    if primaryKeys:
        failures.extend(
            ["PK '%s' is a class property: its default value must be 0 "
             "(int zero)" % pk.name()
             for pk in primaryKeys
             if pk.isClassProperty() and pk.defaultValue() != 0])
    else:
        failures.append('no primary key defined')
    if failures:
        errors.aggregateError('Entity %s' % entity.name(), ERROR, failures)

    validateEntityAgainstParentEntity(entity, errors)

    # WARNING
    discouraged = []
    for pk in primaryKeys:
        if pk.isClassProperty():
            discouraged.append(
                "Primary key '%s' is also marked as ``class property'' "
                "--this is strongly discouraged" % pk.name())
        if not pk.isRequired():
            # reported immediately, i.e. before the messages gathered above
            errors.aggregateError(
                'Entity %s' % entity.name(), WARNING,
                "Primary key '%s' should be mandatory" % pk.name())
    if discouraged:
        errors.aggregateError('Entity %s' % entity.name(),
                              WARNING, discouraged)

    # DEBUG
    if not entity.typeName():
        errors.aggregateError('Entity %s' % entity.name(), DEBUG,
                              ['typeName is not defined'])
def validateEntityAgainstParentEntity(entity, errors):
    """
    Verifies that every attribute and relationship returned by the entity's
    parent entity can also be found, by name, in the entity. Each missing
    property is reported at level ERROR. Nothing is done if the entity has
    no parent entity.

    Parameters:

      entity -- the entity to validate

      errors -- the object receiving the messages through its
        aggregateError() method
    """
    parent = entity.parentEntity()
    if not parent:
        return

    # only the direct parent's properties are examined here
    missing = ["doesn't define its parent entity's attribute '%s'"
               % anAttribute.name()
               for anAttribute in parent.attributes()
               if not entity.attributeNamed(anAttribute.name())]
    missing.extend(["doesn't define its parent entity's relationship '%s'"
                    % aRelationship.name()
                    for aRelationship in parent.relationships()
                    if not entity.relationshipNamed(aRelationship.name())])
    if missing:
        errors.aggregateError('Entity %s' % entity.name(), ERROR, missing)
##
## Attribute
##
def validateAttribute(attribute, errors):
    """
    Validates an attribute: its definition, its column name and its external
    (SQL) type.

    The external type is checked against the types returned by the concrete
    adaptor of the attribute's model; if no concrete adaptor can be found a
    WARNING is reported and the SQL type is not checked.

    Parameters:

      attribute -- the attribute to validate

      errors -- the object receiving the messages through its
        aggregateError() method
    """
    entity = attribute.entity()
    label = 'Attribute %s.%s' % (entity.name(), attribute.name())

    # NOT_SUPPORTED
    if attribute.definition():
        errors.aggregateError(
            label, NOT_SUPPORTED,
            ["definition is set but neither flattened nor derived "
             "attributes are supported yet"])

    # ERROR
    msgs = []
    if not attribute.columnName():
        msgs.append("columnName is not set")

    # check SQL TYPE (the missing type is reported once only)
    externalType = attribute.externalType()
    if not externalType:
        msgs.append("externalType is not set")
    else:
        model = entity.model()
        concreteAdaptor = adaptorForModel(model)
        concreteAdaptorName = model.adaptorName()
        if not concreteAdaptor:
            errors.aggregateError(
                label, WARNING,
                "Can't find concrete adaptor: can't check SQL types")
        else:
            validSQLTypes = list(concreteAdaptor.expressionClass()().
                                 valueTypeForExternalTypeMapping().keys())
            extType = externalType.lower()
            if extType not in validSQLTypes:
                msgs.append(
                    "Invalid SQL type: '%s' (valid for this adaptor: %s)"
                    % (externalType, validSQLTypes))
            elif extType == 'numeric':
                if not attribute.precision() or not attribute.scale():
                    msgs.append(
                        "NUMERIC type requires both 'precision' AND 'scale'")
            elif extType in ('varchar', 'char'):
                if not attribute.width():
                    if concreteAdaptorName not in ('Postgresql', 'SQLite'):
                        msgs.append(
                            "CHAR/VARCHAR type requires 'width' to be set")
                    else:  # INFO
                        errors.aggregateError(
                            label, INFO,
                            "ANSI SQL requires that a (VAR)CHAR has its "
                            "width set (but %s accepts it)"
                            % concreteAdaptorName)
            elif extType == 'integer':
                # all three properties named in the message are examined
                if (attribute.width() or attribute.precision()
                        or attribute.scale()):
                    msgs.append("INTEGER type does not support width, "
                                "precision or scale")

            # last, check for postgresql and datetime; this is done here
            # because extType is only known within this branch
            if (extType[:8] == 'datetime'
                    and concreteAdaptorName == 'Postgresql'):
                errors.aggregateError(
                    label, WARNING,
                    "DATETIME is not supported anymore in postgresql-server "
                    "v7.3 and higher")

    if msgs:
        errors.aggregateError(label, ERROR, msgs)
##
## Relationship
##
def validateRelationship(relationship, errors):
    """
    Dispatches the validation according to relationship.isSimple(): simple
    relationships go to validateSimpleRelationship(), the others to
    validateFlattenedRelationship(). Returns what the selected function
    returns.

    Parameters:

      relationship -- the relationship to validate

      errors -- the object receiving the messages
    """
    if relationship.isSimple():
        validator = validateSimpleRelationship
    else:
        validator = validateFlattenedRelationship
    return validator(relationship, errors)
def validateSimpleRelationship(relationship, errors):
    """
    Validates a simple relationship; the whole work is delegated to
    validateSimpleRelationship_internals().

    Parameters:

      relationship -- the relationship to validate

      errors -- the object receiving the messages
    """
    validateSimpleRelationship_internals(relationship, errors)
def validateSimpleRelationship_internals(relationship, errors):
    """
    Checks a simple relationship, level after level: NOT_SUPPORTED, ERROR
    and WARNING. The checks made on the join's attributes are skipped as
    soon as the join is found unusable (none, more than one, or a missing
    source/destination attribute).

    Parameters:

      relationship -- the relationship to validate

      errors -- the object receiving the messages through its
        aggregateError() method
    """
    def inspect_keys(foreignKey, primaryKey, remarks):
        # foreignKey is expected not to be a primary key nor a class
        # property, while primaryKey is expected to be a primary key
        if foreignKey in foreignKey.entity().primaryKeyAttributes():
            remarks.append('Source attribute is a primary key: are you sure?')
        elif foreignKey.isClassProperty():
            remarks.append(
                "Foreign key '%s.%s' is also marked as a class property"
                "--this is strongly discouraged"
                % (foreignKey.entity().name(), foreignKey.name()))
        if primaryKey not in primaryKey.entity().primaryKeyAttributes():
            remarks.append(
                'Destination attribute is not a primary key: are you sure?')

    joins = relationship.joins()
    unusableJoin = 0

    # NOT_SUPPORTED
    unsupported = []
    if joins and len(joins) > 1:
        unsupported.append('Multiple joins are not supported yet')
        unusableJoin = 1
    inverse = relationship.inverseRelationship()
    if inverse:
        if relationship.isToMany() and inverse.isToMany():
            unsupported.append(
                "Many-to-many relationships are not supported yet "
                "(involved relationships: this one and its inverse: %s.%s)"
                % (inverse.entity().name(), inverse.name()))
        if relationship.isToOne() and inverse.isToOne():
            unsupported.append(
                "one-to-one relationships are not supported yet: one of "
                "these should be changed to a ``toMany'' relationship "
                "(involved relationships: this one and its inverse: %s.%s)"
                % (inverse.entity().name(), inverse.name()))
    if unsupported:
        errors.aggregateError(
            'Relationship %s.%s' % (relationship.entity().name(),
                                    relationship.name()),
            NOT_SUPPORTED, unsupported)

    # ERROR
    failures = []
    if not joins:
        unusableJoin = 1
        failures.append('Relationship has no join')
    if not unusableJoin:
        theJoin = joins[0]
        source = theJoin.sourceAttribute()
        destination = theJoin.destinationAttribute()
        if not (source and destination):
            failures.append('Source AND destination attributes should be set')
            unusableJoin = 1
    if (not unusableJoin and relationship.isToOne()
            and not relationship.isMandatory() and source.isRequired()):
        failures.append(
            "relationship is not mandatory but source attribute "
            "%s.%s is required" % (source.entity().name(), source.name()))
    if failures:
        errors.aggregateError(
            'Relationship %s.%s' % (relationship.entity().name(),
                                    relationship.name()),
            ERROR, failures)

    # WARNING
    remarks = []
    if not unusableJoin:
        if relationship.isToOne():
            inspect_keys(source, destination, remarks)
            if relationship.isMandatory() and not source.isRequired():
                remarks.append(
                    "relationship is mandatory but source attribute "
                    "%s.%s is not required"
                    % (source.entity().name(), source.name()))
        else:
            # to-many: the attributes are examined the other way round
            inspect_keys(destination, source, remarks)
    if remarks:
        errors.aggregateError(
            'Relationship %s.%s' % (relationship.entity().name(),
                                    relationship.name()),
            WARNING, remarks)
def validateFlattenedRelationship(relationship, errors):
    """
    Validates a flattened relationship; the whole work is delegated to
    validateFlattenedRelationship_internals().

    Parameters:

      relationship -- the relationship to validate

      errors -- the object receiving the messages
    """
    validateFlattenedRelationship_internals(relationship, errors)
def validateFlattenedRelationship_internals(relationship, errors):
    """
    Unconditionally reports, at level NOT_SUPPORTED, that flattened
    relationships are not supported.

    Parameters:

      relationship -- the flattened relationship

      errors -- the object receiving the message through its
        aggregateError() method
    """
    entityName = relationship.entity().name()
    label = 'Relationship %s.%s' % (entityName, relationship.name())
    errors.aggregateError(label, NOT_SUPPORTED,
                          'Flattened relationships are not supported yet')
##
## misc.
##
def adaptorForModel(model):
    """
    Returns what Adaptor.adaptorWithName() returns for the model's
    adaptorName(), or None if that call raises AdaptorImportError or
    ValueError.
    """
    import Adaptor
    from Adaptor import AdaptorImportError
    try:
        return Adaptor.adaptorWithName(model.adaptorName())
    except (AdaptorImportError, ValueError):
        # no adaptor can be found under that name
        return None
|