# test_cascade.py -- SQLAlchemy 0.6.0, test/orm
# Source listing: Python Open Source > Database > SQLAlchemy >
#   SQLAlchemy 0.6.0 > test > orm > test_cascade.py
# (web-site navigation menu from the scraped page omitted; it is not part
# of the module)

from sqlalchemy.test.testing import assert_raises,assert_raises_message
from sqlalchemy import Integer,String,ForeignKey,Sequence,exc
# The tests below refer to the core exception module as ``sa_exc``.
from sqlalchemy import exc as sa_exc
from sqlalchemy.test.schema import Table,Column
from sqlalchemy.orm import mapper,relationship,create_session,sessionmaker,class_mapper,backref
from sqlalchemy.orm import attributes,exc
# The tests below refer to the ORM exception module as ``orm_exc``.
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.test import testing
from sqlalchemy.test.testing import eq_
from test.orm import _base,_fixtures


class O2MCascadeTest(_fixtures.FixtureTest):
    """One-to-many cascades with ``cascade="all, delete-orphan"`` configured
    on both User.addresses and User.orders."""

    run_inserts = None

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map Address, Order, User and Dingaling for the tests below."""
        mapper(Address, addresses)
        user_props = {
            'addresses': relationship(
                Address, cascade="all, delete-orphan", backref="user"),
            'orders': relationship(
                mapper(Order, orders),
                cascade="all, delete-orphan",
                order_by=orders.c.id),
        }
        mapper(User, users, properties=user_props)
        mapper(Dingaling, dingalings,
               properties=dict(address=relationship(Address)))

    @testing.resolve_artifact_names
    def test_list_assignment(self):
        """Assigning a new list to ``orders`` replaces the persisted rows; a
        pending Order with no parent fails the flush as an orphan."""
        session = create_session()
        jack = User(name='jack', orders=[
            Order(description='someorder'),
            Order(description='someotherorder'),
        ])
        session.add(jack)
        session.flush()
        session.expunge_all()

        jack = session.query(User).get(jack.id)
        eq_(jack,
            User(name='jack', orders=[
                Order(description='someorder'),
                Order(description='someotherorder'),
            ]))

        # wholesale replacement of the collection
        jack.orders = [Order(description="order 3"),
                       Order(description="order 4")]
        session.flush()
        session.expunge_all()

        jack = session.query(User).get(jack.id)
        eq_(jack,
            User(name='jack', orders=[
                Order(description="order 3"),
                Order(description="order 4"),
            ]))

        # only the replacement orders are left in the table
        eq_(session.query(Order).order_by(Order.id).all(),
            [Order(description="order 3"), Order(description="order 4")])

        parentless = Order(description="order 5")
        session.add(parentless)
        assert_raises_message(
            orm_exc.FlushError, "is an orphan", session.flush)

    @testing.resolve_artifact_names
    def test_save_update_sends_pending(self):
        """Items appended to and removed from the collection while detached
        are all cascaded into the session on save-update."""

        make_session = sessionmaker(expire_on_commit=False)
        session = make_session()
        removed = Order(description='o1')
        kept = Order(description='o2')
        appended = Order(description='o3')
        jack = User(name='jack', orders=[removed, kept])
        session.add(jack)
        session.commit()
        session.close()

        # mutate the collection after the session has been closed
        jack.orders.append(appended)
        jack.orders.remove(removed)

        session.add(jack)
        for order_obj in (removed, kept, appended):
            assert order_obj in session
        session.commit()

    @testing.resolve_artifact_names
    def test_delete(self):
        """Deleting the user leaves neither user nor order rows behind."""
        session = create_session()
        jack = User(name='jack', orders=[
            Order(description='someorder'),
            Order(description='someotherorder'),
        ])
        session.add(jack)
        session.flush()

        session.delete(jack)
        session.flush()
        assert users.count().scalar() == 0
        assert orders.count().scalar() == 0

    @testing.resolve_artifact_names
    def test_delete_unloaded_collections(self):
        """Unloaded collections are still included in a delete-cascade by default."""
        session = create_session()
        jack = User(name='jack', addresses=[
            Address(email_address="address1"),
            Address(email_address="address2"),
        ])
        session.add(jack)
        session.flush()
        session.expunge_all()
        assert addresses.count().scalar() == 2
        assert users.count().scalar() == 1

        jack = session.query(User).get(jack.id)

        # the collection has not been loaded on the fresh instance
        assert 'addresses' not in jack.__dict__
        session.delete(jack)
        session.flush()
        assert addresses.count().scalar() == 0
        assert users.count().scalar() == 0

    @testing.resolve_artifact_names
    def test_cascades_onlycollection(self):
        """The delete cascade only reaches instances still present in the
        collection, not ones that were removed beforehand."""

        session = create_session()
        jack = User(name='jack', orders=[
            Order(description='someorder'),
            Order(description='someotherorder'),
        ])
        session.add(jack)
        session.flush()

        detached_order = jack.orders[0]
        del jack.orders[0]
        session.delete(jack)
        assert jack in session.deleted
        assert detached_order not in session.deleted
        assert detached_order in session

        # hand the removed order over to a different parent
        newuser = User(name='newuser', orders=[detached_order])
        session.add(newuser)
        session.flush()
        session.expunge_all()
        assert users.count().scalar() == 1
        assert orders.count().scalar() == 1
        eq_(session.query(User).all(),
            [User(name='newuser',
                  orders=[Order(description='someorder')])])

    @testing.resolve_artifact_names
    def test_cascade_nosideeffects(self):
        """test that cascade leaves the state of unloaded scalars/collections unchanged."""

        session = create_session()
        jack = User(name='jack')
        session.add(jack)
        assert 'orders' not in jack.__dict__

        session.flush()

        assert 'orders' not in jack.__dict__

        address = Address(email_address='foo@bar.com')
        session.add(address)
        assert 'user' not in address.__dict__
        address.user = jack
        session.flush()

        dingaling = Dingaling(data='d1')
        dingaling.address_id = address.id
        session.add(dingaling)
        assert 'address' not in dingaling.__dict__
        session.flush()
        assert dingaling.address is address

    @testing.resolve_artifact_names
    def test_cascade_delete_plusorphans(self):
        """Deleting the user after removing one order removes every order
        row, including the one taken out of the collection."""
        session = create_session()
        jack = User(name='jack', orders=[
            Order(description='someorder'),
            Order(description='someotherorder'),
        ])
        session.add(jack)
        session.flush()
        assert users.count().scalar() == 1
        assert orders.count().scalar() == 2

        del jack.orders[0]
        session.delete(jack)
        session.flush()
        assert users.count().scalar() == 0
        assert orders.count().scalar() == 0

    @testing.resolve_artifact_names
    def test_collection_orphans(self):
        """Emptying the collection deletes the order rows but keeps the user."""
        session = create_session()
        jack = User(name='jack', orders=[
            Order(description='someorder'),
            Order(description='someotherorder'),
        ])
        session.add(jack)
        session.flush()

        assert users.count().scalar() == 1
        assert orders.count().scalar() == 2

        jack.orders[:] = []

        session.flush()

        assert users.count().scalar() == 1
        assert orders.count().scalar() == 0

class O2OCascadeTest(_fixtures.FixtureTest):
    """Scalar User.address relationship whose "user" backref is declared
    with ``single_parent=True``."""

    run_inserts = None

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map Address, then User with a ``uselist=False`` address property."""
        mapper(Address, addresses)
        address_rel = relationship(
            Address,
            backref=backref("user", single_parent=True),
            uselist=False)
        mapper(User, users, properties={'address': address_rel})

    @testing.resolve_artifact_names
    def test_single_parent_raise(self):
        """Constructing a second Address for the same user raises, while
        reassigning ``address`` clears the user on the previous Address."""
        first = Address(email_address='some address')
        owner = User(name='u1', address=first)

        assert_raises(sa_exc.InvalidRequestError,
                      Address, email_address='asd', user=owner)

        second = Address(email_address='asd')
        owner.address = second
        assert owner.address is not first
        assert first.user is None
        
        
        
