db_test_base.py :  » Issue-Tracker » Roundup-Issue-Tracker » roundup-1.4.13 » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Issue Tracker » Roundup Issue Tracker 
Roundup Issue Tracker » roundup 1.4.13 » test » db_test_base.py
#
# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
# This module is free software, and you may redistribute it and/or modify
# under the same terms as Python, so long as this copyright message and
# disclaimer are retained in their original form.
#
# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
# $Id: db_test_base.py,v 1.101 2008-08-19 01:40:59 richard Exp $

import unittest, os, shutil, errno, imp, sys, time, pprint, base64, os.path
# Python 2.3 ... 2.6 compatibility:
from roundup.anypy.sets_ import set

from roundup.hyperdb import String,Password,Link,Multilink,Date,\
    Interval, DatabaseError, Boolean, Number, Node
from roundup.mailer import Mailer
from roundup import date,password,init,instance,configuration,support

from mocknull import MockNull

config = configuration.CoreConfig()
config.DATABASE = "db"
config.RDBMS_NAME = "rounduptest"
config.RDBMS_HOST = "localhost"
config.RDBMS_USER = "rounduptest"
config.RDBMS_PASSWORD = "rounduptest"
#config.logging = MockNull()
# these TRACKER_WEB and MAIL_DOMAIN values are used in mailgw tests
config.MAIL_DOMAIN = "your.tracker.email.domain.example"
config.TRACKER_WEB = "http://tracker.example/cgi-bin/roundup.cgi/bugs/"
# uncomment the following to have excessive debug output from test cases
# FIXME: tracker logging level should be increased by -v arguments
#   to 'run_tests.py' script
#config.LOGGING_FILENAME = "/tmp/logfile"
#config.LOGGING_LEVEL = "DEBUG"
config.init_logging()

def setupTracker(dirname, backend="anydbm"):
    """Install and initialize new tracker in dirname; return tracker instance.

    If the directory exists, it is wiped out before the operation.

    """
    global config
    try:
        shutil.rmtree(dirname)
    except OSError, error:
        if error.errno not in (errno.ENOENT, errno.ESRCH): raise
    # create the instance
    init.install(dirname, os.path.join(os.path.dirname(__file__),
                                       '..',
                                       'share',
                                       'roundup',
                                       'templates',
                                       'classic'))
    init.write_select_db(dirname, backend)
    config.save(os.path.join(dirname, 'config.ini'))
    tracker = instance.open(dirname)
    if tracker.exists():
        tracker.nuke()
        init.write_select_db(dirname, backend)
    tracker.init(password.Password('sekrit'))
    return tracker

def setupSchema(db, create, module):
    status = module.Class(db, "status", name=String())
    status.setkey("name")
    priority = module.Class(db, "priority", name=String(), order=String())
    priority.setkey("name")
    user = module.Class(db, "user", username=String(), password=Password(),
        assignable=Boolean(), age=Number(), roles=String(), address=String(),
        supervisor=Link('user'),realname=String())
    user.setkey("username")
    file = module.FileClass(db, "file", name=String(), type=String(),
        comment=String(indexme="yes"), fooz=Password())
    file_nidx = module.FileClass(db, "file_nidx", content=String(indexme='no'))
    issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
        status=Link("status"), nosy=Multilink("user"), deadline=Date(),
        foo=Interval(), files=Multilink("file"), assignedto=Link('user'),
        priority=Link('priority'), spam=Multilink('msg'),
        feedback=Link('msg'))
    stuff = module.Class(db, "stuff", stuff=String())
    session = module.Class(db, 'session', title=String())
    msg = module.FileClass(db, "msg", date=Date(),
                           author=Link("user", do_journal='no'),
                           files=Multilink('file'), inreplyto=String(),
                           messageid=String(),
                           recipients=Multilink("user", do_journal='no')
                           )
    session.disableJournalling()
    db.post_init()
    if create:
        user.create(username="admin", roles='Admin',
            password=password.Password('sekrit'))
        user.create(username="fred", roles='User',
            password=password.Password('sekrit'), address='fred@example.com')
        status.create(name="unread")
        status.create(name="in-progress")
        status.create(name="testing")
        status.create(name="resolved")
        priority.create(name="feature", order="2")
        priority.create(name="wish", order="3")
        priority.create(name="bug", order="1")
    db.commit()

    # nosy tests require this
    db.security.addPermissionToRole('User', 'View', 'msg')

class MyTestCase(unittest.TestCase):
    def tearDown(self):
        if hasattr(self, 'db'):
            self.db.close()
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)

if os.environ.has_key('LOGGING_LEVEL'):
    from roundup import rlog
    config.logging = rlog.BasicLogging()
    config.logging.setLevel(os.environ['LOGGING_LEVEL'])
    config.logging.getLogger('hyperdb').setFormat('%(message)s')

class DBTest(MyTestCase):
    def setUp(self):
        # remove previous test, ignore errors
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)
        os.makedirs(config.DATABASE + '/files')
        self.open_database()
        setupSchema(self.db, 1, self.module)

    def open_database(self):
        self.db = self.module.Database(config, 'admin')

    def testRefresh(self):
        self.db.refresh_database()

    #
    # automatic properties (well, the two easy ones anyway)
    #
    def testCreatorProperty(self):
        i = self.db.issue
        id1 = i.create(title='spam')
        self.db.journaltag = 'fred'
        id2 = i.create(title='spam')
        self.assertNotEqual(id1, id2)
        self.assertNotEqual(i.get(id1, 'creator'), i.get(id2, 'creator'))

    def testActorProperty(self):
        i = self.db.issue
        id1 = i.create(title='spam')
        self.db.journaltag = 'fred'
        i.set(id1, title='asfasd')
        self.assertNotEqual(i.get(id1, 'creator'), i.get(id1, 'actor'))

    # ID number controls
    def testIDGeneration(self):
        id1 = self.db.issue.create(title="spam", status='1')
        id2 = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(id1, id2)
    def testIDSetting(self):
        # XXX numeric ids
        self.db.setid('issue', 10)
        id2 = self.db.issue.create(title="eggs", status='2')
        self.assertEqual('11', id2)

    #
    # basic operations
    #
    def testEmptySet(self):
        id1 = self.db.issue.create(title="spam", status='1')
        self.db.issue.set(id1)

    # String
    def testStringChange(self):
        for commit in (0,1):
            # test set & retrieve
            nid = self.db.issue.create(title="spam", status='1')
            self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')

            # change and make sure we retrieve the correct value
            self.db.issue.set(nid, title='eggs')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, 'title'), 'eggs')

    def testStringUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')
            # make sure we can unset
            self.db.issue.set(nid, title=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "title"), None)

    # FileClass "content" property (no unset test)
    def testFileClassContentChange(self):
        for commit in (0,1):
            # test set & retrieve
            nid = self.db.file.create(content="spam")
            self.assertEqual(self.db.file.get(nid, 'content'), 'spam')

            # change and make sure we retrieve the correct value
            self.db.file.set(nid, content='eggs')
            if commit: self.db.commit()
            self.assertEqual(self.db.file.get(nid, 'content'), 'eggs')

    def testStringUnicode(self):
        # test set & retrieve
        ustr = u'\xe4\xf6\xfc\u20ac'.encode('utf8')
        nid = self.db.issue.create(title=ustr, status='1')
        self.assertEqual(self.db.issue.get(nid, 'title'), ustr)

        # change and make sure we retrieve the correct value
        ustr2 = u'change \u20ac change'.encode('utf8')
        self.db.issue.set(nid, title=ustr2)
        self.db.commit()
        self.assertEqual(self.db.issue.get(nid, 'title'), ustr2)

    # Link
    def testLinkChange(self):
        self.assertRaises(IndexError, self.db.issue.create, title="spam",
            status='100')
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "status"), '1')
            self.db.issue.set(nid, status='2')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "status"), '2')

    def testLinkUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            if commit: self.db.commit()
            self.db.issue.set(nid, status=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "status"), None)

    # Multilink
    def testMultilinkChange(self):
        for commit in (0,1):
            self.assertRaises(IndexError, self.db.issue.create, title="spam",
                nosy=['foo%s'%commit])
            u1 = self.db.user.create(username='foo%s'%commit)
            u2 = self.db.user.create(username='bar%s'%commit)
            nid = self.db.issue.create(title="spam", nosy=[u1])
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [u1])
            self.db.issue.set(nid, nosy=[])
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [])
            self.db.issue.set(nid, nosy=[u1,u2])
            if commit: self.db.commit()
            l = [u1,u2]; l.sort()
            m = self.db.issue.get(nid, "nosy"); m.sort()
            self.assertEqual(l, m)

            # verify that when we pass None to an Multilink it sets
            # it to an empty list
            self.db.issue.set(nid, nosy=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [])

    def testMultilinkChangeIterable(self):
        for commit in (0,1):
            # invalid nosy value assertion
            self.assertRaises(IndexError, self.db.issue.create, title='spam',
                nosy=['foo%s'%commit])
            # invalid type for nosy create
            self.assertRaises(TypeError, self.db.issue.create, title='spam',
                nosy=1)
            u1 = self.db.user.create(username='foo%s'%commit)
            u2 = self.db.user.create(username='bar%s'%commit)
            # try a couple of the built-in iterable types to make
            # sure that we accept them and handle them properly
            # try a set as input for the multilink
            nid = self.db.issue.create(title="spam", nosy=set(u1))
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [u1])
            self.assertRaises(TypeError, self.db.issue.set, nid,
                nosy='invalid type')
            # test with a tuple
            self.db.issue.set(nid, nosy=tuple())
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [])
            # make sure we accept a frozen set
            self.db.issue.set(nid, nosy=set([u1,u2]))
            if commit: self.db.commit()
            l = [u1,u2]; l.sort()
            m = self.db.issue.get(nid, "nosy"); m.sort()
            self.assertEqual(l, m)


