dbapi20.py :  » Database » Gadfly » gadflyZip » gadfly » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » Gadfly 
Gadfly » gadflyZip » gadfly » dbapi20.py

# -*- coding: UTF-8 -*-
"""implements the db api 2.0 for the gadfly database
"""

# $Id: dbapi20.py,v 1.2 2003/10/09 03:15:01 zenzen Exp $

from database import gadfly,error
from dbapi20error import *

import os
import os.path

__all__ = ['connect', 'apilevel', 'threadsafety', 'paramstyle']

apilevel = '2.0'

# see email from the gadfly list
threadsafety = 1 

paramstyle = 'qmark'

def connect(*args, **kwargs):
    connection = None
    try:
        connection = Connection(*args, **kwargs)
    except IOError, ioe:
        raise DatabaseError(str(ioe))
    return connection

class Connection:

    # DB API 2.0 Optional Extension
    Warning = Warning
    Error = Error
    InterfaceError = InterfaceError
    DatabaseError = DatabaseError
    OperationalError = OperationalError
    IntegrityError = IntegrityError
    InternalError = InternalError
    ProgrammingError = ProgrammingError
    NotSupportedError = NotSupportedError
    
    def __init__(self, *args, **kwargs):
        self.closed = 0
        self._gadfly = gadfly(*args, **kwargs)
        
    def __checkClosed(self):
        if self.closed:
            raise OperationalError('Connection already closed.')
        
    def close(self):
        self.__checkClosed()
        self.closed = 1
        self._gadfly.close()
        
    def commit(self):
        self.__checkClosed()
        self._gadfly.commit()
        
    def rollback(self):
        self.__checkClosed()
        self._gadfly.rollback()
        
        
    def cursor(self):
        """
        """
        self.__checkClosed()
        cursor = self._gadfly.cursor()
        return Cursor(self, cursor)

class Cursor:
    
    def __init__(self, connection, cursor):
        self.connection = connection
        self.cursor = cursor
        self.closed = 0

        # api - attribute
        self.description = None
        self.rowcount = -1
        self.arraysize = 1

    def __checkClosed(self):
        if self.closed or self.connection.closed:
            raise OperationalError('Cursor already closed.')
     
    def __makeRowCount(self):
        self.rowcount = -1
        
    def callproc(self, procname, *parameters):
        self.__checkClosed()
        return self.cursor(procname, *parameters)
    
    def close(self):
        self.__checkClosed()
        self.closed = 1
        return self.cursor.close()
    
    def execute(self, operation, *parameters):
        self.__checkClosed()
        self.cursor.execute(operation, *parameters)
        self.description = self.cursor.description
        self.__makeRowCount()
        
    def executemany(self, operation, seq_of_parameters):
        self.__checkClosed()
        for parameters in seq_of_parameters:
            self.execute(operation, parameters)
    
    def fetchone(self):
        result = None
        self.__checkClosed()

        try:
            result = self.cursor.fetchone()
        except error, message:
            # gadfly use string exception and the only way to
            # difference between reason is the message
            if message == "no more results":
                pass
            else:
                raise OperationalError(message)
        except AttributeError, e:
            raise OperationalError(str(e))
        return result
    
    def fetchmany(self, size=None):
        result = None
        self.__checkClosed()

        if size is None:
            size = self.arraysize

        try:
            result = self.cursor.fetchmany(size)
        except error, message:
            raise OperationalError(message)
        except AttributeError, e:
            raise OperationalError(str(e))
        
        return result
    
    def fetchall(self):
        result = None
        self.__checkClosed()
        try:
            result = self.cursor.fetchall()
        except error, message:
            raise OperationalError(message)
        except AttributeError, e:
            raise OperationalError(str(e))
        return result
    
    def setinputsizes(self, sizes):
        return self.cursor.setinputsizes(sizes)
    
    def setoutputsize(self, size, column=None):
        return self.cursor.setoutputsize(size, column)
    
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.