# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sébastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
""" ... describe me please I feel alone...
$Id: Relationship.py 972 2006-02-26 01:27:05Z sbigaret $"""
# Keep only the number of the '$Revision: NNN $' keyword: the slice drops the
# 11-character '$Revision: ' prefix and the trailing ' $'.
__version__='$Revision: 972 $'[11:-2]
# Modeling
from Modeling.Join import Join
from Modeling.utils import isaValidName,toBoolean
from Modeling.KeyValueCoding import KeyValueCoding
from Modeling.XMLutils import *
from Modeling import Validation
# Interfaces
from Modeling.interfaces.Relationship import RelationshipInterface
from Modeling.interfaces.XMLCapability import XMLCapabilityInterface
# python
import string, types
# Join semantic: for later use
# These are the values documented for SimpleRelationship.setJoinSemantic() /
# joinSemantic(); SimpleRelationship.__init__ defaults to INNER_JOIN.
INNER_JOIN = 0
FULL_OUTER_JOIN = 1
LEFT_OUTER_JOIN = 2
RIGHT_OUTER_JOIN = 3
# Relationships props
from Modeling.ClassDescription import DELETE_NULLIFY,DELETE_DENY,DELETE_CASCADE,DELETE_NOACTION,old_delete_rules
from Modeling.utils import isListOrTuple,base_persistent_object
class RelationshipError(Exception):
    """Exception type dedicated to errors related to relationships."""
class Relationship(base_persistent_object, XMLCapability, KeyValueCoding):
"See interfaces.Relationship for detail"
#
_isClassProperty=1
_comment=''
__implements__=(RelationshipInterface, XMLCapabilityInterface)
def __init__(self, aName=''):
    """
    Initializes the state shared by every kind of relationship.

    Parameter:

      aName -- the relationship's name; it is checked with isaValidName()

    Defaults: delete rule DELETE_NULLIFY, multiplicity 0..1, class property,
    empty comment and display label, no source entity.

    Raises ValueError if 'aName' is not a valid name.
    """
    if not isaValidName(aName):
        raise ValueError("A valid name must be provided at creation time.")
    self._name = aName
    self._entity = None
    self._comment = ''
    self._displayLabel = ''
    self._deleteRule = DELETE_NULLIFY
    self._isClassProperty = 1
    # multiplicity: lower bound, then upper bound (-1 means 'n', no bound)
    self._multLower = 0
    self._multUpper = 1
def comment(self):
    """Returns the comment field, as stored by setComment()."""
    text = self._comment
    return text
def entity(self):
    """Returns the relationship's source entity (None when not set)."""
    source = self._entity
    return source
sourceEntity = entity # Compatibility w/ old API, _TBD should be removed
def deleteRule(self):
    """
    Returns the delete rule currently applicable to this relationship, i.e.
    one of the constants DELETE_CASCADE, DELETE_NULLIFY or DELETE_DENY
    accepted by setDeleteRule().
    """
    rule = self._deleteRule
    return rule
def displayLabel(self):
    """Returns the relationship's display label."""
    label = self._displayLabel
    return label
def isClassProperty(self):
    """
    Indicates whether the relationship belongs to the class
    properties/fields.
    """
    flag = self._isClassProperty
    return flag
def isMandatory(self):
    """
    Tells whether at least one destination object must be bound to the
    relationship.

    This is not stored: it is computed from the multiplicity's lower bound
    and is true when that bound is strictly positive.
    """
    lower = self._multLower
    return lower > 0
def isNotClassProperty(self):
    """Negation of isClassProperty()."""
    belongs = self.isClassProperty()
    return not belongs
def isToMany(self):
    """
    Tells whether the relationship may hold more than one destination
    object: its upper bound is either greater than 1 or -1 (unbounded).
    This is the contrary of isToOne().
    """
    upper = self._multUpper
    return upper > 1 or upper == -1
def isToOne(self):
    """
    Tells whether the relationship is not to-many.
    This is the contrary of isToMany().
    """
    many = self.isToMany()
    return not many
def multiplicityLowerBound(self):
    """Returns the lower bound of the relationship's multiplicity."""
    lower = self._multLower
    return lower
def multiplicityUpperBound(self):
    """
    Returns the upper bound of the relationship's multiplicity: a strictly
    positive integer, or -1 when there is no upper bound.
    """
    upper = self._multUpper
    return upper
def name(self):
    """Returns the relationship's name."""
    current = self._name
    return current
def setComment(self, aComment):
    """Stores 'aComment' as the relationship's comment field."""
    setattr(self, '_comment', aComment)
def setDeleteRule(self, rule):
    """
    Sets the delete rule.

    Parameter:

      rule -- one of the constants DELETE_NULLIFY, DELETE_CASCADE or
              DELETE_DENY. For backward compatibility a number, or one of
              the strings '0', '1', '2', '3', is first looked up in
              'old_delete_rules'; it is left untouched when it is not found
              there.

    Raises ValueError if the resulting rule is not one of the three
    constants above.
    """
    # backward compatibility: legacy numeric rules
    is_number = type(rule) in (int, long, float)
    if is_number or rule in ('0', '1', '2', '3'):
        rule = old_delete_rules.get(int(rule), rule)
    accepted = (DELETE_NULLIFY, DELETE_CASCADE, DELETE_DENY)
    if rule not in accepted:
        message = ("Parameter 'rule' (%s) should be one of DELETE_NULLIFY (%s), "+
                   "DELETE_CASCADE (%s) or DELETE_DENY (%s)") \
                  % (rule, DELETE_NULLIFY, DELETE_CASCADE, DELETE_DENY)
        raise ValueError(message)
    self._deleteRule = rule
def setDisplayLabel(self, aLabel):
    """Stores 'aLabel' (a string) as the relationship's display label."""
    setattr(self, '_displayLabel', aLabel)
def setIsClassProperty(self, aBool):
    """
    Tells the receiver whether it belongs to the class properties/fields.
    'aBool' is normalized with toBoolean() before being stored.
    """
    flag = toBoolean(aBool)
    self._isClassProperty = flag
