# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sbastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
""" ... describe me please I feel alone...
$Id: Model.py 957 2004-11-30 17:07:37Z sbigaret $"""
__version__='$Revision: 957 $'[11:-2]
class InvalidNameException(Exception):
"..."
pass
class ModelError(Exception):
"..."
pass
#from Entity import Entity
from Modeling.utils import isaValidName,base_persistent_object
from Modeling.XMLutils import *
from Modeling.KeyValueCoding import KeyValueCoding
import types
def updateModelWithCFG(model, cfg_path=None):
"""
Updates the model's connection dictionary and adaptorName with the values
in file 'cfg_path'. If cfg_path is omitted or None, the value stored
in the environment variable MDL_DB_CONNECTIONS_CFG is used instead.
A sample configuration file is like::
[DEFAULT]
host: localhost
[ModelName_1]
user: user_1
password: pwd_1
[ModelName_2]
adaptor: MySQL
user: user_2
password: pwd_2
The special field 'adaptor', if present, changes the adaptorName of the
model.
Raises IOError if file 'cfg_path' cannot be found.
See also: ModelSet.addModel()
:Parameters:
- `model`: the model whose conn.dict. should be updated
- `cfg_path`: the full path to the configuration file, or if omitted or
``None``, defaults to the value stored in the env. variable
``MDL_DB_CONNECTIONS_CFG``
"""
import os
if cfg_path is None:
cfg_path=os.environ.get('MDL_DB_CONNECTIONS_CFG')
if not cfg_path:
return
from ConfigParser import ConfigParser
defaults=model.connectionDictionary()
cp=ConfigParser()
try:
cp.readfp(open(cfg_path))
except IOError:
import traceback, cStringIO, sys
exc_raised=sys.exc_info()[:2]
err_msg="Unable to open file '%s' (passed in parameter 'cfg_path' or taken from env. variable MDL_DB_CONNECTIONS_CFG"%cfg_path
exc=cStringIO.StringIO()
traceback.print_exception(exc_raised[0], exc_raised[1], None, file=exc)
err_msg+="\nOriginal exception was: %s"%exc.getvalue()
raise IOError, err_msg
try: options=cp.options(model.name())
except: return
try: options.remove('adaptor')
except ValueError: pass
for key in options:
defaults[key]=cp.get(model.name(), key)
model.setConnectionDictionary(defaults)
try:
model.setAdaptorName(cp.get(model.name(), 'adaptor'))
except:
pass
def loadModel(path):
"""
Load a model stored in the file `path`. The lookup procedure is:
- if `path` ends with ``.py``, we assume it is a python module. This module
is imported and the following attributes are searched within it, in that
order:
1. ``model`` (either an attribute or a function): if found, the method
determines whether this is a `Model.Model` or a `PyModel.Model`.
If the PyModel has not been built yet, it is build() here.
2. ``pymodel`` (either an attribute or a function): if found, we
assume this is an instance of PyModel.Model and we return
its 'component' attribute. If the PyModel has not been built yet,
it is build() here.
3. ``model_src`` (id.): if found, we assume this is a string and the
returned model is built from that xml stream with `loadXMLModel()`
- if `path` ends with ``.xml``, we assume this is a xml-file and we return
the model build with loadXMLModel()
Note: this method calls `updateModelWithCFG()` on the model before returning
it. If you want to have the non-updated model, use `_loadModel` instead.
:param path: the path of the file where the model is stored
:return: the loaded Modeling.Model.Model instance
:raise ValueError: if file 'path' cannot be handled, or IOError or
ImportError if the files does no exists, has errors, etc.
"""
model, model_type = _loadModel(path)
updateModelWithCFG(model)
return model
def _loadModel(path):
"""
Note: this method does NOT call updateModelWithCFG()
:return: a tuple ``(model, model_type)``, where ``model`` is the model
itself, and ``model_type`` a string being either ``"xmlmodel"`` or
``"pymodel"`` indicating whether the model was found in a xml stream or in
a `PyModel`.
"""
model_type = "xmlmodel"
if path[-4:]=='.xml':
model=loadXMLModel(open(path, 'rb').read())
elif path[-3:]=='.py':
import os,imp
dir,filename=os.path.dirname(path),os.path.basename(path)
modulename=filename[:-3]
file=None
try:
try:
file,pathname,description=imp.find_module(modulename, [dir])
module=imp.load_module(modulename,file,pathname,description)
if hasattr(module, 'model'):
model=module.model
from NotificationFramework.mems_lib import isinstance
if callable(model) and not isinstance(model, Model):
# Model.Model may derive from ZODB.Persistent if ZODB is installed:
# in this case it is callable but it shouldn't be called!
model=model()
import PyModel
if isinstance(model, PyModel.Model):
model_type = "pymodel"
if not model.is_built:
model.build()
model=model.component
elif hasattr(module, 'pymodel'):
model_type = "pymodel"
pymodel=module.pymodel
if callable(pymodel):
pymodel=pymodel()
if not model.is_built:
pymodel.build()
model=pymodel.component
elif hasattr(module, 'model_src'):
model_src=module.model_src
if callable(model_src):
model_src=model_src()
from ModelSet import ModelSet
model=loadXMLModel(model_src)
else:
raise ValueError, "Couldn't find any of these attributes in python file '%s': model, pymodel (PyModel) or model_src (xml)"%path
except ImportError:
raise
except:
import cStringIO, traceback
exc=cStringIO.StringIO()
traceback.print_exc(file=exc)
raise ValueError, 'Unable to handle python file %s\nReason: exception raised:\n%s'%(path,exc.getvalue())
finally:
if file: file.close()
else:
raise ValueError, 'Unable to handle file %s: unrecognized format (filename should end with either with .py or .xml)'%path
return model, model_type
def loadXMLModel(xml_content):
"""
Loads an xml-model and returns the corresponding Model.
:param xml_content: a string containing the whole xml stream. If the xml is
stored in a file, it is suggested that the file is opened in binary mode
(for example, ``loadXMLModel(open(file_path, 'rb').read())``)
"""
from XMLutils import autoDetectXMLEncoding,XMLImportError,unicodeToStr
from xml.dom.minidom import parseString
encoding=autoDetectXMLEncoding(xml_content)
xmldoc=parseString(xml_content)
# Do we have only one model? _TBD: a DTD should be responsible for this!
modelNode=xpath.Evaluate('/model', contextNode=xmldoc)
if len(modelNode)!=1:
raise XMLImportError, 'Only one model in a xml file!'
# Ok, go!
modelNode=modelNode[0]
modelName=modelNode.getAttribute('name')
model=Model(unicodeToStr(modelName, encoding))
model.initWithXMLDOMNode(modelNode, encoding)
return model
def searchModel(modelName, path=None, verbose=0):
"""
Searches for the model named `modelName` by trying `loadModel()` with the
following paths: ``pymodel_<modelName>.py``, ``model_<modelName>.py`` and
``model_<modelName>.xml`` in the current directory and the MDL/ directory.
:return: the model, or None if it cannot be found/loaded
"""
if verbose:
import sys
mylog=lambda msg, stderr=sys.stderr:stderr.write('[Model.searchModel] %s\n'%msg)
else:
mylog=lambda msg: None
searchList=('pymodel_%s.py', 'model_%s.py', 'model_%s.xml',
'MDL/pymodel_%s.py', 'MDL/model_%s.py', 'MDL/model_%s.xml')
for file in searchList:
model=None
file=file%modelName
if path:
import os
file=os.path.join(path, file)
try:
mylog('Trying %s'%file)
model=loadModel(file)
except (IOError, ValueError, ImportError):
import cStringIO, traceback
exc=cStringIO.StringIO()
traceback.print_exc(file=exc)
mylog('Not found: %s\n Exception:%s'%(file,exc.getvalue()))
del exc
pass
if model:
return model
mylog('Not found: %s'%file)
mylog('modelName %s: All possibilities exhausted -- returning None'%modelName)
return None
class Model(base_persistent_object, XMLCapability, KeyValueCoding):
"Describes a model"
_is_a_model = 1
_adaptorName=''
_connectionDictionary={}
_packageName=''
_comment=''
def __init__(self, aName=''):
"Initializes a model. A name **must** be provided."
# Commented out becoz of persistence
if not isaValidName(aName):
raise ModelError, "A name must be provided at creation time."
assert type(aName)==types.StringType
self._connectionDictionary={}
self._entities=()
self._modelSet = None
self._adaptorName=None
self._packageName=''
#if aName: self.setName(aName)
self.setName(aName)
def cacheSimpleMethods(self):
"""
Iterates on entities, entities' attributes and relationships, and self,
and caches the result of methods taking only one argument ('self').
This needs to be enabled by setting the environment variable
MDL_ENABLE_SIMPLE_METHOD_CACHE to any non-empty string; you want to leave
it disabled when you manipulate models at runtime, for example in a model
designer.
See: Modeling.utils.cache_simple_methods
"""
from utils import cache_simple_methods
for e in self.entities():
for a in e.attributes():
cache_simple_methods(a)
for r in e.relationships():
if not r.isFlattened():
for j in r.joins():
cache_simple_methods(j)
cache_simple_methods(r)
cache_simple_methods(e)
cache_simple_methods(self, ['cacheSimpleMethods'])
def adaptorName(self):
"Returns the DB adaptor name. It may be None."
return self._adaptorName
def addEntity(self, anEntity):
"""
Adds 'anEntity' to the model, and silently returns if 'anEntity' is
already registered within the Model.
Raises 'ModelError' if the anEntity's name is already used by an other
entity in the model or its ModelSet.
"""
if anEntity in self._entities:
return
# Entities share a single namespace: check this!
if self.isEntityNameDeclaredInNamespace(anEntity.name()):
raise ModelError, "Attempt to insert an entity in model %s but an "\
"entity named '%s' is already registered either in the model "\
"or its ModelSet" % (self.name(), anEntity.name())
_entities=list(self.entities())
_entities.append(anEntity)
self._entities=tuple(_entities)
anEntity._setModel(self)
def comment(self):
"Returns the comment field"
return self._comment
def connectionDictionary(self):
"Returns the connection dictionary"
return self._connectionDictionary
# DB connection info.?
def entities(self):
"Returns the whole set of attributes"
return self._entities # tuple
def entityForObject(self, aDatabaseObject):
"""
Returns the entity corresponding to the supplied object, or 'None' if
it cannot be found
"""
_entityName=aDatabaseObject.entityName()
return self.entityNamed(_entityName)
def entityNamed(self, aName):
"Returns the entity named 'aName' or None if it does not exist."
for _entity in self.entities():
if _entity.name() == aName: return _entity
return None
def entitiesNames(self):
"Returns the whole set ('tuple') of entities' name."
return map(lambda o: o.name(), self.entities())
#def isaValidEntityName(self):
# """
# **Unimplemented** for the moment being (raise NotImplementedError)
# Returns 'true' if modelSet() is 'None', or if modelSet() does not
# hold another model where an entity is already registered with the
# same name.
# """
# # _TBD: bound to check + called by ModelSet when a Model is added
# raise NotImplementedError
def isEntityNameDeclaredInNamespace(self, aName):
"""
Checks whether the name is already registered within the model or its
ModelSet if it has one.
"""
if aName in self.entitiesNames():
return 1
if self.modelSet() and aName in self.modelSet().entitiesNames():
return 1
return 0
def modelSet(self):
"Returns the parent Modeling.ModelSet"
return self._modelSet
def name(self):
"Returns the Model's name"
return self._name
def packageName(self):
"""
Returns the name of the package that will be used when generating
code templates for that model
"""
return self._packageName
def removeEntity(self, anEntity):
"""
Removes the supplied entity from the registered entities.
Parameter 'anEntity' can be either an 'Entity' instance or a
subentity's name.
No referential checks are done.
Raises 'ModelError' if 'anEntity' cannot be found.
"""
#assert ((type(anEntity)==types.StringType) or
# (type(anEntity)==types.InstanceType and anEntity.__class__ is Entity)) #+Assert
if type(anEntity)==types.StringType:
anEntity = self.entityNamed(anEntity)
try:
_ents = list(self._entities)
_ents.remove(anEntity)
self._entities=tuple(_ents)
except ValueError:
# pb: None has no name()!!!
raise ModelError, "Model '%s' has no entity named '%s'" % (self.name(), anEntity.name())
anEntity._setModel(None)
def removeEntityAndReferences(self, anEntity):
""" **UNIMPLEMENTED**
Removes the supplied entity from the registered entities,
as well as all related relationships in other registered entities.
"""
# _TBD: Note: same for attributes when flattened (derived? no) will be taken into account
raise NotImplementedError
def setAdaptorName(self, adaptorName):
"Sets the DB adaptor name"
self._adaptorName=adaptorName
def setComment(self, aComment):
"Sets the comment field"
self._comment=aComment
def setConnectionDictionary(self, aConnectionDictionary):
"Sets the connection dictionary"
if type(aConnectionDictionary)==type(''):
aConnectionDictionary=eval(aConnectionDictionary)
if type(aConnectionDictionary)!=type({}):
raise ValueError, 'the connectionDictionary should be a dictionary, or'\
' a string which can be parsed and evaluated to produce a '\
"dictionary (cf. '__builtins__.eval()')"
self._connectionDictionary=aConnectionDictionary
def _setModelSet(self, aModelSet):
"Sets the parent Modeling.ModelSet"
self._modelSet = aModelSet
def setName(self, aName):
"Sets the model's name. Raises if invalid"
if not isaValidName(aName):
raise ModelError, "Supplied name incorrect (%s)" % aName
self._name = aName
def setPackageName(self, packageName):
"""
Sets the name of the package that will be used when generating
code templates for that model
"""
self._packageName=packageName
def validateObject(self, aModel):
"Checks whether the supplied object is valid"
raise NotImplementedError
# XML functionalities
def initWithXMLDOMNode(self, aNode, encoding='iso-8859-1'):
"""
Initializes a model with the supplied xml.dom.node.
A XMLutils.XMLImportError is raised if the model is not empty, i.e. if it
already holds some entities
"""
if self.entities():
raise XMLImportError, "Cannot initialize a non-empty model"
k_v=aNode.attributes.items()
for attributeName, value in k_v:
attrType=self.xmlAttributeType(attributeName)
set=self.xmlSetAttribute(attributeName)
if attrType=='string': value=unicodeToStr(value, encoding)
set(value)
# Entities now
entitiesNodes=xpath.Evaluate('/model/entity', contextNode=aNode)
# Phase 1: just attributes
from Entity import Entity
for node in entitiesNodes:
name=node.getAttribute('name')
entity=Entity(unicodeToStr(name, encoding))
self.addEntity(entity)
entity.initWithXMLDOMNode(node, phase=1, encoding=encoding)
# Phase 2: time for relationships
for node in entitiesNodes:
name=node.getAttribute('name')
entity=self.entityNamed(unicodeToStr(name, encoding))
entity.initWithXMLDOMNode(node, phase=2, encoding=encoding)
def getXMLDOM(self, doc=None, parentNode=None, encoding='iso-8859-1'):
"""
Returns the (DOM) DocumentObject for the receiver.
Parameters 'doc' and 'parentDoc' should be both omitted or supplied.
If they are omitted, a new DocumentObject is created.
If they are supplied, elements are added to the parentNode.
Returns: the (possibly new) DocumentObject.
"""
if (doc is None) ^ (parentNode is None):
raise ValueError, "Parameters 'doc' and 'parentNode' should be together supplied or omitted"
if doc is None:
doc=createDOMDocumentObject('model')
parentNode=doc.documentElement
node=parentNode
else:
raise NotImplementedError
exportAttrDict=self.xmlAttributesDict()
for attr in exportAttrDict.keys():
value=self.xmlGetAttribute(attr)()
value=strToUnicode(str(value), encoding)
node.setAttribute(attr, value)
entities=list(self.entities())
entities.sort(lambda e1, e2: cmp(e1.name(),e2.name()))
for entity in entities:
entity.getXMLDOM(doc, node, encoding)
return doc
def saveModelAsXMLFile(self, xmlFilePath, encoding='iso-8859-1'):
"Saves the model in specified file --xml-formatted"
import StringIO
_strIO=StringIO.StringIO()
doc=self.getXMLDOM(encoding=encoding)
from xml.dom.ext import PrettyPrint
PrettyPrint(doc, stream=_strIO, encoding=encoding)
_str=_strIO.getvalue()
_strIO.close()
file=open(xmlFilePath, 'wb')
file.write(_str)
file.close()
return
def xmlAttributesDict(self):
"."
import types
return {'name': ('string',
lambda self=None,p=None: None,
self.name ),
'adaptorName': ('string',
self.setAdaptorName,
self.adaptorName),
'packageName': ('string',
self.setPackageName,
self.packageName),
'connectionDictionary': ('string',
self.setConnectionDictionary,
self.connectionDictionary),
'comment': ('string',
self.setComment,
self.comment),
}
##
## KeyValueCoding error handling
##
def handleAssignementForUnboundKey(self, value, key):
if key=='connDict': self.setConnectionDictionary(value)
elif key=='doc': self.setComment(value)
else:
raise AttributeError, key
handleTakeStoredValueForUnboundKey=handleAssignementForUnboundKey
# Validation
#
# General
# -
#
# Generation
# - packageName should exist
#
|