class O2MBackrefTest(_fixtures.FixtureTest):
    """User.orders with delete-orphan cascade plus a "user" backref."""

    run_inserts = None

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map User with an ``orders`` relationship; Order is mapped inline."""
        orders_rel = relationship(
            mapper(Order, orders),
            cascade="all, delete-orphan",
            backref="user")
        mapper(User, users, properties={'orders': orders_rel})

    @testing.resolve_artifact_names
    def test_lazyload_bug(self):
        """An Order attached through the backref while the user is expunged
        is in the session once the user is added again."""
        session = create_session()

        jack = User(name="jack")
        session.add(jack)
        session.expunge(jack)

        pending_order = Order(description='someorder')
        pending_order.user = jack
        session.add(jack)
        assert jack in session
        assert pending_order in session


class NoSaveCascadeTest(_fixtures.FixtureTest):
    """Backrefs must not force a save-update cascade when the cascade is
    initiated from the forwards side of the relationship."""

    @testing.resolve_artifact_names
    def test_unidirectional_cascade_o2m(self):
        """One-to-many: an Order in the session does not pull in the User it
        was placed on when the "user" backref has ``cascade=None``."""
        mapper(Order, orders)
        user_backref = backref("user", cascade=None)
        mapper(User, users, properties={
            'orders': relationship(Order, backref=user_backref),
        })

        session = create_session()

        # case 1: the order is in the session before joining the collection
        order1 = Order()
        session.add(order1)
        user1 = User(orders=[order1])
        assert user1 not in session
        assert order1 in session

        session.expunge_all()

        # case 2: the order joins the collection first, then is added
        order1 = Order()
        user1 = User(orders=[order1])
        session.add(order1)
        assert user1 not in session
        assert order1 in session

    @testing.resolve_artifact_names
    def test_unidirectional_cascade_m2o(self):
        """Many-to-one: a User in the session does not pull in an Order that
        points at it when the "orders" backref has ``cascade=None``."""
        orders_backref = backref("orders", cascade=None)
        mapper(Order, orders, properties={
            'user': relationship(User, backref=orders_backref),
        })
        mapper(User, users)

        session = create_session()

        # case 1: the user is in the session before the order references it
        user1 = User()
        session.add(user1)
        order1 = Order()
        order1.user = user1
        assert order1 not in session
        assert user1 in session

        session.expunge_all()

        # case 2: the order references the user first, then the user is added
        user1 = User()
        order1 = Order()
        order1.user = user1
        session.add(user1)
        assert order1 not in session
        assert user1 in session

    @testing.resolve_artifact_names
    def test_unidirectional_cascade_m2m(self):
        """Many-to-many with ``cascade="none"``: a Keyword never enters the
        session, whichever side of the relationship is appended to."""
        mapper(Item, items, properties={
            'keywords': relationship(
                Keyword,
                secondary=item_keywords,
                cascade="none",
                backref="items"),
        })
        mapper(Keyword, keywords)

        session = create_session()

        # append through the forward side
        item1 = Item()
        keyword1 = Keyword()
        session.add(item1)
        item1.keywords.append(keyword1)
        assert item1 in session
        assert keyword1 not in session

        session.expunge_all()

        # append through the backref side
        item1 = Item()
        keyword1 = Keyword()
        session.add(item1)
        keyword1.items.append(item1)
        assert item1 in session
        assert keyword1 not in session
        
    
class O2MCascadeNoOrphanTest(_fixtures.FixtureTest):
    """User.orders with ``cascade="all"`` only (no delete-orphan)."""

    run_inserts = None

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map User with an ``orders`` relationship; Order is mapped inline."""
        orders_rel = relationship(mapper(Order, orders), cascade="all")
        mapper(User, users, properties={'orders': orders_rel})

    @testing.resolve_artifact_names
    def test_cascade_delete_noorphans(self):
        """An order removed from the collection before the user is deleted
        keeps its row; the order still in the collection is deleted."""
        session = create_session()
        jack = User(name='jack', orders=[
            Order(description='someorder'),
            Order(description='someotherorder'),
        ])
        session.add(jack)
        session.flush()
        assert users.count().scalar() == 1
        assert orders.count().scalar() == 2

        del jack.orders[0]
        session.delete(jack)
        session.flush()
        assert users.count().scalar() == 0
        assert orders.count().scalar() == 1


