# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sébastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
"""
ObserverCenter
Please refer to interfaces.ObserverCenter for documentation
CVS information
$Id: ObserverCenter.py 932 2004-07-20 06:21:57Z sbigaret $
"""
# Bare revision number: the slice drops the leading '$Revision: ' and the
# trailing ' $' of the expanded version-control keyword.
__version__='$Revision: 932 $'[11:-2]
# Logging switch; it is not read by any code visible in this module --
# presumably meant to gate calls to trace() (TODO confirm).
isLogEnabled=0
from logging import trace
## The lock defined below (observerCenter_lock) is taken by every function of
## the module's public API.
## Remember that you should *NOT* directly access the module's variables:
## they are considered private (MT-safety).
from threading import RLock
import weakref, sys
# Maps each observed object (weakly held key) to the list of weak references
# to its observers.
__observers_wdict = weakref.WeakKeyDictionary() # dict[object]=[observers]
# Weak references to the observers that are notified of every change,
# whatever the changed object is.
__omniscientObservers = [] # omniscient observers
# Objects whose observers have already been notified; only the keys matter.
__alreadyNotifiedObjects = weakref.WeakKeyDictionary() # keys only
# Re-entrant lock guarding the three structures above.
observerCenter_lock=RLock()
# Shortcuts used by the public functions to take and release the lock.
lock=observerCenter_lock.acquire
unlock=observerCenter_lock.release
def __observersForObject(anObject):
    """
    Private method - do not access directly.
    Equivalent to public method 'observersForObject()'

    Returns a new list holding the live observers registered for 'anObject'
    (an empty list when none is registered). As a side effect, the weak
    references whose observer has been garbage-collected are pruned from the
    stored list.
    """
    observerRefs = __observers_wdict.get(anObject, [])
    liveRefs = []
    liveObservers = []
    for observerRef in observerRefs:
        # Dereference exactly once and keep the strong reference, so that an
        # observer cannot vanish between the liveness check and the moment
        # the result is built.
        observer = observerRef()
        if observer is not None:
            liveRefs.append(observerRef)
            liveObservers.append(observer)
    # Prune in place, once the iteration is over: removing items from a list
    # while looping over it skips the element that follows each removal,
    # which would let a dead reference (hence None) reach the result.
    observerRefs[:] = liveRefs
    return liveObservers
def __setObjectObservers(anObject, observers):
    """
    Private method - do not access directly.

    Replaces the observers registered for 'anObject' with weak references to
    each item of 'observers'.
    """
    # Store a real list rather than the result of map(): under Python 3,
    # map() returns a one-shot iterator, which has no remove() method and is
    # exhausted by its first traversal.
    __observers_wdict[anObject] = [weakref.ref(observer)
                                   for observer in observers]
def __ensureSubsequentChangeWillBeNotifiedForObject(anObject):
    """
    Private method - do not access directly.

    Takes 'anObject' out of the already-notified objects; an object that is
    not recorded there is silently ignored.
    """
    try:
        removeObjectFromAlreadyNotifiedObjects(anObject)
    except KeyError:
        # The object was not flagged as notified: nothing to reset.
        return
if sys.version_info < (2, 2): # python older than v2.2
    def removeObjectFromAlreadyNotifiedObjects(object):
        """
        Private method - do not access directly.

        Deletes the entry of 'object' from the already-notified objects,
        addressing it through an explicit weak reference.
        """
        del __alreadyNotifiedObjects[weakref.ref(object)]
else: # python v2.2 and higher
    def removeObjectFromAlreadyNotifiedObjects(object):
        """
        Private method - do not access directly.

        Deletes the entry of 'object' from the already-notified objects,
        using the object itself as the key.
        """
        del __alreadyNotifiedObjects[object]
## Public API
def addObserver(anObserver, anObject):
    """
    See interfaces.ObserverCenter for details

    Registers 'anObserver' for 'anObject' unless it is already registered,
    then makes sure the next change of 'anObject' gets notified again.
    """
    lock()
    try:
        registered = __observersForObject(anObject)
        alreadyRegistered = anObserver in registered
        if not alreadyRegistered:
            registered.append(anObserver)
        # The observers are stored back and the 'wasNotified' flag is reset
        # whether or not the observer was new.
        __setObjectObservers(anObject, registered)
        __ensureSubsequentChangeWillBeNotifiedForObject(anObject)
    finally:
        unlock()