# XXX one day, maybe...
#    def testMultilinkOrdering(self):
#        for i in range(10):
#            self.db.user.create(username='foo%s'%i)
#        i = self.db.issue.create(title="spam", nosy=['5','3','12','4'])
#        self.db.commit()
#        l = self.db.issue.get(i, "nosy")
#        # all backends should return the Multilink numeric-id-sorted
#        self.assertEqual(l, ['3', '4', '5', '12'])

    # Date
    def testDateChange(self):
        self.assertRaises(TypeError, self.db.issue.create,
            title='spam', deadline=1)
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            self.assertRaises(TypeError, self.db.issue.set, nid, deadline=1)
            a = self.db.issue.get(nid, "deadline")
            if commit: self.db.commit()
            self.db.issue.set(nid, deadline=date.Date())
            b = self.db.issue.get(nid, "deadline")
            if commit: self.db.commit()
            self.assertNotEqual(a, b)
            self.assertNotEqual(b, date.Date('1970-1-1.00:00:00'))
            # The 1970 date will fail for metakit -- it is used
            # internally for storing NULL. The others would, too
            # because metakit tries to convert date.timestamp to an int
            # for storing and fails with an overflow.
            for d in [date.Date (x) for x in '2038', '1970', '0033', '9999']:
                self.db.issue.set(nid, deadline=d)
                if commit: self.db.commit()
                c = self.db.issue.get(nid, "deadline")
                self.assertEqual(c, d)

    def testDateLeapYear(self):
        nid = self.db.issue.create(title='spam', status='1',
            deadline=date.Date('2008-02-29'))
        self.assertEquals(str(self.db.issue.get(nid, 'deadline')),
            '2008-02-29.00:00:00')
        self.assertEquals(self.db.issue.filter(None,
            {'deadline': '2008-02-29'}), [nid])
        self.db.issue.set(nid, deadline=date.Date('2008-03-01'))
        self.assertEquals(str(self.db.issue.get(nid, 'deadline')),
            '2008-03-01.00:00:00')
        self.assertEquals(self.db.issue.filter(None,
            {'deadline': '2008-02-29'}), [])

    def testDateUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            self.db.issue.set(nid, deadline=date.Date())
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "deadline"), None)
            self.db.issue.set(nid, deadline=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "deadline"), None)

    # Interval
    def testIntervalChange(self):
        self.assertRaises(TypeError, self.db.issue.create,
            title='spam', foo=1)
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            self.assertRaises(TypeError, self.db.issue.set, nid, foo=1)
            if commit: self.db.commit()
            a = self.db.issue.get(nid, "foo")
            i = date.Interval('-1d')
            self.db.issue.set(nid, foo=i)
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "foo"), a)
            self.assertEqual(i, self.db.issue.get(nid, "foo"))
            j = date.Interval('1y')
            self.db.issue.set(nid, foo=j)
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "foo"), i)
            self.assertEqual(j, self.db.issue.get(nid, "foo"))

    def testIntervalUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            self.db.issue.set(nid, foo=date.Interval('-1d'))
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "foo"), None)
            self.db.issue.set(nid, foo=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "foo"), None)

    # Boolean
    def testBooleanSet(self):
        nid = self.db.user.create(username='one', assignable=1)
        self.assertEqual(self.db.user.get(nid, "assignable"), 1)
        nid = self.db.user.create(username='two', assignable=0)
        self.assertEqual(self.db.user.get(nid, "assignable"), 0)

    def testBooleanChange(self):
        userid = self.db.user.create(username='foo', assignable=1)
        self.assertEqual(1, self.db.user.get(userid, 'assignable'))
        self.db.user.set(userid, assignable=0)
        self.assertEqual(self.db.user.get(userid, 'assignable'), 0)
        self.db.user.set(userid, assignable=1)
        self.assertEqual(self.db.user.get(userid, 'assignable'), 1)

    def testBooleanUnset(self):
        nid = self.db.user.create(username='foo', assignable=1)
        self.db.user.set(nid, assignable=None)
        self.assertEqual(self.db.user.get(nid, "assignable"), None)

    # Number
    def testNumberChange(self):
        nid = self.db.user.create(username='foo', age=1)
        self.assertEqual(1, self.db.user.get(nid, 'age'))
        self.db.user.set(nid, age=3)
        self.assertNotEqual(self.db.user.get(nid, 'age'), 1)
        self.db.user.set(nid, age=1.0)
        self.assertEqual(self.db.user.get(nid, 'age'), 1)
        self.db.user.set(nid, age=0)
        self.assertEqual(self.db.user.get(nid, 'age'), 0)

        nid = self.db.user.create(username='bar', age=0)
        self.assertEqual(self.db.user.get(nid, 'age'), 0)

    def testNumberUnset(self):
        nid = self.db.user.create(username='foo', age=1)
        self.db.user.set(nid, age=None)
        self.assertEqual(self.db.user.get(nid, "age"), None)

    # Password
    def testPasswordChange(self):
        x = password.Password('x')
        userid = self.db.user.create(username='foo', password=x)
        self.assertEqual(x, self.db.user.get(userid, 'password'))
        self.assertEqual(self.db.user.get(userid, 'password'), 'x')
        y = password.Password('y')
        self.db.user.set(userid, password=y)
        self.assertEqual(self.db.user.get(userid, 'password'), 'y')
        self.assertRaises(TypeError, self.db.user.create, userid,
            username='bar', password='x')
        self.assertRaises(TypeError, self.db.user.set, userid, password='x')

    def testPasswordUnset(self):
        x = password.Password('x')
        nid = self.db.user.create(username='foo', password=x)
        self.db.user.set(nid, assignable=None)
        self.assertEqual(self.db.user.get(nid, "assignable"), None)

    # key value
    def testKeyValue(self):
        self.assertRaises(ValueError, self.db.user.create)

        newid = self.db.user.create(username="spam")
        self.assertEqual(self.db.user.lookup('spam'), newid)
        self.db.commit()
        self.assertEqual(self.db.user.lookup('spam'), newid)
        self.db.user.retire(newid)
        self.assertRaises(KeyError, self.db.user.lookup, 'spam')

        # use the key again now that the old is retired
        newid2 = self.db.user.create(username="spam")
        self.assertNotEqual(newid, newid2)
        # try to restore old node. this shouldn't succeed!
        self.assertRaises(KeyError, self.db.user.restore, newid)

        self.assertRaises(TypeError, self.db.issue.lookup, 'fubar')

    # label property
    def testLabelProp(self):
        # key prop
        self.assertEqual(self.db.status.labelprop(), 'name')
        self.assertEqual(self.db.user.labelprop(), 'username')
        # title
        self.assertEqual(self.db.issue.labelprop(), 'title')
        # name
        self.assertEqual(self.db.file.labelprop(), 'name')
        # id
        self.assertEqual(self.db.stuff.labelprop(default_to_id=1), 'id')

    # retirement
    def testRetire(self):
        self.db.issue.create(title="spam", status='1')
        b = self.db.status.get('1', 'name')
        a = self.db.status.list()
        nodeids = self.db.status.getnodeids()
        self.db.status.retire('1')
        others = nodeids[:]
        others.remove('1')

        self.assertEqual(set(self.db.status.getnodeids()),
            set(nodeids))
        self.assertEqual(set(self.db.status.getnodeids(retired=True)),
            set(['1']))
        self.assertEqual(set(self.db.status.getnodeids(retired=False)),
            set(others))

        self.assert_(self.db.status.is_retired('1'))

        # make sure the list is different
        self.assertNotEqual(a, self.db.status.list())

        # can still access the node if necessary
        self.assertEqual(self.db.status.get('1', 'name'), b)
        self.assertRaises(IndexError, self.db.status.set, '1', name='hello')
        self.db.commit()
        self.assert_(self.db.status.is_retired('1'))
        self.assertEqual(self.db.status.get('1', 'name'), b)
        self.assertNotEqual(a, self.db.status.list())

        # try to restore retired node
        self.db.status.restore('1')

        self.assert_(not self.db.status.is_retired('1'))

    def testCacheCreateSet(self):
        self.db.issue.create(title="spam", status='1')
        a = self.db.issue.get('1', 'title')
        self.assertEqual(a, 'spam')
        self.db.issue.set('1', title='ham')
        b = self.db.issue.get('1', 'title')
        self.assertEqual(b, 'ham')

    def testSerialisation(self):
        nid = self.db.issue.create(title="spam", status='1',
            deadline=date.Date(), foo=date.Interval('-1d'))
        self.db.commit()
        assert isinstance(self.db.issue.get(nid, 'deadline'), date.Date)
        assert isinstance(self.db.issue.get(nid, 'foo'), date.Interval)
        uid = self.db.user.create(username="fozzy",
            password=password.Password('t. bear'))
        self.db.commit()
        assert isinstance(self.db.user.get(uid, 'password'), password.Password)

    def testTransactions(self):
        # remember the number of items we started
        num_issues = len(self.db.issue.list())
        num_files = self.db.numfiles()
        self.db.issue.create(title="don't commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.rollback()
        self.assertEqual(num_issues, len(self.db.issue.list()))
        self.db.issue.create(title="please commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.commit()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.rollback()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        self.assertEqual(num_files, self.db.numfiles())
        for i in range(10):
            self.db.file.create(name="test", type="text/plain",
                    content="hi %d"%(i))
            self.db.commit()
        num_files2 = self.db.numfiles()
        self.assertNotEqual(num_files, num_files2)
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        self.assertNotEqual(num_files, self.db.numfiles())
        self.assertEqual(num_files2, self.db.numfiles())

        # rollback / cache interaction
        name1 = self.db.user.get('1', 'username')
        self.db.user.set('1', username = name1+name1)
        # get the prop so the info's forced into the cache (if there is one)
        self.db.user.get('1', 'username')
        self.db.rollback()
        name2 = self.db.user.get('1', 'username')
        self.assertEqual(name1, name2)

    def testDestroyBlob(self):
        # destroy an uncommitted blob
        f1 = self.db.file.create(content='hello', type="text/plain")
        self.db.commit()
        fn = self.db.filename('file', f1)
        self.db.file.destroy(f1)
        self.db.commit()
        self.assertEqual(os.path.exists(fn), False)

    def testDestroyNoJournalling(self):
        self.innerTestDestroy(klass=self.db.session)

    def testDestroyJournalling(self):
        self.innerTestDestroy(klass=self.db.issue)

    def innerTestDestroy(self, klass):
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        count = klass.count()
        klass.destroy(newid)
        self.assertNotEqual(count, klass.count())
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.assertNotEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertRaises(IndexError, klass.history, newid)

        # now with a commit
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.db.commit()
        count = klass.count()
        klass.destroy(newid)
        self.assertNotEqual(count, klass.count())
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.db.commit()
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.assertNotEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertRaises(IndexError, klass.history, newid)

        # now with a rollback
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.db.commit()
        count = klass.count()
        klass.destroy(newid)
        self.assertNotEqual(len(klass.list()), n)
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.db.rollback()
        self.assertEqual(count, klass.count())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.assertEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertNotEqual(klass.history(newid), [])

    def testExceptions(self):
        # this tests the exceptions that should be raised
        ar = self.assertRaises

        ar(KeyError, self.db.getclass, 'fubar')

        #
        # class create
        #
        # string property
        ar(TypeError, self.db.status.create, name=1)
        # id, creation, creator and activity properties are reserved
        ar(KeyError, self.db.status.create, id=1)
        ar(KeyError, self.db.status.create, creation=1)
        ar(KeyError, self.db.status.create, creator=1)
        ar(KeyError, self.db.status.create, activity=1)
        ar(KeyError, self.db.status.create, actor=1)
        # invalid property name
        ar(KeyError, self.db.status.create, foo='foo')
        # key name clash
        ar(ValueError, self.db.status.create, name='unread')
        # invalid link index
        ar(IndexError, self.db.issue.create, title='foo', status='bar')
        # invalid link value
        ar(ValueError, self.db.issue.create, title='foo', status=1)
        # invalid multilink type
        ar(TypeError, self.db.issue.create, title='foo', status='1',
            nosy='hello')
        # invalid multilink index type
        ar(ValueError, self.db.issue.create, title='foo', status='1',
            nosy=[1])
        # invalid multilink index
        ar(IndexError, self.db.issue.create, title='foo', status='1',
            nosy=['10'])

        #
        # key property
        #
        # key must be a String
        ar(TypeError, self.db.file.setkey, 'fooz')
        # key must exist
        ar(KeyError, self.db.file.setkey, 'fubar')

        #
        # class get
        #
        # invalid node id
        ar(IndexError, self.db.issue.get, '99', 'title')
        # invalid property name
        ar(KeyError, self.db.status.get, '2', 'foo')

        #
        # class set
        #
        # invalid node id
        ar(IndexError, self.db.issue.set, '99', title='foo')
        # invalid property name
        ar(KeyError, self.db.status.set, '1', foo='foo')
        # string property
        ar(TypeError, self.db.status.set, '1', name=1)
        # key name clash
        ar(ValueError, self.db.status.set, '2', name='unread')
        # set up a valid issue for me to work on
        id = self.db.issue.create(title="spam", status='1')
        # invalid link index
        ar(IndexError, self.db.issue.set, id, title='foo', status='bar')
        # invalid link value
        ar(ValueError, self.db.issue.set, id, title='foo', status=1)
        # invalid multilink type
        ar(TypeError, self.db.issue.set, id, title='foo', status='1',
            nosy='hello')
        # invalid multilink index type
        ar(ValueError, self.db.issue.set, id, title='foo', status='1',
            nosy=[1])
        # invalid multilink index
        ar(IndexError, self.db.issue.set, id, title='foo', status='1',
            nosy=['10'])
        # NOTE: the following increment the username to avoid problems
        # within metakit's backend (it creates the node, and then sets the
        # info, so the create (and by a fluke the username set) go through
        # before the age/assignable/etc. set, which raises the exception)
        # invalid number value
        ar(TypeError, self.db.user.create, username='foo', age='a')
        # invalid boolean value
        ar(TypeError, self.db.user.create, username='foo2', assignable='true')
        nid = self.db.user.create(username='foo3')
        # invalid number value
        ar(TypeError, self.db.user.set, nid, age='a')
        # invalid boolean value
        ar(TypeError, self.db.user.set, nid, assignable='true')

    def testAuditors(self):
        class test:
            called = False
            def call(self, *args): self.called = True
        create = test()

        self.db.user.audit('create', create.call)
        self.db.user.create(username="mary")
        self.assertEqual(create.called, True)

        set = test()
        self.db.user.audit('set', set.call)
        self.db.user.set('1', username="joe")
        self.assertEqual(set.called, True)

        retire = test()
        self.db.user.audit('retire', retire.call)
        self.db.user.retire('1')
        self.assertEqual(retire.called, True)

    def testAuditorTwo(self):
        class test:
            n = 0
            def a(self, *args): self.call_a = self.n; self.n += 1
            def b(self, *args): self.call_b = self.n; self.n += 1
            def c(self, *args): self.call_c = self.n; self.n += 1
        test = test()
        self.db.user.audit('create', test.b, 1)
        self.db.user.audit('create', test.a, 1)
        self.db.user.audit('create', test.c, 2)
        self.db.user.create(username="mary")
        self.assertEqual(test.call_a, 0)
        self.assertEqual(test.call_b, 1)
        self.assertEqual(test.call_c, 2)

    def testJournals(self):
        muid = self.db.user.create(username="mary")
        self.db.user.create(username="pete")
        self.db.issue.create(title="spam", status='1')
        self.db.commit()

        # journal entry for issue create
        journal = self.db.getjournal('issue', '1')
        self.assertEqual(1, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[0]
        self.assertEqual(nodeid, '1')
        self.assertEqual(journaltag, self.db.user.lookup('admin'))
        self.assertEqual(action, 'create')
        keys = params.keys()
        keys.sort()
        self.assertEqual(keys, [])

        # journal entry for link
        journal = self.db.getjournal('user', '1')
        self.assertEqual(1, len(journal))
        self.db.issue.set('1', assignedto='1')
        self.db.commit()
        journal = self.db.getjournal('user', '1')
        self.assertEqual(2, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[1]
        self.assertEqual('1', nodeid)
        self.assertEqual('1', journaltag)
        self.assertEqual('link', action)
        self.assertEqual(('issue', '1', 'assignedto'), params)

        # wait a bit to keep proper order of journal entries
        time.sleep(0.01)
        # journal entry for unlink
        self.db.setCurrentUser('mary')
        self.db.issue.set('1', assignedto='2')
        self.db.commit()
        journal = self.db.getjournal('user', '1')
        self.assertEqual(3, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[2]
        self.assertEqual('1', nodeid)
        self.assertEqual(muid, journaltag)
        self.assertEqual('unlink', action)
        self.assertEqual(('issue', '1', 'assignedto'), params)

        # test disabling journalling
        # ... get the last entry
        jlen = len(self.db.getjournal('user', '1'))
        self.db.issue.disableJournalling()
        self.db.issue.set('1', title='hello world')
        self.db.commit()
        # see if the change was journalled when it shouldn't have been
        self.assertEqual(jlen,  len(self.db.getjournal('user', '1')))
        jlen = len(self.db.getjournal('issue', '1'))
        self.db.issue.enableJournalling()
        self.db.issue.set('1', title='hello world 2')
        self.db.commit()
        # see if the change was journalled
        self.assertNotEqual(jlen,  len(self.db.getjournal('issue', '1')))

    def testJournalPreCommit(self):
        id = self.db.user.create(username="mary")
        self.assertEqual(len(self.db.getjournal('user', id)), 1)
        self.db.commit()

    def testPack(self):
        id = self.db.issue.create(title="spam", status='1')
        self.db.commit()
        time.sleep(1)
        self.db.issue.set(id, status='2')
        self.db.commit()

        # sleep for at least a second, then get a date to pack at
        time.sleep(1)
        pack_before = date.Date('.')

        # wait another second and add one more entry
        time.sleep(1)
        self.db.issue.set(id, status='3')
        self.db.commit()
        jlen = len(self.db.getjournal('issue', id))

        # pack
        self.db.pack(pack_before)

        # we should have the create and last set entries now
        self.assertEqual(jlen-1, len(self.db.getjournal('issue', id)))

    def testIndexerSearching(self):
        f1 = self.db.file.create(content='hello', type="text/plain")
        # content='world' has the wrong content-type and won't be indexed
        f2 = self.db.file.create(content='world', type="text/frozz",
            comment='blah blah')
        i1 = self.db.issue.create(files=[f1, f2], title="flebble plop")
        i2 = self.db.issue.create(title="flebble the frooz")
        self.db.commit()
        self.assertEquals(self.db.indexer.search([], self.db.issue), {})
        self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
            {i1: {'files': [f1]}})
        # content='world' has the wrong content-type and shouldn't be indexed
        self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {})
        self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
            {i2: {}})
        self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
            {i1: {}, i2: {}})

        # test AND'ing of search terms
        self.assertEquals(self.db.indexer.search(['frooz', 'flebble'],
            self.db.issue), {i2: {}})

        # unindexed stopword
        self.assertEquals(self.db.indexer.search(['the'], self.db.issue), {})

    def testIndexerSearchingLink(self):
        m1 = self.db.msg.create(content="one two")
        i1 = self.db.issue.create(messages=[m1])
        m2 = self.db.msg.create(content="two three")
        i2 = self.db.issue.create(feedback=m2)
        self.db.commit()
        self.assertEquals(self.db.indexer.search(['two'], self.db.issue),
            {i1: {'messages': [m1]}, i2: {'feedback': [m2]}})

    def testIndexerSearchMulti(self):
        m1 = self.db.msg.create(content="one two")
        m2 = self.db.msg.create(content="two three")
        i1 = self.db.issue.create(messages=[m1])
        i2 = self.db.issue.create(spam=[m2])
        self.db.commit()
        self.assertEquals(self.db.indexer.search([], self.db.issue), {})
        self.assertEquals(self.db.indexer.search(['one'], self.db.issue),
            {i1: {'messages': [m1]}})
        self.assertEquals(self.db.indexer.search(['two'], self.db.issue),
            {i1: {'messages': [m1]}, i2: {'spam': [m2]}})
        self.assertEquals(self.db.indexer.search(['three'], self.db.issue),
            {i2: {'spam': [m2]}})

    def testReindexingChange(self):
        search = self.db.indexer.search
        issue = self.db.issue
        i1 = issue.create(title="flebble plop")
        i2 = issue.create(title="flebble frooz")
        self.db.commit()
        self.assertEquals(search(['plop'], issue), {i1: {}})
        self.assertEquals(search(['flebble'], issue), {i1: {}, i2: {}})

        # change i1's title
        issue.set(i1, title="plop")
        self.db.commit()
        self.assertEquals(search(['plop'], issue), {i1: {}})
        self.assertEquals(search(['flebble'], issue), {i2: {}})

    def testReindexingClear(self):
        search = self.db.indexer.search
        issue = self.db.issue
        i1 = issue.create(title="flebble plop")
        i2 = issue.create(title="flebble frooz")
        self.db.commit()
        self.assertEquals(search(['plop'], issue), {i1: {}})
        self.assertEquals(search(['flebble'], issue), {i1: {}, i2: {}})

        # unset i1's title
        issue.set(i1, title="")
        self.db.commit()
        self.assertEquals(search(['plop'], issue), {})
        self.assertEquals(search(['flebble'], issue), {i2: {}})

    def testFileClassReindexing(self):
        f1 = self.db.file.create(content='hello')
        f2 = self.db.file.create(content='hello, world')
        i1 = self.db.issue.create(files=[f1, f2])
        self.db.commit()
        d = self.db.indexer.search(['hello'], self.db.issue)
        self.assert_(d.has_key(i1))
        d[i1]['files'].sort()
        self.assertEquals(d, {i1: {'files': [f1, f2]}})
        self.assertEquals(self.db.indexer.search(['world'], self.db.issue),
            {i1: {'files': [f2]}})
        self.db.file.set(f1, content="world")
        self.db.commit()
        d = self.db.indexer.search(['world'], self.db.issue)
        d[i1]['files'].sort()
        self.assertEquals(d, {i1: {'files': [f1, f2]}})
        self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
            {i1: {'files': [f2]}})

    def testFileClassIndexingNoNoNo(self):
        f1 = self.db.file.create(content='hello')
        self.db.commit()
        self.assertEquals(self.db.indexer.search(['hello'], self.db.file),
            {'1': {}})

        f1 = self.db.file_nidx.create(content='hello')
        self.db.commit()
        self.assertEquals(self.db.indexer.search(['hello'], self.db.file_nidx),
            {})

    def testForcedReindexing(self):
        self.db.issue.create(title="flebble frooz")
        self.db.commit()
        self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
            {'1': {}})
        self.db.indexer.quiet = 1
        self.db.indexer.force_reindex()
        self.db.post_init()
        self.db.indexer.quiet = 9
        self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
            {'1': {}})

    def testIndexingPropertiesOnImport(self):
        # import an issue
        title = 'Bzzt'
        nodeid = self.db.issue.import_list(['title', 'messages', 'files',
            'spam', 'nosy', 'superseder'], [repr(title), '[]', '[]',
            '[]', '[]', '[]'])
        self.db.commit()

        # Content of title attribute is indexed
        self.assertEquals(self.db.indexer.search([title], self.db.issue),
            {str(nodeid):{}})


    #
    # searching tests follow
    #
    def testFindIncorrectProperty(self):
        self.assertRaises(TypeError, self.db.issue.find, title='fubar')

    def _find_test_setup(self):
        self.db.file.create(content='')
        self.db.file.create(content='')
        self.db.user.create(username='')
        one = self.db.issue.create(status="1", nosy=['1'])
        two = self.db.issue.create(status="2", nosy=['2'], files=['1'],
            assignedto='2')
        three = self.db.issue.create(status="1", nosy=['1','2'])
        four = self.db.issue.create(status="3", assignedto='1',
            files=['1','2'])
        return one, two, three, four

    def testFindLink(self):
        one, two, three, four = self._find_test_setup()
        got = self.db.issue.find(status='1')
        got.sort()
        self.assertEqual(got, [one, three])
        got = self.db.issue.find(status={'1':1})
        got.sort()
        self.assertEqual(got, [one, three])

    def testFindLinkFail(self):
        self._find_test_setup()
        self.assertEqual(self.db.issue.find(status='4'), [])
        self.assertEqual(self.db.issue.find(status={'4':1}), [])

    def testFindLinkUnset(self):
        one, two, three, four = self._find_test_setup()
        got = self.db.issue.find(assignedto=None)
        got.sort()
        self.assertEqual(got, [one, three])
        got = self.db.issue.find(assignedto={None:1})
        got.sort()
        self.assertEqual(got, [one, three])

    def testFindMultipleLink(self):
        one, two, three, four = self._find_test_setup()
        l = self.db.issue.find(status={'1':1, '3':1})
        l.sort()
        self.assertEqual(l, [one, three, four])
        l = self.db.issue.find(assignedto={None:1, '1':1})
        l.sort()
        self.assertEqual(l, [one, three, four])

    def testFindMultilink(self):
        one, two, three, four = self._find_test_setup()
        got = self.db.issue.find(nosy='2')
        got.sort()
        self.assertEqual(got, [two, three])
        got = self.db.issue.find(nosy={'2':1})
        got.sort()
        self.assertEqual(got, [two, three])
        got = self.db.issue.find(nosy={'2':1}, files={})
        got.sort()
        self.assertEqual(got, [two, three])

    def testFindMultiMultilink(self):
        one, two, three, four = self._find_test_setup()
        got = self.db.issue.find(nosy='2', files='1')
        got.sort()
        self.assertEqual(got, [two, three, four])
        got = self.db.issue.find(nosy={'2':1}, files={'1':1})
        got.sort()
        self.assertEqual(got, [two, three, four])

    def testFindMultilinkFail(self):
        self._find_test_setup()
        self.assertEqual(self.db.issue.find(nosy='3'), [])
        self.assertEqual(self.db.issue.find(nosy={'3':1}), [])

    def testFindMultilinkUnset(self):
        self._find_test_setup()
        self.assertEqual(self.db.issue.find(nosy={}), [])

    def testFindLinkAndMultilink(self):
        one, two, three, four = self._find_test_setup()
        got = self.db.issue.find(status='1', nosy='2')
        got.sort()
        self.assertEqual(got, [one, two, three])
        got = self.db.issue.find(status={'1':1}, nosy={'2':1})
        got.sort()
        self.assertEqual(got, [one, two, three])

    def testFindRetired(self):
        one, two, three, four = self._find_test_setup()
        self.assertEqual(len(self.db.issue.find(status='1')), 2)
        self.db.issue.retire(one)
        self.assertEqual(len(self.db.issue.find(status='1')), 1)

    def testStringFind(self):
        self.assertRaises(TypeError, self.db.issue.stringFind, status='1')

        ids = []
        ids.append(self.db.issue.create(title="spam"))
        self.db.issue.create(title="not spam")
        ids.append(self.db.issue.create(title="spam"))
        ids.sort()
        got = self.db.issue.stringFind(title='spam')
        got.sort()
        self.assertEqual(got, ids)
        self.assertEqual(self.db.issue.stringFind(title='fubar'), [])

        # test retiring a node
        self.db.issue.retire(ids[0])
        self.assertEqual(len(self.db.issue.stringFind(title='spam')), 1)

    def filteringSetup(self):
        for user in (
                {'username': 'bleep', 'age': 1},
                {'username': 'blop', 'age': 1.5},
                {'username': 'blorp', 'age': 2}):
            self.db.user.create(**user)
        iss = self.db.issue
        file_content = ''.join([chr(i) for i in range(255)])
        f = self.db.file.create(content=file_content)
        for issue in (
                {'title': 'issue one', 'status': '2', 'assignedto': '1',
                    'foo': date.Interval('1:10'), 'priority': '3',
                    'deadline': date.Date('2003-02-16.22:50')},
                {'title': 'issue two', 'status': '1', 'assignedto': '2',
                    'foo': date.Interval('1d'), 'priority': '3',
                    'deadline': date.Date('2003-01-01.00:00')},
                {'title': 'issue three', 'status': '1', 'priority': '2',
                    'nosy': ['1','2'], 'deadline': date.Date('2003-02-18')},
                {'title': 'non four', 'status': '3',
                    'foo': date.Interval('0:10'), 'priority': '2',
                    'nosy': ['1','2','3'], 'deadline': date.Date('2004-03-08'),
                    'files': [f]}):
            self.db.issue.create(**issue)
        self.db.commit()
        return self.assertEqual, self.db.issue.filter

    def testFilteringID(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'id': '1'}, ('+','id'), (None,None)), ['1'])
        ae(filt(None, {'id': '2'}, ('+','id'), (None,None)), ['2'])
        ae(filt(None, {'id': '100'}, ('+','id'), (None,None)), [])

    def testFilteringNumber(self):
        self.filteringSetup()
        ae, filt = self.assertEqual, self.db.user.filter
        ae(filt(None, {'age': '1'}, ('+','id'), (None,None)), ['3'])
        ae(filt(None, {'age': '1.5'}, ('+','id'), (None,None)), ['4'])
        ae(filt(None, {'age': '2'}, ('+','id'), (None,None)), ['5'])
        ae(filt(None, {'age': ['1','2']}, ('+','id'), (None,None)), ['3','5'])

    def testFilteringString(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'title': ['one']}, ('+','id'), (None,None)), ['1'])
        ae(filt(None, {'title': ['issue one']}, ('+','id'), (None,None)),
            ['1'])
        ae(filt(None, {'title': ['issue', 'one']}, ('+','id'), (None,None)),
            ['1'])
        ae(filt(None, {'title': ['issue']}, ('+','id'), (None,None)),
            ['1','2','3'])
        ae(filt(None, {'title': ['one', 'two']}, ('+','id'), (None,None)),
            [])

    def testFilteringLink(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['2','3'])
        ae(filt(None, {'assignedto': '-1'}, ('+','id'), (None,None)), ['3','4'])
        ae(filt(None, {'assignedto': None}, ('+','id'), (None,None)), ['3','4'])
        ae(filt(None, {'assignedto': [None]}, ('+','id'), (None,None)),
            ['3','4'])
        ae(filt(None, {'assignedto': ['-1', None]}, ('+','id'), (None,None)),
            ['3','4'])
        ae(filt(None, {'assignedto': ['1', None]}, ('+','id'), (None,None)),
            ['1', '3','4'])

    def testFilteringMultilinkAndGroup(self):
        """testFilteringMultilinkAndGroup:
        See roundup Bug 1541128: apparently grouping by something and
        searching a Multilink failed with MySQL 5.0
        """
        ae, filt = self.filteringSetup()
        ae(filt(None, {'files': '1'}, ('-','activity'), ('+','status')), ['4'])

    def testFilteringRetired(self):
        ae, filt = self.filteringSetup()
        self.db.issue.retire('2')
        ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['3'])

    def testFilteringMultilink(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'nosy': '3'}, ('+','id'), (None,None)), ['4'])
        ae(filt(None, {'nosy': '-1'}, ('+','id'), (None,None)), ['1', '2'])
        ae(filt(None, {'nosy': ['1','2']}, ('+', 'status'),
            ('-', 'deadline')), ['4', '3'])

    def testFilteringMany(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
            ['3'])

    def testFilteringRangeBasic(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'deadline': 'from 2003-02-10 to 2003-02-23'}), ['1','3'])
        ae(filt(None, {'deadline': '2003-02-10; 2003-02-23'}), ['1','3'])
        ae(filt(None, {'deadline': '; 2003-02-16'}), ['2'])

    def testFilteringRangeTwoSyntaxes(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'deadline': 'from 2003-02-16'}), ['1', '3', '4'])
        ae(filt(None, {'deadline': '2003-02-16;'}), ['1', '3', '4'])

    def testFilteringRangeYearMonthDay(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'deadline': '2002'}), [])
        ae(filt(None, {'deadline': '2003'}), ['1', '2', '3'])
        ae(filt(None, {'deadline': '2004'}), ['4'])
        ae(filt(None, {'deadline': '2003-02-16'}), ['1'])
        ae(filt(None, {'deadline': '2003-02-17'}), [])

    def testFilteringRangeMonths(self):
        ae, filt = self.filteringSetup()
        for month in range(1, 13):
            for n in range(1, month+1):
                i = self.db.issue.create(title='%d.%d'%(month, n),
                    deadline=date.Date('2001-%02d-%02d.00:00'%(month, n)))
        self.db.commit()

        for month in range(1, 13):
            r = filt(None, dict(deadline='2001-%02d'%month))
            assert len(r) == month, 'month %d != length %d'%(month, len(r))

    def testFilteringRangeInterval(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'foo': 'from 0:50 to 2:00'}), ['1'])
        ae(filt(None, {'foo': 'from 0:50 to 1d 2:00'}), ['1', '2'])
        ae(filt(None, {'foo': 'from 5:50'}), ['2'])
        ae(filt(None, {'foo': 'to 0:05'}), [])

    def testFilteringRangeGeekInterval(self):
        ae, filt = self.filteringSetup()
        for issue in (
                { 'deadline': date.Date('. -2d')},
                { 'deadline': date.Date('. -1d')},
                { 'deadline': date.Date('. -8d')},
                ):
            self.db.issue.create(**issue)
        ae(filt(None, {'deadline': '-2d;'}), ['5', '6'])
        ae(filt(None, {'deadline': '-1d;'}), ['6'])
        ae(filt(None, {'deadline': '-1w;'}), ['5', '6'])

    def testFilteringIntervalSort(self):
        # 1: '1:10'
        # 2: '1d'
        # 3: None
        # 4: '0:10'
        ae, filt = self.filteringSetup()
        # ascending should sort None, 1:10, 1d
        ae(filt(None, {}, ('+','foo'), (None,None)), ['3', '4', '1', '2'])
        # descending should sort 1d, 1:10, None
        ae(filt(None, {}, ('-','foo'), (None,None)), ['2', '1', '4', '3'])

    def testFilteringStringSort(self):
        # 1: 'issue one'
        # 2: 'issue two'
        # 3: 'issue three'
        # 4: 'non four'
        ae, filt = self.filteringSetup()
        ae(filt(None, {}, ('+','title')), ['1', '3', '2', '4'])
        ae(filt(None, {}, ('-','title')), ['4', '2', '3', '1'])
        # Test string case: For now allow both, w/wo case matching.
        # 1: 'issue one'
        # 2: 'issue two'
        # 3: 'Issue three'
        # 4: 'non four'
        self.db.issue.set('3', title='Issue three')
        ae(filt(None, {}, ('+','title')), ['1', '3', '2', '4'])
        ae(filt(None, {}, ('-','title')), ['4', '2', '3', '1'])
        # Obscure bug in anydbm backend trying to convert to number
        # 1: '1st issue'
        # 2: '2'
        # 3: 'Issue three'
        # 4: 'non four'
        self.db.issue.set('1', title='1st issue')
        self.db.issue.set('2', title='2')
        ae(filt(None, {}, ('+','title')), ['1', '2', '3', '4'])
        ae(filt(None, {}, ('-','title')), ['4', '3', '2', '1'])

    def testFilteringMultilinkSort(self):
        # 1: []                 Reverse:  1: []
        # 2: []                           2: []
        # 3: ['admin','fred']             3: ['fred','admin']
        # 4: ['admin','bleep','fred']     4: ['fred','bleep','admin']
        # Note the sort order for the multilink doen't change when
        # reversing the sort direction due to the re-sorting of the
        # multilink!
        ae, filt = self.filteringSetup()
        ae(filt(None, {}, ('+','nosy'), (None,None)), ['1', '2', '4', '3'])
        ae(filt(None, {}, ('-','nosy'), (None,None)), ['4', '3', '1', '2'])

    def testFilteringMultilinkSortGroup(self):
        # 1: status: 2 "in-progress" nosy: []
        # 2: status: 1 "unread"      nosy: []
        # 3: status: 1 "unread"      nosy: ['admin','fred']
        # 4: status: 3 "testing"     nosy: ['admin','bleep','fred']
        ae, filt = self.filteringSetup()
        ae(filt(None, {}, ('+','nosy'), ('+','status')), ['1', '4', '2', '3'])
        ae(filt(None, {}, ('-','nosy'), ('+','status')), ['1', '4', '3', '2'])
        ae(filt(None, {}, ('+','nosy'), ('-','status')), ['2', '3', '4', '1'])
        ae(filt(None, {}, ('-','nosy'), ('-','status')), ['3', '2', '4', '1'])
        ae(filt(None, {}, ('+','status'), ('+','nosy')), ['1', '2', '4', '3'])
        ae(filt(None, {}, ('-','status'), ('+','nosy')), ['2', '1', '4', '3'])
        ae(filt(None, {}, ('+','status'), ('-','nosy')), ['4', '3', '1', '2'])
        ae(filt(None, {}, ('-','status'), ('-','nosy')), ['4', '3', '2', '1'])

    def testFilteringLinkSortGroup(self):
        # 1: status: 2 -> 'i', priority: 3 -> 1
        # 2: status: 1 -> 'u', priority: 3 -> 1
        # 3: status: 1 -> 'u', priority: 2 -> 3
        # 4: status: 3 -> 't', priority: 2 -> 3
        ae, filt = self.filteringSetup()
        ae(filt(None, {}, ('+','status'), ('+','priority')),
            ['1', '2', '4', '3'])
        ae(filt(None, {'priority':'2'}, ('+','status'), ('+','priority')),
            ['4', '3'])
        ae(filt(None, {'priority.order':'3'}, ('+','status'), ('+','priority')),
            ['4', '3'])
        ae(filt(None, {'priority':['2','3']}, ('+','priority'), ('+','status')),
            ['1', '4', '2', '3'])
        ae(filt(None, {}, ('+','priority'), ('+','status')),
            ['1', '4', '2', '3'])

    def testFilteringDateSort(self):
        # '1': '2003-02-16.22:50'
        # '2': '2003-01-01.00:00'
        # '3': '2003-02-18'
        # '4': '2004-03-08'
        ae, filt = self.filteringSetup()
        # ascending
        ae(filt(None, {}, ('+','deadline'), (None,None)), ['2', '1', '3', '4'])
        # descending
        ae(filt(None, {}, ('-','deadline'), (None,None)), ['4', '3', '1', '2'])

    def testFilteringDateSortPriorityGroup(self):
        # '1': '2003-02-16.22:50'  1 => 2
        # '2': '2003-01-01.00:00'  3 => 1
        # '3': '2003-02-18'        2 => 3
        # '4': '2004-03-08'        1 => 2
        ae, filt = self.filteringSetup()

        # ascending
        ae(filt(None, {}, ('+','deadline'), ('+','priority')),
            ['2', '1', '3', '4'])
        ae(filt(None, {}, ('-','deadline'), ('+','priority')),
            ['1', '2', '4', '3'])
        # descending
        ae(filt(None, {}, ('+','deadline'), ('-','priority')),
            ['3', '4', '2', '1'])
        ae(filt(None, {}, ('-','deadline'), ('-','priority')),
            ['4', '3', '1', '2'])

    def filteringSetupTransitiveSearch(self):
        u_m = {}
        k = 30
        for user in (
                {'username': 'ceo', 'age': 129},
                {'username': 'grouplead1', 'age': 29, 'supervisor': '3'},
                {'username': 'grouplead2', 'age': 29, 'supervisor': '3'},
                {'username': 'worker1', 'age': 25, 'supervisor' : '4'},
                {'username': 'worker2', 'age': 24, 'supervisor' : '4'},
                {'username': 'worker3', 'age': 23, 'supervisor' : '5'},
                {'username': 'worker4', 'age': 22, 'supervisor' : '5'},
                {'username': 'worker5', 'age': 21, 'supervisor' : '5'}):
            u = self.db.user.create(**user)
            u_m [u] = self.db.msg.create(author = u, content = ' '
                , date = date.Date ('2006-01-%s' % k))
            k -= 1
        iss = self.db.issue
        for issue in (
                {'title': 'ts1', 'status': '2', 'assignedto': '6',
                    'priority': '3', 'messages' : [u_m ['6']], 'nosy' : ['4']},
                {'title': 'ts2', 'status': '1', 'assignedto': '6',
                    'priority': '3', 'messages' : [u_m ['6']], 'nosy' : ['5']},
                {'title': 'ts4', 'status': '2', 'assignedto': '7',
                    'priority': '3', 'messages' : [u_m ['7']]},
                {'title': 'ts5', 'status': '1', 'assignedto': '8',
                    'priority': '3', 'messages' : [u_m ['8']]},
                {'title': 'ts6', 'status': '2', 'assignedto': '9',
                    'priority': '3', 'messages' : [u_m ['9']]},
                {'title': 'ts7', 'status': '1', 'assignedto': '10',
                    'priority': '3', 'messages' : [u_m ['10']]},
                {'title': 'ts8', 'status': '2', 'assignedto': '10',
                    'priority': '3', 'messages' : [u_m ['10']]},
                {'title': 'ts9', 'status': '1', 'assignedto': '10',
                    'priority': '3', 'messages' : [u_m ['10'], u_m ['9']]}):
            self.db.issue.create(**issue)
        return self.assertEqual, self.db.issue.filter

    def testFilteringTransitiveLinkUser(self):
        ae, filt = self.filteringSetupTransitiveSearch()
        ufilt = self.db.user.filter
        ae(ufilt(None, {'supervisor.username': 'ceo'}, ('+','username')),
            ['4', '5'])
        ae(ufilt(None, {'supervisor.supervisor.username': 'ceo'},
            ('+','username')), ['6', '7', '8', '9', '10'])
        ae(ufilt(None, {'supervisor.supervisor': '3'}, ('+','username')),
            ['6', '7', '8', '9', '10'])
        ae(ufilt(None, {'supervisor.supervisor.id': '3'}, ('+','username')),
            ['6', '7', '8', '9', '10'])
        ae(ufilt(None, {'supervisor.username': 'grouplead1'}, ('+','username')),
            ['6', '7'])
        ae(ufilt(None, {'supervisor.username': 'grouplead2'}, ('+','username')),
            ['8', '9', '10'])
        ae(ufilt(None, {'supervisor.username': 'grouplead2',
            'supervisor.supervisor.username': 'ceo'}, ('+','username')),
            ['8', '9', '10'])
        ae(ufilt(None, {'supervisor.supervisor': '3', 'supervisor': '4'},
            ('+','username')), ['6', '7'])

    def testFilteringTransitiveLinkSort(self):
        ae, filt = self.filteringSetupTransitiveSearch()
        ufilt = self.db.user.filter
        # Need to make ceo his own (and first two users') supervisor,
        # otherwise we will depend on sorting order of NULL values.
        # Leave that to a separate test.
        self.db.user.set('1', supervisor = '3')
        self.db.user.set('2', supervisor = '3')
        self.db.user.set('3', supervisor = '3')
        ae(ufilt(None, {'supervisor':'3'}, []), ['1', '2', '3', '4', '5'])
        ae(ufilt(None, {}, [('+','supervisor.supervisor.supervisor'),
            ('+','supervisor.supervisor'), ('+','supervisor'),
            ('+','username')]),
            ['1', '3', '2', '4', '5', '6', '7', '8', '9', '10'])
        ae(ufilt(None, {}, [('+','supervisor.supervisor.supervisor'),
            ('-','supervisor.supervisor'), ('-','supervisor'),
            ('+','username')]),
            ['8', '9', '10', '6', '7', '1', '3', '2', '4', '5'])
        ae(filt(None, {}, [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('+','assignedto.supervisor'), ('+','assignedto')]),
            ['1', '2', '3', '4', '5', '6', '7', '8'])
        ae(filt(None, {}, [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('-','assignedto.supervisor'), ('+','assignedto')]),
            ['4', '5', '6', '7', '8', '1', '2', '3'])
        ae(filt(None, {}, [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('+','assignedto.supervisor'), ('+','assignedto'),
            ('-','status')]),
            ['2', '1', '3', '4', '5', '6', '8', '7'])
        ae(filt(None, {}, [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('+','assignedto.supervisor'), ('+','assignedto'),
            ('+','status')]),
            ['1', '2', '3', '4', '5', '7', '6', '8'])
        ae(filt(None, {}, [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('-','assignedto.supervisor'), ('+','assignedto'), ('+','status')]),
            ['4', '5', '7', '6', '8', '1', '2', '3'])
        ae(filt(None, {'assignedto':['6','7','8','9','10']},
            [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('-','assignedto.supervisor'), ('+','assignedto'), ('+','status')]),
            ['4', '5', '7', '6', '8', '1', '2', '3'])
        ae(filt(None, {'assignedto':['6','7','8','9']},
            [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('-','assignedto.supervisor'), ('+','assignedto'), ('+','status')]),
            ['4', '5', '1', '2', '3'])

    def testFilteringTransitiveLinkSortNull(self):
        """Check sorting of NULL values"""
        ae, filt = self.filteringSetupTransitiveSearch()
        ufilt = self.db.user.filter
        ae(ufilt(None, {}, [('+','supervisor.supervisor.supervisor'),
            ('+','supervisor.supervisor'), ('+','supervisor'),
            ('+','username')]),
            ['1', '3', '2', '4', '5', '6', '7', '8', '9', '10'])
        ae(ufilt(None, {}, [('+','supervisor.supervisor.supervisor'),
            ('-','supervisor.supervisor'), ('-','supervisor'),
            ('+','username')]),
            ['8', '9', '10', '6', '7', '4', '5', '1', '3', '2'])
        ae(filt(None, {}, [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('+','assignedto.supervisor'), ('+','assignedto')]),
            ['1', '2', '3', '4', '5', '6', '7', '8'])
        ae(filt(None, {}, [('+','assignedto.supervisor.supervisor.supervisor'),
            ('+','assignedto.supervisor.supervisor'),
            ('-','assignedto.supervisor'), ('+','assignedto')]),
            ['4', '5', '6', '7', '8', '1', '2', '3'])

    def testFilteringTransitiveLinkIssue(self):
        ae, filt = self.filteringSetupTransitiveSearch()
        ae(filt(None, {'assignedto.supervisor.username': 'grouplead1'},
            ('+','id')), ['1', '2', '3'])
        ae(filt(None, {'assignedto.supervisor.username': 'grouplead2'},
            ('+','id')), ['4', '5', '6', '7', '8'])
        ae(filt(None, {'assignedto.supervisor.username': 'grouplead2',
                       'status': '1'}, ('+','id')), ['4', '6', '8'])
        ae(filt(None, {'assignedto.supervisor.username': 'grouplead2',
                       'status': '2'}, ('+','id')), ['5', '7'])
        ae(filt(None, {'assignedto.supervisor.username': ['grouplead2'],
                       'status': '2'}, ('+','id')), ['5', '7'])
        ae(filt(None, {'assignedto.supervisor': ['4', '5'], 'status': '2'},
            ('+','id')), ['1', '3', '5', '7'])

    def testFilteringTransitiveMultilink(self):
        ae, filt = self.filteringSetupTransitiveSearch()
        ae(filt(None, {'messages.author.username': 'grouplead1'},
            ('+','id')), [])
        ae(filt(None, {'messages.author': '6'},
            ('+','id')), ['1', '2'])
        ae(filt(None, {'messages.author.id': '6'},
            ('+','id')), ['1', '2'])
        ae(filt(None, {'messages.author.username': 'worker1'},
            ('+','id')), ['1', '2'])
        ae(filt(None, {'messages.author': '10'},
            ('+','id')), ['6', '7', '8'])
        ae(filt(None, {'messages.author': '9'},
            ('+','id')), ['5', '8'])
        ae(filt(None, {'messages.author': ['9', '10']},
            ('+','id')), ['5', '6', '7', '8'])
        ae(filt(None, {'messages.author': ['8', '9']},
            ('+','id')), ['4', '5', '8'])
        ae(filt(None, {'messages.author': ['8', '9'], 'status' : '1'},
            ('+','id')), ['4', '8'])
        ae(filt(None, {'messages.author': ['8', '9'], 'status' : '2'},
            ('+','id')), ['5'])
        ae(filt(None, {'messages.author': ['8', '9', '10'],
            'messages.date': '2006-01-22.21:00;2006-01-23'}, ('+','id')),
            ['6', '7', '8'])
        ae(filt(None, {'nosy.supervisor.username': 'ceo'},
            ('+','id')), ['1', '2'])
        ae(filt(None, {'messages.author': ['6', '9']},
            ('+','id')), ['1', '2', '5', '8'])
        ae(filt(None, {'messages': ['5', '7']},
            ('+','id')), ['3', '5', '8'])
        ae(filt(None, {'messages.author': ['6', '9'], 'messages': ['5', '7']},
            ('+','id')), ['5', '8'])

    def testFilteringTransitiveMultilinkSort(self):
        ae, filt = self.filteringSetupTransitiveSearch()
        ae(filt(None, {}, [('+','messages.author')]),
            ['1', '2', '3', '4', '5', '8', '6', '7'])
        ae(filt(None, {}, [('-','messages.author')]),
            ['8', '6', '7', '5', '4', '3', '1', '2'])
        ae(filt(None, {}, [('+','messages.date')]),
            ['6', '7', '8', '5', '4', '3', '1', '2'])
        ae(filt(None, {}, [('-','messages.date')]),
            ['1', '2', '3', '4', '8', '5', '6', '7'])
        ae(filt(None, {}, [('+','messages.author'),('+','messages.date')]),
            ['1', '2', '3', '4', '5', '8', '6', '7'])
        ae(filt(None, {}, [('-','messages.author'),('+','messages.date')]),
            ['8', '6', '7', '5', '4', '3', '1', '2'])
        ae(filt(None, {}, [('+','messages.author'),('-','messages.date')]),
            ['1', '2', '3', '4', '5', '8', '6', '7'])
        ae(filt(None, {}, [('-','messages.author'),('-','messages.date')]),
            ['8', '6', '7', '5', '4', '3', '1', '2'])
        ae(filt(None, {}, [('+','messages.author'),('+','assignedto')]),
            ['1', '2', '3', '4', '5', '8', '6', '7'])
        ae(filt(None, {}, [('+','messages.author'),
            ('-','assignedto.supervisor'),('-','assignedto')]),
            ['1', '2', '3', '4', '5', '8', '6', '7'])
        ae(filt(None, {},
            [('+','messages.author.supervisor.supervisor.supervisor'),
            ('+','messages.author.supervisor.supervisor'),
            ('+','messages.author.supervisor'), ('+','messages.author')]),
            ['1', '2', '3', '4', '5', '6', '7', '8'])
        self.db.user.setorderprop('age')
        self.db.msg.setorderprop('date')
        ae(filt(None, {}, [('+','messages'), ('+','messages.author')]),
            ['6', '7', '8', '5', '4', '3', '1', '2'])
        ae(filt(None, {}, [('+','messages.author'), ('+','messages')]),
            ['6', '7', '8', '5', '4', '3', '1', '2'])
        self.db.msg.setorderprop('author')
        # Orderprop is a Link/Multilink:
        # messages are sorted by orderprop().labelprop(), i.e. by
        # author.username, *not* by author.orderprop() (author.age)!
        ae(filt(None, {}, [('+','messages')]),
            ['1', '2', '3', '4', '5', '8', '6', '7'])
        ae(filt(None, {}, [('+','messages.author'), ('+','messages')]),
            ['6', '7', '8', '5', '4', '3', '1', '2'])
        # The following will sort by
        # author.supervisor.username and then by
        # author.username
        # I've resited the tempation to implement recursive orderprop
        # here: There could even be loops if several classes specify a
        # Link or Multilink as the orderprop...
        # msg: 4: worker1 (id  5) : grouplead1 (id 4) ceo (id 3)
        # msg: 5: worker2 (id  7) : grouplead1 (id 4) ceo (id 3)
        # msg: 6: worker3 (id  8) : grouplead2 (id 5) ceo (id 3)
        # msg: 7: worker4 (id  9) : grouplead2 (id 5) ceo (id 3)
        # msg: 8: worker5 (id 10) : grouplead2 (id 5) ceo (id 3)
        # issue 1: messages 4   sortkey:[[grouplead1], [worker1], 1]
        # issue 2: messages 4   sortkey:[[grouplead1], [worker1], 2]
        # issue 3: messages 5   sortkey:[[grouplead1], [worker2], 3]
        # issue 4: messages 6   sortkey:[[grouplead2], [worker3], 4]
        # issue 5: messages 7   sortkey:[[grouplead2], [worker4], 5]
        # issue 6: messages 8   sortkey:[[grouplead2], [worker5], 6]
        # issue 7: messages 8   sortkey:[[grouplead2], [worker5], 7]
        # issue 8: messages 7,8 sortkey:[[grouplead2, grouplead2], ...]
        self.db.user.setorderprop('supervisor')
        ae(filt(None, {}, [('+','messages.author'), ('-','messages')]),
            ['3', '1', '2', '6', '7', '5', '4', '8'])

    def testFilteringSortId(self):
        ae, filt = self.filteringSetupTransitiveSearch()
        ae(self.db.user.filter(None, {}, ('+','id')),
            ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'])

# XXX add sorting tests for other types

    # nuke and re-create db for restore
    def nukeAndCreate(self):
        # shut down this db and nuke it
        self.db.close()
        self.nuke_database()

        # open a new, empty database
        os.makedirs(config.DATABASE + '/files')
        self.db = self.module.Database(config, 'admin')
        setupSchema(self.db, 0, self.module)

    def testImportExport(self):
        # use the filtering setup to create a bunch of items
        ae, filt = self.filteringSetup()
        # Get some stuff into the journal for testing import/export of
        # journal data:
        self.db.user.set('4', password = password.Password('xyzzy'))
        self.db.user.set('4', age = 3)
        self.db.user.set('4', assignable = True)
        self.db.issue.set('1', title = 'i1', status = '3')
        self.db.issue.set('1', deadline = date.Date('2007'))
        self.db.issue.set('1', foo = date.Interval('1:20'))
        p = self.db.priority.create(name = 'some_prio_without_order')
        self.db.commit()
        self.db.user.set('4', password = password.Password('123xyzzy'))
        self.db.user.set('4', assignable = False)
        self.db.priority.set(p, order = '4711')
        self.db.commit()

        self.db.user.retire('3')
        self.db.issue.retire('2')

        # grab snapshot of the current database
        orig = {}
        origj = {}
        for cn,klass in self.db.classes.items():
            cl = orig[cn] = {}
            jn = origj[cn] = {}
            for id in klass.list():
                it = cl[id] = {}
                jn[id] = self.db.getjournal(cn, id)
                for name in klass.getprops().keys():
                    it[name] = klass.get(id, name)

        os.mkdir('_test_export')
        try:
            # grab the export
            export = {}
            journals = {}
            for cn,klass in self.db.classes.items():
                names = klass.export_propnames()
                cl = export[cn] = [names+['is retired']]
                for id in klass.getnodeids():
                    cl.append(klass.export_list(names, id))
                    if hasattr(klass, 'export_files'):
                        klass.export_files('_test_export', id)
                journals[cn] = klass.export_journals()

            self.nukeAndCreate()

            # import
            for cn, items in export.items():
                klass = self.db.classes[cn]
                names = items[0]
                maxid = 1
                for itemprops in items[1:]:
                    id = int(klass.import_list(names, itemprops))
                    if hasattr(klass, 'import_files'):
                        klass.import_files('_test_export', str(id))
                    maxid = max(maxid, id)
                self.db.setid(cn, str(maxid+1))
                klass.import_journals(journals[cn])
            # This is needed, otherwise journals won't be there for anydbm
            self.db.commit()
        finally:
            shutil.rmtree('_test_export')

        # compare with snapshot of the database
        for cn, items in orig.iteritems():
            klass = self.db.classes[cn]
            propdefs = klass.getprops(1)
            # ensure retired items are retired :)
            l = items.keys(); l.sort()
            m = klass.list(); m.sort()
            ae(l, m, '%s id list wrong %r vs. %r'%(cn, l, m))
            for id, props in items.items():
                for name, value in props.items():
                    l = klass.get(id, name)
                    if isinstance(value, type([])):
                        value.sort()
                        l.sort()
                    try:
                        ae(l, value)
                    except AssertionError:
                        if not isinstance(propdefs[name], Date):
                            raise
                        # don't get hung up on rounding errors
                        assert not l.__cmp__(value, int_seconds=1)
        for jc, items in origj.iteritems():
            for id, oj in items.iteritems():
                rj = self.db.getjournal(jc, id)
                # Both mysql and postgresql have some minor issues with
                # rounded seconds on export/import, so we compare only
                # the integer part.
                for j in oj:
                    j[1].second = float(int(j[1].second))
                for j in rj:
                    j[1].second = float(int(j[1].second))
                oj.sort()
                rj.sort()
                ae(oj, rj)

        # make sure the retired items are actually imported
        ae(self.db.user.get('4', 'username'), 'blop')
        ae(self.db.issue.get('2', 'title'), 'issue two')

        # make sure id counters are set correctly
        maxid = max([int(id) for id in self.db.user.list()])
        newid = self.db.user.create(username='testing')
        assert newid > maxid

    # test import/export via admin interface
    def testAdminImportExport(self):
        import roundup.admin
        import csv
        # use the filtering setup to create a bunch of items
        ae, filt = self.filteringSetup()
        # create large field
        self.db.priority.create(name = 'X' * 500)
        self.db.config.CSV_FIELD_SIZE = 400
        self.db.commit()
        output = []
        # ugly hack to get stderr output and disable stdout output
        # during regression test. Depends on roundup.admin not using
        # anything but stdout/stderr from sys (which is currently the
        # case)
        def stderrwrite(s):
            output.append(s)
        roundup.admin.sys = MockNull ()
        try:
            roundup.admin.sys.stderr.write = stderrwrite
            tool = roundup.admin.AdminTool()
            home = '.'
            tool.tracker_home = home
            tool.db = self.db
            tool.verbose = False
            tool.do_export (['_test_export'])
            self.assertEqual(len(output), 2)
            self.assertEqual(output [1], '\n')
            self.failUnless(output [0].startswith
                ('Warning: config csv_field_size should be at least'))
            self.failUnless(int(output[0].split()[-1]) > 500)

            if hasattr(roundup.admin.csv, 'field_size_limit'):
                self.nukeAndCreate()
                self.db.config.CSV_FIELD_SIZE = 400
                tool = roundup.admin.AdminTool()
                tool.tracker_home = home
                tool.db = self.db
                tool.verbose = False
                self.assertRaises(csv.Error, tool.do_import, ['_test_export'])

            self.nukeAndCreate()
            self.db.config.CSV_FIELD_SIZE = 3200
            tool = roundup.admin.AdminTool()
            tool.tracker_home = home
            tool.db = self.db
            tool.verbose = False
            tool.do_import(['_test_export'])
        finally:
            roundup.admin.sys = sys
            shutil.rmtree('_test_export')

    def testAddProperty(self):
        self.db.issue.create(title="spam", status='1')
        self.db.commit()

        self.db.issue.addprop(fixer=Link("user"))
        # force any post-init stuff to happen
        self.db.post_init()
        props = self.db.issue.getprops()
        keys = props.keys()
        keys.sort()
        self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation',
            'creator', 'deadline', 'feedback', 'files', 'fixer', 'foo', 'id', 'messages',
            'nosy', 'priority', 'spam', 'status', 'superseder', 'title'])
        self.assertEqual(self.db.issue.get('1', "fixer"), None)

    def testRemoveProperty(self):
        self.db.issue.create(title="spam", status='1')
        self.db.commit()

        del self.db.issue.properties['title']
        self.db.post_init()
        props = self.db.issue.getprops()
        keys = props.keys()
        keys.sort()
        self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation',
            'creator', 'deadline', 'feedback', 'files', 'foo', 'id', 'messages',
            'nosy', 'priority', 'spam', 'status', 'superseder'])
        self.assertEqual(self.db.issue.list(), ['1'])

    def testAddRemoveProperty(self):
        self.db.issue.create(title="spam", status='1')
        self.db.commit()

        self.db.issue.addprop(fixer=Link("user"))
        del self.db.issue.properties['title']
        self.db.post_init()
        props = self.db.issue.getprops()
        keys = props.keys()
        keys.sort()
        self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation',
            'creator', 'deadline', 'feedback', 'files', 'fixer', 'foo', 'id',
            'messages', 'nosy', 'priority', 'spam', 'status', 'superseder'])
        self.assertEqual(self.db.issue.list(), ['1'])

    def testNosyMail(self) :
        """Creates one issue with two attachments, one smaller and one larger
           than the set max_attachment_size.
        """
        db = self.db
        db.config.NOSY_MAX_ATTACHMENT_SIZE = 4096
        res = dict(mail_to = None, mail_msg = None)
        def dummy_snd(s, to, msg, res=res) :
            res["mail_to"], res["mail_msg"] = to, msg
        backup, Mailer.smtp_send = Mailer.smtp_send, dummy_snd
        try :
            f1 = db.file.create(name="test1.txt", content="x" * 20)
            f2 = db.file.create(name="test2.txt", content="y" * 5000)
            m  = db.msg.create(content="one two", author="admin",
                files = [f1, f2])
            i  = db.issue.create(title='spam', files = [f1, f2],
                messages = [m], nosy = [db.user.lookup("fred")])

            db.issue.nosymessage(i, m, {})
            mail_msg = str(res["mail_msg"])
            self.assertEqual(res["mail_to"], ["fred@example.com"])
            self.assert_("From: admin" in mail_msg)
            self.assert_("Subject: [issue1] spam" in mail_msg)
            self.assert_("New submission from admin" in mail_msg)
            self.assert_("one two" in mail_msg)
            self.assert_("File 'test1.txt' not attached" not in mail_msg)
            self.assert_(base64.encodestring("xxx").rstrip() in mail_msg)
            self.assert_("File 'test2.txt' not attached" in mail_msg)
            self.assert_(base64.encodestring("yyy").rstrip() not in mail_msg)
        finally :
            Mailer.smtp_send = backup

