pdo.py :  » Database » Python-Database-Objects » PDO-1.3.2 » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » Python Database Objects 
Python Database Objects » PDO 1.3.2 » pdo.py
"""###############################################################
#  PDO:Python Database Objects - Version 1.3.2                #
###############################################################
#  info@neurokode.com                                         #
###############################################################
#  Copyright (c) 2003, NeuroKode Labs, All rights reserved.   #
#  Redistribution and use in source and binary forms, with    #
#  or without modification, are permitted provided that the   #
#  following conditions are met:                              #
#                                                             #
#  Redistributions of source code must retain the above       #
#  copyright notice, this list of conditions and the          #
#  following disclaimer.                                      #
#  Redistributions in binary form must reproduce the above    #
#  copyright notice, this list of conditions and the          #
#  following disclaimer in the documentation and/or other     #
#  materials provided with the distribution.                  #
#  Neither the name of NeuroKode Labs nor the names of its    #
#  contributors may be used to endorse or promote products    #
#  derived from this software without specific prior          #
#  written permission.                                        #
#                                                             #
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND     #
#  CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED            #
#  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED     #
#  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A            #
#  PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE   #
#  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,  #
#  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
#  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF     #
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR        #
#  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON   #
#  ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT       #
#  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)     #
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN   #
#  IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.              #
###############################################################
# Supports:                                                   #
#     MySQLdb                                                 #
#     PySQLite                                                #
#     SQLite                                                  #
#     kinterbasdb                                             #
#     mxODBC                                                  #
#     DB2                                                     #
#     cx_Oracle                                               #
#     adodbapi                                                #
#     psycopg                                                 #
###############################################################"""

import string
import sys

