from sqlalchemy.test.testing import eq_,assert_raises,assert_raises_message
import datetime, re, operator
from sqlalchemy import *
from sqlalchemy import exc,sql,util
from sqlalchemy.sql import table,column,label,compiler
from sqlalchemy.sql.expression import ClauseList
from sqlalchemy.engine import default
from sqlalchemy.databases import *
from sqlalchemy.test import *
# Lightweight table()/column() constructs (not attached to a MetaData)
# that the compilation tests in this module select from.
table1 = table('mytable',
    column('myid', Integer),
    column('name', String),
    column('description', String),
)
# second fixture; its 'otherid' column is compared against mytable.myid in several tests
table2 = table(
    'myothertable',
    column('otherid', Integer),
    column('othername', String),
)
# third fixture table
table3 = table(
    'thirdtable',
    column('userid', Integer),
    column('otherstuff', String),
)
# MetaData collection that owns the two schema-qualified Table objects below.
metadata = MetaData()
# table with a schema
table4 = Table(
    'remotetable', metadata,
    Column('rem_id', Integer, primary_key=True),
    Column('datatype_id', Integer),
    Column('value', String(20)),
    schema = 'remote_owner'
)
# table with a 'multipart' schema
# (same table name and columns as table4; only the schema string differs)
table5 = Table(
    'remotetable', metadata,
    Column('rem_id', Integer, primary_key=True),
    Column('datatype_id', Integer),
    Column('value', String(20)),
    schema = 'dbo.remote_owner'
)
# users/addresses fixtures; their columns are declared without a type.
users = table('users',
    column('user_id'),
    column('user_name'),
    column('password'),
)
# addresses.user_id is compared against users.user_id by the subquery tests
addresses = table('addresses',
    column('address_id'),
    column('user_id'),
    column('street'),
    column('city'),
    column('state'),
    column('zip')
)
class SelectTest(TestBase, AssertsCompiledSQL):
def test_attribute_sanity(self):
    """Check which constructs answer to 'c' / 'columns' and which do not."""
    # selectable constructs that must expose the named attribute
    expected_present = (
        (table1, 'c'),
        (table1.select(), 'c'),
        (table1.select().self_group(), 'columns'),
    )
    for construct, attrname in expected_present:
        assert hasattr(construct, attrname)

    # column-level constructs (and scalar selects) that must not expose it
    expected_absent = (
        (table1.c.myid.self_group(), 'columns'),
        (select([table1.c.myid]).as_scalar().self_group(), 'columns'),
        (table1.c.myid, 'columns'),
        (table1.c.myid, 'c'),
        (table1.select().c.myid, 'c'),
        (table1.select().c.myid, 'columns'),
        (table1.alias().c.myid, 'columns'),
        (table1.alias().c.myid, 'c'),
    )
    for construct, attrname in expected_absent:
        assert not hasattr(construct, attrname)
def test_table_select(self):
    """Compile a select of one table and a select of two tables."""
    one_table = table1.select()
    self.assert_compile(
        one_table,
        "SELECT mytable.myid, mytable.name, "
        "mytable.description FROM mytable"
    )

    two_tables = select([table1, table2])
    two_tables_sql = (
        "SELECT mytable.myid, mytable.name, mytable.description, "
        "myothertable.otherid, myothertable.othername FROM mytable, "
        "myothertable"
    )
    self.assert_compile(two_tables, two_tables_sql)
def test_invalid_col_argument(self):
    """select() raises ArgumentError when given a bare table or column
    instead of a list."""
    for bare_argument in (table1, table1.c.myid):
        assert_raises(exc.ArgumentError, select, bare_argument)
def test_from_subquery(self):
    """Tests placing select statements in the column clause of another select, for the
    purposes of selecting from the exported columns of that select."""
    # un-aliased subquery with its own WHERE clause, filtered again by the enclosing select
    s = select([table1], table1.c.name == 'jack')
    self.assert_compile(
        select(
            [s],
            s.c.myid == 7
        )
        ,
        "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, "
        "mytable.name AS name, mytable.description AS description FROM mytable "
        "WHERE mytable.name = :name_1) WHERE myid = :myid_1")
    # .select() called directly on an un-aliased select
    sq = select([table1])
    self.assert_compile(
        sq.select(),
        "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, "
        "mytable.name AS name, mytable.description AS description FROM mytable)"
    )
    # explicitly named alias: the outer columns are qualified with the alias name
    sq = select(
        [table1],
    ).alias('sq')
    self.assert_compile(
        sq.select(sq.c.myid == 7),
        "SELECT sq.myid, sq.name, sq.description FROM "
        "(SELECT mytable.myid AS myid, mytable.name AS name, "
        "mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :myid_1"
    )
    # two-table subquery with use_labels, so inner columns are labeled <table>_<column>
    sq = select(
        [table1, table2],
        and_(table1.c.myid ==7, table2.c.otherid==table1.c.myid),
        use_labels = True
    ).alias('sq')
    # expected SQL of the inner select, reused by both assertions below.
    # NOTE: the backslash continuations are inside the string literal, so any
    # leading whitespace on the continuation lines becomes part of the string.
    sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \
mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \
myothertable.othername AS myothertable_othername FROM mytable, myothertable \
WHERE mytable.myid = :myid_1 AND myothertable.otherid = mytable.myid"
    self.assert_compile(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \
sq.myothertable_othername FROM (" + sqstring + ") AS sq")
    # a second labeled level: labels become sq_<table>_<column>
    sq2 = select(
        [sq],
        use_labels = True
    ).alias('sq2')
    self.assert_compile(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \
sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \
(SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \
sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \
sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") AS sq) AS sq2")
def test_select_from_clauselist(self):
    """A ClauseList of two columns in the columns clause compiles to 'a, b'."""
    column_list = ClauseList(column('a'), column('b'))
    stmt = select([column_list]).select_from('sometable')
    self.assert_compile(stmt, 'SELECT a, b FROM sometable')
def test_use_labels(self):
    """Labeling of non-column expressions when use_labels=True, plus a
    label nested inside another label."""
    labeled_cases = [
        (
            select([table1.c.myid==5], use_labels=True),
            "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable",
        ),
        (
            select([func.foo()], use_labels=True),
            "SELECT foo() AS foo_1",
        ),
        (
            select([not_(True)], use_labels=True),
            "SELECT NOT :param_1",  # TODO: should this make an anon label ??
        ),
        (
            select([cast("data", Integer)], use_labels=True),
            "SELECT CAST(:param_1 AS INTEGER) AS anon_1",
        ),
    ]
    for stmt, expected_sql in labeled_cases:
        self.assert_compile(stmt, expected_sql)

    # the inner 'foo' label is not rendered; only the outermost 'bar' is
    inner = func.lala(table1.c.myid).label('foo')
    outer = func.sum(inner).label('bar')
    self.assert_compile(
        select([outer]),
        "SELECT sum(lala(mytable.myid)) AS bar FROM mytable"
    )
def test_paramstyles(self):
    """Render the same text() bind parameters under each paramstyle."""
    stmt = text("select :foo, :bar, :bat from sometable")
    rendered_by_paramstyle = (
        ('qmark', "select ?, ?, ? from sometable"),
        ('named', "select :foo, :bar, :bat from sometable"),
        ('format', "select %s, %s, %s from sometable"),
        ('numeric', "select :1, :2, :3 from sometable"),
        ('pyformat', "select %(foo)s, %(bar)s, %(bat)s from sometable"),
    )
    for paramstyle, expected_sql in rendered_by_paramstyle:
        self.assert_compile(
            stmt,
            expected_sql,
            dialect=default.DefaultDialect(paramstyle=paramstyle)
        )
def test_dupe_columns(self):
    """Columns are de-duplicated by clause element identity, not by what
    they render as."""
    # three separate column objects that happen to share a name: all kept
    distinct_objects = select([column('a'), column('a'), column('a')])
    self.assert_compile(
        distinct_objects,
        "SELECT a, a, a",
        dialect=default.DefaultDialect()
    )

    # the very same object repeated: collapsed to one
    col = column('a')
    self.assert_compile(
        select([col, col, col]),
        "SELECT a",
        dialect=default.DefaultDialect()
    )

    first, second = column('a'), column('b')
    self.assert_compile(
        select([first, second, second, second, first, first]),
        "SELECT a, b",
        dialect=default.DefaultDialect()
    )

    # distinct bind parameters under named and positional paramstyles
    for paramstyle, expected_sql in (
        ('named', "SELECT :a, :b, :c"),
        ('qmark', "SELECT ?, ?, ?"),
    ):
        self.assert_compile(
            select([bindparam('a'), bindparam('b'), bindparam('c')]),
            expected_sql,
            dialect=default.DefaultDialect(paramstyle=paramstyle)
        )

    # plain strings are not de-duplicated
    self.assert_compile(select(["a", "a", "a"]), "SELECT a, a, a")

    # positional ordering recorded by the compiled statement
    bind_select = select([bindparam('a'), bindparam('b'), bindparam('c')])
    compiled = bind_select.compile(
        dialect=default.DefaultDialect(paramstyle='qmark'))
    eq_(compiled.positiontup, ['a', 'b', 'c'])
def test_nested_uselabels(self):
    """Nested anonymous label generation.

    This essentially tests the ANONYMOUS_LABEL regex.
    """
    innermost = table1.select().alias()
    middle = select([innermost], use_labels=True).alias()
    outermost = select([middle], use_labels=True)

    expected_sql = (
        "SELECT anon_1.anon_2_myid AS anon_1_anon_2_myid, anon_1.anon_2_name AS anon_1_anon_2_name, "
        "anon_1.anon_2_description AS anon_1_anon_2_description FROM (SELECT anon_2.myid AS anon_2_myid, anon_2.name AS anon_2_name, "
        "anon_2.description AS anon_2_description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description "
        "AS description FROM mytable) AS anon_2) AS anon_1"
    )
    self.assert_compile(outermost, expected_sql)
def test_dont_overcorrelate(self):
    """A select listed in from_obj next to its own table keeps its FROM clause."""
    stmt = select([table1], from_obj=[table1, table1.select()])
    expected_sql = (
        "SELECT mytable.myid, mytable.name, mytable.description "
        "FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, "
        "mytable.description AS description FROM mytable)"
    )
    self.assert_compile(stmt, expected_sql)
def test_full_correlate(self):
    """Scalar subqueries whose every FROM element is correlated away:
    allowed when requested via correlate(), an error otherwise."""
    # intentional
    t = table('t', column('a'), column('b'))
    scalar = select([t.c.a]).where(t.c.a==1).correlate(t).as_scalar()
    enclosing = select([t.c.a, scalar])
    self.assert_compile(
        enclosing,
        "SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t"
    )

    # unintentional
    t2 = table('t2', column('c'), column('d'))
    scalar = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar()
    enclosing = select([t, t2, scalar])
    assert_raises(exc.InvalidRequestError, str, enclosing)

    # intentional again
    scalar = scalar.correlate(t, t2)
    enclosing = select([t, t2, scalar])
    self.assert_compile(scalar, "SELECT t.a WHERE t.a = t2.d")
def test_exists(self):
    """EXISTS in the columns clause, in WHERE, with labels, with
    replace_selectable(), and combined through or_()."""
    inner = select([table1.c.myid]).where(table1.c.myid==5)
    exists_sql = "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
    # a select and its scalar form render the same inside exists()
    self.assert_compile(exists(inner), exists_sql)
    self.assert_compile(exists(inner.as_scalar()), exists_sql)

    self.assert_compile(
        exists([table1.c.myid], table1.c.myid==5).select(),
        "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
        params={'mytable_myid':5}
    )
    self.assert_compile(
        select([table1, exists([1], from_obj=table2)]),
        "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable",
        params={}
    )
    self.assert_compile(
        select([table1, exists([1], from_obj=table2).label('foo')]),
        "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable",
        params={}
    )

    def correlated_select():
        # built fresh on every call, as each assertion originally did
        criterion = exists().where(table2.c.otherid == table1.c.myid).correlate(table1)
        return table1.select(criterion)

    correlated_sql = "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = mytable.myid)"
    # asserted twice in a row
    self.assert_compile(correlated_select(), correlated_sql)
    self.assert_compile(correlated_select(), correlated_sql)

    replaced = correlated_select().replace_selectable(table2, table2.alias())
    self.assert_compile(
        replaced,
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)"
    )

    joined = table1.join(table2, table1.c.myid==table2.c.otherid)
    replaced_with_join = correlated_select().select_from(joined).replace_selectable(table2, table2.alias())
    self.assert_compile(
        replaced_with_join,
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)"
    )

    either = or_(
        exists().where(table2.c.otherid=='foo'),
        exists().where(table2.c.otherid=='bar')
    )
    self.assert_compile(
        select([either]),
        "SELECT (EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_1)) "
        "OR (EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_2)) AS anon_1"
    )
