test_postgresql.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » dialect » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » dialect » test_postgresql.py
# coding: utf-8
from sqlalchemy.test.testing import eq_,assert_raises,assert_raises_message
from sqlalchemy.test import engines
import datetime
import decimal
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy import exc,schema,types
from sqlalchemy.dialects.postgresql import base
from sqlalchemy.engine.strategies import MockEngineStrategy
from sqlalchemy.test import *
from sqlalchemy.test.util import round_decimal
from sqlalchemy.sql import table,column
from sqlalchemy.test.testing import eq_
from test.engine._base import TablesTest
import logging

class SequenceTest(TestBase, AssertsCompiledSQL):
    def test_basic(self):
        seq = Sequence("my_seq_no_schema")
        dialect = postgresql.PGDialect()
        assert dialect.identifier_preparer.format_sequence(seq) == "my_seq_no_schema"

        seq = Sequence("my_seq", schema="some_schema")
        assert dialect.identifier_preparer.format_sequence(seq) == "some_schema.my_seq"

        seq = Sequence("My_Seq", schema="Some_Schema")
        assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'

class CompileTest(TestBase, AssertsCompiledSQL):
    __dialect__ = postgresql.dialect()

    def test_update_returning(self):
        dialect = postgresql.dialect()
        table1 = table('mytable',
            column('myid', Integer),
            column('name', String(128)),
            column('description', String(128)),
        )

        u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)

        u = update(table1, values=dict(name='foo')).returning(table1)
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\
            "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)

        u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name) AS length_1", dialect=dialect)

        
    def test_insert_returning(self):
        dialect = postgresql.dialect()
        table1 = table('mytable',
            column('myid', Integer),
            column('name', String(128)),
            column('description', String(128)),
        )

        i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
        self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)

        i = insert(table1, values=dict(name='foo')).returning(table1)
        self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\
            "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)

        i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
        self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name) AS length_1", dialect=dialect)
    
    @testing.uses_deprecated(r".*argument is deprecated.  Please use statement.returning.*")
    def test_old_returning_names(self):
        dialect = postgresql.dialect()
        table1 = table('mytable',
            column('myid', Integer),
            column('name', String(128)),
            column('description', String(128)),
        )

        u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)

        u = update(table1, values=dict(name='foo'), postgresql_returning=[table1.c.myid, table1.c.name])
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)

        i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
        self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
        
    def test_create_partial_index(self):
        m = MetaData()
        tbl = Table('testtbl', m, Column('data',Integer))
        idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10))
        idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10))
        
        # test quoting and all that
        idx2 = Index('test_idx2', tbl.c.data, postgresql_where=and_(tbl.c.data > 'a', tbl.c.data < "b's"))

        self.assert_compile(schema.CreateIndex(idx), 
            "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10", dialect=postgresql.dialect())

        self.assert_compile(schema.CreateIndex(idx2), 
            "CREATE INDEX test_idx2 ON testtbl (data) WHERE data > 'a' AND data < 'b''s'", dialect=postgresql.dialect())
            
    @testing.uses_deprecated(r".*'postgres_where' argument has been renamed.*")
    def test_old_create_partial_index(self):
        tbl = Table('testtbl', MetaData(), Column('data',Integer))
        idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))

        self.assert_compile(schema.CreateIndex(idx), 
            "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10", dialect=postgresql.dialect())
    
    def test_extract(self):
        t = table('t', column('col1', DateTime), column('col2', Date), column('col3', Time),
                    column('col4', postgresql.INTERVAL)
        )

        for field in 'year', 'month', 'day', 'epoch', 'hour':
            for expr, compiled_expr in [

                ( t.c.col1, "t.col1 :: timestamp" ),
                ( t.c.col2, "t.col2 :: date" ),
                ( t.c.col3, "t.col3 :: time" ),
                (func.current_timestamp() - datetime.timedelta(days=5),
                    "(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: timestamp"
                ),
                (func.current_timestamp() + func.current_timestamp(), 
                    "CURRENT_TIMESTAMP + CURRENT_TIMESTAMP" # invalid, no cast.
                ),
                (text("foo.date + foo.time"), 
                    "foo.date + foo.time" # plain text.  no cast.
                ),
                (func.current_timestamp() + datetime.timedelta(days=5), 
                    "(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: timestamp"
                ),
                (t.c.col2 + t.c.col3,
                    "(t.col2 + t.col3) :: timestamp"
                ),
                # addition is commutative
                (t.c.col2 + datetime.timedelta(days=5),
                    "(t.col2 + %(col2_1)s) :: timestamp"
                ),
                (datetime.timedelta(days=5) + t.c.col2,
                    "(%(col2_1)s + t.col2) :: timestamp"
                ),
                (t.c.col1 + t.c.col4,
                    "(t.col1 + t.col4) :: timestamp"
                ),
                # subtraction is not
                (t.c.col1 - datetime.timedelta(seconds=30),
                    "(t.col1 - %(col1_1)s) :: timestamp"
                ),
                (datetime.timedelta(seconds=30) - t.c.col1,
                    "%(col1_1)s - t.col1" # invalid - no cast.
                ),
                (func.coalesce(t.c.col1, func.current_timestamp()),
                    "coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp"
                ),
                (t.c.col3 + datetime.timedelta(seconds=30),
                    "(t.col3 + %(col3_1)s) :: time"
                ),
                (func.current_timestamp() - func.coalesce(t.c.col1, func.current_timestamp()),
                    "(CURRENT_TIMESTAMP - coalesce(t.col1, CURRENT_TIMESTAMP)) :: interval",
                ),
                (3 * func.foobar(type_=Interval),
                    "(%(foobar_1)s * foobar()) :: interval"
                ),
                (literal(datetime.timedelta(seconds=10)) - literal(datetime.timedelta(seconds=10)),
                    "(%(param_1)s - %(param_2)s) :: interval"
                ),
                (t.c.col3 + "some string", # dont crack up on entirely unsupported types
                    "t.col3 + %(col3_1)s"
                )
            ]:
                self.assert_compile(
                    select([extract(field, expr)]).select_from(t),
                    "SELECT EXTRACT(%s FROM %s) AS anon_1 FROM t" % (
                        field, 
                        compiled_expr
                    )
                )

