db_lib.py :  » Business-Application » hylaPEx » hylapex » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Business Application » hylaPEx 
hylaPEx » hylapex » db_lib.py
#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys, os, exceptions, traceback
from operator import itemgetter

from library import utils

#Always import *our* odbc interface
import RealPyOdbc

#Internal debug functions
DEBUG = 1

#Start with all the modules are loaded
USE_MYSQL, USE_ODBC, USE_SQLITE, USE_PYODBC = 1, 1, 1, 0

#Try mysql
import MySQLdb
try:
    import MySQLdb
    import MySQLdb.cursors
except ImportError:
    if DEBUG: print "Error on load mysql driver"
    USE_MYSQL = 0

#And mx odbc
try:
    import pyodbc
    USE_PYODBC = 1
except ImportError:
    if DEBUG: print "Error on load PYODBC"
    ODBC = RealPyOdbc
    
#and sqlite2
if sys.version > '2.5':
    import sqlite3 as sqlite
else:
    try:
        from pysqlite2 import dbapi2
    except:
        if DEBUG: print "Error on load pysqlite2"
        USE_SQLITE = 0

import psycopg2

class _base_db(object):
    """ Base and common class for the db
    """
    def __init__(self, *args, **kw):
        
        self._cursorClass = None
        self._functLog = None
        self._my_db = None
        self._debugL = kw.get("debug", None)

        #If passme into the debug param the function for log, keep it
        if callable(self._debugL):
            self._functLog = self._debugL
        elif "functLog" in kw and callable(kw["functLog"]):
            self._functLog = kw["functLog"]
        self._connOK = 0
    
    def conn(self, *args, **kw):
        if self._connOK:
            self.close()
            
        if kw.pop("no_cursors_class", None):
            self._cursorClass = None
        
        ret = self._conn(*args, **kw)
        self._connOK = 1
        return ret
    
    def queryReturn(self, *args, **kw):
        self._logDebug(*args, **kw)
        if not self._connOK:
            self._logDebug("No connection", story=1)
            return 3, 'Not connected'
        
        #Do the real query
        cur = self._getCursor()
        if cur:
            ret = self._qRet(cur, *args, **kw)
            cur.close()
        else:
            ret = (1, "No cursor")

        del cur

        return ret

    def queryNoReturn(self, q, *args, **kw):
        self._logDebug(q, *args, **kw)
        if not self._connOK:
            self._logDebug("Not already connected", story=1)
            return 3, 'Not connected'
        
        cur = self._getCursor()
        q = [x for x in q.split(";") if x.strip()]
        ret = self._qNoRet(cur, q, *args, **kw)
        cur.close()
        
        return ret
        
    def close(self):
        """ Close the connection, if need
        """
        if not self._connOK: return
        if not self._my_db: return
        
        #print "close"
        self._my_db.close()
        self._connOK = 0
    
    #Common functions
        
    def _logDebug(self, *args, **kw):
        """ Make the debug, and its story if asked
        """
        story = kw.pop("story", None)
        if story:
            story = utils.create_story_call()

        if self._debugL and self._functLog:
            try:
                self._functLog(*args, **kw)
            except Exception, ex:
                msg_err = "Error on log _base_db::_logDebug: " + str(ex)
                self._functLog(msg_err)
                print msg_err
            if story: self._functLog(story)

        elif self._debugL:
            print "Out from debug on db:"
            print args
            print kw
            if story: print story
        else:
            pass
        
        if story: 
            for i in story: print i
            print "\n\n"
        
        
    def _getCursor(self):
        """ Return a new cursor. Is delegate to you to call the
            close method!!!
        """
        if not self._my_db:
            return None
        if self._cursorClass:
            cur = self._my_db.cursor(self._cursorClass)
        else:
            cur = self._my_db.cursor()
        return cur
        
    def __del__(self):
        """ Close the db
        """
        self.close()
        print "Closing db..."

    #To overwrite
    def query_last(self):
        pass
    def _conn(self, *args, **kw):
        pass
    def _qRet(self, *args, **kw):
        pass
    def _qNoRet(self, *args, **kw):
        pass