class connect:
    """Open a DB-API connection described by a PDO connection string.

    The connection string is a ';'-separated list of ``key=value`` pairs,
    for example ``"module=MySQLdb;user=me;passwd=secret;db=test"``.  The
    ``module`` entry names the DB-API module to import; every other entry
    is lower-cased, renamed to the driver's own keyword where a rename is
    known, filled in with a default where one is known, and then handed to
    the driver's ``connect()``.

    Attributes set by a successful construction:
        module      name of the DB-API module in use
        PDODB       the imported DB-API module object
        paramstyle  the driver's ``paramstyle``
        db          the driver's connection object
        active      1 while connected, 0 after close() or a failure

    Raises:
        pdo_connection_error: the module is unsupported, cannot be
            imported, or its connect() call fails.
    """

    # Keywords each supported driver is expected to receive (after renaming).
    # Only used to decide whether a module is supported and to build a hint
    # when connecting fails; the fields are not enforced.
    _REQUIRED = {
        'MySQLdb': ['host', 'port', 'user', 'passwd', 'db'],
        'pgdb': ['user', 'password', 'database'],
        'sqlite': ['db'],
        'kinterbasdb': ['dsn', 'user', 'password'],
        'mx.ODBC.Windows': ['dsn', 'user', 'password'],
        'mx.ODBC.iODBC': ['dsn', 'user', 'password'],
        'mx.ODBC.unixODBC': ['dsn', 'user', 'password'],
        # DB2 keys are listed under their post-rename names (see _RENAME).
        'DB2': ['dsn', 'uid', 'pwd'],
        'cx_Oracle': ['dsn', 'user', 'password', 'mode'],
        'adodbapi': [],
        'psycopg': ['user', 'password', 'database'],
    }

    # Values supplied when the connection string omits the keyword.
    _DEFAULTS = {
        'MySQLdb': {'host': 'localhost', 'port': 3306},
        'cx_Oracle': {'mode': 0},
    }

    # PDO keyword -> driver-specific keyword.
    _RENAME = {
        'MySQLdb': {'password': 'passwd'},
        'pgdb': {'passwd': 'password', 'db': 'database'},
        'DB2': {'passwd': 'pwd', 'user': 'uid'},
        'kinterbasdb': {'passwd': 'password'},
        'mx.ODBC.Windows': {'passwd': 'password'},
        'mx.ODBC.iODBC': {'passwd': 'password'},
        'mx.ODBC.unixODBC': {'passwd': 'password'},
        'cx_Oracle': {'passwd': 'password'},
        'psycopg': {'passwd': 'password', 'dbname': 'database'},
    }

    def __init__(self, raw_connect_string):
        # Instance state used by the later steps.
        self.__module = ""
        self.__connection_vars = {}
        self.active = ""
        self.PDODB = ""

        self._parse_connect_string(raw_connect_string)

        # Reject unknown (or missing) modules before importing anything, so
        # the caller gets a pdo_connection_error rather than a KeyError.
        if self.__module not in self._REQUIRED:
            self.active = 0
            raise pdo_connection_error("Unsupported DB-API Module.")

        self._load_module()
        missing = self._prepare_connection_vars()
        self._open_connection(missing)

    def _parse_connect_string(self, raw_connect_string):
        """Split the connection string into the module name and driver keywords."""
        for fragment in raw_connect_string.split(";"):
            if len(fragment) <= 3:  # incomplete fragment, skip
                continue
            # Split on the first '=' only so values may themselves contain '='.
            pieces = fragment.split("=", 1)
            key = pieces[0].lower()
            if len(pieces) > 1:
                value = pieces[1]
            else:
                value = ""  # empty value
            if key == "module":
                self.__module = value
                self.module = value
            else:
                self.__connection_vars[key] = value

    def _load_module(self):
        """Import the DB-API module and record its paramstyle."""
        try:
            self.PDODB = __import__(self.__module, globals(), locals(), ['connect'])
            self.paramstyle = self.PDODB.paramstyle
            self.last_paramark = 0   # used for numeric param markers in queries
        except Exception:
            self.active = 0
            exc_type, exc_value = sys.exc_info()[:2]
            self.ErrorMessage = "Unable to load Database Module: Database specific Error:" + str(exc_type) + "\n" + str(exc_value)
            raise pdo_connection_error(self.ErrorMessage)

    def _prepare_connection_vars(self):
        """Rename and default the driver keywords; return the expected ones still absent."""
        params = self.__connection_vars

        # Rename any fields from pdo-name to db-specific name.
        renames = self._RENAME.get(self.__module, {})
        for pdo_name in renames.keys():
            if pdo_name in params:
                params[renames[pdo_name]] = params[pdo_name]
                del params[pdo_name]

        # Set any missing fields to their defaults.
        defaults = self._DEFAULTS.get(self.__module, {})
        for key in defaults.keys():
            if key not in params:
                params[key] = defaults[key]

        return [item for item in self._REQUIRED[self.__module] if item not in params]

    def _open_connection(self, missing):
        """Call the driver's connect(); *missing* is added to the error text on failure."""
        params = self.__connection_vars
        try:
            if self.module == "cx_Oracle":
                # cx_Oracle is connected with positional, not keyword, arguments.
                self.db = self.PDODB.connect(params['user'], params['password'], params['dsn'], int(params['mode']))
            elif self.module == "adodbapi":
                # Rebuild the connection string: everything except module is passed on.
                con_str = "".join([key + "=" + params[key] + ";" for key in params.keys()])
                self.db = self.PDODB.connect(con_str)
            else:
                if self.module == "MySQLdb":
                    # A port read from the connection string is text; the
                    # driver needs an integer like the 3306 default.
                    params['port'] = int(params['port'])
                self.db = self.PDODB.connect(**params)
            self.active = 1
        except Exception:
            self.active = 0
            exc_type, exc_value = sys.exc_info()[:2]
            message = "Unable to connect to the Database: Database specific Error:" + str(exc_type) + "\n" + str(exc_value)
            if missing:
                message = message + "\nMissing connection fields: " + ", ".join(missing)
            raise pdo_connection_error(message)

    def close(self):
        """Close the underlying connection and mark this object inactive.

        Raises:
            pdo_connection_error: the driver failed to close the connection.
        """
        try:
            self.db.close()
            self.active = 0
        except Exception:
            exc_type, exc_value = sys.exc_info()[:2]
            raise pdo_connection_error("Unable to close connection to the Database:  Database specific Error: " + str(exc_type) + "\n" + str(exc_value))

    def openRS(self, sql_statement):
        """Deprecated: use open().  Run *sql_statement* and return its resultsource."""
        self.rs = resultsource(self, sql_statement)
        return self.rs

    def open(self, sql_statement="", Params=None):
        """Run *sql_statement* (optionally with *Params*) and return its resultsource."""
        try:
            self.rs = resultsource(self, sql_statement, Params)
        finally:
            # Markers handed out by pmark() belong to this statement only.
            self.last_paramark = 0
        return self.rs

    def simpleCMD(self, sql_statement):
        """Deprecated: use execute().  Run *sql_statement* and return the SimpleExecute."""
        self.se = SimpleExecute(self, sql_statement)
        return self.se

    def pmark(self):
        """Return the next parameter marker for the driver's paramstyle.

        Returns "?" for 'qmark' and ":1", ":2", ... for 'numeric'; the
        numeric counter restarts after each open()/execute()/execute_many().

        Raises:
            pdo_pmark_error: the paramstyle is neither 'qmark' nor 'numeric'.
        """
        if self.paramstyle == 'qmark':
            return "?"
        if self.paramstyle == 'numeric':
            self.last_paramark += 1
            return ":" + str(self.last_paramark)
        raise pdo_pmark_error("pdo_pmark_error: Invalid use of pmark(): the current paramstyle is not qmark(?) or numeric(:1).")

    def execute(self, sql_statement="", paramlist=None):
        """Run a statement, with parameters when *paramlist* is given.

        Returns the SimpleExecute or ParamExecute object that ran it.
        """
        try:
            if paramlist is None:
                self.se = SimpleExecute(self, sql_statement)
            else:
                self.se = ParamExecute(self, sql_statement, paramlist)
        finally:
            # Reset even on failure so the next statement starts at :1.
            self.last_paramark = 0
        return self.se

    def execute_many(self, sql_statement, paramlist):
        """Run *sql_statement* once per parameter set; return the ParamExecuteMulti."""
        try:
            self.se = ParamExecuteMulti(self, sql_statement, paramlist)
        finally:
            self.last_paramark = 0
        return self.se

    def get_cursor(self):
        """Create, remember and return a new cursor on the underlying connection."""
        self.cursor = self.db.cursor()
        return self.cursor


