dbpool.py :  » Web-Server » Jon-Python » jonpy-0.08 » jon » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Server » Jon Python 
Jon Python » jonpy 0.08 » jon » dbpool.py
# $Id: dbpool.py,v 1.6 2010/03/20 18:25:33 jribbens Exp $

import weakref as _weakref
import Queue as _Queue
import thread as _thread
import time as _time


_log_level = 0
_log_name = "/tmp/dbpool.log"
_log_file = None
_log_lock = _thread.allocate_lock()

apilevel = "2.0"
threadsafety = 2

_dbmod = None
_lock = _thread.allocate_lock()
_refs = {}

_COPY_ATTRS = ("paramstyle", "Warning", "Error", "InterfaceError",
  "DatabaseError", "DataError", "OperationalError", "IntegrityError",
  "InternalError", "ProgrammingError", "NotSupportedError")


def _log(level, message, *args, **kwargs):
  global _log_file

  if _log_level >= level:
    if args or kwargs:
      argslist = [repr(arg) for arg in args]
      argslist.extend("%s=%r" % item for item in kwargs.items())
      message += "(" + ", ".join(argslist) + ")"
    _log_lock.acquire()
    try:
      if not _log_file:
        _log_file = open(_log_name, "a", 1)
      _log_file.write("%s %s\n" % (_time.strftime("%b %d %H:%M:%S"), message))
    finally:
      _log_lock.release()


def set_database(dbmod, minconns, timeout=0):
  if minconns < 1:
    raise ValueError("minconns must be greater than or equal to 1")
  if _dbmod is not None:
    if _dbmod == dbmod:
      return
    raise Exception("dbpool module is already in use")
  if len(dbmod.apilevel) != 3 or dbmod.apilevel[:2] != "2." or \
    not dbmod.apilevel[2].isdigit():
    raise ValueError("specified database module is not DB API 2.0 compliant")
  if dbmod.threadsafety < 1:
    raise ValueError("specified database module must have threadsafety level"
      " of at least 1")
  _log(1, "set_database", dbmod.__name__, minconns, timeout)
  g = globals()
  g["_dbmod"] = dbmod
  g["_available"] = {}
  g["_minconns"] = minconns
  g["_timeout"] = timeout
  for v in _COPY_ATTRS:
    g[v] = getattr(dbmod, v)


def connect(*args, **kwargs):
  if _dbmod is None:
    raise Exception("No database module has been specified")
  key = repr(args) + "\0" + repr(kwargs)
  _log(1, "connect", *args, **kwargs)
  try:
    while True:
      conn = _available[key].get(0)
      if _timeout == 0 or _time.time() - conn._lastuse < _timeout:
        _log(2, "connect: returning connection %r from _available" % conn)
        return conn
      else:
        conn._inner._connection = None
        _log(2, "connect: discarded connection %r from _available due to age" %
          conn)
  except (KeyError, _Queue.Empty):
    conn = _Connection(None, None, *args, **kwargs)
    _log(2, "connect: created new connection %r" % conn)
    return conn


def _make_available(conn):
  key = repr(conn._args) + "\0" + repr(conn._kwargs)
  _log(2, "_make_available", conn)
  _lock.acquire()
  try:
    try:
      _available[key].put(conn, 0)
      _log(3, "_make_available: put into existing _available slot")
    except KeyError:
      _log(3, "_make_available: created new _available slot")
      q = _Queue.Queue(_minconns)
      q.put(conn, 0)
      _available[key] = q
    except _Queue.Full:
      conn._inner._connection = None
      _log(3, "_make_available: discarded, _available slot full")
  finally:
    _lock.release()


def _connection_notinuse(ref):
  # if the Python interpreter is exiting, the globals might already have
  # been deleted, so check for them explicitly
  _log(2, "_connection_notinuse", ref)
  if _refs is None or _make_available is None or _Connection is None:
    return
  inner = _refs[ref]
  del _refs[ref]
  _log(3, "_connection_notinuse: inner=%r" % inner)
  inner._cursorref = None
  if inner._connection is not None:
    _make_available(_Connection(inner))


class _Connection(object):
  def __init__(self, inner, *args, **kwargs):
    self._inner = None
    _log(4, "_Connection", self, inner, *args, **kwargs)
    if inner is None:
      self._inner = _InnerConnection(*args, **kwargs)
      _log(5, "_Connection: new inner=%r" % self._inner)
    else:
      self._inner = inner
    self._inner._outerref = _weakref.ref(self)
    ref = _weakref.ref(self, _connection_notinuse)
    _log(5, "_Connection: ref=%r" % ref)
    _refs[ref] = self._inner

  def __repr__(self):
    return "<dbpool._Connection(%r) at %x>" % (self._inner, id(self))

  def cursor(self, *args, **kwargs):
    # this method would not be necessary (i.e. the __getattr__ would take
    # care of it) but if someone does dbpool.connect().cursor() all in one
    # expression, the outer _Connection class was getting garbage-collected
    # (and hence the actual database connection being put back in the pool)
    # *in the middle of the expression*, i.e. after connect() was called but
    # before cursor() was called. So you could end up with 2 cursors on the
    # same database connection.
    return self._inner.cursor(*args, **kwargs)

  def __getattr__(self, attr):
    return getattr(self._inner, attr)

    
class _InnerConnection(object):
  def __init__(self, connection, *args, **kwargs):
    self._connection = None
    _log(4, "_InnerConnection", self, connection, *args, **kwargs)
    self._args = args
    self._kwargs = kwargs
    if connection is None:
      _log(2, "_InnerConnection: Calling actual connect", *args, **kwargs)
      self._connection = _dbmod.connect(*args, **kwargs)
    else:
      _log(5, "_InnerConnection: Re-using connection %r" % connection)
      self._connection = connection
    self._cursorref = None
    self._outerref = None
    self._lock = _thread.allocate_lock()
    self._lastuse = _time.time()

  def __repr__(self):
    return "<dbpool._InnerConnection(%r) at %x>" % (self._connection, id(self))

  def close(self):
    _log(3, "_Connection.close", self)
    if self._cursorref is not None:
      c = self._cursorref()
      if c is not None:
        _log(4, "_Connection.close: closing cursor %r" % c)
        c.close()
    self._cursorref = None
    self._outerref = None
    conn = self._connection
    if conn:
      self._connection = None
      _make_available(_Connection(None, conn, *self._args, **self._kwargs))

  def __getattr__(self, attr):
    return getattr(self._connection, attr)

  def cursor(self, *args, **kwargs):
    _log(3, "cursor", self, *args, **kwargs)
    self._lock.acquire()
    try:
      if self._cursorref is None or self._cursorref() is None:
        c = _Cursor(self, *args, **kwargs)
        self._cursorref = _weakref.ref(c)
        self._lastuse = _time.time()
        return c
    finally:
      self._lock.release()
    _log(3, "cursor: creating new connection")
    return connect(*self._args, **self._kwargs).cursor(*args, **kwargs)


class _Cursor(object):
  def __init__(self, connection, *args, **kwargs):
    self._cursor = None
    _log(4, "_Cursor", connection, *args, **kwargs)
    self._connection = connection
    self._outer = connection._outerref()
    self._cursor = connection._connection.cursor(*args, **kwargs)
  
  def __repr__(self):
    return "<dbpool._Cursor(%r) at %x>" % (self._cursor, id(self))

  def close(self):
    _log(4, "_Cursor.close", self)
    self._connection._cursorref = None
    self._connection = None
    self._cursor.close()
    self._outer = None

  def __getattr__(self, attr):
    return getattr(self._cursor, attr)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.