sqlite_backend.py :  » Project-Management » Trac » Trac-0.11.7 » trac » db » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Project Management » Trac 
Trac » Trac 0.11.7 » trac » db » sqlite_backend.py
# -*- coding: utf-8 -*-
#
# Copyright (C)2005-2009 Edgewall Software
# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://trac.edgewall.org/log/.
#
# Author: Christopher Lenz <cmlenz@gmx.de>

import os
import re
import weakref

from trac.core import *
from trac.db.api import IDatabaseConnector
from trac.db.util import ConnectionWrapper
from trac.util import get_pkginfo,getuser
from trac.util.translation import _

_like_escape_re = re.compile(r'([/_%])')

try:
    import pysqlite2.dbapi2 as sqlite
    have_pysqlite = 2
except ImportError:
    try:
        import sqlite3 as sqlite
        have_pysqlite = 2
    except ImportError:
        try:
            import sqlite
            have_pysqlite = 1
        except ImportError:
            have_pysqlite = 0

if have_pysqlite == 2:
    _ver = sqlite.sqlite_version_info
    sqlite_version = _ver[0] * 10000 + _ver[1] * 100 + int(_ver[2])
    sqlite_version_string = '%d.%d.%d' % (_ver[0], _ver[1], int(_ver[2]))

    class PyFormatCursor(sqlite.Cursor):
        def _rollback_on_error(self, function, *args, **kwargs):
            try:
                return function(self, *args, **kwargs)
            except sqlite.DatabaseError, e:
                self.cnx.rollback()
                raise
        def execute(self, sql, args=None):
            if args:
                sql = sql % (('?',) * len(args))
            return self._rollback_on_error(sqlite.Cursor.execute, sql,
                                           args or [])
        def executemany(self, sql, args=None):
            if args:
                sql = sql % (('?',) * len(args[0]))
            return self._rollback_on_error(sqlite.Cursor.executemany, sql,
                                           args or [])

    # EagerCursor taken from the example in pysqlite's repository:
    #
    #   http://oss.itsystementwicklung.de/hg/pysqlite/raw-file/0a726720f540/misc/eager.py
    #
    # Only change is to subclass it from PyFormatCursor instead of
    # sqlite.Cursor.

    class EagerCursor(PyFormatCursor):
        def __init__(self, con):
            PyFormatCursor.__init__(self, con)
            self.rows = []
            self.pos = 0

        def execute(self, *args):
            result = PyFormatCursor.execute(self, *args)
            self.rows = PyFormatCursor.fetchall(self)
            self.pos = 0
            return result

        def fetchone(self):
            try:
                row = self.rows[self.pos]
                self.pos += 1
                return row
            except IndexError:
                return None

        def fetchmany(self, num=None):
            if num is None:
                num = self.arraysize

            result = self.rows[self.pos:self.pos+num]
            self.pos += num
            return result

        def fetchall(self):
            result = self.rows[self.pos:]
            self.pos = len(self.rows)
            return result

        # needed for the tests (InMemoryDatabase doesn't use IterableCursor)

        def __iter__(self):
            while True:
                row = self.fetchone()
                if not row:
                    return
                yield row


elif have_pysqlite == 1:
    _ver = sqlite._sqlite.sqlite_version_info()
    sqlite_version = _ver[0] * 10000 + _ver[1] * 100 + _ver[2]
    sqlite_version_string = '%d.%d.%d' % _ver

    class SQLiteUnicodeCursor(sqlite.Cursor):
        def _convert_row(self, row):
            return tuple([(isinstance(v, str) and [v.decode('utf-8')] or [v])[0]
                          for v in row])
        def fetchone(self):
            row = sqlite.Cursor.fetchone(self)
            return row and self._convert_row(row) or None
        def fetchmany(self, num):
            rows = sqlite.Cursor.fetchmany(self, num)
            return rows != None and [self._convert_row(row)
                                     for row in rows] or []
        def fetchall(self):
            rows = sqlite.Cursor.fetchall(self)
            return rows != None and [self._convert_row(row)
                                     for row in rows] or []


def _to_sql(table):
    sql = ["CREATE TABLE %s (" % table.name]
    coldefs = []
    for column in table.columns:
        ctype = column.type.lower()
        if column.auto_increment:
            ctype = "integer PRIMARY KEY"
        elif len(table.key) == 1 and column.name in table.key:
            ctype += " PRIMARY KEY"
        elif ctype == "int":
            ctype = "integer"
        coldefs.append("    %s %s" % (column.name, ctype))
    if len(table.key) > 1:
        coldefs.append("    UNIQUE (%s)" % ','.join(table.key))
    sql.append(',\n'.join(coldefs) + '\n);')
    yield '\n'.join(sql)
    for index in table.indices:
        unique = index.unique and 'UNIQUE' or ''
        yield "CREATE %s INDEX %s_%s_idx ON %s (%s);" % (unique, table.name,
              '_'.join(index.columns), table.name, ','.join(index.columns))