def test_where_subquery(self):
    """Subqueries as FROM elements and as WHERE criteria, including a
    statement assembled through append_column()."""
    def street_subquery():
        # fresh aliased subquery on every call
        return select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')

    street_sql = """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s"""

    streets = street_subquery()
    self.assert_compile(
        select([users, streets.c.street], from_obj=streets),
        street_sql
    )

    same_table = select([table1.c.myid], table1.c.name=='jack')
    self.assert_compile(
        table1.select(table1.c.myid == same_table),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :name_1)"
    )

    other_table = select([table2.c.otherid], table1.c.name == table2.c.othername)
    self.assert_compile(
        table1.select(table1.c.myid == other_table),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid FROM myothertable WHERE mytable.name = myothertable.othername)"
    )

    self.assert_compile(
        table1.select(exists([1], table2.c.otherid == table1.c.myid)),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
    )

    aliased = table1.alias('ta')
    named_subquery = subquery('sq2', [aliased], exists([1], table2.c.otherid == aliased.c.myid))
    self.assert_compile(
        select([named_subquery, table1]),
        "SELECT sq2.myid, sq2.name, sq2.description, mytable.myid, mytable.name, mytable.description FROM (SELECT ta.myid AS myid, ta.name AS name, ta.description AS description FROM mytable AS ta WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = ta.myid)) AS sq2, mytable"
    )

    # same street-subquery assertion as at the top, repeated
    streets = street_subquery()
    self.assert_compile(
        select([users, streets.c.street], from_obj=streets),
        street_sql
    )

    # test constructing the outer query via append_column(), which occurs in the ORM's Query object
    built = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=table1)
    built.append_column(table1)
    self.assert_compile(
        built,
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
    )
def test_orderby_subquery(self):
    """A correlated select used as an ORDER BY element, plain and DESC."""
    def correlated():
        # fresh subquery per assertion
        return select([table2.c.otherid], table1.c.myid==table2.c.otherid)

    ordered_sql = "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid)"

    self.assert_compile(
        table1.select(order_by=[correlated()]),
        ordered_sql
    )
    self.assert_compile(
        table1.select(order_by=[desc(correlated())]),
        ordered_sql + " DESC"
    )
@testing.uses_deprecated('scalar option')
def test_scalar_select(self):
    """Scalar subqueries: as_scalar(), correlation control, use inside
    expressions and functions, and the error raised by their 'c' /
    'columns' accessors."""
    # a plain Select has no type, so it cannot be passed to a SQL function directly
    assert_raises_message(
        exc.InvalidRequestError,
        r"Select objects don't have a type\.  Call as_scalar\(\) "
        "on this Select object to return a 'scalar' version of this Select\.",
        func.coalesce, select([table1.c.myid])
    )
    s = select([table1.c.myid], correlate=False).as_scalar()
    self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
    s = select([table1.c.myid]).as_scalar()
    self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
    s = select([table1.c.myid]).correlate(None).as_scalar()
    self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
    # test that aliases use as_scalar() when used in an explicitly scalar context
    s = select([table1.c.myid]).alias()
    self.assert_compile(select([table1.c.myid]).where(table1.c.myid==s), "SELECT mytable.myid FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable)")
    self.assert_compile(select([table1.c.myid]).where(s > table1.c.myid), "SELECT mytable.myid FROM mytable WHERE mytable.myid < (SELECT mytable.myid FROM mytable)")
    s = select([table1.c.myid]).as_scalar()
    self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
    # test expressions against scalar selects
    self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :param_1 AS anon_1")
    self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :param_1 AS anon_1")
    self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :param_1 AS anon_1")
    self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo")
    # scalar selects should not have any attributes on their 'c' or 'columns' attribute
    s = select([table1.c.myid]).as_scalar()
    # Previously a try/except that passed silently when nothing was raised;
    # assert_raises_message fails the test in that case.  The pattern is
    # anchored so the whole message must match, as the old equality check did.
    # Lambdas defer the attribute access until inside the assertion helper.
    no_columns_msg = (
        r"^Scalar Select expression has no columns; use this object "
        r"directly within a column-level expression\.$"
    )
    assert_raises_message(exc.InvalidRequestError, no_columns_msg, lambda: s.c.foo)
    assert_raises_message(exc.InvalidRequestError, no_columns_msg, lambda: s.columns.foo)
    zips = table('zips',
        column('zipcode'),
        column('latitude'),
        column('longitude'),
    )
    places = table('places',
        column('id'),
        column('nm')
    )
    # named 'zipcode' rather than 'zip' so the builtin is not shadowed
    zipcode = '12345'
    # correlate(None): the scalar subqueries keep their own FROM zips
    qlat = select([zips.c.latitude], zips.c.zipcode == zipcode).correlate(None).as_scalar()
    qlng = select([zips.c.longitude], zips.c.zipcode == zipcode).correlate(None).as_scalar()
    q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
        zips.c.zipcode==zipcode,
        order_by = ['dist', places.c.nm]
    )
    self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE "
        "zips.zipcode = :zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zipcode_2)) AS dist "
        "FROM places, zips WHERE zips.zipcode = :zipcode_3 ORDER BY dist, places.nm")
    # same query shape, but the subqueries compare against an alias of zips
    zalias = zips.alias('main_zip')
    qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
    qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
    q = select([places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
        order_by = ['dist', places.c.nm]
    )
    self.assert_compile(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm")
    # scalar subquery against an alias of table2 while the outer FROM joins table2 itself
    a1 = table2.alias('t2alias')
    s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid).as_scalar()
    j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
    s2 = select([table1, s1], from_obj=j1)
    self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
def test_label_comparison(self):
    """A label used in a comparison or concatenation renders as its
    underlying expression."""
    labeled = func.lala(table1.c.myid).label('foo')
    stmt = select([labeled], labeled==5)
    self.assert_compile(
        stmt,
        "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1"
    )

    concatenated = label('bar', column('foo', type_=String)) + "foo"
    self.assert_compile(concatenated, "foo || :param_1")
def test_conjunctions(self):
    """and_() / or_() over plain strings and column comparisons."""
    conjunction = and_('a', 'b', 'c')
    assert isinstance(conjunction.type, Boolean)
    assert str(conjunction) == 'a AND b AND c'
    self.assert_compile(
        select([conjunction.label('foo')]),
        'SELECT a AND b AND c AS foo'
    )

    # column comparisons mixed with a literal SQL fragment
    flat = and_(
        table1.c.myid == 12,
        table1.c.name=='asdf',
        table2.c.othername == 'foo',
        "sysdate() = today()"
    )
    self.assert_compile(
        flat,
        "mytable.myid = :myid_1 AND mytable.name = :name_1 "
        "AND myothertable.othername = :othername_1 AND sysdate() = today()"
    )

    # a nested or_() is parenthesized inside the and_()
    alternatives = or_(
        table2.c.othername=='asdf',
        table2.c.othername == 'foo',
        table2.c.otherid == 9
    )
    nested = and_(
        table1.c.myid == 12,
        alternatives,
        "sysdate() = today()",
    )
    expected_params = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12}
    self.assert_compile(
        nested,
        "mytable.myid = :myid_1 AND (myothertable.othername = :othername_1 OR "
        "myothertable.othername = :othername_2 OR myothertable.otherid = :otherid_1) AND sysdate() = today()",
        checkparams = expected_params
    )
def test_distinct(self):
    """DISTINCT via the column method, the standalone function, the
    select method, and inside count()."""
    select_distinct_sql = "SELECT DISTINCT mytable.myid FROM mytable"
    count_distinct_sql = "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"

    cases = (
        (select([table1.c.myid.distinct()]), select_distinct_sql),
        (select([distinct(table1.c.myid)]), select_distinct_sql),
        (select([table1.c.myid]).distinct(), select_distinct_sql),
        (select([func.count(table1.c.myid.distinct())]), count_distinct_sql),
        (select([func.count(distinct(table1.c.myid))]), count_distinct_sql),
    )
    for stmt, expected_sql in cases:
        self.assert_compile(stmt, expected_sql)
def test_operators(self):
    """Compile Python arithmetic, comparison and unary operators applied
    to columns, literals and plain values, then check negation, op() and
    pickling of the resulting clauses."""
    # binary arithmetic operators.
    # NOTE(review): the Py3K / Py2K marker comments look like input for a
    # source-conversion step -- confirm before reformatting or removing them.
    for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
                            (operator.sub, '-'),
                            # Py3K
                            #(operator.truediv, '/'),
                            # Py2K
                            (operator.div, '/'),
                            # end Py2K
                            ):
        # (left operand, right operand, expected SQL template with %s for the operator)
        for (lhs, rhs, res) in (
            (5, table1.c.myid, ':myid_1 %s mytable.myid'),
            (5, literal(5), ':param_1 %s :param_2'),
            (table1.c.myid, 'b', 'mytable.myid %s :myid_1'),
            (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'),
            (table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
            (literal(5), 8, ':param_1 %s :param_2'),
            (literal(6), table1.c.myid, ':param_1 %s mytable.myid'),
            (literal(7), literal(5.5), ':param_1 %s :param_2'),
            ):
            self.assert_compile(py_op(lhs, rhs), res % sql_op)
    dt = datetime.datetime.today()
    # exercise comparison operators
    # (python operator, SQL operator as written, SQL operator when the operands are swapped)
    for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'),
                                    (operator.gt, '>', '<'),
                                    (operator.eq, '=', '='),
                                    (operator.ne, '!=', '!='),
                                    (operator.le, '<=', '>='),
                                    (operator.ge, '>=', '<=')):
        for (lhs, rhs, l_sql, r_sql) in (
            ('a', table1.c.myid, ':myid_1', 'mytable.myid'),
            ('a', literal('b'), ':param_2', ':param_1'), # note swap!
            (table1.c.myid, 'b', 'mytable.myid', ':myid_1'),
            (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
            (table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
            (literal('a'), 'b', ':param_1', ':param_2'),
            (literal('a'), table1.c.myid, ':param_1', 'mytable.myid'),
            (literal('a'), literal('b'), ':param_1', ':param_2'),
            (dt, literal('b'), ':param_2', ':param_1'),
            (literal('b'), dt, ':param_1', ':param_2'),
            ):
            # the compiled clause should match either (e.g.):
            # 'a' < 'b' -or- 'b' > 'a'.
            compiled = str(py_op(lhs, rhs))
            fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
            rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)
            self.assert_(compiled == fwd_sql or compiled == rev_sql,
                         "\n'" + compiled + "'\n does not match\n'" +
                         fwd_sql + "'\n or\n'" + rev_sql + "'")
    # unary operators: the expected SQL is the operator text prefixed to the operand
    for (py_op, op) in (
        (operator.neg, '-'),
        (operator.inv, 'NOT '),
        ):
        for expr, sql in (
            (table1.c.myid, "mytable.myid"),
            (literal("foo"), ":param_1"),
            ):
            compiled = str(py_op(expr))
            sql = "%s%s" % (op, sql)
            eq_(compiled, sql)
    # ~ applied to an equality comparison is expected to render as !=
    self.assert_compile(
        table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND mytable.name != :name_1"
    )
    # ~ applied to BETWEEN is expected to render as NOT (...)
    self.assert_compile(
        table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND "\
        "NOT (mytable.name BETWEEN :name_1 AND :name_2)"
    )
    # ~ applied to an and_() is expected to render as NOT (...)
    self.assert_compile(
        table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND "\
        "NOT (mytable.name = :name_1 AND mytable.name = :name_2 AND mytable.name = :name_3)"
    )
    # ~ applied to a bare column
    self.assert_compile(
        table1.select((table1.c.myid != 12) & ~table1.c.name),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND NOT mytable.name"
    )
    # + between string literals is expected to render as ||
    self.assert_compile(
        literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3"
    )
    # test the op() function, also that its results are further usable in expressions
    self.assert_compile(
        table1.select(table1.c.myid.op('hoho')(12)==14),
        "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :myid_1) = :param_1"
    )
    # test that clauses can be pickled (operators need to be module-level, etc.)
    clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & table1.c.myid.like('hoho')
    assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause)))
