# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: JoinOuter.py 10330 2009-12-24 22:13:38Z grisha $
"""
Join Module and Resource Definition
"""
import tempfile
import os
from decimal import Decimal
from datetime import datetime,timedelta
import time
from sqlite3 import dbapi2
from snaplogic.components.computils import parse_date_with_fractions
import snaplogic.components as components
from snaplogic.common import snap_log,sqlite_iter
from snaplogic.common import version_info
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.cc import prop
from snaplogic.snapi_base import keys
from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import SnapNumber,SnapString,SnapDateTime
# Join expression property labels.
# A join expression is a dictionary holding two fully qualified input fields
# ("view.field"); JOIN_EXPRS names the list property holding all expressions.
JOIN_FIELD_1 = "Join field 1"
JOIN_FIELD_2 = "Join field 2"
JOIN_EXPR = "Join expression"
JOIN_EXPRS = "Join expressions"
# Alias property labels.
# An alias maps a fully qualified input field ("view.field") to the name of
# an output view field; JOIN_ALIASES names the list property holding them.
JOIN_FQN_FIELD = "Fully qualified field"
JOIN_OUTPUT_FIELD = "Output field"
JOIN_ALIAS = "Alias"
JOIN_ALIASES = "Aliases"
# Outer join types: the allowed values of the "jointype" property.
JOIN_OUTER_LEFT = "left"
JOIN_OUTER_RIGHT = "right"
JOIN_OUTER_FULL = "full"
# Default date mask for coercion attempts.
# NOTE(review): not referenced by the code visible in this module - confirm
# whether it is still needed.
DATE_FORMAT_STRING = "%Y-%m-%d %H:%M:%S"
class JoinOuter(ComponentAPI):
"""
Outer join component.

Joins the records of two input views and writes the result to a single
output view. The join type ("left", "right" or "full"), the join
expressions and the optional output field aliases are resource properties.
At execution time the input records are staged in a temporary SQLite
database and the join itself is computed by an SQL statement.
"""
api_version = '1.0'
# Current component version; the upgrade_X_to_Y methods of this class
# cover the steps from 1.0 up to this version.
component_version = '1.3'
# Lower and upper limits are equal: exactly two input views and exactly
# one output view.
capabilities = {
ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT : 2,
ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT : 2,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
}
component_description = "Outer Join. An Outer Join returns all rows which satisfies the join condition plus those rows from the left input, right input or both inputs which did not satisfy the join condition."
component_label = "Join - Outer"
# The documentation URI embeds the documentation version of the running
# SnapLogic release.
component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/JoinOuter" % \
version_info.doc_uri_version
def create_resource_template(self):
    """
    Define the properties of the Outer Join resource.

    Four properties are defined, in this order: the join type, the name of
    the left input view, the list of join expressions (at least one
    required) and the optional list of output field aliases. The join type
    defaults to a left outer join.
    """
    # Join type: one of the three outer join flavors.
    jointype_prop = prop.SimpleProp("Join type",
                                    SnapString,
                                    "Outer Join Type",
                                    {"lov" : [JOIN_OUTER_LEFT, JOIN_OUTER_RIGHT, JOIN_OUTER_FULL]},
                                    True)
    self.set_property_def("jointype", jointype_prop)

    # Left input view: optional at the property level.
    leftinput_prop = prop.SimpleProp("Left input view",
                                     SnapString,
                                     "Name of the Input View to be considered the left input",
                                     {'lov': [ keys.CONSTRAINT_LOV_INPUT_VIEW] })
    self.set_property_def("leftinput", leftinput_prop)

    # Join expressions: a list of (field 1, field 2) dictionaries.
    first_field = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                  {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] }, required=True)
    second_field = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                   {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] }, required=True)
    expression = prop.DictProp(JOIN_EXPR, first_field, "Join expression", 2, required=True)
    expression[JOIN_FIELD_1] = first_field
    expression[JOIN_FIELD_2] = second_field
    self.set_property_def(JOIN_EXPRS,
                          prop.ListProp("Join expressions", expression, min_size = 1, required=True))

    # Aliases: a list of (fully qualified input field, output field)
    # dictionaries.
    fqn_field = prop.SimpleProp(JOIN_FQN_FIELD, SnapString, "Fully-qualified input field",
                                {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                required=True)
    output_field = prop.SimpleProp(JOIN_OUTPUT_FIELD, SnapString, "Corresponding output field",
                                   {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] },
                                   required=True)
    alias = prop.DictProp(JOIN_ALIAS, first_field, "Alias", 2)
    alias[JOIN_FQN_FIELD] = fqn_field
    alias[JOIN_OUTPUT_FIELD] = output_field
    self.set_property_def(JOIN_ALIASES, prop.ListProp("Aliases", alias))

    # Set the default to be left outer join
    self.set_property_value("jointype", JOIN_OUTER_LEFT)
