"""Strategies for creating new instances of Engine types.
These are semi-private implementation classes which provide the
underlying behavior for the "strategy" keyword argument available on
:func:`~sqlalchemy.engine.create_engine`. Current available options are
``plain``, ``threadlocal``, and ``mock``.
New strategies can be added via new ``EngineStrategy`` classes.
"""
from operator import attrgetter
from sqlalchemy.engine import base,threadlocal,url
from sqlalchemy import util,exc
from sqlalchemy import pool
strategies = {}
class EngineStrategy(object):
"""An adaptor that processes input arguements and produces an Engine.
Provides a ``create`` method that receives input arguments and
produces an instance of base.Engine or a subclass.
"""
def __init__(self):
strategies[self.name] = self
def create(self, *args, **kwargs):
"""Given arguments, returns a new Engine instance."""
raise NotImplementedError()
class DefaultEngineStrategy(EngineStrategy):
"""Base class for built-in stratgies."""
pool_threadlocal = False
def create(self, name_or_url, **kwargs):
# create url.URL object
u = url.make_url(name_or_url)
dialect_cls = u.get_dialect()
dialect_args = {}
# consume dialect arguments from kwargs
for k in util.get_cls_kwargs(dialect_cls):
if k in kwargs:
dialect_args[k] = kwargs.pop(k)
dbapi = kwargs.pop('module', None)
if dbapi is None:
dbapi_args = {}
for k in util.get_func_kwargs(dialect_cls.dbapi):
if k in kwargs:
dbapi_args[k] = kwargs.pop(k)
dbapi = dialect_cls.dbapi(**dbapi_args)
dialect_args['dbapi'] = dbapi
# create dialect
dialect = dialect_cls(**dialect_args)
# assemble connection arguments
(cargs, cparams) = dialect.create_connect_args(u)
cparams.update(kwargs.pop('connect_args', {}))
# look for existing pool or create
pool = kwargs.pop('pool', None)
if pool is None:
def connect():
try:
return dialect.connect(*cargs, **cparams)
except Exception, e:
# Py3K
#raise exc.DBAPIError.instance(None, None, e) from e
# Py2K
import sys
raise exc.DBAPIError.instance(None, None, e), None, sys.exc_info()[2]
# end Py2K
creator = kwargs.pop('creator', connect)
poolclass = (kwargs.pop('poolclass', None) or
getattr(dialect_cls, 'poolclass', poollib.QueuePool))
pool_args = {}
# consume pool arguments from kwargs, translating a few of
# the arguments
translate = {'logging_name': 'pool_logging_name',
'echo': 'echo_pool',
'timeout': 'pool_timeout',
'recycle': 'pool_recycle',
'use_threadlocal':'pool_threadlocal'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
pool_args[k] = kwargs.pop(tk)
pool_args.setdefault('use_threadlocal', self.pool_threadlocal)
pool = poolclass(creator, **pool_args)
else:
if isinstance(pool, poollib._DBProxy):
pool = pool.get_pool(*cargs, **cparams)
else:
pool = pool
# create engine.
engineclass = self.engine_cls
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
if k in kwargs:
engine_args[k] = kwargs.pop(k)
_initialize = kwargs.pop('_initialize', True)
# all kwargs should be consumed
if kwargs:
raise TypeError(
"Invalid argument(s) %s sent to create_engine(), "
"using configuration %s/%s/%s. Please check that the "
"keyword arguments are appropriate for this combination "
"of components." % (','.join("'%s'" % k for k in kwargs),
dialect.__class__.__name__,
pool.__class__.__name__,
engineclass.__name__))
engine = engineclass(pool, dialect, u, **engine_args)
if _initialize:
do_on_connect = dialect.on_connect()
if do_on_connect:
def on_connect(conn, rec):
conn = getattr(conn, '_sqla_unwrap', conn)
if conn is None:
return
do_on_connect(conn)
pool.add_listener({'first_connect': on_connect, 'connect':on_connect})
def first_connect(conn, rec):
c = base.Connection(engine, connection=conn)
dialect.initialize(c)
pool.add_listener({'first_connect':first_connect})
return engine
class PlainEngineStrategy(DefaultEngineStrategy):
"""Strategy for configuring a regular Engine."""
name = 'plain'
engine_cls = base.Engine
PlainEngineStrategy()
class ThreadLocalEngineStrategy(DefaultEngineStrategy):
"""Strategy for configuring an Engine with thredlocal behavior."""
name = 'threadlocal'
pool_threadlocal = True
engine_cls = threadlocal.TLEngine
ThreadLocalEngineStrategy()
class MockEngineStrategy(EngineStrategy):
"""Strategy for configuring an Engine-like object with mocked execution.
Produces a single mock Connectable object which dispatches
statement execution to a passed-in function.
"""
name = 'mock'
def create(self, name_or_url, executor, **kwargs):
# create url.URL object
u = url.make_url(name_or_url)
dialect_cls = u.get_dialect()
dialect_args = {}
# consume dialect arguments from kwargs
for k in util.get_cls_kwargs(dialect_cls):
if k in kwargs:
dialect_args[k] = kwargs.pop(k)
# create dialect
dialect = dialect_cls(**dialect_args)
return MockEngineStrategy.MockConnection(dialect, executor)
class MockConnection(base.Connectable):
def __init__(self, dialect, execute):
self._dialect = dialect
self.execute = execute
engine = property(lambda s: s)
dialect = property(attrgetter('_dialect'))
name = property(lambda s: s._dialect.name)
def contextual_connect(self, **kwargs):
return self
def compiler(self, statement, parameters, **kwargs):
return self._dialect.compiler(
statement, parameters, engine=self, **kwargs)
def create(self, entity, **kwargs):
kwargs['checkfirst'] = False
from sqlalchemy.engine import ddl
ddl.SchemaGenerator(self.dialect, self, **kwargs).traverse(entity)
def drop(self, entity, **kwargs):
kwargs['checkfirst'] = False
from sqlalchemy.engine import ddl
ddl.SchemaDropper(self.dialect, self, **kwargs).traverse(entity)
def execute(self, object, *multiparams, **params):
raise NotImplementedError()
MockEngineStrategy()
|