def addOmniscientObserver(anObserver):
    """
    See interfaces.ObserverCenter for details

    Registers a weak reference to 'anObserver' among the omniscient
    observers, unless an equal reference is already registered.
    """
    lock()
    try:
        # Drop the references whose observer has been garbage-collected: no
        # other code visible in this module purges them, so without this the
        # list could only grow. The slice assignment keeps the same list
        # object.
        __omniscientObservers[:] = [ref for ref in __omniscientObservers
                                    if ref() is not None]
        obs = weakref.ref(anObserver)
        if obs not in __omniscientObservers:
            __omniscientObservers.append(obs)
    finally:
        unlock()
def ensureSubsequentChangeWillBeNotifiedForObject(anObject):
    """
    See interfaces.ObserverCenter for details

    Under the module lock, takes 'anObject' out of the already-notified
    objects; an object that is not recorded there is silently ignored.
    """
    lock()
    try:
        try:
            removeObjectFromAlreadyNotifiedObjects(anObject)
        except KeyError:
            # The object was not flagged as notified: nothing to reset.
            pass
    finally:
        unlock()
def notifyObserversObjectWillChange(anObject):
    """
    See interfaces.ObserverCenter for details

    Omniscient observers are always notified, even when 'anObject' is None
    or was already notified. When 'anObject' is None, the set of
    already-notified objects is then cleared and nothing else happens.
    Otherwise the observers of 'anObject' are notified, unless the object is
    already flagged as notified, and the object is flagged afterwards.

    Any exception raised by an observer's objectWillChange() propagates to
    the caller; in that case the object is not flagged as notified.
    """
    lock()
    try:
        # Snapshot the omniscient observers before notifying anyone, so that
        # an observer registering or unregistering omniscient observers from
        # its callback cannot disturb the iteration.
        omniscients = [ref() for ref in __omniscientObservers]
        for omni in omniscients:
            # None means the observer has been garbage-collected.
            if omni is not None:
                omni.objectWillChange(anObject)
        if anObject is None:
            __alreadyNotifiedObjects.clear()
            return
        if __alreadyNotifiedObjects.get(anObject, 0):
            return
        for observer in __observersForObject(anObject):
            # The helper dereferences weak references: skip an entry whose
            # observer is gone rather than fail on None.
            if observer is not None:
                # No try/except here: errors are meant to propagate, and
                # matching against a string exception is itself a TypeError
                # on Python 3, which would hide the observer's real error.
                observer.objectWillChange(anObject)
        __alreadyNotifiedObjects[anObject] = 1
    finally:
        unlock()
def observersForObject(anObject):
    """
    See interfaces.ObserverCenter for details

    Returns the observers of 'anObject' as a tuple built under the module
    lock.
    """
    lock()
    try:
        currentObservers = __observersForObject(anObject)
        # Hand out an immutable copy.
        snapshot = tuple(currentObservers)
        return snapshot
    finally:
        unlock()
def removeObserver(anObserver, anObject):
    """
    See interfaces.ObserverCenter for details

    Unregisters 'anObserver' from the observers of 'anObject'. Removing an
    observer that is not registered is silently ignored.
    """
    lock()
    try:
        observers = __observersForObject(anObject)
        try:
            observers.remove(anObserver)
        except ValueError:
            # Not registered for this object: nothing to do. Only this case
            # is ignored; any other error is allowed to propagate instead of
            # being swallowed.
            return
        __setObjectObservers(anObject, observers)
    finally:
        unlock()
def removeOmniscientObserver(anObserver):
    """
    See interfaces.ObserverCenter for details

    Removes the weak reference to 'anObserver' from the omniscient
    observers; list.remove() raises ValueError when no matching reference is
    registered.
    """
    lock()
    try:
        target = weakref.ref(anObserver)
        __omniscientObservers.remove(target)
    finally:
        unlock()
|