class Sqlite(_base_db):
    def __init__(self, db, tables=None, log=None, *args, **kw):
        
        super(Sqlite, self).__init__(*args, **kw)
        
        if not tables: tables = ()
        self._tablesToCreate = tables
        self._sqlite_dbName = db
        self._sqlite_file_log = log
    
    def _conn(self):
        
        par = {}
        #par["cached_statements"] = 1
        par["check_same_thread"] = 0
        #par["isolation_level"] = None
        #par["isolation_level"] = "IMMEDIATE"
        
        self._my_db = sqlite.connect(self._sqlite_dbName, **par)
        #self._my_db.isolation_level = None
        
        cur = self._getCursor()
        cur.execute("select tbl_name from sqlite_master where type='table' order by tbl_name")
        tables = cur.fetchall()
        if tables:
            tables = [ x[0] for x in tables ]
        
        if not  tables and self._tablesToCreate:
            for table in self._tablesToCreate:
                cur.execute(table)
                self._my_db.commit()
        cur.close()
        
        return tables

    def _qRet(self, cur, q, *args, **kw):
        """
        """
        ret_ec = 0
        ret_val = []
        if isinstance(q, (list, tuple)):
            ret_val = [self._qRet(query) for query in q]
            
            if filter(lambda x: x> 0, map(itemgetter(0), ret_val)):
                ret_ec = 1
        else:
            try:
                cur.execute(q, *args, **kw)
                self._my_db.commit()
                ret_val = cur.fetchall()
            except:
                print "q: ", q, args, kw
                ret_val = traceback.format_exc()
                ret_ec = 1
                self._logDebug("Exception on Sqlite::_qRet: %s" % str(ret_val), story=1)
        return ret_ec, ret_val
        
    def _qNoRet(self, cur, q, *args, **kw):
        try:
            for query in q:
                ret = cur.execute(query, *args, **kw)
            #cur.close()
            self._my_db.commit()
            return 0, 'ok'
        except:
            value = traceback.format_exc()
            print "noRet", q, args, kw
            self._logDebug("Exception on Sqlite::_qNoRet: %s" % str(value),story=1)
            return 1, value

    def query_last(self):
        return str( int(self.queryReturn('SELECT LAST_INSERT_ROWID()')[0][0]) )

    def get_cols(self, table):
        """
        """
        self._logDebug("get_cols. table=%s" % table)
        
        q = "select * from %s LIMIT 1" % table
        cur = self._getCursor()

        if not cur:
            return 3, "Not connected"

        ret = self._qRet(cur, q)
        
        if ret[0] == 0:
            ret = 0, [x[0] for x in cur.description]
        cur.close()
        
        return ret


class PgSql(_base_db):
    def __init__(self, *args, **kw):
        super(PgSql, self).__init__(*args, **kw)

    def _conn(self, db_host, db_name, db_user, db_passwd):
        """
        """
        DSN = ( "dbname='%s' user='%s' host='%s' password='%s'" %
                    (db_name, db_user, db_host, db_passwd) )
        try:
            self._my_db = psycopg2.connect(DSN)
        except:
            return 1, traceback.format_exc()
    
    def _qRet(self, cur, q, *args, **kw):
        """
        """
        ret_ec = 0
        try:
            cur.execute(q, *args, **kw)
            self._my_db.commit()
            ret_val = cur.fetchall()
        except:
            self._my_db.rollback()
            ret_ec = 1
            ret_val = traceback.format_exc()
            self._logDebug("Exception on PgSql::_qRet: %s" % ret_val)
        
        return ret_ec, ret_val
    
    def _qNoRet(self, cur, q, *args, **kw):
        try:
            for query in q:
                ret = cur.execute(query, *args, **kw)
                self._my_db.commit()
            return 0, 'ok'
        except:
            self._my_db.rollback()
            msg = traceback.format_exc()
            self._logDebug("Exception on PgSql::_qNoRet: %s" % msg)
            return 1, msg

class Mysql(_base_db):
    """ Class for talk with mysql
    """
    def __init__(self, *args, **kw):
        if not USE_MYSQL:
            raise AttributeError("module mysql not present")

        super(Mysql, self).__init__(*args, **kw)
        self._cursorClass = MySQLdb.cursors.DictCursor
        
    def _conn(self, vals):
        try:
            d = dict()
            d["host"] = vals['server']
            d["user"] = vals['user']
            d["passwd"] = vals.get('pass', "")
            d["db"] = vals['db']
            d["compress"] = 1
            self._my_db = MySQLdb.Connect(**d)
            self._my_db.autocommit(False) 
            return 0, 'ok'
        except:
            msg = "Exception on Mysql, Exception::_conn: %s" % traceback.format_exc()
            self._logDebug(msg)
            return 2, msg
        
    def _qRet(self, cur, q, *args, **kw):
        try:
            cur.execute(q, *args, **kw)
            self._my_db.commit()
            rows = cur.fetchall()
            return 0, rows
        except:
            self._my_db.rollback()
            msg = "Exception on Mysql, _qRet: %s" % traceback.format_exc()
            self._logDebug(msg)
            return 1, msg

    def _qNoRet(self, cur, q, *args, **kw):
        try:
            for query in q:
                ret = cur.execute(query, *args, **kw)
                self._my_db.commit()
            #ret = cur.execute(q, *args, **kw)
            #self._my_db.commit()
            return 0, ret
        except:
            self._my_db.rollback()
            msg = "Exception on Mysql, _qNoRet: %s" % traceback.format_exc()
            self._logDebug(msg)
            return 1, msg


