DatabaseChannel.py :  » Database » Modeling-Framework » Modeling-0.9 » Modeling » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » Modeling Framework 
Modeling Framework » Modeling 0.9 » Modeling » DatabaseChannel.py
# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sébastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------


"""
DatabaseChannel

  See interfaces.DatabaseChannel for details

  CVS information

    $Id: DatabaseChannel.py 969 2006-02-25 23:15:05Z sbigaret $
  
"""

# Revision number extracted from the version-control keyword string: the
# slice drops the leading '$Revision: ' (11 characters) and the trailing ' $'.
__version__='$Revision: 969 $'[11:-2]

# NOTE(review): the standard library 'logging' module has no 'trace'
# function; presumably this resolves to a framework-local 'logging' module
# -- confirm.
from logging import trace
from NotificationFramework import NotificationCenter
# Short alias for the notification center, used throughout this module
NC=NotificationCenter

# Framework
from ObjectStore import ObjectsChangedInStoreNotification,UpdatedKey
from ClassDescription import ClassDescription
from GlobalID import KeyGlobalID,GlobalIDChangedNotification

class DatabaseChannel:
  """
  The DatabaseChannel's responsibility is to fetch objects on behalf of a
  DatabaseContext.

  It uses a concrete AdaptorChannel to perform the fetch, it then builds
  DatabaseObject from the fetched rows, and returns those objects to the
  requesting DatabaseContext --the DatabaseContext uses 'fetchObject()' to get
  each object after another.

  Note about inheritance: the DatabaseChannel does not care about
  inheritance. When it gets a request/FetchSpecification for a given entity it
  fetches rows from the database table 'entity.externalName()' and from this
  one only, whatever the 'isDeep' flag of the FetchSpecification is. It is the
  DatabaseContext responsibility to correctly drive the DatabaseChannel (see
  DatabaseContext.objectsWithFetchSpecification())

  """

  def __init__(self, aDatabaseContext):
    """
    Initializes a new DatabaseChannel bound to 'aDatabaseContext'.

    It also creates its own AdaptorChannel to work with, by sending
    to aDatabaseContext.adaptorContext() the message createAdaptorChannel().

    The following properties: currentEntity, isLocking() and
    isRefreshingObjects() are reset each time the method
    selectObjectsWithFetchSpecification() is called, according to the
    FetchSpecification settings.
    """
    self._dbContext=aDatabaseContext
    self._adaptorChannel=aDatabaseContext.adaptorContext().createAdaptorChannel()
    # the per-fetch settings are reset each time selectObjects...() is called
    # thus defaults are not important at all --but they must exist, so that
    # the accessors can be safely called on a fresh channel
    self._resetFetchState()

  def _resetFetchState(self):
    """
    Private helper: puts every per-fetch attribute back to its idle value.
    Shared by __init__() and cancelFetch() so that both always agree on the
    set of attributes and on their defaults.
    """
    self.__isFetchInProgress=0
    self._editingContext=None
    self._currentEntity=''
    self._isLocking=0
    # this is the attribute read by isRefreshingObjects() and written by
    # setIsRefreshingObjects()
    self._isRefreshingObjects=0
    # legacy attribute name: not read anywhere in this class, still set to 0
    # in case external code looks at it
    self._refreshesRefreshedObject=0
    self._rawRows=0

  def adaptorChannel(self):
    """
    Returns the underlying concrete AdaptorChannel.
    """
    return self._adaptorChannel

  def cancelFetch(self):
    """
    Closes the underlying AdaptorChannel, cancels the pending fetch, and
    resets/flushes all internal state.

    The internal state is reset even if closing the AdaptorChannel raises; in
    that case the exception is propagated to the caller afterwards.
    """
    try:
      self._adaptorChannel.closeChannel()
    finally:
      # never leave the channel stuck in the 'fetch in progress' state
      self.setCurrentEditingContext(None)
      self._resetFetchState()

  def databaseContext(self):
    """
    Returns the DatabaseContext which pilots the DatabaseChannel.
    """
    return self._dbContext

  def fetchesRawRows(self):
    """
    Tells whether fetchObject() returns raw rows instead of objects.

    See also: setFetchesRawRow()
    """
    return self._rawRows

  def _rawRowForGlobalID(self, raw_row, globalID):
    """
    Private helper for fetchObject(): returns the row to hand out when the
    channel fetches raw rows.

    - if the current EditingContext holds a non-fault object for 'globalID',
      that object's 'snapshot_raw()' is returned,

    - if it holds a fault, the snapshot registered in the Database is
      returned when there is one,

    - in every other case 'raw_row' is returned unchanged.
    """
    obj=self._editingContext.objectForGlobalID(globalID)
    if obj is None:
      return raw_row
    if not obj.isFault():
      return obj.snapshot_raw()
    database=self.databaseContext().database()
    registeredSnapshot=database.snapshotForGlobalID(globalID)
    if registeredSnapshot:
      return registeredSnapshot
    return raw_row

  def fetchObject(self):
    """
    Fetches the next row from the AdaptorChannel and returns the
    corresponding object (or row, when fetchesRawRows() is true).

    - channel.fetchRow

    - build GlobalID

    Posts ObjectsChangedInStoreNotification (updated) if isRefreshingObjects

    If no more object is to be fetched, it resets the DatabaseChannel and
    returns None.

    Note that if the object fetched is already registered within the Database,
    its snapshots is not refreshed, and it is substituted to the fetched row
    (unless the fetch isRefreshingObjects or isLocking)

    Raises RuntimeError if no fetch is in progress.

    See also: cancelFetch(), selectObjectsWithFetchSpecification()
              Entity.globalIDForRow()
    """
    if not self.isFetchInProgress():
      raise RuntimeError('No fetch in progress')

    raw_row=self._adaptorChannel.fetchRow()

    if raw_row is None:
      self.cancelFetch() # No more objects
      return None

    globalID=self._currentEntity.globalIDForRow(raw_row)

    if self.fetchesRawRows():
      return self._rawRowForGlobalID(raw_row, globalID)

    # Get snapshot: the raw row is used as-is
    snapshot=raw_row
    database=self.databaseContext().database()

    # Locking?
    if self.isLocking():
      self.databaseContext().registerLockedObjectWithGlobalID(globalID)

    # Now we need to lock the corresponding database object. Why?
    #
    # One could argue that e.g. since an EC forwards objectsWithFetchSpec()
    # ultimately to its root object store --an ObjectStoreCoordinator--, and
    # since OSC locks/unlocks itself before/after each operation, an other EC
    # used in a concurrent thread can access the Database object (and its
    # snapshots) in the meantime.
    #
    # This is true w.r.t. ECs standard operations, but there remains a case
    # where a concurrent thread can change the Database's snapshots: when an
    # EC is finalized and garbage-collected, it forgets its objects and
    # decrements the corresponding snapshot reference count --at this time, if
    # the ref.count falls down to zero, the snapshot is removed from the
    # Database's snapshots cache.
    #
    # Now this can happen in another thread while in this thread, the snapshot
    # has been retrieved but it has not been used yet by ec.initializeObject()
    # (forwarded to and handled by DBContext.initializeObject()). When this
    # happens, Database.incrementSnapshotCountForGlobalID() called by
    # DBContext.initializeObject() raises KeyError on the corresp. GlobalID
    # since the corresponding snapshot has been thrown away.
    #
    snapshot_refcount_incremented = 0
    database.lock()
    try:
      # Handle snapshot
      # NB: ask delegate __TBD
      if self.isRefreshingObjects() or self.isLocking():
        # Unconditional refresh
        database.recordSnapshotForGlobalID(snapshot, globalID)
        if self.isRefreshingObjects():
          # NB: isLocking-> no need to invalidate objects since two ECs cannot
          # hold the same object at the same time
          NC.postNotification(ObjectsChangedInStoreNotification, self,
                              {UpdatedKey: (globalID,)})
      elif not database.snapshotForGlobalID(globalID):
        # Should the snapshot be updated? if there is already one, no update.
        # __TBD  ask the DatabaseContext delegate whether we should refresh
        # __TBD  and, if yes, post ObjectsChangedInStoreNotification
        database.recordSnapshotForGlobalID(snapshot, globalID)

      ec=self._editingContext
      obj=ec.objectForGlobalID(globalID)

      if obj is not None:
        if not obj.isFault():
          # already fully initialized in this EC: hand it out untouched
          return obj
        # the fault is about to be cleared: the EC does not need to observe
        # GlobalID changes for it anymore
        NC.removeObserver(ec, GlobalIDChangedNotification, globalID)

      cd=ClassDescription.classDescriptionForName(self._currentEntity.name())
      # first, check that we do not have a fault registered under the ``root''
      # GlobalID
      root_cd_name=cd.rootClassDescription().entityName()

      if root_cd_name!=cd.entityName():
        root_globalID=KeyGlobalID(root_cd_name, globalID.keyValues())
        trace('Searching for possible fault w/ rootGlobalID=%s'%root_globalID)
        possible_faulted_object=ec.objectForGlobalID(root_globalID)
        if possible_faulted_object is not None:
          if not possible_faulted_object.isFault():
            raise RuntimeError("Serious: shouldn't happen, please report")
          trace('Found it: posting GlobalIDChangedNotification')
          NC.postNotification(GlobalIDChangedNotification, root_globalID,
                              {root_globalID: globalID})
          obj=possible_faulted_object
          # no need to remove any observer here: it did receive the
          # notification, and it has unregistered itself from the list of
          # the observers
          obj.clearFault()
          # the fault was created for the root entity: turn it into an
          # instance of the real class and run that class' initializer
          new_class=cd.classForInstances()
          obj.__class__=new_class
          obj.__class__.__init__(obj)
        else:
          # NOTE(review): a fault possibly found above under 'globalID'
          # itself is not reused on this path, a new instance is recorded
          # instead -- confirm this is intended
          # create object
          obj=cd.createInstanceWithEditingContext(ec)
          ec.recordObject(obj, globalID)
      elif obj is None:
        # create object
        obj=cd.createInstanceWithEditingContext(ec)
        ec.recordObject(obj, globalID)

      # keep the snapshot alive until the object is initialized, see the
      # explanation before database.lock()
      database.incrementSnapshotCountForGlobalID(globalID)
      snapshot_refcount_incremented = 1
    finally:
      database.unlock()

    # Initialize the new object or the cleared fault
    try:
      obj.clearFault()
    except Exception:
      # best effort: failures of clearFault() are deliberately ignored here
      # NOTE(review): presumably clearFault() may fail on an object which is
      # not a fault -- confirm
      pass

    try:
      ec.initializeObject(obj, globalID, ec)
    finally:
      if snapshot_refcount_incremented:
        database.decrementSnapshotCountForGlobalID(globalID)

    return obj

  def isFetchInProgress(self):
    """
    Tells whether a fetch has been started by one of the select...() methods
    and has not been terminated or cancelled yet.
    """
    return self.__isFetchInProgress

  def isLocking(self):
    """
    Tells whether the fetched objects should be locked or not.

    The flag is reset to false by cancelFetch().

    See also:
      selectObjectsWithFetchSpecification,
      FetchSpecification.locksObjects()
    """
    return self._isLocking

  def isRefreshingObjects(self):
    """
    Tells whether the fetched objects should be refreshed or not.

    The flag is reset to false by cancelFetch().

    See also:

      - selectObjectsWithFetchSpecification,

      - FetchSpecification.refreshesRefetchedObjects()

    """
    return self._isRefreshingObjects

  def selectCountObjectsWithFetchSpecification(self, aFetchSpecification,
                                               anEditingContext=None):
    """
    Returns the number of rows that would be retrieved if the method
    selectObjectsWithFetchSpecification() was called with the same parameter.

    The fetch is always cancelled (and the AdaptorChannel closed) before the
    method returns, whether it succeeds or not.

    Parameters:

      aFetchSpecification -- a FetchSpecification object describing the
        objects to be fetched

      anEditingContext -- optional and currently unused (since objects are
        not really fetched)

    Raises RuntimeError if a fetch is already in progress.

    See also: selectObjectsWithFetchSpecification(),
              EditingContext.objectsCountWithFetchSpecification()
              DatabaseContext.objectsCountWithFetchSpecification()
              AdaptorChannel.rowCountForSelectAttributes()
    """
    if self.isFetchInProgress():
      raise RuntimeError('a fetch in already in progress')
    self.__isFetchInProgress=1
    try:
      # Get an adaptorChannel
      if not self._adaptorChannel.isOpen():
        self._adaptorChannel.openChannel()
      import ModelSet
      defModelSet=ModelSet.defaultModelSet()
      entity=defModelSet.entityNamed(aFetchSpecification.entityName())
      # NB: the locking flag passed here is the channel's current one, it is
      # not taken from aFetchSpecification
      return self._adaptorChannel.rowCountForSelectAttributes(
        entity.attributesToFetch(), aFetchSpecification, self._isLocking,
        entity)
    finally:
      self.cancelFetch()

  def selectObjectsWithFetchSpecification(self,
                                          aFetchSpecification,
                                          anEditingContext):
    """
    Starts a fetch described by 'aFetchSpecification' on behalf of
    'anEditingContext'.

    The method follows the following steps:

      1. sets the object's properties according to aFetchSpecification's
         properties, i.e.: currentEditingContext, currentEntity, isLocking,
         isRefreshingObjects, etc.

      2. Opens its AdaptorChannel, if needed, then sends it the appropriate
         'selectAttributes' message.

    Fetched objects can be then retrieved one after the other by calling
    the method fetchObject().

    If anything goes wrong the fetch is cancelled and the exception is
    re-raised. Raises RuntimeError if a fetch is already in progress.

    Inheritance and 'isDeep' flag of aFetchSpecification: Note that
    aFetchSpecification's entity is the real destination entity for the
    fetched objects, not one of its superclasses.  Inheritance is handled at a
    higher level, i.e. in DatabaseContext.objectsWithFetchSpecification().
    The 'isDeep' flag is completely ignored.

    See also: fetchObject(), selectCountObjectsWithFetchSpecification()
              AdaptorChannel.selectAttributes()
              EditingContext.objectsWithFetchSpecification()
    """
    if self.isFetchInProgress():
      raise RuntimeError('a fetch in already in progress')
    self.__isFetchInProgress=1
    try:
      # Reinitializes settings
      self.setCurrentEditingContext(anEditingContext)
      import ModelSet
      defModelSet=ModelSet.defaultModelSet()
      entity=defModelSet.entityNamed(aFetchSpecification.entityName())
      self.setCurrentEntity(entity)
      self.setIsLocking(aFetchSpecification.locksObjects())
      self.setIsRefreshingObjects(aFetchSpecification.refreshesRefetchedObjects())
      self.setFetchesRawRow(aFetchSpecification.fetchesRawRows())
      # Get an adaptorChannel
      if not self._adaptorChannel.isOpen():
        self._adaptorChannel.openChannel()

      self._adaptorChannel.selectAttributes(entity.attributesToFetch(),
                                            aFetchSpecification,
                                            self._isLocking,
                                            entity)
    except:
      # If something went wrong we must cancel the fetch in progress; the
      # bare 'except' is intended: whatever was raised is re-raised
      self.cancelFetch()
      raise

  def setCurrentEditingContext(self, anEditingContext):
    """
    Sets the EditingContext in which fetched objects are looked up and
    registered.

    This is automatically called by selectObjectsWithFetchSpecification()
    """
    self._editingContext=anEditingContext

  def setCurrentEntity(self, anEntity):
    """
    Sets the entity whose rows are being fetched.

    This is automatically called by selectObjectsWithFetchSpecification(),
    according to the settings of the FetchSpecification the latter method got.
    """
    self._currentEntity=anEntity

  def setFetchesRawRow(self, aBool):
    """
    Sets whether fetchObject() returns raw rows; 'aBool' is normalized to a
    boolean.

    This is automatically called by selectObjectsWithFetchSpecification(),
    according to the settings of the FetchSpecification the latter method got.
    """
    self._rawRows=not not aBool

  def setIsLocking(self, isLocking):
    """
    Sets whether fetched objects are locked; 'isLocking' is normalized to a
    boolean.

    This is automatically called by selectObjectsWithFetchSpecification(),
    according to the settings of the FetchSpecification the latter method got.
    """
    self._isLocking=not not isLocking

  def setIsRefreshingObjects(self, isRefreshingObjects):
    """
    Sets whether fetched objects are refreshed; 'isRefreshingObjects' is
    normalized to a boolean.

    This is automatically called by selectObjectsWithFetchSpecification(),
    according to the settings of the FetchSpecification the latter method got.
    """
    self._isRefreshingObjects=not not isRefreshingObjects
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.