# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: MySQL.py 9428 2009-10-22 23:03:13Z grisha $
import datetime
from decimal import Decimal
import MySQLdb
from MySQLdb import FIELD_TYPE,converters
from MySQLdb.cursors import SSCursor
from snaplogic.common.snap_exceptions import SnapComponentError
from snaplogic.components.DBUtils import SnapDBAdapter
from snaplogic.common.data_types import SnapString,SnapNumber,SnapDateTime
# See http://dev.mysql.com/doc/refman/5.0/en/c-api-datatypes.html
TYPE_CODE_TO_NATIVE_TYPE = {
FIELD_TYPE.TINY : 'tinyint',
FIELD_TYPE.SHORT : 'smallint',
FIELD_TYPE.LONG : 'int',
FIELD_TYPE.INT24 : 'mediumint',
FIELD_TYPE.LONGLONG : 'bigint',
FIELD_TYPE.DECIMAL : 'decimal',
FIELD_TYPE.NEWDECIMAL : 'decimal',
FIELD_TYPE.FLOAT : 'float',
FIELD_TYPE.DOUBLE : 'double',
FIELD_TYPE.BIT : 'bit',
FIELD_TYPE.TIMESTAMP : 'timestamp',
FIELD_TYPE.DATE : 'date',
FIELD_TYPE.TIME : 'time',
FIELD_TYPE.DATETIME : 'datetime',
FIELD_TYPE.YEAR : 'year',
FIELD_TYPE.STRING : 'char',
FIELD_TYPE.VAR_STRING : 'varchar',
FIELD_TYPE.BLOB : 'text',
FIELD_TYPE.TINY_BLOB : 'tinytext',
FIELD_TYPE.MEDIUM_BLOB: 'mediumtext',
FIELD_TYPE.LONG_BLOB : 'longtext'
}
NATIVE_TYPE_TO_SNAP_TYPE = {
'bigint' : SnapNumber,
'bit' : SnapNumber,
'boolean' : SnapNumber,
'tinyint' : SnapNumber,
'smallint' : SnapNumber,
'mediumint' : SnapNumber,
'int' : SnapNumber,
'float' : SnapNumber,
'double' : SnapNumber,
'real' : SnapNumber,
'decimal' : SnapNumber,
'char' : SnapString,
'varchar' : SnapString,
'tinytext' : SnapString,
'text' : SnapString,
'mediumtext' : SnapString,
'longtext' : SnapString,
'enum' : SnapString,
'datetime' : SnapDateTime,
'time' : SnapDateTime,
'date' : SnapDateTime,
'timestamp' : SnapDateTime,
'year' : SnapNumber
}
def conv_NUM(v):
"""
Convert a number to a Decimal.
Note: This and other conv_* methods are custom converters we
add to the MySQLdb as per
http://mysql-python.sourceforge.net/MySQLdb-1.2.2/public/MySQLdb.converters-module.html
"""
return Decimal(v)
def conv_DATE(v):
"""
Convert date returned as a string into a datetime object.
@param v: date
@type v: str
@return: datetime object
@rtype: datetime
"""
return datetime.datetime(*map(int, v.split(u'-')))
def conv_TIME(v):
"""
Convert time (returned as a string)
@param v: time
@type v: str
@return: datetime object with today's date and the provided time
@rtype: datetime
"""
parts = map(int, v.split(u':'))
if len(parts) < 4:
microseconds = 0
else:
microseconds = parts[3]
t = datetime.time(parts[0], parts[1], parts[2], microseconds)
return datetime.datetime.combine(datetime.date.today(), t)
def conv_BIT(v):
"""
Parse a BIT string into a number.
"""
cnt = 0
retval = 0
if len(v) > 1:
pass
for ch in v[::-1]:
cval = ord(ch)
retval += cval << (cnt * 8)
cnt += 1
return Decimal(retval)
CONV = converters.conversions.copy()
CONV.update({
FIELD_TYPE.TIME : conv_TIME,
FIELD_TYPE.LONG : conv_NUM,
FIELD_TYPE.LONGLONG : conv_NUM,
FIELD_TYPE.TINY : conv_NUM,
FIELD_TYPE.DATE : conv_DATE,
FIELD_TYPE.DECIMAL : conv_NUM,
FIELD_TYPE.DOUBLE : conv_NUM,
FIELD_TYPE.YEAR : conv_NUM,
FIELD_TYPE.FLOAT : conv_NUM,
FIELD_TYPE.SHORT : conv_NUM,
FIELD_TYPE.INT24 : conv_NUM,
FIELD_TYPE.BIT : conv_BIT
})
class MySQL(SnapDBAdapter):
"""
Implementation of L{SnapDBAdapter} for MySQL.
"""
def __init__(self, *args, **kwargs):
if 'conv' not in kwargs:
kwargs['conv'] = CONV
self._db = kwargs['db']
kwargs['charset'] = 'utf8'
kwargs['cursorclass'] = SSCursor
conn = MySQLdb.connect(**kwargs)
super(MySQL, self).__init__(conn, MySQLdb)
def get_default_schema(self):
"""
See L{SnapDBAdapter.get_default_schema}. Default here is assumed
to be the "db" property of L{snaplogic.components.ConnectionMySQL}.
"""
return self._db
def list_tables(self, schema = None):
"""
See L{SnapDBAdapter.list_tables}.
"""
if not schema:
schema = self.get_default_schema()
cur = self.cursor()
sql = "SELECT table_name FROM information_schema.tables WHERE table_schema = %s"
cur.execute(sql, (schema))
result = cur.fetchall()
result = [row[0] for row in result]
cur.close()
return result
def limit_rows_clause(self, limit=1):
"""
See L{SnapDBAdapter.limit_rows_clause()}
"""
return "LIMIT %s" % limit
def get_snap_view_metadata(self, table_name):
view_def = {}
primary_key = []
view_def['primary_key'] = primary_key
field_defs = []
(schema, table_name) = self._parse_table_name(table_name)
view_def['schema'] = schema
cur = self.cursor()
sql = "SELECT * FROM information_schema.columns WHERE LOWER(table_schema) = LOWER(%s) AND LOWER(table_name) = LOWER(%s) ORDER BY ordinal_position ASC"
cur.execute(sql, (schema, table_name))
result = cur.fetchall()
if not result:
raise SnapComponentError("Table '%s' not found in schema '%s'" % (table_name, schema))
indices = {}
for i in range(len(cur.description)):
meta = cur.description[i]
indices[meta[0]] = i
for row in result:
# These we need for actual metadata
name = row[indices['COLUMN_NAME']]
data_type = row[indices['DATA_TYPE']]
try:
snap_type = self.native_type_to_snap_type(data_type)
except KeyError:
# Unsupported type
continue
key = row[indices['COLUMN_KEY']]
if key == 'PRI':
primary_key.append(name)
# These we need for description
desc = []
col_type = row[indices['COLUMN_TYPE']]
desc.append("Definition: %s" % col_type)
default = row[indices['COLUMN_DEFAULT']]
if default:
desc.append("Default: %s" % default)
nullable = row[indices['IS_NULLABLE']]
desc.append("Nullable: %s" % nullable)
char_max_len = row[indices['CHARACTER_MAXIMUM_LENGTH']]
if char_max_len:
desc.append("Max length: %s" % char_max_len)
charset = row[indices['NUMERIC_PRECISION']]
if charset:
desc.append("Character set: %s" % charset)
collation = row[indices['COLLATION_NAME']]
if collation:
desc.append("Collation: %s" % collation)
precision = row[indices['NUMERIC_PRECISION']]
if precision:
desc.append("Precision: %s" % precision)
scale = row[indices['NUMERIC_SCALE']]
if scale:
desc.append("Scale: %s" % scale)
desc = '; '.join(desc)
field_def = (name, snap_type, desc,)
field_defs.append(field_def)
cur.close()
view_def['fields'] = tuple(field_defs)
return view_def
|