#! /usr/bin/env python
# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sbastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
from __future__ import nested_scopes
"""
Global tests for EditingContext
This tests the editing context and the whole underlying framework
CVS information
$Id: test_EditingContext_Global.py 942 2004-09-21 18:49:10Z sbigaret $
"""
__version__='$Revision: 942 $'[11:-2]
import unittest, sys
import utils
if __name__ == "__main__":
utils.disable_model_cache()
if '-c' in sys.argv or '-C' in sys.argv \
or '-m' in sys.argv or '-M' in sys.argv:
utils.fixpath(include_testPackages=0)
utils.dynamically_build_test_packages(sys.argv)
else:
utils.fixpath(include_testPackages=1)
from Modeling import ModelSet,Model
from AuthorBooks.Writer import Writer
from AuthorBooks.Book import Book
from Modeling.EditingContext import EditingContext
from Modeling.FetchSpecification import FetchSpecification
from Modeling.Qualifier import qualifierWithQualifierFormat
from Modeling.Validation import ValidationException,\
CUSTOM_OBJECT_VALIDATION, OBJECT_WIDE_KEY, DELETE_DENY_KEY
from Modeling import Adaptor
from Modeling import Database
class Writer_test07(Writer):
refusesValidateForDelete=0
def validateLastName(self, value):
"Refuses empty strings"
if not value:
raise ValidationException
def validateForDelete(self):
_error=ValidationException()
try:
Writer.validateForDelete(self)
except ValidationException, error:
_error.aggregateException(error)
if self.refusesValidateForDelete:
_error.aggregateError(CUSTOM_OBJECT_VALIDATION, OBJECT_WIDE_KEY)
_error.finalize()
# utilities
def concreteAdaptorChannelForEntity(anEntityName, ec):
fs=FetchSpecification(anEntityName)
dbContext=ec.rootObjectStore().objectStoreForFetchSpecification(fs)
return dbContext.availableChannel().adaptorChannel()
class TestEditingContext_Global(unittest.TestCase):
"Global tests for EditingContext"
def test_00_insertObject_loads_databaseContexts(self):
"[EditingContext] ..."
# drop any existing default ObjectStoreCoordinator
from Modeling import ObjectStoreCoordinator
ObjectStoreCoordinator.setDefaultCoordinator(None)
ec=EditingContext()
w=Writer()
ec.insertObject(w)
objStoreCoord=ec.parentObjectStore()
self.failUnless(objStoreCoord)
self.failUnless(len(objStoreCoord.cooperatingObjectStores())==1)
self.failUnless(objStoreCoord.cooperatingObjectStores()[0].ownsObject(w))
def test_01_objectsWithFetchSpecification_alone(self):
"[EditingContext] objectsWithFetchSpecification alone"
ec=EditingContext()
fetchSpec=FetchSpecification(entityName='Writer')
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==3)
for object in objects:
if object.getLastName()=='Dard':
dard=object
if object.getLastName()=='Cleese':
cleese=object
if object.getLastName()=='Rabelais':
rabelais=object
self.failUnless(cleese.getLastName()=='Cleese')
object=cleese
self.failUnless('_lastName' in object.__dict__.keys())
self.failUnless('_firstName' in object.__dict__.keys())
self.failUnless('_age' in object.__dict__.keys())
self.failIf('_id' in object.__dict__.keys())
self.failIf('id' in object.__dict__.keys())
self.failIf('fk_writer_id' in object.__dict__.keys())
self.failIf('_fk_writer_id' in object.__dict__.keys())
# check types
self.failIf(cleese.getAge()!=24)
from mx.DateTime import DateTime
self.failUnless(type(cleese.getBirthday())!=DateTime)
#print objects
#print objects[0].getE1()
#print objects[0].__dict__
#print objects[1].__dict__
#print objects[2].__dict__
#print '------------------Triggerring fault ------------------------'
dard_pygmalion=dard.getPygmalion()
self.failIf(dard_pygmalion.isFault()) # Shouldnt be a fault
adaptorChannel=concreteAdaptorChannelForEntity('Writer', ec)
fetchCount=adaptorChannel._count_for_execute
dard_pygmalion.getLastName() # hence, this should not fetch from DB
self.failIf(adaptorChannel._count_for_execute!=fetchCount)
self.failIf(dard_pygmalion.isFault())
# Last, check that the snapshots exists in the Database
dard_gID=ec.globalIDForObject(dard)
pyg_dard_gID=ec.globalIDForObject(dard_pygmalion)
db=ec.parentObjectStore().objectStoreForGlobalID(dard_gID).database()
self.failUnless(db.snapshotForGlobalID(dard_gID))
def test_01b_objectsWithFetchSpecification_and_insertObject(self):
"[EditingContext] objectsWithFetchSpecification & insertObject"
# We want to check that objectsWithFetchSpecification() returns
# objects within the database AND the newly inserted objects
ec=EditingContext()
w=Writer()
ec.insertObject(w)
fetchSpec=FetchSpecification(entityName='Writer')
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(w in objects)
def test_01c_objectsWithFetchSpecification_closes_adaptorChannel(self):
"[EditingContext] objectsWithFetchSpecification closes adaptorChannel"
# We want to check that objectsWithFetchSpecification() closes its
# adaptor channel afetr finishing
ec=EditingContext()
fetchSpec=FetchSpecification(entityName='Writer')
objects=ec.objectsWithFetchSpecification(fetchSpec)
objStore=ec.rootObjectStore().objectStoreForFetchSpecification(fetchSpec)
adaptorChannel=objStore.availableChannel().adaptorChannel()
self.failIf(adaptorChannel.isOpen())
def test_01d_fetch(self):
"[EditingContext] fetch == objectsWithFetchSpecification"
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName like "?a*"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects_1=ec.objectsWithFetchSpecification(fetchSpec)
objects_2=ec.fetch('Writer', 'lastName like "?a*"')
self.failIf(len(objects_1)!=len(objects_2))
self.failIf([o for o in objects_1 if o not in objects_2])
objects_3=ec.fetch('Writer', qualifier)
self.failIf(len(objects_1)!=len(objects_3))
self.failIf([o for o in objects_1 if o not in objects_3])
def test_02_toOneFaultTrigger(self):
"[EditingContext] toOneFaultTrigger"
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
dard=objects[0]
self.failUnless(dard.getLastName()=='Dard')
dard_pygmalion=dard.getPygmalion()
self.failUnless(dard_pygmalion.isFault())
dard_pygmalion.getLastName() #trigger
self.failIf(dard_pygmalion.isFault())
self.failUnless(dard._pygmalion==dard_pygmalion)
# Last, check that the original object containing the toMany fault was not
# marked as changed
self.failIf(dard in ec.updatedObjects())
self.failIf(ec.hasChanges())
def test_02b_toOneFaultOrNone(self):
"[EditingContext] toOne relationship are None, not fault"
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Rabelais"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
rabelais=ec.objectsWithFetchSpecification(fetchSpec)[0]
self.failUnless(rabelais.getPygmalion() is None)
def test_03_toManyFaultTrigger(self):
"[EditingContext] toManyFaultTrigger"
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
dard=objects[0]
#print '--------- triggering toMany fault -----'
#print dard.getBooks()
#print len(dard.getBooks())
#print type(dard.getBooks())
#print dard.getBooks()[0]
adaptorChannel=concreteAdaptorChannelForEntity('Writer', ec)
books=dard.getBooks()
self.failUnless(len(books)==3) # triggers the fault
fetchCount=adaptorChannel._count_for_execute
# all subsequent accesses to the AccessArrayFaultHandler should not
# trigger the already-fired-fault
self.failUnless(books[0]) # just test access w/ __getitem__
self.failUnless(adaptorChannel._count_for_execute==fetchCount)
# Last, check that the original object containing the toMany fault was not
# marked as changed
self.failIf(dard in ec.updatedObjects())
def test_04_trackingChanges(self):
"[EditingContext] tracking changes and processing changes"
##
## These tests was moved here from test_EditingContext.py
## We need the whole framework, somehow, so that fetch is available
##
ec=EditingContext()
w=Writer()
ec.insertObject(w)
self.failUnless(ec.hasChanges(), 'has changes')
self.failUnless(ec.hasUnprocessedChanges(), 'has unprocessed changes')
self.failIf(ec.insertedObjects())
ec.processRecentChanges()
self.failIf(ec.hasUnprocessedChanges(), 'no unprocessed changes anymore')
self.failUnless(ec.hasChanges(), 'has changes')
self.assertEqual(ec.insertedObjects(), [w])
# new EC
ec=EditingContext()
fetchSpec=FetchSpecification(entityName='Writer')
ws=ec.objectsWithFetchSpecification(fetchSpec)
ws[0].setFirstName('Glop')
self.failUnless(ec.hasChanges(), 'has changes')
self.failUnless(ec.hasUnprocessedChanges(), 'has unprocessed changes')
self.failIf(ec.updatedObjects())
ec.processRecentChanges()
self.failIf(ec.hasUnprocessedChanges(), 'no unprocessed changes anymore')
self.failUnless(ec.hasChanges(), 'has changes')
self.assertEqual(ec.updatedObjects(), [ws[0]])
# new EC
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
dard=ec.objectsWithFetchSpecification(fetchSpec)[0]
ec.deleteObject(dard)
self.failIf(ec.deletedObjects())
self.failUnless(ec.hasChanges(), 'has changes')
self.failUnless(ec.hasUnprocessedChanges(), 'has unprocessed changes')
dard.getPygmalion().willRead()
#print '###############', dard.getPygmalion()
#print '###############', dard.storedValueForKey('pygmalion')
ec.processRecentChanges()
self.failIf(ec.hasUnprocessedChanges(), 'no unprocessed changes anymore')
self.failUnless(ec.hasChanges(), 'has changes')
#self.assertEqual(ec.deletedObjects(), [dard])
def test_05_deleteRule_cascade(self):
"[EditingContext] processRecentChanges: CASCADE"
# tests for CASCADE
# __TBD we do not need to perform an actual fetch from the DB:
# __TBD this is just a shortcut here, and it should be eventually moved to
# __TBD test_EditingContext.py.
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
dard=ec.objectsWithFetchSpecification(fetchSpec)[0]
# Delete object
ec.deleteObject(dard)
dard_books=dard.getBooks()
ldb=len(dard_books) # trigger
dard_books=dard.getBooks()
self.failIf(dard_books[0] in ec.deletedObjects())
ec.processRecentChanges()
self.failUnless(len(ec.deletedObjects())==ldb+1)
# Check that the author was also deleted [cascade]
self.failUnless(dard in ec.deletedObjects())
# AND that neither the author nor the book are marked as updated
# [Bug #599602]
self.failIf(dard_books[0] in ec.updatedObjects())
self.failIf(dard in ec.updatedObjects())
for book in dard_books:
self.failUnless(book in ec.deletedObjects())
def test_06_deleteRule_nullify(self):
"[EditingContext] processRecentChanges: deleteRule: NULLIFY"
# tests for NULLIFY
# __TBD we do not need to perform an actual fetch from the DB:
# __TBD this is just a shortcut here, and it should be eventually moved to
# __TBD test_EditingContext.py.
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('author.lastName caseInsensitiveLike "dard"')
fetchSpec=FetchSpecification(entityName='Book', qualifier=qualifier)
dard_book1=ec.objectsWithFetchSpecification(fetchSpec)[0]
dard=dard_book1.getAuthor()
# We need to trigger the fault here: if not, it will be triggered AFTER
# deleteObject(), and the triggerring will in turn filters the
# deletedObject from the returned set of objectsWithFetchSpecification(),
# hence, the result will be 2, instead of 3 as expected for this
# test
# !!! NOT TRUE ANYMORE, see notes below !!!
#len(dard.getBooks())
# Second note: this is not true anymore since DBContext's objsW/FetchSpec.
# no longer filters its content depending on the state of the EC.
# (changes made for release 0.9)
# Implementation note:
# AccessArrayFaultHandler.completeInitializationOfObject() calls
# DBContext.objectsForSourceGlobalID() which in turn calls
# DBContext.objectsWithFetchSpecification(). It seems reasonable, since
# the deleted objects are nont deleted yet in the EC. However, we need
# additional tests before removing these comments, to be sure that the
# pendingDeletedObjects and deletedObjects in EC are not treated the same
# way (put differently: unprocessed deleted objects() should appear in a
# toMany fault after the fault is cleared, but should NOT be there if
# processRecentChanges() already processed the deleted object).
ec.deleteObject(dard_book1)
self.failUnless(len(dard.getBooks())==3)
ec.processRecentChanges()
# Now the dard.getBooks() should have been refreshed and the book
# that was deleted should be removed from dard's books.
self.failIf(len(dard.getBooks())!=2)
self.failIf(dard_book1 in dard.getBooks())
def test_07_savechanges_01(self):
"[EditingContext] saveChanges part 01: insert + validation"
ec=EditingContext()
hugo=Writer_test07()
ec.insertObject(hugo)
hugo.setLastName(None)
hugo.setFirstName('Victor')
self.assertRaises(ValidationException, ec.saveChanges)
hugo.setLastName('')
self.assertRaises(ValidationException, ec.saveChanges)
hugo.setLastName('Hugo')
hugo_temp_gID=ec.globalIDForObject(hugo)
self.failUnless(hugo_temp_gID.isTemporary())
try:
ec.saveChanges()
except:
raise #self.fail("saveChanges() shouldn't fail here")
## No more inserted, updated or deleted objects
self.failIf(ec.insertedObjects())
self.failIf(ec.updatedObjects())
self.failIf(ec.deletedObjects())
## Check that the globalID has been changed
hugo_gID=ec.globalIDForObject(hugo)
self.failIf(hugo_gID.isTemporary())
self.failIf(ec.objectForGlobalID(hugo_temp_gID))
## since we're here, check CustomObject.globalID
self.assertEqual(hugo_gID, hugo.globalID())
## check that DB snapshots are updated as expected
db=ec.parentObjectStore().objectStoreForGlobalID(hugo_gID).database()
self.failUnless(db.snapshotForGlobalID(hugo_gID))
def test_08_savechanges_02(self):
"[EditingContext] saveChanges part 02: delete + validation"
# create it: this implies that test_07 did pass, or this will fail as well
ec=EditingContext()
hugo=Writer_test07()
ec.insertObject(hugo)
hugo.setFirstName('Victor'); hugo.setLastName('Hugo')
try:
ec.saveChanges()
except:
self.fail("insert failed: test aborted")
#
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
hugo=ec.objectsWithFetchSpecification(fetchSpec)[0]
## 1st test: make sure a delete cannot be made persistent if an object
## has objects bound to a key corresponding to a relationship whose
## deleteRule() is DELETE_DENY
# make sure that the relationship's rule is DELETE_DENY
from Modeling.Relationship import DELETE_DENY
writer=ModelSet.defaultModelSet().entityNamed('Writer')
writer_books=writer.relationshipNamed('books')
writer_books.setDeleteRule(DELETE_DENY)
b=Book()
hugo.__class__=Writer_test07 #so that we can easily fake custom validation
hugo.addToBooks(b)
ec.deleteObject(hugo)
try:
ec.saveChanges()
except ValidationException, exc:
self.failUnless(exc.errorsDict().has_key('books'))
self.failUnless(DELETE_DENY_KEY in exc.errorsDict()['books'])
else:
self.fail('deleteObject() should have raised')
## 2nd test: check that custom validation works as expected
hugo.removeFromBooks(b)
hugo.refusesValidateForDelete=1
try:
ec.saveChanges()
except ValidationException, exc:
errorsDict=exc.errorsDict()
self.failUnless(errorsDict.has_key(OBJECT_WIDE_KEY))
self.failUnless(CUSTOM_OBJECT_VALIDATION in errorsDict[OBJECT_WIDE_KEY])
else:
self.fail('deleteObject() should have raised')
## 3rd test: check that 'objectsWithFetchSpecification()' does not
## return objects marked for deletion
fetchSpec=FetchSpecification(entityName='Writer')
writers=ec.objectsWithFetchSpecification(fetchSpec)
self.failIf(hugo in writers)
## 4rd test: now deleting it for real should be OK
hugo.refusesValidateForDelete=0
try:
ec.saveChanges()
except:
self.fail("Error: deletion failed")
## 5th test: check that the object has been removed from the uniquing table
self.failIf(hugo in ec.registeredObjects())
def test_09_savechanges_03(self):
"[EditingContext] saveChanges part 03: change + validation"
# create it: this implies that test_07 did pass, or this will fail as well
ec=EditingContext()
hugo=Writer_test07()
ec.insertObject(hugo)
hugo.setFirstName('Victor'); hugo.setLastName('Hugo')
hugo.setAge(46)
ec.saveChanges()
# TBD: invalidate to refault everything!
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
hugo=ec.objectsWithFetchSpecification(fetchSpec)[0]
hugo.setLastName(None)
self.assertRaises(ValidationException, ec.saveChanges)
hugo.setLastName('Hugo') # Nothing changed
try:
ec.saveChanges()
except:
raise
self.fail("saveChanges() should not fail here")
hugo.setLastName('hugo')
try:
ec.saveChanges()
except:
self.fail("saveChanges() should not fail here")
def test_10_saveChanges_04(self):
"[EditingContext] saveChanges part 04"
# create it: this implies that test_07 did pass, or this will fail as well
ec=EditingContext()
hugo=Writer_test07()
ec.insertObject(hugo)
hugo.setFirstName('Victor'); hugo.setLastName('hugo')
hugo.setAge(46)
#b1=Book()
#b1.setTitle("Les Misrables")
#ec.insertObject(b1)
#b1.addObjectToBothSidesOfRelationshipWithKey(hugo, 'author')
ec.saveChanges()
ec.dispose()
# TBD: invalidate to refault everything!
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName caseInsensitiveLike "Hugo"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
hugo=ec.objectsWithFetchSpecification(fetchSpec)[0]
qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
dard=ec.objectsWithFetchSpecification(fetchSpec)[0]
b1=Book()
b1.setTitle("Les Misrables")
b2=Book()
b2.setTitle("Les Misrables: le retour")
b2.setPrice(4.7)
b3=Book()
b3.setTitle("Les Misrables: la vengeance")
ec.insertObject(b1) ; ec.insertObject(b2) ; ec.insertObject(b3)
b1.addObjectToBothSidesOfRelationshipWithKey(hugo, 'author')
b2.addObjectToBothSidesOfRelationshipWithKey(hugo, 'author')
hugo.addObjectToBothSidesOfRelationshipWithKey(b3, 'books')
hugo.setLastName('Hugo')
b4=Book()
b4.setTitle("Bloub bloub")
dard.setAge(82)
from mx.DateTime import DateFrom
hugo.setBirthday(DateFrom('1802-02-26 14:28'))
ec.insertObject(b4)
dard.addToBooks(b4) ; b4.setAuthor(dard)
ec.saveChanges()
# Last, check that the Database has no snapshots left when the
# EditingContext is finalized
db=ec.rootObjectStore().objectStoreForObject(b4).database()
#ec.dispose() # normally useless
del ec
self.failIf(db.snapshots())
def test_11_allQualifierOperators(self):
"[EditingContext] tests all qualifier operators"
ec=EditingContext()
qWQF=qualifierWithQualifierFormat
# LIKE
qualifier=qWQF('lastName like "Dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(objects[0].getLastName()=='Dard')
qualifier=qWQF('lastName like "Da?d"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(objects[0].getLastName()=='Dard')
qualifier=qWQF('lastName like "Da*"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(objects[0].getLastName()=='Dard')
qualifier=qWQF('lastName like "?a*"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless('Dard' in objects_names)
self.failUnless('Rabelais' in objects_names)
#CASE_INSENSITIVE_LIKE
qualifier=qWQF('lastName caseInsensitiveLike "Dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(objects[0].getLastName()=='Dard')
qualifier=qWQF('lastName caseInsensitiveLike "dard"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(objects[0].getLastName()=='Dard')
qualifier=qWQF('lastName caseInsensitiveLike "?A*"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless('Dard' in objects_names)
self.failUnless('Rabelais' in objects_names)
# EQUAL
qualifier=qWQF('age==508')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Rabelais' in objects_names)
# DIFFERENT
qualifier=qWQF('age != 508')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Dard' in objects_names)
self.failUnless('Cleese' in objects_names)
# GREATER THAN
qualifier=qWQF('age>90')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Rabelais' in objects_names)
# GREATER THAN, OR EQUAL
qualifier=qWQF('age>=50')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Rabelais' in objects_names)
self.failUnless('Dard' in objects_names)
# LESSER THAN
qualifier=qWQF('age<90')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Dard' in objects_names)
self.failUnless('Cleese' in objects_names)
# LESSER THAN, OR EQUAL
qualifier=qWQF('age<=24')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Cleese' in objects_names)
# AND
qualifier=qWQF('age>=24 AND lastName like "???e*"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Cleese' in objects_names)
self.failUnless('Rabelais' in objects_names)
# OR
qualifier=qWQF('age<50 OR lastName like "????"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Cleese' in objects_names)
self.failUnless('Dard' in objects_names)
qualifier=qWQF('age<50 OR age>200')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Cleese' in objects_names)
self.failUnless('Rabelais' in objects_names)
# NOT
qualifier=qWQF('age>200 OR NOT age<50')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Dard' in objects_names)
self.failUnless('Rabelais' in objects_names)
qualifier=qWQF('NOT(age<50 OR age>200)')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Dard' in objects_names)
# Some others, mixed
qualifier=qWQF('NOT age<50 OR age>200') #!!! equiv. to NOT(...)
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Dard' in objects_names)
qualifier=qWQF('(age<100 OR age>200) AND books.title like "G*"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Rabelais' in objects_names)
## This one was added after a fix was proposed for bug #614261
## to make sure that a complex qualifier is okay as far as MySQL
## is concerned
qualifier=qWQF('(age<100) AND pygmalion.books.title like "G*"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Dard' in objects_names)
def test_12_fetchingDoesNotUpdateSnapshot(self):
"[EditingContext] Fetching should not update the Database's snapshot"
ec=EditingContext()
hugo=Writer_test07()
ec.insertObject(hugo)
hugo.setFirstName('Victor'); hugo.setLastName('Hugo')
hugo.setAge(46)
ec.saveChanges()
del ec
ec1=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec1.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
hugo1=objects[0]
gid1=ec1.globalIDForObject(hugo1)
database1=ec1.rootObjectStore().objectStoreForObject(hugo1).database()
timestamp1=database1.timestampForGlobalID(gid1)
snapshot1=database1.snapshotForGlobalID(gid1)
#
# Change in the meantime
#
model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
pgAdaptor=Adaptor.adaptorWithModel(model)
context=pgAdaptor.createAdaptorContext()
channel=context.createAdaptorChannel()
sqlExpr=pgAdaptor.expressionClass()()
sqlExpr.setStatement("update WRITER set LAST_NAME='hugoo' where id=%i"\
%gid1.keyValues().values()[0])
channel.evaluateExpression(sqlExpr)
#
ec2=EditingContext()
qualifier=qualifierWithQualifierFormat('firstName=="Victor"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec2.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
hugo2=objects[0]
gid2=ec2.globalIDForObject(hugo2)
database2=ec2.rootObjectStore().objectStoreForObject(hugo2).database()
timestamp2=database2.timestampForGlobalID(gid2)
snapshot2=database2.snapshotForGlobalID(gid2)
#
# Since we're here, we can check that we got the same Database object
# as well
self.failIf(database1!=database2, 'Database objects should be the same')
# Now check that the corresponding snapshot has NOT been updated, nor its
# timestamp
self.failIf(timestamp1!=timestamp2, 'timestamp shouldnt have been changed')
self.failIf(snapshot1!=snapshot2, 'snapshot shouldnt have been changed')
def test_13_objectsCountWithFetchSpecification(self):
"[EditingContext] objectsCountWithFetchSpecification"
fs=FetchSpecification(entityName='Writer')
ec=EditingContext()
nb=ec.objectsCountWithFetchSpecification(fs)
self.failIf(nb!=3)
def test_13b_objectsCountWithFetchSpecification_closes_channel(self):
"[EditingContext] objectsCountWithFetchSpecification closes channel"
fs=FetchSpecification(entityName='Writer')
ec=EditingContext()
nb=ec.objectsCountWithFetchSpecification(fs)
self.failIf(nb!=3)
objStore=ec.rootObjectStore().objectStoreForFetchSpecification(fs)
adaptorChannel=objStore.availableChannel().adaptorChannel()
self.failIf(adaptorChannel.isOpen())
def test_13c_fetchCount(self):
"[EditingContext] fetchCount == objectsCountWithFetchSpecification"
q=qualifierWithQualifierFormat('lastName like "?a*"')
fs=FetchSpecification(entityName='Writer', qualifier=q)
ec=EditingContext()
self.assertEqual(ec.objectsCountWithFetchSpecification(fs),
ec.fetchCount('Writer', 'lastName like "?a*"'))
#print ec.fetchCount('Writer', 'lastName like "?a*"')
def test_14_qualifierWithNullValue(self):
"[EditingContext] objectsWithFetchSpec. & NULL value"
## Note: this used to fail w/ MySQL, cf bug #614261
q=qualifierWithQualifierFormat('author.pygmalion.books.price == NULL')
fs=FetchSpecification('Book', qualifier=q)
ec=EditingContext()
nb_objs=ec.objectsCountWithFetchSpecification(fs)
self.assertEqual(nb_objs, 3) # only 3: author 'Dard' is the only
# one with a pygmalion
def test_15_propagatesInsertionForRelatedObjects_01(self):
"[EditingContext] propagatesInsertionForRelatedObjects (updated objects)"
ec=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
cleese=ec.objectsWithFetchSpecification(fetchSpec)[0]
pygmalion_cleese=Writer()
book_cleese=Book()
cleese.setPygmalion(pygmalion_cleese)
cleese.addToBooks(book_cleese)
ec.processRecentChanges()
self.failIf(book_cleese in ec.insertedObjects())
self.failIf(pygmalion_cleese in ec.insertedObjects())
cleese.willChange()
ec.setPropagatesInsertionForRelatedObjects(1)
ec.processRecentChanges()
self.failUnless(book_cleese in ec.insertedObjects())
self.failUnless(pygmalion_cleese in ec.insertedObjects())
# Last: check that subsequent changes are notified after
# ec.processRecentChanges()
ec=EditingContext()
ec.setPropagatesInsertionForRelatedObjects(1)
qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
cleese=ec.objectsWithFetchSpecification(fetchSpec)[0]
cleese.setLastName('Cleese')
ec.processRecentChanges()
pygmalion_cleese=Writer()
book_cleese=Book()
cleese.setPygmalion(pygmalion_cleese)
cleese.addToBooks(book_cleese)
ec.processRecentChanges()
self.failUnless(book_cleese in ec.insertedObjects())
self.failUnless(pygmalion_cleese in ec.insertedObjects())
def test_15_propagatesInsertionForRelatedObjects_02(self):
"[EditingContext] propagatesInsertionForRelatedObjects (inserted objects)"
ec=EditingContext()
ec.setPropagatesInsertionForRelatedObjects(1)
hugo=Writer()
hugo.setFirstName('Victor'); hugo.setLastName('hugo')
hugo.setAge(46)
ec.insertObject(hugo)
b1=Book() ; b1.setTitle("Les Misrables")
hugo.addObjectToBothSidesOfRelationshipWithKey(b1, 'books')
ec.processRecentChanges()
self.failUnless(b1 in ec.insertedObjects())
#Check that subsequent changes are notified after ec.processRecentChanges()
b2=Book() ; b2.setTitle("Les travailleurs de la mer")
hugo.addObjectToBothSidesOfRelationshipWithKey(b2, 'books')
ec.processRecentChanges()
self.failUnless(b2 in ec.insertedObjects())
def test_16_toManySnapshotsUpdated(self):
"[EditingContext] checks that toManySnapshots are correctly updated"
# Description of the bug tested here:
# cf. comment in DatabaseContext.performChanges()
ec=EditingContext(); ec.setPropagatesInsertionForRelatedObjects(1)
# Create a new object, with rels.
hugo=Writer()
hugo.setFirstName('Victor'); hugo.setLastName('Hugo'); hugo.setAge(46)
ec.insertObject(hugo)
b1=Book() ; b1.setTitle("Les Misrables")
b2=Book() ; b2.setTitle("Les travailleurs de la mer")
hugo.addObjectToBothSidesOfRelationshipWithKey(b1, 'books')
hugo.addObjectToBothSidesOfRelationshipWithKey(b2, 'books')
ec.saveChanges()
# Now delete one of these, on a new EditingContext, WITHOUT deleting the
# former EC so that snapshots are not forgotten
ec2=EditingContext()
qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
hugo2=ec2.objectsWithFetchSpecification(fetchSpec)[0]
book=hugo2.getBooks()[0] # any of the two
hugo2.removeObjectFromBothSidesOfRelationshipWithKey(book, 'books')
ec2.deleteObject(book)
ec2.saveChanges()
# Check that the database snapshots were updated, as expected
remaining_book=hugo2.getBooks()[0]
db=ec2.rootObjectStore().objectStoreForObject(hugo2).database()
hugo2_gID=ec2.globalIDForObject(hugo2)
toManySnap=db.snapshotForSourceGlobalID(hugo2_gID, 'books')
#print toManySnap
self.failIf(len(toManySnap)!=1)
def test_17_insertedObject_and_PK_as_classProperty(self):
"[EC] Checks that a PK/class prop. gets its value after saving changes"
ec=EditingContext()
b=Book()
b.setTitle('dummy title')
ec.insertObject(b)
ec.saveChanges()
gid=ec.globalIDForObject(b)
self.failUnless(b.getId() == gid.keyValues()['id'])
def test_17b_insertedObject_and_FK_as_classProperty(self):
"[EC] Checks that a FK/class prop. gets its value after saving changes"
ec=EditingContext()
w=Writer()
w.setLastName('test author')
ec.insertObject(w)
ec.saveChanges()
b=Book()
ec.insertObject(b)
b.setTitle('dummy title')
b.setAuthor(w)
w.addToBooks(b)
self.failIf(b.getFK_Writer_Id())
ec.saveChanges()
gid=ec.globalIDForObject(w)
# a FK defined as a class property is NOT updated by the framework
# see the User's Guide, section B.1
# so we get None instead of gid.keyValues()['id']
self.assertEqual(b.getFK_Writer_Id(), None)
def test_18_percent_in_qualifiers_should_be_correctly_escaped(self):
"[EditingContext] percent in qualifiers should be correctly escaped"
q=qualifierWithQualifierFormat('title like "G%"')
fs=FetchSpecification('Book', qualifier=q)
ec=EditingContext()
objs=ec.objectsWithFetchSpecification(fs)
self.failIf(objs)
def test_19_in_not_in_operators_in_qualifiers(self):
"[EditingContext] IN/NOT IN operators in qualifiers"
ec=EditingContext()
objects=ec.fetch('Writer', 'age in [24, 82, 81]')
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Cleese' in objects_names)
self.failUnless('Dard' in objects_names)
objects=ec.fetch('Writer', 'age not in [24, 82, 81]')
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Rabelais' in objects_names)
objects=ec.fetch('Writer', 'age in [24]')
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Cleese' in objects_names)
def test_19b_in_not_in_operators_in_qualifiers_with_long(self):
"[EditingContext] IN/NOT IN operators in qualifiers with longs"
# Bug #779775
from Modeling.Qualifier import KeyValueQualifier,QualifierOperatorIn
ec=EditingContext()
q=KeyValueQualifier('age', QualifierOperatorIn, [24L, 81L, 82L])
objects=ec.fetch('Writer', q)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Cleese' in objects_names)
self.failUnless('Dard' in objects_names)
def test_19c_in_not_in_operators_in_qualifiers_with_strings(self):
"[EditingContext] IN/NOT IN operators in qualifiers with strings & date"
# Bug #847212: strings not handled properly
from Modeling.Qualifier import KeyValueQualifier,QualifierOperatorIn
ec=EditingContext()
q=KeyValueQualifier('lastName', QualifierOperatorIn, ['Dard', 'Cleese'])
objects=ec.fetch('Writer', q)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==2)
self.failUnless('Cleese' in objects_names)
self.failUnless('Dard' in objects_names)
# While we're at it, test date
q=KeyValueQualifier('author.birthday',
QualifierOperatorIn, ['1484-07-02 18:16:12'])
objects=ec.fetch('Book', q)
objects_names=[b.getTitle() for b in objects]
self.failUnless(len(objects)==1)
self.failUnless('Gargantua' in objects_names)
# prepare last test w/ floats
g=objects[0]
g.setPrice(12.7)
ec.saveChanges()
try:
# Test float
ec2=EditingContext()
q=KeyValueQualifier('books.price', QualifierOperatorIn, [12.7, 12.2])
objects=ec2.fetch('Writer', q)
objects_names=[o.getLastName() for o in objects]
#print objects_names
self.failUnless(len(objects)==1)
self.failUnless('Rabelais' in objects_names)
finally:
g.setPrice(None)
ec.saveChanges()
def test_20_snapshot(self):
"[EditingContext] CustomObject.snapshot()"
ec=EditingContext()
## single object, no object related
w=Writer()
w.setLastName('Test'); w.setFirstName('20')
ec.insertObject(w)
ws=w.snapshot()
def check_single_object():
self.assertEqual(ws['lastName'], 'Test')
self.assertEqual(ws['firstName'], '20')
self.assertEqual(ws['birthday'], None)
self.assertEqual(ws['pygmalion'], None)
self.assertEqual(ws['books'], [])
check_single_object()
ec.saveChanges()
check_single_object()
## all objects saved, no faults here
b1=Book(); b1.setTitle('b1')
b2=Book(); b2.setTitle('b2')
ec.insert(b1)
ec.insert(b2)
w.addToBooks(b1) ; b1.setAuthor(w)
w.addToBooks(b2) ; b2.setAuthor(w)
p=Writer(); p.setLastName('test_20 pygmalion')
ec.insert(p)
w.setPygmalion(p)
ws=w.snapshot()
self.assertEqual(ws['lastName'], 'Test')
self.assertEqual(ws['firstName'], '20')
self.assertEqual(ws['pygmalion'], p.globalID())
self.assertEqual(len(ws['books']), 2)
self.failUnless(b1.globalID() in ws['books'])
self.failUnless(b2.globalID() in ws['books'])
ec.saveChanges()
p_globalID=p.globalID()
del ec
## Now both relationships 'pygmalion' and 'books' are faults
ec=EditingContext()
w=ec.fetch('Writer', 'lastName=="Test" and firstName=="20"')[0]
ws=w.snapshot()
self.assertEqual(ws['lastName'], 'Test')
self.assertEqual(ws['firstName'], '20')
# for a faulted to-one rel, we get the corresponding globalID
self.assertEqual(ws['pygmalion'], p_globalID)
self.failUnless(w.getPygmalion().isFault())
# for a faulted to-many rel, we get a CustomObject.Snapshot_ToManyFault
self.failUnless(w.getBooks().isFault())
from Modeling.FaultHandler import AccessArrayFaultHandler
from Modeling.CustomObject import Snapshot_ToManyFault
self.assertEqual(ws['books'].__class__, Snapshot_ToManyFault)
# last, we check that the to-many fault is accessible, if we want to:
self.assertEqual(ws['books'].sourceGlobalID, w.globalID())
self.assertEqual(ws['books'].key, 'books')
tomany_fault=ws['books'].getToManyFault(ec)
from Modeling.FaultHandler import AccessArrayFaultHandler
self.failUnless(isinstance(tomany_fault, AccessArrayFaultHandler))
self.failUnless(tomany_fault.isFault())
# check that we got the same set of objects
adaptorChannel=concreteAdaptorChannelForEntity('Writer', ec)
fetchCount=adaptorChannel._count_for_execute
self.assertEqual(len(tomany_fault), 2)
self.assertEqual(adaptorChannel._count_for_execute,
fetchCount+1) # fault was cleared
for book in w.getBooks():
self.failIf(book not in tomany_fault)
def test_21_snapshot_raw(self):
"[EditingContext] CustomObject.snapshot_raw()"
ec=EditingContext()
# saved objects, related to other saved objects
raw_fdard=ec.fetch('Writer', 'age in [81, 82]', rawRows=1)[0]
fdard=ec.fetch('Writer', 'age in [81, 82]')[0]
rabelais=ec.fetch('Writer', 'lastName == "Rabelais"')[0]
fdard_snapshot_raw=fdard.snapshot_raw()
self.assertEqual(raw_fdard, fdard_snapshot_raw)
self.assertEqual(fdard_snapshot_raw['FK_Writer_id'],
rabelais.globalID().keyValues()['id'])
# new object, related to saved objects
w=Writer(); w.setFirstName('F'), w.setLastName('L')
ec.insert(w)
w.setPygmalion(fdard)
w_sr=w.snapshot_raw()
self.assertEqual(w_sr.keys(), fdard_snapshot_raw.keys())
self.assertEqual(w_sr['FK_Writer_id'], fdard.globalID().keyValues()['id'])
self.assertEqual(w_sr['id'], w.globalID()) # we get a TemporaryGlobalID
self.failUnless(w_sr['id'].isTemporary())
w.setPygmalion(None)
ec.delete(w)
# saved object, related to a new object
w=Writer(); w.setFirstName('F'), w.setLastName('L')
ec.insert(w)
rabelais=fdard.getPygmalion()
fdard.setPygmalion(w)
self.assertEqual(fdard.snapshot_raw().keys(), fdard_snapshot_raw.keys())
self.assertEqual(fdard.snapshot_raw()['FK_Writer_id'], w.globalID())
fdard.setPygmalion(rabelais)
ec.delete(w)
# new object, related to new objects
w1=Writer(); w1.setFirstName('F'), w1.setLastName('L')
w2=Writer(); w2.setFirstName('F'), w2.setLastName('L')
ec.insert(w1)
ec.insert(w2)
w1.setPygmalion(w2)
self.assertEqual(w1.snapshot_raw().keys(), fdard_snapshot_raw.keys())
self.assertEqual(w1.snapshot_raw()['id'], w1.globalID())
self.assertEqual(w1.snapshot_raw()['FK_Writer_id'], w2.globalID())
ec.delete(w1); ec.delete(w2)
# modified object
fdard_name=fdard.getLastName()
alternate_fdard_name=fdard_name+' -- testing'
fdard.setLastName(alternate_fdard_name)
self.assertEqual(fdard.snapshot_raw()['lastName'], alternate_fdard_name)
# Faults
fdard_gid=fdard.globalID()
ec=EditingContext()
fdard=ec.faultForGlobalID(fdard_gid, ec)
self.failUnless(fdard.isFault())
self.assertEqual(fdard.snapshot_raw()['lastName'], fdard_name)
def test_22_cancel_delete(self):
"[EditingContext] cancel a delete on an object"
## check: delete / insert
ec=EditingContext()
rabelais=ec.fetch('Writer', 'lastName == "Rabelais"')[0]
ec.deleteObject(rabelais)
self.failUnless(rabelais in ec.allDeletedObjects())
self.failIf(rabelais in ec.deletedObjects()) # unprocessed
ec.insertObject(rabelais) # cancel the deletion
self.failIf(len(ec.fetch('Writer', 'lastName == "Rabelais"'))!=1)
self.failIf(rabelais in ec.allDeletedObjects())
self.failIf(rabelais in ec.allInsertedObjects())
## check: delete / processRecentChanges / insert
ec=EditingContext()
rabelais=ec.fetch('Writer', 'lastName == "Rabelais"')[0]
ec.deleteObject(rabelais)
self.failUnless(rabelais in ec.allDeletedObjects())
ec.processRecentChanges()
self.failUnless(rabelais in ec.deletedObjects())
ec.insertObject(rabelais) # cancel the deletion
self.failIf(rabelais in ec.allDeletedObjects())
self.failIf(rabelais in ec.allInsertedObjects())
## check: delete / saveChanges / insert
ec=EditingContext()
new=Writer(); new.setLastName('test_22')
ec.insert(new)
ec.saveChanges()
ec=EditingContext()
new=ec.fetch('Writer', 'lastName == "test_22"')[0]
new_gid=new.globalID()
ec.deleteObject(new)
self.failUnless(new in ec.allDeletedObjects())
self.failIf(new in ec.deletedObjects())
ec.saveChanges()
ec.insertObject(new) # cancel the deletion? No, insertion of a new object
self.failIf(new in ec.allDeletedObjects())
self.failUnless(new in ec.allInsertedObjects())
self.failIf(new.globalID()==new_gid)
self.failIf(not new.globalID().isTemporary())
## check: modify / delete / processRecentChanges / insert -> in modify?
ec=EditingContext()
new=Writer(); new.setLastName('test_22')
ec.insert(new)
ec.saveChanges()
ec=EditingContext()
new=ec.fetch('Writer', 'lastName == "test_22"')[0]
new_gid=new.globalID()
new.setLastName('test_22 alternate value')
self.failUnless(new in ec.allUpdatedObjects())
ec.deleteObject(new)
self.failUnless(new in ec.allDeletedObjects())
self.failIf(new in ec.deletedObjects())
self.failIf(new in ec.allUpdatedObjects())
ec.processRecentChanges()
self.failUnless(new in ec.deletedObjects())
self.failIf(new in ec.allUpdatedObjects())
ec.insertObject(new) # cancel the deletion
self.failIf(new in ec.allDeletedObjects())
self.failUnless(new in ec.allUpdatedObjects())
self.failUnless(new.globalID()==new_gid)
## check: modify / delete / saveChanges / insert
def test_23_fetch_star_and_interrogation_mark_chars(self):
"[EditingContext] fetch real '?' and '*' w/ LIKE"
ec=EditingContext()
b1=Book(); b1.setTitle('abc?d')
b2=Book(); b2.setTitle('abcXd')
b3=Book(); b3.setTitle('abc*d')
ec.insert(b1); ec.insert(b2); ec.insert(b3)
ec.saveChanges()
res=ec.fetch('Book', 'title like "abc?d"')
self.assertEqual(len(res), 3)
res=ec.fetch('Book', 'title like "abc*d"')
self.assertEqual(len(res), 3)
res=ec.fetch('Book', 'title like "abc\?d"')
self.assertEqual(len(res), 1)
self.assertEqual(res[0].getTitle(), 'abc?d')
res=ec.fetch('Book', 'title like "abc\*d"')
self.assertEqual(len(res), 1)
self.assertEqual(res[0].getTitle(), 'abc*d')
def test_23b_fetch_star_and_interrogation_mark_chars(self):
"[EditingContext] fetch real '?' and '*' w/ ILIKE"
ec=EditingContext()
b1=Book(); b1.setTitle('abc?d')
b2=Book(); b2.setTitle('aBC?d')
b3=Book(); b3.setTitle('abcXd')
b4=Book(); b4.setTitle('abc*d')
b5=Book(); b5.setTitle('abc*D')
ec.insert(b1); ec.insert(b2); ec.insert(b3); ec.insert(b4); ec.insert(b5)
ec.saveChanges()
res=ec.fetch('Book', 'title ilike "abc?d"')
self.assertEqual(len(res), 5)
res=ec.fetch('Book', 'title ilike "abc*d"')
self.assertEqual(len(res), 5)
res=ec.fetch('Book', 'title ilike "abc\?d"')
self.assertEqual(len(res), 2)
titles=[r.getTitle() for r in res]
self.failIf('abc?d' not in titles)
self.failIf('aBC?d' not in titles)
res=ec.fetch('Book', 'title ilike "abc\*d"')
self.assertEqual(len(res), 2)
titles=[r.getTitle() for r in res]
self.failIf('abc*d' not in titles)
self.failIf('abc*D' not in titles)
def test_24_fetch_does_not_return_duplicates(self):
"[EditingContext] fetch does not return duplicates"
# bug #780495
ec=EditingContext()
ws=ec.fetch('Writer', 'books.title like "*"')
# explicitely look for duplicates
for w in ws:
_ws=list(ws) #[:-1])
_ws.remove(w)
self.failIf(w in _ws, 'Duplicate found: %s'%w)
def test_25_fetchCount_is_not_affected_by_duplicates(self):
"[EditingContext] fetchCount is not affected by duplicates"
# bug #780495
ec=EditingContext()
ws=ec.fetchCount('Writer', 'books.title like "*"')
self.assertEqual(ws, 2)
def test_26_fetch_underscore_percent(self):
"[EditingContext] fetch real '_' and '%'"
# bug #785913
ec=EditingContext()
b1=Book(); b1.setTitle('abc_d')
b2=Book(); b2.setTitle('abcXd')
b3=Book(); b3.setTitle('abc%d')
ec.insert(b1); ec.insert(b2); ec.insert(b3)
ec.saveChanges()
res=ec.fetch('Book', 'title like "*abc_d*"')
self.assertEqual(len(res), 1)
self.assertEqual(res[0].getTitle(), "abc_d")
res=ec.fetch('Book', 'title like "*abc%d*"')
self.assertEqual(len(res), 1)
self.assertEqual(res[0].getTitle(), "abc%d")
# TBD: with ILIKE
def test_27_like_is_case_sensitive(self):
"[EditingContext] check that LIKE is case-sensitive"
# bug #786217
ec=EditingContext()
b1=Book(); b1.setTitle('abcd')
b2=Book(); b2.setTitle('aBcd')
b3=Book(); b3.setTitle('aBCd')
ec.insert(b1); ec.insert(b2); ec.insert(b3)
ec.saveChanges()
res=ec.fetch('Book', 'title like "abcd"')
self.assertEqual(len(res), 1)
self.assertEqual(res[0].getTitle(), "abcd")
res=ec.fetch('Book', 'title like "aBCd"')
self.assertEqual(len(res), 1)
self.assertEqual(res[0].getTitle(), "aBCd")
def test_28_bug857803(self):
"[EditingContext] Complex query bug #857803"
# bug #857803
#'(age<100) AND ((books.title like "G*") OR (pygmalion.books.title like "G*"))'
ec=EditingContext()
qWQF=qualifierWithQualifierFormat
qualifier=qWQF('(age<100) AND ((books.title like "G*") OR (pygmalion.books.title like "G*"))')
fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
objects=ec.objectsWithFetchSpecification(fetchSpec)
objects_names=[o.getLastName() for o in objects]
self.failUnless(len(objects)==1)
self.failUnless('Dard' in objects_names)
def test_999_customSQLQuery(self):
"[EditingContext] custom SQL Query"
fs=FetchSpecification(entityName='Writer')
#dbContext=EditingContext().rootObjectStore().objectStoreForFetchSpecification(fs)
ec=EditingContext()
#ec.insertObject(Writer())
model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
from Modeling import DatabaseContext
dbContext=DatabaseContext.registeredDatabaseContextForModel(model, ec)
adaptorChannel=dbContext.availableChannel().adaptorChannel()
sqlExpr=adaptorChannel.adaptorContext().adaptor().expressionClass()()
sqlExpr.setStatement('select max(age) from WRITER')
adaptorChannel.setAttributesToFetch([model.entityNamed('Writer').attributeNamed('age')])
## Note: added begin/endTransaction()
## The reason for this is: module pypgsql discards a connection's cursor
## status when the cnx is committed or rolled back ; this makes this test
## fail when using pypgsql, since evaluateExpression() automatically
## opens and closes transaction if no transaction is opened when it is
## called
adaptorChannel.adaptorContext().beginTransaction()
if database_cfg=='SQLite.cfg':
sqlExpr2=adaptorChannel.adaptorContext().adaptor().expressionClass()()
sqlExpr2.setStatement('-- types int')
adaptorChannel.evaluateExpression(sqlExpr2)
adaptorChannel.evaluateExpression(sqlExpr)
row=adaptorChannel.fetchRow()
adaptorChannel.adaptorContext().commitTransaction() ## See comments above
self.failUnless(row.get('age', None))
self.failUnless(int(row['age'])==508)
adaptorChannel.cancelFetch()
def tearDown(self):
"""
Cleans up the Database after each tests
"""
ec=EditingContext()
w=Writer()
dbChannel=ec.rootObjectStore().objectStoreForObject(w).availableChannel()
channel=dbChannel.adaptorChannel()
sqlExpr=channel.adaptorContext().adaptor().expressionClass()()
sqlExpr.setStatement("delete from BOOK where id>4")
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("delete from WRITER where id>3")
channel.evaluateExpression(sqlExpr)
def reinitDB(database_cfg):
"""
Reinitializes the database: recreates the db's schema needed for these tests
to run. Also inserts some data in the created db.
"""
from Modeling.SchemaGeneration import SchemaGeneration,\
DropPrimaryKeySupportKey, DropForeignKeyConstraintsKey, \
DropPrimaryKeyConstraintsKey, DropTablesKey, DropDatabaseKey, \
CreateDatabaseKey, CreateTablesKey, PrimaryKeyConstraintsKey, \
ForeignKeyConstraintsKey, CreatePrimaryKeySupportKey
options={
DropPrimaryKeySupportKey : 1,
DropForeignKeyConstraintsKey : 1,
DropPrimaryKeyConstraintsKey : 1,
DropTablesKey : 1,
DropDatabaseKey : 0,
CreateDatabaseKey : 0,
CreateTablesKey : 1,
PrimaryKeyConstraintsKey : 1,
ForeignKeyConstraintsKey : 1,
CreatePrimaryKeySupportKey : 1,
}
model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
pgAdaptor=Adaptor.adaptorWithModel(model)
schemaGeneration = pgAdaptor.schemaGenerationFactory()
sqlExprs=schemaGeneration.schemaCreationStatementsForEntities(model.entities(), options)
context=pgAdaptor.createAdaptorContext()
channel=context.createAdaptorChannel()
for sqlExpr in sqlExprs:
try: channel.evaluateExpression(sqlExpr)
except Exception, exc:
print 'error: %s'%str(exc)
pks=channel.primaryKeysForNewRowsWithEntity(3, model.entityNamed('Writer'))
pks=[pk.values()[0] for pk in pks]
sqlExpr=pgAdaptor.expressionClass()()
if database_cfg=='Oracle.cfg':
sqlExpr.setStatement("alter session set nls_date_format = 'YYYY-MM-DD HH24:mi:ss'")
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("insert into WRITER "\
"(id, age, last_name,first_name,birthday,fk_writer_id)"\
" values (%i,24,'Cleese','John','1939-10-27 08:31:15',null)"%pks[0])
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("insert into WRITER "\
"(id, age, last_name,first_name,birthday,fk_writer_id)"\
" values (%i,508,'Rabelais','Francois','1484-07-02 18:16:12',null)"%pks[1])
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("insert into WRITER "\
"(id, age, last_name,first_name,birthday,fk_writer_id)"\
" values (%i,81,'Dard','Frederic','1921-06-29 04:56:34', %i)"%(pks[2], pks[1]))
channel.evaluateExpression(sqlExpr)
# Book
bookPKs=channel.primaryKeysForNewRowsWithEntity(4, model.entityNamed('Book'))
bookPKs=[pk.values()[0] for pk in bookPKs]
sqlExpr.setStatement("insert into BOOK "\
"(id, title, fk_writer_id) "\
"values (%i,'Gargantua',%i)"%(bookPKs[0], pks[1]))
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("insert into BOOK "\
"(id, title, fk_writer_id) "\
"values (%i,'Bouge ton pied que je voie la mer',%i)"%\
(bookPKs[1], pks[2]))
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("insert into BOOK "\
"(id, title, fk_writer_id) "\
"values (%i,'Le coup du pere Francois',%i)"%\
(bookPKs[2], pks[2]))
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("insert into BOOK "\
"(id, title, fk_writer_id) "\
"values(%i,'T\'\'assieds pas sur le compte-gouttes',%i)"% (bookPKs[3], pks[2]))
channel.evaluateExpression(sqlExpr)
#Fautil tuer les petits garons qui ont les mains sur les hanches ?
#
def usage(prgName, exitStatus=None):
_usage="""%s [-vV] [-p] [-d <dbAdaptorName>] [-r]
Runs the tests for the Modeling package
Options
--------
-v Minimally verbose
-V Really verbose
-h Prints this message
-p enables profiling
-d sets the database adaptor to use: Postgresql (default), MySQL, Oracle
or SQLite
-r reinitialize the (postgresql) database
the database defined in the model 'model_AuthorBooks'
(located in testPackages/AuthorBooks/) is altered: tables are
dropped, then they are re-created along with constraints for PKs and
FKs and PK support (sequences)
""" % prgName
if exitStatus is not None:
print _usage
sys.exit(exitStatus)
else:
return _usage
verbose=0
database_cfg='Postgresql.cfg'
def main(args):
me=args[0]
import getopt
options, args = getopt.getopt(sys.argv[1:], 'vVprd:')
#except: usage(me, 1)
global verbose, database_cfg
profile=0; reinitDB_flag=0
for k, v in options:
if k=='-h': usage(me, 1)
if k=='-v': verbose=1; continue
if k=='-V': verbose="Y"; continue
if k=='-p': profile=1; continue
if k=='-d':
if v not in ('Postgresql', 'MySQL', 'Oracle', 'SQLite'): usage(me, 1)
database_cfg='%s.cfg'%v
continue
if k=='-r': reinitDB_flag=1; continue
if args: usage(me, 1) #raise 'Unexpected arguments', args
author_books_model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
# Initialization of model's caracs.
Model.updateModelWithCFG(author_books_model, database_cfg)
# MySQL specifics: change TIMESTAMP to DATETIME
if database_cfg=='MySQL.cfg':
author_books_model.entityNamed('Writer').attributeNamed('birthday').setExternalType('DATETIME')
# Oracle specifics: change TIMESTAMP to DATE
if database_cfg=='Oracle.cfg':
author_books_model.entityNamed('Writer').attributeNamed('birthday').setExternalType('DATE')
from AuthorBooks import Writer
reload(Writer)
# SQLite specifics: test_12 requires that the framework closes the db
# connection, so that the new context/channel can connect to the db (sqlite
# does not allow concurrent access to the db).
if database_cfg=='SQLite.cfg':
import os
os.environ['MDL_TRANSIENT_DB_CONNECTION']='yes'
utils.enable_model_cache_and_compute()
if reinitDB_flag: reinitDB(database_cfg); return
if profile:
import profile
profile.run('utils.run_suite(test_suite(), verbosity=verbose)',
'profile.out')
return
else:
return utils.run_suite(test_suite(), verbosity=verbose)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestEditingContext_Global, "test_"))
return suite
if __name__ == "__main__":
errs = main(sys.argv)
#errs = utils.run_suite(test_suite())
sys.exit(errs and 1 or 0)
|