test_mapper.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » orm » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » orm » test_mapper.py
"""General mapper operations with an emphasis on selecting/loading."""

from sqlalchemy.test.testing import assert_raises,assert_raises_message
import sqlalchemy as sa
from sqlalchemy.test import testing,pickleable
from sqlalchemy import MetaData,Integer,String,ForeignKey,func,util
from sqlalchemy.test.schema import Table,Column
from sqlalchemy.engine import default
from sqlalchemy.orm import mapper,relationship,backref,create_session,class_mapper,compile_mappers,reconstructor,validates,aliased
from sqlalchemy.orm import defer,deferred,synonym,attributes,column_property,composite,relationship,dynamic_loader,comparable_property,AttributeExtension
from sqlalchemy.test.testing import eq_,AssertsCompiledSQL
from test.orm import _base,_fixtures
from sqlalchemy.test.assertsql import AllOf,CompiledSQL


class MapperTest(_fixtures.FixtureTest):

    @testing.resolve_artifact_names
    def test_prop_shadow(self):
        """A backref may not take the name of an already-mapped attribute."""

        mapper(Address, addresses)
        # The backref would be placed on Address under the name
        # 'email_address'; the error is only expected at compile time.
        colliding = relationship(Address, backref='email_address')
        mapper(User, users, properties={'addresses': colliding})
        assert_raises(sa.exc.ArgumentError, sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_update_attr_keys(self):
        """insert()/update() resolve InstrumentedAttribute keys to the right column."""

        # Map the 'name' column under a different attribute name so that the
        # attribute key and the column key are not the same string.
        mapper(User, users, properties={'foobar': users.c.name})

        def rows_named(value):
            stmt = sa.select([User.foobar]).where(User.foobar == value)
            return stmt.execute().fetchall()

        users.insert().values({User.foobar: 'name1'}).execute()
        eq_(rows_named('name1'), [('name1',)])

        users.update().values({User.foobar: User.foobar + 'foo'}).execute()
        eq_(rows_named('name1foo'), [('name1foo',)])
        
    @testing.resolve_artifact_names
    def test_utils(self):
        """Check _is_mapped_class / _is_aliased_class against assorted objects."""
        from sqlalchemy.orm.util import _is_mapped_class,_is_aliased_class

        class Foo(object):
            x = "something"
            @property
            def y(self):
                return "somethign else"

        foo_mapper = mapper(Foo, users)
        foo_alias = aliased(Foo)
        foo_instance = Foo()

        # (predicate, candidate, expected result)
        expectations = [
            (_is_mapped_class, Foo.x, False),
            (_is_mapped_class, Foo.y, False),
            (_is_mapped_class, Foo, True),
            (_is_mapped_class, foo_instance, False),
            (_is_mapped_class, foo_alias, True),
            (_is_mapped_class, foo_mapper, True),
            (_is_aliased_class, foo_alias, True),
            (_is_aliased_class, Foo.x, False),
            (_is_aliased_class, Foo.y, False),
            (_is_aliased_class, Foo, False),
            (_is_aliased_class, foo_instance, False),
            (_is_aliased_class, foo_alias, True),
            (_is_aliased_class, foo_mapper, False),
        ]
        for predicate, candidate, expected in expectations:
            assert predicate(candidate) == expected



    @testing.resolve_artifact_names
    def test_prop_accessor(self):
        """Reading the mapper's 'properties' attribute raises NotImplementedError."""
        mapper(User, users)
        user_mapper = sa.orm.class_mapper(User)
        assert_raises(NotImplementedError, getattr, user_mapper, 'properties')


    @testing.resolve_artifact_names
    def test_bad_cascade(self):
        """relationship() rejects a cascade string containing an unknown token."""
        mapper(Address, addresses)
        cascade_spec = "fake, all, delete-orphan"
        assert_raises(
            sa.exc.ArgumentError,
            relationship, Address, cascade=cascade_spec)

    @testing.resolve_artifact_names
    def test_exceptions_sticky(self):
        """Mapper compile errors raised during hasattr() are preserved."""

        user_rel = relationship(User)
        mapper(Address, addresses, properties={'user': user_rel})

        # The result is deliberately ignored: this call only exists to provoke
        # the attribute lookup inside hasattr().
        hasattr(Address.user, 'property')

        # A later explicit compile must still report the problem.
        assert_raises_message(
            sa.exc.InvalidRequestError,
            r"suppressed within a hasattr\(\)",
            compile_mappers)
    
    @testing.resolve_artifact_names
    def test_column_prefix(self):
        """With column_prefix='_', columns map to '_<name>' and a synonym can expose them."""
        mapper(User, users, column_prefix='_',
               properties={'user_name': synonym('_name')})

        sess = create_session()
        by_pk = sess.query(User).get(7)
        eq_(by_pk._name, 'jack')
        eq_(by_pk._id, 7)

        # Filtering through the synonym must yield the very same instance.
        by_synonym = sess.query(User).filter_by(user_name='jack').one()
        assert by_pk is by_synonym

    @testing.resolve_artifact_names
    def test_no_pks_1(self):
        """Mapping a named alias of a name-only select raises ArgumentError."""
        name_only = sa.select([users.c.name])
        assert_raises(sa.exc.ArgumentError, mapper, User, name_only.alias('foo'))

    @testing.resolve_artifact_names
    def test_no_pks_2(self):
        """Mapping an anonymous alias of a name-only select raises ArgumentError."""
        name_only = sa.select([users.c.name])
        assert_raises(sa.exc.ArgumentError, mapper, User, name_only.alias())

    @testing.resolve_artifact_names
    def test_recompile_on_other_mapper(self):
        """A compile trigger on an already-compiled mapper still checks all mappers."""
        mapperlib = sa.orm.mapperlib

        mapper(User, users)
        sa.orm.compile_mappers()
        assert mapperlib._new_mappers is False

        address_mapper = mapper(Address, addresses, properties={
            'user': relationship(User, backref="addresses")})

        # The new mapper is pending until something triggers a compile.
        assert address_mapper.compiled is False
        assert mapperlib._new_mappers is True

        # After instantiating User and reading the backref-created attribute,
        # the pending flag must have been cleared again.
        user = User()
        assert User.addresses
        assert mapperlib._new_mappers is False

    @testing.resolve_artifact_names
    def test_compile_on_session(self):
        """Session.connection() can be handed a freshly created mapper."""
        user_mapper = mapper(User, users)
        sess = create_session()
        # No assertion: the call simply must not raise.
        sess.connection(user_mapper)

    @testing.resolve_artifact_names
    def test_incomplete_columns(self):
        """Loading from a select which does not contain all columns."""
        mapper(Address, addresses)
        s = create_session()

        # Order explicitly so that first() deterministically yields the row
        # with id 1 instead of relying on the backend's default row order.
        partial = sa.select(
            [addresses.c.id, addresses.c.user_id]).order_by(addresses.c.id)
        a = s.query(Address).from_statement(partial).first()
        eq_(a.user_id, 7)
        eq_(a.id, 1)

        # email_address was not part of the statement, so it must not be
        # present in the instance state yet (the key was previously misspelled
        # 'email_addres', which made this check always pass) ...
        assert 'email_address' not in a.__dict__
        # ... and it is loaded on first access.
        eq_(a.email_address, 'jack@bean.com')

    @testing.resolve_artifact_names
    def test_column_not_present(self):
        """A property pointing at another table's column is rejected by mapper()."""
        foreign_column = addresses.c.user_id
        assert_raises_message(
            sa.exc.ArgumentError,
            "not represented in the mapper's table",
            mapper, User, users, properties={'foo': foreign_column})
        
    @testing.resolve_artifact_names
    def test_bad_constructor(self):
        """An instance whose constructor fails is not placed in the session."""
        class Foo(object):
            def __init__(self, one, two, _sa_session=None):
                pass

        scoped = sa.orm.scoped_session(create_session)
        mapper(Foo, users, extension=scoped.extension)

        sess = create_session()

        # Missing the required 'two' argument: construction fails and the
        # session must stay empty.
        assert_raises(TypeError, Foo, 'one', _sa_session=sess)
        eq_(len(list(sess)), 0)
        assert_raises(TypeError, Foo, 'one')

        # A successful construction lands in the given session.
        Foo('one', 'two', _sa_session=sess)
        eq_(len(list(sess)), 1)

    @testing.resolve_artifact_names
    def test_constructor_exc_1(self):
        """Exceptions raised in the mapped class are not masked by sa decorations"""
        import sys

        ex = AssertionError('oops')
        sess = create_session()

        class Foo(object):
            def __init__(self, **kw):
                raise ex
        mapper(Foo, users)

        try:
            Foo()
        except Exception:
            # The very same exception object must propagate.  sys.exc_info()
            # is used rather than "except ..., e" so the block parses on
            # both Python 2 and Python 3.
            assert sys.exc_info()[1] is ex
        else:
            # Kept outside the try body: an "assert False" inside it would be
            # caught by the broad except clause above and reported as a
            # misleading identity failure.
            assert False, "Foo() did not raise"

        sa.orm.clear_mappers()
        mapper(Foo, users, extension=sa.orm.scoped_session(
            create_session).extension)
        def bad_expunge(foo):
            raise Exception("this exception should be stated as a warning")

        # With expunge() itself failing, an SAWarning is expected to surface
        # from the constructor call.
        sess.expunge = bad_expunge
        assert_raises(sa.exc.SAWarning, Foo, _sa_session=sess)

    @testing.resolve_artifact_names
    def test_constructor_exc_2(self):
        """Illegal constructor args raise TypeError, with or without an explicit __init__ [ticket:908]."""

        class Foo(object):
            def __init__(self):
                pass

        class Bar(object):
            pass

        mapper(Foo, users)
        mapper(Bar, addresses)

        # Foo defines __init__, Bar does not; both must reject unknown kwargs.
        for mapped_cls in (Foo, Bar):
            assert_raises(TypeError, mapped_cls, x=5)

    @testing.resolve_artifact_names
    def test_props(self):
        """User.addresses.property is the object returned by get_property()."""
        address_mapper = mapper(Address, addresses)
        user_mapper = mapper(User, users, properties={
            'addresses': relationship(address_mapper)
        }).compile()
        assert User.addresses.property is user_mapper.get_property('addresses')

    @testing.resolve_artifact_names
    def test_compile_on_prop_1(self):
        """A relationship comparator is usable without an explicit compile."""
        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relationship(address_mapper)})
        # No assertion: building the criterion simply must not raise.
        User.addresses.any(Address.email_address == 'foo@bar.com')

    @testing.resolve_artifact_names
    def test_compile_on_prop_2(self):
        """A column attribute renders the same SQL as the table column it maps."""
        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relationship(address_mapper)})
        via_attribute = str(User.id == 3)
        via_column = str(users.c.id == 3)
        eq_(via_attribute, via_column)

    @testing.resolve_artifact_names
    def test_compile_on_prop_3(self):
        """An inherited attribute on a subclass has an impl without an explicit compile."""
        class Foo(User):
            pass

        mapper(User, users)
        mapper(Foo, addresses, inherits=User)
        foo_cls = Foo().__class__
        assert foo_cls.name.impl is not None

    @testing.resolve_artifact_names
    def test_deferred_subclass_attribute_instrument(self):
        """A subclass mapped after the base was compiled still gets attribute impls."""
        class Foo(User):
            pass

        mapper(User, users)
        # Compile while only the base mapper exists.
        compile_mappers()
        mapper(Foo, addresses, inherits=User)
        foo_cls = Foo().__class__
        assert foo_cls.name.impl is not None

    @testing.resolve_artifact_names
    def test_extension_collection_frozen(self):
        """The caller's extension list is neither mutated nor used as the attribute's own collection."""
        class Foo(User):
            pass

        user_mapper = mapper(User, users)
        mapper(Order, orders)
        compile_mappers()
        mapper(Foo, addresses, inherits=User)

        supplied = [AttributeExtension()]
        user_mapper.add_property(
            'somename', column_property(users.c.name, extension=supplied))
        user_mapper.add_property(
            'orders', relationship(Order, extension=supplied, backref='user'))
        # The list handed in must still hold exactly the one extension.
        assert len(supplied) == 1

        # Base and subclass share one collection, which is not the caller's.
        assert Foo.orders.impl.extensions is User.orders.impl.extensions
        assert Foo.orders.impl.extensions is not supplied

        compile_mappers()
        for cls in (User, Foo):
            assert len(cls.somename.impl.extensions) == 1
        for cls in (Foo, User):
            assert len(cls.orders.impl.extensions) == 3
        

    @testing.resolve_artifact_names
    def test_compile_on_get_props_1(self):
        """Iterating iterate_properties compiles the mapper."""
        user_mapper = mapper(User, users)
        assert not user_mapper.compiled
        props = list(user_mapper.iterate_properties)
        assert props
        assert user_mapper.compiled

    @testing.resolve_artifact_names
    def test_compile_on_get_props_2(self):
        """Calling get_property() compiles the mapper."""
        user_mapper = mapper(User, users)
        assert not user_mapper.compiled
        name_prop = user_mapper.get_property('name')
        assert name_prop
        assert user_mapper.compiled
        
    @testing.resolve_artifact_names
    def test_add_property(self):
        """Exercise Mapper.add_property() with a deferred column, a synonym,
        a relationship and two comparable properties added after mapper()."""
        # Log of ('get'|'set', value) tuples recorded by User.name's accessors.
        assert_col = []

        class User(_base.ComparableEntity):
            def _get_name(self):
                assert_col.append(('get', self._name))
                return self._name
            def _set_name(self, name):
                assert_col.append(('set', name))
                self._name = name
            name = property(_get_name, _set_name)

            # Reads self._name directly, so it does not add to assert_col.
            def _uc_name(self):
                if self._name is None:
                    return None
                return self._name.upper()
            uc_name = property(_uc_name)
            uc_name2 = property(_uc_name)

        m = mapper(User, users)
        mapper(Address, addresses)

        class UCComparator(sa.orm.PropComparator):
            # __eq__ is overridden to return SQL expressions, so instances
            # are explicitly marked unhashable.
            __hash__ = None

            def __eq__(self, other):
                # Compare case-insensitively against the parent class's
                # 'name' attribute.
                cls = self.prop.parent.class_
                col = getattr(cls, 'name')
                if other is None:
                    return col == None
                else:
                    return sa.func.upper(col) == sa.func.upper(other)

        # All properties are attached after the mapper has been created.
        m.add_property('_name', deferred(users.c.name))
        m.add_property('name', synonym('_name'))
        m.add_property('addresses', relationship(Address))
        m.add_property('uc_name', sa.orm.comparable_property(UCComparator))
        # Same comparator, but with the descriptor passed in explicitly.
        m.add_property('uc_name2', sa.orm.comparable_property(
                UCComparator, User.uc_name2))

        sess = create_session(autocommit=False)
        assert sess.query(User).get(7)

        u = sess.query(User).filter_by(name='jack').one()

        def go():
            eq_(len(u.addresses),
                len(self.static.user_address_result[0].addresses))
            eq_(u.name, 'jack')
            eq_(u.uc_name, 'JACK')
            eq_(u.uc_name2, 'JACK')
            # Only the single u.name read above went through the getter.
            eq_(assert_col, [('get', 'jack')], str(assert_col))
        # NOTE(review): presumably asserts that go() emits exactly 2 SQL
        # statements (likely the addresses load and the deferred name load)
        # -- confirm against sql_count_.
        self.sql_count_(2, go)

        # Flush one modified and one new object, then undo everything.
        u.name = 'ed'
        u3 = User()
        u3.name = 'some user'
        sess.add(u3)
        sess.flush()
        sess.rollback()

    @testing.resolve_artifact_names
    def test_replace_property(self):
        """Adding a 'name' synonym leaves the name column mapped to '_name'."""
        # Variant 1: map the column as '_name' explicitly, then add the synonym.
        user_mapper = mapper(User, users)
        user_mapper.add_property('_name', users.c.name)
        user_mapper.add_property('name', synonym('_name'))

        sess = create_session()
        jack = sess.query(User).filter_by(name='jack').one()
        eq_(jack._name, 'jack')
        eq_(jack.name, 'jack')
        jack.name = 'jacko'
        name_prop = user_mapper.get_property('_name')
        assert user_mapper._columntoproperty[users.c.name] is name_prop

        sa.orm.clear_mappers()

        # Variant 2: let the synonym remap the column itself via map_column.
        user_mapper = mapper(User, users)
        user_mapper.add_property('name', synonym('_name', map_column=True))

        sess.expunge_all()
        jack = sess.query(User).filter_by(name='jack').one()
        eq_(jack._name, 'jack')
        eq_(jack.name, 'jack')
        jack.name = 'jacko'
        name_prop = user_mapper.get_property('_name')
        assert user_mapper._columntoproperty[users.c.name] is name_prop

    @testing.resolve_artifact_names
    def test_synonym_replaces_backref(self):
        """A synonym declared before its target exists works once a backref supplies it."""
        calls = []

        class Address(object):
            def _get_user(self):
                calls.append("get")
                return self._user
            def _set_user(self, user):
                calls.append("set")
                self._user = user
            user = property(_get_user, _set_user)

        # The synonym points at '_user', which is not mapped at this point.
        mapper(Address, addresses, properties={'user': synonym('_user')})
        sa.orm.compile_mappers()

        # The backref configured here is what later provides '_user'.
        mapper(User, users, properties={
            'addresses': relationship(Address, backref='_user')})

        sess = create_session()
        user7 = sess.query(User).get(7)
        user8 = sess.query(User).get(8)

        # Comparison operators on the synonym need to work.
        address = sess.query(Address).filter(Address.user == user7).one()
        eq_(address.id, 1)

        # Assignment and read both go through the Python property.
        address.user = user8
        assert address.user is user8
        eq_(calls, ["set", "get"])

    @testing.resolve_artifact_names
    def test_self_ref_synonym(self):
        """Synonyms for both directions of a self-referential relationship."""
        node_table = Table('nodes', MetaData(),
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('parent_id', Integer, ForeignKey('nodes.id')))

        class Node(object):
            pass

        children_rel = relationship(
            Node, backref=backref('_parent', remote_side=node_table.c.id))
        mapper(Node, node_table, properties={
            '_children': children_rel,
            'children': synonym('_children'),
            'parent': synonym('_parent')
        })

        parent = Node()
        child = Node()
        parent.children.append(child)

        # Synonym and underlying attribute must expose the same objects.
        assert child.parent is child._parent is parent
        assert parent.children[0] is parent._children[0] is child
        eq_(str(Node.parent == child), ":param_1 = nodes.parent_id")

    @testing.resolve_artifact_names
    def test_illegal_non_primary(self):
        """A non-primary mapper may not introduce a new relationship."""
        mapper(User, users)
        mapper(Address, addresses)

        def go():
            # Construction and compile are both inside the guarded callable,
            # since either step could be the one that raises.
            mapper(User, users, non_primary=True, properties={
                'addresses':relationship(Address)
            }).compile()

        # Uses the file's shared helper instead of a hand-written
        # try / "assert False" / except block.
        assert_raises_message(
            sa.exc.ArgumentError,
            "Attempting to assign a new relationship 'addresses' "
            "to a non-primary mapper on class 'User'",
            go)

    @testing.resolve_artifact_names
    def test_illegal_non_primary_2(self):
        """A non-primary mapper requires that a primary mapper exists first."""
        # Uses the file's shared helper instead of a hand-written
        # try / "assert False" / except block.
        assert_raises_message(
            sa.exc.InvalidRequestError,
            "Configure a primary mapper first",
            mapper, User, users, non_primary=True)

    @testing.resolve_artifact_names
    def test_prop_filters(self):
        """Check which attributes end up on mapped classes when
        include_properties, exclude_properties and column_prefix are used,
        including across a single-table inheritance hierarchy."""
        t = Table('person', MetaData(),
                  Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
                  Column('type', String(128)),
                  Column('name', String(128)),
                  Column('employee_number', Integer),
                  Column('boss_id', Integer, ForeignKey('person.id')),
                  Column('vendor_id', Integer))

        class Person(object): pass
        class Vendor(Person): pass
        class Employee(Person): pass
        class Manager(Employee): pass
        class Hoho(object): pass
        class Lala(object): pass

        # Defines a plain method called 'name', the same name as a column.
        class HasDef(object):
            def name(self):
                pass

        # All mappers below share the single 'person' table.
        p_m = mapper(Person, t, polymorphic_on=t.c.type,
                     include_properties=('id', 'type', 'name'))
        e_m = mapper(Employee, inherits=p_m, polymorphic_identity='employee',
          properties={
            'boss': relationship(Manager, backref=backref('peon', ), remote_side=t.c.id)
          },
          exclude_properties=('vendor_id',))

        m_m = mapper(Manager, inherits=e_m, polymorphic_identity='manager',
                     include_properties=('id', 'type'))

        v_m = mapper(Vendor, inherits=p_m, polymorphic_identity='vendor',
                     exclude_properties=('boss_id', 'employee_number'))
        h_m = mapper(Hoho, t, include_properties=('id', 'type', 'name'))
        l_m = mapper(Lala, t, exclude_properties=('vendor_id', 'boss_id'),
                     column_prefix="p_")

        hd_m = mapper(HasDef, t, column_prefix="h_")

        p_m.compile()
        #sa.orm.compile_mappers()

        def assert_props(cls, want):
            # Compares the public (non-underscore) names in dir(cls).
            have = set([n for n in dir(cls) if not n.startswith('_')])
            want = set(want)
            eq_(have, want)

        def assert_instrumented(cls, want):
            # Compares the keys of the properties held by cls's mapper.
            have = set([p.key for p in class_mapper(cls).iterate_properties])
            want = set(want)
            eq_(have, want)

        # The plain 'name' method remains next to the prefixed 'h_name'.
        assert_props(HasDef, ['h_boss_id', 'h_employee_number', 'h_id', 'name', 'h_name', 'h_vendor_id', 'h_type'])
        assert_props(Person, ['id', 'name', 'type'])
        assert_instrumented(Person, ['id', 'name', 'type'])
        assert_props(Employee, ['boss', 'boss_id', 'employee_number',
                                'id', 'name', 'type'])
        assert_instrumented(Employee,['boss', 'boss_id', 'employee_number',
                                                        'id', 'name', 'type'])
        # dir() also reports what Manager inherits from Employee and Person.
        assert_props(Manager, ['boss', 'boss_id', 'employee_number', 'peon',
                               'id', 'name', 'type'])

        # 'peon' and 'type' are both explicitly stated properties
        assert_instrumented(Manager, ['peon', 'type', 'id'])

        assert_props(Vendor, ['vendor_id', 'id', 'name', 'type'])
        assert_props(Hoho, ['id', 'name', 'type'])
        assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type'])

        # excluding the discriminator column is currently not allowed
        class Foo(Person):
            pass
        assert_raises(sa.exc.InvalidRequestError, mapper, Foo, inherits=Person, polymorphic_identity='foo', exclude_properties=('type',) )
    
    @testing.resolve_artifact_names
    def test_mapping_to_join(self):
        """Map User to users JOIN addresses with users.id as the primary key."""
        on_user_id = users.c.id == addresses.c.user_id
        joined = sa.join(users, addresses, on_user_id)
        mapper(User, joined, primary_key=[users.c.id])

        found = create_session().query(User).order_by(users.c.id).all()
        eq_(found, self.static.user_result[:3])

    @testing.resolve_artifact_names
    def test_mapping_to_join_no_pk(self):
        """Mapping to a join where only one side contributes primary keys."""
        join_mapper = mapper(Address, addresses.join(email_bounces))
        join_mapper.compile()

        pk_tables = join_mapper._pks_by_table
        assert addresses in pk_tables
        assert email_bounces not in pk_tables

        sess = create_session()
        new_address = Address(id=10, email_address='e1')
        sess.add(new_address)
        sess.flush()

        # Expected row counts in each table after the flush.
        eq_(addresses.count().scalar(), 6)
        eq_(email_bounces.count().scalar(), 5)

    @testing.resolve_artifact_names
    def test_mapping_to_outerjoin(self):
        """Mapping to an outer join with a nullable composite primary key."""

        mapper(User, users.outerjoin(addresses),
               primary_key=[users.c.id, addresses.c.id],
               properties={'address_id': addresses.c.id})

        sess = create_session()
        found = sess.query(User).order_by(User.id, User.address_id).all()

        # (user id, address id) pairs; the last user has no address row.
        id_pairs = [(7, 1), (8, 2), (8, 3), (8, 4), (9, 5), (10, None)]
        expected = [User(id=user_id, address_id=address_id)
                    for user_id, address_id in id_pairs]
        eq_(found, expected)

    @testing.resolve_artifact_names
    def test_mapping_to_outerjoin_no_partial_pks(self):
        """Test the allow_partial_pks=False flag."""

        mapper(User, users.outerjoin(addresses),
               allow_partial_pks=False,
               primary_key=[users.c.id, addresses.c.id],
               properties={'address_id': addresses.c.id})

        sess = create_session()
        found = sess.query(User).order_by(User.id, User.address_id).all()

        id_pairs = [(7, 1), (8, 2), (8, 3), (8, 4), (9, 5)]
        expected = [User(id=user_id, address_id=address_id)
                    for user_id, address_id in id_pairs]
        # The final row is expected to come back as None rather than a User.
        expected.append(None)
        eq_(found, expected)

    @testing.resolve_artifact_names
    def test_custom_join(self):
        """select_from() totally replaces the FROM clause of the query."""

        mapper(Item, items)
        mapper(Order, orders, properties={
            'items': relationship(Item, order_items)})
        mapper(User, users, properties={
            'orders': relationship(Order)})

        sess = create_session()
        full_join = users.join(orders).join(order_items).join(items)
        query = sess.query(User).select_from(full_join)
        query = query.filter(items.c.description == 'item 4')

        eq_(query.all(), [self.static.user_result[0]])

    @testing.resolve_artifact_names
    def test_cancel_order_by(self):
        """A mapper-level order_by can be cancelled or overridden per query."""
        mapper(User, users, order_by=users.c.name.desc())

        def rendered(query):
            return str(query.statement).lower()

        # default ordering, ordering removed, ordering replaced
        assert "order by users.name desc" in rendered(
            create_session().query(User))
        assert "order by" not in rendered(
            create_session().query(User).order_by(None))
        assert "order by users.name asc" in rendered(
            create_session().query(User).order_by(User.name.asc()))

        by_mapper_default = create_session().query(User).all()
        eq_(
            by_mapper_default,
            [User(id=7, name=u'jack'), User(id=9, name=u'fred'), User(id=8, name=u'ed'), User(id=10, name=u'chuck')]
        )

        by_query_order = create_session().query(User).order_by(User.name).all()
        eq_(
            by_query_order,
            [User(id=10, name=u'chuck'), User(id=8, name=u'ed'), User(id=9, name=u'fred'), User(id=7, name=u'jack')]
        )
        
    # Raises an "expression evaluation not supported" error at prepare time
    @testing.fails_on('firebird', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_function(self):
        """Mapping to a SELECT statement that has functions in it."""

        doubled_id = (users.c.id * 2).label('concat')
        address_count = sa.func.count(addresses.c.id).label('count')
        stmt = sa.select([users, doubled_id, address_count],
                         users.c.id == addresses.c.user_id,
                         group_by=[col for col in users.c])
        selectable = stmt.alias('myselect')

        mapper(User, selectable, order_by=selectable.c.id)
        sess = create_session()
        rows = sess.query(User).all()

        # The first two rows must expose the labelled expression as 'concat'.
        for idx, total in enumerate((14, 16)):
            row = rows[idx]
            eq_(row.concat, row.id * 2)
            eq_(row.concat, total)

    @testing.resolve_artifact_names
    def test_count(self):
        """Query.count() for entity queries and single-column queries."""

        mapper(User, users)

        sess = create_session()

        # Counting full entities, filtered by attribute and by table column.
        entity_q = sess.query(User)
        eq_(entity_q.count(), 4)
        eq_(entity_q.filter(User.id.in_([8,9])).count(), 2)
        eq_(entity_q.filter(users.c.id.in_([8,9])).count(), 2)

        # Counting a query for a single column.
        id_q = sess.query(User.id)
        eq_(id_q.count(), 4)
        eq_(id_q.filter(User.id.in_((8, 9))).count(), 2)

    @testing.resolve_artifact_names
    def test_many_to_many_count(self):
        """count() on a DISTINCT query joined across a many-to-many relationship."""
        mapper(Keyword, keywords)
        mapper(Item, items, properties={
            'keywords': relationship(Keyword, item_keywords, lazy='select')})

        sess = create_session()
        red_items = sess.query(Item).join('keywords').distinct()
        red_items = red_items.filter(Keyword.name == "red")
        eq_(red_items.count(), 2)

    @testing.resolve_artifact_names
    def test_override_1(self):
        """Overriding a column with a relationship raises an error."""
        def map_name_as_relationship():
            address_mapper = mapper(Address, addresses)
            mapper(User, users,
                   properties={'name': relationship(address_mapper)})

        assert_raises(sa.exc.ArgumentError, map_name_as_relationship)

    @testing.resolve_artifact_names
    def test_override_2(self):
        """exclude_properties cancels the column-override error."""

        address_mapper = mapper(Address, addresses)
        mapper(User, users,
               exclude_properties=['name'],
               properties={'name': relationship(address_mapper)})

        assert bool(User.name)
        
    @testing.resolve_artifact_names
    def test_override_3(self):
        """Mapping the column under another name also cancels the error."""
        address_mapper = mapper(Address, addresses)
        # No assertion: configuring the mapper simply must not raise.
        mapper(User, users, properties={
            'name': relationship(address_mapper),
            'foo': users.c.name,
        })

    @testing.resolve_artifact_names
    def test_synonym(self):
        """Synonyms for a column and for a relationship: class-level
        comparators, querying, and instance access through a user-defined
        descriptor."""

        # Log of ('get'|'set', value) tuples recorded by User.uname's accessors.
        assert_col = []
        # property subclass with an extra attribute and __getitem__, used at
        # the end to check that both are reachable through User.uname.
        class extendedproperty(property):
            attribute = 123
            def __getitem__(self, key):
                return 'value'

        class User(object):
            def _get_name(self):
                assert_col.append(('get', self.name))
                return self.name
            def _set_name(self, name):
                assert_col.append(('set', name))
                self.name = name
            uname = extendedproperty(_get_name, _set_name)

        mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='select'),
            uname = synonym('name'),
            adlist = synonym('addresses'),
            adname = synonym('addresses')
        ))

        # ensure the synonym can get at the proxied comparators without
        # an explicit compile
        User.name == 'ed'
        User.adname.any()

        assert hasattr(User, 'adlist')
        # as of 0.4.2, synonyms always create a property
        assert hasattr(User, 'adname')

        # test compile
        assert not isinstance(User.uname == 'jack', bool)

        assert User.uname.property
        assert User.adlist.property

        sess = create_session()

        # test RowTuple names
        row = sess.query(User.id, User.uname).first()
        assert row.uname == row[1]

        u = sess.query(User).filter(User.uname=='jack').one()

        fixture = self.static.user_address_result[0].addresses
        eq_(u.adlist, fixture)

        # Both relationship synonyms must locate the same user.
        addr = sess.query(Address).filter_by(id=fixture[0].id).one()
        u = sess.query(User).filter(User.adname.contains(addr)).one()
        u2 = sess.query(User).filter(User.adlist.contains(addr)).one()

        assert u is u2

        # Setting through the synonym goes through the user-defined setter
        # and marks the instance dirty.
        assert u not in sess.dirty
        u.uname = "some user name"
        assert len(assert_col) > 0
        eq_(assert_col, [('set', 'some user name')])
        eq_(u.uname, "some user name")
        eq_(assert_col, [('set', 'some user name'), ('get', 'some user name')])
        eq_(u.name, "some user name")
        assert u in sess.dirty

        # Extras defined on extendedproperty are reachable on the class attribute.
        eq_(User.uname.attribute, 123)
        eq_(User.uname['key'], 'value')

    @testing.resolve_artifact_names
    def test_synonym_column_location(self):
        """map_column=True fails when no column matches the synonym's own key."""
        # 'users' has no column called 'not_name', per the expected message.
        attempt = lambda: mapper(User, users, properties={
            'not_name': synonym('_name', map_column=True)})

        expected_message = (
            "Can't compile synonym '_name': no column on table "
            "'users' named 'not_name'")
        assert_raises_message(sa.exc.ArgumentError, expected_message, attempt)

    @testing.resolve_artifact_names
    def test_column_synonyms(self):
        """Synonyms with map_column=True instrument the class and remap the column."""

        recorded = []

        class User(object):
            def _get_name(self):
                recorded.append(('get', self._name))
                return self._name
            def _set_name(self, name):
                recorded.append(('set', name))
                self._name = name
            name = property(_get_name, _set_name)

        mapper(Address, addresses)
        user_props = {
            'addresses': relationship(Address, lazy='select'),
            'name': synonym('_name', map_column=True)
        }
        mapper(User, users, properties=user_props)

        # Comparing the class attribute must build an expression, not a bool.
        assert not isinstance(User.name == 'jack', bool)

        for attr_name in ('name', '_name'):
            assert hasattr(User, attr_name)

        sess = create_session()
        jack = sess.query(User).filter(User.name == 'jack').one()
        eq_(jack.name, 'jack')
        jack.name = 'foo'
        eq_(jack.name, 'foo')

        # Every instance-level access went through the user-defined property.
        eq_(recorded, [('get', 'jack'), ('set', 'foo'), ('get', 'foo')])

    @testing.resolve_artifact_names
    def test_synonym_map_column_conflict(self):
        """A map_column synonym conflicting with an explicit column mapping is rejected in either order."""
        def conflicting_props(synonym_first):
            # Build fresh objects on every call so the two attempts do not
            # share a synonym instance.
            pairs = [
                ('_user_id', users.c.id),
                ('id', synonym('_user_id', map_column=True)),
            ]
            if synonym_first:
                pairs.reverse()
            return util.OrderedDict(pairs)

        for synonym_first in (False, True):
            assert_raises(
                sa.exc.ArgumentError,
                mapper,
                User, users, properties=conflicting_props(synonym_first))

    @testing.resolve_artifact_names
    def test_comparable(self):
        """comparable_property(): a custom comparator at the class level on
        top of a plain Python descriptor at the instance level, with and
        without the descriptor being passed explicitly."""
        # property subclass carrying an extra attribute, a method and
        # __getitem__, all of which are looked up through User.uc_name below.
        class extendedproperty(property):
            attribute = 123

            def method1(self):
                return "method1"

            def __getitem__(self, key):
                return 'value'

        class UCComparator(sa.orm.PropComparator):
            # __eq__ is overridden to return SQL expressions, so instances
            # are explicitly marked unhashable.
            __hash__ = None

            # Also defined on extendedproperty, with a different return value.
            def method1(self):
                return "uccmethod1"

            # Only defined here, not on extendedproperty.
            def method2(self, other):
                return "method2"

            def __eq__(self, other):
                # Compare case-insensitively against the parent class's
                # 'name' attribute.
                cls = self.prop.parent.class_
                col = getattr(cls, 'name')
                if other is None:
                    return col == None
                else:
                    return sa.func.upper(col) == sa.func.upper(other)

        def map_(with_explicit_property):
            # Returns a freshly defined and mapped User class.
            class User(object):
                @extendedproperty
                def uc_name(self):
                    if self.name is None:
                        return None
                    return self.name.upper()
            if with_explicit_property:
                args = (UCComparator, User.uc_name)
            else:
                # NOTE(review): presumably the descriptor is then picked up
                # from the class attribute of the same name -- confirm.
                args = (UCComparator,)

            mapper(User, users, properties=dict(
                    uc_name = sa.orm.comparable_property(*args)))
            return User

        for User in (map_(True), map_(False)):
            sess = create_session()
            sess.begin()
            q = sess.query(User)

            assert hasattr(User, 'name')
            assert hasattr(User, 'uc_name')

            # method1 exists on both objects; the expected value is the one
            # extendedproperty.method1 returns.  method2 exists only on the
            # comparator.
            eq_(User.uc_name.method1(), "method1")
            eq_(User.uc_name.method2('x'), "method2")

            assert_raises_message(
                AttributeError,
                "Neither 'extendedproperty' object nor 'UCComparator' object has an attribute 'nonexistent'",
                getattr, User.uc_name, 'nonexistent')

            # test compile
            assert not isinstance(User.uc_name == 'jack', bool)
            u = q.filter(User.uc_name=='JACK').one()

            # Reading the comparable property does not dirty the instance.
            assert u.uc_name == "JACK"
            assert u not in sess.dirty

            u.name = "some user name"
            eq_(u.name, "some user name")
            assert u in sess.dirty
            eq_(u.uc_name, "SOME USER NAME")

            sess.flush()
            sess.expunge_all()

            # Plain column criterion and custom comparator find the same row.
            q = sess.query(User)
            u2 = q.filter(User.name=='some user name').one()
            u3 = q.filter(User.uc_name=='SOME USER NAME').one()

            assert u2 is u3

            eq_(User.uc_name.attribute, 123)
            eq_(User.uc_name['key'], 'value')
            # Undo the flushed change before the next loop iteration.
            sess.rollback()

    @testing.resolve_artifact_names
    def test_comparable_column(self):
        """A custom ColumnProperty comparator controls the SQL rendered for
        the mapped attribute and takes part in attribute lookup."""
        class MyComparator(sa.orm.properties.ColumnProperty.Comparator):
            __hash__ = None

            def __eq__(self, other):
                # compare both sides lower-cased
                column = self.__clause_element__()
                return func.lower(column) == func.lower(other)

            def intersects(self, other):
                # an operator outside the standard comparator set
                column = self.__clause_element__()
                return column.op('&=')(other)

        name_property = sa.orm.column_property(
            users.c.name, comparator_factory=MyComparator)
        mapper(User, users, properties={'name': name_property})

        assert_raises_message(
            AttributeError,
            "Neither 'InstrumentedAttribute' object nor 'MyComparator' object has an attribute 'nonexistent'",
            getattr, User.name, "nonexistent")

        def render(expression):
            # compile against a fresh default dialect and return the SQL
            dialect = sa.engine.default.DefaultDialect()
            return str(expression.compile(dialect=dialect))

        eq_(render(User.name == 'ed'),
            "lower(users.name) = lower(:lower_1)")
        eq_(render(User.name.intersects('ed')),
            "users.name &= :name_1")
        

    @testing.resolve_artifact_names
    def test_reentrant_compile(self):
        """compile_mappers() completes when a property's
        post_instrument_class() hook calls Mapper.compile() itself."""

        class CompilesAddressMapper(sa.orm.properties.ColumnProperty):
            def post_instrument_class(self, mapper):
                super(CompilesAddressMapper, self).post_instrument_class(
                    mapper)
                # re-enter compilation through the Address mapper
                address_mapper.compile()

        user_mapper = mapper(User, users, properties=dict(
            name=CompilesAddressMapper(users.c.name)))
        address_mapper = mapper(Address, addresses)
        compile_mappers()

        # second round: start over and re-enter via the User mapper, i.e.
        # the mapper this property is configured on
        sa.orm.clear_mappers()

        class CompilesUserMapper(sa.orm.properties.ColumnProperty):
            def post_instrument_class(self, mapper):
                super(CompilesUserMapper, self).post_instrument_class(
                    mapper)
                user_mapper.compile()

        user_mapper = mapper(User, users, properties=dict(
            name=CompilesUserMapper(users.c.name)))
        address_mapper = mapper(Address, addresses)
        compile_mappers()
        
    @testing.resolve_artifact_names
    def test_reconstructor(self):
        """A @reconstructor method runs when an instance is loaded by a
        query, but not when the class is instantiated directly."""
        fired = []

        class User(object):
            @reconstructor
            def reconstruct(self):
                fired.append('go')

        mapper(User, users)

        # plain construction must leave the hook untouched
        User()
        eq_(fired, [])

        # loading one row through a Query triggers it exactly once
        session = create_session()
        session.query(User).first()
        eq_(fired, ['go'])

    @testing.resolve_artifact_names
    def test_reconstructor_inheritance(self):
        """With a @reconstructor on every class of a polymorphic hierarchy,
        querying A, B and C in turn records exactly ['A', 'B', 'C']."""
        calls = []

        class A(object):
            @reconstructor
            def reconstruct(self):
                calls.append('A')

        class B(A):
            @reconstructor
            def reconstruct(self):
                calls.append('B')

        class C(A):
            @reconstructor
            def reconstruct(self):
                calls.append('C')

        # single-table hierarchy discriminated on users.name
        mapper(A, users,
               polymorphic_on=users.c.name,
               polymorphic_identity='jack')
        for subclass, identity in ((B, 'ed'), (C, 'chuck')):
            mapper(subclass, inherits=A, polymorphic_identity=identity)

        hierarchy = (A, B, C)

        # direct construction never fires a reconstructor
        for cls in hierarchy:
            cls()
        eq_(calls, [])

        session = create_session()
        for cls in hierarchy:
            session.query(cls).first()
        eq_(calls, ['A', 'B', 'C'])

    @testing.resolve_artifact_names
    def test_unmapped_reconstructor_inheritance(self):
        """A @reconstructor declared on an unmapped base class still fires
        for a mapped subclass when it is loaded."""
        fired = []

        class Base(object):
            @reconstructor
            def reconstruct(self):
                fired.append('go')

        class User(Base):
            pass

        # only the subclass is mapped; Base stays a plain class
        mapper(User, users)

        User()
        eq_(fired, [])

        session = create_session()
        session.query(User).first()
        eq_(fired, ['go'])

    @testing.resolve_artifact_names
    def test_unmapped_error(self):
        """Compiling a relationship whose target class lost its mapper
        raises UnmappedClassError."""
        # map Address, then discard every mapper again
        mapper(Address, addresses)
        sa.orm.clear_mappers()

        # User now refers to a class that is no longer mapped
        address_rel = relationship(Address)
        mapper(User, users, properties=dict(addresses=address_rel))

        assert_raises(sa.orm.exc.UnmappedClassError,
                      sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_oldstyle_mixin(self):
        """Classes that mix an old-style base with a new-style base can be
        mapped, with the old-style base in either position."""
        # deliberately declared without a base: an old-style class on
        # Python 2
        class OldStyle:
            pass

        class NewStyle(object):
            pass

        class A(NewStyle, OldStyle):
            pass

        class B(OldStyle, NewStyle):
            pass

        # mapping must simply succeed for both base orders
        for mixed_cls in (A, B):
            mapper(mixed_cls, users)

class DocumentTest(testing.TestBase):
    """Verifies that ``doc`` arguments end up on mapped class attributes."""

    def test_doc_propagate(self):
        """``doc`` given to Column, column_property(), relationship(),
        backref() and synonym() becomes ``__doc__`` of the attribute."""
        metadata = MetaData()

        t1 = Table(
            't1', metadata,
            Column('col1', Integer, primary_key=True,
                   doc="primary key column"),
            Column('col2', String, doc="data col"),
            Column('col3', String, doc="data col 2"),
            Column('col4', String, doc="data col 3"),
            Column('col5', String),
        )
        t2 = Table(
            't2', metadata,
            Column('col1', Integer, primary_key=True,
                   doc="primary key column"),
            Column('col2', String, doc="data col"),
            Column('col3', Integer, ForeignKey('t1.col1'),
                   doc="foreign key to t1.col1"),
        )

        class Foo(object):
            pass

        class Bar(object):
            pass

        foo_properties = {
            'bars': relationship(
                Bar,
                doc="bar relationship",
                backref=backref('foo', doc='foo relationship')),
            'foober': column_property(t1.c.col3, doc='alternate data col'),
            'hoho': synonym(t1.c.col4, doc="syn of col4"),
        }
        mapper(Foo, t1, properties=foo_properties)
        mapper(Bar, t2)
        compile_mappers()

        # (attribute, expected __doc__) pairs; col5 was given no doc
        expectations = [
            (Foo.col1, "primary key column"),
            (Foo.col2, "data col"),
            (Foo.col5, None),
            (Foo.foober, "alternate data col"),
            (Foo.bars, "bar relationship"),
            (Foo.hoho, "syn of col4"),
            (Bar.col1, "primary key column"),
            (Bar.foo, "foo relationship"),
        ]
        for attribute, expected_doc in expectations:
            eq_(attribute.__doc__, expected_doc)
        
        
        
class OptionsTest(_fixtures.FixtureTest):
    """Query.options() loader options (joinedload / lazyload /
    contains_eager), checked mostly via emitted SQL statement counts."""

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_synonym_options(self):
        """joinedload() addressed through a synonym loads the underlying
        relationship within the single query statement."""
        mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='select',
                                 order_by=addresses.c.id),
            adlist = synonym('addresses')))

        def go():
            sess = create_session()
            u = (sess.query(User).
                 order_by(User.id).
                 options(sa.orm.joinedload('adlist')).
                 filter_by(name='jack')).one()
            # reading the synonym must not trigger a second statement
            eq_(u.adlist,
                [self.static.user_address_result[0].addresses[0]])
        self.assert_sql_count(testing.db, go, 1)

    @testing.resolve_artifact_names
    def test_eager_options(self):
        """A lazy relationship can be upgraded to an eager relationship."""
        mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses),
                                 order_by=addresses.c.id)))

        sess = create_session()
        l = (sess.query(User).
             order_by(User.id).
             options(sa.orm.joinedload('addresses'))).all()

        def go():
            # everything was loaded up front; comparing emits no SQL
            eq_(l, self.static.user_address_result)
        self.sql_count_(0, go)

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_eager_options_with_limit(self):
        """joinedload() on a single-row query pre-loads the collection;
        without the option the same data is still reachable lazily."""
        mapper(User, users, properties=dict(
            addresses=relationship(mapper(Address, addresses), lazy='select')))

        sess = create_session()
        u = (sess.query(User).
             options(sa.orm.joinedload('addresses')).
             filter_by(id=8)).one()

        def go():
            eq_(u.id, 8)
            eq_(len(u.addresses), 3)
        self.sql_count_(0, go)

        sess.expunge_all()

        # same row again, this time without the eager option
        u = sess.query(User).filter_by(id=8).one()
        eq_(u.id, 8)
        eq_(len(u.addresses), 3)

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_lazy_options_with_limit(self):
        """lazyload() on a single-row query defers a lazy='joined'
        collection, which then costs one statement on first access."""
        mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='joined')))

        sess = create_session()
        u = (sess.query(User).
             options(sa.orm.lazyload('addresses')).
             filter_by(id=8)).one()

        def go():
            eq_(u.id, 8)
            eq_(len(u.addresses), 3)
        self.sql_count_(1, go)

    @testing.resolve_artifact_names
    def test_eager_degrade(self):
        """An eager relationship automatically degrades to a lazy relationship if eager columns are not available"""
        mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='joined')))

        sess = create_session()
        # first test straight eager load, 1 statement
        def go():
            l = sess.query(User).order_by(User.id).all()
            eq_(l, self.static.user_address_result)
        self.sql_count_(1, go)

        sess.expunge_all()

        # then select just from users.  run it into instances.
        # then assert the data, which launches the lazy loads for the
        # addresses (4 statements are expected below; presumably one per
        # user row).  the previous users in session fell out of scope and
        # were removed from session's identity map.
        r = users.select().order_by(users.c.id).execute()
        def go():
            l = list(sess.query(User).instances(r))
            eq_(l, self.static.user_address_result)
        self.sql_count_(4, go)

    @testing.resolve_artifact_names
    def test_eager_degrade_deep(self):
        """Nested eager relationships degrade to lazy loads when the rows
        come from a plain select on users."""
        # test with a deeper set of eager loads.  when we first load the three
        # users, they will have no addresses or orders.  the number of lazy
        # loads when traversing the whole thing will be three for the
        # addresses and three for the orders.
        mapper(Address, addresses)

        mapper(Keyword, keywords)

        mapper(Item, items, properties=dict(
            keywords=relationship(Keyword, secondary=item_keywords,
                              lazy='joined',
                              order_by=item_keywords.c.keyword_id)))

        mapper(Order, orders, properties=dict(
            items=relationship(Item, secondary=order_items, lazy='joined',
                           order_by=order_items.c.item_id)))

        mapper(User, users, properties=dict(
            addresses=relationship(Address, lazy='joined',
                               order_by=addresses.c.id),
            orders=relationship(Order, lazy='joined',
                            order_by=orders.c.id)))

        sess = create_session()

        # first test straight eager load, 1 statement
        def go():
            l = sess.query(User).order_by(User.id).all()
            eq_(l, self.static.user_all_result)
        self.assert_sql_count(testing.db, go, 1)

        sess.expunge_all()

        # then select just from users.  run it into instances.
        # then assert the data, which will launch 6 more lazy loads.
        # the select is ordered by id: the expected list is ordered, and a
        # SELECT without ORDER BY guarantees no particular row order.
        r = users.select().order_by(users.c.id).execute()
        def go():
            l = list(sess.query(User).instances(r))
            eq_(l, self.static.user_all_result)
        self.assert_sql_count(testing.db, go, 6)

    @testing.resolve_artifact_names
    def test_lazy_options(self):
        """An eager relationship can be switched to a lazy relationship."""
        mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='joined')
        ))

        sess = create_session()
        l = (sess.query(User).
             order_by(User.id).
             options(sa.orm.lazyload('addresses'))).all()

        def go():
            # comparing touches the collections, which now load lazily
            eq_(l, self.static.user_address_result)
        self.sql_count_(4, go)

    @testing.resolve_artifact_names
    def test_option_propagate(self):
        """A joinedload() option is recorded in the loaded instance's
        ``load_options``, contains_eager() options are not, and the
        instance can still be pickled."""
        mapper(User, users, properties=dict(
            orders = relationship(Order)
        ))
        mapper(Order, orders, properties=dict(
            items = relationship(Item, secondary=order_items)
        ))
        mapper(Item, items)

        sess = create_session()

        oalias = aliased(Order)
        opt1 = sa.orm.joinedload(User.orders, Order.items)
        # unpacks into two option objects here
        opt2a, opt2b = sa.orm.contains_eager(User.orders, Order.items, alias=oalias)
        u1 = sess.query(User).join((oalias, User.orders)).options(opt1, opt2a, opt2b).first()
        ustate = attributes.instance_state(u1)
        assert opt1 in ustate.load_options
        assert opt2a not in ustate.load_options
        assert opt2b not in ustate.load_options

        # pickling must not raise with the stored options attached
        import pickle
        pickle.dumps(u1)

