# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: DBRead.py 10330 2009-12-24 22:13:38Z grisha $
"""
DBRead Module and Resource Definition
"""
from snaplogic.common.data_types import SnapString
from snaplogic.common import version_info
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.snapi_base import keys
import snaplogic.cc.prop as prop
from snaplogic.components import DBUtils
from snaplogic.components.DBComponent import DBComponent
from snaplogic.common.snap_exceptions import SnapComponentError
class DBRead(DBComponent):
    """
    This class implements the DB Read component.

    The component declares no input views and exactly one output view
    (see C{capabilities}). It executes the SQL statement held in the
    C{SQLStmt} property and writes each returned row to the output view.
    """

    api_version = '1.0'
    component_version = '1.2'

    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 0,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : 0,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
    }

    component_description = "Reads data from DB sources."
    component_label = "DB Reader"
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/DBRead" % \
                                                        version_info.doc_uri_version

    def create_resource_template(self):
        """
        Define the properties of this component.

        Sets up the common DB properties and then adds the C{SQLStmt}
        property, which holds the SQL statement to run.
        """
        self._create_common_db_props()
        self.set_property_def("SQLStmt",
                              prop.SimpleProp("SQL statement",
                                              SnapString,
                                              "The SQL statement to be executed while reading",
                                              None,
                                              True))

    def validate(self, err_obj):
        """
        All the properties are marked as required so validate
        doesn't have to worry about checking those.

        @param err_obj: Error object on which validation problems are
                        reported per property.
        """
        self._common_db_validate(err_obj)

        # SQLStmt: Just check that this is a select. A statement containing
        # a parameter is skipped, since its final text is not known here.
        val = self.get_property_value("SQLStmt")
        if not component_api.has_param(val) and not val.strip().lower().startswith('select'):
            err_obj.get_property_err("SQLStmt").set_message("SQL statement must be a SELECT statement.")

    def _get_props_to_check_for_suggest(self):
        """
        See L{DBComponent._get_props_to_check_for_suggest}.
        """
        return ['SQLStmt']

    def _db_suggest_resource_values(self, err_obj, conn):
        """
        Attempt to discover output view given connection info.
        See L{DBComponent._db_suggest_resource_values}.

        @param err_obj: Error object (not used by this implementation).
        @param conn: Connection used to obtain view metadata for the
                     SELECT statement.
        """
        sqlstmt = self.get_property_value('SQLStmt')
        view_metadata = conn.get_snap_view_metadata_from_select(sqlstmt)
        fields = view_metadata['fields']

        # Reuse the name of an existing output view (replacing its
        # definition); otherwise fall back to the default name.
        out_views = self.list_output_view_names()
        view_name = 'Output'
        if out_views:
            view_name = out_views[0]
            self.remove_output_view_def(view_name)
        self.add_record_output_view_def(view_name, fields, sqlstmt)

    def execute(self, input_views, output_views):
        """
        Run the SQL statement and write every returned row to the output view.

        @param input_views: Input views (not used by this component).
        @param output_views: Dictionary of output views; the single
                             connected view receives the records.

        @raise SnapComponentError: If no output view is connected.
        """
        try:
            # list() keeps the positional lookup working when values() is
            # a non-indexable view as well as when it is a plain list.
            output_view = list(output_views.values())[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")

        sqlstmt = self.get_property_value("SQLStmt")

        # Prefer the referenced connection resource; otherwise resolve the
        # DBConnect property value as a local resource.
        connect_resdef = self.get_referenced_resdef("DBConnect")
        if not connect_resdef:
            connect_uri = self.get_property_value("DBConnect")
            connect_resdef = self.get_local_resource_object(connect_uri)

        conn = None
        try:
            conn = DBUtils.get_connection_from_resdef(connect_resdef)
            cursor = conn.cursor()
            cursor.arraysize = 1024
            cursor.execute(sqlstmt)
            for rec in DBUtils.get_db_record_set(cursor):
                # Possibly - sniff cursor status and verify :
                # A. Number of columns is equal to output view
                # B. Column data types compatible with view data types
                out_rec = output_view.create_record()
                field_names = out_rec.field_names
                # Columns are mapped to view fields by position.
                for field_num, field in enumerate(rec):
                    out_rec[field_names[field_num]] = field
                output_view.write_record(out_rec)
            output_view.completed()
        finally:
            # The reason to not close the cursor is that,
            # at least in the MySQL case, if we have
            # a large data set remaining (see #2090), we'd still
            # sit here and try to exhaust it. Now, this may
            # just be a MySQL special case, but still, since the
            # connection object is not shared, and closing connection
            # will close the cursor, we can just close the
            # connection on got_stopped and not worry.
            if conn is not None:
                conn.close()

    def upgrade_1_0_to_1_1(self):
        """
        Upgrade from version 1.0 to 1.1 by delegating to
        C{_upgrade_to_use_refs}.
        """
        self._upgrade_to_use_refs()

    def upgrade_1_1_to_1_2(self):
        """
        No-op upgrade only to change component doc URI during the upgrade
        which will be done by cc_info before calling this method.
        """
        pass
|