def setIsMandatory(self, aBool):
    """
    Tells the relationship whether it is mandatory, by adjusting the
    multiplicity's lower bound:

      - true: a lower bound smaller than 1 is raised to 1, any greater
        value is left untouched,

      - false: the lower bound is reset to 0 (zero).

    'aBool' is interpreted by toBoolean().
    """
    if not toBoolean(aBool):
        self._multLower = 0
    elif self._multLower < 1:
        self._multLower = 1
def setMultiplicityLowerBound(self, lowerBound):
    """
    Sets the lower bound of the relationship's multiplicity.

    Parameter:

      lowerBound -- a positive (or null) integer, or anything int() turns
                    into one

    The bound is checked with an 'assert' statement.
    """
    bound = int(lowerBound)
    assert bound >= 0
    self._multLower = bound
def setMultiplicityUpperBound(self, upperBound):
    """
    Sets the upper bound of the relationship's multiplicity.

    Parameter:

      upperBound -- a strictly positive integer, or -1 for a to-many
                    relationship without any upper bound. The special
                    values '*' and None are turned into -1.

    The bound is checked with an 'assert' statement.
    """
    if upperBound in ('*', None):
        upperBound = -1
    bound = int(upperBound)
    assert bound > 0 or bound == -1
    self._multUpper = bound
def setMultiplicity(self, lowerBound, upperBound):
    """
    Sets both bounds of the relationship's multiplicity, the lower one
    first.
    See also: setMultiplicityLowerBound, setMultiplicityUpperBound.
    """
    for setter, bound in ((self.setMultiplicityLowerBound, lowerBound),
                          (self.setMultiplicityUpperBound, upperBound)):
        setter(bound)
def setName(self, aName):
    """
    Sets the relationship's name.

    When the relationship is bound to an entity, that entity is notified
    through propertyNameDidChange(), which receives the previous name.

    Raises ValueError if 'aName' is rejected by isaValidName().
    """
    if not isaValidName(aName):
        raise ValueError("Invalid name")
    previous, self._name = self._name, aName
    if self._entity:
        self._entity.propertyNameDidChange(previous)
def setEntity(self, anEntity):
"Sets the source entity"
#assert anEntity is None or (type(anEntity)==types.InstanceType and anEntity.__class__ is Entity)
self._entity = anEntity
_setSourceEntity = setEntity #Compatibility w/ old API,_TBD should be removed
# special
def __ne__(self, aRelationship):
# See test_Relationship.test_00_equality_n_inequality() for details
return not self.__eq__(aRelationship)
import sys
# __cmp__ is only defined when running on a Python older than 2.3
if sys.version_info < (2,3):
    def __cmp__(self, r):
        # zero (meaning "equal") when __eq__ holds, a true value otherwise
        return not self.__eq__(r)
# XMLCapabilityInterface
def initWithXMLDOMNode(self, aNode, encoding='iso-8859-1'):
    """
    Initializes a relationship with the supplied xml.dom.node.

    Every attribute declared on 'aNode' is converted according to its
    declared xml type ('string' values go through unicodeToStr() with
    'encoding', 'number' and 'bool' values through int()) and is then
    handed to the setter returned by xmlSetAttribute().
    """
    for attributeName, value in aNode.attributes.items():
        attrType = self.xmlAttributeType(attributeName)
        assign = self.xmlSetAttribute(attributeName)
        if attrType == 'string':
            value = unicodeToStr(value, encoding)
        elif attrType in ('number', 'bool'):
            value = int(value)
        assign(value)
def getXMLDOM(self, doc=None, parentNode=None, encoding='iso-8859-1'):
    """
    Builds the DOM element describing the receiver.

    Parameters 'doc' and 'parentNode' should be both omitted or both
    supplied. If they are omitted, a new DocumentObject is created and its
    document element is the receiver's node. If they are supplied, a new
    element is created in 'doc' and appended to 'parentNode'.

    Every attribute listed by xmlAttributesDict() is set on the node, after
    its value has been converted to unicode using 'encoding' ('bool' values
    are exported as integers).

    Returns: a tuple (doc, node) made of the (possibly new) DocumentObject
    and of the node holding the receiver's attributes.

    Raises ValueError if only one of 'doc' and 'parentNode' is supplied.
    """
    if (doc is None) != (parentNode is None):
        raise ValueError("Parameters 'doc' and 'parentNode' should be together supplied or omitted")
    if doc is not None:
        node = doc.createElement(self.getXMLNodeName())
        parentNode.appendChild(node)
    else:
        doc = createDOMDocumentObject(self.getXMLNodeName())
        node = parentNode = doc.documentElement
    #
    for attr in self.xmlAttributesDict().keys():
        attrType = self.xmlAttributeType(attr)
        getter = self.xmlGetAttribute(attr)
        value = getter()
        if attrType == 'bool':
            value = int(value)
        node.setAttribute(attr, strToUnicode(str(value), encoding))
    return (doc, node)
def getXMLNodeName(self):
    """
    Abstract: concrete subclasses return the name of the xml node used to
    export them. See interfaces.XMLCapability for details.

    Raises NotImplementedError. (It used to raise a string exception, which
    recent Python 2 versions refuse with a TypeError.)
    """
    raise NotImplementedError("Should be overriden in concrete subclass")
def xmlAttributesDict(self):
    """
    Returns the description of the attributes exchanged with xml: a
    dictionary mapping each xml attribute name to a tuple
    (xml type, setter, getter).

    The setter registered for 'name' does nothing.
    """
    ignore = lambda self=None,p=None: None
    description = {}
    description['comment'] = ('string', self.setComment, self.comment)
    description['name'] = ('string', ignore, self.name)
    description['deleteRule'] = ('string', self.setDeleteRule, self.deleteRule)
    description['displayLabel'] = ('string',
                                   self.setDisplayLabel,
                                   self.displayLabel)
    description['isClassProperty'] = ('bool',
                                      self.setIsClassProperty,
                                      self.isClassProperty)
    description['multiplicityLowerBound'] = ('number',
                                             self.setMultiplicityLowerBound,
                                             self.multiplicityLowerBound)
    description['multiplicityUpperBound'] = ('number',
                                             self.setMultiplicityUpperBound,
                                             self.multiplicityUpperBound)
    return description
