test_query.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » orm » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » orm » test_query.py
from sqlalchemy.test.testing import eq_,assert_raises,assert_raises_message
import operator
from sqlalchemy import *
from sqlalchemy import exc
from sqlalchemy.sql import compiler,table,column
from sqlalchemy.engine import default
from sqlalchemy.orm import *
from sqlalchemy.orm import attributes

from sqlalchemy.test.testing import eq_

import sqlalchemy as sa
from sqlalchemy.test import testing,AssertsCompiledSQL,Column,engines

from test.orm import _fixtures
from test.orm._fixtures import keywords,addresses,Base,Keyword,FixtureTest,\
           Dingaling, item_keywords, dingalings, User, items,\
           orders, Address, users, nodes, \
            order_items, Item, Order, Node, \
            composite_pk_table, CompositePk

from test.orm import _base

from sqlalchemy.orm.util import join,outerjoin,with_parent

class QueryTest(_fixtures.FixtureTest):
    run_setup_mappers = 'once'
    run_inserts = 'once'
    run_deletes = None


    @classmethod
    def setup_mappers(cls):
        mapper(User, users, properties={
            'addresses':relationship(Address, backref='user', order_by=addresses.c.id),
            'orders':relationship(Order, backref='user', order_by=orders.c.id), # o2m, m2o
        })
        mapper(Address, addresses, properties={
            'dingaling':relationship(Dingaling, uselist=False, backref="address")  #o2o
        })
        mapper(Dingaling, dingalings)
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, order_by=items.c.id),  #m2m
            'address':relationship(Address),  # m2o
        })
        mapper(Item, items, properties={
            'keywords':relationship(Keyword, secondary=item_keywords) #m2m
        })
        mapper(Keyword, keywords)

        mapper(Node, nodes, properties={
            'children':relationship(Node, 
                backref=backref('parent', remote_side=[nodes.c.id])
            )
        })

        mapper(CompositePk, composite_pk_table)

        compile_mappers()

class RowTupleTest(QueryTest):
    run_setup_mappers = None

    def test_custom_names(self):
        mapper(User, users, properties={
            'uname':users.c.name
        })
        
        row  = create_session().query(User.id, User.uname).filter(User.id==7).first()
        assert row.id == 7
        assert row.uname == 'jack'

class GetTest(QueryTest):
    def test_get(self):
        s = create_session()
        assert s.query(User).get(19) is None
        u = s.query(User).get(7)
        u2 = s.query(User).get(7)
        assert u is u2
        s.expunge_all()
        u2 = s.query(User).get(7)
        assert u is not u2

    def test_get_composite_pk(self):
        s = create_session()
        assert s.query(CompositePk).get((100,100)) is None
        one_two = s.query(CompositePk).get((1,2))
        assert one_two.i == 1
        assert one_two.j == 2
        assert one_two.k == 3
        q = s.query(CompositePk)
        assert_raises(sa_exc.InvalidRequestError, q.get, 7)        
    
    def test_get_null_pk(self):
        """test that a mapping which can have None in a 
        PK (i.e. map to an outerjoin) works with get()."""
        
        s = users.outerjoin(addresses)
        
        class UserThing(_base.ComparableEntity):
            pass
            
        mapper(UserThing, s, properties={
            'id':(users.c.id, addresses.c.user_id),
            'address_id':addresses.c.id,
        })
        sess = create_session()
        u10 = sess.query(UserThing).get((10, None))
        eq_(u10,
            UserThing(id=10)
        )

    def test_no_criterion(self):
        """test that get()/load() does not use preexisting filter/etc. criterion"""

        s = create_session()
        
        q = s.query(User).join('addresses').filter(Address.user_id==8)
        assert_raises(sa_exc.InvalidRequestError, q.get, 7)
        assert_raises(sa_exc.InvalidRequestError, s.query(User).filter(User.id==7).get, 19)
        
        # order_by()/get() doesn't raise
        s.query(User).order_by(User.id).get(8)

    def test_unique_param_names(self):
        class SomeUser(object):
            pass
        s = users.select(users.c.id!=12).alias('users')
        m = mapper(SomeUser, s)
        assert s.primary_key == m.primary_key

        sess = create_session()
        assert sess.query(SomeUser).get(7).name == 'jack'

    def test_load(self):
        s = create_session()

        assert s.query(User).populate_existing().get(19) is None

        u = s.query(User).populate_existing().get(7)
        u2 = s.query(User).populate_existing().get(7)
        assert u is u2
        s.expunge_all()
        u2 = s.query(User).populate_existing().get(7)
        assert u is not u2

        u2.name = 'some name'
        a = Address(email_address='some other name')
        u2.addresses.append(a)
        assert u2 in s.dirty
        assert a in u2.addresses

        s.query(User).populate_existing().get(7)
        assert u2 not in s.dirty
        assert u2.name =='jack'
        assert a not in u2.addresses

    @testing.requires.unicode_connections
    def test_unicode(self):
        """test that Query.get properly sets up the type for the bind parameter.  using unicode would normally fail
        on postgresql, mysql and oracle unless it is converted to an encoded string"""

        metadata = MetaData(engines.utf8_engine())
        table = Table('unicode_data', metadata,
            Column('id', Unicode(40), primary_key=True, test_needs_autoincrement=True),
            Column('data', Unicode(40)))
        try:
            metadata.create_all()
            # Py3K
            #ustring = b'petit voix m\xe2\x80\x99a'.decode('utf-8')
            # Py2K
            ustring = 'petit voix m\xe2\x80\x99a'.decode('utf-8')
            # end Py2K
            
            table.insert().execute(id=ustring, data=ustring)
            class LocalFoo(Base):
                pass
            mapper(LocalFoo, table)
            eq_(create_session().query(LocalFoo).get(ustring),
                              LocalFoo(id=ustring, data=ustring))
        finally:
            metadata.drop_all()

    def test_populate_existing(self):
        s = create_session()

        userlist = s.query(User).all()

        u = userlist[0]
        u.name = 'foo'
        a = Address(name='ed')
        u.addresses.append(a)

        self.assert_(a in u.addresses)

        s.query(User).populate_existing().all()

        self.assert_(u not in s.dirty)

        self.assert_(u.name == 'jack')

        self.assert_(a not in u.addresses)

        u.addresses[0].email_address = 'lala'
        u.orders[1].items[2].description = 'item 12'
        # test that lazy load doesnt change child items
        s.query(User).populate_existing().all()
        assert u.addresses[0].email_address == 'lala'
        assert u.orders[1].items[2].description == 'item 12'

        # eager load does
        s.query(User).options(joinedload('addresses'), joinedload_all('orders.items')).populate_existing().all()
        assert u.addresses[0].email_address == 'jack@bean.com'
        assert u.orders[1].items[2].description == 'item 5'

    @testing.fails_on_everything_except('sqlite', '+pyodbc', '+zxjdbc', 'mysql+oursql')
    def test_query_str(self):
        s = create_session()
        q = s.query(User).filter(User.id==1)
        eq_(
            str(q).replace('\n',''), 
            'SELECT users.id AS users_id, users.name AS users_name FROM users WHERE users.id = ?'
            )

class InvalidGenerationsTest(QueryTest):
    def test_no_limit_offset(self):
        s = create_session()
        
        for q in (
            s.query(User).limit(2),
            s.query(User).offset(2),
            s.query(User).limit(2).offset(2)
        ):
            assert_raises(sa_exc.InvalidRequestError, q.join, "addresses")

            assert_raises(sa_exc.InvalidRequestError, q.filter, User.name=='ed')

            assert_raises(sa_exc.InvalidRequestError, q.filter_by, name='ed')

            assert_raises(sa_exc.InvalidRequestError, q.order_by, 'foo')

            assert_raises(sa_exc.InvalidRequestError, q.group_by, 'foo')

            assert_raises(sa_exc.InvalidRequestError, q.having, 'foo')
    
            q.enable_assertions(False).join("addresses")
            q.enable_assertions(False).filter(User.name=='ed')
            q.enable_assertions(False).order_by('foo')
            q.enable_assertions(False).group_by('foo')
            
    def test_no_from(self):
        s = create_session()
    
        q = s.query(User).select_from(users)
        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)

        q = s.query(User).join('addresses')
        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
        
        q = s.query(User).order_by(User.id)
        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)

        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
        
        q.enable_assertions(False).select_from(users)
        
        # this is fine, however
        q.from_self()
    
    def test_invalid_select_from(self):
        s = create_session()
        q = s.query(User)
        assert_raises(sa_exc.ArgumentError, q.select_from, User.id==5)
        assert_raises(sa_exc.ArgumentError, q.select_from, User.id)

    def test_invalid_from_statement(self):
        s = create_session()
        q = s.query(User)
        assert_raises(sa_exc.ArgumentError, q.from_statement, User.id==5)
        assert_raises(sa_exc.ArgumentError, q.from_statement, users.join(addresses))
    
    def test_invalid_column(self):
        s = create_session()
        q = s.query(User)
        assert_raises(sa_exc.InvalidRequestError, q.add_column, object())
    
    def test_distinct(self):
        """test that a distinct() call is not valid before 'clauseelement' conditions."""
        
        s = create_session()
        q = s.query(User).distinct()
        assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
        assert_raises(sa_exc.InvalidRequestError, q.from_statement, text("select * from table"))
        assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)

    def test_order_by(self):
        """test that an order_by() call is not valid before 'clauseelement' conditions."""

        s = create_session()
        q = s.query(User).order_by(User.id)
        assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
        assert_raises(sa_exc.InvalidRequestError, q.from_statement, text("select * from table"))
        assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
        
        
    def test_mapper_zero(self):
        s = create_session()
        
        q = s.query(User, Address)
        assert_raises(sa_exc.InvalidRequestError, q.get, 5)
        
    def test_from_statement(self):
        s = create_session()
        
        q = s.query(User).filter(User.id==5)
        assert_raises(sa_exc.InvalidRequestError, q.from_statement, "x")

        q = s.query(User).filter_by(id=5)
        assert_raises(sa_exc.InvalidRequestError, q.from_statement, "x")

        q = s.query(User).limit(5)
        assert_raises(sa_exc.InvalidRequestError, q.from_statement, "x")

        q = s.query(User).group_by(User.name)
        assert_raises(sa_exc.InvalidRequestError, q.from_statement, "x")

        q = s.query(User).order_by(User.name)
        assert_raises(sa_exc.InvalidRequestError, q.from_statement, "x")
        
class OperatorTest(QueryTest, AssertsCompiledSQL):
    """test sql.Comparator implementation for MapperProperties"""

    def _test(self, clause, expected):
        self.assert_compile(clause, expected, dialect=default.DefaultDialect())

    def test_arithmetic(self):
        create_session().query(User)
        for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
                                (operator.sub, '-'), 
                                # Py3k
                                #(operator.truediv, '/'),
                                # Py2K
                                (operator.div, '/'),
                                # end Py2K
                                ):
            for (lhs, rhs, res) in (
                (5, User.id, ':id_1 %s users.id'),
                (5, literal(6), ':param_1 %s :param_2'),
                (User.id, 5, 'users.id %s :id_1'),
                (User.id, literal('b'), 'users.id %s :param_1'),
                (User.id, User.id, 'users.id %s users.id'),
                (literal(5), 'b', ':param_1 %s :param_2'),
                (literal(5), User.id, ':param_1 %s users.id'),
                (literal(5), literal(6), ':param_1 %s :param_2'),
                ):
                self._test(py_op(lhs, rhs), res % sql_op)

    def test_comparison(self):
        create_session().query(User)
        ualias = aliased(User)
        
        for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'),
                                        (operator.gt, '>', '<'),
                                        (operator.eq, '=', '='),
                                        (operator.ne, '!=', '!='),
                                        (operator.le, '<=', '>='),
                                        (operator.ge, '>=', '<=')):
            for (lhs, rhs, l_sql, r_sql) in (
                ('a', User.id, ':id_1', 'users.id'),
                ('a', literal('b'), ':param_2', ':param_1'), # note swap!
                (User.id, 'b', 'users.id', ':id_1'),
                (User.id, literal('b'), 'users.id', ':param_1'),
                (User.id, User.id, 'users.id', 'users.id'),
                (literal('a'), 'b', ':param_1', ':param_2'),
                (literal('a'), User.id, ':param_1', 'users.id'),
                (literal('a'), literal('b'), ':param_1', ':param_2'),
                (ualias.id, literal('b'), 'users_1.id', ':param_1'),
                (User.id, ualias.name, 'users.id', 'users_1.name'),
                (User.name, ualias.name, 'users.name', 'users_1.name'),
                (ualias.name, User.name, 'users_1.name', 'users.name'),
                ):

                # the compiled clause should match either (e.g.):
                # 'a' < 'b' -or- 'b' > 'a'.
                compiled = str(py_op(lhs, rhs).compile(dialect=default.DefaultDialect()))
                fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
                rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)

                self.assert_(compiled == fwd_sql or compiled == rev_sql,
                             "\n'" + compiled + "'\n does not match\n'" +
                             fwd_sql + "'\n or\n'" + rev_sql + "'")
    
    def test_negated_null(self):
        self._test(User.id == None, "users.id IS NULL")
        self._test(~(User.id==None), "users.id IS NOT NULL")
        self._test(None == User.id, "users.id IS NULL")
        self._test(~(None == User.id), "users.id IS NOT NULL")
        self._test(Address.user == None, "addresses.user_id IS NULL")
        self._test(~(Address.user==None), "addresses.user_id IS NOT NULL")
        self._test(None == Address.user, "addresses.user_id IS NULL")
        self._test(~(None == Address.user), "addresses.user_id IS NOT NULL")
        
    def test_relationship(self):
        self._test(User.addresses.any(Address.id==17), 
                        "EXISTS (SELECT 1 "
                        "FROM addresses "
                        "WHERE users.id = addresses.user_id AND addresses.id = :id_1)"
                    )

        u7 = User(id=7)
        attributes.instance_state(u7).commit_all(attributes.instance_dict(u7))
        
        self._test(Address.user == u7, ":param_1 = addresses.user_id")

        self._test(Address.user != u7, "addresses.user_id != :user_id_1 OR addresses.user_id IS NULL")

        self._test(Address.user == None, "addresses.user_id IS NULL")

        self._test(Address.user != None, "addresses.user_id IS NOT NULL")

    def test_selfref_relationship(self):
        nalias = aliased(Node)

        # auto self-referential aliasing
        self._test(
            Node.children.any(Node.data=='n1'), 
                "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
                "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)"
        )

        # needs autoaliasing
        self._test(
            Node.children==None, 
            "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes.id = nodes_1.parent_id))"
        )
        
        self._test(
            Node.parent==None,
            "nodes.parent_id IS NULL"
        )

        self._test(
            nalias.parent==None,
            "nodes_1.parent_id IS NULL"
        )

        self._test(
            nalias.children==None, 
            "NOT (EXISTS (SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))"
        )
        
        self._test(
                nalias.children.any(Node.data=='some data'), 
                "EXISTS (SELECT 1 FROM nodes WHERE "
                "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)")
        
        # fails, but I think I want this to fail
        #self._test(
        #        Node.children.any(nalias.data=='some data'), 
        #        "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
        #        "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)"
        #        )

        self._test(
            nalias.parent.has(Node.data=='some data'), 
           "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id AND nodes.data = :data_1)"
        )

        self._test(
            Node.parent.has(Node.data=='some data'), 
           "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes_1.id = nodes.parent_id AND nodes_1.data = :data_1)"
        )
        
        self._test(
            Node.parent == Node(id=7), 
            ":param_1 = nodes.parent_id"
        )

        self._test(
            nalias.parent == Node(id=7), 
            ":param_1 = nodes_1.parent_id"
        )

        self._test(
            nalias.parent != Node(id=7), 
            'nodes_1.parent_id != :parent_id_1 OR nodes_1.parent_id IS NULL'
        )
        
        self._test(
            nalias.children.contains(Node(id=7)), "nodes_1.id = :param_1"
        )
        
    def test_op(self):
        self._test(User.name.op('ilike')('17'), "users.name ilike :name_1")

    def test_in(self):
         self._test(User.id.in_(['a', 'b']),
                    "users.id IN (:id_1, :id_2)")

    def test_in_on_relationship_not_supported(self):
        assert_raises(NotImplementedError, Address.user.in_, [User(id=5)])
    
    def test_neg(self):
        self._test(-User.id, "-users.id")
        self._test(User.id + -User.id, "users.id + -users.id")
        
    def test_between(self):
        self._test(User.id.between('a', 'b'),
                   "users.id BETWEEN :id_1 AND :id_2")

    def test_selfref_between(self):
        ualias = aliased(User)
        self._test(User.id.between(ualias.id, ualias.id), "users.id BETWEEN users_1.id AND users_1.id")
        self._test(ualias.id.between(User.id, User.id), "users_1.id BETWEEN users.id AND users.id")

    def test_clauses(self):
        for (expr, compare) in (
            (func.max(User.id), "max(users.id)"),
            (User.id.desc(), "users.id DESC"),
            (between(5, User.id, Address.id), ":param_1 BETWEEN users.id AND addresses.id"),
            # this one would require adding compile() to InstrumentedScalarAttribute.  do we want this ?
            #(User.id, "users.id")
        ):
            c = expr.compile(dialect=default.DefaultDialect())
            assert str(c) == compare, "%s != %s" % (str(c), compare)


class RawSelectTest(QueryTest, AssertsCompiledSQL):
    """compare a bunch of select() tests with the equivalent Query using straight table/columns.
    
    Results should be the same as Query should act as a select() pass-thru for ClauseElement entities.
    
    """
    def test_select(self):
        sess = create_session()

        self.assert_compile(sess.query(users).select_from(users.select()).with_labels().statement, 
            "SELECT users.id AS users_id, users.name AS users_name FROM users, "
            "(SELECT users.id AS id, users.name AS name FROM users) AS anon_1",
            dialect=default.DefaultDialect()
            )

        self.assert_compile(sess.query(users, exists([1], from_obj=addresses)).with_labels().statement, 
            "SELECT users.id AS users_id, users.name AS users_name, EXISTS "
            "(SELECT 1 FROM addresses) AS anon_1 FROM users",
            dialect=default.DefaultDialect()
            )

        # a little tedious here, adding labels to work around Query's auto-labelling.
        # also correlate needed explicitly.  hmmm.....
        # TODO: can we detect only one table in the "froms" and then turn off use_labels ?
        s = sess.query(addresses.c.id.label('id'), addresses.c.email_address.label('email')).\
            filter(addresses.c.user_id==users.c.id).correlate(users).statement.alias()
            
        self.assert_compile(sess.query(users, s.c.email).select_from(users.join(s, s.c.id==users.c.id)).with_labels().statement, 
                "SELECT users.id AS users_id, users.name AS users_name, anon_1.email AS anon_1_email "
                "FROM users JOIN (SELECT addresses.id AS id, addresses.email_address AS email FROM addresses "
                "WHERE addresses.user_id = users.id) AS anon_1 ON anon_1.id = users.id",
                dialect=default.DefaultDialect()
            )

        x = func.lala(users.c.id).label('foo')
        self.assert_compile(sess.query(x).filter(x==5).statement, 
            "SELECT lala(users.id) AS foo FROM users WHERE lala(users.id) = :param_1", dialect=default.DefaultDialect())

        self.assert_compile(sess.query(func.sum(x).label('bar')).statement,  
            "SELECT sum(lala(users.id)) AS bar FROM users", dialect=default.DefaultDialect()) 