class DeepOptionsTest(_fixtures.FixtureTest):
    """Loader options addressed through the multi-level path
    User.orders -> Order.items -> Item.keywords."""

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map Keyword, Item, Order and User without any explicit ``lazy``
        setting on the relationships."""
        mapper(Keyword, keywords)

        item_properties = {
            'keywords': relationship(Keyword, item_keywords,
                                     order_by=item_keywords.c.item_id),
        }
        mapper(Item, items, properties=item_properties)

        order_properties = {
            'items': relationship(Item, order_items,
                                  order_by=items.c.id),
        }
        mapper(Order, orders, properties=order_properties)

        user_properties = {
            'orders': relationship(Order, order_by=orders.c.id),
        }
        mapper(User, users, order_by=users.c.id,
               properties=user_properties)

    @testing.resolve_artifact_names
    def test_deep_options_1(self):
        """Without options, walking three levels deep costs 3 statements."""
        session = create_session()

        # no loader options at all
        loaded = session.query(User).all()

        def traverse():
            loaded[0].orders[1].items[0].keywords[1]
        self.assert_sql_count(testing.db, traverse, 3)

    @testing.resolve_artifact_names
    def test_deep_options_2(self):
        """test (joined|subquery)load_all() options"""

        # each option gets its own session and must pre-load the full path
        for load_all in (sa.orm.joinedload_all, sa.orm.subqueryload_all):
            session = create_session()
            query = session.query(User).options(
                load_all('orders.items.keywords'))
            loaded = query.all()

            def traverse():
                loaded[0].orders[1].items[0].keywords[1]
            self.sql_count_(0, traverse)

    @testing.resolve_artifact_names
    def test_deep_options_3(self):
        """One joinedload() per path level, given in separate options()
        calls, pre-loads the whole path."""
        session = create_session()

        # same thing, with separate options calls
        query = session.query(User)
        for path in ('orders', 'orders.items', 'orders.items.keywords'):
            query = query.options(sa.orm.joinedload(path))
        loaded = query.all()

        def traverse():
            loaded[0].orders[1].items[0].keywords[1]
        self.sql_count_(0, traverse)

    @testing.resolve_artifact_names
    def test_deep_options_4(self):
        """joinedload() on only the last path element: a path not rooted
        at the queried entity is rejected, a rooted one leaves the first
        two levels lazy."""
        session = create_session()

        assert_raises_message(
            sa.exc.ArgumentError,
            r"Can't find entity Mapper\|Order\|orders in Query.  "
            r"Current list: \['Mapper\|User\|users'\]",
            session.query(User).options, sa.orm.joinedload(Order.items))

        # joinedload "keywords" on items.  it will lazy load "orders", then
        # lazy load the "items" on the order, but on "items" it will eager
        # load the "keywords"
        by_string = session.query(User).options(
            sa.orm.joinedload('orders.items.keywords'))
        loaded = by_string.all()

        def traverse():
            loaded[0].orders[1].items[0].keywords[1]
        self.sql_count_(2, traverse)

        # the same path spelled with class-bound attributes
        session = create_session()
        by_attribute = session.query(User).options(
            sa.orm.joinedload(User.orders, Order.items, Item.keywords))
        loaded = by_attribute.all()

        def traverse():
            loaded[0].orders[1].items[0].keywords[1]
        self.sql_count_(2, traverse)

class ValidatorTest(_fixtures.FixtureTest):
    """Behaviour of @validates hooks on scalar and collection attributes."""

    @testing.resolve_artifact_names
    def test_scalar(self):
        """A scalar validator can rewrite the incoming value and can
        reject it by raising."""
        class User(_base.ComparableEntity):
            @validates('name')
            def validate_name(self, key, name):
                assert name != 'fred'
                return name + ' modified'

        mapper(User, users)
        session = create_session()

        user = User(name='ed')
        eq_(user.name, 'ed modified')

        # a rejected value leaves the previous one in place
        assert_raises(AssertionError, setattr, user, "name", "fred")
        eq_(user.name, 'ed modified')

        session.add(user)
        session.flush()
        session.expunge_all()

        reloaded = session.query(User).filter_by(name='ed modified').one()
        eq_(reloaded, User(name='ed'))

    @testing.resolve_artifact_names
    def test_collection(self):
        """A collection validator is applied to appended items; an item it
        rejects does not end up in the persisted collection."""
        class User(_base.ComparableEntity):
            @validates('addresses')
            def validate_address(self, key, ad):
                assert '@' in ad.email_address
                return ad

        mapper(User, users, properties=dict(
            addresses=relationship(Address)))
        mapper(Address, addresses)
        session = create_session()

        user = User(name='edward')
        invalid_address = Address(email_address='noemail')
        assert_raises(AssertionError, user.addresses.append, invalid_address)

        valid_address = Address(id=15, email_address='foo@bar.com')
        user.addresses.append(valid_address)

        session.add(user)
        session.flush()
        session.expunge_all()

        reloaded = session.query(User).filter_by(name='edward').one()
        expected = User(
            name='edward',
            addresses=[Address(email_address='foo@bar.com')])
        eq_(reloaded, expected)

class ComparatorFactoryTest(_fixtures.FixtureTest, AssertsCompiledSQL):
    """The ``comparator_factory`` argument of the property constructors."""

    @testing.resolve_artifact_names
    def test_kwarg_accepted(self):
        """Every property constructor accepts ``comparator_factory``."""
        class DummyComposite(object):
            def __init__(self, x, y):
                pass

        from sqlalchemy.orm.interfaces import PropComparator

        class MyFactory(PropComparator):
            pass

        # (constructor, positional arguments...) for each property kind
        candidates = (
            (column_property, users.c.name),
            (deferred, users.c.name),
            (synonym, 'name'),
            (composite, DummyComposite, users.c.id, users.c.name),
            (relationship, Address),
            (backref, 'address'),
            (comparable_property, ),
            (dynamic_loader, Address)
        )
        for candidate in candidates:
            constructor, positional = candidate[0], candidate[1:]
            # only checks that the call itself is accepted
            constructor(comparator_factory=MyFactory, *positional)

    @testing.resolve_artifact_names
    def test_column(self):
        """A comparator_factory on column_property() is used for both the
        plain and the aliased entity."""
        from sqlalchemy.orm.properties import ColumnProperty

        class MyFactory(ColumnProperty.Comparator):
            __hash__ = None

            def __eq__(self, other):
                column = self.__clause_element__()
                return func.foobar(column) == func.foobar(other)

        name_property = column_property(
            users.c.name, comparator_factory=MyFactory)
        mapper(User, users, properties={'name': name_property})

        def check(expression, expected_sql):
            self.assert_compile(expression, expected_sql,
                                dialect=default.DefaultDialect())

        check(User.name == 'ed',
              "foobar(users.name) = foobar(:foobar_1)")
        check(aliased(User).name == 'ed',
              "foobar(users_1.name) = foobar(:foobar_1)")

    @testing.resolve_artifact_names
    def test_synonym(self):
        """A comparator_factory on a map_column synonym is used for both
        the plain and the aliased entity."""
        from sqlalchemy.orm.properties import ColumnProperty

        class MyFactory(ColumnProperty.Comparator):
            __hash__ = None

            def __eq__(self, other):
                column = self.__clause_element__()
                return func.foobar(column) == func.foobar(other)

        name_synonym = synonym(
            '_name', map_column=True, comparator_factory=MyFactory)
        mapper(User, users, properties={'name': name_synonym})

        def check(expression, expected_sql):
            self.assert_compile(expression, expected_sql,
                                dialect=default.DefaultDialect())

        check(User.name == 'ed',
              "foobar(users.name) = foobar(:foobar_1)")
        check(aliased(User).name == 'ed',
              "foobar(users_1.name) = foobar(:foobar_1)")

    @testing.resolve_artifact_names
    def test_relationship(self):
        """relationship() and its backref() can each carry their own
        comparator_factory, for plain and aliased entities alike."""
        from sqlalchemy.orm.properties import PropertyLoader

        class MyFactory(PropertyLoader.Comparator):
            # used for Address.user
            __hash__ = None

            def __eq__(self, other):
                target = self.__clause_element__()
                return func.foobar(target.c.user_id) == func.foobar(other.id)

        class MyFactory2(PropertyLoader.Comparator):
            # used for the User.addresses backref
            __hash__ = None

            def __eq__(self, other):
                target = self.__clause_element__()
                return func.foobar(target.c.id) == func.foobar(other.user_id)

        mapper(User, users)
        user_relationship = relationship(
            User,
            comparator_factory=MyFactory,
            backref=backref("addresses", comparator_factory=MyFactory2))
        mapper(Address, addresses, properties={'user': user_relationship})

        def check(expression, expected_sql):
            self.assert_compile(expression, expected_sql,
                                dialect=default.DefaultDialect())

        check(Address.user == User(id=5),
              "foobar(addresses.user_id) = foobar(:foobar_1)")
        check(User.addresses == Address(id=5, user_id=7),
              "foobar(users.id) = foobar(:foobar_1)")

        check(aliased(Address).user == User(id=5),
              "foobar(addresses_1.user_id) = foobar(:foobar_1)")
        check(aliased(User).addresses == Address(id=5, user_id=7),
              "foobar(users_1.id) = foobar(:foobar_1)")

    
class DeferredTest(_fixtures.FixtureTest):
    """Tests for deferred() column properties and the defer / undefer /
    undefer_group query options.

    Several tests assert the exact SQL statements or statement counts
    emitted, so the order of attribute access inside them matters.
    """

    @testing.resolve_artifact_names
    def test_basic(self):
        """A basic deferred load."""

        mapper(Order, orders, order_by=orders.c.id, properties={
            'description': deferred(orders.c.description)})

        # a brand-new instance simply reports None for the deferred column
        o = Order()
        self.assert_(o.description is None)

        q = create_session().query(Order)
        def go():
            l = q.all()
            o2 = l[2]
            # first access of the deferred column triggers the 2nd SELECT
            x = o2.description

        self.sql_eq_(go, [
            ("SELECT orders.id AS orders_id, "
             "orders.user_id AS orders_user_id, "
             "orders.address_id AS orders_address_id, "
             "orders.isopen AS orders_isopen "
             "FROM orders ORDER BY orders.id", {}),
            ("SELECT orders.description AS orders_description "
             "FROM orders WHERE orders.id = :param_1",
             {'param_1':3})])

    @testing.resolve_artifact_names
    def test_unsaved(self):
        """Deferred loading does not kick in when just PK cols are set."""

        mapper(Order, orders, properties={
            'description': deferred(orders.c.description)})

        sess = create_session()
        o = Order()
        sess.add(o)
        o.id = 7
        def go():
            o.description = "some description"
        self.sql_count_(0, go)

    @testing.resolve_artifact_names
    def test_synonym_group_bug(self):
        """A grouped deferred column still loads its value when the mapper
        also declares a map_column synonym."""
        mapper(Order, orders, properties={
            'isopen':synonym('_isopen', map_column=True),
            'description':deferred(orders.c.description, group='foo')
        })

        sess = create_session()
        o1 = sess.query(Order).get(1)
        eq_(o1.description, "order 1")

    @testing.resolve_artifact_names
    def test_unsaved_2(self):
        """Setting a deferred attribute on a pending object that has no
        primary key assigned emits no SQL."""
        mapper(Order, orders, properties={
            'description': deferred(orders.c.description)})

        sess = create_session()
        o = Order()
        sess.add(o)
        def go():
            o.description = "some description"
        self.sql_count_(0, go)

    @testing.resolve_artifact_names
    def test_unsaved_group(self):
        """Deferred loading doesn't kick in when just PK cols are set"""

        mapper(Order, orders, order_by=orders.c.id, properties=dict(
            description=deferred(orders.c.description, group='primary'),
            opened=deferred(orders.c.isopen, group='primary')))

        sess = create_session()
        o = Order()
        sess.add(o)
        o.id = 7
        def go():
            o.description = "some description"
        self.sql_count_(0, go)

    @testing.resolve_artifact_names
    def test_unsaved_group_2(self):
        """Setting a grouped deferred attribute on a pending object that
        has no primary key assigned emits no SQL."""
        mapper(Order, orders, order_by=orders.c.id, properties=dict(
            description=deferred(orders.c.description, group='primary'),
            opened=deferred(orders.c.isopen, group='primary')))

        sess = create_session()
        o = Order()
        sess.add(o)
        def go():
            o.description = "some description"
        self.sql_count_(0, go)

    @testing.resolve_artifact_names
    def test_save(self):
        """Changing and flushing a non-deferred column succeeds while
        another column of the mapper is deferred."""
        m = mapper(Order, orders, properties={
            'description': deferred(orders.c.description)})

        sess = create_session()
        o2 = sess.query(Order).get(2)
        o2.isopen = 1
        sess.flush()

    @testing.resolve_artifact_names
    def test_group(self):
        """Deferred load with a group"""
        # an OrderedDict keeps the property order fixed, which the exact
        # column order asserted in the second SELECT below relies on
        mapper(Order, orders, properties=util.OrderedDict([
            ('userident', deferred(orders.c.user_id, group='primary')),
            ('addrident', deferred(orders.c.address_id, group='primary')),
            ('description', deferred(orders.c.description, group='primary')),
            ('opened', deferred(orders.c.isopen, group='primary'))
        ]))

        sess = create_session()
        q = sess.query(Order).order_by(Order.id)
        def go():
            l = q.all()
            o2 = l[2]
            # the first access loads the whole group in one statement;
            # the following two reads are then served without SQL
            eq_(o2.opened, 1)
            eq_(o2.userident, 7)
            eq_(o2.description, 'order 3')

        self.sql_eq_(go, [
            ("SELECT orders.id AS orders_id "
             "FROM orders ORDER BY orders.id", {}),
            ("SELECT orders.user_id AS orders_user_id, "
             "orders.address_id AS orders_address_id, "
             "orders.description AS orders_description, "
             "orders.isopen AS orders_isopen "
             "FROM orders WHERE orders.id = :param_1",
             {'param_1':3})])

        # re-assigning the identical value must not produce an UPDATE
        o2 = q.all()[2]
        eq_(o2.description, 'order 3')
        assert o2 not in sess.dirty
        o2.description = 'order 3'
        def go():
            sess.flush()
        self.sql_count_(0, go)

    @testing.resolve_artifact_names
    def test_preserve_changes(self):
        """A deferred load operation doesn't revert modifications on attributes"""
        mapper(Order, orders, properties = {
            'userident': deferred(orders.c.user_id, group='primary'),
            'description': deferred(orders.c.description, group='primary'),
            'opened': deferred(orders.c.isopen, group='primary')
        })
        sess = create_session()
        o = sess.query(Order).get(3)
        # deferred attributes are not populated by the initial load
        assert 'userident' not in o.__dict__
        o.description = 'somenewdescription'
        eq_(o.description, 'somenewdescription')
        def go():
            # loads the deferred group with a single statement
            eq_(o.opened, 1)
        self.assert_sql_count(testing.db, go, 1)
        # the pending change survived the group load
        eq_(o.description, 'somenewdescription')
        assert o in sess.dirty

    @testing.resolve_artifact_names
    def test_commits_state(self):
        """
        When deferred elements are loaded via a group, they get the proper
        CommittedState and don't result in changes being committed

        """
        mapper(Order, orders, properties = {
            'userident':deferred(orders.c.user_id, group='primary'),
            'description':deferred(orders.c.description, group='primary'),
            'opened':deferred(orders.c.isopen, group='primary')})

        sess = create_session()
        o2 = sess.query(Order).get(3)

        # this will load the group of attributes
        eq_(o2.description, 'order 3')
        assert o2 not in sess.dirty
        # this will mark it as 'dirty', but nothing actually changed
        o2.description = 'order 3'
        # therefore the flush() shouldn't actually issue any SQL
        self.assert_sql_count(testing.db, sess.flush, 0)

    @testing.resolve_artifact_names
    def test_options(self):
        """Options on a mapper to create deferred and undeferred columns"""

        mapper(Order, orders)

        sess = create_session()
        q = sess.query(Order).order_by(Order.id).options(defer('user_id'))

        def go():
            # reading the deferred column of the first row costs a 2nd SELECT
            q.all()[0].user_id

        self.sql_eq_(go, [
            ("SELECT orders.id AS orders_id, "
             "orders.address_id AS orders_address_id, "
             "orders.description AS orders_description, "
             "orders.isopen AS orders_isopen "
             "FROM orders ORDER BY orders.id", {}),
            ("SELECT orders.user_id AS orders_user_id "
             "FROM orders WHERE orders.id = :param_1",
             {'param_1':1})])
        sess.expunge_all()

        # undefer() layered on top of the defer() query restores the column
        q2 = q.options(sa.orm.undefer('user_id'))
        self.sql_eq_(q2.all, [
            ("SELECT orders.id AS orders_id, "
             "orders.user_id AS orders_user_id, "
             "orders.address_id AS orders_address_id, "
             "orders.description AS orders_description, "
             "orders.isopen AS orders_isopen "
             "FROM orders ORDER BY orders.id",
             {})])

    @testing.resolve_artifact_names
    def test_undefer_group(self):
        """undefer_group() loads every column of a deferred group within
        the main SELECT."""
        mapper(Order, orders, properties=util.OrderedDict([
            ('userident',deferred(orders.c.user_id, group='primary')),
            ('description',deferred(orders.c.description, group='primary')),
            ('opened',deferred(orders.c.isopen, group='primary'))
            ]
            ))

        sess = create_session()
        q = sess.query(Order).order_by(Order.id)
        def go():
            l = q.options(sa.orm.undefer_group('primary')).all()
            o2 = l[2]
            eq_(o2.opened, 1)
            eq_(o2.userident, 7)
            eq_(o2.description, 'order 3')

        # exactly one statement: no follow-up SELECT for the group
        self.sql_eq_(go, [
            ("SELECT orders.user_id AS orders_user_id, "
             "orders.description AS orders_description, "
             "orders.isopen AS orders_isopen, "
             "orders.id AS orders_id, "
             "orders.address_id AS orders_address_id "
             "FROM orders ORDER BY orders.id",
             {})])

    @testing.resolve_artifact_names
    def test_locates_col(self):
        """Manually adding a column to the result undefers the column."""

        mapper(Order, orders, properties={
            'description':deferred(orders.c.description)})

        # baseline: the deferred column costs one extra statement
        sess = create_session()
        o1 = sess.query(Order).order_by(Order.id).first()
        def go():
            eq_(o1.description, 'order 1')
        self.sql_count_(1, go)

        # with the column added to the query, no further SQL is needed
        sess = create_session()
        o1 = (sess.query(Order).
              order_by(Order.id).
              add_column(orders.c.description).first())[0]
        def go():
            eq_(o1.description, 'order 1')
        self.sql_count_(0, go)

    @testing.resolve_artifact_names
    def test_deep_options(self):
        """undefer() with a dotted relationship path applies to a deferred
        column two relationships away from the queried entity."""
        mapper(Item, items, properties=dict(
            description=deferred(items.c.description)))
        mapper(Order, orders, properties=dict(
            items=relationship(Item, secondary=order_items)))
        mapper(User, users, properties=dict(
            orders=relationship(Order, order_by=orders.c.id)))

        sess = create_session()
        q = sess.query(User).order_by(User.id)
        l = q.all()
        item = l[0].orders[1].items[1]
        def go():
            # deferred: the first read issues one statement
            eq_(item.description, 'item 4')
        self.sql_count_(1, go)
        eq_(item.description, 'item 4')

        sess.expunge_all()
        l = q.options(sa.orm.undefer('orders.items.description')).all()
        item = l[0].orders[1].items[1]
        def go():
            # undeferred via the option: already present, no SQL
            eq_(item.description, 'item 4')
        self.sql_count_(0, go)
        eq_(item.description, 'item 4')