class M2OCascadeTest(_base.MappedTest):
    """Many-to-one cascades: User.pref uses delete-orphan with
    ``single_parent=True``, User.foo is a plain many-to-one."""

    @classmethod
    def define_tables(cls, metadata):
        """Create the extra, prefs, users and foo tables."""
        Table(
            "extra", metadata,
            Column("id", Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column("prefs_id", Integer, ForeignKey("prefs.id")),
        )

        Table(
            'prefs', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(40)),
        )

        Table(
            'users', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(40)),
            Column('pref_id', Integer, ForeignKey('prefs.id')),
            Column('foo_id', Integer, ForeignKey('foo.id')),
        )

        Table(
            'foo', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(40)),
        )

    @classmethod
    def setup_classes(cls):
        """Declare the plain fixture classes mapped in setup_mappers."""
        class User(_fixtures.Base):
            pass

        class Pref(_fixtures.Base):
            pass

        class Extra(_fixtures.Base):
            pass

        class Foo(_fixtures.Base):
            pass

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map Extra, Pref, User and finally Foo."""
        mapper(Extra, extra)
        mapper(Pref, prefs, properties={
            'extra': relationship(Extra, cascade="all, delete"),
        })
        mapper(User, users, properties={
            'pref': relationship(
                Pref,
                lazy='joined',
                cascade="all, delete-orphan",
                single_parent=True),
            # straight m2o
            'foo': relationship(Foo),
        })
        mapper(Foo, foo)

    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        """Persist three users, each with its own Pref holding one Extra."""
        seed = [('ed', "pref 1"), ('jack', "pref 2"), ("foo", "pref 3")]
        new_users = [
            User(name=user_name, pref=Pref(data=pref_data, extra=[Extra()]))
            for user_name, pref_data in seed
        ]
        session = create_session()
        session.add_all(new_users)
        session.flush()
        session.close()

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_orphan(self):
        """Setting jack.pref to None deletes that Pref and its Extra."""
        session = create_session()
        assert prefs.count().scalar() == 3
        assert extra.count().scalar() == 3
        jack = session.query(User).filter_by(name="jack").one()
        jack.pref = None
        session.flush()
        assert prefs.count().scalar() == 2
        assert extra.count().scalar() == 2

    @testing.resolve_artifact_names
    def test_cascade_on_deleted(self):
        """test a bug introduced by r6711"""

        session = sessionmaker(expire_on_commit=True)()

        jack = User(name='jack', foo=Foo(data='f1'))
        session.add(jack)
        session.commit()

        jack.foo = None

        # the error condition relies upon
        # these things being true
        assert User.foo.impl.active_history is False
        history = attributes.get_history(jack, 'foo')
        eq_(history, ([None], (), [attributes.PASSIVE_NO_RESULT]))

        session.add(jack)
        assert jack in session
        session.commit()

    @testing.resolve_artifact_names
    def test_save_update_sends_pending(self):
        """Both the newly assigned and the replaced scalar item are cascaded
        into the session on save-update."""

        session = sessionmaker(expire_on_commit=False)()
        old_pref = Pref(data='p1')
        new_pref = Pref(data='p2')

        jack = User(name='jack', pref=old_pref)
        session.add(jack)
        session.commit()
        session.close()

        # swap the scalar after the session has been closed
        jack.pref = new_pref

        session.add(jack)
        assert old_pref in session
        assert new_pref in session
        session.commit()

    @testing.fails_on('maxdb', 'FIXME: unknown')
    @testing.resolve_artifact_names
    def test_orphan_on_update(self):
        """A Pref orphaned while detached is deleted, together with its
        Extra, once everything is added back and flushed."""
        session = create_session()
        jack = session.query(User).filter_by(name="jack").one()
        old_pref = jack.pref
        old_extra = jack.pref.extra[0]
        session.expunge_all()

        jack.pref = None
        session.add(jack)
        session.add(old_pref)
        session.add(old_extra)
        assert old_pref in session
        assert old_extra in session
        session.flush()
        assert prefs.count().scalar() == 2
        assert extra.count().scalar() == 2

    @testing.resolve_artifact_names
    def test_pending_expunge(self):
        """A pending Pref replaced before flush leaves the session; only the
        replacement is found for the user afterwards."""
        session = create_session()
        someuser = User(name='someuser')
        session.add(someuser)
        session.flush()
        first_pref = Pref(data='somepref')
        someuser.pref = first_pref
        assert first_pref in session
        someuser.pref = Pref(data='someotherpref')
        assert first_pref not in session
        session.flush()
        eq_(session.query(Pref).with_parent(someuser).all(),
            [Pref(data="someotherpref")])

    @testing.resolve_artifact_names
    def test_double_assignment(self):
        """Double assignment will not accidentally reset the 'parent' flag."""

        session = create_session()
        jack = session.query(User).filter_by(name="jack").one()

        newpref = Pref(data="newpref")
        # assign the very same object twice in a row
        for _ in range(2):
            jack.pref = newpref
        session.flush()
        eq_(session.query(Pref).order_by(Pref.id).all(),
            [Pref(data="pref 1"), Pref(data="pref 3"), Pref(data="newpref")])

class M2OCascadeDeleteTest(_base.MappedTest):
    """Chain of many-to-one relationships T1 -> T2 -> T3, each configured
    with ``cascade="all"`` (no delete-orphan)."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 and t3; t1 references t2 and t2 references t3."""
        def pk():
            # a fresh autoincrementing primary key column for each table
            return Column('id', Integer, primary_key=True,
                          test_needs_autoincrement=True)

        Table('t1', metadata,
              pk(),
              Column('data', String(50)),
              Column('t2id', Integer, ForeignKey('t2.id')))
        Table('t2', metadata,
              pk(),
              Column('data', String(50)),
              Column('t3id', Integer, ForeignKey('t3.id')))
        Table('t3', metadata,
              pk(),
              Column('data', String(50)))

    @classmethod
    def setup_classes(cls):
        """Declare the plain fixture classes mapped in setup_mappers."""
        class T1(_fixtures.Base):
            pass

        class T2(_fixtures.Base):
            pass

        class T3(_fixtures.Base):
            pass

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map T1.t2 and T2.t3 with ``cascade="all"``; T3 is mapped plainly."""
        mapper(T1, t1, properties=dict(t2=relationship(T2, cascade="all")))
        mapper(T2, t2, properties=dict(t3=relationship(T3, cascade="all")))
        mapper(T3, t3)

    @testing.resolve_artifact_names
    def test_cascade_delete(self):
        """Deleting the T1 removes the whole T1/T2/T3 chain."""
        session = create_session()
        root = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
        session.add(root)
        session.flush()

        session.delete(root)
        session.flush()
        for entity in (T1, T2, T3):
            eq_(session.query(entity).all(), [])

    @testing.resolve_artifact_names
    def test_cascade_delete_postappend_onelevel(self):
        """Objects linked to the T1 after ``delete()`` was called are still
        deleted at flush time."""
        session = create_session()
        first = T1(data='t1', )
        second = T2(data='t2')
        third = T3(data='t3')
        session.add_all((first, second, third))
        session.flush()

        session.delete(first)
        # build the chain only after the delete has been requested
        first.t2 = second
        second.t3 = third
        session.flush()
        for entity in (T1, T2, T3):
            eq_(session.query(entity).all(), [])

    @testing.resolve_artifact_names
    def test_cascade_delete_postappend_twolevel(self):
        """A T3 linked two levels down after ``delete()`` was called is
        still deleted at flush time."""
        session = create_session()
        first = T1(data='t1', t2=T2(data='t2'))
        third = T3(data='t3')
        session.add_all((first, third))
        session.flush()

        session.delete(first)
        first.t2.t3 = third
        session.flush()
        for entity in (T1, T2, T3):
            eq_(session.query(entity).all(), [])

    @testing.resolve_artifact_names
    def test_preserves_orphans_onelevel(self):
        """A T2 unlinked before the delete survives, as does its T3."""
        session = create_session()
        root = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b')))
        session.add(root)
        session.flush()
        root.t2 = None

        session.delete(root)
        session.flush()
        eq_(session.query(T1).all(), [])
        eq_(session.query(T2).all(), [T2()])
        eq_(session.query(T3).all(), [T3()])

    @testing.future
    @testing.resolve_artifact_names
    def test_preserves_orphans_onelevel_postremove(self):
        """Same expectation as test_preserves_orphans_onelevel, but the T2
        is unlinked after ``delete()`` was called (marked as future)."""
        session = create_session()
        root = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b')))
        session.add(root)
        session.flush()

        session.delete(root)
        root.t2 = None
        session.flush()
        eq_(session.query(T1).all(), [])
        eq_(session.query(T2).all(), [T2()])
        eq_(session.query(T3).all(), [T3()])

    @testing.resolve_artifact_names
    def test_preserves_orphans_twolevel(self):
        """A T3 unlinked from its T2 before the delete survives."""
        session = create_session()
        root = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
        session.add(root)
        session.flush()

        root.t2.t3 = None
        session.delete(root)
        session.flush()
        eq_(session.query(T1).all(), [])
        eq_(session.query(T2).all(), [])
        eq_(session.query(T3).all(), [T3()])


class M2OCascadeDeleteOrphanTest(_base.MappedTest):
    """Chain of many-to-one relationships T1 -> T2 -> T3, each configured
    with ``cascade="all, delete-orphan"`` and ``single_parent=True``."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 and t3; t1 references t2 and t2 references t3."""
        def pk():
            # a fresh autoincrementing primary key column for each table
            return Column('id', Integer, primary_key=True,
                          test_needs_autoincrement=True)

        Table('t1', metadata,
              pk(),
              Column('data', String(50)),
              Column('t2id', Integer, ForeignKey('t2.id')))
        Table('t2', metadata,
              pk(),
              Column('data', String(50)),
              Column('t3id', Integer, ForeignKey('t3.id')))
        Table('t3', metadata,
              pk(),
              Column('data', String(50)))

    @classmethod
    def setup_classes(cls):
        """Declare the plain fixture classes mapped in setup_mappers."""
        class T1(_fixtures.Base):
            pass

        class T2(_fixtures.Base):
            pass

        class T3(_fixtures.Base):
            pass

    @classmethod
    @testing.resolve_artifact_names
    def setup_mappers(cls):
        """Map T1.t2 and T2.t3 as single-parent delete-orphan relationships;
        T2.t3 additionally gets a scalar ``t2`` backref."""
        mapper(T1, t1, properties={
            't2': relationship(
                T2, cascade="all, delete-orphan", single_parent=True),
        })
        mapper(T2, t2, properties={
            't3': relationship(
                T3,
                cascade="all, delete-orphan",
                single_parent=True,
                backref=backref('t2', uselist=False)),
        })
        mapper(T3, t3)

    @testing.resolve_artifact_names
    def test_cascade_delete(self):
        """Deleting the T1 removes the whole T1/T2/T3 chain."""
        session = create_session()
        root = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
        session.add(root)
        session.flush()

        session.delete(root)
        session.flush()
        for entity in (T1, T2, T3):
            eq_(session.query(entity).all(), [])

    @testing.resolve_artifact_names
    def test_deletes_orphans_onelevel(self):
        """A T2 unlinked before the delete is removed anyway, with its T3."""
        session = create_session()
        root = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b')))
        session.add(root)
        session.flush()
        root.t2 = None

        session.delete(root)
        session.flush()
        for entity in (T1, T2, T3):
            eq_(session.query(entity).all(), [])

    @testing.resolve_artifact_names
    def test_deletes_orphans_twolevel(self):
        """A T3 unlinked from its T2 before the delete is removed anyway."""
        session = create_session()
        root = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
        session.add(root)
        session.flush()

        root.t2.t3 = None
        session.delete(root)
        session.flush()
        for entity in (T1, T2, T3):
            eq_(session.query(entity).all(), [])

    @testing.resolve_artifact_names
    def test_finds_orphans_twolevel(self):
        """Without any delete, unlinking the T3 removes just the T3 row."""
        session = create_session()
        root = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
        session.add(root)
        session.flush()

        root.t2.t3 = None
        session.flush()
        eq_(session.query(T1).all(), [T1()])
        eq_(session.query(T2).all(), [T2()])
        eq_(session.query(T3).all(), [])

    @testing.resolve_artifact_names
    def test_single_parent_raise(self):
        """Giving an already-parented T2 to a second T1 raises."""

        sess = create_session()

        shared_child = T2(data='T2a')
        x = T1(data='T1a', t2=shared_child)
        assert_raises(sa_exc.InvalidRequestError,
                      T1, data='T1b', t2=shared_child)

    @testing.resolve_artifact_names
    def test_single_parent_backref(self):
        """Re-parenting a T3 via the forward side raises; doing it through
        the ``t2`` backref moves it off the previous parent."""

        sess = create_session()

        shared_child = T3(data='T3a')
        first_parent = T2(data='T2a', t3=shared_child)

        # cant attach the T3 to another T2
        assert_raises(sa_exc.InvalidRequestError,
                      T2, data='T2b', t3=shared_child)

        # set via backref tho is OK, unsets from previous parent
        # first
        second_parent = T2(data='T2b')
        shared_child.t2 = second_parent

        assert second_parent.t3 is shared_child
        assert first_parent.t3 is None

