test_dynamic.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » orm » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » orm » test_dynamic.py
from sqlalchemy.test.testing import eq_,ne_
import operator
from sqlalchemy.orm import dynamic_loader,backref
from sqlalchemy.test import testing
from sqlalchemy import Integer,String,ForeignKey,desc,select,func
from sqlalchemy.test.schema import Table,Column
from sqlalchemy.orm import mapper,relationship,create_session,Query,attributes
from sqlalchemy.orm.dynamic import AppenderMixin
from sqlalchemy.test.testing import eq_,AssertsCompiledSQL
from sqlalchemy.util import function_named
from test.orm import _base,_fixtures


class DynamicTest(_fixtures.FixtureTest, AssertsCompiledSQL):
    """Exercise relationships that are configured for dynamic loading.

    Each test maps the fixture classes with ``dynamic_loader()`` or
    ``lazy='dynamic'`` and checks how the resulting collection attribute
    behaves when it is queried, ordered, counted, compiled or appended to.

    NOTE(review): names such as ``User``, ``users`` and ``Address`` are not
    defined in this module; they appear to be supplied at call time by the
    ``testing.resolve_artifact_names`` decorator -- confirm against the
    fixture/testing modules.
    """

    @testing.resolve_artifact_names
    def test_basic(self):
        """Load users whose ``addresses`` relationship is dynamic and
        compare them with the expected fixture results."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session()
        q = sess.query(User)

        # NOTE(review): ``u`` is never read below; only the query runs.
        u = q.filter(User.id==7).first()
        eq_([User(id=7,
                  addresses=[Address(id=1, email_address='jack@bean.com')])],
            q.filter(User.id==7).all())
        eq_(self.static.user_address_result, q.all())

    @testing.resolve_artifact_names
    def test_statement(self):
        """Test that the .statement accessor returns the actual statement
        that would render, without any _clones called.

        The statement of a loaded user's dynamic ``addresses`` collection is
        compiled with the default dialect and compared to the expected
        SELECT text.
        """
        
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session()
        q = sess.query(User)

        u = q.filter(User.id==7).first()
        self.assert_compile(
            u.addresses.statement, 
            "SELECT addresses.id, addresses.user_id, addresses.email_address FROM "
            "addresses WHERE :param_1 = addresses.user_id",
            use_default_dialect=True
        )
        
    @testing.resolve_artifact_names
    def test_order_by(self):
        """Call ``order_by()`` on the dynamic collection and expect the
        addresses of user 8 in descending email order."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session()
        u = sess.query(User).get(8)
        eq_(
            list(u.addresses.order_by(desc(Address.email_address))),
             [Address(email_address=u'ed@wood.com'), Address(email_address=u'ed@lala.com'), 
              Address(email_address=u'ed@bettyboop.com')]
            )

    @testing.resolve_artifact_names
    def test_configured_order_by(self):
        """Configure ``order_by`` on the relationship itself and check that
        it applies by default and can be cancelled with ``order_by(None)``."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses), order_by=desc(Address.email_address))
        })
        sess = create_session()
        u = sess.query(User).get(8)
        # plain iteration uses the descending order given to dynamic_loader()
        eq_(list(u.addresses), [Address(email_address=u'ed@wood.com'), Address(email_address=u'ed@lala.com'), Address(email_address=u'ed@bettyboop.com')])

        # cancel the configured ordering with order_by(None), then replace
        # it with a different (ascending) ordering
        eq_(
            list(u.addresses.order_by(None).order_by(Address.email_address)),
            [Address(email_address=u'ed@bettyboop.com'), Address(email_address=u'ed@lala.com'), Address(email_address=u'ed@wood.com')]
        )

        # cancel the configured ordering and replace it with nothing; the
        # results are compared as sets, so no particular order is expected
        eq_(
            set(u.addresses.order_by(None)),
            set([Address(email_address=u'ed@bettyboop.com'), Address(email_address=u'ed@lala.com'), Address(email_address=u'ed@wood.com')])
        )

    @testing.resolve_artifact_names
    def test_count(self):
        """Expect ``count()`` on the dynamic collection of the first user
        returned by the query to be 1."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session()
        u = sess.query(User).first()
        eq_(u.addresses.count(), 1)

    @testing.resolve_artifact_names
    def test_backref(self):
        """Clear the many-to-one side of a relationship whose backref is
        dynamic; the assignment must emit no SQL, and after the flush the
        address is no longer in the user's collection."""
        mapper(Address, addresses, properties={
            'user':relationship(User, backref=backref('addresses', lazy='dynamic'))
        })
        mapper(User, users)

        sess = create_session()
        ad = sess.query(Address).get(1)
        def go():
            ad.user = None
        # the assignment alone is expected to execute zero statements
        self.assert_sql_count(testing.db, go, 0)
        sess.flush()
        u = sess.query(User).get(7)
        assert ad not in u.addresses

    @testing.resolve_artifact_names
    def test_no_count(self):
        """Compare a loaded user against an expected structure and assert
        that exactly two statements are executed while doing so."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session()
        q = sess.query(User)

        # dynamic collection cannot implement __len__() (at least one that
        # returns a live database result), else additional count() queries are
        # issued when evaluating in a list context
        def go():
            eq_([User(id=7,
                      addresses=[Address(id=1,
                                         email_address='jack@bean.com')])],
                q.filter(User.id==7).all())
        self.assert_sql_count(testing.db, go, 2)

    @testing.resolve_artifact_names
    def test_m2m(self):
        """Use ``lazy="dynamic"`` on both directions of a many-to-many
        relationship and check that an appended item is visible from both
        sides after a flush."""
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, lazy="dynamic",
                             backref=backref('orders', lazy="dynamic"))
        })
        mapper(Item, items)

        sess = create_session()
        o1 = Order(id=15, description="order 10")
        i1 = Item(id=10, description="item 8")
        o1.items.append(i1)
        sess.add(o1)
        sess.flush()

        assert o1 in i1.orders.all()
        assert i1 in o1.items.all()

    @testing.resolve_artifact_names
    def test_association_nonaliased(self):
        """Check the SQL compiled for a dynamic many-to-many collection
        ordered by a column of the secondary table, and that the collection
        can be filtered on that table."""
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, 
                                lazy="dynamic", 
                                order_by=order_items.c.item_id)
        })
        mapper(Item, items)

        sess = create_session()
        o = sess.query(Order).first()

        # the collection object itself is passed to assert_compile()
        self.assert_compile(
            o.items,
            "SELECT items.id AS items_id, items.description AS items_description FROM items,"
            " order_items WHERE :param_1 = order_items.order_id AND items.id = order_items.item_id"
            " ORDER BY order_items.item_id",
            use_default_dialect=True
        )

        # filter criterion against the secondary table 
        # works
        eq_(
            o.items.filter(order_items.c.item_id==2).all(),
            [Item(id=2)]
        )
        
        
    @testing.resolve_artifact_names
    def test_transient_detached(self):
        """Append to the dynamic collection of a ``User`` that was never
        added to a session, then read it back via ``count()`` and index."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        # NOTE(review): ``sess`` is created but ``u1`` is never added to it.
        sess = create_session()
        u1 = User()
        u1.addresses.append(Address())
        eq_(u1.addresses.count(), 1)
        eq_(u1.addresses[0], Address())

    @testing.resolve_artifact_names
    def test_custom_query(self):
        """Pass a ``Query`` subclass as ``query_class`` and check the types
        of the collection and of queries derived from it."""
        class MyQuery(Query):
            pass

        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses),
                                       query_class=MyQuery)
        })
        sess = create_session()
        u = User()
        sess.add(u)

        # the collection is a MyQuery that additionally has ``append`` and
        # whose class is named 'AppenderMyQuery'
        col = u.addresses
        assert isinstance(col, Query)
        assert isinstance(col, MyQuery)
        assert hasattr(col, 'append')
        eq_(type(col).__name__, 'AppenderMyQuery')

        # a query derived from the collection is a plain MyQuery again,
        # without ``append``
        q = col.limit(1)
        assert isinstance(q, Query)
        assert isinstance(q, MyQuery)
        assert not hasattr(q, 'append')
        eq_(type(q).__name__, 'MyQuery')

    @testing.resolve_artifact_names
    def test_custom_query_with_custom_mixin(self):
        """Pass a ``query_class`` that already combines a custom
        ``AppenderMixin`` subclass with a ``Query`` subclass, and check that
        the collection uses that class while derived queries use the class
        named by its ``query_class`` attribute."""
        class MyAppenderMixin(AppenderMixin):
            # extra collection method: append one item or each item of a list
            def add(self, items):
                if isinstance(items, list):
                    for item in items:
                        self.append(item)
                else:
                    self.append(items)

        class MyQuery(Query):
            pass

        class MyAppenderQuery(MyAppenderMixin, MyQuery):
            query_class = MyQuery

        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses),
                                       query_class=MyAppenderQuery)
        })
        sess = create_session()
        u = User()
        sess.add(u)

        # the collection is an instance of the supplied class itself and
        # exposes both ``append`` and the custom ``add``
        col = u.addresses
        assert isinstance(col, Query)
        assert isinstance(col, MyQuery)
        assert hasattr(col, 'append')
        assert hasattr(col, 'add')
        eq_(type(col).__name__, 'MyAppenderQuery')

        # derived queries are plain MyQuery objects without the mixin methods
        q = col.limit(1)
        assert isinstance(q, Query)
        assert isinstance(q, MyQuery)
        assert not hasattr(q, 'append')
        assert not hasattr(q, 'add')
        eq_(type(q).__name__, 'MyQuery')