def _has_field(self, view, field_name):
    """
    Tell whether a view definition declares a field called field_name.

    @param view: View definition; its keys.VIEW_FIELDS entry is scanned.
    @param field_name: Name of the field to look for.
    @return: True if one of the view's fields has that name, False otherwise.
    """
    return any(candidate[keys.FIELD_NAME] == field_name
               for candidate in view[keys.VIEW_FIELDS])
def _validate_input_field_ref(self, field_ref, in_view_names, err_idx, prefix):
    """
    Check a fully qualified input field reference of the form "view.field".

    @param field_ref: The reference to check.
    @param in_view_names: Names of the existing input views.
    @param err_idx: Error object that receives the message on failure.
    @param prefix: Text put in front of the error message (e.g. 'Alias').
    @return: True if the reference names an existing field of an existing
             input view, False otherwise (a message is set on err_idx).
    """
    # Only the first dot separates the view name from the field name.
    parts = field_ref.split('.', 1)
    if len(parts) != 2:
        err_idx.set_message("%s field '%s' must be of the form 'input_view_name.field_name'." % (prefix, field_ref))
        return False

    view_name, field_name = parts
    if view_name not in in_view_names:
        err_idx.set_message("%s field '%s' references non-existent input view." % (prefix, field_ref))
        return False

    if not self._has_field(self.get_input_view_def(view_name), field_name):
        err_idx.set_message("%s field '%s' references non-existent input view field." % (prefix, field_ref))
        return False

    return True
def _validate_output_field_ref(self, field_ref, output_view_name, err_idx, prefix):
    """
    Check that an alias output field reference names an output view field.

    @param field_ref: Plain (unqualified) output field name.
    @param output_view_name: Name of the output view to look the field up in.
    @param err_idx: Error object that receives the message on failure.
    @param prefix: Text put in front of the error message (e.g. 'Alias').
    @return: True if the field exists in the output view, False otherwise
             (a message is set on err_idx).
    """
    view_def = self.get_output_view_def(output_view_name)
    if self._has_field(view_def, field_ref):
        return True
    err_idx.set_message("%s field '%s' references non-existent output view field." % (prefix, field_ref))
    return False
