test_eager_relations.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » orm » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » orm » test_eager_relations.py
"""basic tests of eager loaded attributes"""

from sqlalchemy.test.testing import eq_,is_,is_not_
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy.orm import joinedload,deferred,undefer,joinedload_all,backref
from sqlalchemy import Integer,String,Date,ForeignKey,and_,select,func
from sqlalchemy.test.schema import Table,Column
from sqlalchemy.orm import mapper,relationship,create_session,lazyload,aliased
from sqlalchemy.test.testing import eq_,assert_raises
from sqlalchemy.test.assertsql import CompiledSQL
from test.orm import _base,_fixtures
from sqlalchemy.util import OrderedDict
import datetime

class EagerTest(_fixtures.FixtureTest, testing.AssertsCompiledSQL):
    run_inserts = 'once'
    run_deletes = None

    @testing.resolve_artifact_names
    def test_basic(self):
        mapper(User, users, properties={
            'addresses':relationship(mapper(Address, addresses), lazy='joined', order_by=Address.id)
        })
        sess = create_session()
        q = sess.query(User)

        eq_([User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])],
            q.filter(User.id==7).all())
        eq_(self.static.user_address_result, q.order_by(User.id).all())

    @testing.resolve_artifact_names
    def test_late_compile(self):
        m = mapper(User, users)
        sess = create_session()
        sess.query(User).all()
        m.add_property("addresses", relationship(mapper(Address, addresses)))
        
        sess.expunge_all()
        def go():
            eq_(
               [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])],
               sess.query(User).options(joinedload('addresses')).filter(User.id==7).all()
            )
        self.assert_sql_count(testing.db, go, 1)
            
        
    @testing.resolve_artifact_names
    def test_no_orphan(self):
        """An eagerly loaded child object is not marked as an orphan"""
        
        mapper(User, users, properties={
            'addresses':relationship(Address, cascade="all,delete-orphan", lazy='joined')
        })
        mapper(Address, addresses)

        sess = create_session()
        user = sess.query(User).get(7)
        assert getattr(User, 'addresses').\
                    hasparent(sa.orm.attributes.instance_state(user.addresses[0]), optimistic=True)
        assert not sa.orm.class_mapper(Address).\
                    _is_orphan(sa.orm.attributes.instance_state(user.addresses[0]))

    @testing.resolve_artifact_names
    def test_orderby(self):
        mapper(User, users, properties = {
            'addresses':relationship(mapper(Address, addresses), 
                        lazy='joined', order_by=addresses.c.email_address),
        })
        q = create_session().query(User)
        eq_([
            User(id=7, addresses=[
                Address(id=1)
            ]),
            User(id=8, addresses=[
                Address(id=3, email_address='ed@bettyboop.com'),
                Address(id=4, email_address='ed@lala.com'),
                Address(id=2, email_address='ed@wood.com')
            ]),
            User(id=9, addresses=[
                Address(id=5)
            ]),
            User(id=10, addresses=[])
        ], q.order_by(User.id).all())

    @testing.resolve_artifact_names
    def test_orderby_multi(self):
        mapper(User, users, properties = {
            'addresses':relationship(mapper(Address, addresses), 
                            lazy='joined', 
                            order_by=[addresses.c.email_address, addresses.c.id]),
        })
        q = create_session().query(User)
        eq_([
            User(id=7, addresses=[
                Address(id=1)
            ]),
            User(id=8, addresses=[
                Address(id=3, email_address='ed@bettyboop.com'),
                Address(id=4, email_address='ed@lala.com'),
                Address(id=2, email_address='ed@wood.com')
            ]),
            User(id=9, addresses=[
                Address(id=5)
            ]),
            User(id=10, addresses=[])
        ], q.order_by(User.id).all())

    @testing.resolve_artifact_names
    def test_orderby_related(self):
        """A regular mapper select on a single table can 
            order by a relationship to a second table"""
            
        mapper(Address, addresses)
        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy='joined', order_by=addresses.c.id),
        ))

        q = create_session().query(User)
        l = q.filter(User.id==Address.user_id).order_by(Address.email_address).all()

        eq_([
            User(id=8, addresses=[
                Address(id=2, email_address='ed@wood.com'),
                Address(id=3, email_address='ed@bettyboop.com'),
                Address(id=4, email_address='ed@lala.com'),
            ]),
            User(id=9, addresses=[
                Address(id=5)
            ]),
            User(id=7, addresses=[
                Address(id=1)
            ]),
        ], l)

    @testing.resolve_artifact_names
    def test_orderby_desc(self):
        mapper(Address, addresses)
        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy='joined',
                                 order_by=[sa.desc(addresses.c.email_address)]),
        ))
        sess = create_session()
        eq_([
            User(id=7, addresses=[
                Address(id=1)
            ]),
            User(id=8, addresses=[
                Address(id=2, email_address='ed@wood.com'),
                Address(id=4, email_address='ed@lala.com'),
                Address(id=3, email_address='ed@bettyboop.com'),
            ]),
            User(id=9, addresses=[
                Address(id=5)
            ]),
            User(id=10, addresses=[])
        ], sess.query(User).order_by(User.id).all())

    @testing.resolve_artifact_names
    def test_deferred_fk_col(self):
        mapper(Address, addresses, properties={
            'user_id':deferred(addresses.c.user_id),
            'user':relationship(User, lazy='joined')
        })
        mapper(User, users)

        sess = create_session()

        for q in [
            sess.query(Address).filter(Address.id.in_([1, 4, 5])),
            sess.query(Address).filter(Address.id.in_([1, 4, 5])).limit(3)
        ]:
            sess.expunge_all()
            eq_(q.all(),
                [Address(id=1, user=User(id=7)),
                 Address(id=4, user=User(id=8)),
                 Address(id=5, user=User(id=9))]
            )

        sess.expunge_all()
        a = sess.query(Address).filter(Address.id==1).all()[0]
        def go():
            eq_(a.user_id, 7)
        # assert that the eager loader added 'user_id' to the row and deferred
        # loading of that col was disabled
        self.assert_sql_count(testing.db, go, 0)

        sess.expunge_all()
        a = sess.query(Address).filter(Address.id==1).first()
        def go():
            eq_(a.user_id, 7)
        # assert that the eager loader added 'user_id' to the row and deferred
        # loading of that col was disabled
        self.assert_sql_count(testing.db, go, 0)

        # do the mapping in reverse
        # (we would have just used an "addresses" backref but the test
        # fixtures then require the whole backref to be set up, lazy loaders
        # trigger, etc.)
        sa.orm.clear_mappers()

        mapper(Address, addresses, properties={
            'user_id':deferred(addresses.c.user_id),
        })
        mapper(User, users, properties={
            'addresses':relationship(Address, lazy='joined')})

        for q in [
            sess.query(User).filter(User.id==7),
            sess.query(User).filter(User.id==7).limit(1)
        ]:
            sess.expunge_all()
            eq_(q.all(),
                [User(id=7, addresses=[Address(id=1)])]
            )

        sess.expunge_all()
        u = sess.query(User).get(7)
        def go():
            eq_(u.addresses[0].user_id, 7)
        # assert that the eager loader didn't have to affect 'user_id' here
        # and that its still deferred
        self.assert_sql_count(testing.db, go, 1)

        sa.orm.clear_mappers()

        mapper(User, users, properties={
            'addresses':relationship(Address, lazy='joined')})
        mapper(Address, addresses, properties={
            'user_id':deferred(addresses.c.user_id),
            'dingalings':relationship(Dingaling, lazy='joined')})
        mapper(Dingaling, dingalings, properties={
            'address_id':deferred(dingalings.c.address_id)})
        sess.expunge_all()
        def go():
            u = sess.query(User).get(8)
            eq_(User(id=8,
                     addresses=[Address(id=2, dingalings=[Dingaling(id=1)]),
                                Address(id=3),
                                Address(id=4)]),
                u)
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_options_pathing(self):
        mapper(User, users, properties={
            'orders':relationship(Order, order_by=orders.c.id), # o2m, m2o
        })
        mapper(Order, orders, properties={
            'items':relationship(Item, 
                        secondary=order_items, order_by=items.c.id),  #m2m
        })
        mapper(Item, items, properties={
            'keywords':relationship(Keyword, 
                                        secondary=item_keywords,
                                        order_by=keywords.c.id) #m2m
        })
        mapper(Keyword, keywords)

        for opt, count in [
            ((
                joinedload(User.orders, Order.items), 
            ), 10),
            ((joinedload("orders.items"), ), 10),
            ((
                joinedload(User.orders, ), 
                joinedload(User.orders, Order.items), 
                joinedload(User.orders, Order.items, Item.keywords), 
            ), 1),
            ((
                joinedload(User.orders, Order.items, Item.keywords), 
            ), 10),
            ((
                joinedload(User.orders, Order.items), 
                joinedload(User.orders, Order.items, Item.keywords), 
            ), 5),
        ]:
            sess = create_session()
            def go():
                eq_(
                    sess.query(User).options(*opt).order_by(User.id).all(),
                    self.static.user_item_keyword_result
                )
            self.assert_sql_count(testing.db, go, count)



    @testing.resolve_artifact_names
    def test_many_to_many(self):

        mapper(Keyword, keywords)
        mapper(Item, items, properties = dict(
                keywords = relationship(Keyword, secondary=item_keywords,
                                    lazy='joined', order_by=keywords.c.id)))

        q = create_session().query(Item).order_by(Item.id)
        def go():
            eq_(self.static.item_keyword_result, q.all())
        self.assert_sql_count(testing.db, go, 1)

        def go():
            eq_(self.static.item_keyword_result[0:2],
                q.join('keywords').filter(Keyword.name == 'red').all())
        self.assert_sql_count(testing.db, go, 1)

        def go():
            eq_(self.static.item_keyword_result[0:2],
                (q.join('keywords', aliased=True).
                 filter(Keyword.name == 'red')).all())
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_eager_option(self):
        mapper(Keyword, keywords)
        mapper(Item, items, properties = dict(
                keywords = relationship(Keyword, secondary=item_keywords, lazy='select',
                                    order_by=keywords.c.id)))

        q = create_session().query(Item)

        def go():
            eq_(self.static.item_keyword_result[0:2],
                (q.options(joinedload('keywords')).
                 join('keywords').filter(keywords.c.name == 'red')).order_by(Item.id).all())

        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_cyclical(self):
        """A circular eager relationship breaks the cycle with a lazy loader"""

        mapper(Address, addresses)
        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy='joined',
                                 backref=sa.orm.backref('user', lazy='joined'),
                                            order_by=Address.id)
        ))
        eq_(sa.orm.class_mapper(User).get_property('addresses').lazy, 'joined')
        eq_(sa.orm.class_mapper(Address).get_property('user').lazy, 'joined')

        sess = create_session()
        eq_(self.static.user_address_result, sess.query(User).order_by(User.id).all())

    @testing.resolve_artifact_names
    def test_double(self):
        """Eager loading with two relationships simultaneously, 
            from the same table, using aliases."""

        openorders = sa.alias(orders, 'openorders')
        closedorders = sa.alias(orders, 'closedorders')

        mapper(Address, addresses)
        mapper(Order, orders)
        
        open_mapper = mapper(Order, openorders, non_primary=True)
        closed_mapper = mapper(Order, closedorders, non_primary=True)
        
        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy='joined', order_by=addresses.c.id),
            open_orders = relationship(
                open_mapper,
                primaryjoin=sa.and_(openorders.c.isopen == 1,
                                 users.c.id==openorders.c.user_id),
                lazy='joined', order_by=openorders.c.id),
            closed_orders = relationship(
                closed_mapper,
                primaryjoin=sa.and_(closedorders.c.isopen == 0,
                                 users.c.id==closedorders.c.user_id),
                lazy='joined', order_by=closedorders.c.id)))

        q = create_session().query(User).order_by(User.id)

        def go():
            eq_([
                User(
                    id=7,
                    addresses=[Address(id=1)],
                    open_orders = [Order(id=3)],
                    closed_orders = [Order(id=1), Order(id=5)]
                ),
                User(
                    id=8,
                    addresses=[Address(id=2), Address(id=3), Address(id=4)],
                    open_orders = [],
                    closed_orders = []
                ),
                User(
                    id=9,
                    addresses=[Address(id=5)],
                    open_orders = [Order(id=4)],
                    closed_orders = [Order(id=2)]
                ),
                User(id=10)

            ], q.all())
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_double_same_mappers(self):
        """Eager loading with two relationships simulatneously, 
        from the same table, using aliases."""

        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'items': relationship(Item, secondary=order_items, lazy='joined',
                              order_by=items.c.id)})
        mapper(Item, items)
        mapper(User, users, properties=dict(
            addresses=relationship(Address, lazy='joined', order_by=addresses.c.id),
            open_orders=relationship(
                Order,
                primaryjoin=sa.and_(orders.c.isopen == 1,
                                 users.c.id==orders.c.user_id),
                lazy='joined', order_by=orders.c.id),
            closed_orders=relationship(
                Order,
                primaryjoin=sa.and_(orders.c.isopen == 0,
                                 users.c.id==orders.c.user_id),
                lazy='joined', order_by=orders.c.id)))
        q = create_session().query(User).order_by(User.id)

        def go():
            eq_([
                User(id=7,
                     addresses=[
                       Address(id=1)],
                     open_orders=[Order(id=3,
                                        items=[
                                          Item(id=3),
                                          Item(id=4),
                                          Item(id=5)])],
                     closed_orders=[Order(id=1,
                                          items=[
                                            Item(id=1),
                                            Item(id=2),
                                            Item(id=3)]),
                                    Order(id=5,
                                          items=[
                                            Item(id=5)])]),
                User(id=8,
                     addresses=[
                       Address(id=2),
                       Address(id=3),
                       Address(id=4)],
                     open_orders = [],
                     closed_orders = []),
                User(id=9,
                     addresses=[
                       Address(id=5)],
                     open_orders=[
                       Order(id=4,
                             items=[
                               Item(id=1),
                               Item(id=5)])],
                     closed_orders=[
                       Order(id=2,
                             items=[
                               Item(id=1),
                               Item(id=2),
                               Item(id=3)])]),
                User(id=10)
            ], q.all())
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_no_false_hits(self):
        """Eager loaders don't interpret main table columns as 
        part of their eager load."""

        mapper(User, users, properties={
            'addresses':relationship(Address, lazy='joined'),
            'orders':relationship(Order, lazy='joined')
        })
        mapper(Address, addresses)
        mapper(Order, orders)

        allusers = create_session().query(User).all()

        # using a textual select, the columns will be 'id' and 'name'.  the
        # eager loaders have aliases which should not hit on those columns,
        # they should be required to locate only their aliased/fully table
        # qualified column name.
        noeagers = create_session().query(User).\
                        from_statement("select * from users").all()
        assert 'orders' not in noeagers[0].__dict__
        assert 'addresses' not in noeagers[0].__dict__

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_limit(self):
        """Limit operations combined with lazy-load relationships."""

        mapper(Item, items)
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, lazy='joined',
                order_by=items.c.id)
        })
        mapper(User, users, properties={
            'addresses':relationship(mapper(Address, addresses), lazy='joined', order_by=addresses.c.id),
            'orders':relationship(Order, lazy='select', order_by=orders.c.id)
        })

        sess = create_session()
        q = sess.query(User)

        l = q.order_by(User.id).limit(2).offset(1).all()
        eq_(self.static.user_all_result[1:3], l)

    @testing.resolve_artifact_names
    def test_distinct(self):
        # this is an involved 3x union of the users table to get a lot of rows.
        # then see if the "distinct" works its way out.  you actually get the same
        # result with or without the distinct, just via less or more rows.
        u2 = users.alias('u2')
        s = sa.union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')

        mapper(User, users, properties={
            'addresses':relationship(mapper(Address, addresses), lazy='joined', order_by=addresses.c.id),
        })

        sess = create_session()
        q = sess.query(User)

        def go():
            l = q.filter(s.c.u2_id==User.id).distinct().order_by(User.id).all()
            eq_(self.static.user_address_result, l)
        self.assert_sql_count(testing.db, go, 1)

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_limit_2(self):
        mapper(Keyword, keywords)
        mapper(Item, items, properties = dict(
                keywords = relationship(Keyword, secondary=item_keywords, lazy='joined', order_by=[keywords.c.id]),
            ))

        sess = create_session()
        q = sess.query(Item)
        l = q.filter((Item.description=='item 2') | 
                        (Item.description=='item 5') | 
                        (Item.description=='item 3')).\
            order_by(Item.id).limit(2).all()

        eq_(self.static.item_keyword_result[1:3], l)

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_limit_3(self):
        """test that the ORDER BY is propagated from the inner 
        select to the outer select, when using the
        'wrapped' select statement resulting from the combination of 
        eager loading and limit/offset clauses."""

        mapper(Item, items)
        mapper(Order, orders, properties = dict(
                items = relationship(Item, secondary=order_items, lazy='joined')
        ))

        mapper(Address, addresses)
        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy='joined', order_by=addresses.c.id),
            orders = relationship(Order, lazy='joined', order_by=orders.c.id),
        ))
        sess = create_session()

        q = sess.query(User)

        if not testing.against('maxdb', 'mssql'):
            l = q.join('orders').order_by(Order.user_id.desc()).limit(2).offset(1)
            eq_([
                User(id=9,
                    orders=[Order(id=2), Order(id=4)],
                    addresses=[Address(id=5)]
                ),
                User(id=7,
                    orders=[Order(id=1), Order(id=3), Order(id=5)],
                    addresses=[Address(id=1)]
                )
            ], l.all())

        l = q.join('addresses').order_by(Address.email_address.desc()).limit(1).offset(0)
        eq_([
            User(id=7,
                orders=[Order(id=1), Order(id=3), Order(id=5)],
                addresses=[Address(id=1)]
            )
        ], l.all())

    @testing.resolve_artifact_names
    def test_limit_4(self):
        # tests the LIMIT/OFFSET aliasing on a mapper 
        # against a select.   original issue from ticket #904
        sel = sa.select([users, addresses.c.email_address],
                        users.c.id==addresses.c.user_id).alias('useralias')
        mapper(User, sel, properties={
            'orders':relationship(Order, primaryjoin=sel.c.id==orders.c.user_id, lazy='joined')
        })
        mapper(Order, orders)

        sess = create_session()
        eq_(sess.query(User).first(),
            User(name=u'jack',orders=[
                Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1),
                Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3),
                Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5)],
            email_address=u'jack@bean.com',id=7)
        )

    @testing.resolve_artifact_names
    def test_useget_cancels_eager(self):
        """test that a one to many lazyload cancels the unnecessary
        eager many-to-one join on the other side."""
        
        mapper(User, users)
        mapper(Address, addresses, properties={
            'user':relationship(User, lazy='joined', backref='addresses')
        })
        
        sess = create_session()
        u1 = sess.query(User).filter(User.id==8).one()
        def go():
            eq_(u1.addresses[0].user, u1)
        self.assert_sql_execution(testing.db, go, 
            CompiledSQL(
                "SELECT addresses.id AS addresses_id, addresses.user_id AS "
                "addresses_user_id, addresses.email_address AS "
                "addresses_email_address FROM addresses WHERE :param_1 = "
                "addresses.user_id",
             {'param_1': 8})
        )
    
    
    @testing.resolve_artifact_names
    def test_manytoone_limit(self):
        """test that the subquery wrapping only occurs with 
        limit/offset and m2m or o2m joins present."""
        
        mapper(User, users, properties=odict(
            orders=relationship(Order, backref='user')
        ))
        mapper(Order, orders, properties=odict(
            items=relationship(Item, secondary=order_items, backref='orders'),
            address=relationship(Address)
        ))
        mapper(Address, addresses)
        mapper(Item, items)
        
        sess = create_session()

        self.assert_compile(
            sess.query(User).options(joinedload(User.orders)).limit(10),
            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name, "
            "orders_1.id AS orders_1_id, orders_1.user_id AS orders_1_user_id, orders_1.address_id AS "
            "orders_1_address_id, orders_1.description AS orders_1_description, orders_1.isopen AS orders_1_isopen "
            "FROM (SELECT users.id AS users_id, users.name AS users_name "
            "FROM users "
            " LIMIT 10) AS anon_1 LEFT OUTER JOIN orders AS orders_1 ON anon_1.users_id = orders_1.user_id"
            ,use_default_dialect=True
        )

        self.assert_compile(
            sess.query(Order).options(joinedload(Order.user)).limit(10),
            "SELECT orders.id AS orders_id, orders.user_id AS orders_user_id, orders.address_id AS "
            "orders_address_id, orders.description AS orders_description, orders.isopen AS orders_isopen, "
            "users_1.id AS users_1_id, users_1.name AS users_1_name FROM orders LEFT OUTER JOIN users AS "
            "users_1 ON users_1.id = orders.user_id  LIMIT 10"
            ,use_default_dialect=True
        )

        self.assert_compile(
            sess.query(Order).options(joinedload(Order.user, innerjoin=True)).limit(10),
            "SELECT orders.id AS orders_id, orders.user_id AS orders_user_id, orders.address_id AS "
            "orders_address_id, orders.description AS orders_description, orders.isopen AS orders_isopen, "
            "users_1.id AS users_1_id, users_1.name AS users_1_name FROM orders JOIN users AS "
            "users_1 ON users_1.id = orders.user_id  LIMIT 10"
            ,use_default_dialect=True
        )

        self.assert_compile(
            sess.query(User).options(joinedload_all("orders.address")).limit(10),
            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name, "
            "addresses_1.id AS addresses_1_id, addresses_1.user_id AS addresses_1_user_id, "
            "addresses_1.email_address AS addresses_1_email_address, orders_1.id AS orders_1_id, "
            "orders_1.user_id AS orders_1_user_id, orders_1.address_id AS orders_1_address_id, "
            "orders_1.description AS orders_1_description, orders_1.isopen AS orders_1_isopen FROM "
            "(SELECT users.id AS users_id, users.name AS users_name FROM users  LIMIT 10) AS anon_1 "
            "LEFT OUTER JOIN orders AS orders_1 ON anon_1.users_id = orders_1.user_id LEFT OUTER JOIN "
            "addresses AS addresses_1 ON addresses_1.id = orders_1.address_id"
            ,use_default_dialect=True
        )

        self.assert_compile(
            sess.query(User).options(joinedload_all("orders.items"), joinedload("orders.address")),
            "SELECT users.id AS users_id, users.name AS users_name, items_1.id AS items_1_id, "
            "items_1.description AS items_1_description, addresses_1.id AS addresses_1_id, "
            "addresses_1.user_id AS addresses_1_user_id, addresses_1.email_address AS "
            "addresses_1_email_address, orders_1.id AS orders_1_id, orders_1.user_id AS "
            "orders_1_user_id, orders_1.address_id AS orders_1_address_id, orders_1.description "
            "AS orders_1_description, orders_1.isopen AS orders_1_isopen FROM users LEFT OUTER JOIN "
            "orders AS orders_1 ON users.id = orders_1.user_id LEFT OUTER JOIN order_items AS "
            "order_items_1 ON orders_1.id = order_items_1.order_id LEFT OUTER JOIN items AS "
            "items_1 ON items_1.id = order_items_1.item_id LEFT OUTER JOIN addresses AS "
            "addresses_1 ON addresses_1.id = orders_1.address_id"
            ,use_default_dialect=True
        )

        self.assert_compile(
            sess.query(User).options(joinedload("orders"), joinedload("orders.address", innerjoin=True)).limit(10),
            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name, "
            "addresses_1.id AS addresses_1_id, addresses_1.user_id AS addresses_1_user_id, "
            "addresses_1.email_address AS addresses_1_email_address, orders_1.id AS orders_1_id, "
            "orders_1.user_id AS orders_1_user_id, orders_1.address_id AS orders_1_address_id, "
            "orders_1.description AS orders_1_description, orders_1.isopen AS orders_1_isopen "
            "FROM (SELECT users.id AS users_id, users.name AS users_name "
            "FROM users "
            " LIMIT 10) AS anon_1 LEFT OUTER JOIN orders AS orders_1 ON anon_1.users_id = "
            "orders_1.user_id JOIN addresses AS addresses_1 ON addresses_1.id = orders_1.address_id"
            ,use_default_dialect=True
        )
        
    @testing.resolve_artifact_names
    def test_one_to_many_scalar(self):
        mapper(User, users, properties = dict(
            address = relationship(mapper(Address, addresses), 
                                    lazy='joined', uselist=False)
        ))
        q = create_session().query(User)

        def go():
            l = q.filter(users.c.id == 7).all()
            eq_([User(id=7, address=Address(id=1))], l)
        self.assert_sql_count(testing.db, go, 1)

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_many_to_one(self):
        mapper(Address, addresses, properties = dict(
            user = relationship(mapper(User, users), lazy='joined')
        ))
        sess = create_session()
        q = sess.query(Address)

        def go():
            a = q.filter(addresses.c.id==1).one()
            is_not_(a.user, None)
            u1 = sess.query(User).get(7)
            is_(a.user, u1)
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_many_to_one_null(self):
        """test that a many-to-one eager load which loads None does
        not later trigger a lazy load.
        
        """
        
        # use a primaryjoin intended to defeat SA's usage of 
        # query.get() for a many-to-one lazyload
        mapper(Order, orders, properties = dict(
            address = relationship(mapper(Address, addresses), 
                primaryjoin=and_(
                    addresses.c.id==orders.c.address_id,
                    addresses.c.email_address != None
                ),
            
            lazy='joined')
        ))
        sess = create_session()

        def go():
            o1 = sess.query(Order).options(lazyload('address')).filter(Order.id==5).one()
            eq_(o1.address, None)
        self.assert_sql_count(testing.db, go, 2)
        
        sess.expunge_all()
        def go():
            o1 = sess.query(Order).filter(Order.id==5).one()
            eq_(o1.address, None)
        self.assert_sql_count(testing.db, go, 1)
        
    @testing.resolve_artifact_names
    def test_one_and_many(self):
        """tests eager load for a parent object with a child object that
        contains a many-to-many relationship to a third object."""

        mapper(User, users, properties={
            'orders':relationship(Order, lazy='joined', order_by=orders.c.id)
        })
        mapper(Item, items)
        mapper(Order, orders, properties = dict(
                items = relationship(Item, secondary=order_items, lazy='joined', order_by=items.c.id)
            ))

        q = create_session().query(User)

        l = q.filter("users.id in (7, 8, 9)").order_by("users.id")

        def go():
            eq_(self.static.user_order_result[0:3], l.all())
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_double_with_aggregate(self):
        max_orders_by_user = sa.select([sa.func.max(orders.c.id).label('order_id')],
                                       group_by=[orders.c.user_id]
                                     ).alias('max_orders_by_user')

        max_orders = orders.select(orders.c.id==max_orders_by_user.c.order_id).\
                                alias('max_orders')

        mapper(Order, orders)
        mapper(User, users, properties={
               'orders':relationship(Order, backref='user', lazy='joined',
                                            order_by=orders.c.id),
               'max_order':relationship(
                                mapper(Order, max_orders, non_primary=True), 
                                lazy='joined', uselist=False)
               })

        q = create_session().query(User)

        def go():
            eq_([
                User(id=7, orders=[
                        Order(id=1),
                        Order(id=3),
                        Order(id=5),
                    ],
                    max_order=Order(id=5)
                ),
                User(id=8, orders=[]),
                User(id=9, orders=[Order(id=2),Order(id=4)],
                    max_order=Order(id=4)
                ),
                User(id=10),
            ], q.order_by(User.id).all())
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names 
    def test_uselist_false_warning(self):
        """test that multiple rows received by a 
        uselist=False raises a warning."""
        
        mapper(User, users, properties={
            'order':relationship(Order, uselist=False)
        })
        mapper(Order, orders)
        s = create_session()
        assert_raises(sa.exc.SAWarning,
                s.query(User).options(joinedload(User.order)).all)
        
    @testing.resolve_artifact_names
    def test_wide(self):
        mapper(Order, orders, properties={'items':relationship(Item, secondary=order_items, lazy='joined',
                                                           order_by=items.c.id)})
        mapper(Item, items)
        mapper(User, users, properties = dict(
            addresses = relationship(mapper(Address, addresses), lazy = False, order_by=addresses.c.id),
            orders = relationship(Order, lazy = False, order_by=orders.c.id),
        ))
        q = create_session().query(User)
        l = q.all()
        eq_(self.static.user_all_result, q.order_by(User.id).all())

    @testing.resolve_artifact_names
    def test_against_select(self):
        """test eager loading of a mapper which is against a select"""

        s = sa.select([orders], orders.c.isopen==1).alias('openorders')

        mapper(Order, s, properties={
            'user':relationship(User, lazy='joined')
        })
        mapper(User, users)
        mapper(Item, items)

        q = create_session().query(Order)
        eq_([
            Order(id=3, user=User(id=7)),
            Order(id=4, user=User(id=9))
        ], q.all())

        q = q.select_from(s.join(order_items).join(items)).filter(~Item.id.in_([1, 2, 5]))
        eq_([
            Order(id=3, user=User(id=7)),
        ], q.all())

    @testing.resolve_artifact_names
    def test_aliasing(self):
        """test that eager loading uses aliases to insulate the eager 
        load from regular criterion against those tables."""

        mapper(User, users, properties = dict(
            addresses = relationship(mapper(Address, addresses), 
                                    lazy='joined', order_by=addresses.c.id)
        ))
        q = create_session().query(User)
        l = q.filter(addresses.c.email_address == 'ed@lala.com').filter(
            Address.user_id==User.id).order_by(User.id)
        eq_(self.static.user_address_result[1:2], l.all())

    @testing.resolve_artifact_names
    def test_inner_join(self):
        mapper(User, users, properties = dict(
            addresses = relationship(mapper(Address, addresses), lazy='joined', innerjoin=True, order_by=addresses.c.id)
        ))
        sess = create_session()
        eq_(
            [User(id=7, addresses=[ Address(id=1) ]),
            User(id=8, 
                addresses=[ Address(id=2, email_address='ed@wood.com'), 
                            Address(id=3, email_address='ed@bettyboop.com'), 
                            Address(id=4, email_address='ed@lala.com'), ]),
            User(id=9, addresses=[ Address(id=5) ])]
            ,sess.query(User).all()
        )
        self.assert_compile(sess.query(User), 
                "SELECT users.id AS users_id, users.name AS users_name, "
                "addresses_1.id AS addresses_1_id, addresses_1.user_id AS addresses_1_user_id, "
                "addresses_1.email_address AS addresses_1_email_address FROM users JOIN "
                "addresses AS addresses_1 ON users.id = addresses_1.user_id ORDER BY addresses_1.id"
        , use_default_dialect=True)

    @testing.resolve_artifact_names
    def test_inner_join_options(self):
        mapper(User, users, properties = dict(
            orders =relationship(Order, backref=backref('user', innerjoin=True), order_by=orders.c.id)
        ))
        mapper(Order, orders, properties=dict(
            items=relationship(Item, secondary=order_items, order_by=items.c.id)
        ))
        mapper(Item, items)
        sess = create_session()
        self.assert_compile(sess.query(User).options(joinedload(User.orders, innerjoin=True)), 
            "SELECT users.id AS users_id, users.name AS users_name, orders_1.id AS orders_1_id, "
            "orders_1.user_id AS orders_1_user_id, orders_1.address_id AS orders_1_address_id, "
            "orders_1.description AS orders_1_description, orders_1.isopen AS orders_1_isopen "
            "FROM users JOIN orders AS orders_1 ON users.id = orders_1.user_id ORDER BY orders_1.id"
        , use_default_dialect=True)

        self.assert_compile(sess.query(User).options(joinedload_all(User.orders, Order.items, innerjoin=True)), 
            "SELECT users.id AS users_id, users.name AS users_name, items_1.id AS items_1_id, "
            "items_1.description AS items_1_description, orders_1.id AS orders_1_id, "
            "orders_1.user_id AS orders_1_user_id, orders_1.address_id AS orders_1_address_id, "
            "orders_1.description AS orders_1_description, orders_1.isopen AS orders_1_isopen "
            "FROM users JOIN orders AS orders_1 ON users.id = orders_1.user_id JOIN order_items AS "
            "order_items_1 ON orders_1.id = order_items_1.order_id JOIN items AS items_1 ON "
            "items_1.id = order_items_1.item_id ORDER BY orders_1.id, items_1.id"
        , use_default_dialect=True)
        
        def go():
            eq_(
                sess.query(User).options(
                    joinedload(User.orders, innerjoin=True), 
                    joinedload(User.orders, Order.items, innerjoin=True)).
                    order_by(User.id).all(),
                    
                [User(id=7, 
                    orders=[ 
                        Order(id=1, items=[ Item(id=1), Item(id=2), Item(id=3)]), 
                        Order(id=3, items=[ Item(id=3), Item(id=4), Item(id=5)]), 
                        Order(id=5, items=[Item(id=5)])]),
                User(id=9, orders=[
                    Order(id=2, items=[ Item(id=1), Item(id=2), Item(id=3)]), 
                    Order(id=4, items=[ Item(id=1), Item(id=5)])])
                ]
            )
        self.assert_sql_count(testing.db, go, 1)
        
        # test that default innerjoin setting is used for options
        self.assert_compile(
            sess.query(Order).options(joinedload(Order.user)).filter(Order.description == 'foo'),
            "SELECT orders.id AS orders_id, orders.user_id AS orders_user_id, orders.address_id AS "
            "orders_address_id, orders.description AS orders_description, orders.isopen AS "
            "orders_isopen, users_1.id AS users_1_id, users_1.name AS users_1_name "
            "FROM orders JOIN users AS users_1 ON users_1.id = orders.user_id "
            "WHERE orders.description = :description_1",
            use_default_dialect=True
        )
        
