# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: DBLookup.py 10330 2009-12-24 22:13:38Z grisha $
"""
DBLookup component and resource definition.
"""
import re
from decimal import Decimal
from urlparse import urlparse
from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import SnapString,SnapNumber,SnapDateTime,\
Record
from snaplogic.common import version_info
import snaplogic.components as components
from snaplogic.components import DBUtils
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.components.DBComponent import DBComponent
from snaplogic.snapi_base import keys
import snaplogic.cc.prop as prop
from snaplogic import snapi
class DBLookup(DBComponent):
    """
    Component to perform a database lookup.

    This component can run an SQL query against a database using the input fields from a particular record as
    substitution values for the query.

    An example SQL statement with substitution keywords embedded within it follows:

        SELECT * FROM table WHERE column = ${field}

    In this example query, '${field}' will be replaced with the value of the 'field' input field
    for each record that passes through this component.

    Exactly one output record is written per input record: the single row returned by the query,
    combined with any pass-through fields of the input record.

    """

    api_version = '1.0'
    component_version = '1.2'

    # Exactly one input view and one output view; pass-through fields are allowed.
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH        : True
    }

    component_description = "This component performs a database lookup."
    component_label = "DB Lookup"
    # NOTE(review): the URI below ends in 'DateDimension' although this is the DBLookup component.
    # This looks like a copy-paste slip; confirm the intended documentation page before changing it.
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/DateDimension" % \
                                                        version_info.doc_uri_version

    def create_resource_template(self):
        """
        Define the properties of this component.

        Adds the 'SQLStmt' and 'IgnoreMultipleRows' properties on top of the common database
        properties, and defaults 'IgnoreMultipleRows' to False.

        """
        self._create_common_db_props()
        self.set_property_def("SQLStmt",
                              prop.SimpleProp("SQL statement",
                                              SnapString,
                                              "The SQL statement to be executed while processing records",
                                              None,
                                              True))
        self.set_property_def("IgnoreMultipleRows",
                              prop.SimpleProp("Ignore multiple rows",
                                              "boolean",
                                              "A boolean flag indicating if multiple rows should be ignored."
                                              " If not ignored, they will generate an exception.",
                                              None,
                                              True))
        self.set_property_value("IgnoreMultipleRows", False)

    def validate(self, err_obj):
        """
        Validate the SQL statement against the input view.

        All the properties are marked as required so validate doesn't have to worry about checking those.
        We do need to check that the statement is a SELECT and that every input field it references
        exists in the input view.

        @param err_obj: Error object on which validation messages are set.

        """
        # Connection property checks are delegated to a helper that is not defined in this class.
        self._common_db_validate(err_obj)

        stmt = self.get_property_value("SQLStmt")
        if component_api.has_param(stmt):
            # Presumably the statement still contains an unresolved parameter, so its final text
            # is not known yet; nothing further is checked in that case.
            return

        # SQLStmt: Just check that this is a select.
        if not stmt.strip().lower().startswith('select'):
            err_obj.get_property_err("SQLStmt").set_message("SQL statement must be a SELECT statement.")
            return

        # SQLStmt: check that any input field names referenced in the statement match field names
        #          in the input view.
        bind_cols = self.get_referenced_fields(stmt, True)
        input_views = self.list_input_view_names()
        input_view = self.get_input_view_def(input_views[keys.SINGLE_VIEW])
        input_view_fields = [d[keys.FIELD_NAME] for d in input_view[keys.VIEW_FIELDS]]

        # Collect every missing field (each name once) so that a single message reports all of
        # them, instead of each missing field overwriting the message set for the previous one.
        missing = []
        for bind_col in bind_cols:
            if bind_col not in input_view_fields and bind_col not in missing:
                missing.append(bind_col)
        if missing:
            err_obj.get_property_err("SQLStmt").set_message(
                "Input field(s) '%s' not present in input view." % "', '".join(missing))

    def upgrade_1_0_to_1_1(self):
        """
        Upgrade a version 1.0 resource to 1.1.

        Delegates to the upgrade helpers for the field references in 'SQLStmt' and for the
        use of resource references; both helpers are defined outside this class.

        """
        self._upgrade_field_refs("SQLStmt")
        self._upgrade_to_use_refs()

    def upgrade_1_1_to_1_2(self):
        """
        No-op upgrade that exists only so the component doc URI gets changed during the upgrade.

        Per the original note, the URI change itself is done by cc_info before this method is called.

        """
        pass

    def execute(self, input_views, output_views):
        """
        Run the lookup query once for every input record and write one output record for each.

        @param input_views: Mapping of input view names to input view objects.
        @param output_views: Mapping of output view names to output view objects.

        @raise SnapComponentError: No input view or no output view is connected.
        @raise SnapValueError: Propagated from _runQuery() when a lookup returns no row, or more
                               than one row while IgnoreMultipleRows is False.

        """
        try:
            output_view = output_views.values()[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")

        try:
            input_view = input_views.values()[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No input view connected.")

        # Prefer a referenced connection resource; otherwise treat the property value as the
        # URI of a local resource.
        connect_resdef = self.get_referenced_resdef("DBConnect")
        if not connect_resdef:
            connect_uri = self.get_property_value("DBConnect")
            connect_resdef = self.get_local_resource_object(connect_uri)

        # Set before the try block so the cleanup below can tell whether a connection was made.
        self._db = None
        try:
            # Connect to database and get a cursor
            self._db = DBUtils.get_connection_from_resdef(connect_resdef)
            self._cursor = self._db.cursor()
            self._cursor.arraysize = 1

            self._query = self.get_property_value('SQLStmt')
            self._ignore_mult_rows = self.get_property_value('IgnoreMultipleRows')

            # Replace every field reference in the statement with the matching bind variable.
            # The bind variable list is assumed to be parallel to the referenced field list.
            self._fields_to_replace = self.get_referenced_fields(self._query)
            self._bind_vars = self._db.bindVariableList(self._fields_to_replace)
            replacements = dict(zip(self._fields_to_replace, self._bind_vars))
            self._query = self.replace_referenced_fields(self._query, replacements)

            # An empty input stream simply skips the loop; the output view is still completed.
            record = input_view.read_record()
            while record is not None:
                output_view.write_record(self._runQuery(record, output_view))
                record = input_view.read_record()

            output_view.completed()
        finally:
            # Only the connection is closed here, never the cursor: at least in the MySQL case,
            # closing a cursor with a large data set remaining (see #2090) would sit here and
            # try to exhaust it. The connection object is not shared, and closing the connection
            # closes the cursor as well.
            if self._db is not None:
                try:
                    self._db.close()
                except Exception:
                    # Best-effort cleanup: a failure to close must not mask an exception
                    # raised while processing records.
                    pass

    def _runQuery(self, record, output_view):
        """
        Run the query (after substitution) and return a data record for output.

        This may be overridden to handle the result set differently. For example, if no results are returned
        one might want to return a default set of values.

        @param record: An input record.
        @type record: snaplogic.common.DataTypes.Record

        @param output_view: Output view used to create the output record.

        @return: A record for output.
        @rtype: snaplogic.common.DataTypes.Record

        @raise SnapValueError: No result rows were returned or more than one row was returned and the
                               IgnoreMultipleRows property is False.

        """
        # A field may be referenced several times in the statement; the stand-in record lists
        # each referenced field once, in order of first appearance.
        bound_names = []
        for name in self._fields_to_replace:
            if name not in bound_names:
                bound_names.append(name)

        # Build a stand-in record holding only the referenced input fields, and let the
        # connection turn it into the bind value container for this query.
        fake_rec = Record(None, None, None)
        fake_rec.view_name = record.view_name
        fake_rec.field_names = bound_names
        fake_rec.python_types_map = {}
        for name in bound_names:
            fake_rec.python_types_map[name] = record.python_types_map[name]
            fake_rec[name] = record[name]

        bind_values = self._db.bindValueContainer(fake_rec)
        self._cursor.execute(self._query, bind_values)

        row = self._cursor.fetchone()
        if row is None:
            raise SnapValueError('Zero rows returned from SQL query')
        elif not self._ignore_mult_rows and (self._cursor.fetchone() is not None):
            # The second fetch is attempted only when extra rows are to be treated as an error.
            raise SnapValueError('SQL query returned too many rows')

        out_rec = output_view.create_record()
        out_rec.transfer_pass_through_fields(record)

        # Process fields: result columns are assigned to the output fields by position.
        for field_num, field in enumerate(row):
            out_rec[out_rec.field_names[field_num]] = field

        return out_rec
|