def test_like(self):
    """LIKE / ILIKE and their negations, with and without ESCAPE.

    A dialect of None is handed to assert_compile() unchanged; the other
    cases compile against postgresql.PGDialect().
    """
    cases = [
        (table1.c.myid.like('somstr'), "mytable.myid LIKE :myid_1", None),
        (~table1.c.myid.like('somstr'), "mytable.myid NOT LIKE :myid_1", None),
        (table1.c.myid.like('somstr', escape='\\'), "mytable.myid LIKE :myid_1 ESCAPE '\\'", None),
        (~table1.c.myid.like('somstr', escape='\\'), "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'", None),
        (table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'", None),
        (~table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'", None),
        (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(myid_1)s ESCAPE '\\'", postgresql.PGDialect()),
        (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\'", postgresql.PGDialect()),
        (table1.c.name.ilike('%something%'), "lower(mytable.name) LIKE lower(:name_1)", None),
        (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(name_1)s", postgresql.PGDialect()),
        (~table1.c.name.ilike('%something%'), "lower(mytable.name) NOT LIKE lower(:name_1)", None),
        (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgresql.PGDialect()),
    ]
    for expression, expected_sql, target_dialect in cases:
        self.assert_compile(expression, expected_sql, dialect=target_dialect)
def test_match(self):
    """Rendering of column.match() under five database dialects."""
    cases = [
        (table1.c.myid.match('somstr'), "mytable.myid MATCH ?", sqlite.SQLiteDialect()),
        (table1.c.myid.match('somstr'), "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", mysql.dialect()),
        (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", mssql.dialect()),
        (table1.c.myid.match('somstr'), "mytable.myid @@ to_tsquery(%(myid_1)s)", postgresql.dialect()),
        (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", oracle.dialect()),
    ]
    for expression, expected_sql, target_dialect in cases:
        self.assert_compile(expression, expected_sql, dialect=target_dialect)
def test_composed_string_comparators(self):
    """contains() / startswith() / endswith() compile to LIKE with
    concatenated wildcards, on the default and MySQL dialects."""
    # (expression, expected SQL, extra keyword arguments for assert_compile)
    cases = [
        (
            table1.c.name.contains('jo'),
            "mytable.name LIKE '%%' || :name_1 || '%%'",
            dict(checkparams={'name_1': u'jo'}),
        ),
        (
            table1.c.name.contains('jo'),
            "mytable.name LIKE concat(concat('%%', %s), '%%')",
            dict(checkparams={'name_1': u'jo'}, dialect=mysql.dialect()),
        ),
        (
            table1.c.name.contains('jo', escape='\\'),
            "mytable.name LIKE '%%' || :name_1 || '%%' ESCAPE '\\'",
            dict(checkparams={'name_1': u'jo'}),
        ),
        (
            table1.c.name.startswith('jo', escape='\\'),
            "mytable.name LIKE :name_1 || '%%' ESCAPE '\\'",
            {},
        ),
        (
            table1.c.name.endswith('jo', escape='\\'),
            "mytable.name LIKE '%%' || :name_1 ESCAPE '\\'",
            {},
        ),
        (
            table1.c.name.endswith('hn'),
            "mytable.name LIKE '%%' || :name_1",
            dict(checkparams={'name_1': u'hn'}),
        ),
        (
            table1.c.name.endswith('hn'),
            "mytable.name LIKE concat('%%', %s)",
            dict(checkparams={'name_1': u'hn'}, dialect=mysql.dialect()),
        ),
        (
            table1.c.name.startswith(u"hi \xf6 \xf5"),
            "mytable.name LIKE :name_1 || '%%'",
            dict(checkparams={'name_1': u'hi \xf6 \xf5'}),
        ),
        # text() / literal_column() operands are rendered inline, not bound
        (
            column('name').endswith(text("'foo'")),
            "name LIKE '%%' || 'foo'",
            {},
        ),
        (
            column('name').endswith(literal_column("'foo'")),
            "name LIKE '%%' || 'foo'",
            {},
        ),
        (
            column('name').startswith(text("'foo'")),
            "name LIKE 'foo' || '%%'",
            {},
        ),
        (
            column('name').startswith(text("'foo'")),
            "name LIKE concat('foo', '%%')",
            dict(dialect=mysql.dialect()),
        ),
        (
            column('name').startswith(literal_column("'foo'")),
            "name LIKE 'foo' || '%%'",
            {},
        ),
        (
            column('name').startswith(literal_column("'foo'")),
            "name LIKE concat('foo', '%%')",
            dict(dialect=mysql.dialect()),
        ),
    ]
    for expression, expected_sql, options in cases:
        self.assert_compile(expression, expected_sql, **options)
def test_multiple_col_binds(self):
    """Three comparisons against one column get three numbered binds."""
    criterion = or_(
        table1.c.myid == 12,
        table1.c.myid=='asdf',
        table1.c.myid == 'foo'
    )
    stmt = select(["*"], criterion)
    self.assert_compile(
        stmt,
        "SELECT * FROM mytable WHERE mytable.myid = :myid_1 OR mytable.myid = :myid_2 OR mytable.myid = :myid_3"
    )
def test_orderby_groupby(self):
    """ORDER BY and GROUP BY through keyword arguments and through the
    generative methods, including clearing them with None."""
    plain_sql = "SELECT myothertable.otherid, myothertable.othername FROM myothertable"
    counted_sql = ("SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
                   "FROM myothertable")

    # keyword order_by
    ascending = table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)])
    self.assert_compile(
        ascending,
        plain_sql + " ORDER BY myothertable.otherid, myothertable.othername ASC"
    )
    descending = table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()])
    self.assert_compile(
        descending,
        plain_sql + " ORDER BY myothertable.otherid, myothertable.othername DESC"
    )

    # generative order_by
    chained = table2.select().order_by(table2.c.otherid)
    chained = chained.order_by(table2.c.othername.desc())
    self.assert_compile(
        chained,
        plain_sql + " ORDER BY myothertable.otherid, myothertable.othername DESC"
    )
    # order_by(None) clears what was accumulated
    cleared = table2.select().order_by(table2.c.otherid)
    cleared = cleared.order_by(table2.c.othername.desc()).order_by(None)
    self.assert_compile(cleared, plain_sql)

    # keyword group_by
    grouped = select(
        [table2.c.othername, func.count(table2.c.otherid)],
        group_by = [table2.c.othername])
    self.assert_compile(
        grouped,
        counted_sql + " GROUP BY myothertable.othername"
    )

    # generative group by
    grouped = select([table2.c.othername, func.count(table2.c.otherid)])
    grouped = grouped.group_by(table2.c.othername)
    self.assert_compile(
        grouped,
        counted_sql + " GROUP BY myothertable.othername"
    )
    # group_by(None) clears what was accumulated
    ungrouped = select([table2.c.othername, func.count(table2.c.otherid)])
    ungrouped = ungrouped.group_by(table2.c.othername).group_by(None)
    self.assert_compile(ungrouped, counted_sql)

    # group_by and order_by together
    both = select(
        [table2.c.othername, func.count(table2.c.otherid)],
        group_by = [table2.c.othername],
        order_by = [table2.c.othername])
    self.assert_compile(
        both,
        counted_sql + " GROUP BY myothertable.othername ORDER BY myothertable.othername"
    )
def test_for_update(self):
    """Rendering of the for_update flag on the default, Oracle and MySQL
    dialects."""
    def locked(mode):
        # fresh statement for every assertion
        return table1.select(table1.c.myid==7, for_update=mode)

    select_prefix = ("SELECT mytable.myid, mytable.name, mytable.description "
                     "FROM mytable WHERE mytable.myid = ")

    # no dialect passed: "nowait" renders the same as True
    self.assert_compile(locked(True), select_prefix + ":myid_1 FOR UPDATE")
    self.assert_compile(locked("nowait"), select_prefix + ":myid_1 FOR UPDATE")

    # dialect-specific renderings
    self.assert_compile(
        locked("nowait"),
        select_prefix + ":myid_1 FOR UPDATE NOWAIT",
        dialect=oracle.dialect())
    self.assert_compile(
        locked("read"),
        select_prefix + "%s LOCK IN SHARE MODE",
        dialect=mysql.dialect())
    self.assert_compile(
        locked(True),
        select_prefix + "%s FOR UPDATE",
        dialect=mysql.dialect())
    self.assert_compile(
        locked(True),
        select_prefix + ":myid_1 FOR UPDATE",
        dialect=oracle.dialect())
def test_alias(self):
    """Named and anonymous table aliases, and an alias of a labeled
    two-table select."""
    # aliasing a table keeps the column names; only the qualifier becomes "foo"
    self.assert_compile(
        select([table1.alias('foo')]),
        "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo"
    )

    # the Oracle dialect renders the alias without AS
    for target_dialect in (oracle.dialect(),):
        self.assert_compile(
            select([table1.alias('foo')]),
            "SELECT foo.myid, foo.name, foo.description FROM mytable foo",
            dialect=target_dialect
        )

    # an unnamed alias gets a generated name
    anonymous = table1.alias()
    self.assert_compile(
        select([anonymous]),
        "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
        "FROM mytable AS mytable_1"
    )

    # Join-style select over both tables: every column of the first table
    # plus a single column of the second.  use_labels names each column
    # tablename_columnname, and those labels become the column keys on the
    # resulting selectable.
    labeled = select(
        [table1, table2.c.otherid],
        table1.c.myid == table2.c.otherid, use_labels = True
    )

    # Alias that selectable as "t2view"; the labels stay the column names.
    view = alias(labeled, 't2view')

    # Select from the alias, again with labels, so each column carries two
    # levels of label prefix; the WHERE clause refers to "mytable_myid"
    # through the alias.
    expected_sql = (
        "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name "
        "AS t2view_mytable_name, t2view.mytable_description AS t2view_mytable_description, "
        "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM "
        "(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, "
        "mytable.description AS mytable_description, myothertable.otherid AS "
        "myothertable_otherid FROM mytable, myothertable WHERE mytable.myid = "
        "myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
    )
    self.assert_compile(
        view.select(view.c.mytable_myid == 9, use_labels = True),
        expected_sql
    )
def test_prefixes(self):
    """Chained prefix_with() calls are rendered after SELECT, in call order."""
    stmt = table1.select()
    stmt = stmt.prefix_with("SQL_CALC_FOUND_ROWS")
    stmt = stmt.prefix_with("SQL_SOME_WEIRD_MYSQL_THING")
    expected = (
        "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING "
        "mytable.myid, mytable.name, mytable.description FROM mytable"
    )
    self.assert_compile(stmt, expected)
def test_text(self):
    """Plain-string SQL fragments are rendered verbatim.

    Covers text(), string column / WHERE / FROM arguments to select(),
    incremental construction through the append_*() methods, and the
    interaction of literal string columns with labelling and subqueries.
    """
    self.assert_compile(
        text("select * from foo where lala = bar"),
        "select * from foo where lala = bar",
    )

    # test bytestring
    self.assert_compile(
        select(
            ["foobar(a)", "pk_foo_bar(syslaal)"],
            "a = 12",
            from_obj=["foobar left outer join lala on foobar.foo = lala.foo"],
        ),
        "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
        "left outer join lala on foobar.foo = lala.foo WHERE a = 12",
    )

    # test unicode
    self.assert_compile(
        select(
            [u"foobar(a)", u"pk_foo_bar(syslaal)"],
            u"a = 12",
            from_obj=[u"foobar left outer join lala on foobar.foo = lala.foo"],
        ),
        "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
        "left outer join lala on foobar.foo = lala.foo WHERE a = 12",
    )

    # test building a select query programmatically with text
    s = select()
    s.append_column("column1")
    s.append_column("column2")
    s.append_whereclause("column1=12")
    s.append_whereclause("column2=19")
    # order_by() returns a new statement, hence the rebinding
    s = s.order_by("column1")
    s.append_from("table1")
    self.assert_compile(
        s,
        "SELECT column1, column2 FROM table1 WHERE "
        "column1=12 AND column2=19 ORDER BY column1",
    )

    self.assert_compile(
        select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
        "SELECT somealias.column1, somealias.column2 FROM "
        "(SELECT column1, column2 FROM mytable) AS somealias",
    )

    # test that use_labels doesnt interfere with literal columns
    self.assert_compile(
        select(["column1", "column2", table1.c.myid], from_obj=table1, use_labels=True),
        "SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable",
    )

    # test that use_labels doesnt interfere with literal columns that have textual labels
    self.assert_compile(
        select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=table1, use_labels=True),
        "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable",
    )

    # test that "auto-labeling of subquery columns" doesnt interfere with literal columns,
    # exported columns dont get quoted
    # (s1 was previously assigned but never used; the same select was
    # rebuilt inline for the assertion.  It is now used directly.)
    s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1])
    self.assert_compile(
        s1.select(),
        "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
        "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)",
    )

    # an alias compiled on its own is expected to render just the inner SELECT
    self.assert_compile(
        select(['col1','col2'], from_obj='tablename').alias('myalias'),
        "SELECT col1, col2 FROM tablename",
    )
