# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Filter.py 10330 2009-12-24 22:13:38Z grisha $
"""
Filter Module and Resource Definition
"""
from snaplogic.common.snap_exceptions import SnapObjTypeError
from decimal import Decimal
from datetime import datetime
import time
import snaplogic.components as components
from snaplogic.common import snap_log
from snaplogic.components import computils
from snaplogic.common.data_types import SnapString,SnapNumber,SnapDateTime
from snaplogic.common import version_info
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.cc import prop
from snaplogic.snapi_base import keys
from snaplogic.common.snap_exceptions import SnapComponentError
class Filter(ComponentAPI):
"""
Filter.
"""
api_version = '1.0'
component_version = '1.3'
capabilities = {
ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT : 1,
ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT : 1,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH : True
}
component_description = "Filter"
component_label = "Filter"
component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/Filter" % \
version_info.doc_uri_version
filter_operators = ['==','<','>','>=','<=','>=','<>','!=']
filter_value_type = SnapString
def create_resource_template(self):
"""
Create Filter resource template.
"""
self.set_property_def("Field", prop.SimpleProp("Field", SnapString, "Field to filter on",
{'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] } ,
True))
self.set_property_def("Comparison", prop.SimpleProp("Comparison", SnapString, "Comparison operator",
{"lov": self.filter_operators}, True))
self.set_property_def("Value", prop.SimpleProp("Value", self.filter_value_type, "Value", None, True))
# The bypass property does not require a value.
self.set_property_def("Bypass", prop.SimpleProp("Wildcard for bypass", SnapString,
"String that indicates that Filter should be bypassed", None, False))
def validate(self, err_obj):
"""Validate that Field/Comparison/Value have been specified."""
filter_field = self.get_property_value("Field")
if component_api.has_param(filter_field):
# Can't do much if the value a parameter
pass
else:
# Validate that Field is one of our inputview field names.
# We are guaranteed that the input view is present before the component validate method
# is invoked.
filter_field = filter_field.strip()
input_views = self.list_input_view_names()
input_view = self.get_input_view_def(input_views[keys.SINGLE_VIEW])
input_view_fields = [ d[keys.FIELD_NAME] for d in input_view[keys.VIEW_FIELDS] ]
input_view_field_types = [ d[keys.FIELD_TYPE] for d in input_view[keys.VIEW_FIELDS] ]
# If Value isn't a parameter and the Field is a datetime type,
# then make sure that Value is one of our supported datetime formats.
filter_value = self.get_property_value("Value")
if not component_api.has_param(filter_value) and filter_field in input_view_fields:
try:
index = input_view_fields.index(filter_field)
type = input_view_field_types[index]
computils.convert_to_field_type(type, filter_value)
except Exception, e:
err_obj.get_property_err("Value").set_message(e.message)
# Make sure that the output view matches the input view. (at least the types)
input_views = self.list_input_view_names()
input_view_name = input_views[keys.SINGLE_VIEW]
input_view = self.get_input_view_def(input_view_name)
input_view_fields = input_view[keys.VIEW_FIELDS]
output_views = self.list_output_view_names()
output_view_name = output_views[keys.SINGLE_VIEW]
output_view = self.get_output_view_def(output_view_name)
# Field count matches?
if len(output_view[keys.VIEW_FIELDS]) != len(input_view_fields):
err_obj.get_output_view_err()[output_views[keys.SINGLE_VIEW]].set_message(
"Output view '%s' field count '%d' does not match corresponding input view '%s' field count '%d'." \
% (output_view_name, len(output_view[keys.VIEW_FIELDS]),
input_view_name, len(input_view_fields)))
else:
# Field types match?
for i, output_field in enumerate(output_view[keys.VIEW_FIELDS]):
output_field_name = output_field[keys.FIELD_NAME]
output_field_type = output_field[keys.FIELD_TYPE]
input_field_name = input_view_fields[i][keys.FIELD_NAME]
input_field_type = input_view_fields[i][keys.FIELD_TYPE]
if output_field_type != input_field_type:
err_obj.get_output_view_err()[output_views[keys.SINGLE_VIEW]][keys.VIEW_FIELDS][i].set_message(
"Output view '%s' field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'." \
% (output_view_name, output_field_name, output_field_type,
input_view_name, input_field_name, input_field_type))
def execute(self, input_views, output_views):
try:
output_view = output_views.values()[keys.SINGLE_VIEW]
except IndexError:
raise SnapComponentError("No output view connected.")
try:
input_view = input_views.values()[keys.SINGLE_VIEW]
except IndexError:
raise SnapComponentError("No input view connected.")
# Get the filter field...
filter_field = self.get_property_value("Field")
filter_field = filter_field.strip()
# ...find out its type
input_view_fields = input_view.fields
for input_field in input_view_fields:
if input_field[keys.FIELD_NAME] == filter_field:
filter_field_type = input_field[keys.FIELD_TYPE]
break
# Get the filter value
filter_value = self.get_property_value("Value")
filter_bypass = self.get_property_value("Bypass")
# If at runtime, the filter bypass property is not None and not "" and
# the filter predicate value matches then do not activate the filter,
# just pass the rows through to the output.
if filter_bypass is not None and filter_bypass != "" and filter_value == filter_bypass:
while True:
record = input_view.read_record()
if record is None:
break
output_view.write_record(record)
output_view.completed()
return
# Coercion required?
# Catch and throw some errors if types don't coerce nicely.
if self.filter_value_type != filter_field_type:
filter_value = computils.convert_to_field_type(filter_field_type, filter_value)
# Get the operator
filter_condition = self.get_property_value("Comparison")
# Unroll...
if filter_condition == '==':
while True:
record = input_view.read_record()
if record is None:
break
test_val = record[filter_field]
if test_val == filter_value:
output_view.write_record(record)
output_view.completed()
elif filter_condition == '>':
while True:
record = input_view.read_record()
if record is None:
break
test_val = record[filter_field]
if test_val > filter_value:
output_view.write_record(record)
output_view.completed()
elif filter_condition == '>=':
while True:
record = input_view.read_record()
if record is None:
break
test_val = record[filter_field]
if test_val >= filter_value:
output_view.write_record(record)
output_view.completed()
elif filter_condition == '<':
while True:
record = input_view.read_record()
if record is None:
break
test_val = record[filter_field]
if test_val < filter_value:
output_view.write_record(record)
output_view.completed()
elif filter_condition == '<=':
while True:
record = input_view.read_record()
if record is None:
break
test_val = record[filter_field]
if test_val <= filter_value:
output_view.write_record(record)
output_view.completed()
elif filter_condition == '!=' or filter_condition == '<>':
while True:
record = input_view.read_record()
if record is None:
break
test_val = record[filter_field]
if test_val != filter_value:
output_view.write_record(record)
output_view.completed()
def upgrade_1_0_to_1_1(self):
"""
Change 1.
Add the Wildcard Bypass value.
"""
# Create the new properties
self.set_property_def("Bypass", prop.SimpleProp("Wildcard for bypass", SnapString,
"String that indicates that Filter should be bypassed", None, False))
def upgrade_1_1_to_1_2(self):
"""
Add LOV constraint to Field property
"""
# Save the property value.
# We need to recreate the property, which resets the value
property_value = self.get_property_value("Field")
# Redefine "Field" property to include the constraint
self.del_property_def("Field")
self.set_property_def("Field", prop.SimpleProp("Field", SnapString, "Field to filter on",
{'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] } ,
True))
# Restore the value
self.set_property_value("Field", property_value)
def upgrade_1_2_to_1_3(self):
"""
No-op upgrade only to change component doc URI during the upgrade
which will be by cc_info before calling this method.
"""
pass
|