#! /usr/bin/env python
# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sbastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
"""
Global tests for EditingContext, relative to inheritance
This tests the editing context and the whole underlying framework
CVS information
$Id: test_EditingContext_Global_Inheritance.py 968 2006-02-25 15:46:25Z sbigaret $
"""
__version__='$Revision: 968 $'[11:-2]
import unittest, sys
import utils
if __name__ == "__main__":
    # Executed only when this file is run as a script: the python path has to
    # be set up *before* the test packages (StoreEmployees) get imported below.
    utils.disable_model_cache()
    if not ('-c' in sys.argv or '-C' in sys.argv
            or '-m' in sys.argv or '-M' in sys.argv):
        # None of -c/-C/-m/-M was given: use the regular test packages
        utils.fixpath(include_testPackages=1)
    else:
        # One of -c/-C/-m/-M was given: leave the regular test packages out of
        # the path and let utils build them from the command-line arguments
        utils.fixpath(include_testPackages=0)
        utils.dynamically_build_test_packages(sys.argv)
from Modeling import ModelSet,Model
from Modeling.EditingContext import EditingContext
from Modeling.FetchSpecification import FetchSpecification
from Modeling.Qualifier import qualifierWithQualifierFormat
from Modeling.Validation import ValidationException,\
CUSTOM_OBJECT_VALIDATION, OBJECT_WIDE_KEY, DELETE_DENY_KEY
from Modeling import Adaptor
from Modeling import Database
from StoreEmployees.Store import Store
from StoreEmployees import Employee
from StoreEmployees.SalesClerk import SalesClerk
from StoreEmployees.Executive import Executive
from StoreEmployees.Address import Address
from StoreEmployees.Holidays import Holidays
from mx import DateTime
# Factory used by the tests to build dates from strings such as
# '2003-07-01 08:00'. main() rebinds this global to an Oracle-specific
# builder when the Oracle configuration is selected.
DateFrom=DateTime.DateFrom
class TestEditingContext_Global_Inheritance(unittest.TestCase):
"Global tests for EditingContext in relation to inheritance in a model"
def test_01_objectsWithFetchSpecification(self):
"[EC/Inheritance] objectsWithFetchSpecification"
ec=EditingContext()
fetchSpec=FetchSpecification(entityName='Employee', deepFlag=0)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==0)
fetchSpec=FetchSpecification(entityName='Employee', deepFlag=1)
objects=ec.objectsWithFetchSpecification(fetchSpec)
#print len(objects), objects
self.failUnless(len(objects)==3)
def test_02_toOneFault(self):
"[EC/Inheritance] toOneFault"
ec=EditingContext()
qual=qualifierWithQualifierFormat('street like "2,*"')
fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
addr_JohnJr=objects[0]
johnJr=addr_JohnJr.getToEmployee()
self.failUnless(johnJr.isFault())
self.failUnless(johnJr.__class__==Employee.Employee)
johnJr.willRead()
self.failIf(johnJr.isFault())
self.failUnless(johnJr.__class__==SalesClerk)
ec.dispose()
def test_03_toOneFault_notifications(self):
"[EC/Inheritance] toOneFault/GlobalIDChangedNotification"
ec1=EditingContext()
ec2=EditingContext()
qual=qualifierWithQualifierFormat('street like "2,*"')
fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
objects_1=ec1.objectsWithFetchSpecification(fetchSpec)
objects_2=ec2.objectsWithFetchSpecification(fetchSpec)
addr_JohnJr_1=objects_1[0]
johnJr_1=addr_JohnJr_1.getToEmployee()
addr_JohnJr_2=objects_2[0]
johnJr_2=addr_JohnJr_2.getToEmployee()
gid_2=ec2.globalIDForObject(johnJr_2)
# trigger the first one (1)
johnJr_1.willRead()
gid_1=ec1.globalIDForObject(johnJr_1)
# check that the notification was propagated to the 2nd EC
# meaning that the ``root'' globalID was turned into the concrete one
# the second fault shouldnt have been triggered
self.failUnless(johnJr_2.isFault())
# it is still available under the previous gid (gid_2)
self.failUnless(ec2.objectForGlobalID(gid_2)==johnJr_2)
# but not registered with that GlobalID!
self.failUnless(ec2.globalIDForObject(johnJr_2)==gid_1)
# Rather, with the new, concrete one
self.failUnless(ec2.objectForGlobalID(gid_1)==johnJr_2)
#
# Make some changes in the database, in the meantime
#
model=ModelSet.defaultModelSet().modelNamed('StoreEmployees')
pgAdaptor=Adaptor.adaptorWithModel(model)
context=pgAdaptor.createAdaptorContext()
channel=context.createAdaptorChannel()
sqlExpr=pgAdaptor.expressionClass()()
original_store_area=johnJr_1.getStoreArea()
if original_store_area=='AB':
new_store_area='AC'
else:
new_store_area='AB'
sqlExpr.setStatement("update SALES_CLERK set STORE_AREA='%s' where id=%i"\
%(new_store_area, gid_1.keyValues().values()[0]))
channel.evaluateExpression(sqlExpr)
#
# Now, check that triggering the fault for johnJr_2 uses the DB's snapshot
#
johnJr_2.willRead()
# Note: This also check that john2_Jr is of the correct class.
self.failIf(johnJr_2.getStoreArea()!=original_store_area)
self.failIf(johnJr_1.getStoreArea()!=original_store_area)
def test_04_validation(self):
"[EC/Inheritance] test Validation"
ec=EditingContext()
qual=qualifierWithQualifierFormat('street like "2,*"')
fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
addr_JohnJr=objects[0]
johnJr=addr_JohnJr.getToEmployee()
addr_JohnJr.validateValueForKey(johnJr, 'toEmployee')
johnJr.willRead()
addr_JohnJr.validateValueForKey(johnJr, 'toEmployee')
def test_05_toOneFault_uniquing(self):
"[EC/Inheritance] Tests that toFaults are also unique within a ec"
ec=EditingContext()
# get address for John Cleese
qual=qualifierWithQualifierFormat('street like "4,*"')
fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
john_addr=objects[0]
# get John's mark
fetchSpec=FetchSpecification(entityName='Mark')
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
john_mark=objects[0]
# Verify that both faults points to the same object
self.failUnless(john_addr.getToEmployee().isFault())
self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())
# trigger one of the fault, then check again
john_addr.getToEmployee().willRead()
self.failIf(john_mark.getExecutive().isFault())
self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())
def test_05b_toOneFault_uniquing(self):
"[EC/Inheritance] Tests unicity for toOne-related objects when they are already fetched"
## In this test we make sure that, after an object has been fetched,
## any other object pointing to it do not get a to-one fault, but the
## object that is previously fetched.
ec=EditingContext()
## get John
qual=qualifierWithQualifierFormat('firstName=="John" AND lastName=="Cleese"')
fetchSpec=FetchSpecification(entityName='Executive', qualifier=qual)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
john=objects[0]
## get address for John Cleese
qual=qualifierWithQualifierFormat('street like "4,*"')
fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
john_addr=objects[0]
## get John's mark
fetchSpec=FetchSpecification(entityName='Mark')
objects=ec.objectsWithFetchSpecification(fetchSpec)
self.failUnless(len(objects)==1)
john_mark=objects[0]
self.failIf(john_addr.getToEmployee().isFault())
self.failIf(john_mark.getExecutive().isFault())
## Check that inheritance does not defeat the uniquing strategy...
## Note: this is what used to fail when john is fetched before the others:
## in this case john_mark.getExecutive() was NOT a fault,
## while john_addr.getToEmployee() was a fault
gid1=ec.globalIDForObject(john)
gid2=ec.globalIDForObject(john_mark.getExecutive())
gid3=ec.globalIDForObject(john_addr.getToEmployee())
## ... for GlobalIDs...
self.failIf(gid1!=gid2)
self.failIf(gid2!=gid3)
## ...and for the corresponding objects
self.failIf(john_addr.getToEmployee()!=john)
self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())
## trigger one of the fault, then check again
john_addr.getToEmployee().willRead()
self.failIf(john_mark.getExecutive().isFault())
self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())
#self.failIf(john_addr.getToEmployee()!=john)
def test_06_objectsCountWithFetchSpecification(self):
"[EC/Inheritance] objectsCountWithFetchSpecification"
ec=EditingContext()
fs=FetchSpecification(entityName='Employee')
fs.setIsDeep(1)
nb=ec.objectsCountWithFetchSpecification(fs)
self.failIf(nb!=3)
#
fs=FetchSpecification(entityName='SalesClerk')
nb=ec.objectsCountWithFetchSpecification(fs)
self.failIf(nb!=2)
def test_07_fetch_shouldnt_change_FetchSpecification(self):
"[EC/Inheritance] fetching shouldn't change FetchSpecification"
# Bug #753147
from StoreEmployees import Address
from Modeling.FetchSpecification import FetchSpecification
from Modeling.EditingContext import EditingContext
from Modeling.Qualifier import qualifierWithQualifierFormat
ec = EC()
q = qWQF('toAddresses.zipCode like "4*"')
fs_deep = FS('Employee', qualifier=q, deepFlag=1) # Note the deepFlag
objs1=ec.objectsWithFetchSpecification(fs_deep)
self.failIf(len(objs1)!=3)
self.failIf(fs_deep.entityName()!='Employee')
objs2=ec.objectsWithFetchSpecification(fs_deep)
self.failIf(len(objs2)!=3)
def test_08a_fetchesRawRows(self):
"[EC/Inheritance] fetchesRawRows / basics"
ec=EditingContext()
# No inheritance
addresses=ec.fetch('Address', rawRows=1)
self.failIf(len(addresses)!=3)
for a in addresses:
self.failIf(type(a) is not type({}))
# Inheritance
employees=ec.fetch('Employee', isDeep=1, rawRows=1)
self.failIf(len(employees)!=3)
for e in employees:
self.failIf(type(e) is not type({}))
# modified objects
john=ec.fetch('Executive', 'lastName == "Cleese"')[0]
john_gid=john.globalID()
john_name=john.getLastName()
alternate_john_name=john_name+' -- testing'
john.setLastName(alternate_john_name)
john=ec.fetch('Executive', isDeep=1, rawRows=1)[0]
self.assertEqual(john['lastName'], alternate_john_name)
def test_08b_fetchesRawRows(self):
"[EC/Inheritance] fetchesRawRows / inserted & deleted objects"
ec=EditingContext()
# Inserted objects should appear, as expected
nsc=SalesClerk()
nsc.setLastName('New SC')
ne=Executive()
ne.setLastName('New Executive')
ec.insert(nsc)
ec.insert(ne)
employees=ec.fetch('Employee', isDeep=1, rawRows=1)
self.assertEqual(len(employees), 5)
rnsc=rne=None # returned nsc/ne
for e in employees:
self.assertEqual(type(e), type({}))
# deletedObjects should not appear
ec.delete(nsc)
jeanne=ec.fetch('SalesClerk', 'firstName=="Jeanne"')[0]
ec.deleteObject(jeanne)
employees=ec.fetch('Employee', isDeep=1, rawRows=1)
self.assertEqual(len(employees), 3)
rnsc=rne=rjeanne=None # returned nsc/ne
for e in employees:
self.assertEqual(type(e), type({}))
if e['lastName']==nsc.getLastName(): rnsc=e
if e['lastName']==ne.getLastName(): rne=e
if e['firstName']==jeanne.getFirstName(): rjeanne=e
self.assertEqual(rnsc, None)
self.assertEqual(rjeanne, None)
self.assertEqual(rne, ne.snapshot_raw())
def test_09_faultForRawRow(self):
"[EC/Inheritance] faultForRawRow"
ec=EditingContext()
employees=ec.fetch('Employee', isDeep=1, rawRows=1)
r_executives=[]
r_sales_clerks=[]
for e in employees:
if e.has_key('officeLocation'):
r_executives.append(e)
else:
r_sales_clerks.append(e)
self.failIf(len(r_executives)!=1)
self.failIf(len(r_sales_clerks)!=2)
r_john_c=r_executives[0]
john_c1=ec.faultForRawRow(r_john_c, 'Employee')
john_c2=ec.faultForRawRow(r_john_c, 'Executive')
self.failUnless(john_c1.isFault())
self.failUnless(john_c1==john_c2)
john_c3=ec.fetch('Employee', 'firstName=="John" AND lastName=="Cleese"',
isDeep=1)[0]
self.failUnless(john_c2==john_c3)
john_c3.getFirstName()
self.failIf(john_c1.isFault())
# Anything in the correct inheritance tree return the right object
john_c4=ec.faultForRawRow(r_john_c, 'SalesClerk')
self.failUnless(john_c1==john_c4)
# Check that it correctly raise when 'pk' is not present
self.assertRaises(ValueError,
ec.faultForRawRow, {'incorrect':0}, 'Employee')
## TBD: CHECK FOR TEMPORARY/INSERTED
def test_10_toMany_no_inverse__db_snapshot_is_correct(self):
"[EC/Inheritance] toMany w/o inverse: check db snapshot"
# We check here that a database snapshot for an object being pointed to
# by a toMany relationship with no inverse correctly contains its FK
ec=EditingContext()
holidays=Holidays()
holidays.setStartDate(DateFrom('2003-07-01 08:00'))
holidays.setEndDate(DateFrom('2003-08-01 08:00'))
ec.insert(holidays)
ec.saveChanges()
database=ec.rootObjectStore().objectStoreForObject(holidays).database()
snap=database.snapshotForGlobalID(holidays.globalID())
self.failIf(not snap.has_key('fkEmployeeId'))
## ToMany with no inverse:
# test10 / insertion:
# [a] insert (h.+emp.) + emp.addToHolidays + saveChges
# [b] insert h. + saveChges + emp.addToHolidays + saveChges
# [c] insert h.+ emp.addToHolidays + saveChges
# [d] insert h.+ saveChges + emp.addToHolidays + h.setStartDate + saveChges
# [e] insert h. + saveChges + insert emp. + emp.addToHolidays + saveChanges
#
# test11 / remove:
# [a] remove simple
# [b] add h1 + remove h2
# [c] remove (+ implicit delete w/ cascade rule) emp.
def _test_10_create_emp_n_holidays(self):
"""
helper function for tests test_10...: return (employee, holidays), brand
new instances of Employee/Holidays.
"""
emp=self._test_10_class_employee()
emp.setLastName('Test10'); emp.setFirstName('T.')
holidays=Holidays()
holidays.setStartDate(DateFrom('2003-07-01 08:00'))
holidays.setEndDate(DateFrom('2003-08-01 08:00'))
return emp, holidays
def _test_10_check_insert_correctness_part1(self, ec, emp, holidays):
emp_gid=emp.globalID()
holidays_gid=holidays.globalID()
db=ec.rootObjectStore().objectStoreForObject(emp).database()
self.failUnless(holidays_gid)
self.failIf(not db.snapshotForGlobalID(holidays_gid))
self.failIf(not db.snapshotForGlobalID(holidays_gid).has_key('fkEmployeeId'))
self.assertEqual(db.snapshotForGlobalID(holidays_gid)['fkEmployeeId'],
emp_gid.keyValues()['id'])
self.failUnless(db.snapshotForSourceGlobalID(emp.globalID(), 'holidays'))
return emp_gid, db
def _test_10_check_insert_correctness_part2(self, emp_gid, database):
""
# Now check that it was correctly saved. If 'holidays' was not correctly
# saved, it hasn't get the right value for its FK and we won't retrieve
# any holidays for the test employee
# make sure we won't get informations from the db cache
self.failIf(database.snapshotForSourceGlobalID(emp_gid, 'holidays'))
ec2=EditingContext()
emp2=ec2.fetch('Employee','lastName=="Test10"', isDeep=1)[0]
self.failIf(len(emp2.getHolidays())==0)
def _test_10a_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/a]"
# insert (h.+emp) + emp.addToHolidays + saveChanges
ec=EditingContext()
emp, holidays=self._test_10_create_emp_n_holidays()
ec.insert(emp)
ec.insert(holidays)
emp.addToHolidays(holidays)
ec.saveChanges()
emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
del ec # deleting the ec invalidates the database's cache
self._test_10_check_insert_correctness_part2(emp_gid, db)
def test_10a_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/a]"
self._test_10_class_employee=Employee.Employee
self._test_10a_toMany_with_no_inverse()
self._test_10_class_employee=SalesClerk
self._test_10a_toMany_with_no_inverse()
def _test_10b_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/b]"
# insert h. + saveChanges + emp.addToHolidays + saveChanges
ec=EditingContext()
emp, holidays=self._test_10_create_emp_n_holidays()
emp_gid=emp.globalID()
ec.insert(emp)
ec.saveChanges()
ec.insert(holidays)
ec.saveChanges()
emp.addToHolidays(holidays)
#emp.setLastName('fdsk')
ec.saveChanges()
emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
del ec # deleting the ec invalidates the database's cache
self._test_10_check_insert_correctness_part2(emp_gid, db)
def test_10b_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/b]"
self._test_10_class_employee=Employee.Employee
self._test_10b_toMany_with_no_inverse()
self._test_10_class_employee=SalesClerk
self._test_10b_toMany_with_no_inverse()
def _test_10c_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/c]"
# insert h. + emp.addToHolidays + saveChanges
ec=EditingContext()
emp, holidays=self._test_10_create_emp_n_holidays()
ec.insert(emp)
ec.saveChanges()
ec.insert(holidays)
emp.addToHolidays(holidays)
ec.saveChanges()
emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
del ec # deleting the ec invalidates the database's cache
self._test_10_check_insert_correctness_part2(emp_gid, db)
def test_10c_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/c]"
self._test_10_class_employee=Employee.Employee
self._test_10c_toMany_with_no_inverse()
self._test_10_class_employee=SalesClerk
self._test_10c_toMany_with_no_inverse()
def _test_10d_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/d]"
#insert h. + saveChanges + emp.addToHolidays + h.setStartDate + saveChanges
ec=EditingContext()
emp, holidays=self._test_10_create_emp_n_holidays()
ec.insert(emp)
ec.saveChanges()
ec.insert(holidays)
ec.saveChanges()
emp.addToHolidays(holidays)
holidays.setStartDate(DateFrom('2003-07-02 08:00'))
ec.saveChanges()
emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
del ec # deleting the ec invalidates the database's cache
self._test_10_check_insert_correctness_part2(emp_gid, db)
def test_10d_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/d]"
self._test_10_class_employee=Employee.Employee
self._test_10d_toMany_with_no_inverse()
self._test_10_class_employee=SalesClerk
self._test_10d_toMany_with_no_inverse()
def _test_10e_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/e]"
# insert h. + save + insert emp. + emp.addToHolidays + saveChanges
ec=EditingContext()
emp, holidays=self._test_10_create_emp_n_holidays()
ec.insert(holidays)
ec.saveChanges()
ec.insert(emp)
emp.addToHolidays(holidays)
ec.saveChanges()
emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
del ec # deleting the ec invalidates the database's cache
self._test_10_check_insert_correctness_part2(emp_gid, db)
def test_10e_toMany_with_no_inverse(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/e]"
self._test_10_class_employee=Employee.Employee
self._test_10e_toMany_with_no_inverse()
self._test_10_class_employee=SalesClerk
self._test_10e_toMany_with_no_inverse()
def _test_11a_init_emp_and_holidays(self, ec):
emp=self._test_11_class_employee()
emp.setLastName('Test11'); emp.setFirstName('T.')
holidays=Holidays()
holidays.setStartDate(DateFrom('2003-07-01 08:00'))
holidays.setEndDate(DateFrom('2003-08-01 08:00'))
ec.insert(emp)
ec.insert(holidays)
emp.addToHolidays(holidays)
ec.saveChanges()
return emp, holidays
def _test_11a_toMany_with_no_inverse__remove(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/a]"
# remove simple
ec=EditingContext()
emp, holidays=self._test_11a_init_emp_and_holidays(ec)
emp.removeFromHolidays(holidays)
ec.saveChanges()
emp_gid=emp.globalID()
h_gid=holidays.globalID()
db=ec.rootObjectStore().objectStoreForObject(emp).database()
self.failUnless(db.snapshotForGlobalID(h_gid)['fkEmployeeId'] is None)
del ec
# make sure we won't get informations from the db cache
self.failIf(db.snapshotForSourceGlobalID(emp_gid, 'holidays'))
ec2=EditingContext()
emp2=ec2.fetch('Employee', 'lastName=="Test11"', isDeep=1)[0]
self.failIf(len(emp2.getHolidays())!=0)
def test_11a_toMany_with_no_inverse__remove(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/a]"
self._test_11_class_employee=Employee.Employee
self._test_11a_toMany_with_no_inverse__remove()
self._test_11_class_employee=SalesClerk
self._test_11a_toMany_with_no_inverse__remove()
def _test_11b_toMany_with_no_inverse__remove(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/b]"
# remove + add
ec=EditingContext()
emp, holidays=self._test_11a_init_emp_and_holidays(ec)
emp.removeFromHolidays(holidays)
h2=Holidays()
sd2=DateFrom('2003-01-01 01:00')
ed2=DateFrom('2003-02-02 02:00')
h2.setStartDate(sd2)
h2.setEndDate(ed2)
ec.insert(h2)
emp.addToHolidays(h2)
ec.saveChanges()
emp_gid=emp.globalID()
h_gid=holidays.globalID()
db=ec.rootObjectStore().objectStoreForObject(emp).database()
self.failUnless(db.snapshotForGlobalID(h_gid)['fkEmployeeId'] is None)
del ec
# make sure we won't get informations from the db cache
self.failIf(db.snapshotForSourceGlobalID(emp_gid, 'holidays'))
ec2=EditingContext()
emp2=ec2.fetch('Employee', 'lastName=="Test11"', isDeep=1)[0]
self.assertEqual(len(emp2.getHolidays()), 1)
h2b=emp2.getHolidays()[0]
# the fetched value may be of a different type (python's datetime),
# if returned as such by the python db-adapter (mysqldb e.g.)
if getattr(h2b.getStartDate(),'isoformat',None):
self.assertEqual(sd2.Format('%Y-%m-%dT%H:%M:%S'),
h2b.getStartDate().isoformat())
self.assertEqual(ed2.Format('%Y-%m-%dT%H:%M:%S'),
h2b.getEndDate().isoformat())
else:
self.assertEqual(sd2, h2b.getStartDate())
self.assertEqual(ed2, h2b.getEndDate())
#print db.snapshotForGlobalID(h_gid)
def test_11b_toMany_with_no_inverse__remove(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/b]"
self._test_11_class_employee=Employee.Employee
self._test_11b_toMany_with_no_inverse__remove()
self._test_11_class_employee=SalesClerk
self._test_11b_toMany_with_no_inverse__remove()
def _test_11c_toMany_with_no_inverse__remove(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/c]"
# remove + add
ec=EditingContext()
emp, holidays=self._test_11a_init_emp_and_holidays(ec)
emp.removeFromHolidays(holidays)
ec.delete(emp)
ec.saveChanges()
db=ec.rootObjectStore().objectStoreForObject(holidays).database()
h_gid=holidays.globalID()
del ec
ec2=EditingContext()
h2=ec2.fetch('Holidays')[0]
#print db.snapshotForGlobalID(h_gid)
self.failIf(db.snapshotForGlobalID(h_gid)['fkEmployeeId']!=None)
def test_11c_toMany_with_no_inverse__remove(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/c]"
self._test_11_class_employee=Employee.Employee
self._test_11c_toMany_with_no_inverse__remove()
self._test_11_class_employee=SalesClerk
self._test_11c_toMany_with_no_inverse__remove()
def _test_12_toMany_with_no_inverse__correct_fetch(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly fetched"
ec=EditingContext()
emp, holidays=self._test_11a_init_emp_and_holidays(ec)
db=ec.rootObjectStore().objectStoreForObject(holidays).database()
h_gid=holidays.globalID()
del ec
ec2=EditingContext()
h2=ec2.fetch('Holidays')[0]
self.failIf(db.snapshotForGlobalID(h_gid)['fkEmployeeId']==None)
def test_12_toMany_with_no_inverse__correct_fetch(self):
"[EC/Inheritance] toMany w/o inverse: obj. correctly fetched"
self._test_11_class_employee=Employee.Employee
self._test_12_toMany_with_no_inverse__correct_fetch()
self._test_11_class_employee=SalesClerk
self._test_12_toMany_with_no_inverse__correct_fetch()
def _test_13_toMany_with_no_inverse__snapshot_row(self):
"[EC/Inheritance] toMany w/o inverse: obj.snapshot_raw()"
ec=EditingContext()
emp, holidays=self._test_10_create_emp_n_holidays()
ec.insert(emp)
ec.insert(holidays)
emp.addToHolidays(holidays)
# Not computed before saveChanges()
self.failUnless(holidays.snapshot_raw().has_key('fkEmployeeId'))
self.assertEqual(holidays.snapshot_raw()['fkEmployeeId'], None)
ec.saveChanges()
#print holidays.snapshot_raw()
# We check here that snapshot_raw() is able to find the value
# for the FK by examining the database's cache.
self.assertEqual(holidays.snapshot_raw()['fkEmployeeId'],
emp.globalID().keyValues()['id'])
def test_13_toMany_with_no_inverse__snapshot_row(self):
"[EC/Inheritance] toMany w/o inverse: obj.snapshot_raw()"
self._test_10_class_employee=Employee.Employee
self._test_13_toMany_with_no_inverse__snapshot_row()
self._test_10_class_employee=SalesClerk
self._test_13_toMany_with_no_inverse__snapshot_row()
def tearDown(self):
"""
Cleans up the Database after each tests
"""
ec=EditingContext()
w=Store()
dbChannel=ec.rootObjectStore().objectStoreForObject(w).availableChannel()
channel=dbChannel.adaptorChannel()
sqlExpr=channel.adaptorContext().adaptor().expressionClass()()
sqlExpr.setStatement("delete from STORE where id>1")
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("delete from SALES_CLERK where id>2")
channel.evaluateExpression(sqlExpr)
# id>3 coz' SALES_CLERK and EXECUTIVE share the same SQL sequence
sqlExpr.setStatement("delete from EXECUTIVE where id>3")
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("delete from ADDRESS where id>3")
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("delete from EMPLOYEE")
channel.evaluateExpression(sqlExpr)
sqlExpr.setStatement("delete from HOLIDAYS")
channel.evaluateExpression(sqlExpr)
def reinitDB(database_cfg):
    """
    Reinitializes the database: recreates the db's schema needed for these tests
    to run. Also inserts some data in the created db.

    Parameter 'database_cfg' is the name of the database configuration in use;
    it is only examined to detect Oracle ('Oracle.cfg'), for which the
    session's date format is set before any data is inserted.

    Inserted data: one store, two sales clerks and one executive working for
    that store, one address per employee and one mark for the executive.
    """
    from Modeling.SchemaGeneration import SchemaGeneration,\
         CreateDatabaseKey, DropDatabaseKey, CreateTablesKey, DropTablesKey, \
         CreatePrimaryKeySupportKey, DropPrimaryKeySupportKey,\
         ForeignKeyConstraintsKey, PrimaryKeyConstraintsKey
    options={
      DropDatabaseKey            : 1,
      CreateDatabaseKey          : 1,
      DropTablesKey              : 1,
      CreateTablesKey            : 1,
      CreatePrimaryKeySupportKey : 1,
      DropPrimaryKeySupportKey   : 1,
      ForeignKeyConstraintsKey   : 0,
      PrimaryKeyConstraintsKey   : 1,
      }
    model=ModelSet.defaultModelSet().modelNamed('StoreEmployees')
    pgAdaptor=Adaptor.adaptorWithModel(model)
    schemaGeneration = pgAdaptor.schemaGenerationFactory()
    sqlExprs=schemaGeneration.schemaCreationStatementsForEntities(model.entities(), options)
    context=pgAdaptor.createAdaptorContext()
    channel=context.createAdaptorChannel()

    def execute(statement):
        "Evaluates the SQL string 'statement', using a brand new expression"
        # A new expression object per statement: the expressions returned by
        # the schema generation are never reused nor modified.
        expr=pgAdaptor.expressionClass()()
        expr.setStatement(statement)
        channel.evaluateExpression(expr)

    # Schema (re)creation is best-effort: a failing statement is reported and
    # the remaining ones are still evaluated.
    # NOTE(review): presumably the 'drop' statements are expected to fail on a
    # database that was never initialized -- confirm
    for sqlExpr in sqlExprs:
        try:
            channel.evaluateExpression(sqlExpr)
        except Exception:
            # sys.exc_info() rather than 'except Exception, exc': the latter
            # is not valid syntax in every python version
            exc=sys.exc_info()[1]
            print('error: %s'%str(exc))

    if database_cfg=='Oracle.cfg':
        # so that the dates in the statements evaluated in this session are
        # understood
        execute("alter session set nls_date_format = 'YYYY-MM-DD HH24:mi:ss'")

    # Store
    store_entity=model.entityNamed('Store')
    pks=channel.primaryKeysForNewRowsWithEntity(1, store_entity)
    st_pk=pks[0].values()[0]
    execute("insert into STORE (id, corporate_name) "\
            "values (%i,'Flying Circus')"%st_pk)

    # SalesClerk
    sales_clerk_entity=model.entityNamed('SalesClerk')
    pks=channel.primaryKeysForNewRowsWithEntity(2, sales_clerk_entity)
    sc_pks=[pk.values()[0] for pk in pks]
    execute("insert into SALES_CLERK "\
            "(id, FIRST_NAME, LAST_NAME, STORE_AREA, FK_STORE_ID) "\
            "values (%i,'John Jr.','Cleese', 'AB', %i)"%(sc_pks[0],
                                                        st_pk))
    execute("insert into SALES_CLERK "\
            "(ID, FIRST_NAME, LAST_NAME, STORE_AREA, FK_STORE_ID) "\
            "values (%i,'Jeanne','Cleese', 'DE', %i)"%(sc_pks[1],
                                                      st_pk))

    # Executive
    executive_entity=model.entityNamed('Executive')
    pks=channel.primaryKeysForNewRowsWithEntity(1, executive_entity)
    ex_pk=pks[0].values()[0]
    execute("insert into EXECUTIVE "\
            "(id,FIRST_NAME,LAST_NAME,OFFICE_LOCATION,FK_STORE_ID)"\
            " values (%i,'John','Cleese', '4XD7', %i)"%(ex_pk,
                                                       st_pk))

    # Address
    address_entity=model.entityNamed('Address')
    pks=channel.primaryKeysForNewRowsWithEntity(3, address_entity)
    ad_pks=[pk.values()[0] for pk in pks]
    execute("insert into ADDRESS "\
            "(ID, STREET, ZIP_CODE, TOWN, FK_EMPLOYEE_ID) "\
            "values (%i,'2, rue Chose','45','ChoseVille',%i)"\
            %(ad_pks[0],
              sc_pks[0])) # John Jr.
    execute("insert into ADDRESS "\
            "(ID, STREET, ZIP_CODE, TOWN, FK_EMPLOYEE_ID) "\
            "values (%i,'2b, rue Chose','45','ChoseVille',%i)"\
            %(ad_pks[1],
              sc_pks[1])) # Jeanne.
    execute("insert into ADDRESS "\
            "(ID, STREET, ZIP_CODE, TOWN, FK_EMPLOYEE_ID) "\
            "values (%i,'4, rue MachinChose','45','ChoseVille',%i)"\
            %(ad_pks[2],
              ex_pk)) # John C.

    # Mark
    mark_entity=model.entityNamed('Mark')
    pks=channel.primaryKeysForNewRowsWithEntity(1, mark_entity)
    mark_pk=pks[0].values()[0]
    # John Cleese got a very good mark in January: 20/20
    execute("insert into MARK (id, month, mark, FK_EXECUTIVE_ID) "\
            "values (%i,'01','20',%i)"%(mark_pk,ex_pk)) # John C.
def usage(prgName, exitStatus=None):
    """
    Builds the help message for the program named 'prgName'.

    When 'exitStatus' is None (the default) the message is returned.
    Otherwise the message is printed and the interpreter exits with
    'exitStatus' as its exit status.
    """
    help_text="""%s [-v] [-V]
Global tests for the framework's core, testing inheritance-related features
Options
--------
-v Minimally verbose
-V Really verbose
-h Prints this message
-p enables profiling
-r reinitialize the (postgresql) database
the database defined in the model 'model_StoreEmployees.xml'
is altered: tables are dropped, then they are re-created along with
constraints for PKs and FKs and PK support (sequences)
-d sets the database adaptor to use: Postgresql (default), MySQL, Oracle or
or SQLite
""" % prgName
    if exitStatus is None:
        return help_text
    print(help_text)
    sys.exit(exitStatus)
# Verbosity handed over to utils.run_suite() by main():
# 0 by default, 1 with option -v, "Y" with option -V
verbose=0
# Name of the database configuration applied to the model by main();
# option -d changes it to '<adaptor name>.cfg'
database_cfg='Postgresql.cfg'
def main(args):
    """
    Parses the command line, configures the model 'StoreEmployees' for the
    selected database, then either reinitializes the database (-r) or runs the
    tests, profiled (-p) or not.

    Parameter 'args' is the list of command-line arguments; its first item is
    used as the program's name in the usage message. Note that the options
    themselves are read from sys.argv.

    Returns None after a reinitialization of the database or a profiled run,
    otherwise the value returned by utils.run_suite(). An invalid command
    line, as well as -h, prints the usage message and exits with status 1.
    """
    me=args[0]
    import getopt
    global verbose, database_cfg, DateFrom
    # 'h' is part of the accepted options, otherwise the documented -h is
    # rejected by getopt before it can be handled below
    try:
        options, args = getopt.getopt(sys.argv[1:], 'hvVprd:')
    except getopt.GetoptError:
        usage(me, 1)
    # named '*_flag' so that the first one does not collide with the module
    # 'profile' imported below
    profile_flag=0; reinitDB_flag=0
    for k, v in options:
        if k=='-h': usage(me, 1)
        if k=='-v': verbose=1; continue
        if k=='-V': verbose="Y"; continue
        if k=='-p': profile_flag=1; continue
        if k=='-d':
            if v not in ('Postgresql', 'MySQL', 'Oracle', 'SQLite'): usage(me, 1)
            database_cfg='%s.cfg'%v
            continue
        if k=='-r': reinitDB_flag=1; continue

    if args: usage(me, 1) #raise 'Unexpected arguments', args

    # Initialization of model's caracs.
    model=ModelSet.defaultModelSet().modelNamed('StoreEmployees')
    Model.updateModelWithCFG(model, database_cfg)

    # MySQL specifics: change TIMESTAMP to DATETIME
    if database_cfg=='MySQL.cfg':
        model.entityNamed('Holidays').attributeNamed('startDate').setExternalType('DATETIME')
        model.entityNamed('Holidays').attributeNamed('endDate').setExternalType('DATETIME')

    # Oracle specifics: change TIMESTAMP to DATE
    if database_cfg=='Oracle.cfg':
        model.entityNamed('Holidays').attributeNamed('startDate').setExternalType('DATE')
        model.entityNamed('Holidays').attributeNamed('endDate').setExternalType('DATE')

        def OracleDateFrom(date_string):
            "Builds a DCOracle2.Timestamp from a string 'YYYY-MM-DD HH:MM'"
            date, time=date_string.split()
            yyyy,mm,dd=[int(s) for s in date.split('-')]
            h,m=[int(s) for s in time.split(':')]
            s=0
            import DCOracle2
            return DCOracle2.Timestamp(yyyy,mm,dd,h,m,s)
        # from now on the tests build their dates with the Oracle builder
        DateFrom=OracleDateFrom

    # SQLite specifics: test_03 requires that the framework closes the db
    # connection, so that the new context/channel can connect to the db (sqlite
    # does not allow concurrent access to the db).
    if database_cfg=='SQLite.cfg':
        import os
        os.environ['MDL_TRANSIENT_DB_CONNECTION']='yes'

    utils.enable_model_cache_and_compute()

    if reinitDB_flag: reinitDB(database_cfg); return

    if profile_flag:
        import profile
        # the statement below is a string, evaluated by the module 'profile'
        profile.run('utils.run_suite(test_suite(), verbosity=verbose)',
                    'profile.out')
        return
    else:
        return utils.run_suite(test_suite(), verbosity=verbose)
def test_suite():
    """
    Returns a unittest.TestSuite holding every method of
    TestEditingContext_Global_Inheritance whose name begins with 'test_'.
    """
    inheritance_tests = unittest.makeSuite(TestEditingContext_Global_Inheritance,
                                           "test_")
    all_tests = unittest.TestSuite()
    all_tests.addTest(inheritance_tests)
    return all_tests
if __name__ == "__main__":
    # Script entry point: the exit status is 1 when main() returns a true
    # value, 0 otherwise (including when it returns None).
    # NOTE(review): presumably a true value means that some tests failed --
    # confirm against utils.run_suite()
    if main(sys.argv):
        sys.exit(1)
    else:
        sys.exit(0)
|