class M2MCascadeTest(_base.MappedTest):
    """Many-to-many A.bs through the ``atob`` association table, with
    delete-orphan cascade and ``single_parent=True``."""

    @classmethod
    def define_tables(cls, metadata):
        """Create tables a, b, atob (association) and c (child of b)."""
        Table(
            'a', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(30)),
            test_needs_fk=True,
        )
        Table(
            'b', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(30)),
            test_needs_fk=True,
        )
        Table(
            'atob', metadata,
            Column('aid', Integer, ForeignKey('a.id')),
            Column('bid', Integer, ForeignKey('b.id')),
            test_needs_fk=True,
        )
        Table(
            'c', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(30)),
            Column('bid', Integer, ForeignKey('b.id')),
            test_needs_fk=True,
        )

    @classmethod
    def setup_classes(cls):
        """Declare the plain fixture classes mapped by the tests."""
        class A(_fixtures.Base):
            pass

        class B(_fixtures.Base):
            pass

        class C(_fixtures.Base):
            pass

    @testing.resolve_artifact_names
    def test_delete_orphan(self):
        """Removing a B from A.bs deletes the association row and the B."""
        # if no backref here, delete-orphan failed until [ticket:427] was
        # fixed
        bs_rel = relationship(
            B, secondary=atob,
            cascade="all, delete-orphan", single_parent=True)
        mapper(A, a, properties={'bs': bs_rel})
        mapper(B, b)

        session = create_session()
        child = B(data='b1')
        parent = A(data='a1', bs=[child])
        session.add(parent)
        session.flush()

        parent.bs.remove(child)
        session.flush()
        assert atob.count().scalar() == 0
        assert b.count().scalar() == 0
        assert a.count().scalar() == 1

    @testing.resolve_artifact_names
    def test_delete_orphan_dynamic(self):
        """Same as test_delete_orphan, with a ``lazy="dynamic"`` collection."""
        # if no backref here, delete-orphan failed until [ticket:427] was
        # fixed
        bs_rel = relationship(
            B, secondary=atob,
            cascade="all, delete-orphan", single_parent=True,
            lazy="dynamic")
        mapper(A, a, properties={'bs': bs_rel})
        mapper(B, b)

        session = create_session()
        child = B(data='b1')
        parent = A(data='a1', bs=[child])
        session.add(parent)
        session.flush()

        parent.bs.remove(child)
        session.flush()
        assert atob.count().scalar() == 0
        assert b.count().scalar() == 0
        assert a.count().scalar() == 1

    @testing.resolve_artifact_names
    def test_delete_orphan_cascades(self):
        """Deleting the orphaned B also deletes the C rows in B.cs."""
        # if no backref here, delete-orphan failed until [ticket:427] was
        # fixed
        bs_rel = relationship(
            B, secondary=atob,
            cascade="all, delete-orphan", single_parent=True)
        mapper(A, a, properties={'bs': bs_rel})
        mapper(B, b, properties={
            'cs': relationship(C, cascade="all, delete-orphan"),
        })
        mapper(C, c)

        session = create_session()
        child = B(data='b1', cs=[C(data='c1')])
        parent = A(data='a1', bs=[child])
        session.add(parent)
        session.flush()

        parent.bs.remove(child)
        session.flush()
        assert atob.count().scalar() == 0
        assert b.count().scalar() == 0
        assert a.count().scalar() == 1
        assert c.count().scalar() == 0

    @testing.resolve_artifact_names
    def test_cascade_delete(self):
        """Deleting the A removes the A, its B and the association row."""
        bs_rel = relationship(
            B, secondary=atob,
            cascade="all, delete-orphan", single_parent=True)
        mapper(A, a, properties={'bs': bs_rel})
        mapper(B, b)

        session = create_session()
        parent = A(data='a1', bs=[B(data='b1')])
        session.add(parent)
        session.flush()

        session.delete(parent)
        session.flush()
        assert atob.count().scalar() == 0
        assert b.count().scalar() == 0
        assert a.count().scalar() == 0

    @testing.resolve_artifact_names
    def test_single_parent_raise(self):
        """Placing an already-parented B into a second A's collection raises."""
        bs_rel = relationship(
            B, secondary=atob,
            cascade="all, delete-orphan", single_parent=True)
        mapper(A, a, properties={'bs': bs_rel})
        mapper(B, b)

        sess = create_session()
        child = B(data='b1')
        a1 = A(data='a1', bs=[child])

        assert_raises(sa_exc.InvalidRequestError,
                      A, data='a2', bs=[child])

    @testing.resolve_artifact_names
    def test_single_parent_backref(self):
        """test that setting m2m via a uselist=False backref bypasses the single_parent raise"""

        bs_rel = relationship(
            B,
            secondary=atob,
            cascade="all, delete-orphan", single_parent=True,
            backref=backref('a', uselist=False))
        mapper(A, a, properties={'bs': bs_rel})
        mapper(B, b)

        sess = create_session()
        child = B(data='b1')
        first_parent = A(data='a1', bs=[child])

        assert_raises(sa_exc.InvalidRequestError,
                      A, data='a2', bs=[child])

        # re-parenting through the scalar backref moves the B across
        second_parent = A(data='a2')
        child.a = second_parent
        assert child not in first_parent.bs
        assert child in second_parent.bs

