# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Join.py 10330 2009-12-24 22:13:38Z grisha $
"""
Join Module and Resource Definition
"""
import tempfile
import os
from decimal import Decimal
from datetime import datetime,timedelta
import time
from sqlite3 import dbapi2
import snaplogic.components as components
from snaplogic.common import snap_log,sqlite_iter
from snaplogic.common import version_info
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.cc import prop
from snaplogic.snapi_base import keys
from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import SnapNumber,SnapString,SnapDateTime
from snaplogic.components.computils import parse_date_with_fractions
# Names of the Join component's properties and of the keys inside them.
#
# A join expression is a dictionary with two entries (JOIN_FIELD_1 and
# JOIN_FIELD_2); JOIN_EXPRS is the list property holding those dictionaries.
JOIN_FIELD_1 = "Join field 1"
JOIN_FIELD_2 = "Join field 2"
JOIN_EXPR = "Join expression"
JOIN_EXPRS = "Join expressions"
# An alias is a dictionary mapping a fully qualified input field
# (JOIN_FQN_FIELD, split on '.' by the component as "<view>.<field>") to an
# output view field (JOIN_OUTPUT_FIELD); JOIN_ALIASES is the list property
# holding those dictionaries.
JOIN_FQN_FIELD = "Fully qualified field"
JOIN_OUTPUT_FIELD = "Output field"
JOIN_ALIAS = "Alias"
JOIN_ALIASES = "Aliases"
class Join(ComponentAPI):
    """
    Equijoin component.

    Records arriving on the input views are stored in a temporary SQLite
    database, one table per input view.  Once every input view has been
    exhausted, a single SELECT statement is built from the "Join expressions"
    and "Aliases" properties, and each resulting row is written to the single
    output view.
    """

    api_version = '1.0'
    component_version = '1.3'

    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT  : 2,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT  : ComponentAPI.UNLIMITED_VIEWS,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
    }

    component_description = "Equijoin"
    component_label = "Join"
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/Join" % \
        version_info.doc_uri_version

    def _adapt_Decimal(self, dec):
        """
        Convert a Decimal to a float.
        """
        return float(dec)

    def _quote_identifier(self, name):
        """
        Return name as a double-quoted SQL identifier.

        Quoting avoids clashes with SQLite keywords; an embedded double quote
        is escaped by doubling it.
        """
        return "\"" + name.replace("\"", "\"\"") + "\""

    def _quote_fqn(self, fqn):
        """
        Quote every dot-separated part of fqn (e.g. view.field) individually.
        """
        return ".".join([self._quote_identifier(part) for part in fqn.split('.')])

    def create_resource_template(self):
        """
        Create Join resource template.

        Defines two properties: the required, non-empty list of join
        expressions and the list of aliases.
        """
        join_field_1 = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                       {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                       required=True)
        join_field_2 = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                       {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                       required=True)
        join_expr = prop.DictProp(JOIN_EXPR, join_field_1, "Join expression", 2, required=True)
        join_expr[JOIN_FIELD_1] = join_field_1
        join_expr[JOIN_FIELD_2] = join_field_2
        join_exprs = prop.ListProp("Join expressions", join_expr, min_size = 1, required=True)
        self.set_property_def(JOIN_EXPRS, join_exprs)

        join_fqn_field = prop.SimpleProp(JOIN_FQN_FIELD, SnapString, "Fully-qualified input field",
                                         {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] }, required=True)
        join_output_field = prop.SimpleProp(JOIN_OUTPUT_FIELD, SnapString, "Corresponding output field",
                                            {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] }, required=True)
        join_alias = prop.DictProp(JOIN_ALIAS, join_field_1, "Alias", 2, required=False)
        join_alias[JOIN_FQN_FIELD] = join_fqn_field
        join_alias[JOIN_OUTPUT_FIELD] = join_output_field
        join_aliases = prop.ListProp("Aliases", join_alias)
        self.set_property_def(JOIN_ALIASES, join_aliases)

    def _has_field(self, view, field_name):
        """
        Given a view definition, return True if field_name is a valid field.
        """
        for field_def in view[keys.VIEW_FIELDS]:
            if field_def[keys.FIELD_NAME] == field_name:
                return True
        return False

    def _find_source_field(self, output_field_name, alias_specs, input_view_names):
        """
        Find the input field that feeds the given output field.

        The first alias whose output field matches wins; otherwise the first
        input view that has a field of the same name is used.

        @return: Tuple (input view name, input field name), or None if the
                 source cannot be determined (malformed alias, unknown view
                 or no input view with that field).
        """
        for alias in alias_specs:
            if output_field_name == alias[JOIN_OUTPUT_FIELD]:
                fqn_parts = alias[JOIN_FQN_FIELD].split('.')
                if len(fqn_parts) < 2 or fqn_parts[0] not in input_view_names:
                    return None
                return (fqn_parts[0], fqn_parts[1])

        for input_view_name in input_view_names:
            if self._has_field(self.get_input_view_def(input_view_name), output_field_name):
                return (input_view_name, output_field_name)
        return None

    def validate(self, err_obj):
        """
        Validate the Join resource for correctness.

        Checks performed:
          - alias output field names are unique,
          - each alias names an existing field of an existing input view,
          - every non-aliased output field exists in exactly one input view,
          - every output field has the same type as the input field it
            comes from.

        Problems are reported through err_obj; nothing is returned.
        """
        res_ok = True
        input_view_names = self.list_input_view_names()
        output_views = self.list_output_view_names()
        output_view_name = output_views[keys.SINGLE_VIEW]

        # Check that each alias is valid with no duplicates
        alias_output_field_names = []
        # Since the alias specs are not required we must not presume any are present.
        alias_specs = self.get_property_value(JOIN_ALIASES)
        if not alias_specs:
            alias_specs = []
        for i, alias in enumerate(alias_specs):
            alias_fqn = alias[JOIN_FQN_FIELD]
            alias_output_field_name = alias[JOIN_OUTPUT_FIELD]

            # Values containing parameters cannot be checked until they are
            # substituted, so they are skipped here.
            if alias_fqn and not component_api.has_param(alias_fqn):
                fqn_parts = alias_fqn.split('.')
                if len(fqn_parts) != 2 or fqn_parts[0] not in input_view_names:
                    res_ok = False
                    err_obj.get_property_err(JOIN_ALIASES)[i][JOIN_FQN_FIELD].set_message(
                        "Alias field '%s' must have the form '<input view>.<field>' and name an existing input view."
                        % alias_fqn)
                elif not self._has_field(self.get_input_view_def(fqn_parts[0]), fqn_parts[1]):
                    res_ok = False
                    err_obj.get_property_err(JOIN_ALIASES)[i][JOIN_FQN_FIELD].set_message(
                        "Alias field '%s' does not exist in input view '%s'." % (alias_fqn, fqn_parts[0]))

            # Again, parameters make life difficult here...
            if not component_api.has_param(alias_output_field_name):
                err_idx = err_obj.get_property_err(JOIN_ALIASES)[i][JOIN_OUTPUT_FIELD]
                if alias_output_field_name in alias_output_field_names:
                    res_ok = False
                    err_idx.set_message("Duplicate alias output field name '%s'." % alias_output_field_name)
                else:
                    alias_output_field_names.append(alias_output_field_name)

        # Check that the output view definition is valid.
        # We could throw false positives here if the user did wild things
        # with parameters...
        output_view = self.get_output_view_def(output_view_name)
        output_view_field_names = [ d[keys.FIELD_NAME] for d in output_view[keys.VIEW_FIELDS] ]

        # Every output field that is not an alias must exist in exactly one input view.
        # If we don't find it we can't output it.
        # If we find it more than once then we don't know which one was intended and we require an alias.
        for field_idx, field in enumerate(output_view_field_names):
            if field in alias_output_field_names:
                continue
            view_list = []
            for input_view in input_view_names:
                if self._has_field(self.get_input_view_def(input_view), field):
                    view_list.append(input_view)
            ref_count = len(view_list)
            if ref_count == 0:
                res_ok = False
                err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][field_idx].set_message(
                    "Output view field '%s' is neither an input view field name nor alias output field name." % field)
            elif ref_count > 1:
                res_ok = False
                view_list.sort()
                found_in_views = ', '.join(view_list)
                err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][field_idx].set_message(
                    "Output view field '%s' is ambiguous (found in views %s) and requires an alias." % (field, found_in_views))

        # We can't perform the remaining checks until the user has fixed the issues.
        if not res_ok:
            return

        # Now chase the output field types to make sure they match.
        for i, output_field in enumerate(output_view[keys.VIEW_FIELDS]):
            output_field_name = output_field[keys.FIELD_NAME]
            output_field_type = output_field[keys.FIELD_TYPE]

            source = self._find_source_field(output_field_name, alias_specs, input_view_names)
            if source is None:
                # Source not resolvable (e.g. the alias contains a parameter):
                # there is nothing to compare the type against.
                continue
            input_view_name, input_field_name = source

            input_view = self.get_input_view_def(input_view_name)
            for input_field in input_view[keys.VIEW_FIELDS]:
                if input_field_name == input_field[keys.FIELD_NAME]:
                    if output_field_type != input_field[keys.FIELD_TYPE]:
                        err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
                            "Output view field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'." \
                            % (output_field_name, output_field_type, input_view_name, input_field[keys.FIELD_NAME],
                               input_field[keys.FIELD_TYPE]))

    def _process_record(self, data, input_view, list_of_active_input_views):
        """
        Processes a record from incoming views. The order doesn't matter, it just stores records.
        When all input views are exhausted, L{_send_output} is called to send the result of the join.
        See L{ComponentAPI.process_input_views}.

        @param data: Record to store; a false value marks input_view as finished.
        @param input_view: Input view the record came from.
        @param list_of_active_input_views: Not used by this component.
        """
        view_name = input_view.name
        if data:
            insert_stmt = self._insert_stmts[view_name]
            vals = [data[field] for field in input_view.field_names]
            self._cursor.execute(insert_stmt, vals)
            return

        self._finished_views[view_name] = True
        for is_done in self._finished_views.values():
            if not is_done:
                return
        self._send_output()

    def _send_output(self):
        """
        Run the join query over the stored records and write every resulting
        row to the output view, then mark the output view as completed.
        """
        output_view = list(self._output_views.values())[keys.SINGLE_VIEW]
        out_fields = [field[keys.FIELD_NAME] for field in output_view.fields]

        # The aliases property is optional, so there may be no value at all.
        aliases = self.get_property_value(JOIN_ALIASES) or []
        alias_dict = {}
        for alias in aliases:
            # Need to quote all strings to avoid clashes with sqlite keywords
            alias_dict[alias[JOIN_OUTPUT_FIELD]] = self._quote_fqn(alias[JOIN_FQN_FIELD])

        select_fields = []
        for out_field in out_fields:
            if out_field in alias_dict:
                select_fields.append(alias_dict[out_field])
            else:
                select_fields.append(self._quote_identifier(out_field))

        from_list = [self._quote_identifier(view_name) for view_name in self._input_views.keys()]

        where_list = []
        for join_expr in self.get_property_value(JOIN_EXPRS):
            j1 = self._quote_fqn(join_expr[JOIN_FIELD_1])
            j2 = self._quote_fqn(join_expr[JOIN_FIELD_2])
            where_list.append(j2 + "=" + j1)

        select_stmt = "SELECT " + ','.join(select_fields)
        select_stmt += " FROM " + ','.join(from_list)
        select_stmt += " WHERE " + " AND ".join(where_list)
        self._cursor.execute(select_stmt)

        types_list = [output_field[keys.FIELD_TYPE] for output_field in output_view.fields]
        for row in sqlite_iter(self._cursor):
            out_rec = output_view.create_record()
            for i, field in enumerate(out_rec.field_names):
                value = row[i]
                # NULLs pass through unchanged; numbers and dates are
                # converted back from their SQLite representation.
                if value is not None:
                    if types_list[i] == SnapNumber:
                        value = Decimal(str(value))
                    elif types_list[i] == SnapDateTime:
                        value = parse_date_with_fractions(value)
                out_rec[field] = value
            output_view.write_record(out_rec)
        output_view.completed()

    def execute(self, input_views, output_views):
        """
        Execute the join.

        Creates a temporary SQLite database with one table per input view,
        then hands record processing over to process_input_views. The
        database and its file are removed when processing ends, whether it
        succeeded or not.

        @param input_views: Dictionary of connected input views keyed by view name.
        @param output_views: Dictionary of connected output views.
        @raise SnapComponentError: If the output view or any defined input
                                   view is not connected.
        @raise SnapObjTypeError: If an input field has an unsupported type.
        """
        # So before we go too far, we need 1 output view connected
        # and all the defined input views connected.
        try:
            list(output_views.values())[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")

        # We don't need a try/except here because we will probe delicately.
        connected_input_views = input_views.keys()
        for view in self.list_input_view_names():
            if view not in connected_input_views:
                raise SnapComponentError("Not all input views connected.")

        try:
            self._db_file = None
            self._db_file_name = None
            self._cursor = None
            self._con = None
            self._finished_views = {}
            self._input_views = input_views
            self._output_views = output_views

            # Create a temp file to hold the SQLite database.
            # Note that mkstemp opens the file as well, which we don't need,
            # so close the temp file after it's been created.
            (self._db_file, self._db_file_name) = tempfile.mkstemp(".db","snapjoin")
            os.close(self._db_file)
            self._con = dbapi2.connect(self._db_file_name)
            self._cursor = self._con.cursor()
            # Decimal values are handed to SQLite as floats.
            dbapi2.register_adapter(Decimal, float)

            self._fqn_fields = []
            view_process_dict = {}
            self._insert_stmts = {}
            for input_view_name in self._input_views.keys():
                input_view = self._input_views[input_view_name]
                view_process_dict[input_view] = self._process_record
                self._finished_views[input_view_name] = False
                quoted_view_name = self._quote_identifier(input_view_name)

                field_clause = ",".join([self._quote_identifier(field[keys.FIELD_NAME])
                                         for field in input_view.fields])
                value_clause = ",".join(['?' for i in input_view.fields])
                self._insert_stmts[input_view_name] = "INSERT INTO %s (%s) VALUES (%s)" % (
                    quoted_view_name, field_clause, value_clause)

                column_defs = []
                for input_field in input_view.fields:
                    field = input_field[keys.FIELD_NAME]
                    field_type = input_field[keys.FIELD_TYPE]
                    self._fqn_fields.append(input_view_name + "." + field)
                    if field_type == SnapString:
                        sql_type = "TEXT"
                    elif field_type == SnapNumber:
                        sql_type = "DECIMAL"
                    elif field_type == SnapDateTime:
                        sql_type = 'DATETIME'
                    else:
                        raise SnapObjTypeError('Unknown type %s' % field_type)
                    column_defs.append(self._quote_identifier(field) + " " + sql_type)

                stmt = "CREATE TABLE %s (%s)" % (quoted_view_name, ", ".join(column_defs))
                self._cursor.execute(stmt)

            self.process_input_views(view_process_dict)
        finally:
            self._cleanup()

    def _cleanup(self):
        """
        Clean up resources: close the cursor and the connection and delete
        the temporary database file.

        Each step is best effort; a failure in one step does not prevent
        the following ones from running.
        """
        if self._cursor:
            try:
                self._cursor.close()
            except Exception:
                pass
        if self._con:
            try:
                self._con.close()
            except Exception:
                pass
        if self._db_file_name:
            try:
                os.remove(self._db_file_name)
            except Exception:
                pass

    def upgrade_1_0_to_1_1(self):
        """
        Add LOV constraints
        """
        # Save property values.
        # We need to recreate the properties, which resets their values.
        join_specs_value = self.get_property_value(JOIN_EXPRS)
        alias_specs_value = self.get_property_value(JOIN_ALIASES)

        # Delete the properties before recreating them
        self.del_property_def(JOIN_EXPRS)
        self.del_property_def(JOIN_ALIASES)

        # Redefine properties to include the constraint.
        # This is copy and paste code from create_resource_template for version 1.0.
        # The reason we cannot call create_resource_template directly here is
        # that in a future version of this component the body of create_resource_template
        # may change, so the invocation will no longer be correct.
        # So we simply copy the code from version 1.0 create_resource_template here.
        join_field_1 = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                       {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                       required=True)
        join_field_2 = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                       {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                       required=True)
        join_expr = prop.DictProp(JOIN_EXPR, join_field_1, "Join expression", 2, required=True)
        join_expr[JOIN_FIELD_1] = join_field_1
        join_expr[JOIN_FIELD_2] = join_field_2
        join_exprs = prop.ListProp("Join expressions", join_expr, required=True)
        self.set_property_def(JOIN_EXPRS, join_exprs)

        join_fqn_field = prop.SimpleProp(JOIN_FQN_FIELD, SnapString, "Fully-qualified input field",
                                         {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] })
        join_output_field = prop.SimpleProp(JOIN_OUTPUT_FIELD, SnapString, "Corresponding output field",
                                            {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] })
        join_alias = prop.DictProp(JOIN_ALIAS, join_field_1, "Alias", 2, required=True)
        join_alias[JOIN_FQN_FIELD] = join_fqn_field
        join_alias[JOIN_OUTPUT_FIELD] = join_output_field
        join_aliases = prop.ListProp("Aliases", join_alias)
        self.set_property_def(JOIN_ALIASES, join_aliases)
        # End copy and paste (create_resource_template for version 1.0)

        # Restore the value
        self.set_property_value(JOIN_EXPRS, join_specs_value)
        self.set_property_value(JOIN_ALIASES, alias_specs_value)

    def upgrade_1_1_to_1_2(self):
        """ Add size constraint on join expressions """
        saved_join_exprs = self.get_property_value(JOIN_EXPRS)

        # Recreate join expressions property
        join_field_1 = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                       {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                       required=True)
        join_field_2 = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                       {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                       required=True)
        join_expr = prop.DictProp(JOIN_EXPR, join_field_1, "Join expression", 2, required=True)
        join_expr[JOIN_FIELD_1] = join_field_1
        join_expr[JOIN_FIELD_2] = join_field_2
        join_exprs = prop.ListProp("Join expressions", join_expr, min_size = 1, required=True)
        self.set_property_def(JOIN_EXPRS, join_exprs)

        # Restore join expressions value
        self.set_property_value(JOIN_EXPRS, saved_join_exprs)

    def upgrade_1_2_to_1_3(self):
        """
        No-op upgrade only to change component doc URI during the upgrade
        which will be done by cc_info before calling this method.
        """
        pass
|