test_relationships.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » orm » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » orm » test_relationships.py
from sqlalchemy.test.testing import assert_raises,assert_raises_message
import datetime
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy import Integer,String,ForeignKey,MetaData,and_
from sqlalchemy.test.schema import Table,Column
from sqlalchemy.orm import mapper,relationship,relation,\
                    backref, create_session, compile_mappers, clear_mappers, sessionmaker
from sqlalchemy.test.testing import eq_,startswith_
from test.orm import _base,_fixtures


class RelationshipTest(_base.MappedTest):
    """An extended topological sort test

    This is essentially an extension of the "dependency.py" topological sort
    test.  In this test, a table is dependent on two other tables that are
    otherwise unrelated to each other.  The dependency sort must ensure that
    this childmost table is below both parent tables in the outcome (a bug
    existed where this was not always the case).

    While the straight topological sort tests should expose this, since the
    sorting can be different due to subtle differences in program execution,
    this test case was exposing the bug whereas the simpler tests were not.

    """

    run_setup_mappers = 'once'
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        Table("tbl_a", metadata,
            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
            Column("name", String(128)))
        Table("tbl_b", metadata,
            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
            Column("name", String(128)))
        Table("tbl_c", metadata,
            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
            Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"), nullable=False),
            Column("name", String(128)))
        Table("tbl_d", metadata,
            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
            Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"), nullable=False),
            Column("tbl_b_id", Integer, ForeignKey("tbl_b.id")),
            Column("name", String(128)))

    @classmethod
    def setup_classes(cls):
        class A(_base.Entity):
            pass
        class B(_base.Entity):
            pass
        class C(_base.Entity):
            pass
        class D(_base.Entity):
            pass

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        mapper(A, tbl_a, properties=dict(
            c_rows=relationship(C, cascade="all, delete-orphan", backref="a_row")))
        mapper(B, tbl_b)
        mapper(C, tbl_c, properties=dict(
            d_rows=relationship(D, cascade="all, delete-orphan", backref="c_row")))
        mapper(D, tbl_d, properties=dict(
            b_row=relationship(B)))

    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        session = create_session()
        a = A(name='a1')
        b = B(name='b1')
        c = C(name='c1', a_row=a)

        d1 = D(name='d1', b_row=b, c_row=c)
        d2 = D(name='d2', b_row=b, c_row=c)
        d3 = D(name='d3', b_row=b, c_row=c)
        session.add(a)
        session.add(b)
        session.flush()

    @testing.resolve_artifact_names
    def testDeleteRootTable(self):
        session = create_session()
        a = session.query(A).filter_by(name='a1').one()

        session.delete(a)
        session.flush()

    @testing.resolve_artifact_names
    def testDeleteMiddleTable(self):
        session = create_session()
        c = session.query(C).filter_by(name='c1').one()

        session.delete(c)
        session.flush()


class RelationshipTest2(_base.MappedTest):
    """The ultimate relationship() test:
    
    company         employee
    ----------      ----------
    company_id <--- company_id ------+
    name                ^            |
                        +------------+
                      
                    emp_id <---------+
                    name             |
                    reports_to_id ---+
    
    employee joins to its sub-employees
    both on reports_to_id, *and on company_id to itself*.
    
    As of 0.5.5 we are making a slight behavioral change,
    such that the custom foreign_keys setting
    on the o2m side has to be explicitly 
    unset on the backref m2o side - this to suit
    the vast majority of use cases where the backref()
    is to receive the same foreign_keys argument 
    as the forwards reference.   But we also
    have smartened the remote_side logic such that 
    you don't even need the custom fks setting.
    
    """

    @classmethod
    def define_tables(cls, metadata):
        Table('company_t', metadata,
              Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('name', sa.Unicode(30)))

        Table('employee_t', metadata,
              Column('company_id', Integer, primary_key=True),
              Column('emp_id', Integer, primary_key=True),
              Column('name', sa.Unicode(30)),
              Column('reports_to_id', Integer),
              sa.ForeignKeyConstraint(
                  ['company_id'],
                  ['company_t.company_id']),
              sa.ForeignKeyConstraint(
                  ['company_id', 'reports_to_id'],
                  ['employee_t.company_id', 'employee_t.emp_id']))

    @classmethod
    def setup_classes(cls):
        class Company(_base.Entity):
            pass

        class Employee(_base.Entity):
            def __init__(self, name, company, emp_id, reports_to=None):
                self.name = name
                self.company = company
                self.emp_id = emp_id
                self.reports_to = reports_to
        
    @testing.resolve_artifact_names
    def test_explicit(self):
        mapper(Company, company_t)
        mapper(Employee, employee_t, properties= {
            'company':relationship(Company, primaryjoin=employee_t.c.company_id==company_t.c.company_id, backref='employees'),
            'reports_to':relationship(Employee, primaryjoin=
                sa.and_(
                    employee_t.c.emp_id==employee_t.c.reports_to_id,
                    employee_t.c.company_id==employee_t.c.company_id
                ),
                remote_side=[employee_t.c.emp_id, employee_t.c.company_id],
                foreign_keys=[employee_t.c.reports_to_id],
                backref=backref('employees', foreign_keys=None))
        })

        self._test()

    @testing.resolve_artifact_names
    def test_implicit(self):
        mapper(Company, company_t)
        mapper(Employee, employee_t, properties= {
            'company':relationship(Company, backref='employees'),
            'reports_to':relationship(Employee,
                remote_side=[employee_t.c.emp_id, employee_t.c.company_id],
                foreign_keys=[employee_t.c.reports_to_id],
                backref=backref('employees', foreign_keys=None)
                )
        })

        self._test()

    @testing.resolve_artifact_names
    def test_very_implicit(self):
        mapper(Company, company_t)
        mapper(Employee, employee_t, properties= {
            'company':relationship(Company, backref='employees'),
            'reports_to':relationship(Employee,
                remote_side=[employee_t.c.emp_id, employee_t.c.company_id],
                backref='employees'
                )
        })

        self._test()
    
    @testing.resolve_artifact_names
    def test_very_explicit(self):
        mapper(Company, company_t)
        mapper(Employee, employee_t, properties= {
            'company':relationship(Company, backref='employees'),
            'reports_to':relationship(Employee,
                _local_remote_pairs = [
                        (employee_t.c.reports_to_id, employee_t.c.emp_id),
                        (employee_t.c.company_id, employee_t.c.company_id)
                ],
                foreign_keys=[employee_t.c.reports_to_id],
                backref=backref('employees', foreign_keys=None)
                )
        })

        self._test()
        
    @testing.resolve_artifact_names
    def _test(self):
        sess = create_session()
        c1 = Company()
        c2 = Company()

        e1 = Employee(u'emp1', c1, 1)
        e2 = Employee(u'emp2', c1, 2, e1)
        e3 = Employee(u'emp3', c1, 3, e1)
        e4 = Employee(u'emp4', c1, 4, e3)
        e5 = Employee(u'emp5', c2, 1)
        e6 = Employee(u'emp6', c2, 2, e5)
        e7 = Employee(u'emp7', c2, 3, e5)

        sess.add_all((c1, c2))
        sess.flush()
        sess.expunge_all()

        test_c1 = sess.query(Company).get(c1.company_id)
        test_e1 = sess.query(Employee).get([c1.company_id, e1.emp_id])
        assert test_e1.name == 'emp1', test_e1.name
        test_e5 = sess.query(Employee).get([c2.company_id, e5.emp_id])
        assert test_e5.name == 'emp5', test_e5.name
        assert [x.name for x in test_e1.employees] == ['emp2', 'emp3']
        assert sess.query(Employee).get([c1.company_id, 3]).reports_to.name == 'emp1'
        assert sess.query(Employee).get([c2.company_id, 3]).reports_to.name == 'emp5'

