# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Oracle.py 9960 2009-12-02 01:02:49Z grisha $
import cx_Oracle
import threading
import datetime
import os
from decimal import Decimal
from distutils.version import LooseVersion
from snaplogic.components.DBUtils import SnapDBAdapter
from snaplogic.common.snap_exceptions import SnapException,SnapComponentError
from snaplogic.common.data_types import SnapDateTime,SnapNumber,SnapString
"""
Oracle (and cx_Oracle driver)-specific implementation of L{SnapDBAdapter}
"""
CURRENT_CX_ORACLE_VERSION = LooseVersion(cx_Oracle.version)
CX_ORACLE_AFTER_5 = CURRENT_CX_ORACLE_VERSION >= LooseVersion('5.0')
"""See http://python.net/crew/atuining/cx_Oracle/html/node4.html"""
TYPE_CODE_TO_NATIVE_TYPE = {
cx_Oracle.DATETIME : 'date',
cx_Oracle.TIMESTAMP : 'timestamp',
cx_Oracle.NUMBER : 'number',
cx_Oracle.STRING : 'varchar2',
cx_Oracle.FIXED_CHAR : 'char',
cx_Oracle.LONG_STRING : 'long',
cx_Oracle.BINARY : 'raw',
cx_Oracle.CLOB : 'clob',
cx_Oracle.BLOB : 'blob',
cx_Oracle.NCLOB : 'nclob'
}
if CX_ORACLE_AFTER_5:
TYPE_CODE_TO_NATIVE_TYPE_5 = {
cx_Oracle.UNICODE : 'nvarchar2',
cx_Oracle.FIXED_UNICODE : 'nchar',
}
TYPE_CODE_TO_NATIVE_TYPE.update(TYPE_CODE_TO_NATIVE_TYPE_5)
NATIVE_TYPE_TO_SNAP_TYPE = {
'nvarchar2' : SnapString,
'varchar2' : SnapString,
'char' : SnapString,
'nchar' : SnapString,
'long' : SnapString,
'float' : SnapNumber,
'number' : SnapNumber,
'date' : SnapDateTime,
'timestamp' : SnapDateTime,
'interval' : SnapDateTime
}
class OracleCursorWrapper(object):
"""
A wrapper around DB API 2.0 cursor, to handle encoding and type conversion.
Will be returned by L{SnapCx_Oracle.cursor}
"""
def __init__(self, cursor, snap_conn):
"""
Initialize.
@param cursor: DB API 2.0 cursor object, to which most requests will
be delegated.
@type: cursor
@param snap_conn: The L{SnapDBAdapter} object that generated this cursor
@type snap_conn: L{SnapDBAdapter}
"""
self._snap_conn = snap_conn
self._delegate = cursor
self._delegate.numbersAsStrings = 1
self._metadata = None
self._string_fields = None
self._lob_fields = None
self._num_fields = None
def convert_row(self, row):
"""
Convert a row of data in native data types into a row of Snap types.
@param row: row returned by database
@type row: tuple
@return: row converted to Snap data types
@rtype: list
"""
if self._metadata is not None and self._string_fields is None and self._num_fields is None:
return row
if not row:
return row
if self._metadata is None:
self._metadata = self._delegate.description
self._string_fields = None
self._num_fields = None
i = 0
for col_metadata in self._metadata:
type_code = col_metadata[1]
native_type = self._snap_conn.type_code_to_native_type(type_code)
snap_type = self._snap_conn.native_type_to_snap_type(native_type)
if snap_type == SnapString:
if not CX_ORACLE_AFTER_5 or not type_code in [cx_Oracle.UNICODE, cx_Oracle.FIXED_UNICODE]:
if self._string_fields is None:
self._string_fields = []
self._string_fields.append(i)
elif snap_type == SnapNumber:
if self._num_fields is None:
self._num_fields = []
self._num_fields.append(i)
else:
pass
i += 1
new_row = list(row)
if self._string_fields is not None:
for i in self._string_fields:
if new_row[i] is not None:
new_row[i] = new_row[i].decode(self._snap_conn._conn.encoding)
if self._num_fields is not None:
for i in self._num_fields:
val = new_row[i]
if val is not None:
val = str(val)
try:
val = Decimal(val)
except:
# This will happen if the locale is such that group and
# decimal delimiters are, for instance, European -
# period for a group, comma for decimal
val = val.replace(self._snap_conn._decimal_delimiter, '.')
val = val.replace(self._snap_conn._group_delimiter, '')
val = Decimal(val)
new_row[i] = val
return new_row
def convert_results(self, rs):
"""
Convert the result set from native data types to Snap data types.
This is similar to L{convert_row}, except it acts on the entire result
set
@param rs: Result set to convert
@type rs: list or tuple
@return: converted result set
@type: list
"""
if self._metadata is not None and self._string_fields is None and self._num_fields is None:
return rs
if not rs:
return rs
converted_rs = []
for row in rs:
new_row = self.convert_row(row)
converted_rs.append(new_row)
return converted_rs
def fetchone(self):
"""
Same as cursor.fetchone() specified in DB API 2.0, except returning
Snap data types.
"""
row = self._delegate.fetchone()
if row is not None:
row = self.convert_row(row)
return row
def fetchmany(self, size=None):
"""
Same as cursor.fetchmany() specified in DB API 2.0, except returning
Snap data types.
"""
if size is None:
size = self._delegate.arraysize
rs = self._delegate.fetchmany(size)
rs = self.convert_results(rs)
return rs
def fetchall(self):
"""
Same as cursor.fetchall() specified in DB API 2.0, except returning
Snap data types.
"""
rs = self._delegate.fetchall()
rs = self.convert_results(rs)
return rs
def execute(self, operation, parameters=None):
"""
Same as cursor.execute() specified in DB API 2.0. Used to properly
encode SQL statement and parameters.
"""
# https://www.snaplogic.org/trac/ticket/1351
self._metadata = None
if type(operation) == unicode:
try:
operation = operation.encode(self._snap_conn._conn.encoding)
except UnicodeEncodeError, e:
snap_exc = SnapException("Cannot encode SQL command (%s) into the Oracle client encoding (%s); consider setting NLS_LANG variable appropriately" % (operation, self._snap_conn._conn.encoding))
snap_exc.append(e)
raise snap_exc
try:
if parameters:
enc_params = {}
for p_name in parameters:
p_val = parameters[p_name]
p_val_t = type(p_val)
if p_val_t == unicode:
p_val = p_val.encode(self._snap_conn._conn.encoding)
enc_params[str(p_name)] = p_val
parameters = enc_params
return self._delegate.execute(operation, parameters)
else:
return self._delegate.execute(operation)
except Exception, e:
raise
def __getattr__(self, name):
"""
Used to delegate to the native cursor object those methods that are not
wrapped by this class.
"""
result = getattr(self._delegate, name)
return result
class Oracle(SnapDBAdapter):
"""
Implementation of L{SnapDBAdapter} for Oracle.
"""
def __init__(self, *args, **kwargs):
if not os.environ.has_key('NLS_LANG'):
os.environ['NLS_LANG'] = 'American_America.UTF8'
kw2 = {}
for k in kwargs.keys():
v = kwargs[k]
if isinstance(v, unicode):
v = v.encode('utf-8')
kw2[k] = v
conn = cx_Oracle.connect(**kwargs)
self._user = kwargs['user']
cursor = conn.cursor()
# We need to keep this result because setting this value later will work for binding
# but not for retrieving data.
# See also
# http://sourceforge.net/mailarchive/forum.php?thread_name=48C1B0FC.4060809%40snaplogic.org&forum_name=cx-oracle-users
cursor.execute("SELECT value FROM nls_session_parameters WHERE parameter='NLS_NUMERIC_CHARACTERS'");
num_char_result = cursor.fetchall()[0][0]
self._decimal_delimiter = num_char_result[0]
self._group_delimiter = num_char_result[1]
# This request will work for binding...
cursor.execute("ALTER session SET NLS_NUMERIC_CHARACTERS = '.,'")
cursor.execute("ALTER session SET TIME_ZONE = '00:00'")
conn.commit()
cursor.close()
super(Oracle, self).__init__(conn, cx_Oracle)
def cursor(self):
"""
See L{SnapDBAdapter.cursor} and L{OracleCursorWrapper}
"""
native_cursor = SnapDBAdapter.cursor(self)
my_cursor = OracleCursorWrapper(native_cursor, self)
return my_cursor
def upsert(self, table, row, keys, table_metadata=None):
"""
Oracle-specific implementation of L{SnapDBAdapter.upsert()
by using MERGE.
"""
field_names = row.keys()
inner_select_clause = [':%s AS %s' % (f, f) for f in field_names]
sql = "MERGE INTO " + \
table + \
" t1 USING (SELECT " + \
', '.join(inner_select_clause) + \
" FROM DUAL) t2 ON (";
set_clause = ["t1.%s = t2.%s" % (key, key) for key in keys]
sql += ' AND '.join(set_clause)
sql += ")"
sql += " WHEN MATCHED THEN UPDATE SET "
fields_to_set = list(set(field_names) - set(keys))
update_clause = ["t1.%s = t2.%s" % (f, f) for f in fields_to_set]
sql += ",".join(update_clause)
sql += " WHEN NOT MATCHED THEN INSERT ("
sql += ",".join(field_names)
sql += ") VALUES ("
bound_field_names = self.bindVariableList(field_names)
sql += ",".join(bound_field_names)
sql += ")"
cur = self.cursor()
cur.execute(sql, row)
def fix_bound_values(self, record):
"""
Given a record (really, a sequence) whose elements are
Python objects, returns a sequence whose elements
are of the type that the DB expects.
@param record: sequence of values
@type record: sequence
@return: a record with elements converted to types the DB expects.
@rtype: sequence
"""
new_result = {}
for param in record.keys():
value = record[param]
param_t = type(param)
if param_t == unicode:
param = param.encode(self._conn.encoding)
value_t = type(value)
if value_t == unicode:
value = value.encode(self._conn.encoding)
elif value_t == Decimal:
value = str(value)
new_result[param] = value
return new_result
def get_default_schema(self):
"""
See L{SnapDBAdapter.get_default_schema}. Default here is assumed
to be the user connected to Oracle.
"""
return self._user
def list_tables(self, schema = None):
"""
See L{SnapDBAdapter.list_tables}.
"""
if not schema:
schema = self.get_default_schema()
cur = self.cursor()
bind = {':schema' : schema }
sql = """
SELECT table_name FROM all_all_tables
WHERE LOWER(owner) = :schema
UNION ALL
SELECT view_name AS table_name FROM all_views
WHERE LOWER(owner) = :schema
"""
cur.execute(sql, bind)
result = cur.fetchall()
result = [row[0] for row in result]
cur.close()
return result
def limit_rows_clause(self, limit=1):
"""
See L{SnapDBAdapter.limit_rows_clause()}
"""
return "AND rownum<=%s" % limit
def get_snap_view_metadata(self, table_name):
view_def = {}
field_defs = []
(schema, table_name) = self._parse_table_name(table_name)
view_def['schema'] = schema
cur = self.cursor()
sql = """
SELECT * FROM all_tab_columns
WHERE LOWER(owner) = LOWER(:schema) AND
LOWER(table_name) = LOWER(:table_name)
ORDER BY column_id ASC
"""
bind = {':schema' : schema, ':table_name' : table_name}
cur.execute(sql, bind)
result = cur._delegate.fetchall()
if not result:
raise SnapComponentError("Table '%s' not found in schema '%s'" % (table_name, schema))
indices = {}
for i in range(len(cur.description)):
meta = cur.description[i]
indices[meta[0]] = i
for row in result:
# These we need for actual metadata
name = row[indices['COLUMN_NAME']]
data_type = row[indices['DATA_TYPE']].lower()
if data_type.startswith('timestamp('):
data_type = 'timestamp'
elif data_type.startswith('interval '):
data_type = 'interval'
snap_type = self.native_type_to_snap_type(data_type)
desc = []
data_type_mod = row[indices['DATA_TYPE_MOD']]
if data_type_mod:
pass
data_type_owner = row[indices['DATA_TYPE_OWNER']]
if data_type_owner:
pass
nullable = row[indices['NULLABLE']]
desc.append("Nullable: %s" % nullable)
length = row[indices['DATA_LENGTH']]
if length:
desc.append("Length: %s" % length)
precision = row[indices['DATA_PRECISION']]
if precision:
desc.append("Precision: %s" % precision)
scale = row[indices['DATA_SCALE']]
if scale:
desc.append("Scale: %s" % scale)
default = row[indices['DATA_DEFAULT']]
if default:
desc.append("Default: %s" % default)
charset = row[indices['CHARACTER_SET_NAME']]
if charset:
desc.append("Character set: %s" % charset)
desc = '; '.join(desc)
field_def = (name, snap_type, desc,)
field_defs.append(field_def)
pkey_sql = """
SELECT
aic.column_name
FROM all_ind_columns aic
LEFT JOIN all_constraints alc
ON aic.index_name = alc.constraint_name AND
aic.table_name = alc.table_name AND
aic.table_owner = alc.owner
WHERE
LOWER(aic.table_name) = LOWER(:table_name)
AND LOWER(aic.table_owner) = LOWER(:schema)
AND alc.constraint_type = 'P'
"""
cur.execute(pkey_sql, bind)
pkey_rs = cur.fetchall()
primary_key = [row[0] for row in pkey_rs]
cur.close()
view_def['fields'] = tuple(field_defs)
view_def['primary_key'] = primary_key
return view_def
|