class SimpleRelationship(Relationship):
"""
Describes a relationship
Features added to the EOF standard API:
- cardinality of the relationship
"""
__implements__=(RelationshipInterface, XMLCapabilityInterface)
def __init__(self, aName=''):
    """
    Initializes a simple relationship. A name **must** be provided.

    On top of the defaults set by Relationship.__init__, the relationship
    starts with no joins, the INNER_JOIN join semantic, and neither
    propagates its primary key nor owns its destination.
    """
    Relationship.__init__.im_func(self, aName)
    self._ownsDestination = 0
    self._propagatesPK = 0
    self._joinSemantic = INNER_JOIN # [later]
    self._joins = ()
def raiseUnappropriate(self):
    """
    Unconditionally raises: called by the methods which make no sense for a
    simple relationship.

    Raises RelationshipError. (It used to raise a string exception, which
    recent Python 2 versions refuse with a TypeError.)
    """
    raise RelationshipError("Nonsense: Unappropriate for a simple relationship")
# Check
def checkRelationshipValidity(self):
    """
    Placeholder: it currently checks nothing and returns None.

    It is meant to check the relationship validity, including:
      - cardinality > 0
      - len(joins) > 0
      - name
      - source and destinationEntity are not None
      - joins' sources are identical (idem for destinations)
    and to raise if the relation is invalid.
    """
    # multiplicity bounds cannot be wrong, thus they do not need to be checked
    return None
def addJoin(self, aJoin):
    """
    Appends 'aJoin' to the relationship's joins.
    See interfaces.Relationship for details.

    Raises ValueError when:
      - the join's source attribute does not belong to the receiver's entity,
      - its source or destination attribute is already used by one of the
        receiver's joins,
      - the receiver already has a destination entity and the join's
        destination attribute belongs to another entity.
    """
    source = aJoin.sourceAttribute()
    # aJoin's sourceAttribute must belong to the same entity...
    if source.entity() != self.entity():
        raise ValueError(
            "Cannot add a join whose entity is not the same as the receiver's")
    # aJoin's source or attribute shouldn't be already registered
    if source in self.sourceAttributes() or \
       aJoin.destinationAttribute() in self.destinationAttributes():
        raise ValueError(
            "Cannot add a join where either its source or destination is already registered by an other relationship's join")
    # Last, aJoin's destinationEntity should be the same as previous ones
    if self.destinationEntity() and \
       aJoin.destinationAttribute().entity() != self.destinationEntity():
        raise ValueError("Cannot add join: destinationAttribute's entity is different from self.destinationEntity ('%s')" % (self.destinationEntity(),))
    self._joins = tuple(list(self._joins) + [aJoin])
def anyInverseRelationship(self):
    """
    If self.inverseRelationship() exists, return that one, otherwise build
    and return an inverse relationship for self.

    The relationship which gets inverted is the first one named like the
    receiver found in the receiver's entity hierarchy (parent entities in
    reverse order, then the receiver's own entity); the receiver itself is
    used when none of them declares such a relationship. The built inverse
    is named 'inverse_for_<entity>_<name>', has the opposite
    to-one / to-many nature, and is bound to the destination entity with
    setEntity() only.

    See also: inverseRelationship()
    """
    inverse = self.inverseRelationship()
    if inverse:
        return inverse

    # we need to make one
    # Work on a copy: the list handed out by the entity must not be modified
    hierarchy = list(self.entity().parentEntities())
    hierarchy.reverse() # from root to leaf (self)
    hierarchy.append(self.entity())
    rel_to_inverse = None
    for candidate_entity in hierarchy:
        declared = candidate_entity.relationshipNamed(self.name())
        if declared:
            rel_to_inverse = declared
            break
    if rel_to_inverse is None:
        # The receiver is not registered in any entity of the hierarchy:
        # invert the receiver itself rather than failing on None.
        rel_to_inverse = self

    inverse_name = 'inverse_for_%s_%s' % (rel_to_inverse.entity().name(),
                                          self.name())
    inverse_rel = SimpleRelationship(inverse_name)
    if self.isToOne():
        inverse_rel.setMultiplicityUpperBound(-1)
    else:
        inverse_rel.setMultiplicityUpperBound(1)
    inverse_rel.setEntity(self.destinationEntity())
    for join in rel_to_inverse.joins():
        src = self.destinationEntity().attributeNamed(join.destinationAttribute().name())
        dst = rel_to_inverse.entity().attributeNamed(join.sourceAttribute().name())
        inverse_rel.addJoin(Join(src, dst))
    return inverse_rel
def clone(self, entity, name=None):
    """
    Returns a copy of the current Relationship, after having added it to
    'entity'.

    Parameter:

      entity -- the entity in which the relationship should be inserted

      name -- optional. If provided the clone gets that name, otherwise
        defaults to 'self.name()'.

    The clone's joins use the attributes of 'entity' named like the
    original source attributes, and the original destination attributes.

    Raises RuntimeError if 'entity' has no attribute named like one of the
    joins' source attributes.

    See also: Entity.clone()
    """
    copy = SimpleRelationship(name or self.name())
    entity.addRelationship(copy)
    # the mechanism used here relies on functionalities decl. in XMLCapability
    for prop in self.xmlAttributesDict().keys():
        value = self.xmlGetAttribute(prop)()
        copy.xmlSetAttribute(prop)(value)
    for join in self.joins():
        srcName = join.sourceAttributeName()
        src = entity.attributeNamed(srcName)
        if not src:
            raise RuntimeError('Cannot find src attribute %s in entity %s'%(srcName, entity.name()))
        copy.addJoin(Join(src, join.destinationAttribute()))
    return copy
def componentRelationships(self):
    """
    Always returns None for a simple relationship.
    See interfaces.Relationship for details.
    """
    components = None
    return components
def definition(self):
    """
    Always returns None for a simple relationship.
    See interfaces.Relationship for details.
    """
    no_definition = None
    return no_definition
