# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: DBComponent.py 10183 2009-12-16 23:58:55Z grisha $
"""
Contains definition of superclass for DB* components.
"""
from urlparse import urlparse
import re
from snaplogic.common.data_types import SnapString
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.snapi_base import keys
import snaplogic.cc.prop as prop
from snaplogic.components import DBUtils
from snaplogic.common import prop_err
from snaplogic.common.snap_exceptions import SnapComponentError
# Values accepted by the "CommitMethod" property of DB* components.
COMMIT_METHOD_AUTO = 'auto'
"""Auto-commit - commit is called for each record operation."""

COMMIT_METHOD_WHEN_DONE = 'whendone'
"""Commit is done in the end when all records have been processed."""

COMMIT_METHOD_BY_PIPELINE = 'bypipeline'
"""Commit is done by the Pipeline -- currently not supported"""

# Only the supported methods are listed here; COMMIT_METHOD_BY_PIPELINE is
# left out on purpose because it is not supported yet.
COMMIT_METHODS = [COMMIT_METHOD_AUTO, COMMIT_METHOD_WHEN_DONE]
class DBComponent(ComponentAPI):
"""
This class is what would be in Java an "abstract" class, containing
functionality that is common for DB* components, but not a
Component itself.
"""
old_field_ref_re = re.compile('\$[(]input[.]([\w]+)[)]')
"""Regexp specifying reference to a view field in an old style. This is only used for upgrades."""
def _common_db_validate(self, err_obj):
"""
Called from subclasses to validate properties common to
DB* components (those created with L{_create_common_db_props()}
"""
# DBConnect: Just check that this looks like a URI that begins with http:// or /
val = self.get_property_value("DBConnect")
if not component_api.has_param(val):
v = urlparse(val)
if v.scheme and (v.scheme not in ('http', 'https')):
err_obj.get_property_err("DBConnect").set_message("DBConnect URI '%s' has invalid scheme '%s'." %
(val, v.scheme))
if not v[2].startswith('/'):
err_obj.get_property_err("DBConnect").set_message("DBConnect URI '%s' is not a valid URI." % val)
def _create_common_db_props(self):
"""
To be called from L{ComponentAPI.create_resource_template}. This
will add property definitions for properties that are common across
all DB* components.
"""
self._add_db_con_ref()
self.set_property_def("DBConnect",
prop.SimpleProp("Connection Resource",
SnapString,
"The URI of a Connection Resource which contains the properties "
"needed to create a DB connection. Can be overridden by resource "
"reference in a pipeline.",
None,
True))
def _add_db_con_ref(self):
# Will make required in the next iteration
self.add_resource_ref_def("DBConnect", "Connection Resource", ["connection.db"], required=False)
def _upgrade_to_use_refs(self):
connect_value = self.get_property_value('DBConnect')
self.del_property_def("DBConnect")
self.set_property_def("DBConnect",
prop.SimpleProp("Connection Resource",
SnapString,
"The URI of a Connection Resource which contains the properties "
"needed to create a DB connection. Can be overridden by resource "
"reference in a pipeline.",
None,
True))
self.set_property_value("DBConnect", connect_value)
self._add_db_con_ref()
self.set_resource_ref('DBConnect',None)
def _get_props_to_check_for_suggest(self):
"""
To be implemented by subclasses.
@return: Properties to check (in addition to the common one
defined in L{_create_common_props}.
@rtype: sequence
@raise SnapComponentError: when subclasses don't implement it.
See also:
- L{_db_suggest_resource_values()}
- L{suggest_resource_values()}
- L{_create_common_db_props()}
"""
raise SnapComponentError("This method should be implemented by the author of %s" % self.__class__.__name__)
def _db_suggest_resource_values(self, err_obj, conn):
"""
To be implemented by subclasses. This will be used, in a
template method fashion, by L{suggest_resource_values()}
to
@param err_obj: The err_obj passed to L{suggest_resource_values()}
@param conn: Connection object.
@type conn: L{DBUtils.SnapDBAdapter}.
@raise: Whatever is needed. Any exception raised here will be
set as a message in err_obj.
"""
raise SnapComponentError("This method should be implemented by the author of %s" % self.__class__.__name__)
def _add_commit_method_prop(self):
"""
Adds CommitMethod property to the component.
"""
self.set_property_def('CommitMethod',
prop.SimpleProp("Commit method",
SnapString,
"Specifies how the rows will be commited. (auto | whendone)."
" Value auto will commit after each record."
" Value whendone will commit after the final record.",
{"lov": COMMIT_METHODS},
True))
self.set_property_value('CommitMethod', COMMIT_METHOD_AUTO)
def _upgrade_field_refs(self, prop_name):
"""
For use in upgrade, for upgrading old-style field references with new style ones.
@param prop_name: property to upgrade
"""
val = self.get_property_value(prop_name)
if val is None:
return None
def repl_func(match):
match = match.group(0)
match = match[len('$(input.'):-1]
match = "${%s}" % match
return match
new_val = DBComponent.old_field_ref_re.sub(repl_func, val)
self.set_property_value(prop_name, new_val)
def suggest_resource_values(self, err_obj):
"""
Using a template method pattern, this will perform the following
algorithm:
- Get properties to check via L{_get_props_to_check_for_suggest()}
method implemented by the subclasses
- Validate that these properties, together with the common ones
defined in L{_create_common_db_props()} are OK.
- Call _db_suggest_resource_values()
"""
prop_names = self._get_props_to_check_for_suggest()
prop_names.append('DBConnect')
new_err_obj = prop_err.ComponentResourceErr(self._resdef.dict)
errors = False
for prop_name in prop_names:
perr = new_err_obj.get_property_err(prop_name)
msg = perr.get_error()
if msg:
err_obj.get_property_err(prop_name).set_message(msg)
errors = True
if errors:
return
connect_uri = self.get_property_value("DBConnect")
connect_resdef = self.get_local_resource_object(connect_uri)
con_params = {}
con_param_names = connect_resdef.list_param_names()
for con_param_name in con_param_names:
pname = con_param_name[3:-1]
con_params[pname] = connect_resdef.get_param_default_value(con_param_name)
props_with_params = []
for con_prop in connect_resdef.list_property_names():
if con_prop == 'snap_internal' or con_prop == 'snap_general_info':
continue
con_prop_value = connect_resdef.get_property_value(con_prop)
# Try substituting default...
if component_api.has_param(con_prop_value):
con_prop_def = connect_resdef.get_property_def(con_prop)
con_prop_def = con_prop_def[keys.PROPERTY_DEFINITION]
con_prop_label = con_prop_def['label']
con_prop_def = prop.create_property_from_resdef(con_prop_def)
try:
new_value = prop.substitute_params_in_property(con_prop_value, con_params, con_prop_def)
except Exception, e:
err_obj.get_property_err('DBConnect').set_message('Error substituting parameters for DBConnect: %s' % e)
return
if component_api.has_param(new_value):
props_with_params.append(con_prop_label)
else:
connect_resdef.set_property_value(con_prop, new_value)
if props_with_params:
props_with_params.sort()
props_with_params = ', '.join(props_with_params)
err_obj.get_property_err('DBConnect').set_message('Could not substitute parameters for the following properties of DBConnect resources %s: %s' % (connect_uri, props_with_params))
return
v = connect_resdef.validate()
if v is not None:
err_obj.get_property_err('DBConnect').set_message('DBConnect resource %s has the following errors: %s' % (connect_uri, v))
return
conn = None
try:
conn = DBUtils.get_connection_from_resdef(connect_resdef)
self._db_suggest_resource_values(err_obj, conn)
except Exception, e:
self.elog(e)
err_obj.set_message('Error occurred: %s' % e)
finally:
try:
if conn:
conn.close()
except Exception, e:
self.elog(e)
|