# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: DBAnalyzer.py 10330 2009-12-24 22:13:38Z grisha $
import urlparse
from snaplogic.common import version_info
from snaplogic.common.data_types import SnapString
from snaplogic.common.snap_exceptions import SnapComponentError
from snaplogic.common import snap_log
from snaplogic.snapi_base import keys
from snaplogic.snapi_base.exceptions import SnapiException,SnapiHttpException
from snaplogic.server.http_request import HttpRequest
import snaplogic.cc.prop as prop
from snaplogic.cc.component_api import ComponentAPI,has_param
from snaplogic.components import DBUtils
from snaplogic.components.computils import validation_utils1
# Names of the properties defined by the DBAnalyzer component.
DB_CONNECT_PROP = "DBConnect"
BASE_URI_PROP = "BaseURI"
ALTERNATE_SCHEMA_PROP = "AlternateSchema"
TABLE_NAME_PROP = "TableName"
SNAPLOGIC_USER_PROP = "SnapLogicUser"
SNAPLOGIC_PASSWD_PROP = "SnapLogicPassword"
RESOURCE_EXISTS_PROP = "ResourceExists"
COMP_TYPE_PROP = "ComponentType"

# Allowed values of the ResourceExists property: what the wizard does when a
# resource already exists at a URI it is about to write to.
(EXISTS_FAIL,
 EXISTS_SKIP,
 EXISTS_OVERWRITE) = ("Fail", "Skip", "Overwrite")