def validate(self, err_obj):
    """
    Validate the Join resource for correctness.

    The checks are performed in this order:
      1. Every join expression field references an existing input view field.
      2. Every alias references an existing input view field and an existing
         output view field, and no output field is aliased twice.
      3. Every output view field that is not an alias target exists in
         exactly one input view.
      4. The type of every output view field matches the type of the input
         field it is fed from.
      5. A left input view is given when the join type is left or right.

    Checks 4 and 5 are only performed when checks 2 and 3 passed. Property
    values containing parameters are skipped because their final value is
    not known at validation time.

    @param err_obj: Error object on which all messages are set.
    """
    res_ok = True
    input_view_names = self.list_input_view_names()
    output_view_name = self.list_output_view_names()[keys.SINGLE_VIEW]

    # 1. Check that each join expression is valid.
    # A failure here is reported but does not block the later checks, which
    # do not depend on the join expressions.
    join_specs = self.get_property_value(JOIN_EXPRS) or []
    for i, join_expr in enumerate(join_specs):
        for label in (JOIN_FIELD_1, JOIN_FIELD_2):
            # Skip validating if parameter present
            if component_api.has_param(join_expr[label]):
                continue
            err_idx = err_obj.get_property_err(JOIN_EXPRS)[i][label]
            self._validate_input_field_ref(join_expr[label], input_view_names, err_idx, 'Join expression')

    # 2. Check that each alias is valid with no duplicates.
    # Since the alias specs are not required we must not presume any are present.
    alias_specs = self.get_property_value(JOIN_ALIASES) or []
    alias_output_field_names = []
    for i, alias in enumerate(alias_specs):
        alias_fqn = alias[JOIN_FQN_FIELD]
        alias_output_field_name = alias[JOIN_OUTPUT_FIELD]

        if not component_api.has_param(alias_fqn):
            err_idx = err_obj.get_property_err(JOIN_ALIASES)[i][JOIN_FQN_FIELD]
            if not self._validate_input_field_ref(alias_fqn, input_view_names, err_idx, 'Alias'):
                res_ok = False

        if not component_api.has_param(alias_output_field_name):
            err_idx = err_obj.get_property_err(JOIN_ALIASES)[i][JOIN_OUTPUT_FIELD]
            if not self._validate_output_field_ref(alias_output_field_name, output_view_name, err_idx, 'Alias'):
                res_ok = False

        if alias_output_field_name in alias_output_field_names:
            res_ok = False
            # Always report the duplicate on the output field of this alias.
            # Fetch the error object explicitly instead of relying on
            # whatever the checks above happened to bind last.
            err_obj.get_property_err(JOIN_ALIASES)[i][JOIN_OUTPUT_FIELD].set_message(
                "Duplicate alias output field name '%s'." % alias_output_field_name)
        else:
            alias_output_field_names.append(alias_output_field_name)

    # 3. Check that the output view definition is valid.
    # We could throw false positives here if the user did wild things
    # with parameters...
    output_view = self.get_output_view_def(output_view_name)
    output_view_fields = output_view[keys.VIEW_FIELDS]
    for i, output_field in enumerate(output_view_fields):
        field = output_field[keys.FIELD_NAME]
        if field in alias_output_field_names:
            # Fed through an alias, already checked above.
            continue
        # Each remaining field must exist exactly once in the input views.
        # If we don't find it we can't output it. If we find it more than
        # once we don't know which one was intended and require an alias.
        view_list = [view_name for view_name in input_view_names
                     if self._has_field(self.get_input_view_def(view_name), field)]
        if len(view_list) == 0:
            res_ok = False
            err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
                "Output view field '%s' is neither an input view field name nor alias output field name." % field)
        elif len(view_list) > 1:
            res_ok = False
            view_list.sort()
            err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
                "Output view field '%s' is ambiguous (found in views %s) and requires an alias."
                % (field, ', '.join(view_list)))

    # We can't perform the remaining checks until the user has fixed the issues.
    if not res_ok:
        return

    # 4. Now chase the output field types to make sure they match.
    for i, output_field in enumerate(output_view_fields):
        output_field_name = output_field[keys.FIELD_NAME]
        output_field_type = output_field[keys.FIELD_TYPE]
        input_view_name = None
        input_field_name = None

        # Follow the output field through any alias.
        alias_found = False
        for alias in alias_specs:
            if output_field_name == alias[JOIN_OUTPUT_FIELD]:
                alias_found = True
                alias_fqn = alias[JOIN_FQN_FIELD]
                # A parameterized reference cannot be resolved yet; for a
                # plain one, split on the first dot only, exactly as
                # _validate_input_field_ref does.
                if not component_api.has_param(alias_fqn):
                    input_view_name, input_field_name = alias_fqn.split('.', 1)
                break

        # If we didn't find it in the alias list, we have to hunt for it in the input views
        if not alias_found:
            for candidate in input_view_names:
                if self._has_field(self.get_input_view_def(candidate), output_field_name):
                    input_view_name = candidate
                    input_field_name = output_field_name
                    break

        if input_view_name is None:
            # Source field unknown (parameterized alias): nothing to compare.
            continue

        input_view = self.get_input_view_def(input_view_name)
        for input_field in input_view[keys.VIEW_FIELDS]:
            if input_field_name != input_field[keys.FIELD_NAME]:
                continue
            if output_field_type != input_field[keys.FIELD_TYPE]:
                err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
                    "Output view field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'." \
                    % (output_field_name, output_field_type, input_view_name, input_field[keys.FIELD_NAME],
                       input_field[keys.FIELD_TYPE]))

    # 5. If the join is left or right, make sure they specify which is the left table.
    jointype = self.get_property_value("jointype")
    if component_api.has_param(jointype):
        # Can't do much if the value is a parameter. Should probably disallow
        # parameterizing this prop.
        return
    if jointype == JOIN_OUTER_LEFT or jointype == JOIN_OUTER_RIGHT:
        if not self.get_property_value("leftinput"):
            # For a left or right outer join the left input view must be specified.
            # For a full outer join it's not needed.
            # Note that the generic validation code in the CC validates the input view name.
            # However because it's an optional property here we have to validate that it's provided
            # when join type is left or right.
            err_obj.get_property_err("leftinput").set_message(
                "Left input view property must be specified for a left outer or right outer join.")