def destinationAttributes(self):
    """
    Returns the list of the joins' destination attributes, in the joins'
    order.
    """
    return [join.destinationAttribute() for join in self._joins]
def destinationAttributeForSourceAttribute(self, attribute):
    """
    Returns the destination attribute of the join whose source attribute
    is 'attribute'.

    Raises ValueError if 'attribute' is not the source attribute of any of
    the relationship's joins.
    """
    position = self.sourceAttributes().index(attribute)
    destinations = self.destinationAttributes()
    return destinations[position]
def destinationEntity(self):
    """
    Returns the destination entity, i.e. the entity of the first join's
    destination attribute, or None if the relationship has no joins yet.
    """
    if self.joins():
        first = self.joins()[0]
        return first.destinationAttribute().entity()
    return None
def destinationEntityName(self):
    """
    Returns the name of the destination entity, or None when it cannot be
    obtained (any AttributeError raised on the way is turned into None).
    """
    try:
        destination = self.destinationEntity()
        return destination.name()
    except AttributeError:
        return None
def inverseRelationship(self):
    """
    Returns the inverse relationship in the destination entity, or None if
    it does not exist (or if the receiver has no destination entity yet).

    A relationship of the destination entity is an inverse when it points
    to the receiver's entity or to one of its parent entities, and when its
    joins can be paired one to one with reciprocal joins (see
    Join.isReciprocicalToJoin()) of the receiver, or of a relationship
    with the same name in one of those entities.

    Note: applying inverseRelationship() twice does not necessarily lead
    back to the receiver, i.e. rel.inverseRelationship().inverseRelationship()
    does not necessarily return self. It returns self if and only if the
    relationship's entity has no parent entity, or if the relationship is
    not an inherited property (none of the parent entities defines the
    relationship).

    See also: anyInverseRelationship()
    """
    destination = self.destinationEntity()
    if destination is None:
        # no joins yet: there is nothing an inverse could be matched against
        return None

    # Search all possible relationships, i.e. relationships in the
    # destinationEntity() pointing to self.entity() or to one of its
    # parent entities. The list is built anew so that the one returned by
    # parentEntities() is never modified.
    src_entities = [self.entity()] + list(self.entity().parentEntities())
    candidates = [rel for rel in destination.relationships()
                  if rel.destinationEntity() in src_entities]
    if not candidates:
        return None

    # if you can't find an inverse on self.entity, we'd probably find one
    # in parent entities.
    #
    # Note: a relationship built by anyInverseRelationship() is NOT added to
    #       its entity's relationships, so 'self' is explicitly examined
    #       first instead of relying on self.entity().relationshipNamed(),
    #       which in this case returns None.
    self_rels = [self]
    self_rels.extend([e.relationshipNamed(self.name()) for e in src_entities])

    for self_rel in self_rels:
        if self_rel is None:
            continue
        for candidate in candidates:
            remaining = list(self_rel.joins())
            candidate_joins = list(candidate.joins())
            if len(remaining) != len(candidate_joins):
                continue
            matched = 1
            # Pair each join of the candidate with a still unpaired,
            # reciprocal join of self_rel. (The former implementation
            # advanced an index into a list it was shrinking, and failed
            # with IndexError as soon as two joins were involved.)
            for candidate_join in candidate_joins:
                partner = None
                for position in range(len(remaining)):
                    if remaining[position].isReciprocicalToJoin(candidate_join):
                        partner = position
                        break
                if partner is None:
                    matched = 0 # cannot be reciprocical
                    break
                del remaining[partner]
            if matched:
                return candidate
    return None
def isCompound(self):
    """
    Indicates whether the relationship holds more than one join.

    Returns true iff the relationship has two joins or more, false if it
    has exactly one.

    Raises RelationshipError if the relationship has no joins. (It used to
    raise a string exception, which recent Python 2 versions refuse with a
    TypeError.)
    """
    if not self._joins:
        raise RelationshipError("Nonsense: relationship has no join")
    return len(self._joins) > 1
def isFlattened(self):
    """
    Always false (0): a simple relationship is not flattened.
    See interfaces.Relationship for details.
    """
    flattened = 0
    return flattened
def isSimple(self):
    """
    Indicates whether the relation holds at most one join.

    Note that this is also true when the relationship is invalid, i.e.
    when it holds no joins at all.
    """
    if self._joins and len(self._joins) > 1:
        return False
    return True
def joins(self):
    """Returns the whole set of joins registered in this relationship."""
    registered = self._joins
    return registered
def joinSemantic(self):
    """
    Returns the join semantic to be used for the relationship's joins: one
    of the module's constants INNER_JOIN, FULL_OUTER_JOIN, RIGHT_OUTER_JOIN
    or LEFT_OUTER_JOIN.
    """
    semantic = self._joinSemantic
    return semantic
def ownsDestination(self): # strict equival. toUML aggregate ?
    """
    Returns true if the relationship owns the destinationEntity, i.e. if
    the related entity object cannot exist without the source object.

    NOTE(review): this docstring used to say that owning the destination
    sets the delete rule to 'cascade'; nothing here does so -- to be
    confirmed.

    See also: setOwnsDestination()
    """
    owns = self._ownsDestination
    return owns
def propagatesPrimaryKey(self):
    """
    Returns true if the relationship propagates PK value, false otherwise,
    as set by setPropagatesPrimaryKey().

    _TBD should be explained
    """
    propagates = self._propagatesPK
    return propagates
def removeJoin(self, aJoin):
    """
    Removes the supplied join from the relation's joins.

    Parameter 'aJoin' must be a 'Join' instance.

    Raises 'ValueError' if 'aJoin' cannot be found; the joins are then left
    unchanged.
    """
    remaining = list(self._joins)
    try:
        remaining.remove(aJoin)
    except ValueError:
        # the message used to read "has such join", i.e. the opposite
        raise ValueError("Relationship '%s' has no such join" % (self.name(),))
    self._joins = tuple(remaining)