class FloatCoercionTest(TablesTest, AssertsExecutionResults):
    __only_on__ = 'postgresql'
    __dialect__ = postgresql.dialect()

    @classmethod
    def define_tables(cls, metadata):
        data_table = Table('data_table', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', Integer)
        )

    @classmethod
    @testing.resolve_artifact_names
    def insert_data(cls):
        data_table.insert().execute(
            {'data':3},
            {'data':5},
            {'data':7},
            {'data':2},
            {'data':15},
            {'data':12},
            {'data':6},
            {'data':478},
            {'data':52},
            {'data':9},
        )
    
    @testing.resolve_artifact_names
    def test_float_coercion(self):
        for type_, result in [
            (Numeric, decimal.Decimal('140.381230939')),
            (Float, 140.381230939),
            (Float(asdecimal=True), decimal.Decimal('140.381230939')),
            (Numeric(asdecimal=False), 140.381230939),
        ]:
            ret = testing.db.execute(
                select([
                    func.stddev_pop(data_table.c.data, type_=type_)
                ])
            ).scalar()

            eq_(round_decimal(ret, 9), result)

            ret = testing.db.execute(
                select([
                    cast(func.stddev_pop(data_table.c.data), type_)
                ])
            ).scalar()
            eq_(round_decimal(ret, 9), result)
    
    @testing.provide_metadata
    def test_arrays(self):
        t1 = Table('t', metadata, 
            Column('x', postgresql.ARRAY(Float)),
            Column('y', postgresql.ARRAY(postgresql.REAL)),
            Column('z', postgresql.ARRAY(postgresql.DOUBLE_PRECISION)),
            Column('q', postgresql.ARRAY(Numeric))
        )
        metadata.create_all()
        t1.insert().execute(x=[5], y=[5], z=[6], q=[6.4])
        row = t1.select().execute().first()
        eq_(
            row, 
            ([5], [5], [6], [decimal.Decimal("6.4")])
        )
        
class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
    __only_on__ = 'postgresql'
    __dialect__ = postgresql.dialect()
    
    def test_compile(self):
        e1 = Enum('x', 'y', 'z', name="somename")
        e2 = Enum('x', 'y', 'z', name="somename", schema='someschema')
        
        self.assert_compile(
            postgresql.CreateEnumType(e1), 
            "CREATE TYPE somename AS ENUM ('x','y','z')"
        )

        self.assert_compile(
            postgresql.CreateEnumType(e2), 
            "CREATE TYPE someschema.somename AS ENUM ('x','y','z')"
        )

        self.assert_compile(
            postgresql.DropEnumType(e1), 
            "DROP TYPE somename"
        )

        self.assert_compile(
            postgresql.DropEnumType(e2), 
            "DROP TYPE someschema.somename"
        )
        
        t1 = Table('sometable', MetaData(), Column('somecolumn', e1))
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE sometable ("
            "somecolumn somename"
            ")"
        )
        t1 = Table('sometable', MetaData(), 
                    Column('somecolumn', Enum('x', 'y', 'z', native_enum=False))
                )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE sometable ("
            "somecolumn VARCHAR(1), "
            "CHECK (somecolumn IN ('x', 'y', 'z'))"
            ")"
        )

    
    @testing.fails_on('postgresql+zxjdbc', 
                        'zxjdbc fails on ENUM: column "XXX" is of type XXX '
                        'but expression is of type character varying')
    @testing.fails_on('postgresql+pg8000', 
                        'zxjdbc fails on ENUM: column "XXX" is of type XXX '
                        'but expression is of type text')
    def test_create_table(self):
        metadata = MetaData(testing.db)
        t1 = Table('table', metadata,
            Column('id', Integer, primary_key=True),
            Column('value', Enum('one', 'two', 'three', name='onetwothreetype'))
        )
        t1.create()
        t1.create(checkfirst=True) # check the create
        try:
            t1.insert().execute(value='two')
            t1.insert().execute(value='three')
            t1.insert().execute(value='three')
            eq_(t1.select().order_by(t1.c.id).execute().fetchall(), 
                [(1, 'two'), (2, 'three'), (3, 'three')]
            )
        finally:
            metadata.drop_all()
            metadata.drop_all()
    
    def test_name_required(self):
        metadata = MetaData(testing.db)
        etype = Enum('four', 'five', 'six', metadata=metadata)
        assert_raises(exc.ArgumentError, etype.create)
        assert_raises(exc.ArgumentError, etype.compile, dialect=postgresql.dialect())
    
    @testing.fails_on('postgresql+zxjdbc', 
                        'zxjdbc fails on ENUM: column "XXX" is of type XXX '
                        'but expression is of type character varying')
    @testing.fails_on('postgresql+pg8000', 
                        'zxjdbc fails on ENUM: column "XXX" is of type XXX '
                        'but expression is of type text')
    def test_unicode_labels(self):
        metadata = MetaData(testing.db)
        t1 = Table('table', metadata,
            Column('id', Integer, primary_key=True),
            Column('value', Enum(u'rveill', u'drle', u'Sil', name='onetwothreetype'))
        )
        metadata.create_all()
        try:
            t1.insert().execute(value=u'drle')
            t1.insert().execute(value=u'rveill')
            t1.insert().execute(value=u'Sil')
            eq_(t1.select().order_by(t1.c.id).execute().fetchall(), 
                [(1, u'drle'), (2, u'rveill'), (3, u'Sil')]
            )
            
            m2 = MetaData(testing.db)
            t2 = Table('table', m2, autoload=True)
            assert t2.c.value.type.enums == (u'rveill', u'drle', u'Sil')
            
        finally:
            metadata.drop_all()
        
    def test_standalone_enum(self):
        metadata = MetaData(testing.db)
        etype = Enum('four', 'five', 'six', name='fourfivesixtype', metadata=metadata)
        etype.create()
        try:
            assert testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
        finally:
            etype.drop()
            assert not testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
    
        metadata.create_all()
        try:
            assert testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
        finally:
            metadata.drop_all()
            assert not testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
    
    def test_reflection(self):
        metadata = MetaData(testing.db)
        etype = Enum('four', 'five', 'six', name='fourfivesixtype', metadata=metadata)
        t1 = Table('table', metadata,
            Column('id', Integer, primary_key=True),
            Column('value', Enum('one', 'two', 'three', name='onetwothreetype')),
            Column('value2', etype)
        )
        metadata.create_all()
        try:
            m2 = MetaData(testing.db)
            t2 = Table('table', m2, autoload=True)
            assert t2.c.value.type.enums == ('one', 'two', 'three')
            assert t2.c.value.type.name == 'onetwothreetype'
            assert t2.c.value2.type.enums == ('four', 'five', 'six')
            assert t2.c.value2.type.name == 'fourfivesixtype'
        finally:
            metadata.drop_all()

    def test_schema_reflection(self):
        metadata = MetaData(testing.db)
        etype = Enum('four', 'five', 'six', 
                        name='fourfivesixtype', 
                        schema='test_schema', 
                        metadata=metadata)
        t1 = Table('table', metadata,
            Column('id', Integer, primary_key=True),
            Column('value', Enum('one', 'two', 'three', 
                                name='onetwothreetype', schema='test_schema')),
            Column('value2', etype)
        )
        metadata.create_all()
        try:
            m2 = MetaData(testing.db)
            t2 = Table('table', m2, autoload=True)
            assert t2.c.value.type.enums == ('one', 'two', 'three')
            assert t2.c.value.type.name == 'onetwothreetype'
            assert t2.c.value2.type.enums == ('four', 'five', 'six')
            assert t2.c.value2.type.name == 'fourfivesixtype'
            assert t2.c.value2.type.schema == 'test_schema'
        finally:
            metadata.drop_all()
        