def _process_record(self, data, input_view, list_of_active_input_views):
    """
    Store one incoming record, or note that an input view is exhausted.

    A record is inserted into the table of the view it came from; the
    arrival order across views does not matter. A false value for data
    marks the view as finished, and once every input view is finished
    L{_send_output} is called to produce the join result.
    See L{ComponentAPI.process_input_views}.

    @param data: The record read from input_view, or a false value when
                 the view has no more records.
    @param input_view: The input view the record was read from.
    @param list_of_active_input_views: Not used by this method.
    """
    source_name = input_view.name
    if data:
        statement = self._insert_stmts[source_name]
        row_values = [data[column] for column in input_view.field_names]
        self._cursor.execute(statement, row_values)
        return

    self._finished_views[source_name] = True
    # Only the last view to finish triggers the output.
    if all(self._finished_views.values()):
        self._send_output()
def _send_output(self):
    """
    Run the outer join over the stored input records and write the result.

    Builds one SELECT statement against the tables holding the input view
    records, executes it, converts every result row to the output view
    field types and writes it to the output view, which is then completed.

    @raise SnapComponentError: If the join type is not left, right or full,
        or if no left input view is configured for a left or right join.
    """
    # Sqlite wants outer join syntax like this:
    #     SELECT t1.c1, c2, c3 FROM t1
    #     LEFT OUTER JOIN t2 ON (t1.c1 = t2.c1);
    #
    # Note that it will ignore the outer join keywords if you use "WHERE <predicate>" in place
    # of "ON (<predicate>)". Weird.
    #
    # We could support multiple tables for an outer join but then it gets a bit tricky.
    # The sqlite parser wants to get it in the form of:
    #     SELECT t1.c1, c2, c3, c4 FROM t1
    #     LEFT OUTER JOIN t2 ON (<t1 t2 preds>)
    #     LEFT OUTER JOIN t3 ON (<t2 t3 preds>)
    #
    # Strangely enough, sqlite is quite happy to accept:
    #     SELECT t1.c1, c2, c3, c4 FROM t1
    #     LEFT OUTER JOIN t2 ON (<t1 t2 preds> and <t2 t3 preds>)
    #     LEFT OUTER JOIN t3 ON (<t1 t2 preds> and <t2 t3 preds>)
    #
    # This makes life easier for us so that we don't have to parse and sort the predicates
    # and put them in the right places. But it does cause other problems. The component
    # doesn't really know what the left table is. We don't have a concept of a "left view"
    # and in fact the views have no order or position. So we have to make the user tell us
    # which input view is the left table.

    def quote(name):
        # Need to quote all identifiers to avoid clashes with sqlite keywords.
        return "\"" + name + "\""

    def quote_fqn(fqn_field):
        # Only the first dot separates view and field name, which is how
        # _validate_input_field_ref interprets these references.
        return ".".join([quote(part) for part in fqn_field.split('.', 1)])

    jointype = self.get_property_value("jointype")
    if jointype not in (JOIN_OUTER_LEFT, JOIN_OUTER_RIGHT, JOIN_OUTER_FULL):
        # Shouldn't be possible.
        raise SnapComponentError("Unexpected jointype %s" % jointype)

    output_view = list(self._output_views.values())[keys.SINGLE_VIEW]
    out_fields = [field[keys.FIELD_NAME] for field in output_view.fields]

    # Aliases are optional, so the property may hold no value at all.
    alias_dict = {}
    for alias in (self.get_property_value(JOIN_ALIASES) or []):
        alias_dict[alias[JOIN_OUTPUT_FIELD]] = quote_fqn(alias[JOIN_FQN_FIELD])

    # An aliased output field selects its fully qualified source column,
    # any other output field selects the input column of the same name.
    select_fields = []
    for out_field in out_fields:
        if out_field in alias_dict:
            select_fields.append(alias_dict[out_field])
        else:
            select_fields.append(quote(out_field))

    where_list = []
    for join_expr in self.get_property_value(JOIN_EXPRS):
        where_list.append(quote_fqn(join_expr[JOIN_FIELD_2]) + "=" + quote_fqn(join_expr[JOIN_FIELD_1]))

    # make a list of the tables. We want "left" in slot 0.
    from_list = [quote(view_name) for view_name in self._input_views.keys()]
    self.log(snap_log.LEVEL_DEBUG, "from_list before: %s, %s" % (from_list[0], from_list[1]))

    # The left table only matters when join type is "left" or "right".
    # If it is "full", and user didn't specify, just pick one.
    # Be careful with the quotes. It won't be quoted if it came from the get_property call.
    left_table = self.get_property_value("leftinput")
    if left_table:
        left_table = quote(left_table)
    elif jointype == JOIN_OUTER_FULL:
        left_table = from_list[0]
    else:
        raise SnapComponentError(
            "Left input view property must be specified for a left outer or right outer join.")

    # Now put left_table in slot 0 if it isn't already.
    if from_list[0] != left_table:
        from_list[1] = from_list[0]
        from_list[0] = left_table
    self.log(snap_log.LEVEL_DEBUG, "from_list after: %s, %s" % (from_list[0], from_list[1]))

    def build_select(preserved_table, other_table):
        # Every row of preserved_table appears in the result.
        return ("SELECT " + ','.join(select_fields)
                + " FROM " + preserved_table + " LEFT OUTER JOIN " + other_table
                + " ON ( " + " AND ".join(where_list) + " )")

    if jointype == JOIN_OUTER_LEFT:
        select_stmt = build_select(from_list[0], from_list[1])
    elif jointype == JOIN_OUTER_RIGHT:
        # A right outer join is a left outer join with the tables swapped.
        select_stmt = build_select(from_list[1], from_list[0])
    else:
        # To make a FOJ with sqlite we make a LOJ union ROJ.
        # NOTE: UNION (as opposed to UNION ALL) also removes duplicate rows
        # from the combined result.
        select_stmt = (build_select(from_list[0], from_list[1])
                       + " UNION "
                       + build_select(from_list[1], from_list[0]))

    self.log(snap_log.LEVEL_DEBUG, "sql: %s" % select_stmt)
    self._cursor.execute(select_stmt)

    types_list = [output_field[keys.FIELD_TYPE] for output_field in output_view.fields]
    for row in sqlite_iter(self._cursor):
        out_rec = output_view.create_record()
        for i, field in enumerate(out_rec.field_names):
            value = row[i]
            if value is None:
                # NULL columns (e.g. the unmatched side of the join) stay None.
                out_rec[field] = None
            elif types_list[i] == SnapNumber:
                out_rec[field] = Decimal(str(value))
            elif types_list[i] == SnapDateTime:
                out_rec[field] = parse_date_with_fractions(value)
            else:
                out_rec[field] = value
        output_view.write_record(out_rec)
    output_view.completed()