class UnsavedOrphansTest(_base.MappedTest):
    """Pending entities that are orphans"""

    @classmethod
    def define_tables(cls, metadata):
        """Create the users table and an addresses table referencing it."""
        Table('users', metadata,
            Column('user_id', Integer,primary_key=True, test_needs_autoincrement=True),
            Column('name', String(40)))

        Table('addresses', metadata,
            Column('address_id', Integer,primary_key=True, test_needs_autoincrement=True),
            Column('user_id', Integer, ForeignKey('users.user_id')),
            Column('email_address', String(40)))

    @classmethod
    def setup_classes(cls):
        """Declare the plain fixture classes mapped by each test."""
        class User(_fixtures.Base):
            pass
        class Address(_fixtures.Base):
            pass

    @testing.resolve_artifact_names
    def test_pending_standalone_orphan(self):
        """An entity that never had a parent on a delete-orphan cascade can't be saved."""

        mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(Address, cascade="all,delete-orphan", backref="user")
        ))
        s = create_session()
        a = Address()
        s.add(a)
        try:
            s.flush()
        except orm_exc.FlushError:
            # A FlushError is tolerated here; the assertion below is what
            # the test checks.  The exception is deliberately not bound to
            # a name: it was unused, and the old ``except X, e`` spelling
            # is a syntax error on Python 3.
            pass
        assert a.address_id is None, "Error: address should not be persistent"

    @testing.resolve_artifact_names
    def test_pending_collection_expunge(self):
        """Removing a pending item from a collection expunges it from the session."""

        mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(Address, cascade="all,delete-orphan", backref="user")
        ))
        s = create_session()

        u = User()
        s.add(u)
        s.flush()
        a = Address()

        # appending cascades the pending address into the session ...
        u.addresses.append(a)
        assert a in s

        # ... and removing it again takes it back out
        u.addresses.remove(a)
        assert a not in s

        s.delete(u)
        s.flush()

        assert a.address_id is None, "Error: address should not be persistent"

    @testing.resolve_artifact_names
    def test_nonorphans_ok(self):
        """With plain "all,delete" cascade (no delete-orphan), an address
        removed from a pending user's collection stays in the session and
        is persisted by the flush."""
        mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(Address, cascade="all,delete", backref="user")
        ))
        s = create_session()
        u = User(name='u1', addresses=[Address(email_address='ad1')])
        s.add(u)
        a1 = u.addresses[0]
        u.addresses.remove(a1)
        assert a1 in s
        s.flush()
        s.expunge_all()
        eq_(s.query(Address).all(), [Address(email_address='ad1')])