class SessionTest(_fixtures.FixtureTest):
    """Exercise write-side behavior of dynamic relationships in a session:
    append/remove events, merge, flush, collection assignment, rollback,
    deletes with and without cascade, and backref handling.

    NOTE(review): names such as ``User``, ``users`` and ``Address`` are not
    defined in this module; they appear to be supplied at call time by the
    ``testing.resolve_artifact_names`` decorator -- confirm against the
    fixture/testing modules.
    """

    # NOTE(review): presumably tells the fixture base class not to insert
    # its canned rows, so each test starts from empty tables -- confirm in
    # _fixtures / _base.
    run_inserts = None

    @testing.resolve_artifact_names
    def test_events(self):
        """Append to and remove from a dynamic collection and check, by
        querying the ``addresses`` table directly, which rows carry a
        ``user_id`` after each flush."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session()
        u1 = User(name='jack')
        a1 = Address(email_address='foo')
        sess.add_all([u1, a1])
        sess.flush()

        # a1 was flushed without a user, so no row has a user_id yet
        eq_(testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)), 0)
        u1 = sess.query(User).get(u1.id)
        u1.addresses.append(a1)
        sess.flush()

        eq_(testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall(),
            [(a1.id, u1.id, 'foo')])

        u1.addresses.remove(a1)
        sess.flush()
        eq_(testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)), 0)

        u1.addresses.append(a1)
        sess.flush()
        eq_(testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall(),
            [(a1.id, u1.id, 'foo')])

        # remove one address and append another within the same flush
        a2 = Address(email_address='bar')
        u1.addresses.remove(a1)
        u1.addresses.append(a2)
        sess.flush()
        eq_(testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall(),
            [(a2.id, u1.id, 'bar')])


    @testing.resolve_artifact_names
    def test_merge(self):
        """Merge a new ``User`` carrying a different address list onto a
        persisted one and check the attribute history and the flushed
        result."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses), order_by=addresses.c.email_address)
        })
        sess = create_session()
        u1 = User(name='jack')
        a1 = Address(email_address='a1')
        a2 = Address(email_address='a2')
        a3 = Address(email_address='a3')

        # persisted state: u1 has a2 and a3; a1 exists without a user
        u1.addresses.append(a2)
        u1.addresses.append(a3)

        sess.add_all([u1, a1])
        sess.flush()

        # a second, unsaved User with the same id but holding a1 and a3
        u1 = User(id=u1.id, name='jack')
        u1.addresses.append(a1)
        u1.addresses.append(a3)
        u1 = sess.merge(u1)
        # the three history positions hold a1 (only in the merged list),
        # a3 (in both lists) and a2 (only in the persisted list)
        eq_(attributes.get_history(u1, 'addresses'), (
            [a1],
            [a3],
            [a2]
        ))

        sess.flush()

        eq_(
            list(u1.addresses),
            [a1, a3]
        )

    @testing.resolve_artifact_names
    def test_flush(self):
        """Flush two users that each received one appended address, then
        reload them from a cleared session and compare."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session()
        u1 = User(name='jack')
        u2 = User(name='ed')
        u2.addresses.append(Address(email_address='foo@bar.com'))
        u1.addresses.append(Address(email_address='lala@hoho.com'))
        sess.add_all((u1, u2))
        sess.flush()

        # ``attributes`` is the module-level import, as in test_merge
        eq_(attributes.get_history(u1, 'addresses'), ([], [Address(email_address='lala@hoho.com')], []))

        sess.expunge_all()

        # test the test fixture a little bit
        ne_(User(name='jack', addresses=[Address(email_address='wrong')]),
            sess.query(User).first())
        eq_(User(name='jack', addresses=[Address(email_address='lala@hoho.com')]),
            sess.query(User).first())

        eq_([
            User(name='jack', addresses=[Address(email_address='lala@hoho.com')]),
            User(name='ed', addresses=[Address(email_address='foo@bar.com')])
            ],
            sess.query(User).all())

    @testing.resolve_artifact_names
    def test_hasattr(self):
        """Check that ``addresses`` is absent from a new instance's
        ``__dict__`` and is listed by ``dir()`` after a list is assigned."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        u1 = User(name='jack')

        # membership is tested on the dict itself; no key list is built
        assert 'addresses' not in u1.__dict__
        u1.addresses = [Address(email_address='test')]
        assert 'addresses' in dir(u1)

    @testing.resolve_artifact_names
    def test_collection_set(self):
        """Assign whole lists to the dynamic attribute repeatedly and check
        that iteration reflects each assignment, including an empty list."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses), order_by=addresses.c.email_address)
        })
        sess = create_session(autoflush=True, autocommit=False)
        u1 = User(name='jack')
        a1 = Address(email_address='a1')
        a2 = Address(email_address='a2')
        a3 = Address(email_address='a3')
        a4 = Address(email_address='a4')

        sess.add(u1)
        u1.addresses = [a1, a3]
        eq_(list(u1.addresses), [a1, a3])
        u1.addresses = [a1, a2, a4]
        eq_(list(u1.addresses), [a1, a2, a4])
        u1.addresses = [a2, a3]
        eq_(list(u1.addresses), [a2, a3])
        u1.addresses = []
        eq_(list(u1.addresses), [])

    @testing.resolve_artifact_names
    def test_rollback(self):
        """Append an address after a commit, see it in the collection, then
        roll back and expect only the committed address to remain."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses))
        })
        sess = create_session(expire_on_commit=False, autocommit=False, autoflush=True)
        u1 = User(name='jack')
        u1.addresses.append(Address(email_address='lala@hoho.com'))
        sess.add(u1)
        sess.flush()
        sess.commit()
        u1.addresses.append(Address(email_address='foo@bar.com'))
        eq_(u1.addresses.order_by(Address.id).all(), 
                 [Address(email_address='lala@hoho.com'), Address(email_address='foo@bar.com')])
        sess.rollback()
        eq_(u1.addresses.all(), [Address(email_address='lala@hoho.com')])

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_delete_nocascade(self):
        """Delete a user whose dynamic ``addresses`` relationship has no
        cascade configured; afterwards no address row may still carry a
        ``user_id``."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses), order_by=Address.id,
                                       backref='user')
        })
        sess = create_session(autoflush=True)
        u = User(name='ed')
        u.addresses.append(Address(email_address='a'))
        u.addresses.append(Address(email_address='b'))
        u.addresses.append(Address(email_address='c'))
        u.addresses.append(Address(email_address='d'))
        u.addresses.append(Address(email_address='e'))
        u.addresses.append(Address(email_address='f'))
        sess.add(u)

        eq_(Address(email_address='c'), u.addresses[2])
        # The assertion below expects 'a', 'b', 'd' to remain, i.e. indexes
        # 2, 4, 3 removed 'c', 'f', 'e': each lookup evidently sees the
        # collection as left by the previous delete (autoflush=True).
        sess.delete(u.addresses[2])
        sess.delete(u.addresses[4])
        sess.delete(u.addresses[3])
        eq_([Address(email_address='a'), Address(email_address='b'), Address(email_address='d')],
            list(u.addresses))

        sess.expunge_all()
        u = sess.query(User).get(u.id)

        sess.delete(u)

        # u.addresses relationship will have to force the load
        # of all addresses so that they can be updated
        sess.flush()
        sess.close()

        eq_(testing.db.scalar(addresses.count(addresses.c.user_id != None)), 0)

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_delete_cascade(self):
        """Delete a user whose dynamic ``addresses`` relationship uses
        ``cascade="all, delete-orphan"``; afterwards the ``addresses`` table
        must be empty."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses), order_by=Address.id,
                                       backref='user', cascade="all, delete-orphan")
        })
        sess = create_session(autoflush=True)
        u = User(name='ed')
        u.addresses.append(Address(email_address='a'))
        u.addresses.append(Address(email_address='b'))
        u.addresses.append(Address(email_address='c'))
        u.addresses.append(Address(email_address='d'))
        u.addresses.append(Address(email_address='e'))
        u.addresses.append(Address(email_address='f'))
        sess.add(u)

        eq_(Address(email_address='c'), u.addresses[2])
        # The assertion below expects 'a', 'b', 'd' to remain, i.e. indexes
        # 2, 4, 3 removed 'c', 'f', 'e': each lookup evidently sees the
        # collection as left by the previous delete (autoflush=True).
        sess.delete(u.addresses[2])
        sess.delete(u.addresses[4])
        sess.delete(u.addresses[3])
        eq_([Address(email_address='a'), Address(email_address='b'), Address(email_address='d')],
            list(u.addresses))

        sess.expunge_all()
        u = sess.query(User).get(u.id)

        sess.delete(u)

        # u.addresses relationship will have to force the load
        # of all addresses so that they can be updated
        sess.flush()
        sess.close()

        eq_(testing.db.scalar(addresses.count()), 0)

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_remove_orphans(self):
        """With ``cascade="all, delete-orphan"``, addresses removed from the
        dynamic collection must also disappear from a plain ``Address``
        query; ``del collection[i]`` must raise ``TypeError``."""
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses), order_by=Address.id,
                                       cascade="all, delete-orphan", backref='user')
        })
        sess = create_session(autoflush=True)
        u = User(name='ed')
        u.addresses.append(Address(email_address='a'))
        u.addresses.append(Address(email_address='b'))
        u.addresses.append(Address(email_address='c'))
        u.addresses.append(Address(email_address='d'))
        u.addresses.append(Address(email_address='e'))
        u.addresses.append(Address(email_address='f'))
        sess.add(u)

        eq_([Address(email_address='a'), Address(email_address='b'), Address(email_address='c'),
             Address(email_address='d'), Address(email_address='e'), Address(email_address='f')],
            sess.query(Address).all())

        eq_(Address(email_address='c'), u.addresses[2])

        # operator.delitem(obj, 3) is the callable form of ``del obj[3]``.
        # Using the assertion helper avoids the ``except TypeError, e``
        # spelling, which only Python 2 can parse.
        testing.assert_raises_message(
            TypeError,
            "doesn't support item deletion",
            operator.delitem, u.addresses, 3
        )

        for a in u.addresses.filter(Address.email_address.in_(['c', 'e', 'f'])):
            u.addresses.remove(a)

        eq_([Address(email_address='a'), Address(email_address='b'), Address(email_address='d')],
            list(u.addresses))

        eq_([Address(email_address='a'), Address(email_address='b'), Address(email_address='d')],
            sess.query(Address).all())

        sess.delete(u)
        sess.close()

    @testing.resolve_artifact_names
    def _backref_test(self, autoflush, saveuser):
        """Shared body of the four ``test_backref_*`` tests.

        Links an address to a user through the ``user`` backref, saves one
        of the two objects, then clears the link and checks the dynamic
        ``addresses`` collection before and after flushing.

        :param autoflush: passed to ``create_session()``; when false the
          test flushes explicitly at each step.
        :param saveuser: when true the ``User`` is added to the session,
          otherwise the ``Address`` is; either way both are expected to end
          up in the session.
        """
        mapper(User, users, properties={
            'addresses':dynamic_loader(mapper(Address, addresses), backref='user')
        })
        sess = create_session(autoflush=autoflush)

        u = User(name='buffy')

        a = Address(email_address='foo@bar.com')
        a.user = u

        if saveuser:
            sess.add(u)
        else:
            sess.add(a)

        if not autoflush:
            sess.flush()

        assert u in sess
        assert a in sess

        eq_(list(u.addresses), [a])

        a.user = None
        # without autoflush the pending change is not visible yet
        if not autoflush:
            eq_(list(u.addresses), [a])

        if not autoflush:
            sess.flush()
        eq_(list(u.addresses), [])

    def test_backref_autoflush_saveuser(self):
        """Backref scenario: autoflush on, the user is added."""
        self._backref_test(True, True)

    def test_backref_autoflush_savead(self):
        """Backref scenario: autoflush on, the address is added."""
        self._backref_test(True, False)

    def test_backref_saveuser(self):
        """Backref scenario: autoflush off, the user is added."""
        self._backref_test(False, True)

    def test_backref_savead(self):
        """Backref scenario: autoflush off, the address is added."""
        self._backref_test(False, False)