class ExpressionTest(QueryTest, AssertsCompiledSQL):
        
    def test_deferred_instances(self):
        session = create_session()
        s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), Address.user_id==User.id)).statement

        l = list(session.query(User).instances(s.execute(emailad = 'jack@bean.com')))
        eq_([User(id=7)], l)

    def test_scalar_subquery(self):
        session = create_session()
        
        q = session.query(User.id).filter(User.id==7).subquery()
        
        q = session.query(User).filter(User.id==q)
        
        eq_(User(id=7), q.one())
        
        
    def test_in(self):
        session = create_session()
        s = session.query(User.id).join(User.addresses).group_by(User.id).having(func.count(Address.id) > 2)
        eq_(
            session.query(User).filter(User.id.in_(s)).all(),
            [User(id=8)]
        )

    def test_union(self):
        s = create_session()
        
        q1 = s.query(User).filter(User.name=='ed').with_labels()
        q2 = s.query(User).filter(User.name=='fred').with_labels()
        eq_(
            s.query(User).from_statement(union(q1, q2).order_by('users_name')).all(),
            [User(name='ed'), User(name='fred')]
        )
    
    def test_select(self):
        s = create_session()
        
        # this is actually not legal on most DBs since the subquery has no alias
        q1 = s.query(User).filter(User.name=='ed')


        self.assert_compile(
            select([q1]),
            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
            "users.name AS users_name FROM users WHERE users.name = :name_1)",
            dialect=default.DefaultDialect()
        )
        
    def test_join(self):
        s = create_session()

        # TODO: do we want aliased() to detect a query and convert to subquery() 
        # automatically ?
        q1 = s.query(Address).filter(Address.email_address=='jack@bean.com')
        adalias = aliased(Address, q1.subquery())
        eq_(
            s.query(User, adalias).join((adalias, User.id==adalias.user_id)).all(),
            [(User(id=7,name=u'jack'), Address(email_address=u'jack@bean.com',user_id=7,id=1))]
        )
        
# more slice tests are available in test/orm/generative.py
class SliceTest(QueryTest):
    def test_first(self):
        assert  User(id=7) == create_session().query(User).first()

        assert create_session().query(User).filter(User.id==27).first() is None

    @testing.fails_on_everything_except('sqlite')
    def test_limit_offset_applies(self):
        """Test that the expected LIMIT/OFFSET is applied for slices.
        
        The LIMIT/OFFSET syntax differs slightly on all databases, and
        query[x:y] executes immediately, so we are asserting against
        SQL strings using sqlite's syntax.
        
        """
        sess = create_session()
        q = sess.query(User)
        
        self.assert_sql(testing.db, lambda: q[10:20], [
            ("SELECT users.id AS users_id, users.name AS users_name FROM users  LIMIT 10 OFFSET 10", {})
        ])

        self.assert_sql(testing.db, lambda: q[:20], [
            ("SELECT users.id AS users_id, users.name AS users_name FROM users  LIMIT 20 OFFSET 0", {})
        ])

        self.assert_sql(testing.db, lambda: q[5:], [
            ("SELECT users.id AS users_id, users.name AS users_name FROM users  LIMIT -1 OFFSET 5", {})
        ])

        self.assert_sql(testing.db, lambda: q[2:2], [])

        self.assert_sql(testing.db, lambda: q[-2:-5], [])

        self.assert_sql(testing.db, lambda: q[-5:-2], [
            ("SELECT users.id AS users_id, users.name AS users_name FROM users", {})
        ])

        self.assert_sql(testing.db, lambda: q[-5:], [
            ("SELECT users.id AS users_id, users.name AS users_name FROM users", {})
        ])

        self.assert_sql(testing.db, lambda: q[:], [
            ("SELECT users.id AS users_id, users.name AS users_name FROM users", {})
        ])


    
class FilterTest(QueryTest):
    def test_basic(self):
        assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all()

    @testing.fails_on('maxdb', 'FIXME: unknown')
    def test_limit(self):
        assert [User(id=8), User(id=9)] == create_session().query(User).order_by(User.id).limit(2).offset(1).all()

        assert [User(id=8), User(id=9)] == list(create_session().query(User).order_by(User.id)[1:3])

        assert User(id=8) == create_session().query(User).order_by(User.id)[1]
    
        assert [] == create_session().query(User).order_by(User.id)[3:3]
        assert [] == create_session().query(User).order_by(User.id)[0:0]
        
    @testing.requires.boolean_col_expressions
    def test_exists(self):
        sess = create_session(testing.db)
        
        assert sess.query(exists().where(User.id==9)).scalar()
        assert not sess.query(exists().where(User.id==29)).scalar()
        
    def test_one_filter(self):
        assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.name.endswith('ed')).all()
    
    def test_contains(self):
        """test comparing a collection to an object instance."""

        sess = create_session()
        address = sess.query(Address).get(3)
        assert [User(id=8)] == sess.query(User).filter(User.addresses.contains(address)).all()

        try:
            sess.query(User).filter(User.addresses == address)
            assert False
        except sa_exc.InvalidRequestError:
            assert True

        assert [User(id=10)] == sess.query(User).filter(User.addresses==None).all()

        try:
            assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all()
            assert False
        except sa_exc.InvalidRequestError:
            assert True

        #assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all()

    def test_any(self):
        sess = create_session()

        assert [User(id=8), User(id=9)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).all()

        assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'), id=4)).all()

        assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).\
            filter(User.addresses.any(id=4)).all()

        assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all()
        
        # test that any() doesn't overcorrelate
        assert [User(id=7), User(id=8)] == sess.query(User).join("addresses").filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all()
        
        # test that the contents are not adapted by the aliased join
        assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all()

        assert [User(id=10)] == sess.query(User).outerjoin("addresses", aliased=True).filter(~User.addresses.any()).all()
        
    @testing.crashes('maxdb', 'can dump core')
    def test_has(self):
        sess = create_session()
        assert [Address(id=5)] == sess.query(Address).filter(Address.user.has(name='fred')).all()

        assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] == \
                sess.query(Address).filter(Address.user.has(User.name.like('%ed%'))).order_by(Address.id).all()

        assert [Address(id=2), Address(id=3), Address(id=4)] == \
            sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()

        # test has() doesn't overcorrelate
        assert [Address(id=2), Address(id=3), Address(id=4)] == \
            sess.query(Address).join("user").filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()

        # test has() doesnt' get subquery contents adapted by aliased join
        assert [Address(id=2), Address(id=3), Address(id=4)] == \
            sess.query(Address).join("user", aliased=True).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()
        
        dingaling = sess.query(Dingaling).get(2)
        assert [User(id=9)] == sess.query(User).filter(User.addresses.any(Address.dingaling==dingaling)).all()
        
    def test_contains_m2m(self):
        sess = create_session()
        item = sess.query(Item).get(3)
        assert [Order(id=1), Order(id=2), Order(id=3)] == sess.query(Order).filter(Order.items.contains(item)).all()

        assert [Order(id=4), Order(id=5)] == sess.query(Order).filter(~Order.items.contains(item)).all()

        item2 = sess.query(Item).get(5)
        assert [Order(id=3)] == sess.query(Order).filter(Order.items.contains(item)).filter(Order.items.contains(item2)).all()
        

    def test_comparison(self):
        """test scalar comparison to an object instance"""

        sess = create_session()
        user = sess.query(User).get(8)
        assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user==user).all()

        assert [Address(id=1), Address(id=5)] == sess.query(Address).filter(Address.user!=user).all()

        # generates an IS NULL
        assert [] == sess.query(Address).filter(Address.user == None).all()
        assert [] == sess.query(Address).filter(Address.user == null()).all()

        assert [Order(id=5)] == sess.query(Order).filter(Order.address == None).all()

        # o2o
        dingaling = sess.query(Dingaling).get(2)
        assert [Address(id=5)] == sess.query(Address).filter(Address.dingaling==dingaling).all()

        # m2m
        eq_(sess.query(Item).filter(Item.keywords==None).order_by(Item.id).all(), [Item(id=4), Item(id=5)])
        eq_(sess.query(Item).filter(Item.keywords!=None).order_by(Item.id).all(), [Item(id=1),Item(id=2), Item(id=3)])
    
    def test_filter_by(self):
        sess = create_session()
        user = sess.query(User).get(8)
        assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter_by(user=user).all()

        # many to one generates IS NULL
        assert [] == sess.query(Address).filter_by(user = None).all()
        assert [] == sess.query(Address).filter_by(user = null()).all()

        # one to many generates WHERE NOT EXISTS
        assert [User(name='chuck')] == sess.query(User).filter_by(addresses = None).all()
        assert [User(name='chuck')] == sess.query(User).filter_by(addresses = null()).all()
    
    def test_none_comparison(self):
        sess = create_session()
        
        # scalar
        eq_(
            [Order(description="order 5")],
            sess.query(Order).filter(Order.address_id==None).all()
        )
        eq_(
            [Order(description="order 5")],
            sess.query(Order).filter(Order.address_id==null()).all()
        )
        
        # o2o
        eq_([Address(id=1), Address(id=3), Address(id=4)], 
            sess.query(Address).filter(Address.dingaling==None).order_by(Address.id).all())
        eq_([Address(id=1), Address(id=3), Address(id=4)], 
            sess.query(Address).filter(Address.dingaling==null()).order_by(Address.id).all())
        eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != None).order_by(Address.id).all())
        eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != null()).order_by(Address.id).all())
        
        # m2o
        eq_([Order(id=5)], sess.query(Order).filter(Order.address==None).all())
        eq_([Order(id=1), Order(id=2), Order(id=3), Order(id=4)], sess.query(Order).order_by(Order.id).filter(Order.address!=None).all())
        
        # o2m
        eq_([User(id=10)], sess.query(User).filter(User.addresses==None).all())
        eq_([User(id=7),User(id=8),User(id=9)], sess.query(User).filter(User.addresses!=None).order_by(User.id).all())

    def test_blank_filter_by(self):
        eq_(
            [(7,), (8,), (9,), (10,)],
            create_session().query(User.id).filter_by().order_by(User.id).all()
        )
        eq_(
            [(7,), (8,), (9,), (10,)],
            create_session().query(User.id).filter_by(**{}).order_by(User.id).all()
        )


class FromSelfTest(QueryTest, AssertsCompiledSQL):
    def test_filter(self):

        assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.id.in_([8,9])).from_self().all()

        assert [User(id=8), User(id=9)] == create_session().query(User).order_by(User.id).slice(1,3).from_self().all()
        assert [User(id=8)] == list(create_session().query(User).filter(User.id.in_([8,9])).from_self().order_by(User.id)[0:1])
    
    def test_join(self):
        assert [
            (User(id=8), Address(id=2)),
            (User(id=8), Address(id=3)),
            (User(id=8), Address(id=4)),
            (User(id=9), Address(id=5))
        ] == create_session().query(User).filter(User.id.in_([8,9])).from_self().\
            join('addresses').add_entity(Address).order_by(User.id, Address.id).all()
    
    def test_group_by(self):
        eq_(
            create_session().query(Address.user_id, func.count(Address.id).label('count')).\
                            group_by(Address.user_id).order_by(Address.user_id).all(),
            [(7, 1), (8, 3), (9, 1)]
        )

        eq_(
            create_session().query(Address.user_id, Address.id).\
                            from_self(Address.user_id, func.count(Address.id)).\
                            group_by(Address.user_id).order_by(Address.user_id).all(),
            [(7, 1), (8, 3), (9, 1)]
        )
        
    def test_no_joinedload(self):
        """test that joinedloads are pushed outwards and not rendered in subqueries."""
        
        s = create_session()
        
        oracle_as = not testing.against('oracle') and "AS " or ""
        
        self.assert_compile(
            s.query(User).options(joinedload(User.addresses)).from_self().statement,
            "SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, addresses_1.user_id, "\
            "addresses_1.email_address FROM (SELECT users.id AS users_id, users.name AS users_name FROM users) %(oracle_as)sanon_1 "\
            "LEFT OUTER JOIN addresses %(oracle_as)saddresses_1 ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id" % {
                'oracle_as':oracle_as
            }
        )
            
    def test_aliases(self):
        """test that aliased objects are accessible externally to a from_self() call."""
        
        s = create_session()
        
        ualias = aliased(User)
        eq_(
            s.query(User, ualias).filter(User.id > ualias.id).from_self(User.name, ualias.name).
                    order_by(User.name, ualias.name).all(),
            [
                (u'chuck', u'ed'), 
                (u'chuck', u'fred'), 
                (u'chuck', u'jack'), 
                (u'ed', u'jack'), 
                (u'fred', u'ed'), 
                (u'fred', u'jack')
            ]
        )

        eq_(
            s.query(User, ualias).filter(User.id > ualias.id).from_self(User.name, ualias.name).filter(ualias.name=='ed')\
                .order_by(User.name, ualias.name).all(),
            [(u'chuck', u'ed'), (u'fred', u'ed')]
        )

        eq_(
            s.query(User, ualias).filter(User.id > ualias.id).from_self(ualias.name, Address.email_address).
                    join(ualias.addresses).order_by(ualias.name, Address.email_address).all(),
            [
                (u'ed', u'fred@fred.com'), 
                (u'jack', u'ed@bettyboop.com'), 
                (u'jack', u'ed@lala.com'), 
                (u'jack', u'ed@wood.com'), 
                (u'jack', u'fred@fred.com')]
        )
        
        
    def test_multiple_entities(self):
        sess = create_session()

        eq_(
            sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5])).from_self().all(),
            [
                (User(id=8), Address(id=2)),
                (User(id=9), Address(id=5))
            ]
        )

        eq_(
            sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5])).from_self().options(joinedload('addresses')).first(),
            
            #    order_by(User.id, Address.id).first(),
            (User(id=8, addresses=[Address(), Address(), Address()]), Address(id=2)),
        )

    def test_multiple_with_column_entities(self):
        sess = create_session()
        
        eq_(
            sess.query(User.id).from_self().\
                add_column(func.count().label('foo')).\
                group_by(User.id).\
                order_by(User.id).\
                from_self().all(),
            [
                (7,1), (8, 1), (9, 1), (10, 1)
            ]
            
        )

    
class SetOpsTest(QueryTest, AssertsCompiledSQL):
    
    def test_union(self):
        s = create_session()
        
        fred = s.query(User).filter(User.name=='fred')
        ed = s.query(User).filter(User.name=='ed')
        jack = s.query(User).filter(User.name=='jack')
        
        eq_(fred.union(ed).order_by(User.name).all(), 
            [User(name='ed'), User(name='fred')]
        )

        eq_(fred.union(ed, jack).order_by(User.name).all(), 
            [User(name='ed'), User(name='fred'), User(name='jack')]
        )
    
    def test_statement_labels(self):
        """test that label conflicts don't occur with joins etc."""
        
        s = create_session()
        q1 = s.query(User, Address).join(User.addresses).\
                                    filter(Address.email_address=="ed@wood.com")
        q2 = s.query(User, Address).join(User.addresses).\
                                    filter(Address.email_address=="jack@bean.com")
        q3 = q1.union(q2).order_by(User.name)
        
        eq_(
            q3.all(),
            [
                (User(name='ed'), Address(email_address="ed@wood.com")),
                (User(name='jack'), Address(email_address="jack@bean.com")),
            ]
        )
        
    def test_union_labels(self):
        """test that column expressions translate during 
            the _from_statement() portion of union(), others"""
        
        s = create_session()
        q1 = s.query(User, literal("x"))
        q2 = s.query(User, literal_column("'y'"))
        q3 = q1.union(q2)

        self.assert_compile(
            q3,
            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name,"
            " anon_1.anon_2 AS anon_1_anon_2 FROM (SELECT users.id AS users_id, users.name AS"
            " users_name, :param_1 AS anon_2 FROM users UNION SELECT users.id AS users_id, "
            "users.name AS users_name, 'y' FROM users) AS anon_1"
            , use_default_dialect = True
        )

        q4 = s.query(User, literal_column("'x'").label('foo'))
        q5 = s.query(User, literal("y"))
        q6 = q4.union(q5)
        
        for q in (q3.order_by(User.id, "anon_1_anon_2"), q6.order_by(User.id, "foo")):
            eq_(q.all(),
                [
                    (User(id=7, name=u'jack'), u'x'), 
                    (User(id=7, name=u'jack'), u'y'), 
                    (User(id=8, name=u'ed'), u'x'), 
                    (User(id=8, name=u'ed'), u'y'), 
                    (User(id=9, name=u'fred'), u'x'), 
                    (User(id=9, name=u'fred'), u'y'), 
                    (User(id=10, name=u'chuck'), u'x'), 
                    (User(id=10, name=u'chuck'), u'y')
                ]
            )
        
    @testing.fails_on('mysql', "mysql doesn't support intersect")
    def test_intersect(self):
        s = create_session()

        fred = s.query(User).filter(User.name=='fred')
        ed = s.query(User).filter(User.name=='ed')
        jack = s.query(User).filter(User.name=='jack')
        eq_(fred.intersect(ed, jack).all(), 
            []
        )

        eq_(fred.union(ed).intersect(ed.union(jack)).all(), 
            [User(name='ed')]
        )
    
    def test_eager_load(self):
        s = create_session()

        fred = s.query(User).filter(User.name=='fred')
        ed = s.query(User).filter(User.name=='ed')
        jack = s.query(User).filter(User.name=='jack')

        def go():
            eq_(
                fred.union(ed).order_by(User.name).options(joinedload(User.addresses)).all(), 
                [
                    User(name='ed', addresses=[Address(), Address(), Address()]), 
                    User(name='fred', addresses=[Address()])
                ]
            )
        self.assert_sql_count(testing.db, go, 1)
        
        
class AggregateTest(QueryTest):

    def test_sum(self):
        sess = create_session()
        orders = sess.query(Order).filter(Order.id.in_([2, 3, 4]))
        eq_(orders.values(func.sum(Order.user_id * Order.address_id)).next(), (79,))
        eq_(orders.value(func.sum(Order.user_id * Order.address_id)), 79)

    def test_apply(self):
        sess = create_session()
        assert sess.query(func.sum(Order.user_id * Order.address_id)).filter(Order.id.in_([2, 3, 4])).one() == (79,)

    def test_having(self):
        sess = create_session()
        assert [User(name=u'ed',id=8)] == sess.query(User).order_by(User.id).group_by(User).join('addresses').having(func.count(Address.id)> 2).all()

        assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).order_by(User.id).group_by(User).join('addresses').having(func.count(Address.id)< 2).all()