exists_lov = [EXISTS_FAIL, EXISTS_SKIP, EXISTS_OVERWRITE]
"""List of possible actions if a resource already exists."""
class DBAnalyzer(ComponentAPI):
"""
Discovers tables in a specified DB and creates Read, Insert, Update, Delete and Lookup resources for those tables.
This component takes a Connection Resource URI as a property value and analyzes the database instance that
it points to. The tables and table columns discovered from this analysis is then used to create resources
based on DBRead (read resource), DBWrite (insert, update and delete resources) and DBLookup (lookup resource).
These resources are created under the URI prefix specified by the BaseURI property. The resources generated
have the appropriate view definitions.
- For update resource, a sample SQL "where clause" is specified. This sample where clause searches for a match
on primary key, before doing an update. If the table has no primary key, then the first field of the table
is used in the sample where clause.
- For lookup resource, we provide a sample "where clause" using the same approach as the one used for update
resource. The input view of the lookup resource contains only the fields that are used in the where clause.
- For delete resource, we once again provide a sample where clause which matches on primary key
to delete records. However, if the table has no primary key, then no sample where clause is provided. We do this
to avoid providing a delete resource whose default where clause could unexpectedly delete large number of records.
"""
api_version = '1.0'
component_version = '1.3'
capabilities = {
ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT : 0,
ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT : 0,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 0,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 0,
}
component_description = "This component analyzes tables in DB and creates resources that represent those tables."
component_label = "DB Analyzer"
component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/UserGuide/DBWizard" % \
version_info.doc_uri_version
def create_resource_template(self):
# TODO this class should really probably
# inherit from DBComponent
self.add_resource_ref_def("DBConnect", "Connection Resource", ["connection.db"], required=False)
self.set_property_def(DB_CONNECT_PROP,
prop.SimpleProp("Connection Resource",
SnapString,
"The URI of a Connection Resource which contains the properties "
"needed to create a DB connection. Can be overridden by resource "
"reference in a pipeline.",
None,
True))
self.set_property_def(BASE_URI_PROP,
prop.SimpleProp("Base URI",
SnapString,
"The base URI under which all the resources will be generated. "
"This URI must be a relative URI.",
None,
True))
self.set_property_def(ALTERNATE_SCHEMA_PROP,
prop.SimpleProp("Alternate Schema",
SnapString,
"This optional property can be used to specify an alternate schema "
"to explore. If this value is not specified, then the default schema "
"will be explored.",
None))
self.set_property_def(TABLE_NAME_PROP,
prop.SimpleProp("Table Name",
SnapString,
"This optional property can be used to specify a table name "
"to explore. If this value is not specified, then all tables in the "
"schema will be explored. The name may be optionally preceded by schema "
"qualifier, e.g., 'emp' or 'scott.emp'.",
None))
# Set the wizard attribute.
self.set_property_value(keys.SNAP_INTERNAL, {keys.PRIMARY_FUNCTION: 'wizard'})
self._create_resource_template_1_1()
self._add_component_choice()
def _create_resource_template_1_1(self):
"""Additional properties added in 1.1 release of this component."""
self.set_property_def(RESOURCE_EXISTS_PROP,
prop.SimpleProp("If Resource Exists",
SnapString,
"This property specifies how the wizard should respond when a URI at "
"which the wizard was going to create a Resource, already has a "
"Resource. Should it fail? Should it skip that resource? "
"Should it overwrite that resource?",
{"lov" : exists_lov},
True))
self.set_property_def(SNAPLOGIC_USER_PROP,
prop.SimpleProp("SnapLogic Username",
SnapString,
"The username (if any) to use while creating these DB resources. "
"If no credentials are specified and the DB resources are being created "
"in the same server as the wizard, then the credentials of the user "
"running the wizard will be used." ))
self.set_property_def(SNAPLOGIC_PASSWD_PROP,
prop.SimpleProp("SnapLogic Password",
SnapString,
"The SnapLogic Password. This is only needed if a username is specified.",
{keys.CONSTRAINT_OBFUSCATE : 0}))
# Set the wizard attribute.
self.set_property_value(RESOURCE_EXISTS_PROP, EXISTS_SKIP)
def _add_component_choice(self):
"""Add choice of Java and Python components."""
self.set_property_def(COMP_TYPE_PROP,
prop.SimpleProp("Component Type",
SnapString,
"Specify whether the generated resources should be based on Java or "
"Python DB components. Please note that you need the PE version of the "
"product to use Java DB components",
{ "lov" : [ "Python", "Java" ] },
True))
# We default to Python.
self.set_property_value(COMP_TYPE_PROP, "Python")
def validate(self, err_obj):
"""Nothing much to do here since all properties are defined as required and will be validated by the server."""
alt_schema = self.get_property_value(ALTERNATE_SCHEMA_PROP)
if ((alt_schema is not None) and (not has_param(alt_schema))):
table_name = self.get_property_value(TABLE_NAME_PROP)
if ((table_name is not None) and (not has_param(table_name))):
s = table_name.split('.')
if len(s) > 1:
if s[0].lower() != alt_schema.lower():
err_obj.get_property_err(ALTERNATE_SCHEMA_PROP).set_message(
"The alternate schema specified '%s' differs from the schema specified in table name '%s'"
% (alt_schema, s[0]))
comp_type = self.get_property_value(COMP_TYPE_PROP)
if comp_type == "Java":
if not version_info.is_pe():
err_obj.get_property_err(COMP_TYPE_PROP).set_message(
"Component Type 'Java' is not available in this edition of SnapLogic Server, please select "
"'Python' instead. If you would like to upgrade to Java based components, then please "
" contact SnnapLogic.")
def execute(self, input_views, output_views):
"""Creates the resources for tables in the DB."""
base_uri = self.get_property_value(BASE_URI_PROP)
base_uri = base_uri.strip()
parsed_uri = urlparse.urlparse(base_uri)
if parsed_uri.scheme:
self._srv_uri = urlparse.urlunparse(parsed_uri[0], parsed_uri[1], "", "", "", "")
else:
self._srv_uri = None
if has_param(base_uri):
raise SnapComponentError("Property '%s' needs a parameter value at runtime" % BASE_URI_PROP)
if not base_uri.startswith('/'):
raise SnapComponentError("The value of property '%s' must begin with a '/'" % BASE_URI_PROP)
self.process_creds()
self.if_exists = self.get_property_value(RESOURCE_EXISTS_PROP)
connect_resdef = self.get_resource_ref("DBConnect")
if not connect_resdef:
connect_uri = self.get_property_value('DBConnect')
connect_resdef = self.get_resource_object(connect_uri, self.creds)
conn = DBUtils.get_connection_from_resdef(connect_resdef)
alt_schema = self.get_property_value(ALTERNATE_SCHEMA_PROP)
table_name = self.get_property_value(TABLE_NAME_PROP)
comp_type = self.get_property_value(COMP_TYPE_PROP)
if comp_type == "Java":
comp_prefix = "org."
else:
comp_prefix = ""
if table_name is not None:
# Both an alternate schema name and table name have been specified.
s = table_name.split('.')
if len(s) > 1:
if alt_schema is not None:
# For some reason the user has redundantly qualified table name with schema name.
if s[0].lower() != alt_schema.lower():
raise SnapComponentError(
"The alternate schema specified '%s' differs from the schema specified in table name '%s'"
% (alt_schema, s[0]))
else:
alt_schema = s[0]
# Strip off schema info from the table name. It will be added later.
table_name = s[1]
table_name_list = [table_name]
else:
table_name_list = conn.list_tables(alt_schema)
for table_name in table_name_list:
if alt_schema is not None:
t = "%s.%s" % (alt_schema, table_name)
else:
t = table_name
try:
table_view_def = conn.get_snap_view_metadata(t)
except SnapComponentError, err:
# This shouldn't just die, should it?
self.log(snap_log.LEVEL_WARN, "Error creating resources for table %s: %s" % (t, str(err)))
self.elog(err)
continue
self.create_resdef_for_table(base_uri, connect_uri, table_name, table_view_def, comp_prefix)
def create_resdef_for_table(self, base_uri, connect_uri, table_name, table_def, comp_prefix):
"""
Creates all the resdefs for the specified table name and table definition.
For each table, a read, insert, update, upsert, delete, lookup resource is created. The
resources are creatyed under the following URIs:
http://server:port/<base URI>/<table name>/read
http://server:port/<base URI>/<table name>/insert
http://server:port/<base URI>/<table name>/update
http://server:port/<base URI>/<table name>/upsert
http://server:port/<base URI>/<table name>/delete
http://server:port/<base URI>/<table name>/lookup
@param base_uri: The URI prefix under which the resources will be created.
@type base_uri: str
@param connect_uri: The connection URI that the resdefs creeated will refere to.
@type connect_uri: str
@param table_name: Name of the table for which resdefs need to be created.
@type table_name: str
@param table_def: Definition of the table for which resdefs need to be created. The
structure of this table is as follows:
{ 'fields' : (("f1", "string", "desc1"), ("f2", "string", "desc2"),...),
'primary_key' : ["f1", "f2", ...],
'schema' : None or schema name,
}
@type table_def: dict
@param comp_prefix: Prefix to be added to all component names.
@type comp_prefix: str
"""
if self.creds is None:
author = self.get_invoker_username()
else:
author = self.creds[0]
#
# First, create the reader resource
#
dbr_resdef = self.create_resource_object(comp_prefix + 'snaplogic.components.DBRead', self._srv_uri, self.creds)
primary_key = table_def.get('primary_key', [])
fields = table_def.get('fields')
schema_name = table_def.get('schema')
if schema_name is not None:
base_uri += '/' + schema_name
if fields is None or len(fields) == 0:
# Can't do much for a table without any columns that we can expose.
return
table_column_names = []
for f in fields:
e = validation_utils1.validate_field_name(f[0])
if e is not None:
self.log(snap_log.LEVEL_WARN, "Excluding table '%s' as its field name has invalid characters: %s" %
(table_name, e))
return
table_column_names.append(f[0])
dbr_resdef.add_record_output_view("output", fields, "Output view for table %s" % table_name)
dbr_resdef.set_property_value('DBConnect', connect_uri)
dbr_resdef.set_property_value('SQLStmt', 'SELECT %s FROM %s' % (", ".join(table_column_names), table_name))
dbr_resdef.set_general_info('description', 'Read resource for table %s.' % table_name)
if author is not None:
dbr_resdef.set_general_info('author', author)
val = dbr_resdef.validate()
if val is not None:
raise SnapComponentError("Failed to validate DBRead resource for table '%s': %s" %
(table_name, val))
self.save_resource(dbr_resdef, base_uri + "/%s/read" % table_name)
#
# Create the INSERT resource.
#
dbw_resdef = self.create_resource_object(comp_prefix + 'snaplogic.components.DBWrite', self._srv_uri, self.creds)
dbw_resdef.set_property_value('DBConnect', connect_uri)
dbw_resdef.set_property_value('TableName', table_name)
dbw_resdef.set_property_value('WhereClause', '')
dbw_resdef.set_property_value('CommitMethod', 'auto')
dbw_resdef.add_record_input_view('input', fields, '')
dbw_resdef.set_property_value('QueryCommand', 'insert')
dbw_resdef.set_general_info('description', 'Insert resource for table %s.' % table_name)
if author is not None:
dbw_resdef.set_general_info('author', author)
val = dbw_resdef.validate()
if val is not None:
raise SnapComponentError("Failed to validate DBWrite resource for table '%s': %s" %
(table_name, val))
self.save_resource(dbw_resdef, base_uri + "/%s/insert" % table_name)
#
# Create the UPDATE resource.
#
dbw_resdef.set_property_value('QueryCommand', 'update')
if len(primary_key) > 0:
where_fields = primary_key
else:
where_fields = [table_column_names[0]]
where_clause = self.generate_where_clause(where_fields)
dbw_resdef.set_property_value("WhereClause", where_clause)
dbw_resdef.set_general_info('description', 'Update resource for table %s.' % table_name)
val = dbw_resdef.validate()
if val is not None:
raise SnapComponentError("Failed to validate DBWrite resource for table '%s': %s" %
(table_name, val))
self.save_resource(dbw_resdef, base_uri + "/%s/update" % table_name)
#
# Create the DELETE resource.
#
dbw_resdef.set_property_value('QueryCommand', 'delete')
if len(primary_key) > 0:
# If primary key information is available, then we can provide a sample delete statement
# with the primary key information in the where clause. This seems like a safe default
# where clause to provide for a delete operation.
where_clause = self.generate_where_clause(primary_key)
dbw_resdef.set_property_value("WhereClause", where_clause)
else:
# Not going to take any chances. Set where clause to None
dbw_resdef.set_property_value("WhereClause", None)
dbw_resdef.set_general_info('description', 'Delete resource for table %s.' % table_name)
val = dbw_resdef.validate()
if val is not None:
raise SnapComponentError("Failed to validate DBWrite resource for table '%s': %s" %
(table_name, val))
self.save_resource(dbw_resdef, base_uri + "/%s/delete" % table_name)
#
# Create the UPSERT resource.
#
dbu_resdef = self.create_resource_object(comp_prefix + 'snaplogic.components.DBUpsert', self._srv_uri,
self.creds)
dbu_resdef.set_property_value('DBConnect', connect_uri)
dbu_resdef.set_property_value('TableName', table_name)
dbu_resdef.add_record_input_view('input', fields, '')
if len(primary_key) > 0:
# If primary key information is available, then we will
# use the columns comprising primary key as Keys.
dbu_resdef.set_property_value("Keys", primary_key)
else:
# Will use all fields as Keys
dbu_resdef.set_property_value("Keys", [field[0] for field in fields])
dbu_resdef.set_general_info('description', 'Upsert resource for table %s.' % table_name)
if author is not None:
dbu_resdef.set_general_info('author', author)
val = dbu_resdef.validate()
if val is not None:
raise SnapComponentError("Failed to validate DBUpsert resource for table '%s': %s" %
(table_name, val))
self.save_resource(dbu_resdef, base_uri + "/%s/upsert" % table_name)
#
# Create DBlookup resource.
#
dbl_resdef = self.create_resource_object(comp_prefix + 'snaplogic.components.DBLookup', self._srv_uri,
self.creds)
if len(primary_key) > 0:
input_field_names = primary_key
else:
input_field_names = [table_column_names[0]]
input_view_fields = []
for fname in input_field_names:
idx = table_column_names.index(fname)
input_view_fields.append(fields[idx])
dbl_resdef.add_record_input_view("input", input_view_fields, "input")
dbl_resdef.add_record_output_view("output", fields, "output")
dbl_resdef.set_property_value('DBConnect', connect_uri)
where_clause = self.generate_where_clause(input_field_names)
dbl_resdef.set_property_value('SQLStmt', 'SELECT %s FROM %s WHERE %s' %
(", ".join(table_column_names), table_name, where_clause))
dbl_resdef.set_property_value('IgnoreMultipleRows', False)
dbl_resdef.set_general_info('description', 'DBLookup resource for table %s.' % table_name)
if author is not None:
dbl_resdef.set_general_info('author', author)
val = dbl_resdef.validate()
if val is not None:
raise SnapComponentError("Failed to validate DBLookup resource for table '%s': %s" %
(table_name, val))
self.save_resource(dbl_resdef, base_uri + "/%s/lookup" % table_name)
def save_resource(self, resdef, uri):
"""
Save the resource at the specified URI.
This method follows the response specified by the caller when a resource already exists. The
possible responses are:
Skip the resource.
Fail.
Overwrite the resource.
@param resdef: The resdef to be saved.
@type resdef: L{ResDef}
@param uri: The URI at which to save the resource.
@type uri: str
"""
if self.if_exists == EXISTS_OVERWRITE:
# Just save and return.
resdef.save(uri, True)
elif self.if_exists == EXISTS_FAIL:
# Fail if the resource exists.
resdef.save(uri)
else:
# We need to skip the resource if it already exists.
try:
resdef = self.get_resource_object(uri, self.creds)
except SnapiHttpException, e:
if e.status == HttpRequest.NOT_FOUND:
resdef.save(uri)
else:
# Some other error, need to report it.
raise
def process_creds(self):
"""Read a cred information (if any) from the properties and set it as object attribute."""
self.creds = None
username = self.get_property_value(SNAPLOGIC_USER_PROP)
if username is not None:
password = self.get_property_value(SNAPLOGIC_PASSWD_PROP)
self.creds = (username, password)
def generate_where_clause(self, input_field_names):
"""
Creates a where clause that matches record fields with a list of correspondingly named input view fields.
@param input_field_names: Input field names that are to be used in the where clause.
@type input_field_names: list
"""
where_clause_list = []
for fname in input_field_names:
where_clause_list.append('%s = ${%s}' % (fname, fname))
where_clause = " and ".join(where_clause_list)
return where_clause
def upgrade_1_0_to_1_1(self):
"""
Change 1.
Add SnapLogic credentials and resource exists properties.
"""
self._create_resource_template_1_1()
def upgrade_1_1_to_1_2(self):
# TODO this class should really probably
# inherit from DBComponent
connect_value = self.get_property_value('DBConnect')
self.del_property_def("DBConnect")
self.set_property_def("DBConnect",
prop.SimpleProp("Connection Resource",
SnapString,
"The URI of a Connection Resource which contains the properties "
"needed to create a DB connection. Can be overridden by resource "
"reference in a pipeline.",
None,
True))
self.set_property_value("DBConnect", connect_value)
self.add_resource_ref_def("DBConnect", "Connection Resource", ["connection.db"], required=False)
self.set_resource_ref('DBConnect',None)
self._add_component_choice()
def upgrade_1_2_to_1_3(self):
"""
No-op upgrade only to change component doc URI during the upgrade
which will be by cc_info before calling this method.
"""
pass
|