class RelationshipTest3(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table("jobs", metadata,
              Column("jobno", sa.Unicode(15), primary_key=True),
              Column("created", sa.DateTime, nullable=False,
                     default=datetime.datetime.now),
              Column("deleted", sa.Boolean, nullable=False, default=False))

        Table("pageversions", metadata,
              Column("jobno", sa.Unicode(15), primary_key=True),
              Column("pagename", sa.Unicode(30), primary_key=True),
              Column("version", Integer, primary_key=True, default=1),
              Column("created", sa.DateTime, nullable=False,
                     default=datetime.datetime.now),
              Column("md5sum", String(32)),
              Column("width", Integer, nullable=False, default=0),
              Column("height", Integer, nullable=False, default=0),
              sa.ForeignKeyConstraint(
                  ["jobno", "pagename"],
                  ["pages.jobno", "pages.pagename"]))

        Table("pages", metadata,
              Column("jobno", sa.Unicode(15), ForeignKey("jobs.jobno"),
                     primary_key=True),
              Column("pagename", sa.Unicode(30), primary_key=True),
              Column("created", sa.DateTime, nullable=False,
                     default=datetime.datetime.now),
              Column("deleted", sa.Boolean, nullable=False, default=False),
              Column("current_version", Integer))

        Table("pagecomments", metadata,
              Column("jobno", sa.Unicode(15), primary_key=True),
              Column("pagename", sa.Unicode(30), primary_key=True),
              Column("comment_id", Integer, primary_key=True,
                     autoincrement=False),
              Column("content", sa.UnicodeText),
              sa.ForeignKeyConstraint(
                  ["jobno", "pagename"],
                  ["pages.jobno", "pages.pagename"]))

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        class Job(_base.Entity):
            def create_page(self, pagename):
                return Page(job=self, pagename=pagename)
        class PageVersion(_base.Entity):
            def __init__(self, page=None, version=None):
                self.page = page
                self.version = version
        class Page(_base.Entity):
            def __init__(self, job=None, pagename=None):
                self.job = job
                self.pagename = pagename
                self.currentversion = PageVersion(self, 1)
            def add_version(self):
                self.currentversion = PageVersion(
                    page=self, version=self.currentversion.version+1)
                comment = self.add_comment()
                comment.closeable = False
                comment.content = u'some content'
                return self.currentversion
            def add_comment(self):
                nextnum = max([-1] + [c.comment_id for c in self.comments]) + 1
                newcomment = PageComment()
                newcomment.comment_id = nextnum
                self.comments.append(newcomment)
                newcomment.created_version = self.currentversion.version
                return newcomment
        class PageComment(_base.Entity):
            pass

        mapper(Job, jobs)
        mapper(PageVersion, pageversions)
        mapper(Page, pages, properties={
            'job': relationship(
                 Job,
                 backref=backref('pages',
                                 cascade="all, delete-orphan",
                                 order_by=pages.c.pagename)),
            'currentversion': relationship(
                 PageVersion,
                 uselist=False,
                 primaryjoin=sa.and_(
                     pages.c.jobno==pageversions.c.jobno,
                     pages.c.pagename==pageversions.c.pagename,
                     pages.c.current_version==pageversions.c.version),
                 post_update=True),
            'versions': relationship(
                 PageVersion,
                 cascade="all, delete-orphan",
                 primaryjoin=sa.and_(pages.c.jobno==pageversions.c.jobno,
                                     pages.c.pagename==pageversions.c.pagename),
                 order_by=pageversions.c.version,
                 backref=backref('page',lazy='joined')
                )})
        mapper(PageComment, pagecomments, properties={
            'page': relationship(
                  Page,
                  primaryjoin=sa.and_(pages.c.jobno==pagecomments.c.jobno,
                                      pages.c.pagename==pagecomments.c.pagename),
                  backref=backref("comments",
                                  cascade="all, delete-orphan",
                                  order_by=pagecomments.c.comment_id))})

    @testing.resolve_artifact_names
    def test_basic(self):
        """A combination of complicated join conditions with post_update."""

        j1 = Job(jobno=u'somejob')
        j1.create_page(u'page1')
        j1.create_page(u'page2')
        j1.create_page(u'page3')

        j2 = Job(jobno=u'somejob2')
        j2.create_page(u'page1')
        j2.create_page(u'page2')
        j2.create_page(u'page3')

        j2.pages[0].add_version()
        j2.pages[0].add_version()
        j2.pages[1].add_version()

        s = create_session()
        s.add_all((j1, j2))

        s.flush()

        s.expunge_all()
        j = s.query(Job).filter_by(jobno=u'somejob').one()
        oldp = list(j.pages)
        j.pages = []

        s.flush()

        s.expunge_all()
        j = s.query(Job).filter_by(jobno=u'somejob2').one()
        j.pages[1].current_version = 12
        s.delete(j)
        s.flush()

class RelationshipTest4(_base.MappedTest):
    """Syncrules on foreign keys that are also primary"""

    @classmethod
    def define_tables(cls, metadata):
        Table("tableA", metadata,
              Column("id",Integer,primary_key=True, test_needs_autoincrement=True),
              Column("foo",Integer,),
              test_needs_fk=True)
              
        Table("tableB",metadata,
              Column("id",Integer,ForeignKey("tableA.id"),primary_key=True),
              test_needs_fk=True)

    @classmethod
    def setup_classes(cls):
        class A(_base.Entity):
            pass

        class B(_base.Entity):
            pass

    @testing.resolve_artifact_names
    def test_onetoone_switch(self):
        """test that active history is enabled on a one-to-many/one that has use_get==True"""
        
        mapper(A, tableA, properties={
            'b':relationship(B, cascade="all,delete-orphan", uselist=False)})
        mapper(B, tableB)
        
        compile_mappers()
        assert A.b.property.strategy.use_get
        
        sess = create_session()
        
        a1 = A()
        sess.add(a1)
        sess.flush()
        sess.close()
        a1 = sess.query(A).first()
        a1.b = B()
        sess.flush()
        
    @testing.resolve_artifact_names
    def test_no_delete_PK_AtoB(self):
        """A cant be deleted without B because B would have no PK value."""
        mapper(A, tableA, properties={
            'bs':relationship(B, cascade="save-update")})
        mapper(B, tableB)

        a1 = A()
        a1.bs.append(B())
        sess = create_session()
        sess.add(a1)
        sess.flush()

        sess.delete(a1)
        try:
            sess.flush()
            assert False
        except AssertionError, e:
            startswith_(str(e),
                        "Dependency rule tried to blank-out "
                        "primary key column 'tableB.id' on instance ")

    @testing.resolve_artifact_names
    def test_no_delete_PK_BtoA(self):
        mapper(B, tableB, properties={
            'a':relationship(A, cascade="save-update")})
        mapper(A, tableA)

        b1 = B()
        a1 = A()
        b1.a = a1
        sess = create_session()
        sess.add(b1)
        sess.flush()
        b1.a = None
        try:
            sess.flush()
            assert False
        except AssertionError, e:
            startswith_(str(e),
                        "Dependency rule tried to blank-out "
                        "primary key column 'tableB.id' on instance ")

    @testing.fails_on_everything_except('sqlite', 'mysql')
    @testing.resolve_artifact_names
    def test_nullPKsOK_BtoA(self):
        # postgresql cant handle a nullable PK column...?
        tableC = Table('tablec', tableA.metadata,
            Column('id', Integer, primary_key=True),
            Column('a_id', Integer, ForeignKey('tableA.id'),
                   primary_key=True, autoincrement=False, nullable=True))
        tableC.create()

        class C(_base.Entity):
            pass
        mapper(C, tableC, properties={
            'a':relationship(A, cascade="save-update")
        })
        mapper(A, tableA)

        c1 = C()
        c1.id = 5
        c1.a = None
        sess = create_session()
        sess.add(c1)
        # test that no error is raised.
        sess.flush()

    @testing.resolve_artifact_names
    def test_delete_cascade_BtoA(self):
        """No 'blank the PK' error when the child is to be deleted as part of a cascade"""

        for cascade in ("save-update, delete",
                        #"save-update, delete-orphan",
                        "save-update, delete, delete-orphan"):
            mapper(B, tableB, properties={
                'a':relationship(A, cascade=cascade, single_parent=True)
            })
            mapper(A, tableA)

            b1 = B()
            a1 = A()
            b1.a = a1
            sess = create_session()
            sess.add(b1)
            sess.flush()
            sess.delete(b1)
            sess.flush()
            assert a1 not in sess
            assert b1 not in sess
            sess.expunge_all()
            sa.orm.clear_mappers()

    @testing.resolve_artifact_names
    def test_delete_cascade_AtoB(self):
        """No 'blank the PK' error when the child is to be deleted as part of a cascade"""
        for cascade in ("save-update, delete",
                        #"save-update, delete-orphan",
                        "save-update, delete, delete-orphan"):
            mapper(A, tableA, properties={
                'bs':relationship(B, cascade=cascade)
            })
            mapper(B, tableB)

            a1 = A()
            b1 = B()
            a1.bs.append(b1)
            sess = create_session()
            sess.add(a1)
            sess.flush()

            sess.delete(a1)
            sess.flush()
            assert a1 not in sess
            assert b1 not in sess
            sess.expunge_all()
            sa.orm.clear_mappers()

    @testing.resolve_artifact_names
    def test_delete_manual_AtoB(self):
        mapper(A, tableA, properties={
            'bs':relationship(B, cascade="none")})
        mapper(B, tableB)

        a1 = A()
        b1 = B()
        a1.bs.append(b1)
        sess = create_session()
        sess.add(a1)
        sess.add(b1)
        sess.flush()

        sess.delete(a1)
        sess.delete(b1)
        sess.flush()
        assert a1 not in sess
        assert b1 not in sess
        sess.expunge_all()

    @testing.resolve_artifact_names
    def test_delete_manual_BtoA(self):
        mapper(B, tableB, properties={
            'a':relationship(A, cascade="none")})
        mapper(A, tableA)

        b1 = B()
        a1 = A()
        b1.a = a1
        sess = create_session()
        sess.add(b1)
        sess.add(a1)
        sess.flush()
        sess.delete(b1)
        sess.delete(a1)
        sess.flush()
        assert a1 not in sess
        assert b1 not in sess

class RelationshipToUniqueTest(_base.MappedTest):
    """test a relationship based on a primary join against a unique non-pk column"""
    
    @classmethod
    def define_tables(cls, metadata):
        Table("table_a", metadata,
                        Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
                        Column("ident", String(10), nullable=False, unique=True),
                        )

        Table("table_b", metadata,
                        Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
                        Column("a_ident", String(10), ForeignKey('table_a.ident'), nullable=False),
                        )
    
    @classmethod
    def setup_classes(cls):
        class A(_base.ComparableEntity):
            pass
        class B(_base.ComparableEntity):
            pass

    @testing.resolve_artifact_names
    def test_switch_parent(self):
        mapper(A, table_a)
        mapper(B, table_b, properties={"a": relationship(A, backref="bs")})

        session = create_session()
        a1, a2 = A(ident="uuid1"), A(ident="uuid2")
        session.add_all([a1, a2])
        a1.bs = [
            B(), B()
        ]
        session.flush()
        session.expire_all()
        a1, a2 = session.query(A).all()

        for b in list(a1.bs):
            b.a = a2
        session.delete(a1)
        session.flush()
    
class RelationshipTest5(_base.MappedTest):
    """Test a map to a select that relates to a map to the table."""

    @classmethod
    def define_tables(cls, metadata):
        Table('items', metadata,
              Column('item_policy_num', String(10), primary_key=True,
                     key='policyNum'),
              Column('item_policy_eff_date', sa.Date, primary_key=True,
                     key='policyEffDate'),
              Column('item_type', String(20), primary_key=True,
                     key='type'),
              Column('item_id', Integer, primary_key=True,
                     key='id', autoincrement=False))

    @testing.resolve_artifact_names
    def test_basic(self):
        class Container(_base.Entity):
            pass
        class LineItem(_base.Entity):
            pass

        container_select = sa.select(
            [items.c.policyNum, items.c.policyEffDate, items.c.type],
            distinct=True,
            ).alias('container_select')

        mapper(LineItem, items)

        mapper(Container,
               container_select,
               order_by=sa.asc(container_select.c.type),
               properties=dict(
                   lineItems=relationship(LineItem,
                       lazy='select',
                       cascade='all, delete-orphan',
                       order_by=sa.asc(items.c.id),
                       primaryjoin=sa.and_(
                         container_select.c.policyNum==items.c.policyNum,
                         container_select.c.policyEffDate==items.c.policyEffDate,
                         container_select.c.type==items.c.type),
                       foreign_keys=[
                         items.c.policyNum,
                         items.c.policyEffDate,
                         items.c.type])))

        session = create_session()
        con = Container()
        con.policyNum = "99"
        con.policyEffDate = datetime.date.today()
        con.type = "TESTER"
        session.add(con)
        for i in range(0, 10):
            li = LineItem()
            li.id = i
            con.lineItems.append(li)
            session.add(li)
        session.flush()
        session.expunge_all()
        newcon = session.query(Container).first()
        assert con.policyNum == newcon.policyNum
        assert len(newcon.lineItems) == 10
        for old, new in zip(con.lineItems, newcon.lineItems):
            eq_(old.id, new.id)

class RelationshipTest6(_base.MappedTest):
    """test a relationship with a non-column entity in the primary join, 
    is not viewonly, and also has the non-column's clause mentioned in the 
    foreign keys list.
    
    """
    
    @classmethod
    def define_tables(cls, metadata):
        Table('tags', metadata, Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
            Column("data", String(50)),
        )

        Table('tag_foo', metadata, 
            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
            Column('tagid', Integer),
            Column("data", String(50)),
        )

    @testing.resolve_artifact_names
    def test_basic(self):
        class Tag(_base.ComparableEntity):
            pass
        class TagInstance(_base.ComparableEntity):
            pass

        mapper(Tag, tags, properties={
            'foo':relationship(TagInstance, 
               primaryjoin=sa.and_(tag_foo.c.data=='iplc_case',
                                tag_foo.c.tagid==tags.c.id),
               foreign_keys=[tag_foo.c.tagid, tag_foo.c.data],
               ),
        })

        mapper(TagInstance, tag_foo)

        sess = create_session()
        t1 = Tag(data='some tag')
        t1.foo.append(TagInstance(data='iplc_case'))
        t1.foo.append(TagInstance(data='not_iplc_case'))
        sess.add(t1)
        sess.flush()
        sess.expunge_all()
        
        # relationship works
        eq_(sess.query(Tag).all(), [Tag(data='some tag', foo=[TagInstance(data='iplc_case')])])
        
        # both TagInstances were persisted
        eq_(
            sess.query(TagInstance).order_by(TagInstance.data).all(), 
            [TagInstance(data='iplc_case'), TagInstance(data='not_iplc_case')]
        )

class BackrefPropagatesForwardsArgs(_base.MappedTest):
    
    @classmethod
    def define_tables(cls, metadata):
        Table('users', metadata, 
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', String(50))
        )
        Table('addresses', metadata, 
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('user_id', Integer),
            Column('email', String(50))
        )
    
    @classmethod
    def setup_classes(cls):
        class User(_base.ComparableEntity):
            pass
        class Address(_base.ComparableEntity):
            pass
    
    @testing.resolve_artifact_names
    def test_backref(self):
        
        mapper(User, users, properties={
            'addresses':relationship(Address, 
                        primaryjoin=addresses.c.user_id==users.c.id, 
                        foreign_keys=addresses.c.user_id,
                        backref='user')
        })
        mapper(Address, addresses)
        
        sess = sessionmaker()()
        u1 = User(name='u1', addresses=[Address(email='a1')])
        sess.add(u1)
        sess.commit()
        eq_(sess.query(Address).all(), [
            Address(email='a1', user=User(name='u1'))
        ])
    
class AmbiguousJoinInterpretedAsSelfRef(_base.MappedTest):
    """test ambiguous joins due to FKs on both sides treated as self-referential.
    
    this mapping is very similar to that of test/orm/inheritance/query.py
    SelfReferentialTestJoinedToBase , except that inheritance is not used
    here.
    
    """
    
    @classmethod
    def define_tables(cls, metadata):
        subscriber_table = Table('subscriber', metadata,
           Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
           Column('dummy', String(10)) # to appease older sqlite version
          )

        address_table = Table('address',
                 metadata,
                 Column('subscriber_id', Integer, ForeignKey('subscriber.id'), primary_key=True),
                 Column('type', String(1), primary_key=True),
                 )

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        subscriber_and_address = subscriber.join(address, 
          and_(address.c.subscriber_id==subscriber.c.id, address.c.type.in_(['A', 'B', 'C'])))

        class Address(_base.ComparableEntity):
            pass

        class Subscriber(_base.ComparableEntity):
            pass

        mapper(Address, address)

        mapper(Subscriber, subscriber_and_address, properties={
           'id':[subscriber.c.id, address.c.subscriber_id],
           'addresses' : relationship(Address, 
                backref=backref("customer"))
           })
        
    @testing.resolve_artifact_names
    def test_mapping(self):
        from sqlalchemy.orm.interfaces import ONETOMANY,MANYTOONE
        sess = create_session()
        assert Subscriber.addresses.property.direction is ONETOMANY
        assert Address.customer.property.direction is MANYTOONE
        
        s1 = Subscriber(type='A',
                addresses = [
                    Address(type='D'),
                    Address(type='E'),
                ]
        )
        a1 = Address(type='B', customer=Subscriber(type='C'))
        
        assert s1.addresses[0].customer is s1
        assert a1.customer.addresses[0] is a1
        
        sess.add_all([s1, a1])
        
        sess.flush()
        sess.expunge_all()
        
        eq_(
            sess.query(Subscriber).order_by(Subscriber.type).all(),
            [
                Subscriber(id=1, type=u'A'), 
                Subscriber(id=2, type=u'B'), 
                Subscriber(id=2, type=u'C')
            ]
        )


class ManualBackrefTest(_fixtures.FixtureTest):
    """Test explicit relationships that are backrefs to each other."""

    run_inserts = None
    
    @testing.resolve_artifact_names
    def test_o2m(self):
        mapper(User, users, properties={
            'addresses':relationship(Address, back_populates='user')
        })
        
        mapper(Address, addresses, properties={
            'user':relationship(User, back_populates='addresses')
        })
        
        sess = create_session()
        
        u1 = User(name='u1')
        a1 = Address(email_address='foo')
        u1.addresses.append(a1)
        assert a1.user is u1
        
        sess.add(u1)
        sess.flush()
        sess.expire_all()
        assert sess.query(Address).one() is a1
        assert a1.user is u1
        assert a1 in u1.addresses

    @testing.resolve_artifact_names
    def test_invalid_key(self):
        mapper(User, users, properties={
            'addresses':relationship(Address, back_populates='userr')
        })
        
        mapper(Address, addresses, properties={
            'user':relationship(User, back_populates='addresses')
        })
        
        assert_raises(sa.exc.InvalidRequestError, compile_mappers)
        
    @testing.resolve_artifact_names
    def test_invalid_target(self):
        mapper(User, users, properties={
            'addresses':relationship(Address, back_populates='dingaling'),
        })
        
        mapper(Dingaling, dingalings)
        mapper(Address, addresses, properties={
            'dingaling':relationship(Dingaling)
        })
        
        assert_raises_message(sa.exc.ArgumentError, 
            r"reverse_property 'dingaling' on relationship User.addresses references "
            "relationship Address.dingaling, which does not reference mapper Mapper\|User\|users", 
            compile_mappers)
        
class JoinConditionErrorTest(testing.TestBase):
    
    def test_clauseelement_pj(self):
        from sqlalchemy.ext.declarative import declarative_base
        Base = declarative_base()
        class C1(Base):
            __tablename__ = 'c1'
            id = Column('id', Integer, primary_key=True)
        class C2(Base):
            __tablename__ = 'c2'
            id = Column('id', Integer, primary_key=True)
            c1id = Column('c1id', Integer, ForeignKey('c1.id'))
            c2 = relationship(C1, primaryjoin=C1.id)
        
        assert_raises(sa.exc.ArgumentError, compile_mappers)

    def test_clauseelement_pj_false(self):
        from sqlalchemy.ext.declarative import declarative_base
        Base = declarative_base()
        class C1(Base):
            __tablename__ = 'c1'
            id = Column('id', Integer, primary_key=True)
        class C2(Base):
            __tablename__ = 'c2'
            id = Column('id', Integer, primary_key=True)
            c1id = Column('c1id', Integer, ForeignKey('c1.id'))
            c2 = relationship(C1, primaryjoin="x"=="y")

        assert_raises(sa.exc.ArgumentError, compile_mappers)
    
    def test_only_column_elements(self):
        m = MetaData()
        t1 = Table('t1', m, 
            Column('id', Integer, primary_key=True),
            Column('foo_id', Integer, ForeignKey('t2.id')),
        )
        t2 = Table('t2', m,
            Column('id', Integer, primary_key=True),
            )
        class C1(object):
            pass
        class C2(object):
            pass

        mapper(C1, t1, properties={'c2':relationship(C2,  primaryjoin=t1.join(t2))})
        mapper(C2, t2)
        assert_raises(sa.exc.ArgumentError, compile_mappers)
    
    def test_fk_error_raised(self):
        m = MetaData()
        t1 = Table('t1', m, 
            Column('id', Integer, primary_key=True),
            Column('foo_id', Integer, ForeignKey('t2.nonexistent_id')),
        )
        t2 = Table('t2', m,
            Column('id', Integer, primary_key=True),
            )

        t3 = Table('t3', m,
            Column('id', Integer, primary_key=True),
            Column('t1id', Integer, ForeignKey('t1.id'))
        )
        
        class C1(object):
            pass
        class C2(object):
            pass
        
        mapper(C1, t1, properties={'c2':relationship(C2)})
        mapper(C2, t3)
        
        assert_raises(sa.exc.NoReferencedColumnError, compile_mappers)
    
    def test_join_error_raised(self):
        m = MetaData()
        t1 = Table('t1', m, 
            Column('id', Integer, primary_key=True),
        )
        t2 = Table('t2', m,
            Column('id', Integer, primary_key=True),
            )

        t3 = Table('t3', m,
            Column('id', Integer, primary_key=True),
            Column('t1id', Integer)
        )

        class C1(object):
            pass
        class C2(object):
            pass

        mapper(C1, t1, properties={'c2':relationship(C2)})
        mapper(C2, t3)

        assert_raises(sa.exc.ArgumentError, compile_mappers)
    
    def teardown(self):
        clear_mappers()    
        
class TypeMatchTest(_base.MappedTest):
    """test errors raised when trying to add items whose type is not handled by a relationship"""

    @classmethod
    def define_tables(cls, metadata):
        Table("a", metadata,
              Column('aid', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('data', String(30)))
        Table("b", metadata,
               Column('bid', Integer, primary_key=True, test_needs_autoincrement=True),
               Column("a_id", Integer, ForeignKey("a.aid")),
               Column('data', String(30)))
        Table("c", metadata,
              Column('cid', Integer, primary_key=True, test_needs_autoincrement=True),
              Column("b_id", Integer, ForeignKey("b.bid")),
              Column('data', String(30)))
        Table("d", metadata,
              Column('did', Integer, primary_key=True, test_needs_autoincrement=True),
              Column("a_id", Integer, ForeignKey("a.aid")),
              Column('data', String(30)))

    @testing.resolve_artifact_names
    def test_o2m_oncascade(self):
        class A(_base.Entity): pass
        class B(_base.Entity): pass
        class C(_base.Entity): pass
        mapper(A, a, properties={'bs':relationship(B)})
        mapper(B, b)
        mapper(C, c)

        a1 = A()
        b1 = B()
        c1 = C()
        a1.bs.append(b1)
        a1.bs.append(c1)
        sess = create_session()
        try:
            sess.add(a1)
            assert False
        except AssertionError, err:
            eq_(str(err),
                "Attribute 'bs' on class '%s' doesn't handle "
                "objects of type '%s'" % (A, C))

    @testing.resolve_artifact_names
    def test_o2m_onflush(self):
        class A(_base.Entity): pass
        class B(_base.Entity): pass
        class C(_base.Entity): pass
        mapper(A, a, properties={'bs':relationship(B, cascade="none")})
        mapper(B, b)
        mapper(C, c)

        a1 = A()
        b1 = B()
        c1 = C()
        a1.bs.append(b1)
        a1.bs.append(c1)
        sess = create_session()
        sess.add(a1)
        sess.add(b1)
        sess.add(c1)
        assert_raises_message(sa.orm.exc.FlushError,
                                 "Attempting to flush an item", sess.flush)

    @testing.resolve_artifact_names
    def test_o2m_nopoly_onflush(self):
        class A(_base.Entity): pass
        class B(_base.Entity): pass
        class C(B): pass
        mapper(A, a, properties={'bs':relationship(B, cascade="none")})
        mapper(B, b)
        mapper(C, c, inherits=B)

        a1 = A()
        b1 = B()
        c1 = C()
        a1.bs.append(b1)
        a1.bs.append(c1)
        sess = create_session()
        sess.add(a1)
        sess.add(b1)
        sess.add(c1)
        assert_raises_message(sa.orm.exc.FlushError,
                                 "Attempting to flush an item", sess.flush)

    @testing.resolve_artifact_names
    def test_m2o_nopoly_onflush(self):
        class A(_base.Entity): pass
        class B(A): pass
        class D(_base.Entity): pass
        mapper(A, a)
        mapper(B, b, inherits=A)
        mapper(D, d, properties={"a":relationship(A, cascade="none")})
        b1 = B()
        d1 = D()
        d1.a = b1
        sess = create_session()
        sess.add(b1)
        sess.add(d1)
        assert_raises_message(sa.orm.exc.FlushError,
                                 "Attempting to flush an item", sess.flush)

    @testing.resolve_artifact_names
    def test_m2o_oncascade(self):
        class A(_base.Entity): pass
        class B(_base.Entity): pass
        class D(_base.Entity): pass
        mapper(A, a)
        mapper(B, b)
        mapper(D, d, properties={"a":relationship(A)})
        b1 = B()
        d1 = D()
        d1.a = b1
        sess = create_session()
        assert_raises_message(AssertionError,
                                 "doesn't handle objects of type", sess.add, d1)

class TypedAssociationTable(_base.MappedTest):

    @classmethod
    def define_tables(cls, metadata):
        class MySpecialType(sa.types.TypeDecorator):
            impl = String
            def process_bind_param(self, value, dialect):
                return "lala" + value
            def process_result_value(self, value, dialect):
                return value[4:]

        Table('t1', metadata,
              Column('col1', MySpecialType(30), primary_key=True),
              Column('col2', String(30)))
        Table('t2', metadata,
              Column('col1', MySpecialType(30), primary_key=True),
              Column('col2', String(30)))
        Table('t3', metadata,
              Column('t1c1', MySpecialType(30), ForeignKey('t1.col1')),
              Column('t2c1', MySpecialType(30), ForeignKey('t2.col1')))

    @testing.resolve_artifact_names
    def testm2m(self):
        """Many-to-many tables with special types for candidate keys."""

        class T1(_base.Entity): pass
        class T2(_base.Entity): pass
        mapper(T2, t2)
        mapper(T1, t1, properties={
            't2s':relationship(T2, secondary=t3, backref='t1s')})

        a = T1()
        a.col1 = "aid"
        b = T2()
        b.col1 = "bid"
        c = T2()
        c.col1 = "cid"
        a.t2s.append(b)
        a.t2s.append(c)
        sess = create_session()
        sess.add(a)
        sess.flush()

        assert t3.count().scalar() == 2

        a.t2s.remove(c)
        sess.flush()

        assert t3.count().scalar() == 1

class ViewOnlyM2MBackrefTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table("t1", metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)))
        Table("t2", metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)),
        )
        Table("t1t2", metadata,
            Column('t1id', Integer, ForeignKey('t1.id'), primary_key=True),
            Column('t2id', Integer, ForeignKey('t2.id'), primary_key=True),
        )
    
    @testing.resolve_artifact_names
    def test_viewonly(self):
        class A(_base.ComparableEntity):pass
        class B(_base.ComparableEntity):pass
        
        mapper(A, t1, properties={
            'bs':relationship(B, secondary=t1t2, backref=backref('as_', viewonly=True))
        })
        mapper(B, t2)
        
        sess = create_session()
        a1 = A()
        b1 = B(as_=[a1])

        sess.add(a1)
        sess.flush()
        eq_(
            sess.query(A).first(), A(bs=[B(id=b1.id)])
        )
        eq_(
            sess.query(B).first(), B(as_=[A(id=a1.id)])
        )
        