class SecondaryOptionsTest(_base.MappedTest):
    """Test that the contains_eager() option doesn't bleed into a secondary load.

    Each test loads ``Child1`` rows with a loader option in effect and then
    lazily loads ``Child1.child2``, asserting the exact SQL emitted for that
    second load.
    """

    run_inserts = 'once'

    run_deletes = None
    
    @classmethod
    def define_tables(cls, metadata):
        # "base" carries the polymorphic discriminator column "type";
        # child1, child2 and related all key off base.id.
        Table("base", metadata, 
            Column('id', Integer, primary_key=True),
            Column('type', String(50), nullable=False)
        )
        Table("child1", metadata,
            Column('id', Integer, ForeignKey('base.id'), primary_key=True),
            Column('child2id', Integer, ForeignKey('child2.id'), nullable=False)
        )
        Table("child2", metadata,
            Column('id', Integer, ForeignKey('base.id'), primary_key=True),
        )
        Table('related', metadata,
            Column('id', Integer, ForeignKey('base.id'), primary_key=True),
        )
        
    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        class Base(_base.ComparableEntity):
            pass
        class Child1(Base):
            pass
        class Child2(Base):
            pass
        class Related(_base.ComparableEntity):
            pass
        # Base is polymorphic on base.type and owns the scalar "related"
        # relationship, which Child1 and Child2 inherit.
        mapper(Base, base, polymorphic_on=base.c.type, properties={
            'related':relationship(Related, uselist=False)
        })
        # Child1.child2 is joined explicitly on child1.child2id == base.id,
        # with child2id named as the foreign key column.
        mapper(Child1, child1, inherits=Base, polymorphic_identity='child1', properties={
            'child2':relationship(Child2, primaryjoin=child1.c.child2id==base.c.id, foreign_keys=child1.c.child2id)
        })
        mapper(Child2, child2, inherits=Base, polymorphic_identity='child2')
        mapper(Related, related)
        
    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        # Rows 1-3 are child1, rows 4-6 are child2; child1 N points at
        # child2 N+3, and every base row has a matching "related" row.
        base.insert().execute([
            {'id':1, 'type':'child1'},
            {'id':2, 'type':'child1'},
            {'id':3, 'type':'child1'},
            {'id':4, 'type':'child2'},
            {'id':5, 'type':'child2'},
            {'id':6, 'type':'child2'},
        ])
        child2.insert().execute([
            {'id':4},
            {'id':5},
            {'id':6},
        ])
        child1.insert().execute([
            {'id':1, 'child2id':4},
            {'id':2, 'child2id':5},
            {'id':3, 'child2id':6},
        ])
        related.insert().execute([
            {'id':1},
            {'id':2},
            {'id':3},
            {'id':4},
            {'id':5},
            {'id':6},
        ])
        
    @testing.resolve_artifact_names
    def test_contains_eager(self):
        """contains_eager(Child1.related) must not affect the lazy load of child2."""
        sess = create_session()
        
        
        child1s = sess.query(Child1).join(Child1.related).options(sa.orm.contains_eager(Child1.related)).order_by(Child1.id)

        # The joined query populates "related" itself, so comparing the full
        # result is expected to take a single statement.
        def go():
            eq_(
                child1s.all(),
                [Child1(id=1, related=Related(id=1)), Child1(id=2, related=Related(id=2)), Child1(id=3, related=Related(id=3))]
            )
        self.assert_sql_count(testing.db, go, 1)
        
        c1 = child1s[0]
 
        # The lazy load of child2 must select from base/child2 only, with no
        # "related" columns or join in the statement.
        self.assert_sql_execution(
            testing.db, 
            lambda: c1.child2, 
            CompiledSQL(
                "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type "
                "FROM base JOIN child2 ON base.id = child2.id "
                "WHERE base.id = :param_1",
                {'param_1':4}
            )
        )

    @testing.resolve_artifact_names
    def test_joinedload_on_other(self):
        """joinedload(Child1.related) must not affect the lazy load of child2."""
        sess = create_session()

        child1s = sess.query(Child1).join(Child1.related).options(sa.orm.joinedload(Child1.related)).order_by(Child1.id)

        def go():
            eq_(
                child1s.all(),
                [Child1(id=1, related=Related(id=1)), Child1(id=2, related=Related(id=2)), Child1(id=3, related=Related(id=3))]
            )
        self.assert_sql_count(testing.db, go, 1)
        
        c1 = child1s[0]

        # Expected: the plain base/child2 select.  The commented-out
        # statement inside the call records the unwanted form, in which the
        # joinedload option leaks into this secondary load.
        self.assert_sql_execution(
            testing.db, 
            lambda: c1.child2, 
            CompiledSQL(
            "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type "
            "FROM base JOIN child2 ON base.id = child2.id WHERE base.id = :param_1",

#   joinedload- this shouldn't happen
#            "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type, "
#            "related_1.id AS related_1_id FROM base JOIN child2 ON base.id = child2.id "
#            "LEFT OUTER JOIN related AS related_1 ON base.id = related_1.id WHERE base.id = :param_1",
                {'param_1':4}
            )
        )

    @testing.resolve_artifact_names
    def test_joinedload_on_same(self):
        """joinedload(Child1.child2, Child2.related) does apply to the child2 load."""
        sess = create_session()

        child1s = sess.query(Child1).join(Child1.related).options(sa.orm.joinedload(Child1.child2, Child2.related)).order_by(Child1.id)

        # NOTE(review): four statements are expected here - presumably the
        # Child1 select plus one load per row triggered by the comparison;
        # confirm against the loader strategy.
        def go():
            eq_(
                child1s.all(),
                [Child1(id=1, related=Related(id=1)), Child1(id=2, related=Related(id=2)), Child1(id=3, related=Related(id=3))]
            )
        self.assert_sql_count(testing.db, go, 4)
        
        c1 = child1s[0]

        # this *does* joinedload: the option names the child2 path
        # explicitly, so the statement carries the LEFT OUTER JOIN to related.
        self.assert_sql_execution(
            testing.db, 
            lambda: c1.child2, 
            CompiledSQL(
                "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type, "
                "related_1.id AS related_1_id FROM base JOIN child2 ON base.id = child2.id "
                "LEFT OUTER JOIN related AS related_1 ON base.id = related_1.id WHERE base.id = :param_1",
                {'param_1':4}
            )
        )
        