class InsertTest(TestBase, AssertsExecutionResults):
    __only_on__ = 'postgresql'

    @classmethod
    def setup_class(cls):
        global metadata
        cls.engine= testing.db
        metadata = MetaData(testing.db)

    def teardown(self):
        metadata.drop_all()
        metadata.tables.clear()
        if self.engine is not testing.db:
            self.engine.dispose()

    def test_compiled_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30)))

        metadata.create_all()

        ins = table.insert(inline=True, values={'data':bindparam('x')}).compile()
        ins.execute({'x':"five"}, {'x':"seven"})
        assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]

    def test_foreignkey_missing_insert(self):
        t1 = Table('t1', metadata,
            Column('id', Integer, primary_key=True)
        )
        t2 = Table('t2', metadata,
            Column('id', Integer, ForeignKey('t1.id'), primary_key=True)
        )
        metadata.create_all()
        
        # want to ensure that 
        # "null value in column "id" violates not-null constraint" is raised (IntegrityError on psycoopg2,
        # but ProgrammingError on pg8000),
        # and not "ProgrammingError: (ProgrammingError) relationship "t2_id_seq" does not exist".
        # the latter corresponds to autoincrement behavior, which is not the case
        # here due to the foreign key.
        for eng in [
            engines.testing_engine(options={'implicit_returning':False}),
            engines.testing_engine(options={'implicit_returning':True}),
        ]:
            assert_raises_message(exc.DBAPIError, "violates not-null constraint", eng.execute, t2.insert())
        
        
    def test_sequence_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, Sequence('my_seq'), primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_with_sequence(table, "my_seq")

    def test_sequence_returning_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, Sequence('my_seq'), primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_with_sequence_returning(table, "my_seq")

    def test_opt_sequence_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_autoincrement(table)

    def test_opt_sequence_returning_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_autoincrement_returning(table)

    def test_autoincrement_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_autoincrement(table)

    def test_autoincrement_returning_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_autoincrement_returning(table)

    def test_noautoincrement_insert(self):
        table = Table('testtable', metadata,
            Column('id', Integer, primary_key=True, autoincrement=False),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_noautoincrement(table)

    def _assert_data_autoincrement(self, table):
        self.engine = engines.testing_engine(options={'implicit_returning':False})
        metadata.bind = self.engine

        def go():
            # execute with explicit id
            r = table.insert().execute({'id':30, 'data':'d1'})
            assert r.inserted_primary_key == [30]

            # execute with prefetch id
            r = table.insert().execute({'data':'d2'})
            assert r.inserted_primary_key == [1]

            # executemany with explicit ids
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})

            # executemany, uses SERIAL
            table.insert().execute({'data':'d5'}, {'data':'d6'})

            # single execute, explicit id, inline
            table.insert(inline=True).execute({'id':33, 'data':'d7'})

            # single execute, inline, uses SERIAL
            table.insert(inline=True).execute({'data':'d8'})

        # note that the test framework doesnt capture the "preexecute" of a seqeuence
        # or default.  we just see it in the bind params.

        self.assert_sql(self.engine, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':1, 'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d8'}]
            ),
        ])

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (1, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (2, 'd5'),
            (3, 'd6'),
            (33, 'd7'),
            (4, 'd8'),
        ]
        table.delete().execute()

        # test the same series of events using a reflected
        # version of the table
        m2 = MetaData(self.engine)
        table = Table(table.name, m2, autoload=True)

        def go():
            table.insert().execute({'id':30, 'data':'d1'})
            r = table.insert().execute({'data':'d2'})
            assert r.inserted_primary_key == [5]
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
            table.insert().execute({'data':'d5'}, {'data':'d6'})
            table.insert(inline=True).execute({'id':33, 'data':'d7'})
            table.insert(inline=True).execute({'data':'d8'})

        self.assert_sql(self.engine, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':5, 'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d8'}]
            ),
        ])

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (5, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (6, 'd5'),
            (7, 'd6'),
            (33, 'd7'),
            (8, 'd8'),
        ]
        table.delete().execute()

    def _assert_data_autoincrement_returning(self, table):
        self.engine = engines.testing_engine(options={'implicit_returning':True})
        metadata.bind = self.engine

        def go():
            # execute with explicit id
            r = table.insert().execute({'id':30, 'data':'d1'})
            assert r.inserted_primary_key == [30]

            # execute with prefetch id
            r = table.insert().execute({'data':'d2'})
            assert r.inserted_primary_key == [1]

            # executemany with explicit ids
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})

            # executemany, uses SERIAL
            table.insert().execute({'data':'d5'}, {'data':'d6'})

            # single execute, explicit id, inline
            table.insert(inline=True).execute({'id':33, 'data':'d7'})

            # single execute, inline, uses SERIAL
            table.insert(inline=True).execute({'data':'d8'})
        
        self.assert_sql(self.engine, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id",
                {'data': 'd2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d8'}]
            ),
        ])

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (1, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (2, 'd5'),
            (3, 'd6'),
            (33, 'd7'),
            (4, 'd8'),
        ]
        table.delete().execute()

        # test the same series of events using a reflected
        # version of the table
        m2 = MetaData(self.engine)
        table = Table(table.name, m2, autoload=True)

        def go():
            table.insert().execute({'id':30, 'data':'d1'})
            r = table.insert().execute({'data':'d2'})
            assert r.inserted_primary_key == [5]
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
            table.insert().execute({'data':'d5'}, {'data':'d6'})
            table.insert(inline=True).execute({'id':33, 'data':'d7'})
            table.insert(inline=True).execute({'data':'d8'})

        self.assert_sql(self.engine, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id",
                {'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d8'}]
            ),
        ])

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (5, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (6, 'd5'),
            (7, 'd6'),
            (33, 'd7'),
            (8, 'd8'),
        ]
        table.delete().execute()

    def _assert_data_with_sequence(self, table, seqname):
        self.engine = engines.testing_engine(options={'implicit_returning':False})
        metadata.bind = self.engine

        def go():
            table.insert().execute({'id':30, 'data':'d1'})
            table.insert().execute({'data':'d2'})
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
            table.insert().execute({'data':'d5'}, {'data':'d6'})
            table.insert(inline=True).execute({'id':33, 'data':'d7'})
            table.insert(inline=True).execute({'data':'d8'})

        self.assert_sql(self.engine, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':1, 'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
                [{'data':'d8'}]
            ),
        ])

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (1, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (2, 'd5'),
            (3, 'd6'),
            (33, 'd7'),
            (4, 'd8'),
        ]

        # cant test reflection here since the Sequence must be
        # explicitly specified

    def _assert_data_with_sequence_returning(self, table, seqname):
        self.engine = engines.testing_engine(options={'implicit_returning':True})
        metadata.bind = self.engine

        def go():
            table.insert().execute({'id':30, 'data':'d1'})
            table.insert().execute({'data':'d2'})
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
            table.insert().execute({'data':'d5'}, {'data':'d6'})
            table.insert(inline=True).execute({'id':33, 'data':'d7'})
            table.insert(inline=True).execute({'data':'d8'})

        self.assert_sql(self.engine, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (nextval('my_seq'), :data) RETURNING testtable.id",
                {'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
                [{'data':'d8'}]
            ),
        ])

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (1, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (2, 'd5'),
            (3, 'd6'),
            (33, 'd7'),
            (4, 'd8'),
        ]

        # cant test reflection here since the Sequence must be
        # explicitly specified

    def _assert_data_noautoincrement(self, table):
        self.engine = engines.testing_engine(options={'implicit_returning':False})
        metadata.bind = self.engine

        table.insert().execute({'id':30, 'data':'d1'})
        
        if self.engine.driver == 'pg8000':
            exception_cls = exc.ProgrammingError
        elif self.engine.driver == 'pypostgresql':
            exception_cls = Exception
        else:
            exception_cls = exc.IntegrityError
        
        assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
        assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})

        assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})

        assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})

        table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
        table.insert(inline=True).execute({'id':33, 'data':'d4'})

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (31, 'd2'),
            (32, 'd3'),
            (33, 'd4'),
        ]
        table.delete().execute()

        # test the same series of events using a reflected
        # version of the table
        m2 = MetaData(self.engine)
        table = Table(table.name, m2, autoload=True)
        table.insert().execute({'id':30, 'data':'d1'})

        assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
        assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})

        table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
        table.insert(inline=True).execute({'id':33, 'data':'d4'})

        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (31, 'd2'),
            (32, 'd3'),
            (33, 'd4'),
        ]