class UnsavedOrphansTest2(_base.MappedTest):
    """Pending-orphan expunge over a three-level hierarchy.

    Same scenario as the UnsavedOrphans tests, but with the chain
    orders -> items -> attributes, each link being a delete-orphan cascade.
    """

    @classmethod
    def define_tables(cls, meta):
        """Create the ``orders``, ``items`` and ``attributes`` tables."""
        Table('orders', meta,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(50)))

        Table('items', meta,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('order_id', Integer, ForeignKey('orders.id'),
                   nullable=False),
            Column('name', String(50)))

        Table('attributes', meta,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('item_id', Integer, ForeignKey('items.id'),
                   nullable=False),
            Column('name', String(50)))

    @testing.resolve_artifact_names
    def test_pending_expunge(self):
        """Removing a pending item expunges it and its own pending children.

        NOTE(review): ``attributes`` inside this test is used as the table
        defined in ``define_tables`` (presumably injected by
        ``resolve_artifact_names``), not the ``sqlalchemy.orm.attributes``
        module imported at the top of the file -- confirm.
        """
        class Order(_fixtures.Base):
            pass

        class Item(_fixtures.Base):
            pass

        class Attribute(_fixtures.Base):
            pass

        mapper(Attribute, attributes)
        mapper(Item, items, properties={
            'attributes': relationship(
                Attribute, cascade="all,delete-orphan", backref="item"),
        })
        mapper(Order, orders, properties={
            'items': relationship(
                Item, cascade="all,delete-orphan", backref="order"),
        })

        sess = create_session()
        order = Order(name="order1")
        sess.add(order)

        attr = Attribute(name="attr1")
        item = Item(name="item1", attributes=[attr])

        # attach the item (with its attribute) and detach it again before
        # anything has been flushed
        order.items.append(item)
        order.items.remove(item)

        assert item not in sess
        assert attr not in sess

        # only the order itself may reach the database
        sess.flush()
        eq_(orders.count().scalar(), 1)
        eq_(items.count().scalar(), 0)
        eq_(attributes.count().scalar(), 0)