def test_binds_in_text(self):
    """Bind parameters embedded in text() and in string clauses of select()."""

    def bar_whee_text():
        # Fresh text() construct with two named binds carrying values 4 and 7.
        return text(
            "select * from foo where lala=:bar and hoho=:whee",
            bindparams=[bindparam('bar', 4), bindparam('whee', 7)],
        )

    # default (named) paramstyle
    self.assert_compile(
        bar_whee_text(),
        "select * from foo where lala=:bar and hoho=:whee",
        checkparams={'bar': 4, 'whee': 7},
    )

    # colons inside a quoted time value must not be taken for binds
    self.assert_compile(
        text("select * from foo where clock='05:06:07'"),
        "select * from foo where clock='05:06:07'",
        checkparams={},
        params={},
    )

    pg_dialect = postgresql.dialect()
    self.assert_compile(
        bar_whee_text(),
        "select * from foo where lala=%(bar)s and hoho=%(whee)s",
        checkparams={'bar': 4, 'whee': 7},
        dialect=pg_dialect,
    )

    # test escaping out text() params with a backslash
    self.assert_compile(
        text("select * from foo where clock='05:06:07' and mork='\:mindy'"),
        "select * from foo where clock='05:06:07' and mork=':mindy'",
        checkparams={},
        params={},
        dialect=pg_dialect,
    )

    sqlite_dialect = sqlite.dialect()
    self.assert_compile(
        bar_whee_text(),
        "select * from foo where lala=? and hoho=?",
        checkparams={'bar': 4, 'whee': 7},
        dialect=sqlite_dialect,
    )

    # string fragments mixed with column expressions in columns and WHERE
    mixed = select(
        [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"],
        and_(
            "foo.id = foofoo(lala)",
            "datetime(foo) = Today",
            table1.c.myid == table2.c.otherid,
        ),
    )
    self.assert_compile(
        mixed,
        "SELECT mytable.myid, mytable.name, mytable.description, "
        "myothertable.otherid, sysdate(), foo, bar, lala "
        "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND "
        "datetime(foo) = Today AND mytable.myid = myothertable.otherid",
    )

    # a textual FROM element alongside an aliased table
    with_text_from = select(
        [alias(table1, 't'), "foo.f"],
        "foo.f = t.id",
        from_obj=["(select f from bar where lala=heyhey) foo"],
    )
    self.assert_compile(
        with_text_from,
        "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
        "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id",
    )

    # test Text embedded within select_from(), using binds
    generate_series = text(
        "generate_series(:x, :y, :z) as s(a)",
        bindparams=[bindparam('x'), bindparam('y'), bindparam('z')],
    )
    dates_col = (func.current_date() + literal_column("s.a")).label("dates")
    dates_stmt = select([dates_col]).select_from(generate_series)

    # without values the binds report None
    self.assert_compile(
        dates_stmt,
        "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
        checkparams={'y': None, 'x': None, 'z': None},
    )

    # params() supplies values for the binds inside the embedded text
    self.assert_compile(
        dates_stmt.params(x=5, y=6, z=7),
        "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
        checkparams={'y': 6, 'x': 5, 'z': 7},
    )
@testing.emits_warning('.*empty sequence.*')
def test_render_binds_as_literal(self):
    """Exercise a compiler with ansi_bind_rules enabled: bind values in
    the columns clause are expected inline in the SQL text."""
    dialect = default.DefaultDialect()

    class Compiler(dialect.statement_compiler):
        ansi_bind_rules = True

    # install the customised compiler on this dialect instance only
    dialect.statement_compiler = Compiler

    expectations = (
        (select([literal("someliteral")]),
            "SELECT 'someliteral'"),
        (select([table1.c.myid + 3]),
            "SELECT mytable.myid + 3 AS anon_1 FROM mytable"),
        (select([table1.c.myid.in_([4, 5, 6])]),
            "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable"),
        (select([func.mod(table1.c.myid, 5)]),
            "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable"),
        (select([literal("foo").in_([])]),
            "SELECT 'foo' != 'foo' AS anon_1"),
    )
    for stmt, rendered in expectations:
        self.assert_compile(stmt, rendered, dialect=dialect)

    # an empty IN against a bare bind parameter is expected to fail at
    # compile time with this compiler
    empty_in = bindparam("foo").in_([])
    assert_raises(exc.CompileError, empty_in.compile, dialect=dialect)
def test_literal(self):
    """literal() values compile to anonymous bind parameters."""
    single = select([literal('foo')])
    self.assert_compile(single, "SELECT :param_1")

    # adding two string literals is expected to render as "||"
    concatenated = select([literal("foo") + literal("bar")], from_obj=[table1])
    self.assert_compile(
        concatenated,
        "SELECT :param_1 || :param_2 AS anon_1 FROM mytable",
    )
def test_calculated_columns(self):
    """Arithmetic on columns keeps the parentheses needed for precedence."""
    vals = table(
        'values',
        column('id', Integer),
        column('val1', Float),
        column('val2', Float),
    )
    id_col, v1, v2 = vals.c.id, vals.c.val1, vals.c.val2

    # computed expression in the columns clause gets an anonymous label
    self.assert_compile(
        select([id_col, (v2 - v1) / v1]),
        "SELECT values.id, (values.val2 - values.val1) / values.val1 AS anon_1 FROM values",
    )

    # the same expression used as a criterion
    self.assert_compile(
        select([id_col], (v2 - v1) / v1 > 2.0),
        "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :param_1",
    )

    # grouped subtraction in the middle of a division chain
    self.assert_compile(
        select([id_col], v1 / (v2 - v1) / v1 > 2.0),
        "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :param_1",
    )
def test_collate(self):
    """COLLATE rendering via the column method and the collate() function."""
    name = table1.c.name

    # the method form and the function form must compile identically
    equivalent_forms = (
        select([name.collate('latin1_german2_ci')]),
        select([collate(name, 'latin1_german2_ci')]),
    )
    for stmt in equivalent_forms:
        self.assert_compile(
            stmt,
            "SELECT mytable.name COLLATE latin1_german2_ci AS anon_1 FROM mytable",
        )

    # collating keeps the type object of the underlying column
    assert name.collate('latin1_german2_ci').type is name.type

    # labelled, and ordered by that label
    stmt = select([name.collate('latin1_german2_ci').label('k1')]).order_by('k1')
    self.assert_compile(
        stmt,
        "SELECT mytable.name COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1",
    )

    # collate() applied to a plain value
    stmt = select([collate('foo', 'latin1_german2_ci').label('k1')])
    self.assert_compile(stmt, "SELECT :param_1 COLLATE latin1_german2_ci AS k1")

    # collation on the left side of LIKE
    stmt = select([name.collate('latin1_german2_ci').like('%x%')])
    self.assert_compile(
        stmt,
        "SELECT mytable.name COLLATE latin1_german2_ci "
        "LIKE :param_1 AS anon_1 FROM mytable",
    )

    # collation on the right side of LIKE
    stmt = select([name.like(collate('%x%', 'latin1_german2_ci'))])
    self.assert_compile(
        stmt,
        "SELECT mytable.name "
        "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 "
        "FROM mytable",
    )

    # collation on both sides of LIKE
    stmt = select([name.collate('col1').like(collate('%x%', 'col2'))])
    self.assert_compile(
        stmt,
        "SELECT mytable.name COLLATE col1 "
        "LIKE :param_1 COLLATE col2 AS anon_1 "
        "FROM mytable",
    )

    # collation applied to a function call
    stmt = select([func.concat('a', 'b').collate('latin1_german2_ci').label('x')])
    self.assert_compile(
        stmt,
        "SELECT concat(:param_1, :param_2) "
        "COLLATE latin1_german2_ci AS x",
    )

    # collation inside ORDER BY
    stmt = select([name]).order_by(name.collate('latin1_german2_ci'))
    self.assert_compile(
        stmt,
        "SELECT mytable.name FROM mytable ORDER BY mytable.name COLLATE latin1_german2_ci",
    )
def test_percent_chars(self):
    """Percent signs in table and column names come through quoted and intact."""
    percent_table = table(
        "table%name",
        column("percent%"),
        column("%(oneofthese)s"),
        column("spaces % more spaces"),
    )
    expected = (
        '''SELECT "table%name"."percent%" AS "table%name_percent%", '''
        '''"table%name"."%(oneofthese)s" AS "table%name_%(oneofthese)s", '''
        '''"table%name"."spaces % more spaces" AS "table%name_spaces % more spaces" FROM "table%name"'''
    )
    self.assert_compile(percent_table.select(use_labels=True), expected)
def test_joins(self):
    """JOIN / LEFT OUTER JOIN rendering, including chained and nested joins."""
    on_other = table1.c.myid == table2.c.otherid
    on_third = table1.c.myid == table3.c.userid

    # selecting directly from a join lists the left side's columns first
    self.assert_compile(
        join(table2, table1, on_other).select(),
        "SELECT myothertable.otherid, myothertable.othername, "
        "mytable.myid, mytable.name, mytable.description FROM "
        "myothertable JOIN mytable ON mytable.myid = myothertable.otherid",
    )

    # join used only as the FROM object
    self.assert_compile(
        select([table1], from_obj=[join(table1, table2, on_other)]),
        "SELECT mytable.myid, mytable.name, mytable.description FROM "
        "mytable JOIN myothertable ON mytable.myid = myothertable.otherid",
    )

    # a join of a join, passed as the column list
    two_level = join(join(table1, table2, on_other), table3, on_third)
    self.assert_compile(
        select([two_level]),
        "SELECT mytable.myid, mytable.name, mytable.description, "
        "myothertable.otherid, myothertable.othername, thirdtable.userid, "
        "thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid ="
        " myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid",
    )

    # both sides have a "user_id" column
    self.assert_compile(
        join(users, addresses, users.c.user_id == addresses.c.user_id).select(),
        "SELECT users.user_id, users.user_name, users.password, "
        "addresses.address_id, addresses.user_id, addresses.street, "
        "addresses.city, addresses.state, addresses.zip FROM users JOIN addresses "
        "ON users.user_id = addresses.user_id",
    )

    # inner join followed by a chained outer join
    chained = join(table1, table2, on_other).outerjoin(table3, on_third)
    self.assert_compile(
        select([table1, table2, table3], from_obj=[chained]),
        "SELECT mytable.myid, mytable.name, mytable.description, "
        "myothertable.otherid, myothertable.othername, thirdtable.userid,"
        " thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid "
        "= myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid ="
        " thirdtable.userid",
    )

    # outer join whose right side is itself a join; it gets parenthesized
    nested = outerjoin(
        table1,
        join(table2, table3, table2.c.otherid == table3.c.userid),
        on_other,
    )
    self.assert_compile(
        select([table1, table2, table3], from_obj=[nested]),
        "SELECT mytable.myid, mytable.name, mytable.description, "
        "myothertable.otherid, myothertable.othername, thirdtable.userid,"
        " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable "
        "JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON "
        "mytable.myid = myothertable.otherid",
    )

    # outer join combined with an OR criterion that includes raw text
    criteria = or_(
        table1.c.name == 'fred',
        table1.c.myid == 10,
        table2.c.othername != 'jack',
        "EXISTS (select yay from foo where boo = lar)",
    )
    query = select(
        [table1, table2],
        criteria,
        from_obj=[outerjoin(table1, table2, on_other)],
    )
    self.assert_compile(
        query,
        "SELECT mytable.myid, mytable.name, mytable.description, "
        "myothertable.otherid, myothertable.othername "
        "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = "
        "myothertable.otherid WHERE mytable.name = :name_1 OR "
        "mytable.myid = :myid_1 OR myothertable.othername != :othername_1 "
        "OR EXISTS (select yay from foo where boo = lar)",
    )
def test_compound_selects(self):
    """UNION / UNION ALL rendering, plus validation of column counts."""
    # Selectables with differing column counts must be rejected.  Using
    # assert_raises_message makes the test fail when no exception is
    # raised at all; the former try/except passed silently in that case.
    assert_raises_message(
        exc.ArgumentError,
        "All selectables passed to CompoundSelect must have identical "
        "numbers of columns; select #1 has 2 columns, select #2 has 3",
        union, table3.select(), table1.select()
    )

    # ORDER BY applied to the union as a whole
    x = union(
        select([table1], table1.c.myid == 5),
        select([table1], table1.c.myid == 12),
        order_by=[table1.c.myid],
    )
    self.assert_compile(
        x,
        "SELECT mytable.myid, mytable.name, mytable.description "
        "FROM mytable WHERE mytable.myid = :myid_1 UNION "
        "SELECT mytable.myid, mytable.name, mytable.description "
        "FROM mytable WHERE mytable.myid = :myid_2 ORDER BY mytable.myid",
    )

    # a union used as the first member of another union is parenthesized
    x = union(
        select([table1]),
        select([table1])
    )
    x = union(x, select([table1]))
    self.assert_compile(
        x,
        "(SELECT mytable.myid, mytable.name, mytable.description "
        "FROM mytable UNION SELECT mytable.myid, mytable.name, "
        "mytable.description FROM mytable) UNION SELECT mytable.myid,"
        " mytable.name, mytable.description FROM mytable",
    )

    # three members with different column names
    u1 = union(
        select([table1.c.myid, table1.c.name]),
        select([table2]),
        select([table3])
    )
    self.assert_compile(
        u1,
        "SELECT mytable.myid, mytable.name "
        "FROM mytable UNION SELECT myothertable.otherid, "
        "myothertable.othername FROM myothertable "
        "UNION SELECT thirdtable.userid, thirdtable.otherstuff "
        "FROM thirdtable",
    )

    # a column of a later member corresponds to the union's column
    # exported from the first member
    assert u1.corresponding_column(table2.c.otherid) is u1.c.myid

    # TODO - why is there an extra space before the LIMIT ?
    self.assert_compile(
        union(
            select([table1.c.myid, table1.c.name]),
            select([table2]),
            order_by=['myid'],
            offset=10,
            limit=5
        ),
        "SELECT mytable.myid, mytable.name "
        "FROM mytable UNION SELECT myothertable.otherid, myothertable.othername "
        "FROM myothertable ORDER BY myid LIMIT 5 OFFSET 10"
    )

    # GROUP BY inside one member
    self.assert_compile(
        union(
            select([table1.c.myid, table1.c.name, func.max(table1.c.description)],
                        table1.c.name=='name2',
                        group_by=[table1.c.myid, table1.c.name]),
            table1.select(table1.c.name=='name1')
        ),
        "SELECT mytable.myid, mytable.name, max(mytable.description) AS max_1 "
        "FROM mytable WHERE mytable.name = :name_1 GROUP BY mytable.myid, "
        "mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description "
        "FROM mytable WHERE mytable.name = :name_2"
    )

    # members without any FROM clause
    self.assert_compile(
        union(
            select([literal(100).label('value')]),
            select([literal(200).label('value')])
        ),
        "SELECT :param_1 AS value UNION SELECT :param_2 AS value"
    )

    # a nested union on the right side of UNION ALL is parenthesized
    self.assert_compile(
        union_all(
            select([table1.c.myid]),
            union(
                select([table2.c.otherid]),
                select([table3.c.userid]),
            )
        ),
        "SELECT mytable.myid FROM mytable UNION ALL "
        "(SELECT myothertable.otherid FROM myothertable UNION "
        "SELECT thirdtable.userid FROM thirdtable)"
    )

    s = select([column('foo'), column('bar')])

    # ORDER BY's even though not supported by all DB's, are rendered if requested
    self.assert_compile(
        union(s.order_by("foo"), s.order_by("bar")),
        "SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar"
    )

    # self_group() is honored
    self.assert_compile(
        union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()),
        "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT 10)"
    )