class AddEntityTest(_fixtures.FixtureTest):
    run_inserts = 'once'
    run_deletes = None

    @testing.resolve_artifact_names
    def _assert_result(self):
        return [
            (
                User(id=7,
                    addresses=[Address(id=1)]
                ),
                Order(id=1,
                    items=[Item(id=1), Item(id=2), Item(id=3)]
                ),
            ),
            (
                User(id=7,
                    addresses=[Address(id=1)]
                ),
                Order(id=3,
                    items=[Item(id=3), Item(id=4), Item(id=5)]
                ),
            ),
            (
                User(id=7,
                    addresses=[Address(id=1)]
                ),
                Order(id=5,
                    items=[Item(id=5)]
                ),
            ),
            (
                 User(id=9,
                    addresses=[Address(id=5)]
                ),
                 Order(id=2,
                    items=[Item(id=1), Item(id=2), Item(id=3)]
                ),
             ),
             (
                  User(id=9,
                    addresses=[Address(id=5)]
                ),
                  Order(id=4,
                    items=[Item(id=1), Item(id=5)]
                ),
              )
        ]

    @testing.resolve_artifact_names
    def test_mapper_configured(self):
        mapper(User, users, properties={
            'addresses':relationship(Address, lazy='joined'),
            'orders':relationship(Order)
        })
        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, lazy='joined', order_by=items.c.id)
        })
        mapper(Item, items)


        sess = create_session()
        oalias = sa.orm.aliased(Order)
        def go():
            ret = sess.query(User, oalias).join(('orders', oalias)).order_by(User.id,
                                                                             oalias.id).all()
            eq_(ret, self._assert_result())
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_options(self):
        mapper(User, users, properties={
            'addresses':relationship(Address),
            'orders':relationship(Order)
        })
        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, order_by=items.c.id)
        })
        mapper(Item, items)

        sess = create_session()

        oalias = sa.orm.aliased(Order)
        def go():
            ret = sess.query(User, oalias).options(joinedload('addresses')).join(
                ('orders', oalias)).order_by(User.id, oalias.id).all()
            eq_(ret, self._assert_result())
        self.assert_sql_count(testing.db, go, 6)

        sess.expunge_all()
        def go():
            ret = sess.query(User, oalias).options(joinedload('addresses'),
                                                   joinedload(oalias.items)).join(
                ('orders', oalias)).order_by(User.id, oalias.id).all()
            eq_(ret, self._assert_result())
        self.assert_sql_count(testing.db, go, 1)

class OrderBySecondaryTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('m2m', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('aid', Integer, ForeignKey('a.id')),
              Column('bid', Integer, ForeignKey('b.id')))

        Table('a', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('data', String(50)))
        Table('b', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('data', String(50)))

    @classmethod
    def fixtures(cls):
        return dict(
            a=(('id', 'data'),
               (1, 'a1'),
               (2, 'a2')),

            b=(('id', 'data'),
               (1, 'b1'),
               (2, 'b2'),
               (3, 'b3'),
               (4, 'b4')),

            m2m=(('id', 'aid', 'bid'),
                 (2, 1, 1),
                 (4, 2, 4),
                 (1, 1, 3),
                 (6, 2, 2),
                 (3, 1, 2),
                 (5, 2, 3)))

    @testing.resolve_artifact_names
    def test_ordering(self):
        class A(_base.ComparableEntity):pass
        class B(_base.ComparableEntity):pass

        mapper(A, a, properties={
            'bs':relationship(B, secondary=m2m, lazy='joined', order_by=m2m.c.id)
        })
        mapper(B, b)

        sess = create_session()
        eq_(sess.query(A).all(), [
                        A(data='a1', bs=[B(data='b3'), B(data='b1'), B(data='b2')]),
                        A(bs=[B(data='b4'), B(data='b3'), B(data='b2')])
        ])


class SelfReferentialEagerTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('nodes', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('parent_id', Integer, ForeignKey('nodes.id')),
            Column('data', String(30)))

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_basic(self):
        class Node(_base.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(Node, nodes, properties={
            'children':relationship(Node, 
                                        lazy='joined', 
                                        join_depth=3, order_by=nodes.c.id)
        })
        sess = create_session()
        n1 = Node(data='n1')
        n1.append(Node(data='n11'))
        n1.append(Node(data='n12'))
        n1.append(Node(data='n13'))
        n1.children[1].append(Node(data='n121'))
        n1.children[1].append(Node(data='n122'))
        n1.children[1].append(Node(data='n123'))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()
        def go():
            d = sess.query(Node).filter_by(data='n1').all()[0]
            eq_(Node(data='n1', children=[
                Node(data='n11'),
                Node(data='n12', children=[
                    Node(data='n121'),
                    Node(data='n122'),
                    Node(data='n123')
                ]),
                Node(data='n13')
            ]), d)
        self.assert_sql_count(testing.db, go, 1)

        sess.expunge_all()
        def go():
            d = sess.query(Node).filter_by(data='n1').first()
            eq_(Node(data='n1', children=[
                Node(data='n11'),
                Node(data='n12', children=[
                    Node(data='n121'),
                    Node(data='n122'),
                    Node(data='n123')
                ]),
                Node(data='n13')
            ]), d)
        self.assert_sql_count(testing.db, go, 1)


    @testing.resolve_artifact_names
    def test_lazy_fallback_doesnt_affect_eager(self):
        class Node(_base.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(Node, nodes, properties={
            'children':relationship(Node, lazy='joined', join_depth=1,
                                    order_by=nodes.c.id)
        })
        sess = create_session()
        n1 = Node(data='n1')
        n1.append(Node(data='n11'))
        n1.append(Node(data='n12'))
        n1.append(Node(data='n13'))
        n1.children[1].append(Node(data='n121'))
        n1.children[1].append(Node(data='n122'))
        n1.children[1].append(Node(data='n123'))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        # eager load with join depth 1.  when eager load of 'n1' hits the
        # children of 'n12', no columns are present, eager loader degrades to
        # lazy loader; fine.  but then, 'n12' is *also* in the first level of
        # columns since we're loading the whole table.  when those rows
        # arrive, now we *can* eager load its children and an eager collection
        # should be initialized.  essentially the 'n12' instance is present in
        # not just two different rows but two distinct sets of columns in this
        # result set.
        def go():
            allnodes = sess.query(Node).order_by(Node.data).all()
            n12 = allnodes[2]
            eq_(n12.data, 'n12')
            eq_([
                Node(data='n121'),
                Node(data='n122'),
                Node(data='n123')
            ], list(n12.children))
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_with_deferred(self):
        class Node(_base.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(Node, nodes, properties={
            'children':relationship(Node, lazy='joined', join_depth=3,
                                    order_by=nodes.c.id),
            'data':deferred(nodes.c.data)
        })
        sess = create_session()
        n1 = Node(data='n1')
        n1.append(Node(data='n11'))
        n1.append(Node(data='n12'))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        def go():
            eq_( 
                Node(data='n1', children=[Node(data='n11'), Node(data='n12')]),
                sess.query(Node).order_by(Node.id).first(),
                )
        self.assert_sql_count(testing.db, go, 4)

        sess.expunge_all()

        def go():
            eq_(Node(data='n1', children=[Node(data='n11'), Node(data='n12')]),
                sess.query(Node).options(undefer('data')).order_by(Node.id).first())
        self.assert_sql_count(testing.db, go, 3)

        sess.expunge_all()

        def go():
            eq_(Node(data='n1', children=[Node(data='n11'), Node(data='n12')]),
                sess.query(Node).options(undefer('data'),
                                            undefer('children.data')).first())
        self.assert_sql_count(testing.db, go, 1)


    @testing.resolve_artifact_names
    def test_options(self):
        class Node(_base.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(Node, nodes, properties={
            'children':relationship(Node, lazy='select', order_by=nodes.c.id)
        }, order_by=nodes.c.id)
        sess = create_session()
        n1 = Node(data='n1')
        n1.append(Node(data='n11'))
        n1.append(Node(data='n12'))
        n1.append(Node(data='n13'))
        n1.children[1].append(Node(data='n121'))
        n1.children[1].append(Node(data='n122'))
        n1.children[1].append(Node(data='n123'))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()
        def go():
            d = sess.query(Node).filter_by(data='n1').\
                        options(joinedload('children.children')).first()
            eq_(Node(data='n1', children=[
                Node(data='n11'),
                Node(data='n12', children=[
                    Node(data='n121'),
                    Node(data='n122'),
                    Node(data='n123')
                ]),
                Node(data='n13')
            ]), d)
        self.assert_sql_count(testing.db, go, 2)

        def go():
            d = sess.query(Node).filter_by(data='n1').\
                        options(joinedload('children.children')).first()

        # test that the query isn't wrapping the initial query for eager loading.
        self.assert_sql_execution(testing.db, go, 
            CompiledSQL(
                "SELECT nodes.id AS nodes_id, nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM nodes "
                "WHERE nodes.data = :data_1 ORDER BY nodes.id  LIMIT 1 OFFSET 0",
                {'data_1': 'n1'}
            )
        )

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_no_depth(self):
        class Node(_base.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(Node, nodes, properties={
            'children':relationship(Node, lazy='joined')
        })
        sess = create_session()
        n1 = Node(data='n1')
        n1.append(Node(data='n11'))
        n1.append(Node(data='n12'))
        n1.append(Node(data='n13'))
        n1.children[1].append(Node(data='n121'))
        n1.children[1].append(Node(data='n122'))
        n1.children[1].append(Node(data='n123'))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()
        def go():
            d = sess.query(Node).filter_by(data='n1').first()
            eq_(Node(data='n1', children=[
                Node(data='n11'),
                Node(data='n12', children=[
                    Node(data='n121'),
                    Node(data='n122'),
                    Node(data='n123')
                ]),
                Node(data='n13')
            ]), d)
        self.assert_sql_count(testing.db, go, 3)

class MixedSelfReferentialEagerTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('a_table', metadata,
               Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
               )

        Table('b_table', metadata,
               Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
               Column('parent_b1_id', Integer, ForeignKey('b_table.id')),
               Column('parent_a_id', Integer, ForeignKey('a_table.id')),
               Column('parent_b2_id', Integer, ForeignKey('b_table.id')))


    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        class A(_base.ComparableEntity):
            pass
        class B(_base.ComparableEntity):
            pass
            
        mapper(A,a_table)
        mapper(B,b_table,properties = {
           'parent_b1': relationship(B,
                            remote_side = [b_table.c.id],
                            primaryjoin = (b_table.c.parent_b1_id ==b_table.c.id),
                            order_by = b_table.c.id
                            ),
           'parent_z': relationship(A,lazy = True),
           'parent_b2': relationship(B,
                            remote_side = [b_table.c.id],
                            primaryjoin = (b_table.c.parent_b2_id ==b_table.c.id),
                            order_by = b_table.c.id
                            )
        });
    
    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        a_table.insert().execute(dict(id=1), dict(id=2), dict(id=3))
        b_table.insert().execute(
            dict(id=1, parent_a_id=2, parent_b1_id=None, parent_b2_id=None),
            dict(id=2, parent_a_id=1, parent_b1_id=1, parent_b2_id=None),
            dict(id=3, parent_a_id=1, parent_b1_id=1, parent_b2_id=2),
            dict(id=4, parent_a_id=3, parent_b1_id=1, parent_b2_id=None),
            dict(id=5, parent_a_id=3, parent_b1_id=None, parent_b2_id=2),
            dict(id=6, parent_a_id=1, parent_b1_id=1, parent_b2_id=3),
            dict(id=7, parent_a_id=2, parent_b1_id=None, parent_b2_id=3),
            dict(id=8, parent_a_id=2, parent_b1_id=1, parent_b2_id=2),
            dict(id=9, parent_a_id=None, parent_b1_id=1, parent_b2_id=None),
            dict(id=10, parent_a_id=3, parent_b1_id=7, parent_b2_id=2),
            dict(id=11, parent_a_id=3, parent_b1_id=1, parent_b2_id=8),
            dict(id=12, parent_a_id=2, parent_b1_id=5, parent_b2_id=2),
            dict(id=13, parent_a_id=3, parent_b1_id=4, parent_b2_id=4),
            dict(id=14, parent_a_id=3, parent_b1_id=7, parent_b2_id=2),
        )
        
    @testing.resolve_artifact_names
    def test_eager_load(self):
        session = create_session()
        def go():
            eq_(
                session.query(B).\
                    options(
                                joinedload('parent_b1'),
                                joinedload('parent_b2'),
                                joinedload('parent_z')).
                            filter(B.id.in_([2, 8, 11])).order_by(B.id).all(),
                [
                    B(id=2, parent_z=A(id=1), parent_b1=B(id=1), parent_b2=None),
                    B(id=8, parent_z=A(id=2), parent_b1=B(id=1), parent_b2=B(id=2)),
                    B(id=11, parent_z=A(id=3), parent_b1=B(id=1), parent_b2=B(id=8))
                ]
            )
        self.assert_sql_count(testing.db, go, 1)
        
class SelfReferentialM2MEagerTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('widget', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', sa.Unicode(40), nullable=False, unique=True),
        )

        Table('widget_rel', metadata,
            Column('parent_id', Integer, ForeignKey('widget.id')),
            Column('child_id', Integer, ForeignKey('widget.id')),
            sa.UniqueConstraint('parent_id', 'child_id'),
        )

    @testing.resolve_artifact_names
    def test_basic(self):
        class Widget(_base.ComparableEntity):
            pass

        mapper(Widget, widget, properties={
            'children': relationship(Widget, secondary=widget_rel,
                primaryjoin=widget_rel.c.parent_id==widget.c.id,
                secondaryjoin=widget_rel.c.child_id==widget.c.id,
                lazy='joined', join_depth=1,
            )
        })

        sess = create_session()
        w1 = Widget(name=u'w1')
        w2 = Widget(name=u'w2')
        w1.children.append(w2)
        sess.add(w1)
        sess.flush()
        sess.expunge_all()

        eq_([Widget(name='w1', children=[Widget(name='w2')])],
            sess.query(Widget).filter(Widget.name==u'w1').all())

class MixedEntitiesTest(_fixtures.FixtureTest, testing.AssertsCompiledSQL):
    run_setup_mappers = 'once'
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        mapper(User, users, properties={
            'addresses':relationship(Address, backref='user'),
            'orders':relationship(Order, backref='user'), # o2m, m2o
        })
        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, order_by=items.c.id),  #m2m
        })
        mapper(Item, items, properties={
            'keywords':relationship(Keyword, secondary=item_keywords) #m2m
        })
        mapper(Keyword, keywords)

    @testing.resolve_artifact_names
    def test_two_entities(self):
        sess = create_session()

        # two FROM clauses
        def go():
            eq_(
                [
                    (User(id=9, addresses=[Address(id=5)]), Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)])),
                    (User(id=9, addresses=[Address(id=5)]), Order(id=4, items=[Item(id=1), Item(id=5)])),
                ],
                sess.query(User, Order).filter(User.id==Order.user_id).\
                    options(joinedload(User.addresses), joinedload(Order.items)).filter(User.id==9).\
                        order_by(User.id, Order.id).all(),
            )
        self.assert_sql_count(testing.db, go, 1)

        # one FROM clause
        def go():
            eq_(
                [
                    (User(id=9, addresses=[Address(id=5)]), Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)])),
                    (User(id=9, addresses=[Address(id=5)]), Order(id=4, items=[Item(id=1), Item(id=5)])),
                ],
                sess.query(User, Order).join(User.orders).options(joinedload(User.addresses), joinedload(Order.items)).filter(User.id==9).\
                    order_by(User.id, Order.id).all(),
            )
        self.assert_sql_count(testing.db, go, 1)

    @testing.exclude('sqlite', '>', (0, ), "sqlite flat out blows it on the multiple JOINs")
    @testing.resolve_artifact_names
    def test_two_entities_with_joins(self):
        sess = create_session()
        
        # two FROM clauses where there's a join on each one
        def go():
            u1 = aliased(User)
            o1 = aliased(Order)
            eq_(
                [
                    (
                        User(addresses=[Address(email_address=u'fred@fred.com')], name=u'fred'), 
                        Order(description=u'order 2', isopen=0, items=[Item(description=u'item 1'), Item(description=u'item 2'), Item(description=u'item 3')]),
                        User(addresses=[Address(email_address=u'jack@bean.com')], name=u'jack'), 
                        Order(description=u'order 3', isopen=1, items=[Item(description=u'item 3'), Item(description=u'item 4'), Item(description=u'item 5')])
                    ), 

                    (
                        User(addresses=[Address(email_address=u'fred@fred.com')], name=u'fred'), 
                        Order(description=u'order 2', isopen=0, items=[Item(description=u'item 1'), Item(description=u'item 2'), Item(description=u'item 3')]),
                        User(addresses=[Address(email_address=u'jack@bean.com')], name=u'jack'), 
                        Order(address_id=None, description=u'order 5', isopen=0, items=[Item(description=u'item 5')])
                    ), 

                    (
                        User(addresses=[Address(email_address=u'fred@fred.com')], name=u'fred'), 
                        Order(description=u'order 4', isopen=1, items=[Item(description=u'item 1'), Item(description=u'item 5')]),
                        User(addresses=[Address(email_address=u'jack@bean.com')], name=u'jack'), 
                        Order(address_id=None, description=u'order 5', isopen=0, items=[Item(description=u'item 5')])
                    ), 
                ],
                sess.query(User, Order, u1, o1).\
                        join((Order, User.orders)).options(joinedload(User.addresses), joinedload(Order.items)).filter(User.id==9).\
                        join((o1, u1.orders)).options(joinedload(u1.addresses), joinedload(o1.items)).filter(u1.id==7).\
                        filter(Order.id<o1.id).\
                        order_by(User.id, Order.id, u1.id, o1.id).all(),
            )
        self.assert_sql_count(testing.db, go, 1)
        
        

    @testing.resolve_artifact_names
    def test_aliased_entity(self):
        sess = create_session()

        oalias = sa.orm.aliased(Order)

        # two FROM clauses
        def go():
            eq_(
                [
                    (User(id=9, addresses=[Address(id=5)]), Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)])),
                    (User(id=9, addresses=[Address(id=5)]), Order(id=4, items=[Item(id=1), Item(id=5)])),
                ],
                sess.query(User, oalias).filter(User.id==oalias.user_id).\
                    options(joinedload(User.addresses), joinedload(oalias.items)).filter(User.id==9).\
                    order_by(User.id, oalias.id).all(),
            )
        self.assert_sql_count(testing.db, go, 1)

        # one FROM clause
        def go():
            eq_(
                [
                    (User(id=9, addresses=[Address(id=5)]), Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)])),
                    (User(id=9, addresses=[Address(id=5)]), Order(id=4, items=[Item(id=1), Item(id=5)])),
                ],
                sess.query(User, oalias).join((User.orders, oalias)).options(joinedload(User.addresses), joinedload(oalias.items)).filter(User.id==9).\
                    order_by(User.id, oalias.id).all(),
            )
        self.assert_sql_count(testing.db, go, 1)

        from sqlalchemy.engine.default import DefaultDialect

        # improper setup: oalias in the columns clause but join to usual
        # orders alias.  this should create two FROM clauses even though the
        # query has a from_clause set up via the join
        self.assert_compile(sess.query(User, oalias).join(User.orders).options(joinedload(oalias.items)).with_labels().statement,
        "SELECT users.id AS users_id, users.name AS users_name, orders_1.id AS orders_1_id, "\
        "orders_1.user_id AS orders_1_user_id, orders_1.address_id AS orders_1_address_id, "\
        "orders_1.description AS orders_1_description, orders_1.isopen AS orders_1_isopen, items_1.id AS items_1_id, "\
        "items_1.description AS items_1_description FROM users JOIN orders ON users.id = orders.user_id, "\
        "orders AS orders_1 LEFT OUTER JOIN order_items AS order_items_1 ON orders_1.id = order_items_1.order_id "\
        "LEFT OUTER JOIN items AS items_1 ON items_1.id = order_items_1.item_id ORDER BY items_1.id",
        dialect=DefaultDialect()
        )