class ViewOnlyOverlappingNames(_base.MappedTest):
    """'viewonly' mappings with overlapping PK column names."""

    @classmethod
    def define_tables(cls, metadata):
        Table("t1", metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)))
        Table("t2", metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)),
            Column('t1id', Integer, ForeignKey('t1.id')))
        Table("t3", metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)),
            Column('t2id', Integer, ForeignKey('t2.id')))

    @testing.resolve_artifact_names
    def test_three_table_view(self):
        """A three table join with overlapping PK names.

        A third table is pulled into the primary join condition using
        overlapping PK column names and should not produce 'conflicting column'
        error.

        """
        class C1(_base.Entity): pass
        class C2(_base.Entity): pass
        class C3(_base.Entity): pass

        mapper(C1, t1, properties={
            't2s':relationship(C2),
            't2_view':relationship(C2,
                               viewonly=True,
                               primaryjoin=sa.and_(t1.c.id==t2.c.t1id,
                                                   t3.c.t2id==t2.c.id,
                                                   t3.c.data==t1.c.data))})
        mapper(C2, t2)
        mapper(C3, t3, properties={
            't2':relationship(C2)})

        c1 = C1()
        c1.data = 'c1data'
        c2a = C2()
        c1.t2s.append(c2a)
        c2b = C2()
        c1.t2s.append(c2b)
        c3 = C3()
        c3.data='c1data'
        c3.t2 = c2b
        sess = create_session()
        sess.add(c1)
        sess.add(c3)
        sess.flush()
        sess.expunge_all()

        c1 = sess.query(C1).get(c1.id)
        assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
        assert set([x.id for x in c1.t2_view]) == set([c2b.id])