def test_compound_grouping(self):
    """Parenthesization of compound selects nested inside one another."""
    s = select([column('foo'), column('bar')]).select_from('bat')

    # left-nested unions: each inner union is wrapped
    left_nested = union(union(union(s, s), s), s)
    self.assert_compile(
        left_nested,
        "((SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) "
        "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat",
    )

    # one flat union of four members: no parentheses
    flat = union(s, s, s, s)
    self.assert_compile(
        flat,
        "SELECT foo, bar FROM bat UNION SELECT foo, bar "
        "FROM bat UNION SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat",
    )

    # right-nested unions
    right_nested = union(s, union(s, union(s, s)))
    self.assert_compile(
        right_nested,
        "SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat "
        "UNION (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat))",
    )

    # aliased plain select
    self.assert_compile(
        select([s.alias()]),
        'SELECT anon_1.foo, anon_1.bar FROM (SELECT foo, bar FROM bat) AS anon_1',
    )

    # aliased union
    self.assert_compile(
        select([union(s, s).alias()]),
        'SELECT anon_1.foo, anon_1.bar FROM '
        '(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) AS anon_1',
    )

    # aliased except
    self.assert_compile(
        select([except_(s, s).alias()]),
        'SELECT anon_1.foo, anon_1.bar FROM '
        '(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1',
    )

    # this query sqlite specifically chokes on
    except_first = union(except_(s, s), s)
    self.assert_compile(
        except_first,
        "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) "
        "UNION SELECT foo, bar FROM bat",
    )

    except_last = union(s, except_(s, s))
    self.assert_compile(
        except_last,
        "SELECT foo, bar FROM bat "
        "UNION (SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat)",
    )

    # this solves it
    via_alias = union(except_(s, s).alias().select(), s)
    self.assert_compile(
        via_alias,
        "SELECT anon_1.foo, anon_1.bar FROM "
        "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1 "
        "UNION SELECT foo, bar FROM bat",
    )

    # compound on both sides of EXCEPT
    self.assert_compile(
        except_(union(s, s), union(s, s)),
        "(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) "
        "EXCEPT (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat)",
    )

    # the very same union object used as both members
    pair = union(s, s)
    doubled = union(pair, pair)
    self.assert_compile(
        doubled,
        "(SELECT foo, bar FROM bat "
        "UNION SELECT foo, bar FROM bat) "
        "UNION (SELECT foo, bar FROM bat "
        "UNION SELECT foo, bar FROM bat)",
    )

    # intersects inside a union
    self.assert_compile(
        union(intersect(s, s), intersect(s, s)),
        "(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) "
        "UNION (SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat)",
    )
@testing.uses_deprecated()
def test_binds(self):
    """Bind parameter naming, ordering and value resolution.

    Each case is a tuple of: the statement, its expected SQL with named
    binds, its expected SQL with positional binds (sqlite dialect), the
    expected default parameters as a dict and as a positional list, a
    dict of override parameters, and the expected parameters after the
    override as a dict and as a positional list.
    """
    cases = [
        (
            select(
                [table1, table2],
                and_(
                    table1.c.myid == table2.c.otherid,
                    table1.c.name == bindparam('mytablename')
                )),
            """SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = :mytablename""",
            """SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = ?""",
            {'mytablename':None}, [None],
            {'mytablename':5}, {'mytablename':5}, [5]
        ),
        (
            select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myid'))),
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid",
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?",
            {'myid':None}, [None, None],
            {'myid':5}, {'myid':5}, [5,5]
        ),
        (
            text("SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid"),
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid",
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?",
            {'myid':None}, [None, None],
            {'myid':5}, {'myid':5}, [5,5]
        ),
        (
            select([table1], or_(table1.c.myid==bindparam('myid', unique=True), table2.c.otherid==bindparam('myid', unique=True))),
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2",
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?",
            {'myid_1':None, 'myid_2':None}, [None, None],
            {'myid_1':5, 'myid_2': 6}, {'myid_1':5, 'myid_2':6}, [5,6]
        ),
        (
            bindparam('test', type_=String) + text("'hi'"),
            ":test || 'hi'",
            "? || 'hi'",
            {'test':None}, [None],
            {}, {'test':None}, [None]
        ),
        (
            select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid'))).params({'myid':8, 'myotherid':7}),
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myotherid",
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?",
            {'myid':8, 'myotherid':7}, [8, 7],
            {'myid':5}, {'myid':5, 'myotherid':7}, [5,7]
        ),
        (
            select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True), table2.c.otherid==bindparam('myid', value=8, unique=True))),
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2",
            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?",
            {'myid_1':7, 'myid_2':8}, [7,8],
            {'myid_1':5, 'myid_2':6}, {'myid_1':5, 'myid_2':6}, [5,6]
        ),
    ]

    for (
        stmt,
        expected_named_stmt,
        expected_positional_stmt,
        expected_default_params_dict,
        expected_default_params_list,
        test_param_dict,
        expected_test_params_dict,
        expected_test_params_list
    ) in cases:
        self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict)
        self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())

        nonpositional = stmt.compile()
        positional = stmt.compile(dialect=sqlite.dialect())

        # default values, in positional order
        pp = positional.params
        assert [pp[k] for k in positional.positiontup] == expected_default_params_list

        # Compute the overridden params once so that a failure message
        # reports exactly the value that was compared (the message used
        # to call a different method, get_params()).
        actual_test_params = nonpositional.construct_params(test_param_dict)
        assert actual_test_params == expected_test_params_dict, \
            "expected :%s got %s" % (str(expected_test_params_dict), str(actual_test_params))

        # overridden values, in positional order
        pp = positional.construct_params(test_param_dict)
        assert [pp[k] for k in positional.positiontup] == expected_test_params_list

    # check that params() doesnt modify original statement
    s = select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid')))
    s2 = s.params({'myid':8, 'myotherid':7})
    s3 = s2.params({'myid':9})
    assert s.compile().params == {'myid':None, 'myotherid':None}
    assert s2.compile().params == {'myid':8, 'myotherid':7}
    assert s3.compile().params == {'myid':9, 'myotherid':7}

    # test using same 'unique' param object twice in one compile
    s = select([table1.c.myid]).where(table1.c.myid==12).as_scalar()
    s2 = select([table1, s], table1.c.myid==s)
    self.assert_compile(s2,
        "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable WHERE mytable.myid = "
        ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)")
    positional = s2.compile(dialect=sqlite.dialect())
    pp = positional.params
    assert [pp[k] for k in positional.positiontup] == [12, 12]

    # check that conflicts with "unique" params are caught
    s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('myid_1')))
    assert_raises_message(exc.CompileError, "conflicts with unique bind parameter of the same name", str, s)

    s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('myid_1')))
    assert_raises_message(exc.CompileError, "conflicts with unique bind parameter of the same name", str, s)