class CountTest(QueryTest):
    def test_basic(self):
        s = create_session()
        
        eq_(s.query(User).count(), 4)

        eq_(s.query(User).filter(users.c.name.endswith('ed')).count(), 2)

    def test_multiple_entity(self):
        s = create_session()
        q = s.query(User, Address)
        eq_(q.count(), 20)  # cartesian product
        
        q = s.query(User, Address).join(User.addresses)
        eq_(q.count(), 5)
    
    def test_nested(self):
        s = create_session()
        q = s.query(User, Address).limit(2)
        eq_(q.count(), 2)

        q = s.query(User, Address).limit(100)
        eq_(q.count(), 20)

        q = s.query(User, Address).join(User.addresses).limit(100)
        eq_(q.count(), 5)
    
    def test_cols(self):
        """test that column-based queries always nest."""
        
        s = create_session()
        
        q = s.query(func.count(distinct(User.name)))
        eq_(q.count(), 1)

        q = s.query(func.count(distinct(User.name))).distinct()
        eq_(q.count(), 1)

        q = s.query(User.name)
        eq_(q.count(), 4)

        q = s.query(User.name, Address)
        eq_(q.count(), 20)

        q = s.query(Address.user_id)
        eq_(q.count(), 5)
        eq_(q.distinct().count(), 3)
        
        
class DistinctTest(QueryTest):
    def test_basic(self):
        eq_(
            [User(id=7), User(id=8), User(id=9),User(id=10)],
            create_session().query(User).order_by(User.id).distinct().all()
        )
        eq_(
            [User(id=7), User(id=9), User(id=8),User(id=10)], 
            create_session().query(User).distinct().order_by(desc(User.name)).all()
        ) 

    def test_joined(self):
        """test that orderbys from a joined table get placed into the columns clause when DISTINCT is used"""

        sess = create_session()
        q = sess.query(User).join('addresses').distinct().order_by(desc(Address.email_address))

        assert [User(id=7), User(id=9), User(id=8)] == q.all()

        sess.expunge_all()

        # test that it works on embedded joinedload/LIMIT subquery
        q = sess.query(User).join('addresses').distinct().options(joinedload('addresses')).order_by(desc(Address.email_address)).limit(2)

        def go():
            assert [
                User(id=7, addresses=[
                    Address(id=1)
                ]),
                User(id=9, addresses=[
                    Address(id=5)
                ]),
            ] == q.all()
        self.assert_sql_count(testing.db, go, 1)


class YieldTest(QueryTest):
    def test_basic(self):
        sess = create_session()
        q = iter(sess.query(User).yield_per(1).from_statement("select * from users"))

        ret = []
        eq_(len(sess.identity_map), 0)
        ret.append(q.next())
        ret.append(q.next())
        eq_(len(sess.identity_map), 2)
        ret.append(q.next())
        ret.append(q.next())
        eq_(len(sess.identity_map), 4)
        try:
            q.next()
            assert False
        except StopIteration:
            pass

class HintsTest(QueryTest, AssertsCompiledSQL):
    def test_hints(self):
        from sqlalchemy.dialects import mysql
        dialect = mysql.dialect()
        
        sess = create_session()
        
        self.assert_compile(
            sess.query(User).with_hint(User, 'USE INDEX (col1_index,col2_index)'),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM users USE INDEX (col1_index,col2_index)",
            dialect=dialect
        )

        self.assert_compile(
            sess.query(User).with_hint(User, 'WITH INDEX col1_index', 'sybase'),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM users",
            dialect=dialect
        )
        
        ualias = aliased(User)
        self.assert_compile(
            sess.query(User, ualias).with_hint(ualias, 'USE INDEX (col1_index,col2_index)').
                join((ualias, ualias.id > User.id)),
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users_1.id AS users_1_id, users_1.name AS users_1_name "
            "FROM users INNER JOIN users AS users_1 USE INDEX (col1_index,col2_index) "
            "ON users.id < users_1.id",
            dialect=dialect
        )
    