class ViewOnlyUniqueNames(_base.MappedTest):
    """'viewonly' mappings with unique PK column names."""

    @classmethod
    def define_tables(cls, metadata):
        Table("t1", metadata,
            Column('t1id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)))
        Table("t2", metadata,
            Column('t2id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)),
            Column('t1id_ref', Integer, ForeignKey('t1.t1id')))
        Table("t3", metadata,
            Column('t3id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(40)),
            Column('t2id_ref', Integer, ForeignKey('t2.t2id')))

    @testing.resolve_artifact_names
    def test_three_table_view(self):
        """A three table join with overlapping PK names.

        A third table is pulled into the primary join condition using unique
        PK column names and should not produce 'mapper has no columnX' error.

        """
        class C1(_base.Entity): pass
        class C2(_base.Entity): pass
        class C3(_base.Entity): pass

        mapper(C1, t1, properties={
            't2s':relationship(C2),
            't2_view':relationship(C2,
                               viewonly=True,
                               primaryjoin=sa.and_(t1.c.t1id==t2.c.t1id_ref,
                                                   t3.c.t2id_ref==t2.c.t2id,
                                                   t3.c.data==t1.c.data))})
        mapper(C2, t2)
        mapper(C3, t3, properties={
            't2':relationship(C2)})

        c1 = C1()
        c1.data = 'c1data'
        c2a = C2()
        c1.t2s.append(c2a)
        c2b = C2()
        c1.t2s.append(c2b)
        c3 = C3()
        c3.data='c1data'
        c3.t2 = c2b
        sess = create_session()

        sess.add_all((c1, c3))
        sess.flush()
        sess.expunge_all()

        c1 = sess.query(C1).get(c1.t1id)
        assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
        assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])

class ViewOnlyLocalRemoteM2M(testing.TestBase):
    """test that local-remote is correctly determined for m2m"""
    
    def test_local_remote(self):
        meta = MetaData()
        
        t1 = Table('t1', meta,
                Column('id', Integer, primary_key=True),
            )
        t2 = Table('t2', meta,
                Column('id', Integer, primary_key=True),
            )
        t12 = Table('tab', meta,
                Column('t1_id', Integer, ForeignKey('t1.id',)),
                Column('t2_id', Integer, ForeignKey('t2.id',)),
            )
        
        class A(object): pass
        class B(object): pass
        mapper( B, t2, )
        m = mapper( A, t1, properties=dict(
                b_view = relationship( B, secondary=t12, viewonly=True),
                b_plain= relationship( B, secondary=t12),
            )
        )
        compile_mappers()
        assert m.get_property('b_view').local_remote_pairs == \
            m.get_property('b_plain').local_remote_pairs == \
            [(t1.c.id, t12.c.t1_id), (t2.c.id, t12.c.t2_id)]

        
    
class ViewOnlyNonEquijoin(_base.MappedTest):
    """'viewonly' mappings based on non-equijoins."""

    @classmethod
    def define_tables(cls, metadata):
        Table('foos', metadata,
                     Column('id', Integer, primary_key=True))
        Table('bars', metadata,
                     Column('id', Integer, primary_key=True),
                     Column('fid', Integer))

    @testing.resolve_artifact_names
    def test_viewonly_join(self):
        class Foo(_base.ComparableEntity):
            pass
        class Bar(_base.ComparableEntity):
            pass

        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id > bars.c.fid,
                            foreign_keys=[bars.c.fid],
                            viewonly=True)})

        mapper(Bar, bars)

        sess = create_session()
        sess.add_all((Foo(id=4),
                      Foo(id=9),
                      Bar(id=1, fid=2),
                      Bar(id=2, fid=3),
                      Bar(id=3, fid=6),
                      Bar(id=4, fid=7)))
        sess.flush()

        sess = create_session()
        eq_(sess.query(Foo).filter_by(id=4).one(),
            Foo(id=4, bars=[Bar(fid=2), Bar(fid=3)]))
        eq_(sess.query(Foo).filter_by(id=9).one(),
            Foo(id=9, bars=[Bar(fid=2), Bar(fid=3), Bar(fid=6), Bar(fid=7)]))