def test_binds_no_hash_collision(self):
    """construct_params() must return every one of a very large number of
    distinct binds, without entries being lost or overwritten."""
    total_params = 100000
    indexes = range(total_params)

    placeholders = [':in%d' % i for i in indexes]
    values_by_name = dict(('in%d' % i, i) for i in indexes)

    stmt = text('text clause %s' % ', '.join(placeholders))
    eq_(len(stmt.bindparams), total_params)

    compiled = stmt.compile()
    pp = compiled.construct_params(values_by_name)

    # every key and every value must still be present and distinct
    distinct_keys = len(set(pp))
    eq_(distinct_keys, total_params, '%s %s' % (distinct_keys, len(pp)))
    eq_(len(set(pp.values())), total_params)
def test_bind_as_col(self):
    """A labelled literal in the columns clause is exported as a column."""
    foo = table('foo', column('id'))
    stmt = select([foo, literal('lala').label('hoho')])

    self.assert_compile(stmt, "SELECT foo.id, :param_1 AS hoho FROM foo")

    exported = [str(c) for c in stmt.c]
    assert exported == ["id", "hoho"]
@testing.emits_warning('.*empty sequence.*')
def test_in(self):
    """IN / NOT IN rendering for value lists, expressions and subqueries."""
    myid = table1.c.myid

    # (expression, expected SQL) pairs compiled standalone, in order
    simple_cases = [
        (myid.in_(['a']),
            "mytable.myid IN (:myid_1)"),
        (~myid.in_(['a']),
            "mytable.myid NOT IN (:myid_1)"),
        (myid.in_(['a', 'b']),
            "mytable.myid IN (:myid_1, :myid_2)"),
        (myid.in_(iter(['a', 'b'])),
            "mytable.myid IN (:myid_1, :myid_2)"),
        (myid.in_([literal('a')]),
            "mytable.myid IN (:param_1)"),
        (myid.in_([literal('a'), 'b']),
            "mytable.myid IN (:param_1, :myid_1)"),
        (myid.in_([literal('a'), literal('b')]),
            "mytable.myid IN (:param_1, :param_2)"),
        (myid.in_(['a', literal('b')]),
            "mytable.myid IN (:myid_1, :param_1)"),
        (myid.in_([literal(1) + 'a']),
            "mytable.myid IN (:param_1 + :param_2)"),
        (myid.in_([literal('a') + 'a', 'b']),
            "mytable.myid IN (:param_1 || :param_2, :myid_1)"),
        (myid.in_([literal('a') + literal('a'), literal('b')]),
            "mytable.myid IN (:param_1 || :param_2, :param_3)"),
        (myid.in_([1, literal(3) + 4]),
            "mytable.myid IN (:myid_1, :param_1 + :param_2)"),
        (myid.in_([literal('a') < 'b']),
            "mytable.myid IN (:param_1 < :param_2)"),
        (myid.in_([table1.c.myid]),
            "mytable.myid IN (mytable.myid)"),
        (myid.in_(['a', table1.c.myid]),
            "mytable.myid IN (:myid_1, mytable.myid)"),
        (myid.in_([literal('a'), table1.c.myid]),
            "mytable.myid IN (:param_1, mytable.myid)"),
        (myid.in_([literal('a'), table1.c.myid + 'a']),
            "mytable.myid IN (:param_1, mytable.myid + :myid_1)"),
        (myid.in_([literal(1), 'a' + table1.c.myid]),
            "mytable.myid IN (:param_1, :myid_1 + mytable.myid)"),
        (myid.in_([1, 2, 3]),
            "mytable.myid IN (:myid_1, :myid_2, :myid_3)"),
        (myid.in_(select([table2.c.otherid])),
            "mytable.myid IN (SELECT myothertable.otherid FROM myothertable)"),
        (~myid.in_(select([table2.c.otherid])),
            "mytable.myid NOT IN (SELECT myothertable.otherid FROM myothertable)"),
        # test empty in clause
        (myid.in_([]),
            "mytable.myid != mytable.myid"),
    ]
    for expression, expected in simple_cases:
        self.assert_compile(expression, expected)

    # IN against a subquery, used as a column expression
    self.assert_compile(
        select([myid.in_(select([table2.c.otherid]))]),
        "SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable",
    )

    # same, with the subquery converted via as_scalar()
    self.assert_compile(
        select([myid.in_(select([table2.c.otherid]).as_scalar())]),
        "SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable",
    )

    # IN against a union
    id_union = union(
        select([table1.c.myid], table1.c.myid == 5),
        select([table1.c.myid], table1.c.myid == 12),
    )
    self.assert_compile(
        myid.in_(id_union),
        "mytable.myid IN ("
        "SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1 "
        "UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)",
    )

    # test that putting a select in an IN clause does not blow away its ORDER BY clause
    limited_ids = select(
        [table2.c.otherid],
        order_by=[table2.c.othername],
        limit=10,
        correlate=False,
    )
    self.assert_compile(
        select(
            [table1, table2],
            table2.c.otherid.in_(limited_ids),
            from_obj=[table1.join(table2, table1.c.myid == table2.c.otherid)],
            order_by=[table1.c.myid],
        ),
        "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable "
        "JOIN myothertable ON mytable.myid = myothertable.otherid WHERE myothertable.otherid IN (SELECT myothertable.otherid "
        "FROM myothertable ORDER BY myothertable.othername LIMIT 10) ORDER BY mytable.myid",
    )
def test_tuple(self):
    """tuple_() compared with IN against values, another tuple, and a select."""
    # against plain Python tuples: every value becomes its own bind
    against_values = tuple_(table1.c.myid, table1.c.name).in_([(1, 'foo'), (5, 'bar')])
    self.assert_compile(
        against_values,
        "(mytable.myid, mytable.name) IN ((:param_1, :param_2), (:param_3, :param_4))",
    )

    # against a list holding a single tuple_() of columns
    against_tuple = tuple_(table1.c.myid, table1.c.name).in_(
        [tuple_(table2.c.otherid, table2.c.othername)]
    )
    self.assert_compile(
        against_tuple,
        "(mytable.myid, mytable.name) IN (myothertable.otherid, myothertable.othername)",
    )

    # against a two-column subquery
    against_select = tuple_(table1.c.myid, table1.c.name).in_(
        select([table2.c.otherid, table2.c.othername])
    )
    self.assert_compile(
        against_select,
        "(mytable.myid, mytable.name) IN (SELECT "
        "myothertable.otherid, myothertable.othername FROM myothertable)",
    )