class TextTest(QueryTest):
    def test_fulltext(self):
        assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users order by id").all()

        assert User(id=7) == create_session().query(User).from_statement("select * from users order by id").first()
        assert None == create_session().query(User).from_statement("select * from users where name='nonexistent'").first()

    def test_fragment(self):
        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (8, 9)").all()

        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter("id=9").all()

        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter(User.id==9).all()

    def test_binds(self):
        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()

    def test_as_column(self):
        s = create_session()
        assert_raises(sa_exc.InvalidRequestError, s.query, User.id, text("users.name"))

        eq_(s.query(User.id, "name").order_by(User.id).all(), [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')])

class ParentTest(QueryTest):
    def test_o2m(self):
        sess = create_session()
        q = sess.query(User)

        u1 = q.filter_by(name='jack').one()

        # test auto-lookup of property
        o = sess.query(Order).with_parent(u1).all()
        assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o

        # test with explicit property
        o = sess.query(Order).with_parent(u1, property='orders').all()
        assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o

        o = sess.query(Order).filter(with_parent(u1, User.orders)).all()
        assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
        
        # test generative criterion
        o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all()
        assert [Order(description="order 3"), Order(description="order 5")] == o

        # test against None for parent? this can't be done with the current API since we don't know
        # what mapper to use
        #assert sess.query(Order).with_parent(None, property='addresses').all() == [Order(description="order 5")]

    def test_noparent(self):
        sess = create_session()
        q = sess.query(User)

        u1 = q.filter_by(name='jack').one()

        try:
            q = sess.query(Item).with_parent(u1)
            assert False
        except sa_exc.InvalidRequestError, e:
            assert str(e) == "Could not locate a property which relates instances of class 'Item' to instances of class 'User'"

    def test_m2m(self):
        sess = create_session()
        i1 = sess.query(Item).filter_by(id=2).one()
        k = sess.query(Keyword).with_parent(i1).all()
        assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k

class InheritedJoinTest(_base.MappedTest, AssertsCompiledSQL):
    run_setup_mappers = 'once'
    
    @classmethod
    def define_tables(cls, metadata):
        Table('companies', metadata,
           Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True),
           Column('name', String(50)))

        Table('people', metadata,
           Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
           Column('company_id', Integer, ForeignKey('companies.company_id')),
           Column('name', String(50)),
           Column('type', String(30)))

        Table('engineers', metadata,
           Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
           Column('status', String(30)),
           Column('engineer_name', String(50)),
           Column('primary_language', String(50)),
          )
     
        Table('machines', metadata,
            Column('machine_id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', String(50)),
            Column('engineer_id', Integer, ForeignKey('engineers.person_id')))
        
        Table('managers', metadata,
           Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
           Column('status', String(30)),
           Column('manager_name', String(50))
           )

        Table('boss', metadata,
            Column('boss_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
            Column('golf_swing', String(30)),
            )

        Table('paperwork', metadata,
            Column('paperwork_id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('description', String(50)),
            Column('person_id', Integer, ForeignKey('people.person_id')))
    
    @classmethod
    @testing.resolve_artifact_names
    def setup_classes(cls):
        class Company(_fixtures.Base):
            pass
        class Person(_fixtures.Base):
            pass
        class Engineer(Person):
            pass
        class Manager(Person):
            pass
        class Boss(Manager):
            pass
        class Machine(_fixtures.Base):
            pass
        class Paperwork(_fixtures.Base):
            pass

        mapper(Company, companies, properties={
            'employees':relationship(Person, order_by=people.c.person_id)
        })

        mapper(Machine, machines)

        mapper(Person, people, 
            polymorphic_on=people.c.type, 
            polymorphic_identity='person', 
            order_by=people.c.person_id, 
            properties={
                'paperwork':relationship(Paperwork, order_by=paperwork.c.paperwork_id)
            })
        mapper(Engineer, engineers, inherits=Person, polymorphic_identity='engineer', properties={
                'machines':relationship(Machine, order_by=machines.c.machine_id)
            })
        mapper(Manager, managers, 
                    inherits=Person, polymorphic_identity='manager')
        mapper(Boss, boss, inherits=Manager, polymorphic_identity='boss')
        mapper(Paperwork, paperwork)
    
    @testing.resolve_artifact_names
    def test_single_prop(self):
        sess = create_session()
    
        self.assert_compile(
            sess.query(Company).join(Company.employees),
            "SELECT companies.company_id AS companies_company_id, companies.name AS companies_name "
            "FROM companies JOIN people ON companies.company_id = people.company_id"
            , use_default_dialect = True
        )

    @testing.resolve_artifact_names
    def test_force_via_select_from(self):
        sess = create_session()

        self.assert_compile(
            sess.query(Company).\
                filter(Company.company_id==Engineer.company_id).\
                filter(Engineer.primary_language=='java'),
            "SELECT companies.company_id AS companies_company_id, companies.name AS companies_name "
            "FROM companies, people, engineers "
            "WHERE companies.company_id = people.company_id AND engineers.primary_language "
            "= :primary_language_1",
            use_default_dialect=True
        )

        self.assert_compile(
            sess.query(Company).select_from(Company, Engineer).\
                filter(Company.company_id==Engineer.company_id).\
                filter(Engineer.primary_language=='java'),
            "SELECT companies.company_id AS companies_company_id, companies.name AS companies_name "
            "FROM companies, people JOIN engineers ON people.person_id = engineers.person_id "
            "WHERE companies.company_id = people.company_id AND engineers.primary_language ="
            " :primary_language_1",
            use_default_dialect=True
            
        )
            
    @testing.resolve_artifact_names
    def test_single_prop_of_type(self):
        sess = create_session()

        self.assert_compile(
            sess.query(Company).join(Company.employees.of_type(Engineer)),
            "SELECT companies.company_id AS companies_company_id, companies.name AS companies_name "
            "FROM companies JOIN (SELECT people.person_id AS people_person_id, "
            "people.company_id AS people_company_id, people.name AS people_name, "
            "people.type AS people_type, engineers.person_id AS "
            "engineers_person_id, engineers.status AS engineers_status, "
            "engineers.engineer_name AS engineers_engineer_name, "
            "engineers.primary_language AS engineers_primary_language "
            "FROM people JOIN engineers ON people.person_id = engineers.person_id) AS "
            "anon_1 ON companies.company_id = anon_1.people_company_id"
            , use_default_dialect = True
        )

    @testing.resolve_artifact_names
    def test_prop_with_polymorphic(self):
        sess = create_session()
        
        self.assert_compile(
            sess.query(Person).with_polymorphic(Manager).
                    join('paperwork').filter(Paperwork.description.like('%review%')),
                "SELECT people.person_id AS people_person_id, people.company_id AS"
                " people_company_id, "
                "people.name AS people_name, people.type AS people_type, managers.person_id "
                "AS managers_person_id, "
                "managers.status AS managers_status, managers.manager_name AS "
                "managers_manager_name FROM people "
                "LEFT OUTER JOIN managers ON people.person_id = managers.person_id JOIN "
                "paperwork ON people.person_id = "
                "paperwork.person_id WHERE paperwork.description LIKE :description_1 "
                "ORDER BY people.person_id"
                , use_default_dialect=True
            )
        
        self.assert_compile(
            sess.query(Person).with_polymorphic(Manager).
                    join('paperwork', aliased=True).
                    filter(Paperwork.description.like('%review%')),
            "SELECT people.person_id AS people_person_id, people.company_id AS people_company_id, "
            "people.name AS people_name, people.type AS people_type, managers.person_id "
            "AS managers_person_id, "
            "managers.status AS managers_status, managers.manager_name AS managers_manager_name "
            "FROM people LEFT OUTER JOIN managers ON people.person_id = managers.person_id JOIN "
            "paperwork AS paperwork_1 ON people.person_id = paperwork_1.person_id "
            "WHERE paperwork_1.description LIKE :description_1 ORDER BY people.person_id"
            , use_default_dialect=True
        )

    @testing.resolve_artifact_names
    def test_explicit_polymorphic_join(self):
        sess = create_session()
        
        self.assert_compile(
            sess.query(Company).join(Engineer).filter(Engineer.engineer_name=='vlad'),
            "SELECT companies.company_id AS companies_company_id, companies.name AS "
            "companies_name "
            "FROM companies JOIN (SELECT people.person_id AS people_person_id, "
            "people.company_id AS "
            "people_company_id, people.name AS people_name, people.type AS people_type,"
            " engineers.person_id AS "
            "engineers_person_id, engineers.status AS engineers_status, "
            "engineers.engineer_name AS engineers_engineer_name, "
            "engineers.primary_language AS engineers_primary_language "
            "FROM people JOIN engineers ON people.person_id = engineers.person_id) "
            "AS anon_1 ON "
            "companies.company_id = anon_1.people_company_id "
            "WHERE anon_1.engineers_engineer_name = :engineer_name_1"
            , use_default_dialect=True
        )
        self.assert_compile(
            sess.query(Company).join((Engineer, Company.company_id==Engineer.company_id)).
                    filter(Engineer.engineer_name=='vlad'),
            "SELECT companies.company_id AS companies_company_id, companies.name "
            "AS companies_name "
            "FROM companies JOIN (SELECT people.person_id AS people_person_id, "
            "people.company_id AS "
            "people_company_id, people.name AS people_name, people.type AS "
            "people_type, engineers.person_id AS "
            "engineers_person_id, engineers.status AS engineers_status, "
            "engineers.engineer_name AS engineers_engineer_name, "
            "engineers.primary_language AS engineers_primary_language "
            "FROM people JOIN engineers ON people.person_id = engineers.person_id) AS "
            "anon_1 ON "
            "companies.company_id = anon_1.people_company_id "
            "WHERE anon_1.engineers_engineer_name = :engineer_name_1"
            , use_default_dialect=True
        )

    @testing.resolve_artifact_names
    def test_multiple_adaption(self):
        """test that multiple filter() adapters get chained together "
        and work correctly within a multiple-entry join()."""
        
        sess = create_session()

        self.assert_compile(
            sess.query(Company).join((people.join(engineers), Company.employees)).
                filter(Engineer.name=='dilbert'),
            "SELECT companies.company_id AS companies_company_id, companies.name AS "
            "companies_name "
            "FROM companies JOIN (SELECT people.person_id AS people_person_id, "
            "people.company_id AS "
            "people_company_id, people.name AS people_name, people.type AS "
            "people_type, engineers.person_id "
            "AS engineers_person_id, engineers.status AS engineers_status, "
            "engineers.engineer_name AS engineers_engineer_name, "
            "engineers.primary_language AS engineers_primary_language FROM people "
            "JOIN engineers ON people.person_id = "
            "engineers.person_id) AS anon_1 ON companies.company_id = "
            "anon_1.people_company_id WHERE anon_1.people_name = :name_1"
            , use_default_dialect = True
        )
        
        mach_alias = machines.select()
        self.assert_compile(
            sess.query(Company).join((people.join(engineers), Company.employees), 
                                        (mach_alias, Engineer.machines)).
                filter(Engineer.name=='dilbert').filter(Machine.name=='foo'),
            "SELECT companies.company_id AS companies_company_id, companies.name AS "
            "companies_name "
            "FROM companies JOIN (SELECT people.person_id AS people_person_id, "
            "people.company_id AS "
            "people_company_id, people.name AS people_name, people.type AS people_type,"
            " engineers.person_id "
            "AS engineers_person_id, engineers.status AS engineers_status, "
            "engineers.engineer_name AS engineers_engineer_name, "
            "engineers.primary_language AS engineers_primary_language FROM people "
            "JOIN engineers ON people.person_id = "
            "engineers.person_id) AS anon_1 ON companies.company_id = "
            "anon_1.people_company_id JOIN "
            "(SELECT machines.machine_id AS machine_id, machines.name AS name, "
            "machines.engineer_id AS engineer_id "
            "FROM machines) AS anon_2 ON anon_1.engineers_person_id = anon_2.engineer_id "
            "WHERE anon_1.people_name = :name_1 AND anon_2.name = :name_2"
            , use_default_dialect = True
        )

class AddEntityEquivalenceTest(_base.MappedTest, AssertsCompiledSQL):
    run_setup_mappers = 'once'

    @classmethod
    def define_tables(cls, metadata):
        Table('a', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', String(50)),
            Column('type', String(20)),
            Column('bid', Integer, ForeignKey('b.id'))
        )

        Table('b', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', String(50)),
            Column('type', String(20))
        )

        Table('c', metadata,
            Column('id', Integer, ForeignKey('b.id'), primary_key=True),
            Column('age', Integer)
        )

        Table('d', metadata,
            Column('id', Integer, ForeignKey('a.id'), primary_key=True),
            Column('dede', Integer)
        )

    @classmethod
    @testing.resolve_artifact_names
    def setup_classes(cls):
        class A(_fixtures.Base):
            pass
            
        class B(_fixtures.Base):
            pass
        
        class C(B):
            pass
        
        class D(A):
            pass
            
        mapper(A, a, 
                    polymorphic_identity='a', 
                    polymorphic_on=a.c.type,
                    with_polymorphic= ('*', None),
                    properties={
                        'link':relation( B, uselist=False, backref='back')
                    })
        mapper(B, b, 
                    polymorphic_identity='b', 
                    polymorphic_on=b.c.type,
                    with_polymorphic= ('*', None)
                    )
        mapper(C, c, inherits=B, polymorphic_identity='c')
        mapper(D, d, inherits=A, polymorphic_identity='d')
        
    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        sess = create_session()
        sess.add_all([
            B(name='b1'), 
            A(name='a1', link= C(name='c1',age=3)), 
            C(name='c2',age=6), 
            A(name='a2')
            ])
        sess.flush()
    
    @testing.resolve_artifact_names
    def test_add_entity_equivalence(self):
        sess = create_session()
        
        for q in [
            sess.query( A,B).join( A.link),
            sess.query( A).join( A.link).add_entity(B),
        ]:
            eq_(
                q.all(),
                [(
                    A(bid=2, id=1, name=u'a1', type=u'a'), 
                    C(age=3, id=2, name=u'c1', type=u'c')
                )]
            )

        for q in [
            sess.query( B,A).join( B.back),
            sess.query( B).join( B.back).add_entity(A),
            sess.query( B).add_entity(A).join( B.back)
        ]:
            eq_(
                q.all(),
                [(
                    C(age=3, id=2, name=u'c1', type=u'c'), 
                    A(bid=2, id=1, name=u'a1', type=u'a')
                )]
            )
        
class JoinTest(QueryTest, AssertsCompiledSQL):
    
    def test_single_name(self):
        sess = create_session()

        self.assert_compile(
            sess.query(User).join("orders"),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM users JOIN orders ON users.id = orders.user_id"
            , use_default_dialect = True
        )

        assert_raises(
            sa_exc.InvalidRequestError,
            sess.query(User).join, "user",
        )

        self.assert_compile(
            sess.query(User).join("orders", "items"),
            "SELECT users.id AS users_id, users.name AS users_name FROM users "
            "JOIN orders ON users.id = orders.user_id JOIN order_items AS order_items_1 "
            "ON orders.id = order_items_1.order_id JOIN items ON items.id = order_items_1.item_id"
            , use_default_dialect=True
        )

        # test overlapping paths.   User->orders is used by both joins, but rendered once.
        self.assert_compile(
            sess.query(User).join("orders", "items").join("orders", "address"),
            "SELECT users.id AS users_id, users.name AS users_name FROM users JOIN orders "
            "ON users.id = orders.user_id JOIN order_items AS order_items_1 ON orders.id = "
            "order_items_1.order_id JOIN items ON items.id = order_items_1.item_id JOIN addresses "
            "ON addresses.id = orders.address_id"
            , use_default_dialect=True
        )
        
    def test_single_prop(self):
        sess = create_session()
        self.assert_compile(
            sess.query(User).join(User.orders),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM users JOIN orders ON users.id = orders.user_id"
            , use_default_dialect=True
        )

        self.assert_compile(
            sess.query(User).join(Order.user),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM orders JOIN users ON users.id = orders.user_id"
            , use_default_dialect=True
        )

        oalias1 = aliased(Order)
        oalias2 = aliased(Order)

        self.assert_compile(
            sess.query(User).join(oalias1.user),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM orders AS orders_1 JOIN users ON users.id = orders_1.user_id"
            , use_default_dialect=True
        )
        
        # another nonsensical query.  (from [ticket:1537]).
        # in this case, the contract of "left to right" is honored
        self.assert_compile(
            sess.query(User).join(oalias1.user).join(oalias2.user),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM orders AS orders_1 JOIN users ON users.id = orders_1.user_id, "
            "orders AS orders_2 JOIN users ON users.id = orders_2.user_id"
            , use_default_dialect=True
        )
        
        self.assert_compile(
            sess.query(User).join(User.orders, Order.items),
            "SELECT users.id AS users_id, users.name AS users_name FROM users "
            "JOIN orders ON users.id = orders.user_id JOIN order_items AS order_items_1 "
            "ON orders.id = order_items_1.order_id JOIN items ON items.id = order_items_1.item_id"
            , use_default_dialect=True
        )
        
        ualias = aliased(User)
        self.assert_compile(
            sess.query(ualias).join(ualias.orders),
            "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name "
            "FROM users AS users_1 JOIN orders ON users_1.id = orders.user_id"
            , use_default_dialect=True
        )
        
        # this query is somewhat nonsensical.  the old system didn't render a correct
        # query for this.   In this case its the most faithful to what was asked -
        # there's no linkage between User.orders and "oalias", so two FROM elements
        # are generated.
        oalias = aliased(Order)
        self.assert_compile(
            sess.query(User).join(User.orders, oalias.items),
            "SELECT users.id AS users_id, users.name AS users_name FROM users "
            "JOIN orders ON users.id = orders.user_id, "
            "orders AS orders_1 JOIN order_items AS order_items_1 ON orders_1.id = order_items_1.order_id "
            "JOIN items ON items.id = order_items_1.item_id"
            , use_default_dialect=True
        )

        # same as before using an aliased() for User as well
        ualias = aliased(User)
        self.assert_compile(
            sess.query(ualias).join(ualias.orders, oalias.items),
            "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM users AS users_1 "
            "JOIN orders ON users_1.id = orders.user_id, "
            "orders AS orders_1 JOIN order_items AS order_items_1 ON orders_1.id = order_items_1.order_id "
            "JOIN items ON items.id = order_items_1.item_id"
            , use_default_dialect=True
        )

        self.assert_compile(
            sess.query(User).filter(User.name=='ed').from_self().join(User.orders),
            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name "
            "FROM (SELECT users.id AS users_id, users.name AS users_name "
            "FROM users "
            "WHERE users.name = :name_1) AS anon_1 JOIN orders ON anon_1.users_id = orders.user_id"
            , use_default_dialect=True
        )
        
        self.assert_compile(
            sess.query(User).join(User.addresses, aliased=True).filter(Address.email_address=='foo'),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM users JOIN addresses AS addresses_1 ON users.id = addresses_1.user_id "
            "WHERE addresses_1.email_address = :email_address_1"
            , use_default_dialect=True
        )

        self.assert_compile(
            sess.query(User).join(User.orders, Order.items, aliased=True).filter(Item.id==10),
            "SELECT users.id AS users_id, users.name AS users_name "
            "FROM users JOIN orders AS orders_1 ON users.id = orders_1.user_id "
            "JOIN order_items AS order_items_1 ON orders_1.id = order_items_1.order_id "
            "JOIN items AS items_1 ON items_1.id = order_items_1.item_id "
            "WHERE items_1.id = :id_1"
            , use_default_dialect=True
        )
        
        # test #1 for [ticket:1706]
        ualias = aliased(User)
        self.assert_compile(
            sess.query(ualias).
                    join((oalias1, ualias.orders)).\
                    join((Address, ualias.addresses)),
            "SELECT users_1.id AS users_1_id, users_1.name AS "
            "users_1_name FROM users AS users_1 JOIN orders AS orders_1 "
            "ON users_1.id = orders_1.user_id JOIN addresses ON users_1.id "
            "= addresses.user_id"
            , use_default_dialect=True
        )
        
        # test #2 for [ticket:1706]
        ualias2 = aliased(User)
        self.assert_compile(
            sess.query(ualias).
                    join((Address, ualias.addresses)).
                    join((ualias2, Address.user)).
                    join((Order, ualias.orders)),
            "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM users "
            "AS users_1 JOIN addresses ON users_1.id = addresses.user_id JOIN users AS users_2 "
            "ON users_2.id = addresses.user_id JOIN orders ON users_1.id = orders.user_id"
            , use_default_dialect=True
        )
        
    def test_overlapping_paths(self):
        for aliased in (True,False):
            # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
            result = create_session().query(User).join('orders', 'items', aliased=aliased).\
                    filter_by(id=3).join('orders','address', aliased=aliased).filter_by(id=1).all()
            assert [User(id=7, name='jack')] == result

    def test_overlapping_paths_outerjoin(self):
        result = create_session().query(User).outerjoin('orders', 'items').\
                filter_by(id=3).outerjoin('orders','address').filter_by(id=1).all()
        assert [User(id=7, name='jack')] == result
    
    def test_from_joinpoint(self):
        sess = create_session()
        
        for oalias,ialias in [(True, True), (False, False), (True, False), (False, True)]:
            eq_(
                sess.query(User).join('orders', aliased=oalias).join('items', from_joinpoint=True, aliased=ialias).filter(Item.description == 'item 4').all(),
                [User(name='jack')]
            )

            # use middle criterion
            eq_(
                sess.query(User).join('orders', aliased=oalias).filter(Order.user_id==9).join('items', from_joinpoint=True, aliased=ialias).filter(Item.description=='item 4').all(),
                []
            )
        
        orderalias = aliased(Order)
        itemalias = aliased(Item)
        eq_(
            sess.query(User).join(('orders', orderalias), ('items', itemalias)).filter(itemalias.description == 'item 4').all(),
            [User(name='jack')]
        )
        eq_(
            sess.query(User).join(('orders', orderalias), ('items', itemalias)).filter(orderalias.user_id==9).filter(itemalias.description=='item 4').all(),
            []
        )
    
    def test_join_nonmapped_column(self):
        """test that the search for a 'left' doesn't trip on non-mapped cols"""
        sess = create_session()
        
        # intentionally join() with a non-existent "left" side
        self.assert_compile(
            sess.query(User.id, literal_column('foo')).join(Order.user),
            "SELECT users.id AS users_id, foo FROM orders JOIN users ON users.id = orders.user_id"
            , use_default_dialect=True
        )
        
        
        
    def test_backwards_join(self):
        # a more controversial feature.  join from
        # User->Address, but the onclause is Address.user.
        
        sess = create_session()

        eq_(
            sess.query(User).join(Address.user).filter(Address.email_address=='ed@wood.com').all(),
            [User(id=8,name=u'ed')]
        )

        # its actually not so controversial if you view it in terms
        # of multiple entities.
        eq_(
            sess.query(User, Address).join(Address.user).filter(Address.email_address=='ed@wood.com').all(),
            [(User(id=8,name=u'ed'), Address(email_address='ed@wood.com'))]
        )
        
        # this was the controversial part.  now, raise an error if the feature is abused.
        # before the error raise was added, this would silently work.....
        assert_raises(
            sa_exc.InvalidRequestError,
            sess.query(User).join, (Address, Address.user),
        )

        # but this one would silently fail 
        adalias = aliased(Address)
        assert_raises(
            sa_exc.InvalidRequestError,
            sess.query(User).join, (adalias, Address.user),
        )
        
    def test_multiple_with_aliases(self):
        sess = create_session()
        
        ualias = aliased(User)
        oalias1 = aliased(Order)
        oalias2 = aliased(Order)
        self.assert_compile(
            sess.query(ualias).join((oalias1, ualias.orders), (oalias2, ualias.orders)).\
                    filter(or_(oalias1.user_id==9, oalias2.user_id==7)),
            "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM users AS users_1 "
            "JOIN orders AS orders_1 ON users_1.id = orders_1.user_id JOIN orders AS orders_2 ON "
            "users_1.id = orders_2.user_id WHERE orders_1.user_id = :user_id_1 OR orders_2.user_id = :user_id_2",
            use_default_dialect=True
        )

    def test_select_from_orm_joins(self):
        sess = create_session()
        
        ualias = aliased(User)
        oalias1 = aliased(Order)
        oalias2 = aliased(Order)

        self.assert_compile(
            join(User, oalias2, User.id==oalias2.user_id),
            "users JOIN orders AS orders_1 ON users.id = orders_1.user_id",
            use_default_dialect=True
        )

        self.assert_compile(
            join(ualias, oalias1, ualias.orders),
            "users AS users_1 JOIN orders AS orders_1 ON users_1.id = orders_1.user_id",
            use_default_dialect=True
        )

        self.assert_compile(
            sess.query(ualias).select_from(join(ualias, oalias1, ualias.orders)),
            "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM users AS users_1 "
            "JOIN orders AS orders_1 ON users_1.id = orders_1.user_id",
            use_default_dialect=True
        )

        self.assert_compile(
            sess.query(User, ualias).select_from(join(ualias, oalias1, ualias.orders)),
            "SELECT users.id AS users_id, users.name AS users_name, users_1.id AS users_1_id, "
            "users_1.name AS users_1_name FROM users, users AS users_1 JOIN orders AS orders_1 ON users_1.id = orders_1.user_id",
            use_default_dialect=True
        )

        # this fails (and we cant quite fix right now).
        if False:
            self.assert_compile(
                sess.query(User, ualias).\
                        join((oalias1, ualias.orders)).\
                        join((oalias2, User.id==oalias2.user_id)).\
                        filter(or_(oalias1.user_id==9, oalias2.user_id==7)),
                "SELECT users.id AS users_id, users.name AS users_name, users_1.id AS users_1_id, users_1.name AS "
                "users_1_name FROM users JOIN orders AS orders_2 ON users.id = orders_2.user_id, "
                "users AS users_1 JOIN orders AS orders_1 ON users_1.id = orders_1.user_id  "
                "WHERE orders_1.user_id = :user_id_1 OR orders_2.user_id = :user_id_2",
                use_default_dialect=True
            )

        # this is the same thing using explicit orm.join() (which now offers multiple again)
        self.assert_compile(
            sess.query(User, ualias).\
                    select_from(
                        join(ualias, oalias1, ualias.orders),
                        join(User, oalias2, User.id==oalias2.user_id),
                    ).\
                    filter(or_(oalias1.user_id==9, oalias2.user_id==7)),
            "SELECT users.id AS users_id, users.name AS users_name, users_1.id AS users_1_id, users_1.name AS "
            "users_1_name FROM users AS users_1 JOIN orders AS orders_1 ON users_1.id = orders_1.user_id, "
            "users JOIN orders AS orders_2 ON users.id = orders_2.user_id "
            "WHERE orders_1.user_id = :user_id_1 OR orders_2.user_id = :user_id_2",
            
            use_default_dialect=True
        )
        
        
    def test_overlapping_backwards_joins(self):
        sess = create_session()

        oalias1 = aliased(Order)
        oalias2 = aliased(Order)
        
        # this is invalid SQL - joins from orders_1/orders_2 to User twice.  
        # but that is what was asked for so they get it !
        self.assert_compile(
            sess.query(User).join(oalias1.user).join(oalias2.user),
            "SELECT users.id AS users_id, users.name AS users_name FROM orders AS orders_1 "
            "JOIN users ON users.id = orders_1.user_id, orders AS orders_2 JOIN users ON users.id = orders_2.user_id",
            use_default_dialect=True,
        )

    def test_replace_multiple_from_clause(self):
        """test adding joins onto multiple FROM clauses"""
        
        sess = create_session()
        
        self.assert_compile(
            sess.query(Address, User).join(Address.dingaling).join(User.orders, Order.items),
            "SELECT addresses.id AS addresses_id, addresses.user_id AS addresses_user_id, "
            "addresses.email_address AS addresses_email_address, users.id AS users_id, "
            "users.name AS users_name FROM addresses JOIN dingalings ON addresses.id = dingalings.address_id, "
            "users JOIN orders ON users.id = orders.user_id JOIN order_items AS order_items_1 "
            "ON orders.id = order_items_1.order_id JOIN items ON items.id = order_items_1.item_id",
            use_default_dialect = True
        )
    
    def test_multiple_adaption(self):
        sess = create_session()

        self.assert_compile(
            sess.query(User).join(User.orders, Order.items, aliased=True).filter(Order.id==7).filter(Item.id==8),
            "SELECT users.id AS users_id, users.name AS users_name FROM users JOIN orders AS orders_1 "
            "ON users.id = orders_1.user_id JOIN order_items AS order_items_1 ON orders_1.id = order_items_1.order_id "
            "JOIN items AS items_1 ON items_1.id = order_items_1.item_id WHERE orders_1.id = :id_1 AND items_1.id = :id_2",
            use_default_dialect=True
        )
    
    def test_onclause_conditional_adaption(self):
        sess = create_session()

        self.assert_compile(
            sess.query(User).join(User.orders, 
                (Item, 
                    and_(Order.id==order_items.c.order_id, order_items.c.item_id==Item.id)
                ),aliased=True
                ),
            "SELECT users.id AS users_id, users.name AS users_name FROM users JOIN "
            "orders AS orders_1 ON users.id = orders_1.user_id JOIN items AS items_1 "
            "ON orders_1.id = order_items.order_id AND order_items.item_id = items_1.id",
            use_default_dialect=True
        )
        
        oalias = orders.select()
        self.assert_compile(
            sess.query(User).join((oalias, User.orders), 
                (Item, 
                    and_(Order.id==order_items.c.order_id, order_items.c.item_id==Item.id)
                ),
                ),
            "SELECT users.id AS users_id, users.name AS users_name FROM users JOIN "
            "(SELECT orders.id AS id, orders.user_id AS user_id, orders.address_id AS address_id, orders.description "
            "AS description, orders.isopen AS isopen FROM orders) AS anon_1 ON users.id = anon_1.user_id JOIN items "
            "ON anon_1.id = order_items.order_id AND order_items.item_id = items.id",
            use_default_dialect=True
        )
        
        
        # query.join(<stuff>, aliased=True).join((target, sql_expression))
        # or: query.join(path_to_some_joined_table_mapper).join((target, sql_expression))
        
    def test_pure_expression_error(self):
        sess = create_session()
        
        assert_raises_message(sa.exc.InvalidRequestError, "Could not find a FROM clause to join from", sess.query(users).join, addresses)
        
        
    def test_orderby_arg_bug(self):
        sess = create_session()
        # no arg error
        result = sess.query(User).join('orders', aliased=True).order_by(Order.id).reset_joinpoint().order_by(users.c.id).all()
    
    def test_no_onclause(self):
        sess = create_session()

        eq_(
            sess.query(User).select_from(join(User, Order).join(Item, Order.items)).filter(Item.description == 'item 4').all(),
            [User(name='jack')]
        )

        eq_(
            sess.query(User.name).select_from(join(User, Order).join(Item, Order.items)).filter(Item.description == 'item 4').all(),
            [('jack',)]
        )

        eq_(
            sess.query(User).join(Order, (Item, Order.items)).filter(Item.description == 'item 4').all(),
            [User(name='jack')]
        )
        
    def test_clause_onclause(self):
        sess = create_session()

        eq_(
            sess.query(User).join(
                (Order, User.id==Order.user_id), 
                (order_items, Order.id==order_items.c.order_id), 
                (Item, order_items.c.item_id==Item.id)
            ).filter(Item.description == 'item 4').all(),
            [User(name='jack')]
        )

        eq_(
            sess.query(User.name).join(
                (Order, User.id==Order.user_id), 
                (order_items, Order.id==order_items.c.order_id), 
                (Item, order_items.c.item_id==Item.id)
            ).filter(Item.description == 'item 4').all(),
            [('jack',)]
        )

        ualias = aliased(User)
        eq_(
            sess.query(ualias.name).join(
                (Order, ualias.id==Order.user_id), 
                (order_items, Order.id==order_items.c.order_id), 
                (Item, order_items.c.item_id==Item.id)
            ).filter(Item.description == 'item 4').all(),
            [('jack',)]
        )

        # explicit onclause with from_self(), means
        # the onclause must be aliased against the query's custom
        # FROM object
        eq_(
            sess.query(User).order_by(User.id).offset(2).from_self().join(
                (Order, User.id==Order.user_id)
            ).all(),
            [User(name='fred')]
        )

        # same with an explicit select_from()
        eq_(
            sess.query(User).select_from(select([users]).order_by(User.id).offset(2).alias()).join(
                (Order, User.id==Order.user_id)
            ).all(),
            [User(name='fred')]
        )
        
        
    def test_aliased_classes(self):
        sess = create_session()

        (user7, user8, user9, user10) = sess.query(User).all()
        (address1, address2, address3, address4, address5) = sess.query(Address).all()
        expected = [(user7, address1),
            (user8, address2),
            (user8, address3),
            (user8, address4),
            (user9, address5),
            (user10, None)]

        q = sess.query(User)
        AdAlias = aliased(Address)
        q = q.add_entity(AdAlias).select_from(outerjoin(User, AdAlias))
        l = q.order_by(User.id, AdAlias.id).all()
        eq_(l, expected)

        sess.expunge_all()

        q = sess.query(User).add_entity(AdAlias)
        l = q.select_from(outerjoin(User, AdAlias)).filter(AdAlias.email_address=='ed@bettyboop.com').all()
        eq_(l, [(user8, address3)])

        l = q.select_from(outerjoin(User, AdAlias, 'addresses')).filter(AdAlias.email_address=='ed@bettyboop.com').all()
        eq_(l, [(user8, address3)])

        l = q.select_from(outerjoin(User, AdAlias, User.id==AdAlias.user_id)).filter(AdAlias.email_address=='ed@bettyboop.com').all()
        eq_(l, [(user8, address3)])

        # this is the first test where we are joining "backwards" - from AdAlias to User even though
        # the query is against User
        q = sess.query(User, AdAlias)
        l = q.join(AdAlias.user).filter(User.name=='ed')
        eq_(l.all(), [(user8, address2),(user8, address3),(user8, address4),])

        q = sess.query(User, AdAlias).select_from(join(AdAlias, User, AdAlias.user)).filter(User.name=='ed')
        eq_(l.all(), [(user8, address2),(user8, address3),(user8, address4),])
        
    def test_implicit_joins_from_aliases(self):
        sess = create_session()
        OrderAlias = aliased(Order)

        eq_(
            sess.query(OrderAlias).join('items').filter_by(description='item 3').\
                order_by(OrderAlias.id).all(),
            [
                Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1), 
                Order(address_id=4,description=u'order 2',isopen=0,user_id=9,id=2), 
                Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3)
            ]
        )
         
        eq_(
            sess.query(User, OrderAlias, Item.description).join(('orders', OrderAlias), 'items').filter_by(description='item 3').\
                order_by(User.id, OrderAlias.id).all(),
            [
                (User(name=u'jack',id=7), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1), u'item 3'), 
                (User(name=u'jack',id=7), Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3), u'item 3'), 
                (User(name=u'fred',id=9), Order(address_id=4,description=u'order 2',isopen=0,user_id=9,id=2), u'item 3')
            ]
        )   
        
    def test_aliased_classes_m2m(self):
        sess = create_session()
        
        (order1, order2, order3, order4, order5) = sess.query(Order).all()
        (item1, item2, item3, item4, item5) = sess.query(Item).all()
        expected = [
            (order1, item1),
            (order1, item2),
            (order1, item3),
            (order2, item1),
            (order2, item2),
            (order2, item3),
            (order3, item3),
            (order3, item4),
            (order3, item5),
            (order4, item1),
            (order4, item5),
            (order5, item5),
        ]
        
        q = sess.query(Order)
        q = q.add_entity(Item).select_from(join(Order, Item, 'items')).order_by(Order.id, Item.id)
        l = q.all()
        eq_(l, expected)

        IAlias = aliased(Item)
        q = sess.query(Order, IAlias).select_from(join(Order, IAlias, 'items')).filter(IAlias.description=='item 3')
        l = q.all()
        eq_(l, 
            [
                (order1, item3),
                (order2, item3),
                (order3, item3),
            ]
        )
        
    def test_reset_joinpoint(self):
        for aliased in (True, False):
            # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
            result = create_session().query(User).join('orders', 'items', aliased=aliased).filter_by(id=3).reset_joinpoint().join('orders','address', aliased=aliased).filter_by(id=1).all()
            assert [User(id=7, name='jack')] == result

            result = create_session().query(User).outerjoin('orders', 'items', aliased=aliased).filter_by(id=3).reset_joinpoint().outerjoin('orders','address', aliased=aliased).filter_by(id=1).all()
            assert [User(id=7, name='jack')] == result
    
    def test_overlap_with_aliases(self):
        oalias = orders.alias('oalias')

        result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join('orders', 'items').order_by(User.id).all()
        assert [User(id=7, name='jack'), User(id=9, name='fred')] == result

        result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join('orders', 'items').filter_by(id=4).all()
        assert [User(id=7, name='jack')] == result

    def test_aliased(self):
        """test automatic generation of aliased joins."""

        sess = create_session()

        # test a basic aliasized path
        q = sess.query(User).join('addresses', aliased=True).filter_by(email_address='jack@bean.com')
        assert [User(id=7)] == q.all()

        q = sess.query(User).join('addresses', aliased=True).filter(Address.email_address=='jack@bean.com')
        assert [User(id=7)] == q.all()

        q = sess.query(User).join('addresses', aliased=True).filter(or_(Address.email_address=='jack@bean.com', Address.email_address=='fred@fred.com'))
        assert [User(id=7), User(id=9)] == q.all()

        # test two aliasized paths, one to 'orders' and the other to 'orders','items'.
        # one row is returned because user 7 has order 3 and also has order 1 which has item 1
        # this tests a o2m join and a m2m join.
        q = sess.query(User).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', 'items', aliased=True).filter(Item.description=="item 1")
        assert q.count() == 1
        assert [User(id=7)] == q.all()

        # test the control version - same joins but not aliased.  rows are not returned because order 3 does not have item 1
        q = sess.query(User).join('orders').filter(Order.description=="order 3").join('orders', 'items').filter(Item.description=="item 1")
        assert [] == q.all()
        assert q.count() == 0

        # the left half of the join condition of the any() is aliased.
        q = sess.query(User).join('orders', aliased=True).filter(Order.items.any(Item.description=='item 4'))
        assert [User(id=7)] == q.all()
        
        # test that aliasing gets reset when join() is called
        q = sess.query(User).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', aliased=True).filter(Order.description=="order 5")
        assert q.count() == 1
        assert [User(id=7)] == q.all()

    def test_aliased_order_by(self):
        sess = create_session()

        ualias = aliased(User)
        eq_(
            sess.query(User, ualias).filter(User.id > ualias.id).order_by(desc(ualias.id), User.name).all(),
            [
                (User(id=10,name=u'chuck'), User(id=9,name=u'fred')), 
                (User(id=10,name=u'chuck'), User(id=8,name=u'ed')), 
                (User(id=9,name=u'fred'), User(id=8,name=u'ed')), 
                (User(id=10,name=u'chuck'), User(id=7,name=u'jack')), 
                (User(id=8,name=u'ed'), User(id=7,name=u'jack')),
                (User(id=9,name=u'fred'), User(id=7,name=u'jack'))
            ]
        )

    def test_plain_table(self):
            
        sess = create_session()
        
        eq_(
            sess.query(User.name).join((addresses, User.id==addresses.c.user_id)).order_by(User.id).all(),
            [(u'jack',), (u'ed',), (u'ed',), (u'ed',), (u'fred',)]
        )
        
    def test_from_self_resets_joinpaths(self):
        """test a join from from_self() doesn't confuse joins inside the subquery
        with the outside.
        """
        sess = create_session()
        
        self.assert_compile(
            sess.query(Item).join(Item.keywords).from_self(Keyword).join(Item.keywords),
            "SELECT keywords.id AS keywords_id, keywords.name AS keywords_name FROM "
            "(SELECT items.id AS items_id, items.description AS items_description "
            "FROM items JOIN item_keywords AS item_keywords_1 ON items.id = "
            "item_keywords_1.item_id JOIN keywords ON keywords.id = item_keywords_1.keyword_id) "
            "AS anon_1 JOIN item_keywords AS item_keywords_2 ON "
            "anon_1.items_id = item_keywords_2.item_id "
            "JOIN keywords ON "
            "keywords.id = item_keywords_2.keyword_id",
            use_default_dialect=True
        )
        
        