class ViewOnlyRepeatedRemoteColumn(_base.MappedTest):
    """'viewonly' mappings that contain the same 'remote' column twice"""

    @classmethod
    def define_tables(cls, metadata):
        Table('foos', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('bid1', Integer,ForeignKey('bars.id')),
              Column('bid2', Integer,ForeignKey('bars.id')))

        Table('bars', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('data', String(50)))

    @testing.resolve_artifact_names
    def test_relationship_on_or(self):
        class Foo(_base.ComparableEntity):
            pass
        class Bar(_base.ComparableEntity):
            pass

        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=sa.or_(bars.c.id == foos.c.bid1,
                                               bars.c.id == foos.c.bid2),
                            uselist=True,
                            viewonly=True)})
        mapper(Bar, bars)

        sess = create_session()
        b1 = Bar(id=1, data='b1')
        b2 = Bar(id=2, data='b2')
        b3 = Bar(id=3, data='b3')
        f1 = Foo(bid1=1, bid2=2)
        f2 = Foo(bid1=3, bid2=None)

        sess.add_all((b1, b2, b3))
        sess.flush()

        sess.add_all((f1, f2))
        sess.flush()

        sess.expunge_all()
        eq_(sess.query(Foo).filter_by(id=f1.id).one(),
            Foo(bars=[Bar(data='b1'), Bar(data='b2')]))
        eq_(sess.query(Foo).filter_by(id=f2.id).one(),
            Foo(bars=[Bar(data='b3')]))