def setDefinition(self, definition):
    """
    Unappropriate for a simple relationship: it delegates to
    raiseUnappropriate(). See interfaces.Relationship for details.
    """
    refuse = self.raiseUnappropriate
    refuse()
def setJoinSemantic(self, semantic):
    """
    Sets the join semantic for the underlying joins.

    Parameter:

      semantic -- one of the module's constants INNER_JOIN,
                  FULL_OUTER_JOIN, RIGHT_OUTER_JOIN, LEFT_OUTER_JOIN; it is
                  converted with int() before being stored.
    """
    converted = int(semantic)
    self._joinSemantic = converted
def setOwnsDestination(self, boolean):
    """
    Tells the receiver whether it owns its destination objects, i.e.
    whether the related object(s) cannot exist without the source object.

    The flag is stored as 1 or 0 depending on the truth value of 'boolean'.
    """
    self._ownsDestination = int(bool(boolean))
def setPropagatesPrimaryKey(self, aBool):
    """
    Tells the relationship whether it propagates PK value; the flag is
    stored as 1 or 0 depending on the truth value of 'aBool'.

    _TBD should be explained
    """
    self._propagatesPK = int(bool(aBool))
def sourceAttributes(self):
    """
    Returns the list of the joins' source attributes, in the joins' order.
    """
    return [join.sourceAttribute() for join in self._joins]
def validateRelationship(self):
    """
    Not implemented: always raises NotImplementedError.

    It is meant to validate the relationship against general model
    consistency, e.g. a relationship should have a source and a destination
    entity.
    """
    raise NotImplementedError()
def validateValue(self, value, object=None):
    """
    Checks whether the supplied value is valid for the relationship.

    Tests are made in that order:

      0. a fault (value.isFault() is true) is accepted right away: it is
         NOT triggered, nor are its values fetched, for validation purpose.

      1. a void value (anything false: None, an empty sequence...) is
         refused if the relationship is mandatory, and accepted without any
         further check otherwise,

      2. the number of values matches the multiplicity bounds,

      3. each value which is not a fault belongs to the destination entity
         or to one of its sub-entities; a value which does not answer to
         entityName() is reported as a model error.

    Parameter 'value' is required; parameter 'object' is not used here.

    Silently returns if it succeeds.

    A mandatory relationship with a void value raises
    'Modeling.Validation.ValidationException' at once; the errors found by
    tests 2. and 3. are collected in such an exception and handed to its
    finalize() method.
    """
    try:
        if value.isFault():
            return
    except AttributeError:
        pass # isFault does not exist
    except TypeError:
        pass # isFault is not callable

    errors = Validation.ValidationException()

    # void value: either refused (mandatory) or accepted as is
    if not value:
        if self.isMandatory():
            errors.addErrorForKey(Validation.REQUIRED, self.name())
            raise errors # stop here!
        return

    # correct length/multiplicity?
    if isListOrTuple(value):
        candidates = list(value)
    else:
        candidates = [value]
    count = len(candidates)
    if count < self.multiplicityLowerBound():
        errors.addErrorForKey(Validation.LOWER_BOUND, self.name())
    upper = self.multiplicityUpperBound()
    if upper != -1 and count > upper:
        errors.addErrorForKey(Validation.UPPER_BOUND, self.name())

    # values refer to the correct entity?
    destination = self.destinationEntity()
    validNames = [destination.name()]
    validNames += destination.allSubEntitiesNames()
    # mismatching values are withdrawn from this copy; no later check
    # makes use of it for the moment being
    accepted = list(candidates)
    for candidate in candidates:
        try:
            if candidate.isFault():
                continue
            if candidate.entityName() not in validNames:
                errors.addErrorForKey(Validation.TYPE_MISMATCH, self.name())
                accepted.remove(candidate)
        except AttributeError:
            errors.addErrorForKey(Validation.MODEL_ERROR, self.name())

    errors.finalize()
# special
def __eq__(self, aRelationship):
"Tests whether both relationship are equal"
if aRelationship is None:
return 0
try:
# _TBD define __eq__ for Entity
if aRelationship.isFlattened():
return 0
if self.name()!=aRelationship.name() or \
self.deleteRule()!=aRelationship.deleteRule() or \
self.definition()!=aRelationship.definition() or \
self.displayLabel()!=aRelationship.displayLabel() or \
self.entity().name()!=aRelationship.entity().name() or \
self.isFlattened()!=aRelationship.isFlattened() or \
self.isMandatory()!=aRelationship.isMandatory() or \
self.isSimple()!=aRelationship.isSimple() or \
self.destinationEntity().name()!=aRelationship.destinationEntity().name() or \
self.multiplicityLowerBound()!=aRelationship.multiplicityLowerBound() or \
self.multiplicityUpperBound()!=aRelationship.multiplicityUpperBound() or \
self.propagatesPrimaryKey()!=aRelationship.propagatesPrimaryKey() or \
self.ownsDestination()!=aRelationship.ownsDestination():
return 0
# Test joins set!!
if len(self._joins)!=len(aRelationship.joins()): return 0
for aJoin in aRelationship.joins():
if aJoin not in self._joins: return 0
except: #AttributeError
return 0
return 1
def __str__(self):
"Returns a short representation of the relationship"
if self.entity():
return "%s.%s"%(self.entity().name(), self.name())
else:
return "%s.%s"%('<Not bound to an entity>', self.name())
# XML
def initWithXMLDOMNode(self, aNode, encoding='iso-8859-1'):
    """
    Initializes a relationship with the supplied xml.dom.node.

    The attributes are initialized by Relationship.initWithXMLDOMNode, then
    a Join is added for each 'join' child element of 'aNode'; its source
    attribute is looked up in the receiver's entity, its destination
    attribute in the entity named by the node's 'destinationEntity'.

    Raises XMLImportError if the destination entity cannot be found in the
    entity's model, or if an attribute referenced by a join cannot be
    found.
    """
    Relationship.initWithXMLDOMNode.im_func(self, aNode, encoding)

    # Now initializes joins
    _destEntity = xpath.Evaluate('string(@destinationEntity)', contextNode=aNode)
    destEntity = self.entity().model().entityNamed(unicodeToStr(_destEntity))
    if destEntity is None:
        raise XMLImportError(
            "Couldnt find destinationEntity '%s' for relationship %s.%s"%(_destEntity, self.entity().name(), self.name()))

    for joinNode in xpath.Evaluate('join', contextNode=aNode):
        src = xpath.Evaluate('string(@sourceAttribute)', contextNode=joinNode)
        dst = xpath.Evaluate('string(@destinationAttribute)', contextNode=joinNode)
        srcAttribute = self.entity().attributeNamed(unicodeToStr(src))
        if srcAttribute is None:
            # what is missing is an attribute, not an entity
            raise XMLImportError(
                "Couldnt find source attribute '%s' for relationship %s.%s"%(src, self.entity().name(), self.name()))
        dstAttribute = destEntity.attributeNamed(unicodeToStr(dst))
        if dstAttribute is None:
            # report the destination attribute's name (it used to be 'src')
            raise XMLImportError(
                "Couldnt find destination attribute '%s' for relationship %s.%s"%(dst, self.entity().name(), self.name()))
        self.addJoin(Join(srcAttribute, dstAttribute))