class MultiplePathTest(_base.MappedTest, AssertsCompiledSQL):
    @classmethod
    def define_tables(cls, metadata):
        global t1, t2, t1t2_1, t1t2_2
        t1 = Table('t1', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(30))
            )
        t2 = Table('t2', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(30))
            )

        t1t2_1 = Table('t1t2_1', metadata,
            Column('t1id', Integer, ForeignKey('t1.id')),
            Column('t2id', Integer, ForeignKey('t2.id'))
            )

        t1t2_2 = Table('t1t2_2', metadata,
            Column('t1id', Integer, ForeignKey('t1.id')),
            Column('t2id', Integer, ForeignKey('t2.id'))
            )

    def test_basic(self):
        class T1(object):pass
        class T2(object):pass

        mapper(T1, t1, properties={
            't2s_1':relationship(T2, secondary=t1t2_1),
            't2s_2':relationship(T2, secondary=t1t2_2),
        })
        mapper(T2, t2)

        q = create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint().join('t2s_2')
        self.assert_compile(
            q,
            "SELECT t1.id AS t1_id, t1.data AS t1_data FROM t1 JOIN t1t2_1 AS t1t2_1_1 "
            "ON t1.id = t1t2_1_1.t1id JOIN t2 ON t2.id = t1t2_1_1.t2id JOIN t1t2_2 AS t1t2_2_1 "
            "ON t1.id = t1t2_2_1.t1id JOIN t2 ON t2.id = t1t2_2_1.t2id WHERE t2.id = :id_1"
            , use_default_dialect=True
        )


class SynonymTest(QueryTest):

    @classmethod
    def setup_mappers(cls):
        mapper(User, users, properties={
            'name_syn':synonym('name'),
            'addresses':relationship(Address),
            'orders':relationship(Order, backref='user'), # o2m, m2o
            'orders_syn':synonym('orders')
        })
        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items),  #m2m
            'address':relationship(Address),  # m2o
            'items_syn':synonym('items')
        })
        mapper(Item, items, properties={
            'keywords':relationship(Keyword, secondary=item_keywords) #m2m
        })
        mapper(Keyword, keywords)

    def test_joins(self):
        for j in (
            ['orders', 'items'],
            ['orders_syn', 'items'],
            ['orders', 'items_syn'],
            ['orders_syn', 'items_syn'],
        ):
            result = create_session().query(User).join(*j).filter_by(id=3).all()
            assert [User(id=7, name='jack'), User(id=9, name='fred')] == result

    def test_with_parent(self):
        for nameprop, orderprop in (
            ('name', 'orders'),
            ('name_syn', 'orders'),
            ('name', 'orders_syn'),
            ('name_syn', 'orders_syn'),
        ):
            sess = create_session()
            q = sess.query(User)

            u1 = q.filter_by(**{nameprop:'jack'}).one()

            o = sess.query(Order).with_parent(u1, property=orderprop).all()
            assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o

class InstancesTest(QueryTest, AssertsCompiledSQL):

    def test_from_alias(self):

        query = users.select(users.c.id==7).union(users.select(users.c.id>7)).alias('ulist').outerjoin(addresses).select(use_labels=True,order_by=['ulist.id', addresses.c.id])
        sess =create_session()
        q = sess.query(User)

        def go():
            l = list(q.options(contains_alias('ulist'), contains_eager('addresses')).instances(query.execute()))
            assert self.static.user_address_result == l
        self.assert_sql_count(testing.db, go, 1)

        sess.expunge_all()

        def go():
            l = q.options(contains_alias('ulist'), contains_eager('addresses')).from_statement(query).all()
            assert self.static.user_address_result == l
        self.assert_sql_count(testing.db, go, 1)

        # better way.  use select_from()
        def go():
            l = sess.query(User).select_from(query).options(contains_eager('addresses')).all()
            assert self.static.user_address_result == l
        self.assert_sql_count(testing.db, go, 1)

        # same thing, but alias addresses, so that the adapter generated by select_from() is wrapped within
        # the adapter created by contains_eager()
        adalias = addresses.alias()
        query = users.select(users.c.id==7).union(users.select(users.c.id>7)).alias('ulist').outerjoin(adalias).select(use_labels=True,order_by=['ulist.id', adalias.c.id])
        def go():
            l = sess.query(User).select_from(query).options(contains_eager('addresses', alias=adalias)).all()
            assert self.static.user_address_result == l
        self.assert_sql_count(testing.db, go, 1)

    def test_contains_eager(self):
        sess = create_session()

        # test that contains_eager suppresses the normal outer join rendering
        q = sess.query(User).outerjoin(User.addresses).options(contains_eager(User.addresses)).order_by(User.id)
        self.assert_compile(q.with_labels().statement, 
            "SELECT addresses.id AS addresses_id, addresses.user_id AS addresses_user_id, "\
            "addresses.email_address AS addresses_email_address, users.id AS users_id, "\
            "users.name AS users_name FROM users LEFT OUTER JOIN addresses "\
            "ON users.id = addresses.user_id ORDER BY users.id"
            , dialect=default.DefaultDialect())
                
        def go():
            assert self.static.user_address_result == q.all()
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        adalias = addresses.alias()
        q = sess.query(User).select_from(users.outerjoin(adalias)).options(contains_eager(User.addresses, alias=adalias))
        def go():
            eq_(self.static.user_address_result, q.order_by(User.id).all())
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        selectquery = users.outerjoin(addresses).select(users.c.id<10, use_labels=True, order_by=[users.c.id, addresses.c.id])
        q = sess.query(User)

        def go():
            l = list(q.options(contains_eager('addresses')).instances(selectquery.execute()))
            assert self.static.user_address_result[0:3] == l
        self.assert_sql_count(testing.db, go, 1)

        sess.expunge_all()

        def go():
            l = list(q.options(contains_eager(User.addresses)).instances(selectquery.execute()))
            assert self.static.user_address_result[0:3] == l
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        def go():
            l = q.options(contains_eager('addresses')).from_statement(selectquery).all()
            assert self.static.user_address_result[0:3] == l
        self.assert_sql_count(testing.db, go, 1)

    def test_contains_eager_alias(self):
        adalias = addresses.alias('adalias')
        selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.id, adalias.c.id])
        sess = create_session()
        q = sess.query(User)
    
        # string alias name
        def go():
            l = list(q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute()))
            assert self.static.user_address_result == l
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        # expression.Alias object
        def go():
            l = list(q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute()))
            assert self.static.user_address_result == l
        self.assert_sql_count(testing.db, go, 1)

        sess.expunge_all()

        # Aliased object
        adalias = aliased(Address)
        def go():
            l = q.options(contains_eager('addresses', alias=adalias)).outerjoin((adalias, User.addresses)).order_by(User.id, adalias.id)
            assert self.static.user_address_result == l.all()
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        oalias = orders.alias('o1')
        ialias = items.alias('i1')
        query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id, oalias.c.id, ialias.c.id)
        q = create_session().query(User)
        # test using string alias with more than one level deep
        def go():
            l = list(q.options(contains_eager('orders', alias='o1'), contains_eager('orders.items', alias='i1')).instances(query.execute()))
            assert self.static.user_order_result == l
        self.assert_sql_count(testing.db, go, 1)

        sess.expunge_all()

        # test using Alias with more than one level deep
        def go():
            l = list(q.options(contains_eager('orders', alias=oalias), contains_eager('orders.items', alias=ialias)).instances(query.execute()))
            assert self.static.user_order_result == l
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        # test using Aliased with more than one level deep
        oalias = aliased(Order)
        ialias = aliased(Item)
        def go():
            l = q.options(contains_eager(User.orders, alias=oalias), contains_eager(User.orders, Order.items, alias=ialias)).\
                outerjoin((oalias, User.orders), (ialias, oalias.items)).order_by(User.id, oalias.id, ialias.id)
            assert self.static.user_order_result == l.all()
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

    def test_mixed_eager_contains_with_limit(self):
        sess = create_session()
    
        q = sess.query(User)
        def go():
            # outerjoin to User.orders, offset 1/limit 2 so we get user 7 + second two orders.
            # then joinedload the addresses.  User + Order columns go into the subquery, address
            # left outer joins to the subquery, joinedloader for User.orders applies context.adapter 
            # to result rows.  This was [ticket:1180].
            l = q.outerjoin(User.orders).options(joinedload(User.addresses), contains_eager(User.orders)).order_by(User.id, Order.id).offset(1).limit(2).all()
            eq_(l, [User(id=7,
            addresses=[Address(email_address=u'jack@bean.com',user_id=7,id=1)],
            name=u'jack',
            orders=[
                Order(address_id=1,user_id=7,description=u'order 3',isopen=1,id=3), 
                Order(address_id=None,user_id=7,description=u'order 5',isopen=0,id=5)
            ])])
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        def go():
            # same as above, except Order is aliased, so two adapters are applied by the
            # eager loader
            oalias = aliased(Order)
            l = q.outerjoin((User.orders, oalias)).options(joinedload(User.addresses), contains_eager(User.orders, alias=oalias)).order_by(User.id, oalias.id).offset(1).limit(2).all()
            eq_(l, [User(id=7,
            addresses=[Address(email_address=u'jack@bean.com',user_id=7,id=1)],
            name=u'jack',
            orders=[
                Order(address_id=1,user_id=7,description=u'order 3',isopen=1,id=3), 
                Order(address_id=None,user_id=7,description=u'order 5',isopen=0,id=5)
            ])])
        self.assert_sql_count(testing.db, go, 1)
    
    