def test_cast(self):
    """CAST rendering and target type names under several dialects."""
    tbl = table(
        'casttest',
        column('id', Integer),
        column('v1', Float),
        column('v2', Float),
        column('ts', TIMESTAMP),
    )

    def check_results(dialect, expected_results, bind_marker):
        # expected_results holds five type names, in the order of the
        # casts below; bind_marker is how this dialect renders the
        # anonymous bind produced for a plain Python value.
        eq_(len(expected_results), 5, 'Incorrect number of expected results')

        def rendered(value, type_):
            return str(cast(value, type_).compile(dialect=dialect))

        eq_(rendered(tbl.c.v1, Numeric),
            'CAST(casttest.v1 AS %s)' % expected_results[0])
        eq_(rendered(tbl.c.v1, Numeric(12, 9)),
            'CAST(casttest.v1 AS %s)' % expected_results[1])
        eq_(rendered(tbl.c.ts, Date),
            'CAST(casttest.ts AS %s)' % expected_results[2])
        eq_(rendered(1234, Text),
            'CAST(%s AS %s)' % (bind_marker, expected_results[3]))
        eq_(rendered('test', String(20)),
            'CAST(%s AS %s)' % (bind_marker, expected_results[4]))

        # fixme: shoving all of this dialect-specific stuff in one test
        # is now officially completely ridiculous AND non-obviously omits
        # coverage on other dialects.
        sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect)
        is_mysql = isinstance(dialect, type(mysql.dialect()))
        if not is_mysql:
            eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC) AS anon_1 \nFROM casttest")
        else:
            eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest")

    # first test with PostgreSQL engine
    check_results(
        postgresql.dialect(),
        ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'],
        '%(param_1)s',
    )

    # then the Oracle engine
    check_results(
        oracle.dialect(),
        ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20 CHAR)'],
        ':param_1',
    )

    # then the sqlite engine
    check_results(
        sqlite.dialect(),
        ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'],
        '?',
    )

    # then the MySQL engine
    check_results(
        mysql.dialect(),
        ['DECIMAL', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'],
        '%s',
    )

    # three spellings of NULL all render as a bare NULL inside CAST
    for null_expr in (text('NULL'), null(), literal_column('NULL')):
        self.assert_compile(
            cast(null_expr, Integer),
            "CAST(NULL AS INTEGER)",
            dialect=sqlite.dialect(),
        )
def test_date_between(self):
    """BETWEEN on a Date column, via the column method and sql.between()."""
    import datetime

    dt_table = Table('dt', metadata,
        Column('date', Date))

    first_day = datetime.date(2006,6,1)
    last_day = datetime.date(2006,6,5)
    expected_sql = "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2"

    # method form
    self.assert_compile(
        dt_table.select(dt_table.c.date.between(first_day, last_day)),
        expected_sql,
        checkparams={'date_1': first_day, 'date_2': last_day},
    )

    # standalone function form
    self.assert_compile(
        dt_table.select(sql.between(dt_table.c.date, first_day, last_day)),
        expected_sql,
        checkparams={'date_1': first_day, 'date_2': last_day},
    )
def test_operator_precedence(self):
    """Parentheses appear only where operator precedence requires them."""
    op = Table('op', metadata,
        Column('field', Integer))
    field = op.c.field

    # (WHERE criterion, expected full SELECT) pairs, compiled in order
    cases = [
        ((field == 5) == None,
            "SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL"),
        ((field + 5) == field,
            "SELECT op.field FROM op WHERE op.field + :field_1 = op.field"),
        ((field + 5) * 6,
            "SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1"),
        ((field * 5) + 6,
            "SELECT op.field FROM op WHERE op.field * :field_1 + :param_1"),
        (5 + field.in_([5,6]),
            "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:field_1, :field_2))"),
        ((5 + field).in_([5,6]),
            "SELECT op.field FROM op WHERE :field_1 + op.field IN (:param_1, :param_2)"),
        (not_(and_(field == 5, field == 7)),
            "SELECT op.field FROM op WHERE NOT (op.field = :field_1 AND op.field = :field_2)"),
        (not_(field == 5),
            "SELECT op.field FROM op WHERE op.field != :field_1"),
        (not_(field.between(5, 6)),
            "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :field_1 AND :field_2)"),
        (not_(field) == 5,
            "SELECT op.field FROM op WHERE (NOT op.field) = :param_1"),
        ((field == field).between(False, True),
            "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2"),
        (between((field == field), False, True),
            "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2"),
    ]
    for criterion, expected in cases:
        self.assert_compile(op.select(criterion), expected)
def test_naming(self):
    """Check the keys and labels a select() assigns to its columns.

    First asserts the ``.c`` keys of a select mixing plain, labeled and
    function columns.  Then, for a series of single-column selects, it
    asserts the key exposed in ``.c``, the SQL rendered for the select
    itself, and the SQL rendered when that select is wrapped in an
    outer select.
    """
    mixed = select([
        table1.c.myid,
        table1.c.myid.label('foobar'),
        func.hoho(table1.c.name),
        func.lala(table1.c.name).label('gg'),
    ])
    assert mixed.c.keys() == ['myid', 'foobar', 'hoho(mytable.name)', 'gg']

    meta = MetaData()
    t1 = Table('mytable', meta, Column('col1', Integer))

    # Each row: (column expression, expected key in ``.c``,
    #            expected rendered expression, expected label or falsy).
    cases = (
        (table1.c.name, 'name', 'mytable.name', None),
        (table1.c.myid==12, 'mytable.myid = :myid_1', 'mytable.myid = :myid_1', 'anon_1'),
        (func.hoho(table1.c.myid), 'hoho(mytable.myid)', 'hoho(mytable.myid)', 'hoho_1'),
        (cast(table1.c.name, Numeric), 'CAST(mytable.name AS NUMERIC)', 'CAST(mytable.name AS NUMERIC)', 'anon_1'),
        (t1.c.col1, 'col1', 'mytable.col1', None),
        (column('some wacky thing'), 'some wacky thing', '"some wacky thing"', ''),
    )

    # The loop variable is named ``expected_label`` so that the imported
    # ``label`` construct is not shadowed.
    for col, key, expr, expected_label in cases:
        # Read ``.table`` once, guarded, and reuse it below.  Not every
        # expression in ``cases`` is guaranteed to have the attribute,
        # and the later branch previously read it unguarded.
        col_table = getattr(col, 'table', None)
        from_obj = col_table if col_table is not None else table1

        inner = select([col], from_obj=from_obj)
        assert inner.c.keys() == [key], inner.c.keys()

        if expected_label:
            self.assert_compile(inner, "SELECT %s AS %s FROM mytable" % (expr, expected_label))
        else:
            self.assert_compile(inner, "SELECT %s FROM mytable" % (expr,))

        outer = select([inner])
        if expected_label:
            self.assert_compile(outer, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % (expected_label, expr, expected_label))
        elif col_table is not None:
            # Table-bound columns are expected to be labeled with their
            # key inside the subquery (original note: "sqlite rule
            # labels subquery columns").
            self.assert_compile(outer, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % (key,expr, key))
        else:
            self.assert_compile(outer, "SELECT %s FROM (SELECT %s FROM mytable)" % (expr,expr))
def test_hints(self):
    """Compile selects carrying with_hint() against three dialects.

    Builds several hinted statements (plain table, dialect-specific
    hints, aliased table, hinted subquery inside a hinted join, and
    quoted table/alias names) and asserts the exact SQL rendered by the
    mysql, oracle and sybase dialects.
    """
    # Hint text with a %(name)s placeholder, not restricted to a dialect.
    s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s")

    # Two hints on the same table, each tied to one dialect name.
    s2 = select([table1.c.myid])
    s2 = s2.with_hint(table1, "index(%(name)s idx)", 'oracle')
    s2 = s2.with_hint(table1, "WITH HINT INDEX idx", 'sybase')

    # Hint applied to an anonymous alias of the table.
    a1 = table1.alias()
    s3 = select([a1.c.myid]).with_hint(a1, "index(%(name)s hint)")

    # Hinted subquery joined into an outer select with its own hint.
    join4 = table1.join(table2, table1.c.myid==table2.c.otherid)
    subs4 = select([table1, table2]).select_from(join4).with_hint(table1, 'hint1')
    outer_join4 = table3.join(subs4, subs4.c.othername==table3.c.otherstuff)
    s4 = select([table3]).select_from(outer_join4).with_hint(table3, 'hint3')

    # Same shape, but both hints are placed on the outer select.
    # Note: s5 is constructed but not asserted on; see the TODO below.
    join5 = table1.join(table2, table1.c.myid==table2.c.otherid)
    subs5 = select([table1, table2]).select_from(join5)
    outer_join5 = table3.join(subs5, subs5.c.othername==table3.c.otherstuff)
    s5 = select([table3]).select_from(outer_join5)
    s5 = s5.with_hint(table3, 'hint3').with_hint(table1, 'hint1')

    # Table and alias names that need quoting.
    t1 = table('QuotedName', column('col1'))
    s6 = select([t1.c.col1]).where(t1.c.col1>10).with_hint(t1, '%(name)s idx1')
    a2 = t1.alias('SomeName')
    s7 = select([a2.c.col1]).where(a2.c.col1>10).with_hint(a2, '%(name)s idx1')

    mysql_d = mysql.dialect()
    oracle_d = oracle.dialect()
    sybase_d = sybase.dialect()

    # (statement, dialect, expected SQL)
    cases = [
        (s, mysql_d,
            "SELECT mytable.myid FROM mytable test hint mytable"),
        (s, oracle_d,
            "SELECT /*+ test hint mytable */ mytable.myid FROM mytable"),
        (s, sybase_d,
            "SELECT mytable.myid FROM mytable test hint mytable"),

        (s2, mysql_d,
            "SELECT mytable.myid FROM mytable"),
        (s2, oracle_d,
            "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable"),
        (s2, sybase_d,
            "SELECT mytable.myid FROM mytable WITH HINT INDEX idx"),

        (s3, mysql_d,
            "SELECT mytable_1.myid FROM mytable AS mytable_1 "
            "index(mytable_1 hint)"),
        (s3, oracle_d,
            "SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM "
            "mytable mytable_1"),
        (s3, sybase_d,
            "SELECT mytable_1.myid FROM mytable AS mytable_1 "
            "index(mytable_1 hint)"),

        (s4, mysql_d,
            "SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable "
            "hint3 INNER JOIN (SELECT mytable.myid, mytable.name, "
            "mytable.description, myothertable.otherid, "
            "myothertable.othername FROM mytable hint1 INNER "
            "JOIN myothertable ON mytable.myid = myothertable.otherid) "
            "ON othername = thirdtable.otherstuff"),
        (s4, sybase_d,
            "SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable "
            "hint3 JOIN (SELECT mytable.myid, mytable.name, "
            "mytable.description, myothertable.otherid, "
            "myothertable.othername FROM mytable hint1 "
            "JOIN myothertable ON mytable.myid = myothertable.otherid) "
            "ON othername = thirdtable.otherstuff"),
        (s4, oracle_d,
            "SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff "
            "FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid,"
            " mytable.name, mytable.description, myothertable.otherid,"
            " myothertable.othername FROM mytable JOIN myothertable ON"
            " mytable.myid = myothertable.otherid) ON othername ="
            " thirdtable.otherstuff"),

        # TODO: figure out dictionary ordering solution here
        # (s5, oracle_d,
        #     "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, "
        #     "thirdtable.otherstuff "
        #     "FROM thirdtable JOIN (SELECT mytable.myid,"
        #     " mytable.name, mytable.description, myothertable.otherid,"
        #     " myothertable.othername FROM mytable JOIN myothertable ON"
        #     " mytable.myid = myothertable.otherid) ON othername ="
        #     " thirdtable.otherstuff"),

        (s6, oracle_d,
            'SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 '
            'FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1'),
        (s7, oracle_d,
            'SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM '
            '"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1'),
    ]

    for stmt, dialect, expected in cases:
        self.assert_compile(stmt, expected, dialect=dialect)
class CRUDTest(TestBase, AssertsCompiledSQL):
    """Compilation tests for insert(), update() and delete() constructs."""

    def test_insert(self):
        """Compile INSERT statements built from several kinds of values."""
        # No values given: bind params are created for all columns.
        self.assert_compile(
            insert(table1),
            "INSERT INTO mytable (myid, name, description) "
            "VALUES (:myid, :name, :description)")

        # User-supplied bind params for specific columns, keyed by the
        # column objects themselves.
        by_column = {
            table1.c.myid: bindparam('userid'),
            table1.c.name: bindparam('username'),
        }
        self.assert_compile(
            insert(table1, by_column),
            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)")

        # Values for specific columns, keyed by column-name strings.
        by_name = {'myid': 3, 'name': 'jack'}
        self.assert_compile(
            insert(table1, by_name),
            "INSERT INTO mytable (myid, name) VALUES (:myid, :name)")

        # A tuple of values instead of a named mapping.
        self.assert_compile(
            insert(table1, (3, 'jack', 'mydescription')),
            "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)",
            checkparams={'myid': 3, 'name': 'jack', 'description': 'mydescription'})

        # values= at construction time combined with a later .values().
        chained = insert(table1, values={table1.c.myid: bindparam('userid')})
        chained = chained.values({table1.c.name: bindparam('username')})
        self.assert_compile(
            chained,
            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)")

        # A SQL function given as the value.
        self.assert_compile(
            insert(table1, values={'myid': func.lala()}),
            "INSERT INTO mytable (myid) VALUES (lala())")

    def test_inline_insert(self):
        """Compile inline inserts on a table with a SQL-function default."""
        meta = MetaData()
        sometable = Table(
            'sometable', meta,
            Column('id', Integer, primary_key=True),
            Column('foo', Integer, default=func.foobar()))
        expected = "INSERT INTO sometable (foo) VALUES (foobar())"

        self.assert_compile(sometable.insert(values={}, inline=True), expected)
        self.assert_compile(sometable.insert(inline=True), expected, params={})

    def test_update(self):
        """Compile UPDATE statements with various values and params."""
        # Compile-time params keyed by a column object.
        self.assert_compile(
            update(table1, table1.c.myid == 7),
            "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
            params={table1.c.name: 'fred'})

        # Generative where()/values() form.
        generative = table1.update().where(table1.c.myid==7)
        generative = generative.values({table1.c.myid: 5})
        self.assert_compile(
            generative,
            "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
            checkparams={'myid': 5, 'myid_1': 7})

        # Compile-time params keyed by a column name.
        self.assert_compile(
            update(table1, table1.c.myid == 7),
            "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
            params={'name': 'fred'})

        # Setting one column to another column.
        self.assert_compile(
            update(table1, values={table1.c.name: table1.c.myid}),
            "UPDATE mytable SET name=mytable.myid")

        # Explicit bindparam in the WHERE clause plus a literal value.
        self.assert_compile(
            update(table1,
                   whereclause=table1.c.name == bindparam('crit'),
                   values={table1.c.name: 'hi'}),
            "UPDATE mytable SET name=:name WHERE mytable.name = :crit",
            params={'crit': 'notthere'},
            checkparams={'crit': 'notthere', 'name': 'hi'})

        # values= together with an extra compile-time param.
        self.assert_compile(
            update(table1, table1.c.myid == 12,
                   values={table1.c.name: table1.c.myid}),
            "UPDATE mytable SET name=mytable.myid, description="
            ":description WHERE mytable.myid = :myid_1",
            params={'description': 'test'},
            checkparams={'description': 'test', 'myid_1': 12})

        self.assert_compile(
            update(table1, table1.c.myid == 12,
                   values={table1.c.myid: 9}),
            "UPDATE mytable SET myid=:myid, description=:description "
            "WHERE mytable.myid = :myid_1",
            params={'myid_1': 12, 'myid': 9, 'description': 'test'})

        self.assert_compile(
            update(table1, table1.c.myid ==12),
            "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
            params={'myid': 18},
            checkparams={'myid': 18, 'myid_1': 12})

        # A statement with explicit values must stringify the same as
        # its compilation with these column_keys.
        stmt = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
        compiled = stmt.compile(column_keys=['id', 'name'])
        eq_(str(stmt), str(compiled))

        # A later .values() for the same column.
        overridden = update(table1, table1.c.myid == 12,
                            values={table1.c.name: table1.c.myid})
        overridden = overridden.values({table1.c.name: table1.c.name + 'foo'})
        self.assert_compile(
            overridden,
            "UPDATE mytable SET name=(mytable.name || :name_1), "
            "description=:description WHERE mytable.myid = :myid_1",
            params={'description': 'test'})

        # Functions and literals in both the SET and WHERE clauses.
        criterion = (
            (table1.c.myid == func.hoho(4)) &
            (table1.c.name == literal('foo') + table1.c.name + literal('lala'))
        )
        new_values = {
            table1.c.name: table1.c.name + "lala",
            table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho')),
        }
        self.assert_compile(
            update(table1, criterion, values=new_values),
            "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), "
            "name=(mytable.name || :name_1) "
            "WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || "
            "mytable.name || :param_3")

    def test_correlated_update(self):
        """Compile UPDATEs that use subqueries as values or criteria."""
        # Against a straight text subquery.
        text_subquery = text("(select name from mytable where id=mytable.id)")
        stmt = update(table1, values={table1.c.name: text_subquery})
        self.assert_compile(
            stmt,
            "UPDATE mytable SET name=(select name from mytable "
            "where id=mytable.id)")

        # Against a select from an alias of the updated table.
        mt = table1.alias()
        alias_select = select([mt.c.name], mt.c.myid==table1.c.myid)
        stmt = update(table1, values={table1.c.name: alias_select})
        self.assert_compile(
            stmt,
            "UPDATE mytable SET name=(SELECT mytable_1.name FROM "
            "mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")

        # Against a regular constructed subquery.
        subq = select([table2], table2.c.otherid == table1.c.myid)
        stmt = update(table1, table1.c.name == 'jack',
                      values={table1.c.name: subq})
        self.assert_compile(
            stmt,
            "UPDATE mytable SET name=(SELECT myothertable.otherid, "
            "myothertable.othername FROM myothertable WHERE "
            "myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")

        # A non-correlated subquery in the WHERE clause.
        subq = select([table2.c.othername], table2.c.otherid == 7)
        stmt = update(table1, table1.c.name==subq)
        self.assert_compile(
            stmt,
            "UPDATE mytable SET myid=:myid, name=:name, "
            "description=:description WHERE mytable.name = "
            "(SELECT myothertable.othername FROM myothertable "
            "WHERE myothertable.otherid = :otherid_1)")

        # One that is actually correlated.
        subq = select([table2.c.othername], table2.c.otherid == table1.c.myid)
        stmt = table1.update(table1.c.name==subq)
        self.assert_compile(
            stmt,
            "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "
            "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")

    def test_delete(self):
        """Compile DELETE via delete(), .delete().where() and chained where()."""
        single_criterion = "DELETE FROM mytable WHERE mytable.myid = :myid_1"

        self.assert_compile(
            delete(table1, table1.c.myid == 7),
            single_criterion)
        self.assert_compile(
            table1.delete().where(table1.c.myid == 7),
            single_criterion)

        chained = table1.delete().where(table1.c.myid == 7)
        chained = chained.where(table1.c.name=='somename')
        self.assert_compile(
            chained,
            "DELETE FROM mytable WHERE mytable.myid = :myid_1 AND mytable.name = :name_1")

    def test_correlated_delete(self):
        """Compile DELETEs whose WHERE clause compares against a subquery."""
        # A non-correlated WHERE clause.
        subq = select([table2.c.othername], table2.c.otherid == 7)
        stmt = delete(table1, table1.c.name==subq)
        self.assert_compile(
            stmt,
            "DELETE FROM mytable WHERE mytable.name = "
            "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)")

        # One that is actually correlated.
        subq = select([table2.c.othername], table2.c.otherid == table1.c.myid)
        stmt = table1.delete(table1.c.name==subq)
        self.assert_compile(
            stmt,
            "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")

    def test_binds_that_match_columns(self):
        """Exercise bind params whose names match column names.

        Asserts, case by case, either the exact SET/VALUES SQL produced
        or that compilation raises ``CompileError``.
        """
        foo = table('foo', column('x'), column('y'))

        # UPDATE whose WHERE clause uses a bind named after column x.
        upd = foo.update().where(foo.c.x==bindparam('x'))

        assert_raises(exc.CompileError, upd.compile)
        self.assert_compile(upd, "UPDATE foo SET WHERE foo.x = :x", params={})
        assert_raises(exc.CompileError, upd.values(x=7).compile)
        self.assert_compile(
            upd.values(y=7),
            "UPDATE foo SET y=:y WHERE foo.x = :x")
        assert_raises(exc.CompileError,
                      upd.values(x=7).compile, column_keys=['x', 'y'])
        assert_raises(exc.CompileError,
                      upd.compile, column_keys=['x', 'y'])

        # The same bind name used inside the SET expression, compiled
        # with no params, with x only, and with x and y.
        update_cases = [
            ("UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
             {}),
            ("UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
             {'params': {'x': 1}}),
            ("UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x",
             {'params': {'x': 1, 'y': 2}}),
        ]
        for expected, compile_kw in update_cases:
            self.assert_compile(
                upd.values(x=3 + bindparam('x')), expected, **compile_kw)

        # INSERT whose value expression uses a bind named after its column.
        ins = foo.insert().values(x=3 + bindparam('x'))
        self.assert_compile(
            ins, "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
        self.assert_compile(
            ins,
            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
            params={'x': 1, 'y': 2})

        # A bind named after the other column, y.
        ins = foo.insert().values(x=bindparam('y'))
        self.assert_compile(ins, "INSERT INTO foo (x) VALUES (:y)")

        # The same bind name together with an explicit value for y.
        ins = foo.insert().values(x=bindparam('y'), y=5)
        assert_raises(exc.CompileError, ins.compile)

        ins = foo.insert().values(x=3 + bindparam('y'), y=5)
        assert_raises(exc.CompileError, ins.compile)

        # A bind name that matches no column.
        ins = foo.insert().values(x=3 + bindparam('x2'))
        only_x = "INSERT INTO foo (x) VALUES ((:param_1 + :x2))"
        x_and_y = "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)"
        self.assert_compile(ins, only_x)
        self.assert_compile(ins, only_x, params={})
        self.assert_compile(ins, x_and_y, params={'x': 1, 'y': 2})
        self.assert_compile(ins, x_and_y, params={'x2': 1, 'y': 2})

    def test_labels_no_collision(self):
        """Check bind names on a table whose column is named 'foo_id'."""
        foo = table('foo', column('id'), column('foo_id'))

        self.assert_compile(
            foo.update().where(foo.c.id==5),
            "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :id_1")

        # Bind keyed on the ``_label`` of foo.id.
        labeled_bind = bindparam(key=foo.c.id._label)
        self.assert_compile(
            foo.update().where(foo.c.id==labeled_bind),
            "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1")
class InlineDefaultTest(TestBase, AssertsCompiledSQL):
    """Inline rendering of SQL-expression column defaults and onupdates."""

    def test_insert(self):
        """Inline INSERT renders function and scalar-select defaults."""
        m = MetaData()
        foo = Table('foo', m, Column('id', Integer))
        max_id = select([func.coalesce(func.max(foo.c.id))])
        test_table = Table(
            'test', m,
            Column('col1', Integer, default=func.foo(1)),
            Column('col2', Integer, default=max_id),
        )

        stmt = test_table.insert(inline=True, values={})
        self.assert_compile(
            stmt,
            "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), "
            "(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM foo))")

    def test_update(self):
        """Inline UPDATE renders onupdate expressions beside given values."""
        m = MetaData()
        foo = Table('foo', m, Column('id', Integer))
        max_id = select([func.coalesce(func.max(foo.c.id))])
        test_table = Table(
            'test', m,
            Column('col1', Integer, onupdate=func.foo(1)),
            Column('col2', Integer, onupdate=max_id),
            Column('col3', String(30))
        )

        stmt = test_table.update(inline=True, values={'col3': 'foo'})
        self.assert_compile(
            stmt,
            "UPDATE test SET col1=foo(:foo_1), "
            "col2=(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM foo), "
            "col3=:col3")
class SchemaTest(TestBase, AssertsCompiledSQL):
    """Compilation of statements against schema-qualified tables."""

    def test_select(self):
        """SELECT renders schema-qualified names and derived labels."""
        # Plain select from the single-part schema table.
        self.assert_compile(
            table4.select(),
            "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
            " remote_owner.remotetable.value FROM remote_owner.remotetable")

        # With a WHERE clause.
        filtered = table4.select(
            and_(table4.c.datatype_id==7, table4.c.value=='hi'))
        self.assert_compile(
            filtered,
            "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
            " remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "
            "remote_owner.remotetable.datatype_id = :datatype_id_1 AND"
            " remote_owner.remotetable.value = :value_1")

        # With use_labels=True the labels carry the schema name as well.
        labeled = table4.select(
            and_(table4.c.datatype_id==7, table4.c.value=='hi'),
            use_labels=True)
        self.assert_compile(
            labeled,
            "SELECT remote_owner.remotetable.rem_id AS"
            " remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS"
            " remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "
            "AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "
            "remote_owner.remotetable.datatype_id = :datatype_id_1 AND "
            "remote_owner.remotetable.value = :value_1")

        # Multi-part schema name.
        self.assert_compile(
            table5.select(),
            'SELECT "dbo.remote_owner".remotetable.rem_id, '
            '"dbo.remote_owner".remotetable.datatype_id, "dbo.remote_owner".remotetable.value '
            'FROM "dbo.remote_owner".remotetable')

        # Multi-part schema name labels - '.' is converted to '_'.
        self.assert_compile(
            table5.select(use_labels=True),
            'SELECT "dbo.remote_owner".remotetable.rem_id AS'
            ' dbo_remote_owner_remotetable_rem_id, "dbo.remote_owner".remotetable.datatype_id'
            ' AS dbo_remote_owner_remotetable_datatype_id,'
            ' "dbo.remote_owner".remotetable.value AS dbo_remote_owner_remotetable_value FROM'
            ' "dbo.remote_owner".remotetable')

    def test_alias(self):
        """Columns of an aliased schema table render with the alias name."""
        remtable = alias(table4, 'remtable')
        stmt = remtable.select(remtable.c.datatype_id==7)
        self.assert_compile(
            stmt,
            "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM"
            " remote_owner.remotetable AS remtable "
            "WHERE remtable.datatype_id = :datatype_id_1")

    def test_update(self):
        """UPDATE renders the schema-qualified table name."""
        stmt = table4.update(table4.c.value=='test',
                             values={table4.c.datatype_id: 12})
        self.assert_compile(
            stmt,
            "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "
            "WHERE remote_owner.remotetable.value = :value_1")

    def test_insert(self):
        """INSERT renders the schema-qualified table name."""
        stmt = table4.insert(values=(2, 5, 'test'))
        self.assert_compile(
            stmt,
            "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "
            "(:rem_id, :datatype_id, :value)")
|