def execute(self, input_views, output_views):
    """
    Execute the outer join.

    Creates a temporary SQLite database with one table per input view,
    then hands control to process_input_views, which feeds every incoming
    record to L{_process_record}. The temporary database is removed when
    processing ends, whether it succeeded or not.

    @param input_views: Dictionary of connected input views, keyed by name.
    @param output_views: Dictionary of connected output views, keyed by name.
    @raise SnapComponentError: If the output view or one of the defined
        input views is not connected.
    @raise SnapObjTypeError: If an input field has an unsupported type.
    """
    # So before we go too far, we need 1 output view connected
    # and all the defined input views connected.
    try:
        list(output_views.values())[keys.SINGLE_VIEW]
    except IndexError:
        raise SnapComponentError("No output view connected.")

    # We don't need a try/except here because we will probe delicately.
    connected_input_views = input_views.keys()
    for view in self.list_input_view_names():
        if view not in connected_input_views:
            raise SnapComponentError("Not all input views connected.")

    try:
        self._db_file = None
        self._db_file_name = None
        self._cursor = None
        self._con = None
        self._finished_views = {}
        self._input_views = input_views
        self._output_views = output_views

        # Create a temp file to hold the SQLite database.
        # Note that mkstemp opens the file as well, which we don't need,
        # so close the temp file after it's been created.
        (self._db_file, self._db_file_name) = tempfile.mkstemp(".db","snapjoin")
        os.close(self._db_file)

        # The DB-API module is imported into this file as "dbapi2".
        self._con = dbapi2.connect(self._db_file_name)
        self._cursor = self._con.cursor()
        # Decimal values are stored as floats.
        dbapi2.register_adapter(Decimal, float)

        self._fqn_fields = []
        self._insert_stmts = {}
        view_process_dict = {}
        for input_view_name in self._input_views.keys():
            input_view = self._input_views[input_view_name]
            view_process_dict[input_view] = self._process_record
            self._finished_views[input_view_name] = False

            # Identifiers are quoted to avoid clashes with sqlite keywords.
            quoted_table = "\"" + input_view_name + "\""
            quoted_fields = []
            column_defs = []
            for input_field in input_view.fields:
                field = input_field[keys.FIELD_NAME]
                field_type = input_field[keys.FIELD_TYPE]
                self._fqn_fields.append(input_view_name + "." + field)
                if field_type == SnapString:
                    sql_type = "TEXT"
                elif field_type == SnapNumber:
                    sql_type = "DECIMAL"
                elif field_type == SnapDateTime:
                    sql_type = "DATETIME"
                else:
                    raise SnapObjTypeError('Unknown type %s' % field_type)
                quoted_field = "\"" + field + "\""
                quoted_fields.append(quoted_field)
                column_defs.append(quoted_field + " " + sql_type)

            # One table per input view, plus the parameterized INSERT that
            # _process_record uses to store the records of that view.
            self._insert_stmts[input_view_name] = "INSERT INTO %s (%s) VALUES (%s)" % (
                quoted_table, ",".join(quoted_fields), ",".join(['?'] * len(quoted_fields)))
            self._cursor.execute("CREATE TABLE %s (%s)" % (quoted_table, ", ".join(column_defs)))

        self.process_input_views(view_process_dict)
    finally:
        self._cleanup()