def getXMLDOM(self, doc=None, parentNode=None, encoding='iso-8859-1'):
    """
    Returns the (DOM) DocumentObject for the receiver.

    Parameters 'doc' and 'parentNode' should be both omitted or supplied;
    they are handed to Relationship.getXMLDOM, which builds the node
    holding the receiver's attributes. Each join is then asked to export
    itself under that node.

    Returns: the (possibly new) DocumentObject.
    """
    built = Relationship.getXMLDOM.im_func(self, doc, parentNode, encoding)
    doc, node = built
    # Joins
    for join in self.joins():
        join.getXMLDOM(doc, node, encoding)
    return doc
def getXMLNodeName(self):
    """
    Returns the name of the xml node for simple relationships, i.e. the
    string "relation". See interfaces.XMLCapability for details.
    """
    node_name = "relation"
    return node_name
def xmlAttributesDict(self):
    """
    Returns the dictionary built by Relationship.xmlAttributesDict,
    extended with the entries 'destinationEntity' (whose setter does
    nothing) and 'joinSemantic'.
    """
    attributes = Relationship.xmlAttributesDict.im_func(self)
    attributes['destinationEntity'] = ('string',
                                       lambda self=None,p=None: None,
                                       self.destinationEntityName)
    attributes['joinSemantic'] = ('string',
                                  self.setJoinSemantic,
                                  self.joinSemantic)
    return attributes
##
## KeyValueCoding error handling
##
def handleAssignementForUnboundKey(self, value, key):
    """
    Handles the keys which are not bound to the relationship: 'doc' is
    routed to setComment() and 'delete' to setDeleteRule().

    Raises AttributeError, with 'key' as its value, for any other key.
    """
    if key == 'doc':
        self.setComment(value)
        return
    if key == 'delete':
        self.setDeleteRule(value)
        return
    raise AttributeError(key)
handleTakeStoredValueForUnboundKey=handleAssignementForUnboundKey
class FlattenedRelationship(Relationship):
"See interfaces.Relationship for details"
# Flattened relationships caches the componentRelationships
_v_componentRels=None
def __init__(self, aName=''):
    """
    Initializes a flattened relationship. A name **must** be provided.

    On top of the defaults set by Relationship.__init__, the relationship
    has no definition and its multiplicity is 0..n.
    """
    Relationship.__init__.im_func(self, aName)
    # multiplicity: -1 stands for 'n' or '*' (no upper bound)
    self._multLower, self._multUpper = 0, -1
    self._definition = None
def raiseUnappropriate(self):
    """
    Unconditionally raises: called by the methods which make no sense for a
    flattened relationship.

    Raises RelationshipError. (It used to raise a string exception, which
    recent Python 2 versions refuse with a TypeError.)
    """
    raise RelationshipError("Nonsense: Unappropriate for a flattened relationship")
# Check
def checkRelationshipValidity(self):
    """
    Not Implemented: it currently checks nothing and returns None.

    It is meant to check the relationship validity, including:
      - cardinality > 0
      - definition is set
      - name
      - source and destinationEntity are not None
    and to raise if the relation is invalid.
    """
    # multiplicity bounds cannot be wrong, thus they do not need to be checked
    return None
def addJoin(self, aJoin):
    """
    Unappropriate for a flattened relationship: it delegates to
    raiseUnappropriate(). See interfaces.Relationship for details.
    """
    refuse = self.raiseUnappropriate
    refuse()
def componentRelationships(self):
    """
    Returns the relationships the definition is made of, as a tuple, or
    None if the relationship has no definition yet.
    See interfaces.Relationship for details.

    The definition is a dotted path of relationships' names: the first one
    is searched in the receiver's entity, each following one in the
    destination entity of the previous one. None of them may be a flattened
    relationship.

    The result is cached until setDefinition() is called. Nothing is cached
    when the computation fails, so that a failure is reported each time.
    (A partially filled cache used to be left behind, and the following
    calls silently returned it.)

    Raises AttributeError if the components cannot be computed.
    """
    if self._v_componentRels is not None:
        return self._v_componentRels
    if not self._definition:
        return None

    # Calculates the components
    components = []
    try:
        currentEntity = self.entity()
        for key in self._definition.split('.'):
            _rel = currentEntity.relationshipNamed(key)
            if not _rel:
                raise ValueError("Unable to find relation '%s' "\
                      "in entity '%s'"%(key, currentEntity.name()))
            if _rel.isFlattened():
                raise ValueError("Found in definition a flattened "\
                      "relation ('%s', entity:'%s')"%(key, currentEntity.name()))
            components.append(_rel)
            currentEntity = _rel.destinationEntity()
    except Exception:
        # Whatever went wrong is reported as an AttributeError, as it always
        # was; the reason is appended to the message.
        import sys
        raise AttributeError('Unable to compute the component relationships'
                             ': %s' % (sys.exc_info()[1],))
    self._v_componentRels = tuple(components)
    return self._v_componentRels