class resultsource:
    """In-memory, scrollable view over the rows returned by one statement.

    The statement is executed on a fresh cursor of ``parent.db`` and every
    row is fetched immediately.  ``fields`` maps each column name to a Field
    object whose ``value`` follows the current row.  The current row starts
    at -1 (before the first row); call next() to step onto the first row.

    Raises:
        pdo_resultset_error: executing the statement or fetching and
            describing its rows failed.
    """

    def __init__(self, parent, select_statement="", Params=None):
        self.__cursor = parent.db.cursor()
        self.fields = {}

        # Run the statement, passing parameters only when they were given.
        try:
            if Params is None:
                self.__cursor.execute(select_statement)
            else:
                self.__cursor.execute(select_statement, Params)
        except Exception:
            raise pdo_resultset_error(self._error_text())

        # Bring the entire result into memory and build one Field per column.
        try:
            self.__rs = self.__cursor.fetchall()
            self.__row = -1
            self.__column = 0
            position = 0
            for col in self.__cursor.description:
                self.fields[col[0]] = Field(col[0], col[1], col[2], col[3], col[4], col[5], col[6], position, self.__row, self.__rs)
                position = position + 1
        except Exception:
            raise pdo_resultset_error(self._error_text())

    def _error_text(self):
        """Describe the exception currently being handled."""
        exc_type, exc_value = sys.exc_info()[:2]
        return "ResultSet Error: Database specific Error:" + str(exc_type) + "\n" + str(exc_value)

    def close(self):
        """Close the cursor and drop the cached rows and all Field objects."""
        self.__cursor.close()
        self.__rs = None
        # Iterate over a copy of the keys: entries are deleted inside the loop.
        for name in list(self.fields.keys()):
            field = self.fields[name]
            field.rows = None
            field.cur_row = None
            del self.fields[name]

    def __getitem__(self, Args=None):
        """Return the Field for column name *Args* (KeyError when unknown)."""
        return self.fields[Args]

    def __contains__(self, Args):
        """Support ``name in rs``."""
        return self.has_key(Args)

    def __setitem__(self, key=None, value=None):
        """Result sets are read-only; always raises NotImplementedError."""
        raise NotImplementedError("This option is not supported")

    def keys(self, L=None):
        """Return a new list of the column names (*L* is ignored)."""
        return list(self.fields.keys())

    def has_key(self, Args):
        """Return 1 when *Args* is a column name, else 0."""
        if Args in self.fields:
            return 1
        return 0

    def get_cur_val(self, column):
        """Return the value at index *column* of the current row."""
        return self.__rs[self.__row][column]

    def row_count(self):
        """Return the number of rows fetched."""
        return len(self.__rs)

    def first(self):
        """Move to the first row; always returns 1."""
        self.__row = 0
        self.reset_field_positions()
        return 1

    def last(self):
        """Move to the last row; always returns 1."""
        self.__row = len(self.__rs) - 1
        self.reset_field_positions()
        return 1

    def reset_field_positions(self):
        """Point every Field at the current row."""
        for name in self.fields.keys():
            self.fields[name].cur_row = self.__row

    def next(self):
        """Advance one row.  Returns 1 on success, 0 when already on the last row."""
        # From the initial -1 position this steps onto row 0.
        target = self.__row + 1
        if target == len(self.__rs):
            moved = 0
        else:
            self.__row = target
            moved = 1
        self.reset_field_positions()
        return moved

    def prev(self):
        """Step back one row.  Returns 1 on success, 0 when there is no earlier row."""
        if self.__row <= 0:
            # On the first row, or still before it: nowhere to go back to.
            moved = 0
        else:
            self.__row = self.__row - 1
            moved = 1
        self.reset_field_positions()
        return moved

    def move(self, move_row):
        """Move *move_row* rows relative to the current row.

        Returns 1 on success, 0 (leaving the position unchanged) when the
        destination is outside the result set.
        """
        destination_row = self.__row + move_row
        if destination_row < 0 or destination_row >= len(self.__rs):
            moved = 0
        else:
            self.__row = destination_row
            moved = 1
        self.reset_field_positions()
        return moved

    def moveto(self, move_row):
        """Move to an absolute row; negative values count back from the end.

        Returns 1 on success, 0 (leaving the position unchanged) when the
        destination is outside the result set.
        """
        row_count = len(self.__rs)
        if move_row < 0:
            destination_row = row_count + move_row
        else:
            destination_row = move_row
        if destination_row < 0 or destination_row >= row_count:
            moved = 0
        else:
            self.__row = destination_row
            moved = 1
        self.reset_field_positions()
        return moved

    def array_from_column(self, column_name):
        """Return (and keep in ``cur_arr``) every value of *column_name*, in row order.

        The current row is left untouched.
        """
        self.cur_arr = []
        rows = self.__rs
        if rows:
            position = self.fields[column_name].position
            self.cur_arr = [row[position] for row in rows]
        return self.cur_arr

    def dictionary_from_columns(self, column_key, column_value):
        """Return (and keep in ``cur_dict``) a dict mapping *column_key* to *column_value*.

        Later rows overwrite earlier rows with the same key.  The current
        row is left untouched.
        """
        self.cur_dict = {}
        rows = self.__rs
        if rows:
            key_position = self.fields[column_key].position
            value_position = self.fields[column_value].position
            for row in rows:
                self.cur_dict[row[key_position]] = row[value_position]
        return self.cur_dict

    def reset(self):
        """Return to the position before the first row; always returns 1."""
        self.__row = -1
        self.reset_field_positions()
        return 1