class DeferredPopulationTest(_base.MappedTest):
    """undefer() populates a deferred column whatever was loaded before.

    Every test ends by loading a ``Thing`` with ``undefer("name")`` and
    checking that ``name`` is present in the instance's state dict.  The
    tests differ only in what was queried beforehand and in whether the
    session was emptied in between.
    """

    @classmethod
    def define_tables(cls, metadata):
        def autoincrement_pk():
            return Column("id", Integer, primary_key=True,
                          test_needs_autoincrement=True)

        Table("thing", metadata,
              autoincrement_pk(),
              Column("name", String(20)))

        Table("human", metadata,
              autoincrement_pk(),
              Column("thing_id", Integer, ForeignKey("thing.id")),
              Column("name", String(20)))

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        class Human(_base.BasicEntity):
            pass

        class Thing(_base.BasicEntity):
            pass

        mapper(Human, human, properties=dict(thing=relationship(Thing)))
        # Thing.name is the deferred column every test inspects.
        mapper(Thing, thing, properties=dict(name=deferred(thing.c.name)))

    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        thing_rows = [
            {"id": 1, "name": "Chair"},
        ]
        human_rows = [
            {"id": 1, "thing_id": 1, "name": "Clark Kent"},
        ]
        thing.insert().execute(thing_rows)
        human.insert().execute(human_rows)

    def _test(self, thing):
        """Assert that ``name`` has been loaded into the instance's dict."""
        loaded_attributes = attributes.instance_state(thing).dict
        assert "name" in loaded_attributes

    @testing.resolve_artifact_names
    def _undefer_and_check(self, session):
        """Load the first Thing with ``name`` undeferred and verify it."""
        undefer_name = sa.orm.undefer("name")
        loaded = session.query(Thing).options(undefer_name).first()
        self._test(loaded)

    @testing.resolve_artifact_names
    def test_no_previous_query(self):
        session = create_session()
        self._undefer_and_check(session)

    # In the tests below the earlier result is bound to a local so that it
    # stays referenced while the second query runs.

    @testing.resolve_artifact_names
    def test_query_twice_with_clear(self):
        session = create_session()
        earlier = session.query(Thing).first()
        session.expunge_all()
        self._undefer_and_check(session)

    @testing.resolve_artifact_names
    def test_query_twice_no_clear(self):
        session = create_session()
        earlier = session.query(Thing).first()
        self._undefer_and_check(session)

    @testing.resolve_artifact_names
    def test_joinedload_with_clear(self):
        session = create_session()
        eager_thing = sa.orm.joinedload("thing")
        earlier = session.query(Human).options(eager_thing).first()
        session.expunge_all()
        self._undefer_and_check(session)

    @testing.resolve_artifact_names
    def test_joinedload_no_clear(self):
        session = create_session()
        eager_thing = sa.orm.joinedload("thing")
        earlier = session.query(Human).options(eager_thing).first()
        self._undefer_and_check(session)

    @testing.resolve_artifact_names
    def test_join_with_clear(self):
        session = create_session()
        joined = session.query(Human).add_entity(Thing).join("thing")
        earlier = joined.first()
        session.expunge_all()
        self._undefer_and_check(session)

    @testing.resolve_artifact_names
    def test_join_no_clear(self):
        session = create_session()
        joined = session.query(Human).add_entity(Thing).join("thing")
        earlier = joined.first()
        self._undefer_and_check(session)
    
        