class DontDereferenceTest(_base.MappedTest):
    """Check that a dynamic collection returns its rows regardless of how
    the parent object is referenced when the collection is queried.

    NOTE(review): judging by the class name this guards against the parent
    being dereferenced before the dynamic query runs -- confirm the exact
    failure mode against the change that introduced the test.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Define the ``users`` and ``addresses`` tables on ``metadata``;
        ``addresses.user_id`` is a foreign key to ``users.id``."""
        Table('users', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('name', String(40)),
              Column('fullname', String(100)),
              Column('password', String(15)))

        Table('addresses', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('email_address', String(100), nullable=False),
              Column('user_id', Integer, ForeignKey('users.id')))

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Declare ``User`` and ``Address`` and map them so that
        ``User.addresses`` is a dynamic relationship with a ``user``
        backref."""
        class User(_base.ComparableEntity):
            pass

        class Address(_base.ComparableEntity):
            pass

        mapper(User, users, properties={
            'addresses': relationship(Address, backref='user', lazy='dynamic')
            })
        mapper(Address, addresses)

    @testing.resolve_artifact_names
    def test_no_deref(self):
        """Persist one user with one address, then load the address through
        the dynamic collection in three differently-referenced ways; each
        must return that address."""
        session = create_session()
        user = User()
        user.name = 'joe'
        user.fullname = 'Joe User'
        user.password = 'Joe\'s secret'
        address = Address()
        address.email_address = 'joe@joesdomain.example'
        address.user = user
        session.add(user)
        session.flush()
        session.expunge_all()

        # Each helper below opens its own session, shadowing the outer one.

        # the parent is held in a local variable while the collection is read
        def query1():
            session = create_session(testing.db)
            user = session.query(User).first()
            return user.addresses.all()

        # the parent is only an intermediate value in a chained expression
        def query2():
            session = create_session(testing.db)
            return session.query(User).first().addresses.all()

        # the parent is held in a local AND fetched again in a chained
        # expression; the unused ``user`` local looks deliberate here
        def query3():
            session = create_session(testing.db)
            user = session.query(User).first()
            return session.query(User).first().addresses.all()

        eq_(query1(), [Address(email_address='joe@joesdomain.example')])
        eq_(query2(), [Address(email_address='joe@joesdomain.example')])
        eq_(query3(), [Address(email_address='joe@joesdomain.example')])


www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.