# Welcome to RealPyODBC
# Version 0.2 beta
# This class help you to connect your python script with ODBC engine.
# I need at least ctypes 0.9.2 for work.
#
# This class is not db-api 2.0 compatible. If you want to help me to do it
# please modify it and send me an e-mail with your work!
# All the comunity will thanks you.
#
# Please send bugs and reports to michele.petrazzo@unipex.it
#
# TO-DO
# Make compatibility with db-api 2.0, so add:
# apilevel, theadsafety, paramstyle, cursor, exceptions, ....
#
# This software if released with MIT Licence
'''
A little example
dsn_test = 'vp'
user = 'someuser'
od = odbc()
#Dsn list
DSN_list = od.EnumateDSN()
od.ConnectOdbc(dsn_test, user)
#Get tables list
tables = od.GeTables()
#Get fields on the table
cols = od.ColDescr(tables[0])
#Make a query
od.Query('SELECT * FROM %s' % tables[0])
#Get results
print od.FetchMany(2)
print od.FetchAll()
#Close before exit
od.close()'''
import sys, os, ctypes
from ctypes.util import find_library
import ctypes as C
CSB = ctypes.create_string_buffer
VERBOSE = 1
#Costants
SQL_FETCH_NEXT = 0x01
SQL_FETCH_FIRST = 0x02
SQL_FETCH_LAST = 0x04
SQL_INVALID_HANDLE = -2
SQL_SUCCESS = 0
SQL_SUCCESS_WITH_INFO = 1
SQL_NO_DATA_FOUND = 100
SQL_NULL_HANDLE = 0
SQL_HANDLE_ENV = 1
SQL_HANDLE_DBC = 2
SQL_HANDLE_DESCR = 4
SQL_HANDLE_STMT = 3
SQL_ATTR_ODBC_VERSION = 200
SQL_OV_ODBC2 = 2
SQL_OV_ODBC3 = 3
SQL_TABLE_NAMES = 3
# /* SQL data type codes */
SQL_UNKNOWN_TYPE = 0
SQL_CHAR = 1
SQL_NUMERIC = 2
SQL_DECIMAL = 3
SQL_INTEGER = 4
SQL_SMALLINT = 5
SQL_FLOAT = 6
SQL_REAL = 7
SQL_DOUBLE = 8
SQL_DATE, SQL_DATETIME = 9, 9
SQL_TIME = 10
SQL_TIMESTAMP = 11
SQL_VARCHAR = 12
#Only for odbc api >= 3.0
SQL_TYPE_DATE = 91
SQL_TYPE_TIME = 92
SQL_TYPE_TIMESTAMP = 93
SQL_LONGVARCHAR = -1
SQL_BINARY = -2
SQL_VARBINARY = -3
SQL_LONGVARBINARY = -4
SQL_BIGINT = -5
SQL_TINYINT = -6
SQL_BIT = -7
#SQLColAttributes defines
SQL_DESC_COUNT = 1001
SQL_DESC_TYPE = 1002
SQL_DESC_LENGTH = 1003
SQL_DESC_OCTET_LENGTH_PTR = 1004
SQL_DESC_PRECISION = 1005
SQL_DESC_SCALE = 1006
SQL_DESC_DATETIME_INTERVAL_CODE = 1007
SQL_DESC_NULLABLE = 1008
SQL_DESC_INDICATOR_PTR = 1009
SQL_DESC_DATA_PTR = 1010
SQL_DESC_NAME = 1011
SQL_DESC_UNNAMED = 1012
SQL_DESC_OCTET_LENGTH = 1013
SQL_DESC_ALLOC_TYPE = 1099
SQL_COLUMN_PRECISION = 4
SQL_COLUMN_SCALE = 5
SQL_DESC_DISPLAY_SIZE = 6
SQL_COLUMN_NULLABLE = 7
#Information requested by SQLGetInfo()
SQL_DATA_SOURCE_NAME = 2
SQL_DRIVER_HDBC = 3
SQL_DRIVER_HENV = 4
SQL_DRIVER_HSTMT = 5
SQL_DRIVER_NAME = 6
SQL_DRIVER_VER = 7
SQL_FETCH_DIRECTION = 8
SQL_ODBC_API_CONFORMANCE = 9
SQL_ODBC_VER = 10
SQL_SERVER_NAME = 13
SQL_SEARCH_PATTERN_ESCAPE = 14
SQL_DBMS_NAME = 17
SQL_DBMS_VER = 18
SQL_ACCESSIBLE_TABLES = 19
SQL_ACCESSIBLE_PROCEDURES = 20
SQL_CURSOR_COMMIT_BEHAVIOR = 23
SQL_DATA_SOURCE_READ_ONLY = 25
SQL_DEFAULT_TXN_ISOLATION = 26
SQL_IDENTIFIER_CASE = 28
SQL_IDENTIFIER_QUOTE_CHAR = 29
SQL_DRIVER_ODBC_VER = 77
SQL_INFO = {SQL_DATA_SOURCE_NAME: "DATA_SOURCE_NAME", SQL_DRIVER_NAME: "DRIVER_NAME",
SQL_DRIVER_VER: "DRIVER_VER", SQL_DBMS_NAME: "DBMS_NAME",
SQL_SERVER_NAME: "SERVER_NAME",
SQL_ODBC_VER: "ODBC_VER", SQL_DRIVER_ODBC_VER: "DRIVER_ODBC_VER"}
#Types, thanks to mx constants
SqlTypes = {0:'TYPE_NULL',1:'CHAR',2:'NUMERIC',3:'DECIMAL',4:'INTEGER', \
5:'SMALLINT',6:'FLOAT',7:'REAL',8:'DOUBLE',9:'DATE',10:'TIME',\
11:'TIMESTAMP',12:'VARCHAR',
91: 'TYPE_DATE', 92: 'TYPE_TIME', 93: 'TYPE_TIMESTAMP', -1: 'LONGVARCHAR',
-10: 'WCHAR_LONGVARCHAR', -9: 'WCHAR_VARCHAR', -8: 'WCHAR', -7: 'BIT', -6: 'TINYINT', -5: 'BIGINT',
-4: 'LONGVARBINARY', -3: 'VARBINARY', -2: 'BINARY'}
#Sql data types to python conversion, thanks to mx constants
SqlDT_ToPy = {0: SQL_UNKNOWN_TYPE, 1: SQL_CHAR,
12: SQL_VARCHAR, -1: SQL_LONGVARCHAR,
-8: SQL_CHAR, -9: SQL_CHAR, -10: SQL_CHAR,
-2: SQL_BINARY, -3: SQL_VARBINARY, -4: SQL_VARBINARY,
-6: SQL_TINYINT, 5: SQL_SMALLINT, 4: SQL_INTEGER, -7: SQL_BIT,
-5: SQL_BIGINT,
3: SQL_DECIMAL, 2: SQL_NUMERIC, 7: SQL_REAL, 6: SQL_FLOAT, 8: SQL_DOUBLE,
9: SQL_DATE, 10: SQL_TIME, 11: SQL_DATETIME,
91: SQL_TYPE_DATE, 92: SQL_TYPE_TIME, 93: SQL_TIMESTAMP }
class STRUCT_DATE(C.Structure):
_fields_ = [("year", C.c_short),
("month", C.c_short),
("day", C.c_short)]
Sql_DT_Convert = {SQL_DATE: STRUCT_DATE, SQL_TYPE_DATE: STRUCT_DATE}
#Control the corrispondences
[ SqlDT_ToPy[k] for k in SqlTypes.keys() ]
[ SqlTypes[k] for k in SqlDT_ToPy.keys() ]
#Custom exceptions
class OdbcNoLibrary(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class OdbcLibraryError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class OdbcInvalidHandle(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class OdbcGenericError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class cursor(object):
"""
"""
_db_connection = None
arraysize = 1
def __init__(self):
"""
"""
self._queryExecuted = False
def execute(self, q, *args, **kw):
"""
"""
self._db_connection.Query(q)
self._queryExecuted = True
def fetchone(self):
"""
"""
return self._db_connection.FetchOne()
def fetchmany(self, size=arraysize):
"""
"""
return self._db_connection.FetchMany(size)
def fetchall(self):
"""
"""
return self._db_connection.FetchAll()
def close(self):
"""
"""
self._queryExecuted = False
def _setConnection(self, conn):
"""
"""
self._db_connection = conn
def _GetRowCount(self):
"""
"""
if not self._queryExecuted:
ret = -1
else:
ret = self._db_connection.NumOfRow()
return ret
def _GetDescription(self):
"""
"""
if not self._queryExecuted:
ret = None
else:
ret = self._db_connection.ColDescr()
return ret
description = property(_GetDescription)
rowcount = property(_GetRowCount)
class odbc(object):
"""This class implement a odbc connection. It use ctypes for work.
"""
cursor = cursor
def __init__(self):
"""Init variables and connect to the engine"""
self._connectDb = 0
self._info = "Not yet connected"
self._sql_db_version = -1
os.environ['PATH'] = os.environ['PATH'] + ';' + os.path.abspath(os.path.dirname(__file__))
if sys.platform == 'win32':
functForLoad = ctypes.windll
library_path = "odbc32"
else:
functForLoad = ctypes.cdll
library_path = "odbc"
realPath = find_library(library_path)
if not realPath:
raise OdbcNoLibrary, \
"I cannot find the library at %s. Did you pass me a vaid path?" % libraryName
#Load the library. If there is problems, leave ctypes raise exceptions
self.odbc = getattr(functForLoad, realPath)
lstFunct = ("SQLAllocHandle", "SQLSetEnvAttr", "SQLConnect", "SQLTables", "SQLExecDirect",
"SQLNumResultCols", "SQLRowCount", "SQLDescribeCol", "SQLDataSources", "SQLGetDiagRec",
"SQLFetch", "SQLBindCol", "SQLCloseCursor", "SQLFreeHandle", "SQLDisconnect",
"SQLColAttribute", "SQLGetInfo")
for funct in lstFunct:
f = getattr(self.odbc, funct)
f.restype = C.c_short
self.env_h = C.c_int()
self.dbc_h = C.c_int() #connection handle
self.stmt_h = C.c_int()
ret = self.odbc.SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, ctypes.byref(self.env_h))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)
ret = self.odbc.SQLSetEnvAttr(self.env_h, SQL_ATTR_ODBC_VERSION, SQL_OV_ODBC2, 0)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)
ret = self.odbc.SQLAllocHandle(SQL_HANDLE_DBC, self.env_h, ctypes.byref(self.dbc_h))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
#Bad hack for set the connection
cursor._db_connection = self
def connect(self, dsn, user, passwd = ''):
"""Connect to odbc, we need dsn, user and optionally password"""
self.dsn = dsn
self.user = user
self.passwd = passwd
sn = ctypes.create_string_buffer(dsn)
un = ctypes.create_string_buffer(user)
pw = ctypes.create_string_buffer(passwd)
ret = self.odbc.SQLConnect(self.dbc_h, sn, len(sn), un, len(un), pw, len(pw))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
self.__set_stmt_h()
self._connectDb = 1
self._info = self.GetInfo()
def GetInfo(self):
"""
"""
dinfo = dict()
for key, val in SQL_INFO.iteritems():
value = CSB(255)
len_used = C.c_int()
self.odbc.SQLGetInfo( self.dbc_h, key, C.byref(value), len(value), C.byref(len_used) )
dinfo[val] = value.value
#Set interanl db version
self._sql_db_version = int( dinfo["DRIVER_ODBC_VER"].split(".")[0] )
return dinfo
def GeTables(self):
"""Return a list with all tables"""
self.__set_stmt_h()
#We want only tables
t_type = ctypes.create_string_buffer('TABLE')
ret = self.odbc.SQLTables(self.stmt_h, None, 0, None, 0, None, 0, \
ctypes.byref(t_type), len(t_type))
if not ret == SQL_SUCCESS:
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
data = self.__bind(SQL_TABLE_NAMES)
return self.__fetch( [data] )
def Query(self, q):
"""Make a query"""
self.__set_stmt_h()
ret = self.odbc.SQLExecDirect(self.stmt_h, q, len(q))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
def FetchOne(self):
return self._fetch(1)
def FetchMany(self, rows):
return self._fetch(rows)
def FetchAll(self):
return self._fetch()
def NumOfCols(self):
"""Get the number of cols"""
NOC = ctypes.c_int()
ret = self.odbc.SQLNumResultCols(self.stmt_h, ctypes.byref(NOC))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
return NOC.value
def NumOfRow(self):
"""Get the number of rows"""
NOR = ctypes.c_int()
ret = self.odbc.SQLRowCount(self.stmt_h, ctypes.byref(NOR))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
return NOR.value
def ColDescr(self, table=None, query=None):
"""We return a list with a tuple for every col:
field, type, number of digits, allow null"""
if (table or query):
q = query or "SELECT * FROM " + table
self.Query(q)
NOC = self.NumOfCols()
ColDescr = list()
for col in xrange(1, NOC+1):
#Use more efficient ColAttr is ww can
if self._sql_db_version > 2:
name = self._getColAttr(col, SQL_DESC_NAME)
type_code = self._getColAttr(col, SQL_DESC_TYPE)
disp_size = self._getColAttr(col, SQL_DESC_DISPLAY_SIZE)
scale = self._getColAttr(col, SQL_COLUMN_SCALE)
precision = self._getColAttr(col, SQL_COLUMN_PRECISION)
try: null_ok = self._getColAttr(col, SQL_COLUMN_NULLABLE)
except: null_ok = None
else:
CName = CSB(1024); Cname_ptr = C.c_int(); Ctype = C.c_int()
Csize = C.c_int(); NOdigits = C.c_int(); Allow_nuls = C.c_int()
ret = self.odbc.SQLDescribeCol(self.stmt_h, col,
C.byref(CName), len(CName), C.byref(Cname_ptr),
C.byref(Ctype),C.byref(Csize),C.byref(NOdigits), C.byref(Allow_nuls))
name = CName.value
type_code = SqlTypes.get(Ctype.value, SqlTypes[0])
disp_size = None
scale = None
precision = NOdigits.value
null_ok = Allow_nuls.value
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
#name, type_code, display_size, internal_size, precision, scale, null_ok
ColDescr.append( (name, type_code, disp_size, None, precision, scale, null_ok) )
return ColDescr
def _getColAttr(self, col, FieldIdentifier):
""" Return the column attribute
"""
pBuff = CSB(1024); lbuff = C.c_int(); nAttr = C.c_int()
ret = self.odbc.SQLColAttribute(self.stmt_h, col, FieldIdentifier,
C.byref(pBuff), len(pBuff),
C.byref(lbuff), C.byref(nAttr) )
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
#If there are some values on the pBuff and its leng has some value, use the buffer
if pBuff.value and lbuff.value:
ret_value = pBuff.value
else:
ret_value = nAttr.value
return ret_value
def EnumateDSN(self):
"""Return a list with [name, descrition]"""
dsn = ctypes.create_string_buffer(1024)
desc = ctypes.create_string_buffer(1024)
dsn_len = ctypes.c_int()
desc_len = ctypes.c_int()
dsn_list = []
while 1:
ret = self.odbc.SQLDataSources(self.env_h, SQL_FETCH_NEXT, \
dsn, len(dsn), ctypes.byref(dsn_len), desc, len(desc), ctypes.byref(desc_len))
if ret == SQL_NO_DATA_FOUND:
break
elif not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, stmt_h, ret)
else:
dsn_list.append((dsn.value, desc.value))
return dsn_list
def ctrl_err(self, ht, h, val_ret):
"""Method for make a control of the errors
We get type of handle, handle, return value
Return a raise with a list"""
state = ctypes.create_string_buffer(5)
NativeError = ctypes.c_int()
Message = ctypes.create_string_buffer(1024*10)
Buffer_len = ctypes.c_int()
err_list = []
number_errors = 1
while 1:
ret = self.odbc.SQLGetDiagRec(ht, h, number_errors, state, \
NativeError, Message, len(Message), ctypes.byref(Buffer_len))
if ret == SQL_NO_DATA_FOUND:
#No more data, I can raise
raise OdbcGenericError, err_list
break
elif ret == SQL_INVALID_HANDLE:
#The handle passed is an invalid handle
raise OdbcInvalidHandle, 'SQL_INVALID_HANDLE'
elif ret == SQL_SUCCESS:
err_list.append((state.value, Message.value, NativeError.value))
number_errors += 1
def close(self):
"""Call me before exit, please"""
self.__CloseCursor()
self.__CloseHandle()
def __set_stmt_h(self):
self.__CloseHandle(SQL_HANDLE_STMT, self.stmt_h)
ret = self.odbc.SQLAllocHandle(SQL_HANDLE_STMT, self.dbc_h, ctypes.byref(self.stmt_h))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
def __fetch(self, cols, NOR = 0):
if not NOR: NOR = self.NumOfRow()
rows = []
while NOR:
row = []
ret = self.odbc.SQLFetch(self.stmt_h)
if ret == SQL_NO_DATA_FOUND:
break
elif not ret == SQL_SUCCESS:
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
for col in cols:
row.append(col.value)
rows.append(row)
NOR -= 1
return rows
def __bind(self, col):
#Get column data type
data = CSB(255)
buff_indicator = C.c_int()
ret = self.odbc.SQLBindCol(self.stmt_h, col, SQL_CHAR, C.byref(data), \
C.sizeof(data), C.byref(buff_indicator))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
return data
def _fetch(self, NOR = 0):
col_vars = []
buff = C.c_int()
for col in xrange(1, self.NumOfCols() +1):
ret = self.__bind(col)
col_vars.append( ret )
return self.__fetch(col_vars, NOR)
def __CloseCursor(self):
ret = self.odbc.SQLCloseCursor(self.stmt_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
return
def __CloseHandle(self, ht='', h=0):
if ht:
if not h.value: return
ret = self.odbc.SQLFreeHandle(ht, h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
return
if self.stmt_h.value:
if VERBOSE: print 's'
ret = self.odbc.SQLFreeHandle(SQL_HANDLE_STMT, self.stmt_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
if self.dbc_h.value:
if self._connectDb:
if VERBOSE: print 'disc'
ret = self.odbc.SQLDisconnect(self.dbc_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
if VERBOSE: print 'dbc'
ret = self.odbc.SQLFreeHandle(SQL_HANDLE_DBC, self.dbc_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
if self.env_h.value:
if VERBOSE: print 'env'
ret = self.odbc.SQLFreeHandle(SQL_HANDLE_ENV, self.env_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)
info = property(GetInfo)
if __name__ == "__main__":
if sys.platform == "win32":
dsn_test = 'vp'
else:
dsn_test = 'pg'
user = 'tutti'
od = odbc()
#Dsn list
DSN_list = od.EnumateDSN()
od.connect(dsn_test, user)
#print od.info
#Get tables list
tables = od.GeTables()
if sys.platform == "win32": t = tables[0][0]
else: t = "ttt"
#Get fields on the table
cols = od.ColDescr(t)
#print cols
#Make a query
cur = od.cursor()
cur.execute('SELECT * FROM %s ORDER BY id' % t)
print [(x[0], x[1]) for x in cur.description]
#Get results
print cur.fetchmany()
#print od.FetchAll()
#Close before exit
cur.close()
|