class CompositeTypesTest(_base.MappedTest):
    """Tests for attributes mapped with ``sa.orm.composite()``.

    Covers round-tripping composite values, in-place mutation, comparison
    in queries, a composite used as the primary key, column defaults
    feeding a composite, and composites assigned as None.
    """

    @classmethod
    def define_tables(cls, metadata):
        # graphs has a two-column primary key (id, version_id).
        Table('graphs', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('version_id', Integer, primary_key=True, nullable=True),
            Column('name', String(30)))

        # edges references graphs through a composite foreign key; the
        # (x1, y1) and (x2, y2) column pairs back the Point composites.
        Table('edges', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('graph_id', Integer, nullable=False),
            Column('graph_version_id', Integer, nullable=False),
            Column('x1', Integer),
            Column('y1', Integer),
            Column('x2', Integer),
            Column('y2', Integer),
            sa.ForeignKeyConstraint(
            ['graph_id', 'graph_version_id'],
            ['graphs.id', 'graphs.version_id']))

        # x1 and x3 carry column defaults (2 and 15); x2 and x4 do not.
        Table('foobars', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('x1', Integer, default=2),
            Column('x2', Integer),
            Column('x3', Integer, default=15),
            Column('x4', Integer)
        )
        
    @testing.resolve_artifact_names
    def test_basic(self):
        """Round trip, mutation and query comparison of Point composites."""
        # Composite value type: equality is by coordinates, and instances
        # are explicitly unhashable.
        class Point(object):
            def __init__(self, x, y):
                self.x = x
                self.y = y
            def __composite_values__(self):
                return [self.x, self.y]
            __hash__ = None
            def __eq__(self, other):
                return isinstance(other, Point) and other.x == self.x and other.y == self.y
            def __ne__(self, other):
                return not isinstance(other, Point) or not self.__eq__(other)

        class Graph(object):
            pass
        class Edge(object):
            def __init__(self, start, end):
                self.start = start
                self.end = end

        mapper(Graph, graphs, properties={
            'edges':relationship(Edge)
        })
        mapper(Edge, edges, properties={
            'start':sa.orm.composite(Point, edges.c.x1, edges.c.y1),
            'end': sa.orm.composite(Point, edges.c.x2, edges.c.y2)
        })

        sess = create_session()
        g = Graph()
        g.id = 1
        g.version_id=1
        g.edges.append(Edge(Point(3, 4), Point(5, 6)))
        g.edges.append(Edge(Point(14, 5), Point(2, 7)))
        sess.add(g)
        sess.flush()

        # Reload in an emptied session and compare against the in-memory
        # originals.
        sess.expunge_all()
        g2 = sess.query(Graph).get([g.id, g.version_id])
        for e1, e2 in zip(g.edges, g2.edges):
            eq_(e1.start, e2.start)
            eq_(e1.end, e2.end)

        # Replacing the whole composite value is persisted on flush.
        g2.edges[1].end = Point(18, 4)
        sess.flush()
        sess.expunge_all()
        e = sess.query(Edge).get(g2.edges[1].id)
        eq_(e.end, Point(18, 4))

        # Mutating the composite's attributes in place is persisted as well.
        e.end.x = 19
        e.end.y = 5
        sess.flush()
        sess.expunge_all()
        eq_(sess.query(Edge).get(g2.edges[1].id).end, Point(19, 5))

        # Bring the original in-memory graph in line with the stored values
        # so the comparison loop below can be reused.
        g.edges[1].end = Point(19, 5)

        sess.expunge_all()
        def go():
            g2 = (sess.query(Graph).
                  options(sa.orm.joinedload('edges'))).get([g.id, g.version_id])
            for e1, e2 in zip(g.edges, g2.edges):
                eq_(e1.start, e2.start)
                eq_(e1.end, e2.end)
        # With edges joined-loaded the whole comparison takes one statement.
        self.assert_sql_count(testing.db, go, 1)

        # test comparison of CompositeProperties to their object instances
        g = sess.query(Graph).get([1, 1])
        assert sess.query(Edge).filter(Edge.start==Point(3, 4)).one() is g.edges[0]

        assert sess.query(Edge).filter(Edge.start!=Point(3, 4)).first() is g.edges[1]

        eq_(sess.query(Edge).filter(Edge.start==None).all(), [])

        # query by columns: each composite expands to its column values
        eq_(sess.query(Edge.start, Edge.end).all(), [(3, 4, 5, 6), (14, 5, 19, 5)])

        # Setting the composite's members to None in place stores NULLs.
        e = g.edges[1]
        e.end.x = e.end.y = None
        sess.flush()
        eq_(sess.query(Edge.start, Edge.end).all(), [(3, 4, 5, 6), (14, 5, None, None)])


    @testing.resolve_artifact_names
    def test_pk(self):
        """Using a composite type as a primary key"""

        class Version(object):
            def __init__(self, id, version):
                self.id = id
                self.version = version
            def __composite_values__(self):
                return (self.id, self.version)
            __hash__ = None
            def __eq__(self, other):
                return other.id == self.id and other.version == self.version
            def __ne__(self, other):
                return not self.__eq__(other)

        class Graph(object):
            def __init__(self, version):
                self.version = version

        # The composite spans both primary key columns of graphs.
        mapper(Graph, graphs, properties={
            'version':sa.orm.composite(Version, graphs.c.id,
                                       graphs.c.version_id)})

        sess = create_session()
        g = Graph(Version(1, 1))
        sess.add(g)
        sess.flush()

        # get() accepts the key as a plain list of column values...
        sess.expunge_all()
        g2 = sess.query(Graph).get([1, 1])
        eq_(g.version, g2.version)
        sess.expunge_all()

        # ...and as an instance of the composite class.
        g2 = sess.query(Graph).get(Version(1, 1))
        eq_(g.version, g2.version)

        # test pk mutation
        @testing.fails_on('mssql', 'Cannot update identity columns.')
        def update_pk():
            g2.version = Version(2, 1)
            sess.flush()
            g3 = sess.query(Graph).get(Version(2, 1))
            eq_(g2.version, g3.version)
        update_pk()

        # test pk with one column NULL
        # TODO: can't seem to get NULL in for a PK value
        # in either mysql or postgresql, autoincrement=False etc.
        # notwithstanding
        @testing.fails_on_everything_except("sqlite")
        def go():
            g = Graph(Version(2, None))
            sess.add(g)
            sess.flush()
            sess.expunge_all()
            g2 = sess.query(Graph).filter_by(version=Version(2, None)).one()
            eq_(g.version, g2.version)
        go()
        
    @testing.resolve_artifact_names
    def test_attributes_with_defaults(self):
        """Column defaults applied at flush show up in the composite value."""
        class Foobar(object):
            pass

        class FBComposite(object):
            def __init__(self, x1, x2, x3, x4):
                self.x1 = x1
                self.x2 = x2
                self.x3 = x3
                self.x4 = x4
            def __composite_values__(self):
                return self.x1, self.x2, self.x3, self.x4
            __hash__ = None
            def __eq__(self, other):
                return other.x1 == self.x1 and other.x2 == self.x2 and other.x3 == self.x3 and other.x4 == self.x4
            def __ne__(self, other):
                return not self.__eq__(other)

        mapper(Foobar, foobars, properties=dict(
            foob=sa.orm.composite(FBComposite, foobars.c.x1, foobars.c.x2, foobars.c.x3, foobars.c.x4)
        ))

        sess = create_session()
        f1 = Foobar()
        f1.foob = FBComposite(None, 5, None, None)
        sess.add(f1)
        sess.flush()

        # x1 and x3 were None and take the column defaults 2 and 15; the
        # explicit x2=5 is kept and x4 has no default.
        assert f1.foob == FBComposite(2, 5, 15, None)
        
        
        # The same defaults apply when the composite was never assigned.
        f2 = Foobar()
        sess.add(f2)
        sess.flush()
        assert f2.foob == FBComposite(2, None, 15, None)
        
    
    @testing.resolve_artifact_names
    def test_set_composite_values(self):
        """Like test_attributes_with_defaults, for a class defining __set_composite_values__."""
        class Foobar(object):
            pass
        
        # The first two attributes are named x1val/x2val rather than after
        # their columns, and the class provides __set_composite_values__.
        # NOTE(review): presumably the ORM calls that hook to write the
        # post-flush column values back into the object - confirm.
        class FBComposite(object):
            def __init__(self, x1, x2, x3, x4):
                self.x1val = x1
                self.x2val = x2
                self.x3 = x3
                self.x4 = x4
            def __composite_values__(self):
                return self.x1val, self.x2val, self.x3, self.x4
            def __set_composite_values__(self, x1, x2, x3, x4):
                self.x1val = x1
                self.x2val = x2
                self.x3 = x3
                self.x4 = x4
            __hash__ = None
            def __eq__(self, other):
                return other.x1val == self.x1val and other.x2val == self.x2val and other.x3 == self.x3 and other.x4 == self.x4
            def __ne__(self, other):
                return not self.__eq__(other)
        
        mapper(Foobar, foobars, properties=dict(
            foob=sa.orm.composite(FBComposite, foobars.c.x1, foobars.c.x2, foobars.c.x3, foobars.c.x4)
        ))
        
        sess = create_session()
        f1 = Foobar()
        f1.foob = FBComposite(None, 5, None, None)
        sess.add(f1)
        sess.flush()
        
        # The column defaults for x1 and x3 must be visible after the flush.
        assert f1.foob == FBComposite(2, 5, 15, None)
    
    @testing.resolve_artifact_names
    def test_save_null(self):
        """test saving a null composite value
        
        See google groups thread for more context:
        http://groups.google.com/group/sqlalchemy/browse_thread/thread/0c6580a1761b2c29
        
        """
        class Point(object):
            def __init__(self, x, y):
                self.x = x
                self.y = y
            def __composite_values__(self):
                return [self.x, self.y]
            __hash__ = None
            def __eq__(self, other):
                return other.x == self.x and other.y == self.y
            def __ne__(self, other):
                return not self.__eq__(other)

        class Graph(object):
            pass
        class Edge(object):
            def __init__(self, start, end):
                self.start = start
                self.end = end

        mapper(Graph, graphs, properties={
            'edges':relationship(Edge)
        })
        mapper(Edge, edges, properties={
            'start':sa.orm.composite(Point, edges.c.x1, edges.c.y1),
            'end':sa.orm.composite(Point, edges.c.x2, edges.c.y2)
        })

        sess = create_session()
        g = Graph()
        g.id = 1
        g.version_id=1
        # Both composites are assigned None rather than a Point instance.
        e = Edge(None, None)
        g.edges.append(e)
        
        sess.add(g)
        sess.flush()
        
        sess.expunge_all()
        
        # On reload "start" is an object whose members are both None; only
        # "start" is checked here.
        g2 = sess.query(Graph).get([1, 1])
        assert g2.edges[-1].start.x is None
        assert g2.edges[-1].start.y is None