class SQLiteConnector(Component):
    """SQLite database support."""
    implements(IDatabaseConnector)

    def __init__(self):
        self._version = None
        self.error = None

    def get_supported_schemes(self):
        if not have_pysqlite:
            self.error = _("Cannot load Python bindings for SQLite")
        elif sqlite_version >= 30303:
            if sqlite.version_info < (1, 0, 7):
                self.error = _("Need at least PySqlite 1.0.7 or higher")
            elif sqlite.version_info[0] == 2 and \
                    sqlite.version_info < (2, 0, 7):
                self.error = _("Need at least PySqlite 2.0.7 or higher")
        yield ('sqlite', self.error and -1 or 1)

    def get_connection(self, path, log=None, params={}):
        if not self._version:
            self._version = get_pkginfo(sqlite).get(
                'version', '%d.%d.%s' % sqlite.version_info)
            self.env.systeminfo.extend([('SQLite', sqlite_version_string),
                                        ('pysqlite', self._version)])
            if have_pysqlite == 1:
                self.log.warning("Support for SQLite v2 and PySqlite 1.0.x "
                                 "will be dropped in version 0.12, see "
                                 "http://trac.edgewall.org/wiki/"
                                 "PySqlite#UpgradingSQLitefrom2.xto3.x")
        return SQLiteConnection(path, log, params)

    def init_db(cls, path, log=None, params={}):
        if path != ':memory:':
            # make the directory to hold the database
            if os.path.exists(path):
                raise TracError('Database already exists at %s' % path)
            os.makedirs(os.path.split(path)[0])
        if isinstance(path, unicode): # needed with 2.4.0
            path = path.encode('utf-8')
        cnx = sqlite.connect(path, timeout=int(params.get('timeout', 10000)))
        cursor = cnx.cursor()
        from trac.db_default import schema
        for table in schema:
            for stmt in cls.to_sql(table):
                cursor.execute(stmt)
        cnx.commit()

    def to_sql(cls, table):
        return _to_sql(table)

    def backup(self, dest_file):
        """Simple SQLite-specific backup of the database.

        @param dest_file: Destination file basename
        """
        import shutil
        db_str = self.config.get('trac', 'database')
        db_name = os.path.join(self.env.path, db_str[7:])
        shutil.copy(db_name, dest_file)
        if not os.path.exists(dest_file):
            raise TracError("Backup attempt failed")
        return dest_file

class SQLiteConnection(ConnectionWrapper):
    """Connection wrapper for SQLite."""

    __slots__ = ['_active_cursors', '_eager']

    poolable = have_pysqlite and sqlite_version >= 30308 \
                             and sqlite.version_info >= (2,5,0)

    def __init__(self, path, log=None, params={}):
        assert have_pysqlite > 0
        self.cnx = None
        if path != ':memory:':
            if not os.access(path, os.F_OK):
                raise TracError('Database "%s" not found.' % path)

            dbdir = os.path.dirname(path)
            if not os.access(path, os.R_OK + os.W_OK) or \
                   not os.access(dbdir, os.R_OK + os.W_OK):
                raise TracError('The user %s requires read _and_ write ' \
                                'permissions to the database file %s and the ' \
                                'directory it is located in.' \
                                % (getuser(), path))

        if have_pysqlite == 2:
            self._active_cursors = weakref.WeakKeyDictionary()
            timeout = int(params.get('timeout', 10.0))
            self._eager = params.get('cursor', 'eager') == 'eager'
            # eager is default, can be turned off by specifying ?cursor=
            if isinstance(path, unicode): # needed with 2.4.0
                path = path.encode('utf-8')
            cnx = sqlite.connect(path, detect_types=sqlite.PARSE_DECLTYPES,
                                 check_same_thread=sqlite_version < 30301,
                                 timeout=timeout)
        else:
            timeout = int(params.get('timeout', 10000))
            cnx = sqlite.connect(path, timeout=timeout, encoding='utf-8')
            
        ConnectionWrapper.__init__(self, cnx, log)

    if have_pysqlite == 2:
        def cursor(self):
            cursor = self.cnx.cursor((PyFormatCursor, EagerCursor)[self._eager])
            self._active_cursors[cursor] = True
            cursor.cnx = self
            return cursor

        def rollback(self):
            for cursor in self._active_cursors.keys():
                cursor.close()
            self.cnx.rollback()

    else:
        def cursor(self):
            self.cnx._checkNotClosed("cursor")
            return SQLiteUnicodeCursor(self.cnx, self.cnx.rowclass)

    def cast(self, column, type):
        if sqlite_version >= 30203:
            return 'CAST(%s AS %s)' % (column, type)
        elif type == 'int':
            # hack to force older SQLite versions to convert column to an int
            return '1*' + column
        else:
            return column

    def concat(self, *args):
        return '||'.join(args)

    def like(self):
        if sqlite_version >= 30100:
            return "LIKE %s ESCAPE '/'"
        else:
            return 'LIKE %s'

    def like_escape(self, text):
        if sqlite_version >= 30100:
            return _like_escape_re.sub(r'/\1', text)
        else:
            return text

    if have_pysqlite == 2:
        def get_last_id(self, cursor, table, column='id'):
            return cursor.lastrowid
    else:
        def get_last_id(self, cursor, table, column='id'):
            return self.cnx.db.sqlite_last_insert_rowid()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.