class DomainReflectionTest(TestBase, AssertsExecutionResults):
    "Test PostgreSQL domains"

    __only_on__ = 'postgresql'

    @classmethod
    def setup_class(cls):
        con = testing.db.connect()
        for ddl in ('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42',
                    'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0'):
            try:
                con.execute(ddl)
            except exc.SQLError, e:
                if not "already exists" in str(e):
                    raise e
        con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
        con.execute('CREATE TABLE test_schema.testtable(question integer, answer test_schema.testdomain, anything integer)')
        con.execute('CREATE TABLE crosschema (question integer, answer test_schema.testdomain)')

    @classmethod
    def teardown_class(cls):
        con = testing.db.connect()
        con.execute('DROP TABLE testtable')
        con.execute('DROP TABLE test_schema.testtable')
        con.execute('DROP TABLE crosschema')
        con.execute('DROP DOMAIN testdomain')
        con.execute('DROP DOMAIN test_schema.testdomain')

    def test_table_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True)
        eq_(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
        assert isinstance(table.c.answer.type, Integer)

    def test_domain_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True)
        eq_(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value")
        assert not table.columns.answer.nullable, "Expected reflected column to not be nullable."

    def test_table_is_reflected_test_schema(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True, schema='test_schema')
        eq_(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
        assert isinstance(table.c.anything.type, Integer)

    def test_schema_domain_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True, schema='test_schema')
        eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
        assert table.columns.answer.nullable, "Expected reflected column to be nullable."

    def test_crosschema_domain_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('crosschema', metadata, autoload=True)
        eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
        assert table.columns.answer.nullable, "Expected reflected column to be nullable."

    def test_unknown_types(self):
        from sqlalchemy.databases import postgresql

        ischema_names = postgresql.PGDialect.ischema_names
        postgresql.PGDialect.ischema_names = {}
        try:
            m2 = MetaData(testing.db)
            assert_raises(exc.SAWarning, Table, "testtable", m2, autoload=True)

            @testing.emits_warning('Did not recognize type')
            def warns():
                m3 = MetaData(testing.db)
                t3 = Table("testtable", m3, autoload=True)
                assert t3.c.answer.type.__class__ == sa.types.NullType

        finally:
            postgresql.PGDialect.ischema_names = ischema_names

class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
    __only_on__ = 'postgresql'

    def test_date_reflection(self):
        m1 = MetaData(testing.db)
        t1 = Table('pgdate', m1,
            Column('date1', DateTime(timezone=True)),
            Column('date2', DateTime(timezone=False))
            )
        m1.create_all()
        try:
            m2 = MetaData(testing.db)
            t2 = Table('pgdate', m2, autoload=True)
            assert t2.c.date1.type.timezone is True
            assert t2.c.date2.type.timezone is False
        finally:
            m1.drop_all()
    
    @testing.fails_on('+zxjdbc', 'The JDBC driver handles the version parsing')
    def test_version_parsing(self):
        class MockConn(object):
            def __init__(self, res):
                self.res = res
                
            def execute(self, str):
                return self
                
            def scalar(self):
                return self.res
                
        for string, version in [
            ("PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)", (8, 3, 8)),
            ("PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, compiled by GCC gcc (GCC) 4.4.2, 64-bit", (8, 5)),
        ]:
            
            eq_(testing.db.dialect._get_server_version_info(MockConn(string)), version)
    
    @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
    def test_notice_logging(self):
        log = logging.getLogger('sqlalchemy.dialects.postgresql')
        buf = logging.handlers.BufferingHandler(100)
        lev = log.level
        log.addHandler(buf)
        log.setLevel(logging.INFO)
        try:
            conn = testing.db.connect()
            trans = conn.begin()
            try:
                conn.execute("create table foo (id serial primary key)")
            finally:
                trans.rollback()
        finally:
            log.removeHandler(buf)
            log.setLevel(lev)

        msgs = " ".join(b.msg for b in buf.buffer)
        assert "will create implicit sequence" in msgs
        assert "will create implicit index" in msgs
         
        
    def test_pg_weirdchar_reflection(self):
        meta1 = MetaData(testing.db)
        subject = Table("subject", meta1,
                        Column("id$", Integer, primary_key=True),
                        )

        referer = Table("referer", meta1,
                        Column("id", Integer, primary_key=True),
                        Column("ref", Integer, ForeignKey('subject.id$')),
                        )
        meta1.create_all()
        try:
            meta2 = MetaData(testing.db)
            subject = Table("subject", meta2, autoload=True)
            referer = Table("referer", meta2, autoload=True)
            print str(subject.join(referer).onclause)
            self.assert_((subject.c['id$']==referer.c.ref).compare(subject.join(referer).onclause))
        finally:
            meta1.drop_all()

    @testing.fails_on('+zxjdbc', "Can't infer the SQL type to use "
                                "for an instance of "
                                "org.python.core.PyObjectDerived.")
    @testing.fails_on('+pg8000', "Can't determine correct type.")
    def test_extract(self):
        fivedaysago = datetime.datetime.now() - datetime.timedelta(days=5)
        for field, exp in (
                    ('year', fivedaysago.year),
                    ('month', fivedaysago.month),
                    ('day', fivedaysago.day),
            ):
            r = testing.db.execute(
                select([extract(field, func.now() + datetime.timedelta(days =-5))])
            ).scalar()
            eq_(r, exp)


    def test_checksfor_sequence(self):
        meta1 = MetaData(testing.db)
        t = Table('mytable', meta1,
            Column('col1', Integer, Sequence('fooseq')))
        try:
            testing.db.execute("CREATE SEQUENCE fooseq")
            t.create(checkfirst=True)
        finally:
            t.drop(checkfirst=True)

    def test_renamed_sequence_reflection(self):
        m1 = MetaData(testing.db)
        t = Table('t', m1, 
            Column('id', Integer, primary_key=True)
        )
        m1.create_all()
        try:
            m2 = MetaData(testing.db)
            t2 = Table('t', m2, autoload=True, implicit_returning=False)
            eq_(t2.c.id.server_default.arg.text, "nextval('t_id_seq'::regclass)")
            
            r = t2.insert().execute()
            eq_(r.inserted_primary_key, [1])
            
            testing.db.connect().\
                            execution_options(autocommit=True).\
                            execute("alter table t_id_seq rename to foobar_id_seq")
                            
            m3 = MetaData(testing.db)
            t3 = Table('t', m3, autoload=True, implicit_returning=False)
            eq_(t3.c.id.server_default.arg.text, "nextval('foobar_id_seq'::regclass)")

            r = t3.insert().execute()
            eq_(r.inserted_primary_key, [2])
            
        finally:
            m1.drop_all()
        
        
    def test_distinct_on(self):
        t = Table('mytable', MetaData(testing.db),
                  Column('id', Integer, primary_key=True),
                  Column('a', String(8)))
        eq_(
            str(t.select(distinct=t.c.a)),
            'SELECT DISTINCT ON (mytable.a) mytable.id, mytable.a \n'
            'FROM mytable')
        eq_(
            str(t.select(distinct=['id','a'])),
            'SELECT DISTINCT ON (id, a) mytable.id, mytable.a \n'
            'FROM mytable')
        eq_(
            str(t.select(distinct=[t.c.id, t.c.a])),
            'SELECT DISTINCT ON (mytable.id, mytable.a) mytable.id, mytable.a \n'
            'FROM mytable')

    def test_schema_reflection(self):
        """note: this test requires that the 'test_schema' schema be separate and accessible by the test user"""

        meta1 = MetaData(testing.db)
        users = Table('users', meta1,
            Column('user_id', Integer, primary_key = True),
            Column('user_name', String(30), nullable = False),
            schema="test_schema"
            )

        addresses = Table('email_addresses', meta1,
            Column('address_id', Integer, primary_key = True),
            Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
            Column('email_address', String(20)),
            schema="test_schema"
        )
        meta1.create_all()
        try:
            meta2 = MetaData(testing.db)
            addresses = Table('email_addresses', meta2, autoload=True, schema="test_schema")
            users = Table('users', meta2, mustexist=True, schema="test_schema")

            print users
            print addresses
            j = join(users, addresses)
            print str(j.onclause)
            self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
        finally:
            meta1.drop_all()

    def test_schema_reflection_2(self):
        meta1 = MetaData(testing.db)
        subject = Table("subject", meta1,
                        Column("id", Integer, primary_key=True),
                        )

        referer = Table("referer", meta1,
                        Column("id", Integer, primary_key=True),
                        Column("ref", Integer, ForeignKey('subject.id')),
                        schema="test_schema")
        meta1.create_all()
        try:
            meta2 = MetaData(testing.db)
            subject = Table("subject", meta2, autoload=True)
            referer = Table("referer", meta2, schema="test_schema", autoload=True)
            print str(subject.join(referer).onclause)
            self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
        finally:
            meta1.drop_all()

    def test_schema_reflection_3(self):
        meta1 = MetaData(testing.db)
        subject = Table("subject", meta1,
                        Column("id", Integer, primary_key=True),
                        schema='test_schema_2'
                        )

        referer = Table("referer", meta1,
                        Column("id", Integer, primary_key=True),
                        Column("ref", Integer, ForeignKey('test_schema_2.subject.id')),
                        schema="test_schema")

        meta1.create_all()
        try:
            meta2 = MetaData(testing.db)
            subject = Table("subject", meta2, autoload=True, schema="test_schema_2")
            referer = Table("referer", meta2, schema="test_schema", autoload=True)
            print str(subject.join(referer).onclause)
            self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
        finally:
            meta1.drop_all()

    def test_schema_roundtrips(self):
        meta = MetaData(testing.db)
        users = Table('users', meta,
            Column('id', Integer, primary_key=True),
            Column('name', String(50)), schema='test_schema')
        users.create()
        try:
            users.insert().execute(id=1, name='name1')
            users.insert().execute(id=2, name='name2')
            users.insert().execute(id=3, name='name3')
            users.insert().execute(id=4, name='name4')

            eq_(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
            eq_(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])

            users.delete().where(users.c.id==3).execute()
            eq_(users.select().where(users.c.name=='name3').execute().fetchall(), [])

            users.update().where(users.c.name=='name4').execute(name='newname')
            eq_(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')])

        finally:
            users.drop()

    def test_preexecute_passivedefault(self):
        """test that when we get a primary key column back
        from reflecting a table which has a default value on it, we pre-execute
        that DefaultClause upon insert."""

        try:
            meta = MetaData(testing.db)
            testing.db.execute("""
             CREATE TABLE speedy_users
             (
                 speedy_user_id   SERIAL     PRIMARY KEY,

                 user_name        VARCHAR    NOT NULL,
                 user_password    VARCHAR    NOT NULL
             );
            """)

            t = Table("speedy_users", meta, autoload=True)
            r = t.insert().execute(user_name='user', user_password='lala')
            assert r.inserted_primary_key == [1]
            l = t.select().execute().fetchall()
            assert l == [(1, 'user', 'lala')]
        finally:
            testing.db.execute("drop table speedy_users")

    @testing.emits_warning()
    def test_index_reflection(self):
        """ Reflecting partial & expression-based indexes should warn """
        import warnings
        def capture_warnings(*args, **kw):
            capture_warnings._orig_showwarning(*args, **kw)
            capture_warnings.warnings.append(args)
        capture_warnings._orig_showwarning = warnings.warn
        capture_warnings.warnings = []

        m1 = MetaData(testing.db)
        t1 = Table('party', m1,
            Column('id', String(10), nullable=False),
            Column('name', String(20), index=True), 
            Column('aname', String(20))
            )
        m1.create_all()
        
        testing.db.execute("""
          create index idx1 on party ((id || name))
        """) 
        testing.db.execute("""
          create unique index idx2 on party (id) where name = 'test'
        """)
        
        testing.db.execute("""
            create index idx3 on party using btree
                (lower(name::text), lower(aname::text))
        """)
        
        try:
            m2 = MetaData(testing.db)

            warnings.warn = capture_warnings
            t2 = Table('party', m2, autoload=True)
      
            wrn = capture_warnings.warnings
            assert str(wrn[0][0]) == (
              "Skipped unsupported reflection of expression-based index idx1")
            assert str(wrn[1][0]) == (
              "Predicate of partial index idx2 ignored during reflection")
            assert len(t2.indexes) == 2
            # Make sure indexes are in the order we expect them in
            tmp = [(idx.name, idx) for idx in t2.indexes]
            tmp.sort()
            
            r1, r2 = [idx[1] for idx in tmp]

            assert r1.name == 'idx2'
            assert r1.unique == True
            assert r2.unique == False
            assert [t2.c.id] == r1.columns
            assert [t2.c.name] == r2.columns
        finally:
            warnings.warn = capture_warnings._orig_showwarning
            m1.drop_all()

    @testing.fails_on('postgresql+pypostgresql', 'pypostgresql bombs on multiple calls')
    def test_set_isolation_level(self):
        """Test setting the isolation level with create_engine"""
        eng = create_engine(testing.db.url)
        eq_(
            eng.execute("show transaction isolation level").scalar(),
            'read committed')
        eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE")
        eq_(
            eng.execute("show transaction isolation level").scalar(),
            'serializable')
        eng = create_engine(testing.db.url, isolation_level="FOO")

        if testing.db.driver == 'zxjdbc':
            exception_cls = eng.dialect.dbapi.Error
        else:
            exception_cls = eng.dialect.dbapi.ProgrammingError
        assert_raises(exception_cls, eng.execute, "show transaction isolation level")
    
    @testing.fails_on('+zxjdbc', 
                        "psycopg2/pg8000 specific assertion")
    @testing.fails_on('pypostgresql', 
                        "psycopg2/pg8000 specific assertion")
    def test_numeric_raise(self):
        stmt = text("select cast('hi' as char) as hi", typemap={'hi':Numeric})
        assert_raises(
            exc.InvalidRequestError,
            testing.db.execute, stmt
        )
        
class TimezoneTest(TestBase):
    """Test timezone-aware datetimes.

    psycopg will return a datetime with a tzinfo attached to it, if postgresql
    returns it.  python then will not let you compare a datetime with a tzinfo
    to a datetime that doesnt have one.  this test illustrates two ways to
    have datetime types with and without timezone info.
    """

    __only_on__ = 'postgresql'

    @classmethod
    def setup_class(cls):
        global tztable, notztable, metadata
        metadata = MetaData(testing.db)

        # current_timestamp() in postgresql is assumed to return TIMESTAMP WITH TIMEZONE
        tztable = Table('tztable', metadata,
            Column("id", Integer, primary_key=True),
            Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
            Column("name", String(20)),
        )
        notztable = Table('notztable', metadata,
            Column("id", Integer, primary_key=True),
            Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
            Column("name", String(20)),
        )
        metadata.create_all()

    @classmethod
    def teardown_class(cls):
        metadata.drop_all()

    def test_with_timezone(self):
        # get a date with a tzinfo
        somedate = testing.db.connect().scalar(func.current_timestamp().select())
        assert somedate.tzinfo
        
        tztable.insert().execute(id=1, name='row1', date=somedate)
        
        row = select([tztable.c.date], tztable.c.id==1).execute().first()
        eq_(row[0], somedate)
        eq_(somedate.tzinfo.utcoffset(somedate), row[0].tzinfo.utcoffset(row[0]))

        result = tztable.update(tztable.c.id==1).\
                        returning(tztable.c.date).execute(name='newname')
        row = result.first()
        assert row[0] >= somedate

    def test_without_timezone(self):
        # get a date without a tzinfo
        somedate = datetime.datetime(2005, 10, 20, 11, 52, 0)
        assert not somedate.tzinfo
        
        notztable.insert().execute(id=1, name='row1', date=somedate)

        row = select([notztable.c.date], notztable.c.id==1).execute().first()
        eq_(row[0], somedate)
        eq_(row[0].tzinfo, None)
        
        result = notztable.update(notztable.c.id==1).\
                        returning(notztable.c.date).execute(name='newname')
        row = result.first()
        assert row[0] >= somedate

class TimePrecisionTest(TestBase, AssertsCompiledSQL):
    __dialect__ = postgresql.dialect()
    
    def test_compile(self):
        for (type_, expected) in [
            (postgresql.TIME(), "TIME WITHOUT TIME ZONE"),
            (postgresql.TIME(precision=5), "TIME(5) WITHOUT TIME ZONE"),
            (postgresql.TIME(timezone=True, precision=5), "TIME(5) WITH TIME ZONE"),
            (postgresql.TIMESTAMP(), "TIMESTAMP WITHOUT TIME ZONE"),
            (postgresql.TIMESTAMP(precision=5), "TIMESTAMP(5) WITHOUT TIME ZONE"),
            (postgresql.TIMESTAMP(timezone=True, precision=5), "TIMESTAMP(5) WITH TIME ZONE"),
        ]:
            self.assert_compile(type_, expected)
    
    @testing.only_on('postgresql', 'DB specific feature')
    def test_reflection(self):
        m1 = MetaData(testing.db)
        t1 = Table('t1', m1, 
            Column('c1', postgresql.TIME()),
            Column('c2', postgresql.TIME(precision=5)),
            Column('c3', postgresql.TIME(timezone=True, precision=5)), 
            Column('c4', postgresql.TIMESTAMP()), 
            Column('c5', postgresql.TIMESTAMP(precision=5)), 
            Column('c6', postgresql.TIMESTAMP(timezone=True, precision=5)), 
        
        )
        t1.create()
        try:
            m2 = MetaData(testing.db)
            t2 = Table('t1', m2, autoload=True)
            eq_(t2.c.c1.type.precision, None)
            eq_(t2.c.c2.type.precision, 5)
            eq_(t2.c.c3.type.precision, 5)
            eq_(t2.c.c4.type.precision, None)
            eq_(t2.c.c5.type.precision, 5)
            eq_(t2.c.c6.type.precision, 5)
            eq_(t2.c.c1.type.timezone, False)
            eq_(t2.c.c2.type.timezone, False)
            eq_(t2.c.c3.type.timezone, True)
            eq_(t2.c.c4.type.timezone, False)
            eq_(t2.c.c5.type.timezone, False)
            eq_(t2.c.c6.type.timezone, True)
        finally:
            t1.drop()
        
    
    
class ArrayTest(TestBase, AssertsExecutionResults):
    __only_on__ = 'postgresql'

    @classmethod
    def setup_class(cls):
        global metadata, arrtable
        metadata = MetaData(testing.db)

        arrtable = Table('arrtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('intarr', postgresql.PGArray(Integer)),
            Column('strarr', postgresql.PGArray(Unicode()), nullable=False)
        )
        metadata.create_all()

    def teardown(self):
        arrtable.delete().execute()
        
    @classmethod
    def teardown_class(cls):
        metadata.drop_all()

    def test_reflect_array_column(self):
        metadata2 = MetaData(testing.db)
        tbl = Table('arrtable', metadata2, autoload=True)
        assert isinstance(tbl.c.intarr.type, postgresql.PGArray)
        assert isinstance(tbl.c.strarr.type, postgresql.PGArray)
        assert isinstance(tbl.c.intarr.type.item_type, Integer)
        assert isinstance(tbl.c.strarr.type.item_type, String)

    @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
    def test_insert_array(self):
        arrtable.insert().execute(intarr=[1,2,3], strarr=[u'abc', u'def'])
        results = arrtable.select().execute().fetchall()
        eq_(len(results), 1)
        eq_(results[0]['intarr'], [1,2,3])
        eq_(results[0]['strarr'], ['abc','def'])

    @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
    @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
    def test_array_where(self):
        arrtable.insert().execute(intarr=[1,2,3], strarr=[u'abc', u'def'])
        arrtable.insert().execute(intarr=[4,5,6], strarr=u'ABC')
        results = arrtable.select().where(arrtable.c.intarr == [1,2,3]).execute().fetchall()
        eq_(len(results), 1)
        eq_(results[0]['intarr'], [1,2,3])

    @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
    @testing.fails_on('postgresql+pypostgresql', 'pypostgresql fails in coercing an array')
    @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
    def test_array_concat(self):
        arrtable.insert().execute(intarr=[1,2,3], strarr=[u'abc', u'def'])
        results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall()
        eq_(len(results), 1)
        eq_(results[0][0], [1,2,3,4,5,6])

    @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
    @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
    def test_array_subtype_resultprocessor(self):
        arrtable.insert().execute(intarr=[4,5,6], strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6']])
        arrtable.insert().execute(intarr=[1,2,3], strarr=[u'm\xe4\xe4', u'm\xf6\xf6'])
        results = arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
        eq_(len(results), 2)
        eq_(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
        eq_(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])

    @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
    @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
    def test_array_mutability(self):
        class Foo(object): pass
        footable = Table('foo', metadata,
            Column('id', Integer, primary_key=True),
            Column('intarr', postgresql.PGArray(Integer), nullable=True)
        )
        mapper(Foo, footable)
        metadata.create_all()
        sess = create_session()

        foo = Foo()
        foo.id = 1
        foo.intarr = [1,2,3]
        sess.add(foo)
        sess.flush()
        sess.expunge_all()
        foo = sess.query(Foo).get(1)
        eq_(foo.intarr, [1,2,3])

        foo.intarr.append(4)
        sess.flush()
        sess.expunge_all()
        foo = sess.query(Foo).get(1)
        eq_(foo.intarr, [1,2,3,4])

        foo.intarr = []
        sess.flush()
        sess.expunge_all()
        eq_(foo.intarr, [])

        foo.intarr = None
        sess.flush()
        sess.expunge_all()
        eq_(foo.intarr, None)

        # Errors in r4217:
        foo = Foo()
        foo.id = 2
        sess.add(foo)
        sess.flush()

class TimestampTest(TestBase, AssertsExecutionResults):
    __only_on__ = 'postgresql'

    def test_timestamp(self):
        engine = testing.db
        connection = engine.connect()
        
        s = select(["timestamp '2007-12-25'"])
        result = connection.execute(s).first()
        eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0))

class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
    __only_on__ = 'postgresql+psycopg2'

    @classmethod
    def setup_class(cls):
        global ss_engine
        ss_engine = engines.testing_engine(options={'server_side_cursors':True})

    @classmethod
    def teardown_class(cls):
        ss_engine.dispose()

    def test_uses_ss(self):
        result = ss_engine.execute("select 1")
        assert result.cursor.name
        
        result = ss_engine.execute(text("select 1"))
        assert result.cursor.name

        result = ss_engine.execute(select([1]))
        assert result.cursor.name

    def test_uses_ss_when_explicitly_enabled(self):
        engine = engines.testing_engine(options={'server_side_cursors':False})
        result = engine.execute(text("select 1"))
        # It should be off globally ...
        assert not result.cursor.name

        s = select([1]).execution_options(stream_results=True)
        result = engine.execute(s)
        # ... but enabled for this one.
        assert result.cursor.name

        # and this one
        result = engine.connect().execution_options(stream_results=True).execute("select 1")
        assert result.cursor.name
        
        # not this one
        result = engine.connect().execution_options(stream_results=False).execute(s)
        assert not result.cursor.name
        
    def test_ss_explicitly_disabled(self):
        s = select([1]).execution_options(stream_results=False)
        result = ss_engine.execute(s)
        assert not result.cursor.name

    def test_aliases_and_ss(self):
        engine = engines.testing_engine(options={'server_side_cursors':False})
        s1 = select([1]).execution_options(stream_results=True).alias()
        result = engine.execute(s1)
        assert result.cursor.name

        # s1's options shouldn't affect s2 when s2 is used as a from_obj.
        s2 = select([1], from_obj=s1)
        result = engine.execute(s2)
        assert not result.cursor.name

    def test_for_update_and_ss(self):
        s1 = select([1], for_update=True)
        result = ss_engine.execute(s1)
        assert result.cursor.name

        result = ss_engine.execute('SELECT 1 FOR UPDATE')
        assert result.cursor.name

    def test_orm_queries_with_ss(self):
        metadata = MetaData(testing.db)
        class Foo(object): pass
        footable = Table('foobar', metadata,
            Column('id', Integer, primary_key=True),
        )
        mapper(Foo, footable)
        metadata.create_all()
        try:
            sess = create_session()

            engine = engines.testing_engine(options={'server_side_cursors':False})
            result = engine.execute(sess.query(Foo).statement)
            assert not result.cursor.name, result.cursor.name
            result.close()

            q = sess.query(Foo).execution_options(stream_results=True)
            result = engine.execute(q.statement)
            assert result.cursor.name
            result.close()

            result = sess.query(Foo).execution_options(stream_results=True).subquery().execute()
            assert result.cursor.name
            result.close()
        finally:
            metadata.drop_all()
            
    def test_text_with_ss(self):
        engine = engines.testing_engine(options={'server_side_cursors':False})
        s = text('select 42')
        result = engine.execute(s)
        assert not result.cursor.name
        s = text('select 42').execution_options(stream_results=True)
        result = engine.execute(s)
        assert result.cursor.name

        
    def test_roundtrip(self):
        test_table = Table('test_table', MetaData(ss_engine),
            Column('id', Integer, primary_key=True),
            Column('data', String(50))
        )
        test_table.create(checkfirst=True)
        try:
            test_table.insert().execute(data='data1')

            nextid = ss_engine.execute(Sequence('test_table_id_seq'))
            test_table.insert().execute(id=nextid, data='data2')

            eq_(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2')])

            test_table.update().where(test_table.c.id==2).values(data=test_table.c.data + ' updated').execute()
            eq_(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2 updated')])
            test_table.delete().execute()
            eq_(test_table.count().scalar(), 0)
        finally:
            test_table.drop(checkfirst=True)

class SpecialTypesTest(TestBase, ComparesTables):
    """test DDL and reflection of PG-specific types """
    
    __only_on__ = 'postgresql'
    __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)
    
    @classmethod
    def setup_class(cls):
        global metadata, table
        metadata = MetaData(testing.db)
        
        # create these types so that we can issue
        # special SQL92 INTERVAL syntax
        class y2m(types.UserDefinedType, postgresql.INTERVAL):
            def get_col_spec(self):
                return "INTERVAL YEAR TO MONTH"

        class d2s(types.UserDefinedType, postgresql.INTERVAL):
            def get_col_spec(self):
                return "INTERVAL DAY TO SECOND"
            
        table = Table('sometable', metadata,
            Column('id', postgresql.PGUuid, primary_key=True),
            Column('flag', postgresql.PGBit),
            Column('addr', postgresql.PGInet),
            Column('addr2', postgresql.PGMacAddr),
            Column('addr3', postgresql.PGCidr),
            Column('doubleprec', postgresql.DOUBLE_PRECISION),
            Column('plain_interval', postgresql.INTERVAL),
            Column('year_interval', y2m()),
            Column('month_interval', d2s()),
            Column('precision_interval', postgresql.INTERVAL(precision=3))
        )
        
        metadata.create_all()
        
        # cheat so that the "strict type check"
        # works
        table.c.year_interval.type = postgresql.INTERVAL()
        table.c.month_interval.type = postgresql.INTERVAL()
    
    @classmethod
    def teardown_class(cls):
        metadata.drop_all()
    
    def test_reflection(self):
        m = MetaData(testing.db)
        t = Table('sometable', m, autoload=True)
        
        self.assert_tables_equal(table, t, strict_types=True)
        assert t.c.plain_interval.type.precision is None
        assert t.c.precision_interval.type.precision == 3

class MatchTest(TestBase, AssertsCompiledSQL):
    __only_on__ = 'postgresql'
    __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)

    @classmethod
    def setup_class(cls):
        global metadata, cattable, matchtable
        metadata = MetaData(testing.db)

        cattable = Table('cattable', metadata,
            Column('id', Integer, primary_key=True),
            Column('description', String(50)),
        )
        matchtable = Table('matchtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('title', String(200)),
            Column('category_id', Integer, ForeignKey('cattable.id')),
        )
        metadata.create_all()

        cattable.insert().execute([
            {'id': 1, 'description': 'Python'},
            {'id': 2, 'description': 'Ruby'},
        ])
        matchtable.insert().execute([
            {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
            {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
            {'id': 3, 'title': "Programming Matz's Ruby", 'category_id': 2},
            {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
            {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
        ])

    @classmethod
    def teardown_class(cls):
        metadata.drop_all()

    @testing.fails_on('postgresql+pg8000', 'uses positional')
    @testing.fails_on('postgresql+zxjdbc', 'uses qmark')
    def test_expression_pyformat(self):
        self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)")

    @testing.fails_on('postgresql+psycopg2', 'uses pyformat')
    @testing.fails_on('postgresql+pypostgresql', 'uses pyformat')
    @testing.fails_on('postgresql+zxjdbc', 'uses qmark')
    def test_expression_positional(self):
        self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%s)")

    def test_simple_match(self):
        results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
        eq_([2, 5], [r.id for r in results])

    def test_simple_match_with_apostrophe(self):
        results = matchtable.select().where(matchtable.c.title.match("Matz's")).execute().fetchall()
        eq_([3], [r.id for r in results])

    def test_simple_derivative_match(self):
        results = matchtable.select().where(matchtable.c.title.match('nutshells')).execute().fetchall()
        eq_([5], [r.id for r in results])

    def test_or_match(self):
        results1 = matchtable.select().where(or_(matchtable.c.title.match('nutshells'), 
                                                 matchtable.c.title.match('rubies'))
                                            ).order_by(matchtable.c.id).execute().fetchall()
        eq_([3, 5], [r.id for r in results1])
        results2 = matchtable.select().where(matchtable.c.title.match('nutshells | rubies'), 
                                            ).order_by(matchtable.c.id).execute().fetchall()
        eq_([3, 5], [r.id for r in results2])
        

    def test_and_match(self):
        results1 = matchtable.select().where(and_(matchtable.c.title.match('python'), 
                                                  matchtable.c.title.match('nutshells'))
                                            ).execute().fetchall()
        eq_([5], [r.id for r in results1])
        results2 = matchtable.select().where(matchtable.c.title.match('python & nutshells'), 
                                            ).execute().fetchall()
        eq_([5], [r.id for r in results2])

    def test_match_across_joins(self):
        results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id, 
                                            or_(cattable.c.description.match('Ruby'), 
                                                matchtable.c.title.match('nutshells')))
                                           ).order_by(matchtable.c.id).execute().fetchall()
        eq_([1, 3, 5], [r.id for r in results])


www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.