class NoLoadTest(_fixtures.FixtureTest):
    """Relationships configured with ``lazy='noload'``."""

    run_inserts = 'once'
    run_deletes = None

    @testing.resolve_artifact_names
    def test_basic(self):
        """A noload collection stays empty and costs no extra SQL."""
        address_mapper = mapper(Address, addresses)
        user_mapper = mapper(User, users, properties={
            'addresses': relationship(address_mapper, lazy='noload')})
        query = create_session().query(user_mapper)

        captured = []
        def load_user_seven():
            found = query.filter(User.id == 7).all()
            # Touch the collection inside the counted callable.
            found[0].addresses
            captured[:] = [found]
        # Only the User select is expected.
        self.assert_sql_count(testing.db, load_user_seven, 1)

        expected = {'id' : 7, 'addresses' : (Address, [])}
        self.assert_result(captured[0], User, expected)

    @testing.resolve_artifact_names
    def test_options(self):
        """A lazyload() query option overrides the mapper-level noload."""
        address_mapper = mapper(Address, addresses)
        user_mapper = mapper(User, users, properties={
            'addresses': relationship(address_mapper, lazy='noload')})
        lazy_addresses = sa.orm.lazyload('addresses')
        query = create_session().query(user_mapper).options(lazy_addresses)

        captured = []
        def load_user_seven():
            found = query.filter(User.id == 7).all()
            # This access is now expected to emit the second statement.
            found[0].addresses
            captured[:] = [found]
        self.sql_count_(2, load_user_seven)

        expected = {'id' : 7, 'addresses' : (Address, [{'id' : 1}])}
        self.assert_result(captured[0], User, expected)