class CyclicalInheritingEagerTest(_base.MappedTest):

    @classmethod
    def define_tables(cls, metadata):
        Table('t1', metadata,
            Column('c1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('c2', String(30)),
            Column('type', String(30))
            )

        Table('t2', metadata,
            Column('c1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('c2', String(30)),
            Column('type', String(30)),
            Column('t1.id', Integer, ForeignKey('t1.c1')))

    @testing.resolve_artifact_names
    def test_basic(self):
        class T(object):
            pass

        class SubT(T):
            pass

        class T2(object):
            pass

        class SubT2(T2):
            pass

        mapper(T, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1')
        mapper(SubT, None, inherits=T, polymorphic_identity='subt1', properties={
            't2s':relationship(SubT2, lazy='joined', backref=sa.orm.backref('subt', lazy='joined'))
        })
        mapper(T2, t2, polymorphic_on=t2.c.type, polymorphic_identity='t2')
        mapper(SubT2, None, inherits=T2, polymorphic_identity='subt2')

        # testing a particular endless loop condition in eager join setup
        create_session().query(SubT).all()

class SubqueryTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('users_table', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', String(16))
        )

        Table('tags_table', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('user_id', Integer, ForeignKey("users_table.id")),
            Column('score1', sa.Float),
            Column('score2', sa.Float),
        )

    @testing.resolve_artifact_names
    def test_label_anonymizing(self):
        """Eager loading works with subqueries with labels,

        Even if an explicit labelname which conflicts with a label on the
        parent.

        There's not much reason a column_property() would ever need to have a
        label of a specific name (and they don't even need labels these days),
        unless you'd like the name to line up with a name that you may be
        using for a straight textual statement used for loading instances of
        that type.

        """
        class User(_base.ComparableEntity):
            @property
            def prop_score(self):
                return sum([tag.prop_score for tag in self.tags])

        class Tag(_base.ComparableEntity):
            @property
            def prop_score(self):
                return self.score1 * self.score2

        for labeled, labelname in [(True, 'score'), (True, None), (False, None)]:
            sa.orm.clear_mappers()

            tag_score = (tags_table.c.score1 * tags_table.c.score2)
            user_score = sa.select([sa.func.sum(tags_table.c.score1 *
                                                tags_table.c.score2)],
                                   tags_table.c.user_id == users_table.c.id)

            if labeled:
                tag_score = tag_score.label(labelname)
                user_score = user_score.label(labelname)
            else:
                user_score = user_score.as_scalar()

            mapper(Tag, tags_table, properties={
                'query_score': sa.orm.column_property(tag_score),
            })


            mapper(User, users_table, properties={
                'tags': relationship(Tag, backref='user', lazy='joined'),
                'query_score': sa.orm.column_property(user_score),
            })

            session = create_session()
            session.add(User(name='joe', tags=[Tag(score1=5.0, score2=3.0), Tag(score1=55.0, score2=1.0)]))
            session.add(User(name='bar', tags=[Tag(score1=5.0, score2=4.0), Tag(score1=50.0, score2=1.0), Tag(score1=15.0, score2=2.0)]))
            session.flush()
            session.expunge_all()

            for user in session.query(User).all():
                eq_(user.query_score, user.prop_score)

            def go():
                u = session.query(User).filter_by(name='joe').one()
                eq_(u.query_score, u.prop_score)
            self.assert_sql_count(testing.db, go, 1)

            for t in (tags_table, users_table):
                t.delete().execute()

class CorrelatedSubqueryTest(_base.MappedTest):
    """tests for #946, #947, #948.
    
    The "users" table is joined to "stuff", and the relationship
    would like to pull only the "stuff" entry with the most recent date.
    
    Exercises a variety of ways to configure this.
    
    """

    # another argument for joinedload learning about inner joins
    
    __requires__ = ('correlated_outer_joins', )
    
    @classmethod
    def define_tables(cls, metadata):
        users = Table('users', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', String(50))
            )

        stuff = Table('stuff', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('date', Date),
            Column('user_id', Integer, ForeignKey('users.id')))
    
    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        users.insert().execute(
            {'id':1, 'name':'user1'},
            {'id':2, 'name':'user2'},
            {'id':3, 'name':'user3'},
        )

        stuff.insert().execute(
            {'id':1, 'user_id':1, 'date':datetime.date(2007, 10, 15)},
            {'id':2, 'user_id':1, 'date':datetime.date(2007, 12, 15)},
            {'id':3, 'user_id':1, 'date':datetime.date(2007, 11, 15)},
            {'id':4, 'user_id':2, 'date':datetime.date(2008, 1, 15)},
            {'id':5, 'user_id':3, 'date':datetime.date(2007, 6, 15)},
            {'id':6, 'user_id':3, 'date':datetime.date(2007, 3, 15)},
        )
        
    
    def test_labeled_on_date_noalias(self):
        self._do_test('label', True, False)

    def test_scalar_on_date_noalias(self):
        self._do_test('scalar', True, False)

    def test_plain_on_date_noalias(self):
        self._do_test('none', True, False)

    def test_labeled_on_limitid_noalias(self):
        self._do_test('label', False, False)

    def test_scalar_on_limitid_noalias(self):
        self._do_test('scalar', False, False)

    def test_plain_on_limitid_noalias(self):
        self._do_test('none', False, False)

    def test_labeled_on_date_alias(self):
        self._do_test('label', True, True)

    def test_scalar_on_date_alias(self):
        self._do_test('scalar', True, True)

    def test_plain_on_date_alias(self):
        self._do_test('none', True, True)

    def test_labeled_on_limitid_alias(self):
        self._do_test('label', False, True)

    def test_scalar_on_limitid_alias(self):
        self._do_test('scalar', False, True)

    def test_plain_on_limitid_alias(self):
        self._do_test('none', False, True)
        
    @testing.resolve_artifact_names
    def _do_test(self, labeled, ondate, aliasstuff):
        class User(_base.ComparableEntity):
            pass

        class Stuff(_base.ComparableEntity):
            pass
        
        mapper(Stuff, stuff)

        if aliasstuff:
            salias = stuff.alias()
        else:
            # if we don't alias the 'stuff' table within the correlated subquery, 
            # it gets aliased in the eager load along with the "stuff" table to "stuff_1".
            # but it's a scalar subquery, and this doesn't actually matter
            salias = stuff

        if ondate:
            # the more 'relational' way to do this, join on the max date
            stuff_view = select([func.max(salias.c.date).label('max_date')]).\
                                where(salias.c.user_id==users.c.id).correlate(users)
        else:
            # a common method with the MySQL crowd, which actually might perform better in some
            # cases - subquery does a limit with order by DESC, join on the id
            stuff_view = select([salias.c.id]).where(salias.c.user_id==users.c.id).\
                                    correlate(users).order_by(salias.c.date.desc()).limit(1)

        if labeled == 'label':
            stuff_view = stuff_view.label('foo')
        elif labeled == 'scalar':
            stuff_view = stuff_view.as_scalar()

        if ondate:
            mapper(User, users, properties={
                'stuff':relationship(Stuff, primaryjoin=and_(users.c.id==stuff.c.user_id, stuff.c.date==stuff_view))
            })
        else:
            mapper(User, users, properties={
                'stuff':relationship(Stuff, primaryjoin=and_(users.c.id==stuff.c.user_id, stuff.c.id==stuff_view))
            })
            
        sess = create_session()
        def go():
            eq_(
                sess.query(User).order_by(User.name).options(joinedload('stuff')).all(),
                [
                    User(name='user1', stuff=[Stuff(id=2)]),
                    User(name='user2', stuff=[Stuff(id=4)]),
                    User(name='user3', stuff=[Stuff(id=5)])
                ]
            )
        self.assert_sql_count(testing.db, go, 1)
    
        sess = create_session()
        def go():
            eq_(
                sess.query(User).order_by(User.name).first(),
                User(name='user1', stuff=[Stuff(id=2)])
            )
        self.assert_sql_count(testing.db, go, 2)

        sess = create_session()
        def go():
            eq_(
                sess.query(User).order_by(User.name).options(joinedload('stuff')).first(),
                User(name='user1', stuff=[Stuff(id=2)])
            )
        self.assert_sql_count(testing.db, go, 1)

        sess = create_session()
        def go():
            eq_(
                sess.query(User).filter(User.id==2).options(joinedload('stuff')).one(),
                User(name='user2', stuff=[Stuff(id=4)])
            )
        self.assert_sql_count(testing.db, go, 1)


www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.