testCache.py :  » Web-Frameworks » Zope » Zope-2.6.0 » lib » python » ZODB » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Zope 
Zope » Zope 2.6.0 » lib » python » ZODB » tests » testCache.py
"""A few simple tests of the public cache API.

Each DB Connection has a separate PickleCache.  The Cache serves two
purposes. It acts like a memo for unpickling.  It also keeps recent
objects in memory under the assumption that they may be used again.
"""
from __future__ import nested_scopes

import time
import types
import unittest

import ZODB
import ZODB.MappingStorage
from ZODB.cPickleCache import PickleCache
from ZODB.POSException import ConflictError
from ZODB.PersistentMapping import PersistentMapping
from ZODB.tests.MinPO import MinPO
from ZODB.utils import p64

from Persistence import Persistent

class CacheTestBase(unittest.TestCase):

    def setUp(self):
        store = ZODB.MappingStorage.MappingStorage()
        self.db = ZODB.DB(store,
                          cache_size = self.CACHE_SIZE)
        self.conns = []

    def tearDown(self):
        for conn in self.conns:
            conn.close()
        self.db.close()

    CACHE_SIZE = 20

    def noodle_new_connection(self):
        """Do some reads and writes on a new connection."""

        c = self.db.open()
        self.conns.append(c)
        self.noodle_connection(c)

    def noodle_connection(self, c):
        r = c.root()

        i = len(self.conns)
        d = r.get(i)
        if d is None:
            d = r[i] = PersistentMapping()
            get_transaction().commit()

        for i in range(15):
            o = d.get(i)
            if o is None:
                o = d[i] = MinPO(i)
            o.value += 1
        get_transaction().commit()

class DBMethods(CacheTestBase):

    __super_setUp = CacheTestBase.setUp

    def setUp(self):
        self.__super_setUp()
        for i in range(4):
            self.noodle_new_connection()

    def checkCacheDetail(self):
        for name, count in self.db.cacheDetail():
            self.assert_(isinstance(name, types.StringType))
            self.assert_(isinstance(count, types.IntType))

    def checkCacheExtremeDetail(self):
        expected = ['conn_no', 'id', 'oid', 'rc', 'klass', 'state']
        for dict in self.db.cacheExtremeDetail():
            for k, v in dict.items():
                self.assert_(k in expected)

    # XXX not really sure how to do a black box test of the cache.
    # should the full sweep and minimize calls always remove things?

    # The sleep(3) call is based on the implementation of the cache.
    # It's measures time in units of three seconds, so something used
    # within the last three seconds looks like something that is
    # currently being used.  Three seconds old is the youngest
    # something can be and still be collected.

    def checkFullSweep(self):
        old_size = self.db.cacheSize()
        time.sleep(3)
        self.db.cacheFullSweep(0)
        new_size = self.db.cacheSize()
        self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))

    def checkMinimize(self):
        old_size = self.db.cacheSize()
        time.sleep(3)
        self.db.cacheMinimize(0)
        new_size = self.db.cacheSize()
        self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))

    # XXX don't have an explicit test for incrgc, because the
    # connection and database call it internally

    # XXX same for the get and invalidate methods

    def checkLRUitems(self):
        # get a cache
        c = self.conns[0]._cache
        c.lru_items()

    def checkClassItems(self):
        c = self.conns[0]._cache
        c.klass_items()