class Field:
    """One column of a result set.

    The descriptive attributes mirror the DB-API 2.0 ``cursor.description``
    entry (name, type_code, display_size, internal_size, precision, scale,
    null_ok).  ``position`` is the column's index within a row, ``rows`` is
    the list of fetched rows and ``cur_row`` the index of the row that
    ``value`` is read from.
    """

    def __init__(self, fname, ftype, defined_size, internal_size, precision, scale, null_ok, position, current_row, rows_list):
        self.name = fname
        self.type_id = ftype
        self.size = defined_size
        self.length = internal_size
        self.precision = precision
        self.scale = scale
        self.null_ok = null_ok
        self.position = position
        self.cur_row = current_row
        self.rows = rows_list

    def __getattr__(self, name):
        """Compute ``value`` on demand; any other missing attribute is an error."""
        # __getattr__ only runs for attributes not found normally, so this
        # does not affect the attributes assigned in __init__.
        if name == "value":
            return self.get_cur_val(self.position)
        # Raise instead of silently returning None, so typos surface and
        # hasattr() reports the truth.
        raise AttributeError(name)

    def get_cur_val(self, position):
        """Return the value at column index *position* of the row ``cur_row``."""
        return self.rows[self.cur_row][position]
            
            

class ParamExecute:
    """Run one parameterised statement on a fresh cursor of ``parent.db``.

    ``affected_rows`` holds whatever the driver's ``execute()`` returned.
    When the driver returns nothing useful (None, or the cursor itself),
    the cursor's ``rowcount`` is used instead, if it has one.

    Raises:
        pdo_param_execute_error: the driver raised while executing.
    """

    def __init__(self, parent, Statement, Params):
        self.__cursor = parent.db.cursor()
        try:
            result = self.__cursor.execute(Statement, Params)
        except Exception:
            exc_type, exc_value = sys.exc_info()[:2]
            raise pdo_param_execute_error("Paramater-enabled Execute Error: Database specific Error:" + str(exc_type) + "\n" + str(exc_value))
        # DB-API 2.0 leaves execute()'s return value undefined; rowcount is
        # the portable place to read the number of affected rows.
        if result is None or result is self.__cursor:
            result = getattr(self.__cursor, "rowcount", None)
        self.affected_rows = result