class ViewOnlyRepeatedLocalColumn(_base.MappedTest):
    """'viewonly' mappings that contain the same 'local' column twice"""

    @classmethod
    def define_tables(cls, metadata):
        Table('foos', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('data', String(50)))

        Table('bars', metadata, Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('fid1', Integer, ForeignKey('foos.id')),
              Column('fid2', Integer, ForeignKey('foos.id')),
              Column('data', String(50)))

    @testing.resolve_artifact_names
    def test_relationship_on_or(self):
        class Foo(_base.ComparableEntity):
            pass
        class Bar(_base.ComparableEntity):
            pass

        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=sa.or_(bars.c.fid1 == foos.c.id,
                                               bars.c.fid2 == foos.c.id),
                            viewonly=True)})
        mapper(Bar, bars)

        sess = create_session()
        f1 = Foo(id=1, data='f1')
        f2 = Foo(id=2, data='f2')
        b1 = Bar(fid1=1, data='b1')
        b2 = Bar(fid2=1, data='b2')
        b3 = Bar(fid1=2, data='b3')
        b4 = Bar(fid1=1, fid2=2, data='b4')

        sess.add_all((f1, f2))
        sess.flush()

        sess.add_all((b1, b2, b3, b4))
        sess.flush()

        sess.expunge_all()
        eq_(sess.query(Foo).filter_by(id=f1.id).one(),
            Foo(bars=[Bar(data='b1'), Bar(data='b2'), Bar(data='b4')]))
        eq_(sess.query(Foo).filter_by(id=f2.id).one(),
            Foo(bars=[Bar(data='b3'), Bar(data='b4')]))

class ViewOnlyComplexJoin(_base.MappedTest):
    """'viewonly' mappings with a complex join condition."""

    @classmethod
    def define_tables(cls, metadata):
        Table('t1', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(50)))
        Table('t2', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(50)),
            Column('t1id', Integer, ForeignKey('t1.id')))
        Table('t3', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(50)))
        Table('t2tot3', metadata,
            Column('t2id', Integer, ForeignKey('t2.id')),
            Column('t3id', Integer, ForeignKey('t3.id')))

    @classmethod
    def setup_classes(cls):
        class T1(_base.ComparableEntity):
            pass
        class T2(_base.ComparableEntity):
            pass
        class T3(_base.ComparableEntity):
            pass

    @testing.resolve_artifact_names
    def test_basic(self):
        mapper(T1, t1, properties={
            't3s':relationship(T3, primaryjoin=sa.and_(
                t1.c.id==t2.c.t1id,
                t2.c.id==t2tot3.c.t2id,
                t3.c.id==t2tot3.c.t3id),
            viewonly=True,
            foreign_keys=t3.c.id, remote_side=t2.c.t1id)
        })
        mapper(T2, t2, properties={
            't1':relationship(T1),
            't3s':relationship(T3, secondary=t2tot3)
        })
        mapper(T3, t3)

        sess = create_session()
        sess.add(T2(data='t2', t1=T1(data='t1'), t3s=[T3(data='t3')]))
        sess.flush()
        sess.expunge_all()

        a = sess.query(T1).first()
        eq_(a.t3s, [T3(data='t3')])


    @testing.resolve_artifact_names
    def test_remote_side_escalation(self):
        mapper(T1, t1, properties={
            't3s':relationship(T3,
                           primaryjoin=sa.and_(t1.c.id==t2.c.t1id,
                                               t2.c.id==t2tot3.c.t2id,
                                               t3.c.id==t2tot3.c.t3id
                                               ),
                           viewonly=True,
                           foreign_keys=t3.c.id)})
        mapper(T2, t2, properties={
            't1':relationship(T1),
            't3s':relationship(T3, secondary=t2tot3)})
        mapper(T3, t3)
        assert_raises_message(sa.exc.ArgumentError,
                                 "Specify remote_side argument",
                                 sa.orm.compile_mappers)