class LRUCacheTests(CacheTestBase):

    def checkLRU(self):
        # verify the LRU behavior of the cache
        dataset_size = 5
        CACHE_SIZE = dataset_size*2+1
        # a cache big enough to hold the objects added in two
        # transactions, plus the root object
        self.db.setCacheSize(CACHE_SIZE)
        c = self.db.open()
        r = c.root()
        l = {}
        # the root is the only thing in the cache, because all the
        # other objects are new
        self.assertEqual(len(c._cache), 1)
        # run several transactions
        for t in range(5):
            for i in range(dataset_size):
                l[(t,i)] = r[i] = MinPO(i)
            get_transaction().commit()
            # commit() will register the objects, placing them in the
            # cache.  at the end of commit, the cache will be reduced
            # down to CACHE_SIZE items
            if len(l)>CACHE_SIZE:
                self.assertEqual(c._cache.ringlen(), CACHE_SIZE)
        for i in range(dataset_size):
            # Check objects added in the first two transactions.
            # They must all be ghostified.
            self.assertEqual(l[(0,i)]._p_changed, None)
            self.assertEqual(l[(1,i)]._p_changed, None)
            # Check objects added in the last two transactions.
            # They must all still exist in memory, but have
            # had their changes flushed
            self.assertEqual(l[(3,i)]._p_changed, 0)
            self.assertEqual(l[(4,i)]._p_changed, 0)
            # Of the objects added in the middle transaction, most
            # will have been ghostified. There is one cache slot
            # that may be occupied by either one of those objects or
            # the root, depending on precise order of access. We do
            # not bother to check this

    def checkSize(self):
        self.assertEqual(self.db.cacheSize(), 0)
        self.assertEqual(self.db.cacheDetailSize(), [])

        CACHE_SIZE = 10
        self.db.setCacheSize(CACHE_SIZE)

        CONNS = 3
        for i in range(CONNS):
            self.noodle_new_connection()

        self.assertEquals(self.db.cacheSize(), CACHE_SIZE * CONNS)
        details = self.db.cacheDetailSize()
        self.assertEquals(len(details), CONNS)
        for d in details:
            self.assertEquals(d['ngsize'], CACHE_SIZE)
            # the root is also in the cache as ghost, because
            # the connection holds a reference to it
            self.assertEquals(d['size'], CACHE_SIZE + 1)

    def checkDetail(self):
        CACHE_SIZE = 10
        self.db.setCacheSize(CACHE_SIZE)

        CONNS = 3
        for i in range(CONNS):
            self.noodle_new_connection()

        for klass, count in self.db.cacheDetail():
            if klass.endswith('MinPO'):
                self.assertEqual(count, CONNS * CACHE_SIZE)
            if klass.endswith('PersistentMapping'):
                # one root per connection
                self.assertEqual(count, CONNS)

        for details in self.db.cacheExtremeDetail():
            # one dict per object.  keys:
            if details['klass'].endswith('PersistentMapping'):
                self.assertEqual(details['state'], None)
            else:
                self.assert_(details['klass'].endswith('MinPO'))
                self.assertEqual(details['state'], 0)

class StubDataManager:
    def setklassstate(self, object):
        pass

class StubObject(Persistent):
    pass

class CacheErrors(unittest.TestCase):

    def setUp(self):
        self.jar = StubDataManager()
        self.cache = PickleCache(self.jar)

    def checkGetBogusKey(self):
        self.assertRaises(KeyError, self.cache.get, p64(0))
        try:
            self.cache[12]
        except KeyError:
            pass
        else:
            self.fail("expected KeyError")
        try:
            self.cache[12] = 12
        except TypeError:
            pass
        else:
            self.fail("expected TyepError")
        try:
            del self.cache[12]
        except TypeError:
            pass
        else:
            self.fail("expected TypeError")

    def checkBogusObject(self):
        def add(key, obj):
            self.cache[key] = obj

        key = p64(2)
        # value isn't persistent
        self.assertRaises(TypeError, add, key, 12)

        o = StubObject()
        # o._p_oid == None
        self.assertRaises(TypeError, add, key, o)

        o._p_oid = p64(3)
        self.assertRaises(ValueError, add, key, o)

        o._p_oid = key
        # o._p_jar == None
        self.assertRaises(Exception, add, key, o)

        o._p_jar = self.jar
        self.cache[key] = o
        # make sure it can be added multiple times
        self.cache[key] = o

        # same object, different keys
        self.assertRaises(ValueError, add, p64(0), o)

    def checkTwoCaches(self):
        jar2 = StubDataManager()
        cache2 = PickleCache(jar2)

        o = StubObject()
        key = o._p_oid = p64(1)
        o._p_jar = jar2

        cache2[key] = o

        try:
            self.cache[key] = o
        except ValueError:
            pass
        else:
            self.fail("expected ValueError because object already in cache")

    def checkReadOnlyAttrsWhenCached(self):
        o = StubObject()
        key = o._p_oid = p64(1)
        o._p_jar = self.jar
        self.cache[key] = o
        try:
            o._p_oid = p64(2)
        except ValueError:
            pass
        else:
            self.fail("expect that you can't change oid of cached object")
        try:
            del o._p_jar
        except ValueError:
            pass
        else:
            self.fail("expect that you can't delete jar of cached object")

def test_suite():
    s = unittest.makeSuite(DBMethods, 'check')
    s.addTest(unittest.makeSuite(LRUCacheTests, 'check'))
    s.addTest(unittest.makeSuite(CacheErrors, 'check'))
    return s
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.