def definition(self):
    """
    Returns the relationship's definition, as set by setDefinition().
    See interfaces.Relationship for details.
    """
    path = self._definition
    return path
def destinationAttributes(self):
    """
    Unimplemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
def destinationEntity(self):
    """
    Returns the destination entity, i.e. the destination entity of the last
    component relationship, or 'None' if the relationship has no components
    (no definition yet).
    """
    components = self.componentRelationships()
    if components:
        last = components[-1]
        return last.destinationEntity()
    return None
def destinationEntityName(self):
    """
    Returns the name of the destination entity, or None when it cannot be
    obtained (any AttributeError raised on the way is turned into None).
    """
    try:
        destination = self.destinationEntity()
        return destination.name()
    except AttributeError:
        return None
def inverseRelationship(self):
    """
    Returns the inverse relationship in the destination entity, or None if
    it does not exist (or if the receiver has no destination entity yet).

    The inverse is a flattened relationship of the destination entity which
    points back to the receiver's entity, and whose components are, in
    reverse order, the inverse relationships of the receiver's components.
    """
    myDestEntity = self.destinationEntity()
    if myDestEntity is None:
        # no definition yet: there is no entity to search an inverse in
        return None
    source = self.entity()
    candidates = [rel for rel in myDestEntity.relationships()
                  if rel.destinationEntity() == source and rel.isFlattened()]
    if not candidates:
        return None

    # Check all possible relationships
    self_components = self.componentRelationships()
    count = len(self_components)
    for candidate in candidates:
        inverse_components = candidate.componentRelationships()
        # a candidate without any definition has no components (None)
        if inverse_components is None or len(inverse_components) != count:
            continue
        for index in range(count):
            if self_components[index].inverseRelationship() != \
               inverse_components[count-1-index]:
                break
        else:
            # every component matched
            return candidate
    return None
def isCompound(self):
    """
    Unappropriate for a flattened relationship: it delegates to
    raiseUnappropriate().
    """
    refuse = self.raiseUnappropriate
    refuse()
def isFlattened(self):
    """
    Always true (1). See interfaces.Relationship for details.
    """
    flattened = 1
    return flattened
def isMandatory(self):
    """
    Indicates whether the relationship requires at least a destination
    entity to be bound. This is a computed attribute, equivalent to:
    'self.multiplicityLowerBound()>0'.

    NOTE(review): the 'return' statement was commented out, so that the
    method always returned None whatever the lower bound was, which
    contradicts the contract above. It has been restored; to be confirmed
    that flattened relationships were not meant to be never mandatory.
    """
    return self._multLower > 0
def isSimple(self):
    """
    Unappropriate for a flattened relationship: it delegates to
    raiseUnappropriate().
    """
    refuse = self.raiseUnappropriate
    refuse()
def isToMany(self):
    """
    Tells whether the relationship may hold more than one destination
    object: its upper bound is either greater than 1 or -1 (unbounded).
    This is the contrary of isToOne().
    """
    upper = self._multUpper
    return upper > 1 or upper == -1
def isToOne(self):
    """
    Tells whether the relationship is not to-many.
    This is the contrary of isToMany().
    """
    many = self.isToMany()
    return not many
def joins(self):
    """
    Unappropriate for a flattened relationship: it delegates to
    raiseUnappropriate().
    """
    refuse = self.raiseUnappropriate
    refuse()
def ownsDestination(self): # strict equival. toUML aggregate ?
    """
    Unappropriate for a flattened relationship: it delegates to
    raiseUnappropriate().
    """
    refuse = self.raiseUnappropriate
    refuse()
def propagatesPrimaryKey(self):
    """
    Always false (0) for a flattened relationship, whatever was handed to
    setPropagatesPrimaryKey().

    _TBD should be explained
    """
    propagates = 0
    return propagates
def removeJoin(self, aJoin):
    """
    Unappropriate for a flattened relationship: it delegates to
    raiseUnappropriate().
    """
    refuse = self.raiseUnappropriate
    refuse()
def setDefinition(self, definition):
    """
    Sets the relationship's definition and discards the cached component
    relationships, so that they are computed anew when next requested.
    See interfaces.Relationship for details.
    """
    self._v_componentRels = None
    self._definition = definition
# ??? Override to deny toOne for flattened??
#def setMultiplicityUpperBound(self, upperBound):
# """
# Sets the upper bound of the relationship's multiplicity.
# Parameter 'upperBound' must be a strictly positive integer, or -1 for a
# non constrained to-many relationship.
# """
# # TBD acceptable value are range(1,n) or -1 (no upper bound)
# assert(int(upperBound)>0 or int(upperBound)==-1)
# self._multUpper=int(upperBound)
#
#def setMultiplicity(self, lowerBound, upperBound):
# """
# Sets the lower and upper bounds for the relationship's multiplicity.
# See also: setMultiplicityLowerBound, setMultiplicityUpperBound.
# """
# self.setMultiplicityLowerBound(lowerBound)
# self.setMultiplicityUpperBound(upperBound)
def setOwnsDestination(self, boolean):
    """
    Tells the receiver whether it owns its destination objects.

    When a relationship owns its destination, the related object(s)
    cannot exist without the source object.

    Parameter 'boolean' is evaluated for its truth value; the flag is stored
    as the integer 1 (true) or 0 (false) in 'self._ownsDestination'.
    """
    owns = 0
    if boolean:
        owns = 1
    self._ownsDestination = owns
def setPropagatesPrimaryKey(self, aBool):
    """
    Tells the relationship whether it propagates the primary key value.

    Parameter 'aBool' is evaluated for its truth value; the flag is stored
    as the integer 1 (true) or 0 (false) in 'self._propagatesPK'.
    _TBD should be explained
    """
    if not aBool:
        self._propagatesPK = 0
        return
    self._propagatesPK = 1