class ExplicitLocalRemoteTest(_base.MappedTest):

    @classmethod
    def define_tables(cls, metadata):
        Table('t1', metadata,
            Column('id', String(50), primary_key=True, test_needs_autoincrement=True),
            Column('data', String(50)))
        Table('t2', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(50)),
            Column('t1id', String(50)))

    @classmethod
    @testing.resolve_artifact_names
    def setup_classes(cls):
        class T1(_base.ComparableEntity):
            pass
        class T2(_base.ComparableEntity):
            pass

    @testing.resolve_artifact_names
    def test_onetomany_funcfk(self):
        # use a function within join condition.  but specifying
        # local_remote_pairs overrides all parsing of the join condition.
        mapper(T1, t1, properties={
            't2s':relationship(T2,
                           primaryjoin=t1.c.id==sa.func.lower(t2.c.t1id),
                           _local_remote_pairs=[(t1.c.id, t2.c.t1id)],
                           foreign_keys=[t2.c.t1id])})
        mapper(T2, t2)

        sess = create_session()
        a1 = T1(id='number1', data='a1')
        a2 = T1(id='number2', data='a2')
        b1 = T2(data='b1', t1id='NuMbEr1')
        b2 = T2(data='b2', t1id='Number1')
        b3 = T2(data='b3', t1id='Number2')
        sess.add_all((a1, a2, b1, b2, b3))
        sess.flush()
        sess.expunge_all()

        eq_(sess.query(T1).first(),
            T1(id='number1', data='a1', t2s=[
               T2(data='b1', t1id='NuMbEr1'),
               T2(data='b2', t1id='Number1')]))

    @testing.resolve_artifact_names
    def test_manytoone_funcfk(self):
        mapper(T1, t1)
        mapper(T2, t2, properties={
            't1':relationship(T1,
                          primaryjoin=t1.c.id==sa.func.lower(t2.c.t1id),
                          _local_remote_pairs=[(t2.c.t1id, t1.c.id)],
                          foreign_keys=[t2.c.t1id],
                          uselist=True)})

        sess = create_session()
        a1 = T1(id='number1', data='a1')
        a2 = T1(id='number2', data='a2')
        b1 = T2(data='b1', t1id='NuMbEr1')
        b2 = T2(data='b2', t1id='Number1')
        b3 = T2(data='b3', t1id='Number2')
        sess.add_all((a1, a2, b1, b2, b3))
        sess.flush()
        sess.expunge_all()

        eq_(sess.query(T2).filter(T2.data.in_(['b1', 'b2'])).all(),
            [T2(data='b1', t1=[T1(id='number1', data='a1')]),
             T2(data='b2', t1=[T1(id='number1', data='a1')])])

    @testing.resolve_artifact_names
    def test_onetomany_func_referent(self):
        mapper(T1, t1, properties={
            't2s':relationship(T2,
                           primaryjoin=sa.func.lower(t1.c.id)==t2.c.t1id,
                           _local_remote_pairs=[(t1.c.id, t2.c.t1id)],
                           foreign_keys=[t2.c.t1id])})
        mapper(T2, t2)

        sess = create_session()
        a1 = T1(id='NuMbeR1', data='a1')
        a2 = T1(id='NuMbeR2', data='a2')
        b1 = T2(data='b1', t1id='number1')
        b2 = T2(data='b2', t1id='number1')
        b3 = T2(data='b2', t1id='number2')
        sess.add_all((a1, a2, b1, b2, b3))
        sess.flush()
        sess.expunge_all()

        eq_(sess.query(T1).first(),
            T1(id='NuMbeR1', data='a1', t2s=[
              T2(data='b1', t1id='number1'),
              T2(data='b2', t1id='number1')]))

    @testing.resolve_artifact_names
    def test_manytoone_func_referent(self):
        mapper(T1, t1)
        mapper(T2, t2, properties={
            't1':relationship(T1,
                          primaryjoin=sa.func.lower(t1.c.id)==t2.c.t1id,
                          _local_remote_pairs=[(t2.c.t1id, t1.c.id)],
                          foreign_keys=[t2.c.t1id], uselist=True)})

        sess = create_session()
        a1 = T1(id='NuMbeR1', data='a1')
        a2 = T1(id='NuMbeR2', data='a2')
        b1 = T2(data='b1', t1id='number1')
        b2 = T2(data='b2', t1id='number1')
        b3 = T2(data='b3', t1id='number2')
        sess.add_all((a1, a2, b1, b2, b3))
        sess.flush()
        sess.expunge_all()

        eq_(sess.query(T2).filter(T2.data.in_(['b1', 'b2'])).all(),
            [T2(data='b1', t1=[T1(id='NuMbeR1', data='a1')]),
             T2(data='b2', t1=[T1(id='NuMbeR1', data='a1')])])

    @testing.resolve_artifact_names
    def test_escalation_1(self):
        mapper(T1, t1, properties={
            't2s':relationship(T2,
                           primaryjoin=t1.c.id==sa.func.lower(t2.c.t1id),
                           _local_remote_pairs=[(t1.c.id, t2.c.t1id)],
                           foreign_keys=[t2.c.t1id],
                           remote_side=[t2.c.t1id])})
        mapper(T2, t2)
        assert_raises(sa.exc.ArgumentError, sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_escalation_2(self):
        mapper(T1, t1, properties={
            't2s':relationship(T2,
                           primaryjoin=t1.c.id==sa.func.lower(t2.c.t1id),
                           _local_remote_pairs=[(t1.c.id, t2.c.t1id)])})
        mapper(T2, t2)
        assert_raises(sa.exc.ArgumentError, sa.orm.compile_mappers)

class InvalidRemoteSideTest(_base.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('t1', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(50)),
            Column('t_id', Integer, ForeignKey('t1.id'))
            )

    @classmethod
    @testing.resolve_artifact_names
    def setup_classes(cls):
        class T1(_base.ComparableEntity):
            pass

    @testing.resolve_artifact_names
    def test_o2m_backref(self):
        mapper(T1, t1, properties={
            't1s':relationship(T1, backref='parent')
        })

        assert_raises_message(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are "
                    "both of the same direction <symbol 'ONETOMANY>.  Did you "
                    "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_m2o_backref(self):
        mapper(T1, t1, properties={
            't1s':relationship(T1, backref=backref('parent', remote_side=t1.c.id), remote_side=t1.c.id)
        })

        assert_raises_message(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are "
                    "both of the same direction <symbol 'MANYTOONE>.  Did you "
                    "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_o2m_explicit(self):
        mapper(T1, t1, properties={
            't1s':relationship(T1, back_populates='parent'),
            'parent':relationship(T1, back_populates='t1s'),
        })

        # can't be sure of ordering here
        assert_raises_message(sa.exc.ArgumentError, 
                    "both of the same direction <symbol 'ONETOMANY>.  Did you "
                    "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_m2o_explicit(self):
        mapper(T1, t1, properties={
            't1s':relationship(T1, back_populates='parent', remote_side=t1.c.id),
            'parent':relationship(T1, back_populates='t1s', remote_side=t1.c.id)
        })

        # can't be sure of ordering here
        assert_raises_message(sa.exc.ArgumentError, 
                    "both of the same direction <symbol 'MANYTOONE>.  Did you "
                    "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)

        
class InvalidRelationshipEscalationTest(_base.MappedTest):

    @classmethod
    def define_tables(cls, metadata):
        Table('foos', metadata,
              Column('id', Integer, primary_key=True),
              Column('fid', Integer))
        Table('bars', metadata,
              Column('id', Integer, primary_key=True),
              Column('fid', Integer))

    @classmethod
    def setup_classes(cls):
        class Foo(_base.Entity):
            pass
        class Bar(_base.Entity):
            pass

    @testing.resolve_artifact_names
    def test_no_join(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine join condition between parent/child "
            "tables on relationship", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_join_self_ref(self):
        mapper(Foo, foos, properties={
            'foos':relationship(Foo)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine join condition between parent/child "
            "tables on relationship", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_equated(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id>bars.c.fid)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for primaryjoin condition",
            sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_equated_fks(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id>bars.c.fid,
                            foreign_keys=bars.c.fid)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not locate any equated, locally mapped column pairs "
            "for primaryjoin condition", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_ambiguous_fks(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id==bars.c.fid,
                            foreign_keys=[foos.c.id, bars.c.fid])})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError, 
                "Do the columns in 'foreign_keys' represent only the "
                "'foreign' columns in this join condition ?", 
                sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_ambiguous_remoteside_o2m(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id==bars.c.fid,
                            foreign_keys=[bars.c.fid],
                            remote_side=[foos.c.id, bars.c.fid],
                            viewonly=True
                            )})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError, 
                "could not determine any local/remote column pairs",
                sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_ambiguous_remoteside_m2o(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id==bars.c.fid,
                            foreign_keys=[foos.c.id],
                            remote_side=[foos.c.id, bars.c.fid],
                            viewonly=True
                            )})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError, 
                "could not determine any local/remote column pairs",
                sa.orm.compile_mappers)
        
    
    @testing.resolve_artifact_names
    def test_no_equated_self_ref(self):
        mapper(Foo, foos, properties={
            'foos':relationship(Foo,
                            primaryjoin=foos.c.id>foos.c.fid)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for primaryjoin condition",
            sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_equated_self_ref(self):
        mapper(Foo, foos, properties={
            'foos':relationship(Foo,
                            primaryjoin=foos.c.id>foos.c.fid,
                            foreign_keys=[foos.c.fid])})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not locate any equated, locally mapped column pairs "
            "for primaryjoin condition", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_equated_viewonly(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id>bars.c.fid,
                            viewonly=True)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for primaryjoin condition",
            sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_equated_self_ref_viewonly(self):
        mapper(Foo, foos, properties={
            'foos':relationship(Foo,
                            primaryjoin=foos.c.id>foos.c.fid,
                            viewonly=True)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Specify the 'foreign_keys' argument to indicate which columns "
            "on the relationship are foreign.", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_equated_self_ref_viewonly_fks(self):
        mapper(Foo, foos, properties={
            'foos':relationship(Foo,
                            primaryjoin=foos.c.id>foos.c.fid,
                            viewonly=True,
                            foreign_keys=[foos.c.fid])})

        sa.orm.compile_mappers()
        eq_(Foo.foos.property.local_remote_pairs, [(foos.c.id, foos.c.fid)])

    @testing.resolve_artifact_names
    def test_equated(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            primaryjoin=foos.c.id==bars.c.fid)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for primaryjoin condition",
            sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_equated_self_ref(self):
        mapper(Foo, foos, properties={
            'foos':relationship(Foo,
                            primaryjoin=foos.c.id==foos.c.fid)})

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for primaryjoin condition",
            sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_equated_self_ref_wrong_fks(self):
        mapper(Foo, foos, properties={
            'foos':relationship(Foo,
                            primaryjoin=foos.c.id==foos.c.fid,
                            foreign_keys=[bars.c.id])})

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for primaryjoin condition",
            sa.orm.compile_mappers)


class InvalidRelationshipEscalationTestM2M(_base.MappedTest):

    @classmethod
    def define_tables(cls, metadata):
        Table('foos', metadata,
              Column('id', Integer, primary_key=True))
        Table('foobars', metadata,
              Column('fid', Integer), Column('bid', Integer))
        Table('bars', metadata,
              Column('id', Integer, primary_key=True))

    @classmethod
    @testing.resolve_artifact_names
    def setup_classes(cls):
        class Foo(_base.Entity):
            pass
        class Bar(_base.Entity):
            pass

    @testing.resolve_artifact_names
    def test_no_join(self):
        mapper(Foo, foos, properties={
            'bars': relationship(Bar, secondary=foobars)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine join condition between parent/child tables "
            "on relationship", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_secondaryjoin(self):
        mapper(Foo, foos, properties={
            'bars': relationship(Bar,
                            secondary=foobars,
                            primaryjoin=foos.c.id > foobars.c.fid)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine join condition between parent/child tables "
            "on relationship",
            sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_bad_primaryjoin(self):
        mapper(Foo, foos, properties={
            'bars': relationship(Bar,
                             secondary=foobars,
                             primaryjoin=foos.c.id > foobars.c.fid,
                             secondaryjoin=foobars.c.bid<=bars.c.id)})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for primaryjoin condition",
            sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_bad_secondaryjoin(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            secondary=foobars,
                            primaryjoin=foos.c.id == foobars.c.fid,
                            secondaryjoin=foobars.c.bid <= bars.c.id,
                            foreign_keys=[foobars.c.fid])})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not determine relationship direction for secondaryjoin "
            "condition", sa.orm.compile_mappers)

    @testing.resolve_artifact_names
    def test_no_equated_secondaryjoin(self):
        mapper(Foo, foos, properties={
            'bars':relationship(Bar,
                            secondary=foobars,
                            primaryjoin=foos.c.id == foobars.c.fid,
                            secondaryjoin=foobars.c.bid <= bars.c.id,
                            foreign_keys=[foobars.c.fid, foobars.c.bid])})
        mapper(Bar, bars)

        assert_raises_message(
            sa.exc.ArgumentError,
            "Could not locate any equated, locally mapped column pairs for "
            "secondaryjoin condition", sa.orm.compile_mappers)


class RelationDeprecationTest(_base.MappedTest):
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        Table('users_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String(64)))

        Table('addresses_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('user_id', Integer, ForeignKey('users_table.id')),
              Column('email_address', String(128)),
              Column('purpose', String(16)),
              Column('bounces', Integer, default=0))

    @classmethod
    def setup_classes(cls):
        class User(_base.BasicEntity):
            pass

        class Address(_base.BasicEntity):
            pass

    @classmethod
    def fixtures(cls):
        return dict(
            users_table=(
            ('id', 'name'),
            (1, 'jack'),
            (2, 'ed'),
            (3, 'fred'),
            (4, 'chuck')),

            addresses_table=(
            ('id', 'user_id', 'email_address', 'purpose', 'bounces'),
            (1, 1, 'jack@jack.home', 'Personal', 0),
            (2, 1, 'jack@jack.bizz', 'Work', 1),
            (3, 2, 'ed@foo.bar', 'Personal', 0),
            (4, 3, 'fred@the.fred', 'Personal', 10)))

    @testing.resolve_artifact_names
    def test_relation(self):
        mapper(User, users_table, properties=dict(
            addresses=relation(Address, backref='user'),
            ))
        mapper(Address, addresses_table)

        session = create_session()

        ed = session.query(User).filter(User.addresses.any(
            Address.email_address == 'ed@foo.bar')).one()



www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.