class MixedEntitiesTest(QueryTest, AssertsCompiledSQL):

    def test_values(self):
        sess = create_session()

        assert list(sess.query(User).values()) == list()

        sel = users.select(User.id.in_([7, 8])).alias()
        q = sess.query(User)
        q2 = q.select_from(sel).values(User.name)
        eq_(list(q2), [(u'jack',), (u'ed',)])
    
        q = sess.query(User)
        q2 = q.order_by(User.id).values(User.name, User.name + " " + cast(User.id, String(50)))
        eq_(list(q2), [(u'jack', u'jack 7'), (u'ed', u'ed 8'), (u'fred', u'fred 9'), (u'chuck', u'chuck 10')])
    
        q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(User.id, Address.id).values(User.name, Address.email_address)
        eq_(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
    
        q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(desc(Address.email_address)).slice(1, 3).values(User.name, Address.email_address)
        eq_(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@lala.com')])
    
        adalias = aliased(Address)
        q2 = q.join(('addresses', adalias)).filter(User.name.like('%e%')).values(User.name, adalias.email_address)
        eq_(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
    
        q2 = q.values(func.count(User.name))
        assert q2.next() == (4,)

        q2 = q.select_from(sel).filter(User.id==8).values(User.name, sel.c.name, User.name)
        eq_(list(q2), [(u'ed', u'ed', u'ed')])

        # using User.xxx is alised against "sel", so this query returns nothing
        q2 = q.select_from(sel).filter(User.id==8).filter(User.id>sel.c.id).values(User.name, sel.c.name, User.name)
        eq_(list(q2), [])

        # whereas this uses users.c.xxx, is not aliased and creates a new join
        q2 = q.select_from(sel).filter(users.c.id==8).filter(users.c.id>sel.c.id).values(users.c.name, sel.c.name, User.name)
        eq_(list(q2), [(u'ed', u'jack', u'jack')])

    @testing.fails_on('mssql', 'FIXME: unknown')
    def test_values_specific_order_by(self):
        sess = create_session()

        assert list(sess.query(User).values()) == list()

        sel = users.select(User.id.in_([7, 8])).alias()
        q = sess.query(User)
        u2 = aliased(User)
        q2 = q.select_from(sel).filter(u2.id>1).order_by(User.id, sel.c.id, u2.id).values(User.name, sel.c.name, u2.name)
        eq_(list(q2), [(u'jack', u'jack', u'jack'), (u'jack', u'jack', u'ed'), (u'jack', u'jack', u'fred'), (u'jack', u'jack', u'chuck'), (u'ed', u'ed', u'jack'), (u'ed', u'ed', u'ed'), (u'ed', u'ed', u'fred'), (u'ed', u'ed', u'chuck')])

    @testing.fails_on('mssql', 'FIXME: unknown')
    @testing.fails_on('oracle', "Oracle doesn't support boolean expressions as columns")
    @testing.fails_on('postgresql+pg8000', "pg8000 parses the SQL itself before passing on to PG, doesn't parse this")
    @testing.fails_on('postgresql+zxjdbc', "zxjdbc parses the SQL itself before passing on to PG, doesn't parse this")
    def test_values_with_boolean_selects(self):
        """Tests a values clause that works with select boolean evaluations"""
        sess = create_session()

        q = sess.query(User)
        q2 = q.group_by(User.name.like('%j%')).order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%'), func.count(User.name.like('%j%')))
        eq_(list(q2), [(True, 1), (False, 3)])

        q2 = q.order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%'))
        eq_(list(q2), [(True,), (False,), (False,), (False,)])


    def test_correlated_subquery(self):
        """test that a subquery constructed from ORM attributes doesn't leak out 
        those entities to the outermost query.
    
        """
        sess = create_session()
    
        subq = select([func.count()]).\
            where(User.id==Address.user_id).\
            correlate(users).\
            label('count')

        # we don't want Address to be outside of the subquery here
        eq_(
            list(sess.query(User, subq)[0:3]),
            [(User(id=7,name=u'jack'), 1), (User(id=8,name=u'ed'), 3), (User(id=9,name=u'fred'), 1)]
            )

        # same thing without the correlate, as it should
        # not be needed
        subq = select([func.count()]).\
            where(User.id==Address.user_id).\
            label('count')

        # we don't want Address to be outside of the subquery here
        eq_(
            list(sess.query(User, subq)[0:3]),
            [(User(id=7,name=u'jack'), 1), (User(id=8,name=u'ed'), 3), (User(id=9,name=u'fred'), 1)]
            )

    def test_tuple_labeling(self):
        sess = create_session()
        
        # test pickle + all the protocols !
        for pickled in False, -1, 0, 1, 2:
            for row in sess.query(User, Address).join(User.addresses).all():
                if pickled is not False:
                    row = util.pickle.loads(util.pickle.dumps(row, pickled))
                    
                eq_(set(row.keys()), set(['User', 'Address']))
                eq_(row.User, row[0])
                eq_(row.Address, row[1])
        
            for row in sess.query(User.name, User.id.label('foobar')):
                if pickled is not False:
                    row = util.pickle.loads(util.pickle.dumps(row, pickled))
                eq_(set(row.keys()), set(['name', 'foobar']))
                eq_(row.name, row[0])
                eq_(row.foobar, row[1])

            for row in sess.query(User).values(User.name, User.id.label('foobar')):
                if pickled is not False:
                    row = util.pickle.loads(util.pickle.dumps(row, pickled))
                eq_(set(row.keys()), set(['name', 'foobar']))
                eq_(row.name, row[0])
                eq_(row.foobar, row[1])

            oalias = aliased(Order)
            for row in sess.query(User, oalias).join(User.orders).all():
                if pickled is not False:
                    row = util.pickle.loads(util.pickle.dumps(row, pickled))
                eq_(set(row.keys()), set(['User']))
                eq_(row.User, row[0])

            oalias = aliased(Order, name='orders')
            for row in sess.query(User, oalias).join(User.orders).all():
                if pickled is not False:
                    row = util.pickle.loads(util.pickle.dumps(row, pickled))
                eq_(set(row.keys()), set(['User', 'orders']))
                eq_(row.User, row[0])
                eq_(row.orders, row[1])
            
            if pickled is not False:
                ret = sess.query(User, Address).join(User.addresses).all()
                util.pickle.loads(util.pickle.dumps(ret, pickled))
                
    def test_column_queries(self):
        sess = create_session()

        eq_(sess.query(User.name).all(), [(u'jack',), (u'ed',), (u'fred',), (u'chuck',)])
    
        sel = users.select(User.id.in_([7, 8])).alias()
        q = sess.query(User.name)
        q2 = q.select_from(sel).all()
        eq_(list(q2), [(u'jack',), (u'ed',)])

        eq_(sess.query(User.name, Address.email_address).filter(User.id==Address.user_id).all(), [
            (u'jack', u'jack@bean.com'), (u'ed', u'ed@wood.com'), 
            (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), 
            (u'fred', u'fred@fred.com')
        ])
    
        eq_(sess.query(User.name, func.count(Address.email_address)).outerjoin(User.addresses).group_by(User.id, User.name).order_by(User.id).all(), 
            [(u'jack', 1), (u'ed', 3), (u'fred', 1), (u'chuck', 0)]
        )

        eq_(sess.query(User, func.count(Address.email_address)).outerjoin(User.addresses).group_by(User).order_by(User.id).all(), 
            [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)]
        )

        eq_(sess.query(func.count(Address.email_address), User).outerjoin(User.addresses).group_by(User).order_by(User.id).all(), 
            [(1, User(name='jack',id=7)), (3, User(name='ed',id=8)), (1, User(name='fred',id=9)), (0, User(name='chuck',id=10))]
        )
    
        adalias = aliased(Address)
        eq_(sess.query(User, func.count(adalias.email_address)).outerjoin(('addresses', adalias)).group_by(User).order_by(User.id).all(), 
            [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)]
        )

        eq_(sess.query(func.count(adalias.email_address), User).outerjoin((User.addresses, adalias)).group_by(User).order_by(User.id).all(),
            [(1, User(name=u'jack',id=7)), (3, User(name=u'ed',id=8)), (1, User(name=u'fred',id=9)), (0, User(name=u'chuck',id=10))]
        )

        # select from aliasing + explicit aliasing
        eq_(
            sess.query(User, adalias.email_address, adalias.id).outerjoin((User.addresses, adalias)).from_self(User, adalias.email_address).order_by(User.id, adalias.id).all(),
            [
                (User(name=u'jack',id=7), u'jack@bean.com'), 
                (User(name=u'ed',id=8), u'ed@wood.com'), 
                (User(name=u'ed',id=8), u'ed@bettyboop.com'),
                (User(name=u'ed',id=8), u'ed@lala.com'), 
                (User(name=u'fred',id=9), u'fred@fred.com'), 
                (User(name=u'chuck',id=10), None)
            ]
        )
    
        # anon + select from aliasing
        eq_(
            sess.query(User).join(User.addresses, aliased=True).filter(Address.email_address.like('%ed%')).from_self().all(),
            [
                User(name=u'ed',id=8), 
                User(name=u'fred',id=9), 
            ]
        )

        # test eager aliasing, with/without select_from aliasing
        for q in [
            sess.query(User, adalias.email_address).outerjoin((User.addresses, adalias)).options(joinedload(User.addresses)).order_by(User.id, adalias.id).limit(10),
            sess.query(User, adalias.email_address, adalias.id).outerjoin((User.addresses, adalias)).from_self(User, adalias.email_address).options(joinedload(User.addresses)).order_by(User.id, adalias.id).limit(10),
        ]:
            eq_(

                q.all(),
                [(User(addresses=[Address(user_id=7,email_address=u'jack@bean.com',id=1)],name=u'jack',id=7), u'jack@bean.com'), 
                (User(addresses=[
                                    Address(user_id=8,email_address=u'ed@wood.com',id=2), 
                                    Address(user_id=8,email_address=u'ed@bettyboop.com',id=3), 
                                    Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@wood.com'), 
                (User(addresses=[
                            Address(user_id=8,email_address=u'ed@wood.com',id=2), 
                            Address(user_id=8,email_address=u'ed@bettyboop.com',id=3), 
                            Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@bettyboop.com'), 
                (User(addresses=[
                            Address(user_id=8,email_address=u'ed@wood.com',id=2), 
                            Address(user_id=8,email_address=u'ed@bettyboop.com',id=3), 
                            Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@lala.com'), 
                (User(addresses=[Address(user_id=9,email_address=u'fred@fred.com',id=5)],name=u'fred',id=9), u'fred@fred.com'), 

                (User(addresses=[],name=u'chuck',id=10), None)]
        )

    def test_column_from_limited_joinedload(self):
        sess = create_session()
    
        def go():
            results = sess.query(User).limit(1).options(joinedload('addresses')).add_column(User.name).all()
            eq_(results, [(User(name='jack'), 'jack')])
        self.assert_sql_count(testing.db, go, 1)
    
    @testing.fails_on('postgresql+pg8000', "'type oid 705 not mapped to py type' (due to literal)")
    def test_self_referential(self):
    
        sess = create_session()
        oalias = aliased(Order)

        for q in [
            sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).order_by(Order.id, oalias.id),
            sess.query(Order, oalias).from_self().filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).order_by(Order.id, oalias.id),
        
            # same thing, but reversed.  
            sess.query(oalias, Order).from_self().filter(oalias.user_id==Order.user_id).filter(oalias.user_id==7).filter(Order.id<oalias.id).order_by(oalias.id, Order.id),
        
            # here we go....two layers of aliasing
            sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).from_self().order_by(Order.id, oalias.id).limit(10).options(joinedload(Order.items)),

            # gratuitous four layers
            sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).from_self().from_self().from_self().order_by(Order.id, oalias.id).limit(10).options(joinedload(Order.items)),

        ]:
    
            eq_(
            q.all(),
            [
                (Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1)), 
                (Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1)), 
                (Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5), Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3))                
            ]
        )
        
        
        # ensure column expressions are taken from inside the subquery, not restated at the top
        q = sess.query(Order.id, Order.description, literal_column("'q'").label('foo')).\
            filter(Order.description == u'order 3').from_self()
        self.assert_compile(q, 
            "SELECT anon_1.orders_id AS anon_1_orders_id, anon_1.orders_description AS anon_1_orders_description, "
            "anon_1.foo AS anon_1_foo FROM (SELECT orders.id AS orders_id, orders.description AS orders_description, "
            "'q' AS foo FROM orders WHERE orders.description = :description_1) AS anon_1", use_default_dialect=True)
        eq_(
            q.all(),
            [(3, u'order 3', 'q')]
        )
        
    
    def test_multi_mappers(self):

        test_session = create_session()

        (user7, user8, user9, user10) = test_session.query(User).all()
        (address1, address2, address3, address4, address5) = test_session.query(Address).all()

        expected = [(user7, address1),
            (user8, address2),
            (user8, address3),
            (user8, address4),
            (user9, address5),
            (user10, None)]

        sess = create_session()

        selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
        eq_(list(sess.query(User, Address).instances(selectquery.execute())), expected)
        sess.expunge_all()

        for address_entity in (Address, aliased(Address)):
            q = sess.query(User).add_entity(address_entity).outerjoin(('addresses', address_entity)).order_by(User.id, address_entity.id)
            eq_(q.all(), expected)
            sess.expunge_all()

            q = sess.query(User).add_entity(address_entity)
            q = q.join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com')
            eq_(q.all(), [(user8, address3)])
            sess.expunge_all()

            q = sess.query(User, address_entity).join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com')
            eq_(q.all(), [(user8, address3)])
            sess.expunge_all()

            q = sess.query(User, address_entity).join(('addresses', address_entity)).options(joinedload('addresses')).filter_by(email_address='ed@bettyboop.com')
            eq_(list(util.OrderedSet(q.all())), [(user8, address3)])
            sess.expunge_all()

    def test_aliased_multi_mappers(self):
        sess = create_session()

        (user7, user8, user9, user10) = sess.query(User).all()
        (address1, address2, address3, address4, address5) = sess.query(Address).all()

        expected = [(user7, address1),
            (user8, address2),
            (user8, address3),
            (user8, address4),
            (user9, address5),
            (user10, None)]

        q = sess.query(User)
        adalias = addresses.alias('adalias')
        q = q.add_entity(Address, alias=adalias).select_from(users.outerjoin(adalias))
        l = q.order_by(User.id, adalias.c.id).all()
        assert l == expected

        sess.expunge_all()

        q = sess.query(User).add_entity(Address, alias=adalias)
        l = q.select_from(users.outerjoin(adalias)).filter(adalias.c.email_address=='ed@bettyboop.com').all()
        assert l == [(user8, address3)]

    def test_multi_columns(self):
        sess = create_session()

        expected = [(u, u.name) for u in sess.query(User).all()]

        for add_col in (User.name, users.c.name):
            assert sess.query(User).add_column(add_col).all() == expected
            sess.expunge_all()

        assert_raises(sa_exc.InvalidRequestError, sess.query(User).add_column, object())

    def test_add_multi_columns(self):
        """test that add_column accepts a FROM clause."""
    
        sess = create_session()
    
        eq_(
            sess.query(User.id).add_column(users).all(),
            [(7, 7, u'jack'), (8, 8, u'ed'), (9, 9, u'fred'), (10, 10, u'chuck')]
        )
    
    def test_multi_columns_2(self):
        """test aliased/nonalised joins with the usage of add_column()"""
        sess = create_session()

        (user7, user8, user9, user10) = sess.query(User).all()
        expected = [(user7, 1),
            (user8, 3),
            (user9, 1),
            (user10, 0)
            ]

        q = sess.query(User)
        q = q.group_by(users).order_by(User.id).outerjoin('addresses').add_column(func.count(Address.id).label('count'))
        eq_(q.all(), expected)
        sess.expunge_all()
    
        adalias = aliased(Address)
        q = sess.query(User)
        q = q.group_by(users).order_by(User.id).outerjoin(('addresses', adalias)).add_column(func.count(adalias.id).label('count'))
        eq_(q.all(), expected)
        sess.expunge_all()

        # TODO: figure out why group_by(users) doesn't work here
        s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id)
        q = sess.query(User)
        l = q.add_column("count").from_statement(s).all()
        assert l == expected


    def test_raw_columns(self):
        sess = create_session()
        (user7, user8, user9, user10) = sess.query(User).all()
        expected = [
            (user7, 1, "Name:jack"),
            (user8, 3, "Name:ed"),
            (user9, 1, "Name:fred"),
            (user10, 0, "Name:chuck")]

        adalias = addresses.alias()
        q = create_session().query(User).add_column(func.count(adalias.c.id))\
            .add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\
            .group_by(users).order_by(users.c.id)

        assert q.all() == expected

        # test with a straight statement
        s = select([users, func.count(addresses.c.id).label('count'), ("Name:" + users.c.name).label('concat')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.id])
        q = create_session().query(User)
        l = q.add_column("count").add_column("concat").from_statement(s).all()
        assert l == expected

        sess.expunge_all()

        # test with select_from()
        q = create_session().query(User).add_column(func.count(addresses.c.id))\
            .add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\
            .group_by(users).order_by(users.c.id)

        assert q.all() == expected
        sess.expunge_all()

        q = create_session().query(User).add_column(func.count(addresses.c.id))\
            .add_column(("Name:" + users.c.name)).outerjoin('addresses')\
            .group_by(users).order_by(users.c.id)

        assert q.all() == expected
        sess.expunge_all()

        q = create_session().query(User).add_column(func.count(adalias.c.id))\
            .add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\
            .group_by(users).order_by(users.c.id)

        assert q.all() == expected
        sess.expunge_all()

class ImmediateTest(_fixtures.FixtureTest):
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        mapper(Address, addresses)

        mapper(User, users, properties=dict(
            addresses=relationship(Address)))

    @testing.resolve_artifact_names
    def test_one(self):
        sess = create_session()

        assert_raises(sa.orm.exc.NoResultFound,
                          sess.query(User).filter(User.id == 99).one)

        eq_(sess.query(User).filter(User.id == 7).one().id, 7)

        assert_raises(sa.orm.exc.MultipleResultsFound,
                          sess.query(User).one)

        assert_raises(
            sa.orm.exc.NoResultFound,
            sess.query(User.id, User.name).filter(User.id == 99).one)

        eq_(sess.query(User.id, User.name).filter(User.id == 7).one(),
            (7, 'jack'))

        assert_raises(sa.orm.exc.MultipleResultsFound,
                          sess.query(User.id, User.name).one)

        assert_raises(sa.orm.exc.NoResultFound,
                          (sess.query(User, Address).
                           join(User.addresses).
                           filter(Address.id == 99)).one)

        eq_((sess.query(User, Address).
            join(User.addresses).
            filter(Address.id == 4)).one(),
           (User(id=8), Address(id=4)))

        assert_raises(sa.orm.exc.MultipleResultsFound,
                         sess.query(User, Address).join(User.addresses).one)

        # this result returns multiple rows, the first
        # two rows being the same.  but uniquing is 
        # not applied for a column based result.
        assert_raises(sa.orm.exc.MultipleResultsFound,
                       sess.query(User.id).
                       join(User.addresses).
                       filter(User.id.in_([8, 9])).
                       order_by(User.id).
                       one)

        # test that a join which ultimately returns 
        # multiple identities across many rows still 
        # raises, even though the first two rows are of 
        # the same identity and unique filtering 
        # is applied ([ticket:1688])
        assert_raises(sa.orm.exc.MultipleResultsFound,
                        sess.query(User).
                        join(User.addresses).
                        filter(User.id.in_([8, 9])).
                        order_by(User.id).
                        one)
                        

    @testing.future
    def test_getslice(self):
        assert False

    @testing.resolve_artifact_names
    def test_scalar(self):
        sess = create_session()

        eq_(sess.query(User.id).filter_by(id=7).scalar(), 7)
        eq_(sess.query(User.id, User.name).filter_by(id=7).scalar(), 7)
        eq_(sess.query(User.id).filter_by(id=0).scalar(), None)
        eq_(sess.query(User).filter_by(id=7).scalar(),
            sess.query(User).filter_by(id=7).one())
        
        assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).scalar)
        assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User.id, User.name).scalar)
        
    @testing.resolve_artifact_names
    def test_value(self):
        sess = create_session()

        eq_(sess.query(User).filter_by(id=7).value(User.id), 7)
        eq_(sess.query(User.id, User.name).filter_by(id=7).value(User.id), 7)
        eq_(sess.query(User).filter_by(id=0).value(User.id), None)

        sess.bind = testing.db
        eq_(sess.query().value(sa.literal_column('1').label('x')), 1)