class AttributeExtensionTest(_base.MappedTest):
    """AttributeExtension behavior across a mapper inheritance chain."""

    @classmethod
    def define_tables(cls, metadata):
        columns = [
            Column('id', Integer, primary_key=True),
            Column('type', String(40)),
            Column('data', String(50)),
        ]
        Table('t1', metadata, *columns)

    @testing.resolve_artifact_names
    def test_cascading_extensions(self):
        """A subclass inherits its parent's extension unless it sets its own."""
        ext_msg = []

        # Both extensions log the incoming value and return it with their
        # own prefix added.
        class Ex1(sa.orm.AttributeExtension):
            def set(self, state, value, oldvalue, initiator):
                ext_msg.append("Ex1 %r" % value)
                return "ex1" + value

        class Ex2(sa.orm.AttributeExtension):
            def set(self, state, value, oldvalue, initiator):
                ext_msg.append("Ex2 %r" % value)
                return "ex2" + value

        class A(_base.BasicEntity):
            pass

        class B(A):
            pass

        class C(B):
            pass

        # A installs Ex1 on "data"; B adds nothing; C overrides with Ex2.
        mapper(A, t1, polymorphic_on=t1.c.type, polymorphic_identity='a',
               properties=dict(
                   data=column_property(t1.c.data, extension=Ex1())))
        mapper(B, polymorphic_identity='b', inherits=A)
        mapper(C, polymorphic_identity='c', inherits=B,
               properties=dict(
                   data=column_property(t1.c.data, extension=Ex2())))

        # Values set through the constructor pass through the extension.
        a1 = A(data='a1')
        b1 = B(data='b1')
        c1 = C(data='c1')
        for instance, expected in ((a1, 'ex1a1'),
                                   (b1, 'ex1b1'),
                                   (c1, 'ex2c1')):
            eq_(instance.data, expected)

        # Plain attribute assignment passes through it as well.
        a1.data = 'a2'
        b1.data = 'b2'
        c1.data = 'c2'
        for instance, expected in ((a1, 'ex1a2'),
                                   (b1, 'ex1b2'),
                                   (c1, 'ex2c2')):
            eq_(instance.data, expected)

        # One event per set, in order: Ex1 for A and B, Ex2 for C.
        expected_log = [
            "Ex1 'a1'", "Ex1 'b1'", "Ex2 'c1'",
            "Ex1 'a2'", "Ex1 'b2'", "Ex2 'c2'",
        ]
        eq_(ext_msg, expected_log)
        
    