class Odbc(_base_db):
    def __init__(self, *args, **kw):
        super(Odbc, self).__init__(*args, **kw)
        
        self._pyodbc_usage = USE_PYODBC

        #Load always our, internal, odbc library
        self._odbc_dbR = RealPyOdbc.odbc()
    
    def _conn(self, dsn='', user='', passwd='', *args, **kw):
        self._dsn = dsn
        self._user = user
        self._passwd = passwd
        
        #If the caller wants, not use mxodbc
        noMXODBC = kw.get("noMXODBC", None)
        dsn_odbc = "DSN=%s;UID=%s;PWD=%s" % (dsn, user, passwd)
        self._logDebug("Connect dsn: %s" % dsn_odbc)

        try:
            self._odbc_dbR.connect(dsn, user, passwd)
            if USE_PYODBC and not noMXODBC:
                try:
                    self._my_db = pyodbc.connect(dsn_odbc)
                except:
                    #can be a driver than not support autocommit?
                    self._my_db = pyodbc.connect(dsn_odbc, 1)
            else:
                self._my_db = self._odbc_dbR
                self._pyodbc_usage = 0

            return 0, 'Ok'

        except:
            ex = traceback.format_exc()
            print ex
            self._logDebug(ex)
            return 1, ex

    def enum_dsn(self):
        """
        """
        try:
            dsn = self._odbc_dbR.EnumateDSN()
            self._logDebug("Enumerate dsn: %s" % dsn)
            return 0, dsn
        except Exception, ex:
            self._logDebug("Error on enum_dsn: %s" % str(ex), story=1)
            return 1, ex
        
    def get_tables(self):
        """
        """
        try:
            if self._pyodbc_usage:
                tables = [ (x.table_name, None) for x in self._my_db.cursor().tables() if x.table_type == "TABLE"]
            else:
                tables = self._odbc_dbR.GeTables()
            self._logDebug("get_tables: %s" % tables)
            return 0, tables
        except Exception, ex:
            self._logDebug("Error on get_tables: %s" % str(ex), story=1)
            return 1, ex
    
    def get_cols(self, table=None, query=None):
        """
        """
        self._logDebug("get_cols. table=%s, query=%s" % (table, query) )

        q = query or "select * from %s LIMIT 1" % table
        cur = self._getCursor()

        if not cur:
            return 3, "Not connected"

        ret = self._qRet(cur, q, nor=1)

        if ret[0] == 3:
            #there is an error
            #can be an ms sql server limit bug, retry
            q = "select top 1 * from %s" % table
            cur = self._getCursor()
            ret = self._qRet(cur, q, nor=1)

        if ret[0] == 0:
            ret = 0, cur.description
        else:
            #the last try. Without limit or other because the driver can be that doesn't support limit!
            q = "select * from %s" % table
            cur = self._getCursor()
            ret = self._qRet(cur, q, nor=1)
            ret = 0, cur.description

        cur.close()

        return ret

    def _qRet(self, cur, q, *args, **kw):
        """
        """
        self._logDebug("_qRet:, start")
        if not cur:
            return 3, "Not connected"

        nor = kw.get("nor", 0)
        
        try:
            if isinstance(q, unicode): q = str(q)
            
            self._logDebug("_qRet: " + q)
            cur.execute(q)

            if nor == 1: f = [ cur.fetchone(), ]
            elif nor != 0: f = cur.fetchmany(nor)
            else: f = cur.fetchall()
            #delete the unwanted chars
            lstRow = []
            #Strange, but happen that f are: f[None]
            if f and f[0] is None:
                f = []

            for row in f:
                lstCol = []
                for col in row:
                    if col not in (None, "None"): lstCol.append(col)
                    else: lstCol.append("")
                lstRow.append(lstCol)
            return 0, lstRow

        except:
            value = traceback.format_exc()
            self._logDebug("Error on _qRet: %s" % str(value))
            ret_val = 2
            if "LIMIT" in q.upper():
                #can be a very NOT STANDARD MS-SQL bug?
                ret_val = 3

            return ret_val, value

    def _qNoRet(self, cur, q, *args, **kw):
        """
        """
        if not cur:
            return 3, "Not connected"

        try:
            for query in q:
                if isinstance(query, unicode): query = str(query)
                ret = cur.execute(query)
            return 0, ret
        except:
            value = traceback.format_exc()
            self._logDebug(value)
            return 1, value



class Db_print(dict):
    """ Class that will handle the user that print a fax.
        TODO: end the class
    """
    def __init__(self, *args, **kw):
        self._My_db = kw.pop("dbPrint")
        super(Db_print, self).__init__(*args, **kw)
    
    def keys(self, *args, **kw):
        self._My_db.GetAll()
        return super(Db_print, self).keys(*args, **kw)
        

if __name__ == '__main__':
    #print 'This class is for module use only'
    table_list = """CREATE TABLE fb (name text not null unique,
    fb_order int, fb_db_name text, fb_type text not null, fb_fields text,
    fb_other text, id integer primary key unique)"""
    q = "SELECT * FROM fb"
    
    par = dict( [ ("server", "server"), ("user", "michele"), ("db", "hylapex")] )
    d = Odbc(debug=1)
    d.conn("pg", "tutti")
    ret, valTables = d.get_tables()
    t = valTables[0][0]
    ret, val = d.get_cols(t)
    print ret, val
    d.queryReturn('SELECT * FROM %s' % t)
    #d.queryNoReturn()
    d.close()
    
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.