def sourceAttributes(self):
    """
    Unimplemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
def validateRelationship(self):
    """
    Unimplemented: always raises NotImplementedError.

    Intended to validate the relationship against general model consistency,
    e.g.:

      - a relationship should have a source and a destination entity
    """
    raise NotImplementedError()
def validateValue(self, value, object=None):
    """
    Validates 'value' as a candidate value for the relationship.

    The following checks are made, in that order:

      1. a *void* value (any false value, such as None or an empty sequence)
         is refused if the relationship is mandatory, and accepted without
         any further check otherwise,

      2. the number of values matches the multiplicity bounds (an upper
         bound of -1 stands for: no upper bound),

      3. each value's 'entityName()' is the name of the relationship's
         destination entity, or of one of its sub-entities.

    A tuple is handled as a set of values; any other value is handled as a
    single value.

    Parameters:

      value -- the value to check (required)

      object -- optional; it is not used by the current implementation

    Silently returns if validation succeeds.

    A void value for a mandatory relationship raises
    'Modeling.Validation.ValidationException' immediately. Any other failure
    is recorded in the exception object under the relationship's name, and
    the exception's 'finalize()' is called after all checks were made
    (presumably raising when errors were recorded -- see Modeling.Validation).
    """
    errors = Validation.ValidationException()

    # Void value: only an error when the relationship is mandatory
    if not value:
        if self.isMandatory():
            errors.addErrorForKey(Validation.REQUIRED, self.name())
            raise errors  # stop here!
        return

    # Normalize: tuples are unrolled, anything else is a single candidate
    if type(value) == types.TupleType:
        candidates = list(value)
    else:
        candidates = [value]

    # Multiplicity bounds
    if len(candidates) < self.multiplicityLowerBound():
        errors.addErrorForKey(Validation.LOWER_BOUND, self.name())
    if self.multiplicityUpperBound() != -1:
        if len(candidates) > self.multiplicityUpperBound():
            errors.addErrorForKey(Validation.UPPER_BOUND, self.name())

    # Do the candidates refer to the destination entity or one of its
    # sub-entities?
    remaining = list(candidates)  # _TBD do we need a copy here?
    acceptedNames = [self.destinationEntity().name()]
    acceptedNames.extend(self.destinationEntity().allSubEntitiesNames())
    for candidate in candidates:
        try:
            if candidate.entityName() not in acceptedNames:
                errors.addErrorForKey(Validation.TYPE_MISMATCH, self.name())
                # Discard this candidate: further checks would be
                # inaccurate for it
                remaining.remove(candidate)
        except AttributeError:
            errors.addErrorForKey(Validation.MODEL_ERROR, self.name())

    # Additional checks, if any is added some day, should only be applied to
    # the remaining candidates; there are none for the moment being.
    candidates = remaining
    errors.finalize()
# special
def __eq__(self, aRelationship):
    """
    Tests whether both relationships are equal.

    Two flattened relationships are equal when they have the same name,
    delete rule, definition, display label, source entity's name, destination
    entity's name, multiplicity bounds, and when they answer the same to
    'isFlattened()', 'isMandatory()' and 'propagatesPrimaryKey()'.

    Parameter 'aRelationship' can be any object: None, an object which is not
    a flattened relationship, or an object which does not respond to the
    relationship's API are all considered different.

    Returns 1 if both relationships are equal, 0 otherwise.
    """
    if aRelationship is None:
        return 0
    try:
        # 'isFlattened()' is checked within the try-block so that comparing
        # with an arbitrary object (a string, an integer...) answers 0
        # instead of raising AttributeError
        if not aRelationship.isFlattened():
            return 0
        # _TBD define __eq__ for Entity
        if self.name()!=aRelationship.name() or \
           self.deleteRule()!=aRelationship.deleteRule() or \
           self.definition()!=aRelationship.definition() or \
           self.displayLabel()!=aRelationship.displayLabel() or \
           self.entity().name()!=aRelationship.entity().name() or \
           self.isFlattened()!=aRelationship.isFlattened() or \
           self.isMandatory()!=aRelationship.isMandatory() or \
           self.destinationEntity().name()!=aRelationship.destinationEntity().name() or \
           self.multiplicityLowerBound()!=aRelationship.multiplicityLowerBound() or \
           self.multiplicityUpperBound()!=aRelationship.multiplicityUpperBound() or \
           self.propagatesPrimaryKey()!=aRelationship.propagatesPrimaryKey():
            return 0
    except AttributeError:
        # Either a needed method is missing, or one of the relationships is
        # not bound to an entity (or has no destination entity): not equal
        return 0
    return 1
def __str__(self):
    """
    Returns a short representation of the relationship, made of the name of
    its entity and of its own name, separated by a dot. A placeholder takes
    the place of the entity's name when 'self.entity()' is false.
    """
    owner = '<Not bound to an entity>'
    if self.entity():
        owner = self.entity().name()
    return "%s.%s"%(owner, self.name())
# XML
#def initWithXMLDOMNode(self, aNode, encoding='iso-8859-1'):
# """
# Initializes a relationship with the supplied xml.dom.node.
# """
# Relationship.initWithXMLDOMNode.im_func(self, aNode, encoding)
# return
def getXMLDOM(self, doc=None, parentNode=None, encoding='iso-8859-1'):
    """
    Delegates to 'Relationship.getXMLDOM' with the same parameters, and
    returns the document only: the second element of the pair returned by
    'Relationship.getXMLDOM' is dropped.
    """
    inherited = Relationship.getXMLDOM.im_func
    doc, _node = inherited(self, doc, parentNode, encoding)
    return doc
def getXMLNodeName(self):
    """
    Returns the name of the XML node for this kind of relationship, i.e.
    string "flattenedRelation".

    See interfaces.XMLCapability for details
    """
    nodeName = "flattenedRelation"
    return nodeName
def xmlAttributesDict(self):
    """
    Returns the dictionary built by 'Relationship.xmlAttributesDict',
    completed with an entry for key 'definition'.

    The entry is the tuple: ('string', self.setDefinition, self.definition)
    -- presumably the attribute's type, setter and getter; see
    interfaces.XMLCapability to confirm.
    """
    attributes = Relationship.xmlAttributesDict.im_func(self)
    attributes['definition'] = ('string',
                                self.setDefinition,
                                self.definition)
    return attributes
# Validation
#
|