def _cleanup(self):
    """
    Release the cursor, the connection and the temporary database file.

    Every step is best effort: a failure to release one resource is ignored
    so that the remaining ones are still released. Only ordinary errors are
    ignored; KeyboardInterrupt and SystemExit are not swallowed. The handles
    are reset afterwards, so calling this method again is harmless.
    """
    cursor = getattr(self, '_cursor', None)
    if cursor:
        try:
            cursor.close()
        except Exception:
            pass
    self._cursor = None

    con = getattr(self, '_con', None)
    if con:
        try:
            con.close()
        except Exception:
            pass
    self._con = None

    db_file_name = getattr(self, '_db_file_name', None)
    if db_file_name:
        try:
            os.remove(db_file_name)
        except Exception:
            pass
    self._db_file_name = None
def upgrade_1_0_to_1_1(self):
    """
    Upgrade a resource from version 1.0 to 1.1: add LOV constraints.

    The affected properties are redefined with the constraints, which
    resets their values, so the values are saved first and restored at
    the end.
    """
    # Remember the current values of the properties being redefined.
    saved_leftinput = self.get_property_value("leftinput")
    saved_join_exprs = self.get_property_value(JOIN_EXPRS)
    saved_aliases = self.get_property_value(JOIN_ALIASES)

    # Delete the properties before recreating them
    self.del_property_def(JOIN_EXPRS)
    self.del_property_def(JOIN_ALIASES)

    # The definitions below are deliberately spelled out here rather than
    # obtained by calling create_resource_template: that method may change
    # in a future version of this component, and this upgrade step must
    # keep producing the definitions of this particular version.
    leftinput_def = prop.SimpleProp("Left input view",
                                    SnapString,
                                    "Name of the Input View to be considered the left input",
                                    {'lov': [ keys.CONSTRAINT_LOV_INPUT_VIEW] })
    self.set_property_def("leftinput", leftinput_def)

    first_field = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                  {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] }, required=True)
    second_field = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                   {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] }, required=True)
    expression = prop.DictProp(JOIN_EXPR, first_field, "Join expression", 2, required=True)
    expression[JOIN_FIELD_1] = first_field
    expression[JOIN_FIELD_2] = second_field
    self.set_property_def(JOIN_EXPRS,
                          prop.ListProp("Join expressions", expression, required=True))

    fqn_field = prop.SimpleProp(JOIN_FQN_FIELD, SnapString, "Fully-qualified input field",
                                {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] })
    output_field = prop.SimpleProp(JOIN_OUTPUT_FIELD, SnapString, "Corresponding output field",
                                   {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] },)
    alias = prop.DictProp(JOIN_ALIAS, first_field, "Alias", 2, required=True)
    alias[JOIN_FQN_FIELD] = fqn_field
    alias[JOIN_OUTPUT_FIELD] = output_field
    self.set_property_def(JOIN_ALIASES, prop.ListProp("Aliases", alias))

    # Put the saved values back.
    self.set_property_value("leftinput", saved_leftinput)
    self.set_property_value(JOIN_EXPRS, saved_join_exprs)
    self.set_property_value(JOIN_ALIASES, saved_aliases)
def upgrade_1_1_to_1_2(self):
    """
    Upgrade a resource from version 1.1 to 1.2.

    Redefines the join expressions property so that at least one join
    expression is required (min_size = 1); the current value is kept.
    """
    current_value = self.get_property_value(JOIN_EXPRS)

    # Rebuild the join expressions definition with the size constraint.
    first_field = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                  {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                  required=True)
    second_field = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                   {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                   required=True)
    expression = prop.DictProp(JOIN_EXPR, first_field, "Join expression", 2, required=True)
    expression[JOIN_FIELD_1] = first_field
    expression[JOIN_FIELD_2] = second_field
    self.set_property_def(JOIN_EXPRS,
                          prop.ListProp("Join expressions", expression, min_size = 1, required=True))

    # Put the saved value back into the redefined property.
    self.set_property_value(JOIN_EXPRS, current_value)
def upgrade_1_2_to_1_3(self):
"""
No-op upgrade from version 1.2 to 1.3.

The method changes nothing in the resource. It exists only so that the
component doc URI is changed during the upgrade; that change is expected
to be made by cc_info before this method is called.
"""
pass
|