class ParamExecuteMulti:
    """Same as ParamExecute, but calls ``executemany`` with a sequence of parameter sets.

    ``affected_rows`` holds whatever the driver's ``executemany()``
    returned.  When the driver returns nothing useful (None, or the cursor
    itself), the cursor's ``rowcount`` is used instead, if it has one.

    Raises:
        pdo_param_execute_error: the driver raised while executing.
    """

    def __init__(self, parent, Statement, Params):
        self.__cursor = parent.db.cursor()
        try:
            result = self.__cursor.executemany(Statement, Params)
        except Exception:
            exc_type, exc_value = sys.exc_info()[:2]
            raise pdo_param_execute_error("Paramater-enabled ExecuteMany Error: Database specific Error:" + str(exc_type) + "\n" + str(exc_value))
        # DB-API 2.0 leaves executemany()'s return value undefined; rowcount
        # is the portable place to read the number of affected rows.
        if result is None or result is self.__cursor:
            result = getattr(self.__cursor, "rowcount", None)
        self.affected_rows = result
            
class SimpleExecute:
    """Run one statement without parameters on a fresh cursor of ``parent.db``.

    ``affected_rows`` holds whatever the driver's ``execute()`` returned.
    When the driver returns nothing useful (None, or the cursor itself),
    the cursor's ``rowcount`` is used instead, if it has one.
    ``insertid`` is the cursor's ``lastrowid``, or None when the driver
    does not provide it.

    Raises:
        pdo_simple_execute_error: the driver raised while executing.
    """

    def __init__(self, parent, Statement):
        self.__cursor = parent.db.cursor()
        try:
            result = self.__cursor.execute(Statement)
        except Exception:
            exc_type, exc_value = sys.exc_info()[:2]
            raise pdo_simple_execute_error("Simple Execute Error: Database specific Error:" + str(exc_type) + "\n" + str(exc_value))
        # DB-API 2.0 leaves execute()'s return value undefined; rowcount is
        # the portable place to read the number of affected rows.
        if result is None or result is self.__cursor:
            result = getattr(self.__cursor, "rowcount", None)
        self.affected_rows = result
        # lastrowid is an optional DB-API extension; not every driver has it.
        try:
            self.insertid = self.__cursor.lastrowid
        except Exception:
            self.insertid = None

class pdo_error(Exception):
    """Common base for every exception raised by PDO.

    ``value`` holds the message passed to the constructor; ``str()`` of the
    exception is the ``repr()`` of that message.
    """

    def __init__(self, value):
        # Hand the message to Exception as well so ``args`` is populated.
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)


class pdo_pmark_error(pdo_error):
    """pmark() was used with a paramstyle other than qmark or numeric."""


class pdo_connection_error(pdo_error):
    """Loading the DB-API module, connecting, or closing the connection failed."""


class pdo_resultset_error(pdo_error):
    """Executing a statement for a resultsource, or fetching its rows, failed."""


class pdo_simple_execute_error(pdo_error):
    """A statement run through SimpleExecute failed."""


class pdo_param_execute_error(pdo_error):
    """A statement run through ParamExecute or ParamExecuteMulti failed."""
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.