# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sbastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
"""
DatabaseContext
See interfaces.DatabaseChannel for details
CVS information
$Id: DatabaseChannel.py 969 2006-02-25 23:15:05Z sbigaret $
"""
__version__='$Revision: 969 $'[11:-2]
from logging import trace
from NotificationFramework import NotificationCenter
NC=NotificationCenter
# Framework
from ObjectStore import ObjectsChangedInStoreNotification,UpdatedKey
from ClassDescription import ClassDescription
from GlobalID import KeyGlobalID,GlobalIDChangedNotification
class DatabaseChannel:
"""
The DatabaseChannel's reponsability is to fetch objects on behalf of a
DatabaseContext.
It uses an concrete AdaptorChannel to perform the fetch, it then builds
DatabaseObject from the fetched rows, and returns those objects to the
requesting DatabaseContext --the DatabaseContext uses 'fetchObject()' to get
each object after another.
Note about inheritance: the DatabaseChannel does not care about
inheritance. When it gets a request/FetchSpecification for a given entity it
fetches rows from the database table 'entity.externalName()' and from this
one only, whatever the 'isDeep' flag of the FetchSpecification is. It is the
DatabaseContext responsability to correctly drive the DatabaseChannel (see
DatabaseContext.objectsWithFetchSpecification())
"""
def __init__(self, aDatabaseContext):
"""
Initializes a new DatabaseChannel and registers it with 'aDatabaseContext'.
It also creates its own AdaptorChannel to work with, by sending
to aDatabaseContext.adaptorContext() the message createAdaptorChannel().
The following properties: currentEntity, isLocking() and
isRefreshingObjects() are reset each time the method
selectObjectsWithFetchSpecification() is called, according to the
FetchSpecification settings.
"""
self._dbContext=aDatabaseContext
self._adaptorChannel=aDatabaseContext.adaptorContext().createAdaptorChannel()
self.__isFetchInProgress=0
# the following settings are reset each time selectObjects...() is called
# thus defaults are not important at all
self._editingContext=None
self._currentEntity=''
self._isLocking=0
self._refreshesRefreshedObject=0
self._rawRows=0
def adaptorChannel(self):
"""
Returns the underlying concrete AdaptorChannel.
"""
return self._adaptorChannel
def cancelFetch(self):
"""
Closes the underlying AdaptorChannel, cancels the pending fetch, and
resets/flushes all internal state.
"""
self._adaptorChannel.closeChannel()
self.setCurrentEditingContext(None)
self.__isFetchInProgress=0
self._editingContext=None
self._currentEntity=''
self._isLocking=0
self._refreshesRefreshedObject=0
self._rawRows=0
def databaseContext(self):
"""
Returns the DatabaseContext which pilots the DatabaseChannel.
"""
return self._dbContext
def fetchesRawRows(self):
"""
"""
return self._rawRows
def fetchObject(self):
"""
- channel.fetchRow
- build GlobalID
Posts ObjectsChangedInStoreNotification (updated) if isRefreshingObjects
If no more object is to be fetched, it resets the DatabaseChannel and
returns None.
Note that if the object fetched is already registered within the Database,
its snapshots is not refreshed, and it is substituted to the fetched row
(unless the fetch isRefreshingObjects)
See also: cancelFetch(), selectObjectsWithFetchSpecification()
Entity.globalIDForRow()
"""
#import pdb ; pdb.set_trace()
if not self.isFetchInProgress():
raise RuntimeError, 'No fetch in progress'
import time
currentTimestamp=time.time() # for snapshots
raw_row=self._adaptorChannel.fetchRow()
if raw_row is None:
self.cancelFetch() # No more objects
return None
globalID=self._currentEntity.globalIDForRow(raw_row)
if self.fetchesRawRows():
ec=self._editingContext
object=ec.objectForGlobalID(globalID)
if object is not None:
if not object.isFault():
return object.snapshot_raw()
else:
database=self.databaseContext().database()
registeredSnapshot=database.snapshotForGlobalID(globalID)
if registeredSnapshot:
return registeredSnapshot
else:
return raw_row
else:
return raw_row
# Get snapshot
#snapshot=self._currentEntity.snapshotForRow(raw_row)
snapshot=raw_row
database=self.databaseContext().database()
# Locking?
if self.isLocking():
self.databaseContext().registerLockedObjectWithGlobalID(globalID)
# Now we need to lock the corresponding database object. Why?
#
# One could argue that e.g. since an EC forwards objectsWithFetchSpec()
# ultimately to its root object store --an ObjectStoreCoordinator--, and
# since OSC locks/unlocks itself before/after each operation, an other EC
# used in a concurrent thread can access the Database object (and its
# snapshots) in the meantime.
#
# This is true w.r.t. ECs standard operations, but there remains a case
# where a concurrent thread can change the Database's snapshots: when an
# EC is finalized and garbage-collected, it forgets its objects and
# decrements the corresponding snapshot reference count --at this time, if
# the ref.count falls down to zero, the snapshot is removed from the
# Database's snapshots cache.
#
# Now this can happen in another thread while in this thread, the snapshot
# has been retrieved but it has not been used yet by ec.initializeObject()
# (forwarded to and handled by DBContext.initializeObject()). When this
# happens, Database.incrementSnapshotCountForGlobalID() called by
# DBContext.initializeObject() raises KeyError on the corresp. GlobalID
# since the corresponding snapshot has been thrown away.
#
snapshot_refcount_incremented = 0
database.lock()
try:
# Handle snapshot
# NB: ask delegate __TBD
if self.isRefreshingObjects() or self.isLocking():
# Unconditional refresh
database.recordSnapshotForGlobalID(snapshot, globalID)
if self.isRefreshingObjects():
# NB: isLocking-> no need to invalidate objects since two ECs cannot
# hold the same object at the same time
NC.postNotification(ObjectsChangedInStoreNotification, self,
{UpdatedKey: (globalID,)})
else:
# Should the snapshot be updated? if there is already one, no update.
# __TBD ask the DatabaseContext delegate whether we should refresh
# __TBD and, if yes, post ObjectsChangedInStoreNotification
registeredSnapshot=database.snapshotForGlobalID(globalID)
if not registeredSnapshot:
database.recordSnapshotForGlobalID(snapshot, globalID)
else:
snapshot=registeredSnapshot
ec=self._editingContext
object=ec.objectForGlobalID(globalID)
if object is not None and not object.isFault():
return object
if object is not None and object.isFault():
#NC.removeObserver(object.faultHandler(), GlobalIDChangedNotification,
# globalID)
NC.removeObserver(ec, GlobalIDChangedNotification, globalID)
#object.clearFault()
#else:
cd=ClassDescription.classDescriptionForName(self._currentEntity.name())
# first, check that we do not have a fault registered under the ``root''
# GlobalID
root_cd_name=cd.rootClassDescription().entityName()
if root_cd_name!=cd.entityName():
root_globalID=KeyGlobalID(root_cd_name, globalID.keyValues())
trace('Searching for possible fault w/ rootGlobalID=%s'%root_globalID)
possible_faulted_object=ec.objectForGlobalID(root_globalID)
if possible_faulted_object:
if possible_faulted_object.isFault():
trace('Found it: posting GlobalIDChangedNotification')
NC.postNotification(GlobalIDChangedNotification, root_globalID,
{root_globalID: globalID})
object=possible_faulted_object
# useless: it did receive the notification, and it has unregistered
# itself from the list of the observers
#NC.removeObserver(object.faultHandler(),
# GlobalIDChangedNotification, globalID)
object.clearFault()
new_class=cd.classForInstances()
object.__class__=new_class
object.__class__.__init__(object)
else:
raise RuntimeError, "Serious: shouldn't happen, please report"
else:
# create object
object=cd.createInstanceWithEditingContext(ec)
ec.recordObject(object, globalID)
else:
if not object:
# create object
object=cd.createInstanceWithEditingContext(ec)
ec.recordObject(object, globalID)
database.incrementSnapshotCountForGlobalID(globalID)
snapshot_refcount_incremented = 1
finally:
database.unlock()
# Initialize the new object or the cleared fault
try:
object.clearFault()
except:
pass
try:
ec.initializeObject(object, globalID, ec)
finally:
if snapshot_refcount_incremented:
database.decrementSnapshotCountForGlobalID(globalID)
return object
def isFetchInProgress(self):
"""
"""
return self.__isFetchInProgress
def isLocking(self):
"""
Tells whether the fetched objects should be locked or not.
If the DatabaseChannel has no fetch in progress, it returns false.
See also:
selectObjectsWithFetchSpecification,
FetchSpecification.locksObjects()
"""
return self._isLocking
def isRefreshingObjects(self):
"""
Tells whether the fetched objects should be refreshed or not.
See also:
- selectObjectsWithFetchSpecification,
- FetchSpecification.refreshesRefetchedObjects()
"""
return self._isRefreshingObjects
def selectCountObjectsWithFetchSpecification(self, aFetchSpecification,
anEditingContext=None):
"""
Returns the number of rows that would be retrieved if the method
selectObjectsWithFetchSpecification() was called with the same parameter.
Parameters:
aFetchSpecification -- a FetchSpecification object describing the
objects to be fetched
anEditingContext -- optional (since objects are not really fetched)
See also: selectObjectsWithFetchSpecification(),
EditingContext.objectsCountWithFetchSpecification()
DatabaseContext.objectsCountWithFetchSpecification()
AdaptorChannel.rowCountForSelectAttributes()
"""
if self.isFetchInProgress():
raise RuntimeError, 'a fetch in already in progress'
self.__isFetchInProgress=1
# Get an adaptorChannel
try:
if not self._adaptorChannel.isOpen():
self._adaptorChannel.openChannel()
import ModelSet
defModelSet=ModelSet.defaultModelSet()
entity=defModelSet.entityNamed(aFetchSpecification.entityName())
return self._adaptorChannel.rowCountForSelectAttributes(entity.attributesToFetch(), aFetchSpecification, self._isLocking, entity)
finally:
self.cancelFetch()
def selectObjectsWithFetchSpecification(self,
aFetchSpecification,
anEditingContext):
"""
The method follows the following steps:
1. sets the object's properties according to aFetchSpecification's
properties, i.e.: currentEditingContext, currentEntity, isLocking,
isRefreshingObjects, etc.
2. Opens its AdaptorChannel, if needed, then sends it the appropriate
'selectAttributes' message.
Fetched objects can be then retrieved one after the other by calling
the method fetchObject().
Inheritance and 'isDeep' flaf of aFetchSpecification: Note that
aFetchSpecification's entity is the real destination entity for the
fetched objects, not one of its superclasses. Inheritance is handled at a
higher level, i.e. in DatabaseContext.objectsWithFetchSpecification().
The 'isDeep' flag is completely ignored.
See also: fetchObject(), selectCountObjectsWithFetchSpecification()
AdaptorChannel.selectAttributes()
EditingContext.objectsWithFetchSpecification()
"""
if self.isFetchInProgress():
raise RuntimeError, 'a fetch in already in progress'
self.__isFetchInProgress=1
try:
# Reinitializes settings
self.setCurrentEditingContext(anEditingContext)
import ModelSet
defModelSet=ModelSet.defaultModelSet()
entity=defModelSet.entityNamed(aFetchSpecification.entityName())
self.setCurrentEntity(entity)
self.setIsLocking(aFetchSpecification.locksObjects())
self.setIsRefreshingObjects(aFetchSpecification.refreshesRefetchedObjects())
self.setFetchesRawRow(aFetchSpecification.fetchesRawRows())
# Get an adaptorChannel
if not self._adaptorChannel.isOpen():
self._adaptorChannel.openChannel()
#self._adaptorChannel.executeExpression(sqlExpr)
self._adaptorChannel.selectAttributes(entity.attributesToFetch(),
aFetchSpecification,
self._isLocking,
entity)
except:
# If something went wrong we must cancel the fetch in progress
self.cancelFetch()
raise
def setCurrentEditingContext(self, anEditingContext):
"""
This is automatically called by selectObjectsWithFetchSpecification()
"""
self._editingContext=anEditingContext
def setCurrentEntity(self, anEntity):
"""
This is automatically called by selectObjectsWithFetchSpecification(),
according to the settings of the FetchSpecification the latter method got.
"""
self._currentEntity=anEntity
def setFetchesRawRow(self, aBool):
"""
This is automatically called by selectObjectsWithFetchSpecification(),
according to the settings of the FetchSpecification the latter method got.
"""
self._rawRows=not not aBool
def setIsLocking(self, isLocking):
"""
This is automatically called by selectObjectsWithFetchSpecification(),
according to the settings of the FetchSpecification the latter method got.
"""
self._isLocking=not not isLocking
def setIsRefreshingObjects(self, isRefreshingObjects):
"""
This is automatically called by selectObjectsWithFetchSpecification(),
according to the settings of the FetchSpecification the latter method got.
"""
self._isRefreshingObjects=not not isRefreshingObjects
|