class MapperExtensionTest(_fixtures.FixtureTest):
    """Checks which MapperExtension hooks fire, and in what order."""

    run_inserts = None
    
    def extension(self):
        """Build a recording MapperExtension.

        Returns a ``(Ext, methods)`` pair: ``Ext`` is a MapperExtension
        subclass whose hooks each append their own name to the ``methods``
        list and return ``EXT_CONTINUE``.  All instances of the returned
        class share that one list.
        """
        methods = []

        class Ext(sa.orm.MapperExtension):
            def instrument_class(self, mapper, cls):
                methods.append('instrument_class')
                return sa.orm.EXT_CONTINUE

            def init_instance(self, mapper, class_, oldinit, instance, args, kwargs):
                methods.append('init_instance')
                return sa.orm.EXT_CONTINUE

            def init_failed(self, mapper, class_, oldinit, instance, args, kwargs):
                methods.append('init_failed')
                return sa.orm.EXT_CONTINUE

            def translate_row(self, mapper, context, row):
                methods.append('translate_row')
                return sa.orm.EXT_CONTINUE

            def create_instance(self, mapper, selectcontext, row, class_):
                methods.append('create_instance')
                return sa.orm.EXT_CONTINUE

            def reconstruct_instance(self, mapper, instance):
                methods.append('reconstruct_instance')
                return sa.orm.EXT_CONTINUE

            def append_result(self, mapper, selectcontext, row, instance, result, **flags):
                methods.append('append_result')
                return sa.orm.EXT_CONTINUE

            def populate_instance(self, mapper, selectcontext, row, instance, **flags):
                methods.append('populate_instance')
                return sa.orm.EXT_CONTINUE

            def before_insert(self, mapper, connection, instance):
                methods.append('before_insert')
                return sa.orm.EXT_CONTINUE

            def after_insert(self, mapper, connection, instance):
                methods.append('after_insert')
                return sa.orm.EXT_CONTINUE

            def before_update(self, mapper, connection, instance):
                methods.append('before_update')
                return sa.orm.EXT_CONTINUE

            def after_update(self, mapper, connection, instance):
                methods.append('after_update')
                return sa.orm.EXT_CONTINUE

            def before_delete(self, mapper, connection, instance):
                methods.append('before_delete')
                return sa.orm.EXT_CONTINUE

            def after_delete(self, mapper, connection, instance):
                methods.append('after_delete')
                return sa.orm.EXT_CONTINUE

        return Ext, methods

    @testing.resolve_artifact_names
    def test_basic(self):
        """test that common user-defined methods get called."""
        Ext, methods = self.extension()

        mapper(User, users, extension=Ext())
        sess = create_session()
        # Insert, refresh the instance already in the session, reload it
        # into an emptied session, update, then delete; the hook order
        # asserted below follows this sequence.
        u = User(name='u1')
        sess.add(u)
        sess.flush()
        u = sess.query(User).populate_existing().get(u.id)
        sess.expunge_all()
        u = sess.query(User).get(u.id)
        u.name = 'u1 changed'
        sess.flush()
        sess.delete(u)
        sess.flush()
        eq_(methods,
            ['instrument_class', 'init_instance', 'before_insert',
             'after_insert', 'translate_row', 'populate_instance',
             'append_result', 'translate_row', 'create_instance',
             'populate_instance', 'reconstruct_instance', 'append_result',
             'before_update', 'after_update', 'before_delete', 'after_delete'])

    @testing.resolve_artifact_names
    def test_inheritance(self):
        """An extension set on the base mapper also fires for a subclass mapper."""
        Ext, methods = self.extension()

        class AdminUser(User):
            pass

        # Only the base mapper is given the extension; AdminUser is mapped
        # to the addresses table, inheriting from User.
        mapper(User, users, extension=Ext())
        mapper(AdminUser, addresses, inherits=User)

        sess = create_session()
        am = AdminUser(name='au1', email_address='au1@e1')
        sess.add(am)
        sess.flush()
        am = sess.query(AdminUser).populate_existing().get(am.id)
        sess.expunge_all()
        am = sess.query(AdminUser).get(am.id)
        am.name = 'au1 changed'
        sess.flush()
        sess.delete(am)
        sess.flush()
        # instrument_class appears twice, once per mapped class; every other
        # hook fires once per operation.
        eq_(methods,
            ['instrument_class', 'instrument_class', 'init_instance',
             'before_insert', 'after_insert', 'translate_row',
             'populate_instance', 'append_result', 'translate_row',
             'create_instance', 'populate_instance', 'reconstruct_instance',
             'append_result', 'before_update', 'after_update', 'before_delete',
             'after_delete'])

    @testing.resolve_artifact_names
    def test_after_with_no_changes(self):
        """after_update is called even if no columns were updated."""

        Ext, methods = self.extension()

        # Two separate Ext instances, both recording into the same list.
        mapper(Item, items, extension=Ext() , properties={
            'keywords': relationship(Keyword, secondary=item_keywords)})
        mapper(Keyword, keywords, extension=Ext())

        sess = create_session()
        i1 = Item(description="i1")
        k1 = Keyword(name="k1")
        sess.add(i1)
        sess.add(k1)
        sess.flush()
        eq_(methods,
            ['instrument_class', 'instrument_class', 'init_instance',
             'init_instance', 'before_insert', 'after_insert',
             'before_insert', 'after_insert'])

        # Reset the log, then change only the keywords collection, which is
        # mapped through the item_keywords secondary table.
        del methods[:]
        i1.keywords.append(k1)
        sess.flush()
        eq_(methods, ['before_update', 'after_update'])


    @testing.resolve_artifact_names
    def test_inheritance_with_dupes(self):
        """Inheritance with the same extension instance on both mappers."""
        Ext, methods = self.extension()

        class AdminUser(User):
            pass

        ext = Ext()
        mapper(User, users, extension=ext)
        mapper(AdminUser, addresses, inherits=User, extension=ext)

        sess = create_session()
        am = AdminUser(name="au1", email_address="au1@e1")
        sess.add(am)
        sess.flush()
        am = sess.query(AdminUser).populate_existing().get(am.id)
        sess.expunge_all()
        am = sess.query(AdminUser).get(am.id)
        am.name = 'au1 changed'
        sess.flush()
        sess.delete(am)
        sess.flush()
        # The expected list is identical to test_inheritance: the shared
        # instance must not make any hook fire twice.
        eq_(methods,
            ['instrument_class', 'instrument_class', 'init_instance',
             'before_insert', 'after_insert', 'translate_row',
             'populate_instance', 'append_result', 'translate_row',
             'create_instance', 'populate_instance', 'reconstruct_instance',
             'append_result', 'before_update', 'after_update', 'before_delete',
             'after_delete'])
        
    @testing.resolve_artifact_names
    def test_create_instance(self):
        """A create_instance() hook may supply the object for a loaded row."""
        class CreateUserExt(sa.orm.MapperExtension):
            def create_instance(self, mapper, selectcontext, row, class_):
                # Build the instance without running User.__init__.
                return User.__new__(User)
                
        mapper(User, users, extension=CreateUserExt())
        sess = create_session()
        u1 = User()
        u1.name = 'ed'
        sess.add(u1)
        sess.flush()
        # Empty the session so the query below has to build the object
        # from the row.
        sess.expunge_all()
        assert sess.query(User).first()
        

class RequirementsTest(_base.MappedTest):
    """Tests the contract for user classes."""

    @classmethod
    def define_tables(cls, metadata):
        """Create tables ``ht1`` through ``ht6`` on *metadata*."""

        def autoinc_id():
            # a new Column object per table, as the inline definitions had
            return Column('id', Integer, primary_key=True,
                          test_needs_autoincrement=True)

        def fk_pk(name, target):
            # primary key column that is also a foreign key to *target*
            return Column(name, Integer, ForeignKey(target),
                          primary_key=True)

        Table('ht1', metadata,
              autoinc_id(),
              Column('value', String(10)))
        Table('ht2', metadata,
              autoinc_id(),
              Column('ht1_id', Integer, ForeignKey('ht1.id')),
              Column('value', String(10)))
        Table('ht3', metadata,
              autoinc_id(),
              Column('value', String(10)))
        Table('ht4', metadata,
              fk_pk('ht1_id', 'ht1.id'),
              fk_pk('ht3_id', 'ht3.id'))
        Table('ht5', metadata,
              fk_pk('ht1_id', 'ht1.id'))
        Table('ht6', metadata,
              fk_pk('ht1a_id', 'ht1.id'),
              fk_pk('ht1b_id', 'ht1.id'),
              Column('value', String(10)))

    # Py2K
    @testing.resolve_artifact_names
    def test_baseclass(self):
        """mapper() raises ArgumentError for an old-style class or for 123."""
        class OldStyle:
            pass

        for bad_args in ((OldStyle, ht1), (123,)):
            assert_raises(sa.exc.ArgumentError, mapper, *bad_args)

        class NoWeakrefSupport(str):
            pass

        # TODO: is weakref support detectable without an instance?
        #self.assertRaises(sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
    # end Py2K

    @testing.resolve_artifact_names
    def test_comparison_overrides(self):
        """Smoke-test mapped classes that define their own comparison methods.

        The suite-level test --options add selected __methods__ to every
        ORM test and are the better tool for finding problems.  This test
        only pushes a handful of operations through the ORM so that basic
        regressions show up early in a standard test run.
        """

        # The methods are written directly on the class so that the testlib
        # decorators do not decorate them.
        class _Base(object):
            def __init__(self, value='abc'):
                self.value = value

            def __nonzero__(self):
                return False

            def __hash__(self):
                return hash(self.value)

            def __eq__(self, other):
                return (isinstance(other, type(self)) and
                        self.value == other.value)

        class H1(_Base):
            pass

        class H2(_Base):
            pass

        class H3(_Base):
            pass

        class H6(_Base):
            pass

        h1_properties = {
            'h2s': relationship(H2, backref='h1'),
            'h3s': relationship(H3, secondary=ht4, backref='h1s'),
            'h1s': relationship(H1, secondary=ht5, backref='parent_h1'),
            't6a': relationship(H6, backref='h1a',
                                primaryjoin=ht1.c.id == ht6.c.ht1a_id),
            't6b': relationship(H6, backref='h1b',
                                primaryjoin=ht1.c.id == ht6.c.ht1b_id),
        }
        mapper(H1, ht1, properties=h1_properties)
        for mapped_cls, table in ((H2, ht2), (H3, ht3), (H6, ht6)):
            mapper(mapped_cls, table)

        sess = create_session()
        for _ in range(3):
            h1 = H1()
            sess.add(h1)

        # from here on ``h1`` is the last of the three instances added above
        h1.h2s.append(H2())
        h1.h3s.extend([H3(), H3()])
        h1.h1s.append(H1())

        sess.flush()
        eq_(ht1.count().scalar(), 4)

        # an H6 whose two H1 references point at the same instance
        same_h1_h6 = H6()
        same_h1_h6.h1a = h1
        same_h1_h6.h1b = h1

        # an H6 whose second reference is a brand new H1
        mixed_h6 = H6()
        mixed_h6.h1a = h1
        fresh_h1 = H1()
        mixed_h6.h1b = fresh_h1
        assert fresh_h1 in sess

        mixed_h6.h1b.h2s.append(H2())

        sess.flush()

        h1.h2s.extend([H2(), H2()])
        sess.flush()

        loaded = sess.query(H1).options(sa.orm.joinedload('h2s')).all()
        eq_(len(loaded), 5)

        expected_rows = (
            {'h2s': []},
            {'h2s': []},
            {'h2s': (H2, [{'value': 'abc'},
                          {'value': 'abc'},
                          {'value': 'abc'}])},
            {'h2s': []},
            {'h2s': (H2, [{'value': 'abc'}])},
        )
        self.assert_unordered_result(loaded, H1, *expected_rows)

        loaded = sess.query(H1).options(sa.orm.joinedload('h3s')).all()
        eq_(len(loaded), 5)

        loaded = sess.query(H1).options(
            sa.orm.joinedload_all('t6a.h1b'),
            sa.orm.joinedload('h2s'),
            sa.orm.joinedload_all('h3s.h1s')).all()
        eq_(len(loaded), 5)

    @testing.resolve_artifact_names
    def test_nonzero_len_recursion(self):
        """Assign a mapped attribute on classes with a writing __len__.

        ``H1.__len__`` and ``H2.__nonzero__`` both go through ``get_value()``,
        which assigns ``self.value``.  There are no explicit asserts; the
        test passes when the assignments below complete.
        NOTE(review): judging by the test name this guards against infinite
        recursion during attribute set -- confirm.
        """
        class H1(object):
            def __len__(self):
                return len(self.get_value())

            def get_value(self):
                self.value = "foobar"
                return self.value

        class H2(object):
            def __nonzero__(self):
                return bool(self.get_value())

            def get_value(self):
                self.value = "foobar"
                return self.value

        user_classes = (H1, H2)
        for user_cls in user_classes:
            mapper(user_cls, ht1)

        for user_cls in user_classes:
            obj = user_cls()
            obj.value = "Asdf"
            obj.value = "asdf asdf"  # ding
        
class MagicNamesTest(_base.MappedTest):
    """Map attribute names such as ``query``, ``mapper`` and the
    ClassManager's reserved attribute names."""

    @classmethod
    def define_tables(cls, metadata):
        """Create the ``cartographers`` and ``maps`` tables on *metadata*."""

        def autoinc_id():
            # a new Column object per table, as the inline definitions had
            return Column('id', Integer, primary_key=True,
                          test_needs_autoincrement=True)

        Table('cartographers', metadata,
              autoinc_id(),
              Column('name', String(50)),
              Column('alias', String(50)),
              Column('quip', String(100)))
        Table('maps', metadata,
              autoinc_id(),
              Column('cart_id', Integer, ForeignKey('cartographers.id')),
              Column('state', String(2)),
              Column('data', sa.Text))

    @classmethod
    def setup_classes(cls):
        """Define the ``Cartographer`` and ``Map`` entity classes."""
        class Cartographer(_base.BasicEntity):
            pass

        class Map(_base.BasicEntity):
            pass

    @testing.resolve_artifact_names
    def test_mappish(self):
        """Properties named ``query`` and ``mapper`` work in constructors
        and in query criteria, for plain and aliased entities alike."""
        mapper(Cartographer, cartographers,
               properties={'query': cartographers.c.quip})
        mapper(Map, maps,
               properties={
                   'mapper': relationship(Cartographer, backref='maps')})

        lenny = Cartographer(name='Lenny', alias='The Dude',
                             query='Where be dragons?')
        alaska = Map(state='AK', mapper=lenny)

        session = create_session()
        session.add(lenny)
        session.flush()
        session.expunge_all()

        entity_pairs = (
            (Cartographer, Map),
            (sa.orm.aliased(Cartographer), sa.orm.aliased(Map)),
        )
        for C, M in entity_pairs:
            # no explicit asserts here: the test relies on .one() to fail
            # if the expected row is not found
            cart_query = session.query(C)
            cart_query = cart_query.filter(C.alias == 'The Dude')
            cart_query = cart_query.filter(C.query == 'Where be dragons?')
            found_cart = cart_query.one()

            found_map = session.query(M).filter(M.mapper == found_cart).one()

    @testing.resolve_artifact_names
    def test_direct_stateish(self):
        """Mapping a table with a column named after STATE_ATTR or
        MANAGER_ATTR raises KeyError."""
        manager_cls = sa.orm.attributes.ClassManager
        for reserved in (manager_cls.STATE_ATTR, manager_cls.MANAGER_ATTR):
            t = Table('t', sa.MetaData(),
                      Column('id', Integer, primary_key=True,
                             test_needs_autoincrement=True),
                      Column(reserved, Integer))

            class T(object):
                pass

            expected_message = (
                '%r: requested attribute name conflicts with '
                'instrumentation attribute of the same name.' % reserved)
            assert_raises_message(KeyError, expected_message, mapper, T, t)

    @testing.resolve_artifact_names
    def test_indirect_stateish(self):
        """Using STATE_ATTR or MANAGER_ATTR as a property key raises
        KeyError."""
        manager_cls = sa.orm.attributes.ClassManager
        for reserved in (manager_cls.STATE_ATTR, manager_cls.MANAGER_ATTR):
            class M(object):
                pass

            expected_message = (
                'requested attribute name conflicts with '
                'instrumentation attribute of the same name')
            assert_raises_message(
                KeyError, expected_message,
                mapper, M, maps, properties={reserved: maps.c.state})



www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.