class UnsavedOrphansTest3(_base.MappedTest):
    """A pending object with two delete-orphan parents is only expunged
    once it has been removed from both of them."""

    @classmethod
    def define_tables(cls, meta):
        """Create ``sales_reps``, ``accounts`` and ``customers``; a customer
        row references both a sales rep and an account."""
        Table('sales_reps', meta,
            Column('sales_rep_id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(50)))
        Table('accounts', meta,
            Column('account_id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('balance', Integer))
        Table('customers', meta,
            Column('customer_id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(50)),
            Column('sales_rep_id', Integer,
                   ForeignKey('sales_reps.sales_rep_id')),
            Column('account_id', Integer,
                   ForeignKey('accounts.account_id')))

    @testing.resolve_artifact_names
    def test_double_parent_expunge_o2m(self):
        """Delete-orphan expunge with two parents, collection (one-to-many) form.

        The customer is appended to the ``customers`` collection of both an
        account and a sales rep, then removed from one after the other.
        """

        class Customer(_fixtures.Base):
            pass

        class Account(_fixtures.Base):
            pass

        class SalesRep(_fixtures.Base):
            pass

        mapper(Customer, customers)
        mapper(Account, accounts, properties={
            'customers': relationship(
                Customer,
                cascade="all,delete-orphan",
                backref="account"),
        })
        mapper(SalesRep, sales_reps, properties={
            'customers': relationship(
                Customer,
                cascade="all,delete-orphan",
                backref="sales_rep"),
        })
        sess = create_session()

        # both parents are persistent before the customer shows up
        account = Account(balance=0)
        rep = SalesRep(name="John")
        sess.add_all((account, rep))
        sess.flush()

        customer = Customer(name="Jane")

        account.customers.append(customer)
        rep.customers.append(customer)
        assert customer in sess

        account.customers.remove(customer)
        assert customer in sess, "Should not expunge customer yet, still has one parent"

        rep.customers.remove(customer)
        assert customer not in sess, "Should expunge customer when both parents are gone"

    @testing.resolve_artifact_names
    def test_double_parent_expunge_o2o(self):
        """Delete-orphan expunge with two parents, scalar (``uselist=False``) form.

        The customer is assigned to the ``customer`` attribute of both an
        account and a sales rep, then each attribute is set back to None.
        """

        class Customer(_fixtures.Base):
            pass

        class Account(_fixtures.Base):
            pass

        class SalesRep(_fixtures.Base):
            pass

        mapper(Customer, customers)
        mapper(Account, accounts, properties={
            'customer': relationship(
                Customer,
                cascade="all,delete-orphan",
                backref="account", uselist=False),
        })
        mapper(SalesRep, sales_reps, properties={
            'customer': relationship(
                Customer,
                cascade="all,delete-orphan",
                backref="sales_rep", uselist=False),
        })
        sess = create_session()

        # both parents are persistent before the customer shows up
        account = Account(balance=0)
        rep = SalesRep(name="John")
        sess.add_all((account, rep))
        sess.flush()

        customer = Customer(name="Jane")

        account.customer = customer
        rep.customer = customer
        assert customer in sess

        account.customer = None
        assert customer in sess, "Should not expunge customer yet, still has one parent"

        rep.customer = None
        assert customer not in sess, "Should expunge customer when both parents are gone"

        
class DoubleParentOrphanTest(_base.MappedTest):
    """test orphan detection for an entity with two parent relationships"""

    @classmethod
    def define_tables(cls, metadata):
        """Create ``addresses`` plus two tables that each require an address.

        The primary key columns of ``homes`` and ``businesses`` are given
        ``key="id"``, which is why the tests read them as ``.id``.
        """
        Table('addresses', metadata,
            Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('street', String(30)),
        )

        Table('homes', metadata,
            Column('home_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True),
            Column('description', String(30)),
            Column('address_id', Integer, ForeignKey('addresses.address_id'),
                   nullable=False),
        )

        Table('businesses', metadata,
            Column('business_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True),
            Column('description', String(30), key="description"),
            Column('address_id', Integer, ForeignKey('addresses.address_id'),
                   nullable=False),
        )

    @testing.resolve_artifact_names
    def test_non_orphan(self):
        """test that an entity can have two parent delete-orphan cascades, and persists normally."""

        class Address(_fixtures.Base):
            pass
        class Home(_fixtures.Base):
            pass
        class Business(_fixtures.Base):
            pass

        mapper(Address, addresses)
        mapper(Home, homes, properties={
            'address': relationship(
                Address, cascade="all,delete-orphan", single_parent=True)})
        mapper(Business, businesses, properties={
            'address': relationship(
                Address, cascade="all,delete-orphan", single_parent=True)})

        session = create_session()
        h1 = Home(description='home1', address=Address(street='address1'))
        b1 = Business(description='business1', address=Address(street='address2'))
        session.add_all((h1, b1))
        session.flush()
        session.expunge_all()

        # reload from the database and compare with freshly built graphs
        eq_(session.query(Home).get(h1.id),
            Home(description='home1', address=Address(street='address1')))
        eq_(session.query(Business).get(b1.id),
            Business(description='business1', address=Address(street='address2')))

    @testing.resolve_artifact_names
    def test_orphan(self):
        """test that an entity can have two parent delete-orphan cascades, and is detected as an orphan
        when saved without a parent."""

        class Address(_fixtures.Base):
            pass
        class Home(_fixtures.Base):
            pass
        class Business(_fixtures.Base):
            pass

        mapper(Address, addresses)
        mapper(Home, homes, properties={
            'address': relationship(
                Address, cascade="all,delete-orphan", single_parent=True)})
        mapper(Business, businesses, properties={
            'address': relationship(
                Address, cascade="all,delete-orphan", single_parent=True)})

        session = create_session()
        a1 = Address()
        session.add(a1)

        # Flushing the parentless address must raise FlushError; assert_raises
        # fails the test if no such error is raised.
        assert_raises(orm_exc.FlushError, session.flush)

class CollectionAssignmentOrphanTest(_base.MappedTest):
    """Members of a loaded delete-orphan collection are not seen as orphans
    and survive a flush."""

    @classmethod
    def define_tables(cls, metadata):
        """Create ``table_a`` and ``table_b``; b rows reference an a row."""
        Table('table_a', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('name', String(30)))
        Table('table_b', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('name', String(30)),
              Column('a_id', Integer, ForeignKey('table_a.id')))

    @testing.resolve_artifact_names
    def test_basic(self):
        """Round-trip an A with three Bs, touch the Bs, flush, round-trip again."""
        class A(_fixtures.Base):
            pass

        class B(_fixtures.Base):
            pass

        mapper(A, table_a, properties={
            'bs': relationship(B, cascade="all, delete-orphan"),
        })
        mapper(B, table_b)

        def build_graph():
            # a new, unattached A with its three Bs
            return A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')])

        a1 = build_graph()

        sess = create_session()
        sess.add(a1)
        sess.flush()

        sess.expunge_all()

        eq_(sess.query(A).get(a1.id), build_graph())

        a1 = sess.query(A).get(a1.id)
        first_state = attributes.instance_state(a1.bs[0])
        assert not class_mapper(B)._is_orphan(first_state)

        # ``foo`` is not a column of table_b, so these assignments only set a
        # plain Python attribute; the stored names are expected to be
        # unchanged below.
        a1.bs[0].foo = 'b2modified'
        a1.bs[1].foo = 'b3modified'
        sess.flush()

        sess.expunge_all()
        eq_(sess.query(A).get(a1.id), build_graph())

class O2MConflictTest(_base.MappedTest):
    """A child handed from one parent to another ends up on the new parent.

    Each test configures the Parent/Child relationship differently (scalar
    or collection, with or without backref, various cascades) and then runs
    both shared scenarios: moving the child while the old parent is being
    deleted, and moving it while the old parent stays.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``parent`` and ``child``; ``child.parent_id`` is NOT NULL."""
        Table("parent", metadata,
            Column("id", Integer, primary_key=True,
                   test_needs_autoincrement=True))
        Table("child", metadata,
            Column("id", Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('parent_id', Integer, ForeignKey('parent.id'),
                   nullable=False))

    @classmethod
    def setup_classes(cls):
        """Declare the ``Parent`` and ``Child`` entity classes."""
        class Parent(_base.ComparableEntity):
            pass

        class Child(_base.ComparableEntity):
            pass

    @testing.resolve_artifact_names
    def _attach_child(self, parent_obj, child_obj):
        """Put ``child_obj`` on ``parent_obj.child``: append when the
        relationship is a collection, plain assignment when it is scalar."""
        if Parent.child.property.uselist:
            parent_obj.child.append(child_obj)
        else:
            parent_obj.child = child_obj

    @testing.resolve_artifact_names
    def _do_delete_old_test(self):
        """Delete the old parent and give its child to a new parent in the
        same flush; the child row must then reference the new parent."""
        sess = create_session()

        old_parent, new_parent, kid = Parent(), Parent(), Child()
        self._attach_child(old_parent, kid)
        sess.add_all([old_parent, kid])
        sess.flush()

        sess.delete(old_parent)

        self._attach_child(new_parent, kid)
        sess.add(new_parent)

        sess.flush()
        eq_(
            sess.query(Child).filter(Child.parent_id==new_parent.id).all(),
            [kid])

    @testing.resolve_artifact_names
    def _do_move_test(self):
        """Give the child to a new parent without deleting the old one; the
        child row must then reference the new parent."""
        sess = create_session()

        old_parent, new_parent, kid = Parent(), Parent(), Child()
        self._attach_child(old_parent, kid)
        sess.add_all([old_parent, kid])
        sess.flush()

        self._attach_child(new_parent, kid)
        sess.add(new_parent)

        sess.flush()
        eq_(
            sess.query(Child).filter(Child.parent_id==new_parent.id).all(),
            [kid])

    @testing.resolve_artifact_names
    def test_o2o_delete_old(self):
        """Scalar relationship, default cascade, no backref."""
        mapper(Parent, parent, properties={
            'child': relationship(Child, uselist=False),
        })
        mapper(Child, child)
        self._do_delete_old_test()
        self._do_move_test()

    @testing.resolve_artifact_names
    def test_o2m_delete_old(self):
        """Collection relationship, default cascade, no backref."""
        mapper(Parent, parent, properties={
            'child': relationship(Child, uselist=True),
        })
        mapper(Child, child)
        self._do_delete_old_test()
        self._do_move_test()

    @testing.resolve_artifact_names
    def test_o2o_backref_delete_old(self):
        """Scalar relationship with a ``parent`` backref."""
        mapper(Parent, parent, properties={
            'child': relationship(Child, uselist=False, backref='parent'),
        })
        mapper(Child, child)
        self._do_delete_old_test()
        self._do_move_test()

    @testing.resolve_artifact_names
    def test_o2o_delcascade_delete_old(self):
        """Scalar relationship with delete cascade."""
        mapper(Parent, parent, properties={
            'child': relationship(
                Child, uselist=False, cascade="all, delete"),
        })
        mapper(Child, child)
        self._do_delete_old_test()
        self._do_move_test()

    @testing.resolve_artifact_names
    def test_o2o_delorphan_delete_old(self):
        """Scalar relationship with delete and delete-orphan cascade."""
        mapper(Parent, parent, properties={
            'child': relationship(
                Child, uselist=False,
                cascade="all, delete, delete-orphan"),
        })
        mapper(Child, child)
        self._do_delete_old_test()
        self._do_move_test()

    @testing.resolve_artifact_names
    def test_o2o_delorphan_backref_delete_old(self):
        """Scalar relationship with delete-orphan cascade and a backref."""
        mapper(Parent, parent, properties={
            'child': relationship(
                Child, uselist=False,
                cascade="all, delete, delete-orphan",
                backref='parent'),
        })
        mapper(Child, child)
        self._do_delete_old_test()
        self._do_move_test()

    @testing.resolve_artifact_names
    def test_o2o_backref_delorphan_delete_old(self):
        """Relationship declared on the Child side (delete-orphan,
        single_parent); ``Parent.child`` is a scalar backref."""
        mapper(Parent, parent)
        mapper(Child, child, properties={
            'parent': relationship(
                Parent, uselist=False, single_parent=True,
                backref=backref('child', uselist=False),
                cascade="all,delete,delete-orphan"),
        })
        self._do_delete_old_test()
        self._do_move_test()

    @testing.resolve_artifact_names
    def test_o2m_backref_delorphan_delete_old(self):
        """Relationship declared on the Child side (delete-orphan,
        single_parent); ``Parent.child`` is a collection backref."""
        mapper(Parent, parent)
        mapper(Child, child, properties={
            'parent': relationship(
                Parent, uselist=False, single_parent=True,
                backref=backref('child', uselist=True),
                cascade="all,delete,delete-orphan"),
        })
        self._do_delete_old_test()
        self._do_move_test()
        

class PartialFlushTest(_base.MappedTest):
    """Cascade behavior when flush() is given an explicit list of objects.

    NOTE(review): the tests are wrapped in ``testing.uses_deprecated()``,
    presumably because passing objects to ``flush()`` emits a deprecation
    warning -- confirm.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``base`` with a plain child table (``noninh_child``) and
        two tables whose primary key is a foreign key to ``base``
        (``parent`` and ``inh_child``)."""
        Table("base", metadata,
            Column("id", Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column("descr", String(50)))

        Table("noninh_child", metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('base_id', Integer, ForeignKey('base.id')))

        Table("parent", metadata,
            Column("id", Integer, ForeignKey("base.id"), primary_key=True))
        Table("inh_child", metadata,
            Column("id", Integer, ForeignKey("base.id"), primary_key=True),
            Column("parent_id", Integer, ForeignKey("parent.id")))

    @testing.uses_deprecated()
    @testing.resolve_artifact_names
    def test_o2m_m2o(self):
        """Partial flush of a parent, of one child, and of both children."""
        class Base(_base.ComparableEntity):
            pass

        class Child(_base.ComparableEntity):
            pass

        mapper(Base, base, properties={
            'children': relationship(Child, backref='parent'),
        })
        mapper(Child, noninh_child)

        def fixture():
            # new session holding one pending Base with two pending children
            session = create_session()
            first, second = Child(), Child()
            root = Base(descr='b1', children=[first, second])
            session.add(root)
            return session, root, first, second

        # -- flush only the parent
        sess, b1, c1, c2 = fixture()
        assert c1 in sess.new
        assert c2 in sess.new
        sess.flush([b1])

        # the children are flushed along with the parent through the o2m
        # collection.  (The original author was not sure this is the ideal
        # behavior, but it is how it works for now.)
        assert c1 in sess and c1 not in sess.new
        assert c2 in sess and c2 not in sess.new
        assert b1 in sess and b1 not in sess.new

        # -- flush only one child
        sess, b1, c1, c2 = fixture()
        sess.flush([c1])
        # the m2o direction does not pull in the parent, nor the sibling
        assert c1 in sess and c1 not in sess.new
        assert c2 in sess and c2 in sess.new
        assert b1 in sess and b1 in sess.new

        # -- flush both children
        sess, b1, c1, c2 = fixture()
        sess.flush([c1, c2])
        # both children are flushed, the parent is still pending
        assert c1 in sess and c1 not in sess.new
        assert c2 in sess and c2 not in sess.new
        assert b1 in sess and b1 in sess.new

    @testing.uses_deprecated()
    @testing.resolve_artifact_names
    def test_circular_sort(self):
        """test ticket 1306

        Parent and Child both inherit from Base and are related to each
        other; flushing a single child must leave the parent and the other
        children pending.
        """

        class Base(_base.ComparableEntity):
            pass

        class Parent(Base):
            pass

        class Child(Base):
            pass

        mapper(Base, base)

        mapper(Child, inh_child,
            inherits=Base,
            properties={
                'parent': relationship(
                    Parent,
                    backref='children',
                    primaryjoin=inh_child.c.parent_id == parent.c.id),
            })

        mapper(Parent, parent, inherits=Base)

        sess = create_session()
        p1 = Parent()

        c1, c2, c3 = Child(), Child(), Child()
        p1.children = [c1, c2, c3]
        sess.add(p1)

        # only the explicitly listed child leaves the pending set
        sess.flush([c1])
        assert p1 in sess.new
        assert c1 not in sess.new
        assert c2 in sess.new
        
# www.java2java.com | Contact Us
# Copyright 2009 - 12 Demo Source and Support. All rights reserved.
# All other trademarks are property of their respective owners.