class SelectFromTest(QueryTest, AssertsCompiledSQL):
    run_setup_mappers = None

    def test_replace_with_select(self):
        mapper(User, users, properties = {
            'addresses':relationship(Address)
        })
        mapper(Address, addresses)

        sel = users.select(users.c.id.in_([7, 8])).alias()
        sess = create_session()

        eq_(sess.query(User).select_from(sel).all(), [User(id=7), User(id=8)])

        eq_(sess.query(User).select_from(sel).filter(User.id==8).all(), [User(id=8)])

        eq_(sess.query(User).select_from(sel).order_by(desc(User.name)).all(), [
            User(name='jack',id=7), User(name='ed',id=8)
        ])

        eq_(sess.query(User).select_from(sel).order_by(asc(User.name)).all(), [
            User(name='ed',id=8), User(name='jack',id=7)
        ])

        eq_(sess.query(User).select_from(sel).options(joinedload('addresses')).first(),
            User(name='jack', addresses=[Address(id=1)])
        )

    def test_join_mapper_order_by(self):
        """test that mapper-level order_by is adapted to a selectable."""
    
        mapper(User, users, order_by=users.c.id)

        sel = users.select(users.c.id.in_([7, 8]))
        sess = create_session()

        eq_(sess.query(User).select_from(sel).all(),
            [
                User(name='jack',id=7), User(name='ed',id=8)
            ]
        )

    def test_differentiate_self_external(self):
        """test some different combinations of joining a table to a subquery of itself."""
        
        mapper(User, users)
        
        sess = create_session()

        sel = sess.query(User).filter(User.id.in_([7, 8])).subquery()
        ualias = aliased(User)
        
        self.assert_compile(
            sess.query(User).join((sel, User.id>sel.c.id)),
            "SELECT users.id AS users_id, users.name AS users_name FROM "
            "users JOIN (SELECT users.id AS id, users.name AS name FROM "
            "users WHERE users.id IN (:id_1, :id_2)) AS anon_1 ON users.id > anon_1.id",
            use_default_dialect=True
        )
    
        self.assert_compile(
            sess.query(ualias).select_from(sel).filter(ualias.id>sel.c.id),
            "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM "
            "users AS users_1, (SELECT users.id AS id, users.name AS name FROM "
            "users WHERE users.id IN (:id_1, :id_2)) AS anon_1 WHERE users_1.id > anon_1.id",
            use_default_dialect=True
        )

        # these two are essentially saying, "join ualias to ualias", so an 
        # error is raised.  join() deals with entities, not what's in
        # select_from().
        assert_raises(sa_exc.InvalidRequestError,
            sess.query(ualias).select_from(sel).join, (ualias, ualias.id>sel.c.id)
        )

        assert_raises(sa_exc.InvalidRequestError,
            sess.query(ualias).select_from(sel).join, (ualias, ualias.id>User.id)
        )

        salias = aliased(User, sel)
        self.assert_compile(
            sess.query(salias).join((ualias, ualias.id>salias.id)),
            "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name FROM "
            "(SELECT users.id AS id, users.name AS name FROM users WHERE users.id "
            "IN (:id_1, :id_2)) AS anon_1 JOIN users AS users_1 ON users_1.id > anon_1.id",
            use_default_dialect=True
        )
        
        
        # this one uses an explicit join(left, right, onclause) so works
        self.assert_compile(
            sess.query(ualias).select_from(join(sel, ualias, ualias.id>sel.c.id)),
            "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM "
            "(SELECT users.id AS id, users.name AS name FROM users WHERE users.id "
            "IN (:id_1, :id_2)) AS anon_1 JOIN users AS users_1 ON users_1.id > anon_1.id",
            use_default_dialect=True
        )
        
        
        
    def test_join_no_order_by(self):
        mapper(User, users)

        sel = users.select(users.c.id.in_([7, 8]))
        sess = create_session()

        eq_(sess.query(User).select_from(sel).all(),
            [
                User(name='jack',id=7), User(name='ed',id=8)
            ]
        )

    def test_join(self):
        mapper(User, users, properties = {
            'addresses':relationship(Address)
        })
        mapper(Address, addresses)

        sel = users.select(users.c.id.in_([7, 8]))
        sess = create_session()

        eq_(sess.query(User).select_from(sel).join('addresses').
                    add_entity(Address).order_by(User.id).order_by(Address.id).all(),
            [
                (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
                (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
                (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)),
                (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4))
            ]
        )

        adalias = aliased(Address)
        eq_(sess.query(User).select_from(sel).join(('addresses', adalias)).
                    add_entity(adalias).order_by(User.id).order_by(adalias.id).all(),
            [
                (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
                (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
                (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)),
                (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4))
            ]
        )
    

    def test_more_joins(self):
        mapper(User, users, properties={
            'orders':relationship(Order, backref='user'), # o2m, m2o
        })
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, order_by=items.c.id),  #m2m
        })
        mapper(Item, items, properties={
            'keywords':relationship(Keyword, secondary=item_keywords, order_by=keywords.c.id) #m2m
        })
        mapper(Keyword, keywords)

        sel = users.select(users.c.id.in_([7, 8]))
        sess = create_session()
    
        eq_(sess.query(User).select_from(sel).join('orders', 'items', 'keywords').filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
            User(name=u'jack',id=7)
        ])

        eq_(sess.query(User).select_from(sel).join('orders', 'items', 'keywords', aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
            User(name=u'jack',id=7)
        ])

        def go():
            eq_(
                sess.query(User).select_from(sel).
                            options(joinedload_all('orders.items.keywords')).
                            join('orders', 'items', 'keywords', aliased=True).
                            filter(Keyword.name.in_(['red', 'big', 'round'])).all(), 
                [
                User(name=u'jack',orders=[
                    Order(description=u'order 1',items=[
                        Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]),
                        Item(description=u'item 2',keywords=[Keyword(name=u'red',id=2), Keyword(name=u'small',id=5), Keyword(name=u'square')]),
                        Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)])
                    ]),
                    Order(description=u'order 3',items=[
                        Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]),
                        Item(description=u'item 4',keywords=[],id=4),
                        Item(description=u'item 5',keywords=[],id=5)
                        ]),
                    Order(description=u'order 5',items=[Item(description=u'item 5',keywords=[])])])
                ])
        self.assert_sql_count(testing.db, go, 1)

        sess.expunge_all()
        sel2 = orders.select(orders.c.id.in_([1,2,3]))
        eq_(sess.query(Order).select_from(sel2).join('items', 'keywords').filter(Keyword.name == 'red').order_by(Order.id).all(), [
            Order(description=u'order 1',id=1),
            Order(description=u'order 2',id=2),
        ])
        eq_(sess.query(Order).select_from(sel2).join('items', 'keywords', aliased=True).filter(Keyword.name == 'red').order_by(Order.id).all(), [
            Order(description=u'order 1',id=1),
            Order(description=u'order 2',id=2),
        ])


    def test_replace_with_eager(self):
        mapper(User, users, properties = {
            'addresses':relationship(Address, order_by=addresses.c.id)
        })
        mapper(Address, addresses)

        sel = users.select(users.c.id.in_([7, 8]))
        sess = create_session()

        def go():
            eq_(sess.query(User).options(joinedload('addresses')).select_from(sel).order_by(User.id).all(),
                [
                    User(id=7, addresses=[Address(id=1)]),
                    User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])
                ]
            )
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        def go():
            eq_(sess.query(User).options(joinedload('addresses')).select_from(sel).filter(User.id==8).order_by(User.id).all(),
                [User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])]
            )
        self.assert_sql_count(testing.db, go, 1)
        sess.expunge_all()

        def go():
            eq_(sess.query(User).options(joinedload('addresses')).select_from(sel).order_by(User.id)[1], User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)]))
        self.assert_sql_count(testing.db, go, 1)

class CustomJoinTest(QueryTest):
    run_setup_mappers = None

    def test_double_same_mappers(self):
        """test aliasing of joins with a custom join condition"""
        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'items':relationship(Item, secondary=order_items, lazy='select', order_by=items.c.id),
        })
        mapper(Item, items)
        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy='select'),
            open_orders = relationship(Order, primaryjoin = and_(orders.c.isopen == 1, users.c.id==orders.c.user_id), lazy='select'),
            closed_orders = relationship(Order, primaryjoin = and_(orders.c.isopen == 0, users.c.id==orders.c.user_id), lazy='select')
        ))
        q = create_session().query(User)
        
        eq_(
            q.join('open_orders', 'items', aliased=True).filter(Item.id==4).\
                        join('closed_orders', 'items', aliased=True).filter(Item.id==3).all(),
            [User(id=7)]
        )

class SelfReferentialTest(_base.MappedTest, AssertsCompiledSQL):
    run_setup_mappers = 'once'
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        global nodes
        nodes = Table('nodes', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('parent_id', Integer, ForeignKey('nodes.id')),
            Column('data', String(30)))

    @classmethod
    def insert_data(cls):
        global Node
    
        class Node(Base):
            def append(self, node):
                self.children.append(node)

        mapper(Node, nodes, properties={
            'children':relationship(Node, lazy='select', join_depth=3,
                backref=backref('parent', remote_side=[nodes.c.id])
            )
        })
        sess = create_session()
        n1 = Node(data='n1')
        n1.append(Node(data='n11'))
        n1.append(Node(data='n12'))
        n1.append(Node(data='n13'))
        n1.children[1].append(Node(data='n121'))
        n1.children[1].append(Node(data='n122'))
        n1.children[1].append(Node(data='n123'))
        sess.add(n1)
        sess.flush()
        sess.close()
    
    def test_join(self):
        sess = create_session()

        node = sess.query(Node).join('children', aliased=True).filter_by(data='n122').first()
        assert node.data=='n12'

        ret = sess.query(Node.data).join(Node.children, aliased=True).filter_by(data='n122').all()
        assert ret == [('n12',)]

    
        node = sess.query(Node).join('children', 'children', aliased=True).filter_by(data='n122').first()
        assert node.data=='n1'

        node = sess.query(Node).filter_by(data='n122').join('parent', aliased=True).filter_by(data='n12').\
            join('parent', aliased=True, from_joinpoint=True).filter_by(data='n1').first()
        assert node.data == 'n122'
    
    def test_string_or_prop_aliased(self):
        """test that join('foo') behaves the same as join(Cls.foo) in a self
        referential scenario.
        
        """
        
        sess = create_session()
        nalias = aliased(Node, sess.query(Node).filter_by(data='n1').subquery())
        
        q1 = sess.query(nalias).join(nalias.children, aliased=True).\
                join(Node.children, from_joinpoint=True)

        q2 = sess.query(nalias).join(nalias.children, aliased=True).\
                join("children", from_joinpoint=True)

        for q in (q1, q2):
            self.assert_compile(
                q,
                "SELECT anon_1.id AS anon_1_id, anon_1.parent_id AS "
                "anon_1_parent_id, anon_1.data AS anon_1_data FROM "
                "(SELECT nodes.id AS id, nodes.parent_id AS parent_id, "
                "nodes.data AS data FROM nodes WHERE nodes.data = :data_1) "
                "AS anon_1 JOIN nodes AS nodes_1 ON anon_1.id = "
                "nodes_1.parent_id JOIN nodes ON nodes_1.id = nodes.parent_id",
                use_default_dialect=True
            )
        
        q1 = sess.query(Node).join(nalias.children, aliased=True).\
                join(Node.children, aliased=True, from_joinpoint=True).\
                join(Node.children, from_joinpoint=True)

        q2 = sess.query(Node).join(nalias.children, aliased=True).\
                join("children", aliased=True, from_joinpoint=True).\
                join("children", from_joinpoint=True)
                
        for q in (q1, q2):
            self.assert_compile(
                q,
                "SELECT nodes.id AS nodes_id, nodes.parent_id AS "
                "nodes_parent_id, nodes.data AS nodes_data FROM (SELECT "
                "nodes.id AS id, nodes.parent_id AS parent_id, nodes.data "
                "AS data FROM nodes WHERE nodes.data = :data_1) AS anon_1 "
                "JOIN nodes AS nodes_1 ON anon_1.id = nodes_1.parent_id "
                "JOIN nodes AS nodes_2 ON nodes_1.id = nodes_2.parent_id "
                "JOIN nodes ON nodes_2.id = nodes.parent_id",
                use_default_dialect=True
            )
        
    def test_from_self_inside_excludes_outside(self):
        """test the propagation of aliased() from inside to outside
        on a from_self()..
        """
        sess = create_session()
        
        n1 = aliased(Node)
        
        # n1 is not inside the from_self(), so all cols must be maintained
        # on the outside
        self.assert_compile(
            sess.query(Node).filter(Node.data=='n122').from_self(n1, Node.id),
            "SELECT nodes_1.id AS nodes_1_id, nodes_1.parent_id AS nodes_1_parent_id, "
            "nodes_1.data AS nodes_1_data, anon_1.nodes_id AS anon_1_nodes_id "
            "FROM nodes AS nodes_1, (SELECT nodes.id AS nodes_id, "
            "nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM "
            "nodes WHERE nodes.data = :data_1) AS anon_1",
            use_default_dialect=True
        )

        parent = aliased(Node)
        grandparent = aliased(Node)
        q = sess.query(Node, parent, grandparent).\
            join((Node.parent, parent), (parent.parent, grandparent)).\
                filter(Node.data=='n122').filter(parent.data=='n12').\
                filter(grandparent.data=='n1').from_self().limit(1)
        
        # parent, grandparent *are* inside the from_self(), so they 
        # should get aliased to the outside.
        self.assert_compile(
            q,
            "SELECT anon_1.nodes_id AS anon_1_nodes_id, "
            "anon_1.nodes_parent_id AS anon_1_nodes_parent_id, "
            "anon_1.nodes_data AS anon_1_nodes_data, "
            "anon_1.nodes_1_id AS anon_1_nodes_1_id, "
            "anon_1.nodes_1_parent_id AS anon_1_nodes_1_parent_id, "
            "anon_1.nodes_1_data AS anon_1_nodes_1_data, "
            "anon_1.nodes_2_id AS anon_1_nodes_2_id, "
            "anon_1.nodes_2_parent_id AS anon_1_nodes_2_parent_id, "
            "anon_1.nodes_2_data AS anon_1_nodes_2_data "
            "FROM (SELECT nodes.id AS nodes_id, nodes.parent_id "
            "AS nodes_parent_id, nodes.data AS nodes_data, "
            "nodes_1.id AS nodes_1_id, nodes_1.parent_id AS nodes_1_parent_id, "
            "nodes_1.data AS nodes_1_data, nodes_2.id AS nodes_2_id, "
            "nodes_2.parent_id AS nodes_2_parent_id, nodes_2.data AS "
            "nodes_2_data FROM nodes JOIN nodes AS nodes_1 ON "
            "nodes_1.id = nodes.parent_id JOIN nodes AS nodes_2 "
            "ON nodes_2.id = nodes_1.parent_id "
            "WHERE nodes.data = :data_1 AND nodes_1.data = :data_2 AND "
            "nodes_2.data = :data_3) AS anon_1  LIMIT 1",
            use_default_dialect=True
        )
        
    def test_explicit_join(self):
        sess = create_session()
    
        n1 = aliased(Node)
        n2 = aliased(Node)
    
        node = sess.query(Node).select_from(join(Node, n1, 'children')).filter(n1.data=='n122').first()
        assert node.data=='n12'
    
        node = sess.query(Node).select_from(join(Node, n1, 'children').join(n2, 'children')).\
            filter(n2.data=='n122').first()
        assert node.data=='n1'
    
        # mix explicit and named onclauses
        node = sess.query(Node).select_from(join(Node, n1, Node.id==n1.parent_id).join(n2, 'children')).\
            filter(n2.data=='n122').first()
        assert node.data=='n1'

        node = sess.query(Node).select_from(join(Node, n1, 'parent').join(n2, 'parent')).\
            filter(and_(Node.data=='n122', n1.data=='n12', n2.data=='n1')).first()
        assert node.data == 'n122'

        eq_(
            list(sess.query(Node).select_from(join(Node, n1, 'parent').join(n2, 'parent')).\
            filter(and_(Node.data=='n122', n1.data=='n12', n2.data=='n1')).values(Node.data, n1.data, n2.data)),
            [('n122', 'n12', 'n1')])

    def test_join_to_nonaliased(self):
        sess = create_session()
    
        n1 = aliased(Node)

        # using 'n1.parent' implicitly joins to unaliased Node
        eq_(
            sess.query(n1).join(n1.parent).filter(Node.data=='n1').all(),
            [Node(parent_id=1,data=u'n11',id=2), Node(parent_id=1,data=u'n12',id=3), Node(parent_id=1,data=u'n13',id=4)]
        )
    
        # explicit (new syntax)
        eq_(
            sess.query(n1).join((Node, n1.parent)).filter(Node.data=='n1').all(),
            [Node(parent_id=1,data=u'n11',id=2), Node(parent_id=1,data=u'n12',id=3), Node(parent_id=1,data=u'n13',id=4)]
        )
    
        
    def test_multiple_explicit_entities(self):
        sess = create_session()
    
        parent = aliased(Node)
        grandparent = aliased(Node)
        eq_(
            sess.query(Node, parent, grandparent).\
                join((Node.parent, parent), (parent.parent, grandparent)).\
                    filter(Node.data=='n122').filter(parent.data=='n12').\
                    filter(grandparent.data=='n1').first(),
            (Node(data='n122'), Node(data='n12'), Node(data='n1'))
        )

        eq_(
            sess.query(Node, parent, grandparent).\
                join((Node.parent, parent), (parent.parent, grandparent)).\
                    filter(Node.data=='n122').filter(parent.data=='n12').\
                    filter(grandparent.data=='n1').from_self().first(),
            (Node(data='n122'), Node(data='n12'), Node(data='n1'))
        )

        # same, change order around
        eq_(
            sess.query(parent, grandparent, Node).\
                join((Node.parent, parent), (parent.parent, grandparent)).\
                    filter(Node.data=='n122').filter(parent.data=='n12').\
                    filter(grandparent.data=='n1').from_self().first(),
            (Node(data='n12'), Node(data='n1'), Node(data='n122'))
        )

        eq_(
            sess.query(Node, parent, grandparent).\
                join((Node.parent, parent), (parent.parent, grandparent)).\
                    filter(Node.data=='n122').filter(parent.data=='n12').\
                    filter(grandparent.data=='n1').\
                    options(joinedload(Node.children)).first(),
            (Node(data='n122'), Node(data='n12'), Node(data='n1'))
        )

        eq_(
            sess.query(Node, parent, grandparent).\
                join((Node.parent, parent), (parent.parent, grandparent)).\
                    filter(Node.data=='n122').filter(parent.data=='n12').\
                    filter(grandparent.data=='n1').from_self().\
                    options(joinedload(Node.children)).first(),
            (Node(data='n122'), Node(data='n12'), Node(data='n1'))
        )
    
    
    def test_any(self):
        sess = create_session()
        eq_(sess.query(Node).filter(Node.children.any(Node.data=='n1')).all(), [])
        eq_(sess.query(Node).filter(Node.children.any(Node.data=='n12')).all(), [Node(data='n1')])
        eq_(sess.query(Node).filter(~Node.children.any()).order_by(Node.id).all(), 
                [Node(data='n11'), Node(data='n13'),Node(data='n121'),Node(data='n122'),Node(data='n123'),])

    def test_has(self):
        sess = create_session()
    
        eq_(sess.query(Node).filter(Node.parent.has(Node.data=='n12')).order_by(Node.id).all(), 
            [Node(data='n121'),Node(data='n122'),Node(data='n123')])
        eq_(sess.query(Node).filter(Node.parent.has(Node.data=='n122')).all(), [])
        eq_(sess.query(Node).filter(~Node.parent.has()).all(), [Node(data='n1')])

    def test_contains(self):
        sess = create_session()
    
        n122 = sess.query(Node).filter(Node.data=='n122').one()
        eq_(sess.query(Node).filter(Node.children.contains(n122)).all(), [Node(data='n12')])

        n13 = sess.query(Node).filter(Node.data=='n13').one()
        eq_(sess.query(Node).filter(Node.children.contains(n13)).all(), [Node(data='n1')])

    def test_eq_ne(self):
        sess = create_session()
    
        n12 = sess.query(Node).filter(Node.data=='n12').one()
        eq_(sess.query(Node).filter(Node.parent==n12).all(), [Node(data='n121'),Node(data='n122'),Node(data='n123')])
    
        eq_(sess.query(Node).filter(Node.parent != n12).all(), [Node(data='n1'), Node(data='n11'), Node(data='n12'), Node(data='n13')])