class ROTest(MyTestCase):
    def setUp(self):
        # remove previous test, ignore errors
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)
        os.makedirs(config.DATABASE + '/files')
        self.db = self.module.Database(config, 'admin')
        setupSchema(self.db, 1, self.module)
        self.db.close()

        self.db = self.module.Database(config)
        setupSchema(self.db, 0, self.module)

    def testExceptions(self):
        # this tests the exceptions that should be raised
        ar = self.assertRaises

        # this tests the exceptions that should be raised
        ar(DatabaseError, self.db.status.create, name="foo")
        ar(DatabaseError, self.db.status.set, '1', name="foo")
        ar(DatabaseError, self.db.status.retire, '1')


class SchemaTest(MyTestCase):
    def setUp(self):
        # remove previous test, ignore errors
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)
        os.makedirs(config.DATABASE + '/files')

    def open_database(self):
        self.db = self.module.Database(config, 'admin')

    def test_reservedProperties(self):
        self.open_database()
        self.assertRaises(ValueError, self.module.Class, self.db, "a",
            creation=String())
        self.assertRaises(ValueError, self.module.Class, self.db, "a",
            activity=String())
        self.assertRaises(ValueError, self.module.Class, self.db, "a",
            creator=String())
        self.assertRaises(ValueError, self.module.Class, self.db, "a",
            actor=String())

    def init_a(self):
        self.open_database()
        a = self.module.Class(self.db, "a", name=String())
        a.setkey("name")
        self.db.post_init()

    def test_fileClassProps(self):
        self.open_database()
        a = self.module.FileClass(self.db, 'a')
        l = a.getprops().keys()
        l.sort()
        self.assert_(l, ['activity', 'actor', 'content', 'created',
            'creation', 'type'])

    def init_ab(self):
        self.open_database()
        a = self.module.Class(self.db, "a", name=String())
        a.setkey("name")
        b = self.module.Class(self.db, "b", name=String(),
            fooz=Multilink('a'))
        b.setkey("name")
        self.db.post_init()

    def test_addNewClass(self):
        self.init_a()

        self.assertRaises(ValueError, self.module.Class, self.db, "a",
            name=String())

        aid = self.db.a.create(name='apple')
        self.db.commit(); self.db.close()

        # add a new class to the schema and check creation of new items
        # (and existence of old ones)
        self.init_ab()
        bid = self.db.b.create(name='bear', fooz=[aid])
        self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
        self.db.commit()
        self.db.close()

        # now check we can recall the added class' items
        self.init_ab()
        self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.assertEqual(self.db.b.get(bid, 'name'), 'bear')
        self.assertEqual(self.db.b.get(bid, 'fooz'), [aid])
        self.assertEqual(self.db.b.lookup('bear'), bid)

        # confirm journal's ok
        self.db.getjournal('a', aid)
        self.db.getjournal('b', bid)

    def init_amod(self):
        self.open_database()
        a = self.module.Class(self.db, "a", name=String(), newstr=String(),
            newint=Interval(), newnum=Number(), newbool=Boolean(),
            newdate=Date())
        a.setkey("name")
        b = self.module.Class(self.db, "b", name=String())
        b.setkey("name")
        self.db.post_init()

    def test_modifyClass(self):
        self.init_ab()

        # add item to user and issue class
        aid = self.db.a.create(name='apple')
        bid = self.db.b.create(name='bear')
        self.db.commit(); self.db.close()

        # modify "a" schema
        self.init_amod()
        self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
        self.assertEqual(self.db.a.get(aid, 'newstr'), None)
        self.assertEqual(self.db.a.get(aid, 'newint'), None)
        # hack - metakit can't return None for missing values, and we're not
        # really checking for that behavior here anyway
        self.assert_(not self.db.a.get(aid, 'newnum'))
        self.assert_(not self.db.a.get(aid, 'newbool'))
        self.assertEqual(self.db.a.get(aid, 'newdate'), None)
        self.assertEqual(self.db.b.get(aid, 'name'), 'bear')
        aid2 = self.db.a.create(name='aardvark', newstr='booz')
        self.db.commit(); self.db.close()

        # test
        self.init_amod()
        self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
        self.assertEqual(self.db.a.get(aid, 'newstr'), None)
        self.assertEqual(self.db.b.get(aid, 'name'), 'bear')
        self.assertEqual(self.db.a.get(aid2, 'name'), 'aardvark')
        self.assertEqual(self.db.a.get(aid2, 'newstr'), 'booz')

        # confirm journal's ok
        self.db.getjournal('a', aid)
        self.db.getjournal('a', aid2)

    def init_amodkey(self):
        self.open_database()
        a = self.module.Class(self.db, "a", name=String(), newstr=String())
        a.setkey("newstr")
        b = self.module.Class(self.db, "b", name=String())
        b.setkey("name")
        self.db.post_init()

    def test_changeClassKey(self):
        self.init_amod()
        aid = self.db.a.create(name='apple')
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.db.commit(); self.db.close()

        # change the key to newstr on a
        self.init_amodkey()
        self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
        self.assertEqual(self.db.a.get(aid, 'newstr'), None)
        self.assertRaises(KeyError, self.db.a.lookup, 'apple')
        aid2 = self.db.a.create(name='aardvark', newstr='booz')
        self.db.commit(); self.db.close()

        # check
        self.init_amodkey()
        self.assertEqual(self.db.a.lookup('booz'), aid2)

        # confirm journal's ok
        self.db.getjournal('a', aid)

    def test_removeClassKey(self):
        self.init_amod()
        aid = self.db.a.create(name='apple')
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.db.commit(); self.db.close()

        self.db = self.module.Database(config, 'admin')
        a = self.module.Class(self.db, "a", name=String(), newstr=String())
        self.db.post_init()

        aid2 = self.db.a.create(name='apple', newstr='booz')
        self.db.commit()


    def init_amodml(self):
        self.open_database()
        a = self.module.Class(self.db, "a", name=String(),
            newml=Multilink('a'))
        a.setkey('name')
        self.db.post_init()

    def test_makeNewMultilink(self):
        self.init_a()
        aid = self.db.a.create(name='apple')
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.db.commit(); self.db.close()

        # add a multilink prop
        self.init_amodml()
        bid = self.db.a.create(name='bear', newml=[aid])
        self.assertEqual(self.db.a.find(newml=aid), [bid])
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.db.commit(); self.db.close()

        # check
        self.init_amodml()
        self.assertEqual(self.db.a.find(newml=aid), [bid])
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.assertEqual(self.db.a.lookup('bear'), bid)

        # confirm journal's ok
        self.db.getjournal('a', aid)
        self.db.getjournal('a', bid)

    def test_removeMultilink(self):
        # add a multilink prop
        self.init_amodml()
        aid = self.db.a.create(name='apple')
        bid = self.db.a.create(name='bear', newml=[aid])
        self.assertEqual(self.db.a.find(newml=aid), [bid])
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.assertEqual(self.db.a.lookup('bear'), bid)
        self.db.commit(); self.db.close()

        # remove the multilink
        self.init_a()
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.assertEqual(self.db.a.lookup('bear'), bid)

        # confirm journal's ok
        self.db.getjournal('a', aid)
        self.db.getjournal('a', bid)

    def test_removeClass(self):
        self.init_ab()
        aid = self.db.a.create(name='apple')
        bid = self.db.b.create(name='bear')
        self.db.commit(); self.db.close()

        # drop the b class
        self.init_a()
        self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
        self.assertEqual(self.db.a.lookup('apple'), aid)
        self.db.commit(); self.db.close()

        # now check we can recall the added class' items
        self.init_a()
        self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
        self.assertEqual(self.db.a.lookup('apple'), aid)

        # confirm journal's ok
        self.db.getjournal('a', aid)