class SelfReferentialM2MTest(_base.MappedTest):
    run_setup_mappers = 'once'
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        global nodes, node_to_nodes
        nodes = Table('nodes', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(30)))
        
        node_to_nodes =Table('node_to_nodes', metadata,
            Column('left_node_id', Integer, ForeignKey('nodes.id'),primary_key=True),
            Column('right_node_id', Integer, ForeignKey('nodes.id'),primary_key=True),
            )

    @classmethod
    def insert_data(cls):
        global Node
    
        class Node(Base):
            pass

        mapper(Node, nodes, properties={
            'children':relationship(Node, lazy='select', secondary=node_to_nodes,
                primaryjoin=nodes.c.id==node_to_nodes.c.left_node_id,
                secondaryjoin=nodes.c.id==node_to_nodes.c.right_node_id,
            )
        })
        sess = create_session()
        n1 = Node(data='n1')
        n2 = Node(data='n2')
        n3 = Node(data='n3')
        n4 = Node(data='n4')
        n5 = Node(data='n5')
        n6 = Node(data='n6')
        n7 = Node(data='n7')
    
        n1.children = [n2, n3, n4]
        n2.children = [n3, n6, n7]
        n3.children = [n5, n4]

        sess.add(n1)
        sess.add(n2)
        sess.add(n3)
        sess.add(n4)
        sess.flush()
        sess.close()

    def test_any(self):
        sess = create_session()
        eq_(sess.query(Node).filter(Node.children.any(Node.data=='n3')).all(), [Node(data='n1'), Node(data='n2')])

    def test_contains(self):
        sess = create_session()
        n4 = sess.query(Node).filter_by(data='n4').one()

        eq_(sess.query(Node).filter(Node.children.contains(n4)).order_by(Node.data).all(), [Node(data='n1'), Node(data='n3')])
        eq_(sess.query(Node).filter(not_(Node.children.contains(n4))).order_by(Node.data).all(), [Node(data='n2'), Node(data='n4'), Node(data='n5'), Node(data='n6'), Node(data='n7')])

    def test_explicit_join(self):
        sess = create_session()
    
        n1 = aliased(Node)
        eq_(
            sess.query(Node).select_from(join(Node, n1, 'children')).filter(n1.data.in_(['n3', 'n7'])).order_by(Node.id).all(),
            [Node(data='n1'), Node(data='n2')]
        )
    
class ExternalColumnsTest(QueryTest):
    """test mappers with SQL-expressions added as column properties."""

    run_setup_mappers = None

    def test_external_columns_bad(self):

        assert_raises_message(sa_exc.ArgumentError, "not represented in the mapper's table", mapper, User, users, properties={
            'concat': (users.c.id * 2),
        })
        clear_mappers()

    def test_external_columns(self):
        """test querying mappings that reference external columns or selectables."""
    
        mapper(User, users, properties={
            'concat': column_property((users.c.id * 2)),
            'count': column_property(select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users).as_scalar())
        })

        mapper(Address, addresses, properties={
            'user':relationship(User)
        })

        sess = create_session()
    
        sess.query(Address).options(joinedload('user')).all()

        eq_(sess.query(User).all(), 
            [
                User(id=7, concat=14, count=1),
                User(id=8, concat=16, count=3),
                User(id=9, concat=18, count=1),
                User(id=10, concat=20, count=0),
            ]
        )

        address_result = [
            Address(id=1, user=User(id=7, concat=14, count=1)),
            Address(id=2, user=User(id=8, concat=16, count=3)),
            Address(id=3, user=User(id=8, concat=16, count=3)),
            Address(id=4, user=User(id=8, concat=16, count=3)),
            Address(id=5, user=User(id=9, concat=18, count=1))
        ]
        eq_(sess.query(Address).all(), address_result)

        # run the eager version twice to test caching of aliased clauses
        for x in range(2):
            sess.expunge_all()
            def go():
               eq_(sess.query(Address).options(joinedload('user')).order_by(Address.id).all(), address_result)
            self.assert_sql_count(testing.db, go, 1)
    
        ualias = aliased(User)
        eq_(
            sess.query(Address, ualias).join(('user', ualias)).all(), 
            [(address, address.user) for address in address_result]
        )

        eq_(
                sess.query(Address, ualias.count).join(('user', ualias)).join('user', aliased=True).order_by(Address.id).all(),
                [
                    (Address(id=1), 1),
                    (Address(id=2), 3),
                    (Address(id=3), 3),
                    (Address(id=4), 3),
                    (Address(id=5), 1)
                ]
            )

        eq_(sess.query(Address, ualias.concat, ualias.count).join(('user', ualias)).join('user', aliased=True).order_by(Address.id).all(),
            [
                (Address(id=1), 14, 1),
                (Address(id=2), 16, 3),
                (Address(id=3), 16, 3),
                (Address(id=4), 16, 3),
                (Address(id=5), 18, 1)
            ]
        )

        ua = aliased(User)
        eq_(sess.query(Address, ua.concat, ua.count).
                    select_from(join(Address, ua, 'user')).
                    options(joinedload(Address.user)).order_by(Address.id).all(),
            [
                (Address(id=1, user=User(id=7, concat=14, count=1)), 14, 1),
                (Address(id=2, user=User(id=8, concat=16, count=3)), 16, 3),
                (Address(id=3, user=User(id=8, concat=16, count=3)), 16, 3),
                (Address(id=4, user=User(id=8, concat=16, count=3)), 16, 3),
                (Address(id=5, user=User(id=9, concat=18, count=1)), 18, 1)
            ]
        )

        eq_(list(sess.query(Address).join('user').values(Address.id, User.id, User.concat, User.count)), 
            [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
        )

        eq_(list(sess.query(Address, ua).select_from(join(Address,ua, 'user')).values(Address.id, ua.id, ua.concat, ua.count)), 
            [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
        )

    def test_external_columns_joinedload(self):
        # in this test, we have a subquery on User that accesses "addresses", underneath
        # an joinedload for "addresses".  So the "addresses" alias adapter needs to *not* hit 
        # the "addresses" table within the "user" subquery, but "user" still needs to be adapted.
        # therefore the long standing practice of eager adapters being "chained" has been removed
        # since its unnecessary and breaks this exact condition.
        mapper(User, users, properties={
            'addresses':relationship(Address, backref='user', order_by=addresses.c.id),
            'concat': column_property((users.c.id * 2)),
            'count': column_property(select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users))
        })
        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'address':relationship(Address),  # m2o
        })

        sess = create_session()
        def go():
            o1 = sess.query(Order).options(joinedload_all('address.user')).get(1)
            eq_(o1.address.user.count, 1)
        self.assert_sql_count(testing.db, go, 1)

        sess = create_session()
        def go():
            o1 = sess.query(Order).options(joinedload_all('address.user')).first()
            eq_(o1.address.user.count, 1)
        self.assert_sql_count(testing.db, go, 1)

class TestOverlyEagerEquivalentCols(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        global base, sub1, sub2
        base = Table('base', metadata, 
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(50))
        )

        sub1 = Table('sub1', metadata, 
            Column('id', Integer, ForeignKey('base.id'), primary_key=True),
            Column('data', String(50))
        )

        sub2 = Table('sub2', metadata, 
            Column('id', Integer, ForeignKey('base.id'), ForeignKey('sub1.id'), primary_key=True),
            Column('data', String(50))
        )

    def test_equivs(self):
        class Base(_base.ComparableEntity):
            pass
        class Sub1(_base.ComparableEntity):
            pass
        class Sub2(_base.ComparableEntity):
            pass
    
        mapper(Base, base, properties={
            'sub1':relationship(Sub1),
            'sub2':relationship(Sub2)
        })
    
        mapper(Sub1, sub1)
        mapper(Sub2, sub2)
        sess = create_session()
    
        s11 = Sub1(data='s11')
        s12 = Sub1(data='s12')
        s2 = Sub2(data='s2')
        b1 = Base(data='b1', sub1=[s11], sub2=[])
        b2 = Base(data='b1', sub1=[s12], sub2=[])
        sess.add(b1)
        sess.add(b2)
        sess.flush()
    
        # theres an overlapping ForeignKey here, so not much option except
        # to artifically control the flush order
        b2.sub2 = [s2]
        sess.flush()
    
        q = sess.query(Base).outerjoin('sub2', aliased=True)
        assert sub1.c.id not in q._filter_aliases.equivalents

        eq_(
            sess.query(Base).join('sub1').outerjoin('sub2', aliased=True).\
                filter(Sub1.id==1).one(),
                b1
        )
    
class UpdateDeleteTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('users', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('name', String(32)),
              Column('age', Integer))

        Table('documents', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('user_id', None, ForeignKey('users.id')),
              Column('title', String(32)))

    @classmethod
    def setup_classes(cls):
        class User(_base.ComparableEntity):
            pass

        class Document(_base.ComparableEntity):
            pass

    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        users.insert().execute([
            dict(id=1, name='john', age=25),
            dict(id=2, name='jack', age=47),
            dict(id=3, name='jill', age=29),
            dict(id=4, name='jane', age=37),
        ])

    @testing.resolve_artifact_names
    def insert_documents(self):
        documents.insert().execute([
            dict(id=1, user_id=1, title='foo'),
            dict(id=2, user_id=1, title='bar'),
            dict(id=3, user_id=2, title='baz'),
        ])

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        mapper(User, users)
        mapper(Document, documents, properties={
            'user': relationship(User, lazy='joined', backref=backref('documents', lazy='select'))
        })

    @testing.resolve_artifact_names
    def test_illegal_operations(self):
        s = create_session()
        
        for q, mname in (
            (s.query(User).limit(2), "limit"),
            (s.query(User).offset(2), "offset"),
            (s.query(User).limit(2).offset(2), "limit"),
            (s.query(User).order_by(User.id), "order_by"),
            (s.query(User).group_by(User.id), "group_by"),
            (s.query(User).distinct(), "distinct")
        ):
            assert_raises_message(sa_exc.InvalidRequestError, r"Can't call Query.update\(\) when %s\(\) has been called" % mname, q.update, {'name':'ed'})
            assert_raises_message(sa_exc.InvalidRequestError, r"Can't call Query.delete\(\) when %s\(\) has been called" % mname, q.delete)
            
        
    @testing.resolve_artifact_names
    def test_delete(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter(or_(User.name == 'john', User.name == 'jill')).delete()
    
        assert john not in sess and jill not in sess
    
        eq_(sess.query(User).order_by(User.id).all(), [jack,jane])

    @testing.resolve_artifact_names
    def test_delete_with_bindparams(self):
        sess = create_session(bind=testing.db, autocommit=False)

        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter('name = :name').params(name='john').delete('fetch')
        assert john not in sess

        eq_(sess.query(User).order_by(User.id).all(), [jack,jill,jane])

    @testing.resolve_artifact_names
    def test_delete_rollback(self):
        sess = sessionmaker()()
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter(or_(User.name == 'john', User.name == 'jill')).delete(synchronize_session='evaluate')
        assert john not in sess and jill not in sess
        sess.rollback()
        assert john in sess and jill in sess

    @testing.resolve_artifact_names
    def test_delete_rollback_with_fetch(self):
        sess = sessionmaker()()
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter(or_(User.name == 'john', User.name == 'jill')).delete(synchronize_session='fetch')
        assert john not in sess and jill not in sess
        sess.rollback()
        assert john in sess and jill in sess
    
    @testing.resolve_artifact_names
    def test_delete_without_session_sync(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter(or_(User.name == 'john', User.name == 'jill')).delete(synchronize_session=False)
    
        assert john in sess and jill in sess
    
        eq_(sess.query(User).order_by(User.id).all(), [jack,jane])

    @testing.resolve_artifact_names
    def test_delete_with_fetch_strategy(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter(or_(User.name == 'john', User.name == 'jill')).delete(synchronize_session='fetch')
    
        assert john not in sess and jill not in sess
    
        eq_(sess.query(User).order_by(User.id).all(), [jack,jane])

    @testing.fails_on('mysql', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_delete_invalid_evaluation(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
    
        assert_raises(sa_exc.InvalidRequestError,
            sess.query(User).filter(User.name == select([func.max(User.name)])).delete, synchronize_session='evaluate'
        )
        
        sess.query(User).filter(User.name == select([func.max(User.name)])).delete(synchronize_session='fetch')
        
        assert john not in sess
    
        eq_(sess.query(User).order_by(User.id).all(), [jack,jill,jane])

    @testing.resolve_artifact_names
    def test_update(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter(User.age > 29).update({'age': User.age - 10}, synchronize_session='evaluate')
    
        eq_([john.age, jack.age, jill.age, jane.age], [25,37,29,27])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([25,37,29,27]))

        sess.query(User).filter(User.age > 29).update({User.age: User.age - 10}, synchronize_session='evaluate')
        eq_([john.age, jack.age, jill.age, jane.age], [25,27,29,27])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([25,27,29,27]))

        sess.query(User).filter(User.age > 27).update({users.c.age: User.age - 10}, synchronize_session='evaluate')
        eq_([john.age, jack.age, jill.age, jane.age], [25,27,19,27])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([25,27,19,27]))

        sess.query(User).filter(User.age == 25).update({User.age: User.age - 10}, synchronize_session='fetch')
        eq_([john.age, jack.age, jill.age, jane.age], [15,27,19,27])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([15,27,19,27]))


    @testing.resolve_artifact_names
    def test_update_with_bindparams(self):
        sess = create_session(bind=testing.db, autocommit=False)

        john,jack,jill,jane = sess.query(User).order_by(User.id).all()

        sess.query(User).filter('age > :x').params(x=29).update({'age': User.age - 10}, synchronize_session='fetch')

        eq_([john.age, jack.age, jill.age, jane.age], [25,37,29,27])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([25,37,29,27]))

    @testing.resolve_artifact_names
    def test_update_changes_resets_dirty(self):
        sess = create_session(bind=testing.db, autocommit=False, autoflush=False)

        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
    
        john.age = 50
        jack.age = 37
    
        # autoflush is false.  therefore our '50' and '37' are getting blown away by this operation.
    
        sess.query(User).filter(User.age > 29).update({'age': User.age - 10}, synchronize_session='evaluate')

        for x in (john, jack, jill, jane):
            assert not sess.is_modified(x)

        eq_([john.age, jack.age, jill.age, jane.age], [25,37,29,27])
    
        john.age = 25
        assert john in sess.dirty
        assert jack in sess.dirty
        assert jill not in sess.dirty
        assert not sess.is_modified(john)
        assert not sess.is_modified(jack)

    @testing.resolve_artifact_names
    def test_update_changes_with_autoflush(self):
        sess = create_session(bind=testing.db, autocommit=False, autoflush=True)

        john,jack,jill,jane = sess.query(User).order_by(User.id).all()

        john.age = 50
        jack.age = 37

        sess.query(User).filter(User.age > 29).update({'age': User.age - 10}, synchronize_session='evaluate')

        for x in (john, jack, jill, jane):
            assert not sess.is_modified(x)

        eq_([john.age, jack.age, jill.age, jane.age], [40, 27, 29, 27])

        john.age = 25
        assert john in sess.dirty
        assert jack not in sess.dirty
        assert jill not in sess.dirty
        assert sess.is_modified(john)
        assert not sess.is_modified(jack)
    
    

    @testing.resolve_artifact_names
    def test_update_with_expire_strategy(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).filter(User.age > 29).update({'age': User.age - 10}, synchronize_session='fetch')
    
        eq_([john.age, jack.age, jill.age, jane.age], [25,37,29,27])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([25,37,29,27]))

    @testing.fails_if(lambda: not testing.db.dialect.supports_sane_rowcount)
    @testing.resolve_artifact_names
    def test_update_returns_rowcount(self):
        sess = create_session(bind=testing.db, autocommit=False)

        rowcount = sess.query(User).filter(User.age > 29).update({'age': User.age + 0})
        eq_(rowcount, 2)

        rowcount = sess.query(User).filter(User.age > 29).update({'age': User.age - 10})
        eq_(rowcount, 2)

    @testing.fails_if(lambda: not testing.db.dialect.supports_sane_rowcount)
    @testing.resolve_artifact_names
    def test_delete_returns_rowcount(self):
        sess = create_session(bind=testing.db, autocommit=False)

        rowcount = sess.query(User).filter(User.age > 26).delete(synchronize_session=False)
        eq_(rowcount, 3)

    @testing.resolve_artifact_names
    def test_update_with_eager_relationships(self):
        self.insert_documents()

        sess = create_session(bind=testing.db, autocommit=False)

        foo,bar,baz = sess.query(Document).order_by(Document.id).all()
        sess.query(Document).filter(Document.user_id == 1).update({'title': Document.title+Document.title}, synchronize_session='fetch')

        eq_([foo.title, bar.title, baz.title], ['foofoo','barbar', 'baz'])
        eq_(sess.query(Document.title).order_by(Document.id).all(), zip(['foofoo','barbar', 'baz']))

    @testing.resolve_artifact_names
    def test_update_with_explicit_joinedload(self):
        sess = create_session(bind=testing.db, autocommit=False)

        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).options(joinedload(User.documents)).filter(User.age > 29).update({'age': User.age - 10}, synchronize_session='fetch')

        eq_([john.age, jack.age, jill.age, jane.age], [25,37,29,27])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([25,37,29,27]))

    @testing.resolve_artifact_names
    def test_delete_with_eager_relationships(self):
        self.insert_documents()

        sess = create_session(bind=testing.db, autocommit=False)

        sess.query(Document).filter(Document.user_id == 1).delete(synchronize_session=False)

        eq_(sess.query(Document.title).all(), zip(['baz']))

    @testing.resolve_artifact_names
    def test_update_all(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).update({'age': 42}, synchronize_session='evaluate')
    
        eq_([john.age, jack.age, jill.age, jane.age], [42,42,42,42])
        eq_(sess.query(User.age).order_by(User.id).all(), zip([42,42,42,42]))

    @testing.resolve_artifact_names
    def test_delete_all(self):
        sess = create_session(bind=testing.db, autocommit=False)
    
        john,jack,jill,jane = sess.query(User).order_by(User.id).all()
        sess.query(User).delete(synchronize_session='evaluate')
        
        assert not (john in sess or jack in sess or jill in sess or jane in sess)
        eq_(sess.query(User).count(), 0)
        

class StatementOptionsTest(QueryTest):
    """ Make sure a Query's execution_options are passed on to the
    resulting statement. """

    def test_query_with_statement_option(self):
        sess = create_session(bind=testing.db, autocommit=False)

        q1 = sess.query(User)
        assert q1._execution_options == dict()
        q2 = q1.execution_options(foo='bar', stream_results=True)
        # q1's options should be unchanged.
        assert q1._execution_options == dict()
        # q2 should have them set.
        assert q2._execution_options == dict(foo='bar', stream_results=True)
        q3 = q2.execution_options(foo='not bar', answer=42)
        assert q2._execution_options == dict(foo='bar', stream_results=True)

        q3_options = dict(foo='not bar', stream_results=True, answer=42)
        assert q3._execution_options == q3_options
        assert q3.statement._execution_options == q3_options
        assert q3._compile_context().statement._execution_options == q3_options
        assert q3.subquery().original._execution_options == q3_options

    # TODO: Test that statement options are passed on to
    # updates/deletes, but currently there are no such options
    # applicable for them.
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.