class RDBMSTest:
    """ tests specific to RDBMS backends """
    def test_indexTest(self):
        self.assertEqual(self.db.sql_index_exists('_issue', '_issue_id_idx'), 1)
        self.assertEqual(self.db.sql_index_exists('_issue', '_issue_x_idx'), 0)


class ClassicInitTest(unittest.TestCase):
    count = 0
    db = None

    def setUp(self):
        ClassicInitTest.count = ClassicInitTest.count + 1
        self.dirname = '_test_init_%s'%self.count
        try:
            shutil.rmtree(self.dirname)
        except OSError, error:
            if error.errno not in (errno.ENOENT, errno.ESRCH): raise

    def testCreation(self):
        ae = self.assertEqual

        # set up and open a tracker
        tracker = setupTracker(self.dirname, self.backend)
        # open the database
        db = self.db = tracker.open('test')

        # check the basics of the schema and initial data set
        l = db.priority.list()
        l.sort()
        ae(l, ['1', '2', '3', '4', '5'])
        l = db.status.list()
        l.sort()
        ae(l, ['1', '2', '3', '4', '5', '6', '7', '8'])
        l = db.keyword.list()
        ae(l, [])
        l = db.user.list()
        l.sort()
        ae(l, ['1', '2'])
        l = db.msg.list()
        ae(l, [])
        l = db.file.list()
        ae(l, [])
        l = db.issue.list()
        ae(l, [])

    def tearDown(self):
        if self.db is not None:
            self.db.close()
        try:
            shutil.rmtree(self.dirname)
        except OSError, error:
            if error.errno not in (errno.ENOENT, errno.ESRCH): raise

# vim: set et sts=4 sw=4 :
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.