# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: DateMath.py 10330 2009-12-24 22:13:38Z grisha $
"""
DateMath module evaluates various arithmetic operations on datetime values
"""
import os
import re
from sqlite3 import dbapi2
from datetime import datetime
import time
from decimal import Decimal
from sets import Set
from snaplogic.common import version_info
import snaplogic.components as components
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.cc import prop
from snaplogic.snapi_base import resdef,keys
from snaplogic.common import snap_log
from snaplogic.common.data_types import SnapNumber,SnapString,SnapDateTime
from snaplogic.common.snap_exceptions import *
import shlex
EXPRESSION = "Date expression"
OUTPUT_FIELD_NAME = "An output view field name"
OUTPUT_FIELD_TYPE = "Expression result type"
DATE_SPEC = "Date expression specification"
DATE_SPECS = "Date expressions"
DATE_FORMAT_STRING = "%Y-%m-%d %H:%M:%S"
class DateMath(ComponentAPI):
"""
DateMath
"""
api_version = '1.0'
component_version = '1.2'
capabilities = {
# The case for 0 input views can be made in using this resource as an e.g.
# timestamp generator (by using expression 'now')
ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT : 0,
ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT : 1,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH : True
}
component_description = "This component evaluates various arithmetic operations on datetime values."
component_label = "Date Operations"
component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/DateOperations" % \
version_info.doc_uri_version
result_types = {'days' : '%J', 'seconds' : '%s', 'hours' : '%s', 'minutes' : '%s', 'datetime' : '%Y-%m-%d %H:%M:%S'}
def create_resource_template(self):
"""
Create DateMath resource template.
"""
output_field_name = prop.SimpleProp(OUTPUT_FIELD_NAME,
SnapString,
"What output field the result corresponds to",
{'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] },
True)
output_field_type = prop.SimpleProp(OUTPUT_FIELD_TYPE, SnapString, "Result Type",
{"lov": sorted(self.result_types.keys())}, True)
expression = prop.SimpleProp(EXPRESSION,
SnapString,
"Defines the date expression that is to be evaluated.",
None,
True)
date_spec = prop.DictProp(DATE_SPEC,
None,
"Date expression specification dictionary",
3,
3,
True,
True)
date_spec[OUTPUT_FIELD_NAME] = output_field_name
date_spec[OUTPUT_FIELD_TYPE] = output_field_type
date_spec[EXPRESSION] = expression
date_specs = prop.ListProp(DATE_SPECS,
date_spec,
"",
1,
resdef.UNLIMITED_ENTRIES,
True)
self.set_property_def(DATE_SPECS, date_specs)
def validate(self, err_obj):
"""
Component-specific correctness validation logic.
@param err_obj: Object for error reporting
@type err_obj: L{SimplePropErr} or L{ListPropErr} or L{DictPropErr}
"""
output_views = self.list_output_view_names()
output_view_name = output_views[keys.SINGLE_VIEW]
output_view = self.get_output_view_def(output_view_name)
# Dictionary of output field types to names
output_fields_name_type = {}
output_fields_names = []
for output_field in output_view[keys.VIEW_FIELDS]:
output_fields_names.append(output_field[keys.FIELD_NAME])
output_fields_name_type[output_field[keys.FIELD_NAME]] = output_field[keys.FIELD_TYPE]
date_specs = self.get_property_value(DATE_SPECS)
# We need to build a list of date_output_field_names for later use in validating the output view fields.
date_output_field_names = []
for i, spec in enumerate(date_specs):
output_name = spec[OUTPUT_FIELD_NAME]
property_output_type = spec[OUTPUT_FIELD_TYPE]
expression = spec[EXPRESSION]
date_output_field_names.append(output_name)
# Check 1: Verify that result types selected by the user in the property
# reasonably correspond to output field types.
# Here is a matrix of allowed type mixes:
# property_output_type output_field_type
# ---------------------------------------
# datetime datetime,string
# days,secs,mins,hrs number,string
#
# Note that this is partially so because we do not (yet) have a good way
# to mask some string representation of a date *into* a datetime type.
output_field_type = output_fields_name_type[output_name]
if (output_field_type == SnapDateTime) and (property_output_type != 'datetime'):
err_obj.get_property_err(DATE_SPECS)[i][OUTPUT_FIELD_TYPE].set_message(
"Output view field '%s' type mismatch: marked as '%s' in date expression \"%s\" specification;" \
" has to be a 'string' or 'number' (not '%s') in output view." \
% (output_name, property_output_type, spec[EXPRESSION], output_field_type))
elif (property_output_type == 'datetime') and not \
(output_field_type == SnapDateTime or output_field_type == SnapString):
err_obj.get_property_err(DATE_SPECS)[i][OUTPUT_FIELD_TYPE].set_message(
"Output view field '%s' type mismatch: marked as '%s' in output view, and '%s' in date " \
"expression \"%s\" specification. Set output field type to 'datetime' in output view, " \
"or select different expression result type." \
% (output_name, output_field_type, property_output_type, spec[EXPRESSION]))
input_views = self.list_input_view_names()
input_fields_in_expressions = self.get_referenced_fields(expression)
if len(input_fields_in_expressions) > 0:
# Check 2: If no input views are defined, no input field references should
# be present in the expressions.
if input_views == []:
err_obj.get_property_err(DATE_SPECS).set_message(
"Cannot have input field names (e.g. ${%s}) present in expressions "\
"while no input views are defined." % input_fields_in_expressions[0])
else:
# Check 3: Input view defined. Make sure input view field references of the
# form ${field001} in expressions match input view field names.
input_view = self.get_input_view_def(input_views[keys.SINGLE_VIEW])
input_view_fields = [ d[keys.FIELD_NAME] for d in input_view[keys.VIEW_FIELDS] ]
for input_field in input_fields_in_expressions:
if input_field not in input_view_fields:
err_obj.get_property_err(DATE_SPECS).set_message(
"Input field name '%s' not present in input view." % input_field)
# Check 4: All output fields must have a corresponding date expression.
# NB: Those that don't need one can use values passed-through from inputs.
# Get a list of input view field names to check output field names against.
# Note that there may be no input view defined, so check first
input_views = self.list_input_view_names()
if len(input_views) > 0:
input_view_name = input_views[keys.SINGLE_VIEW]
input_view = self.get_input_view_def(input_view_name)
input_field_names = [ d[keys.FIELD_NAME] for d in input_view[keys.VIEW_FIELDS] ]
input_field_types = [ d[keys.FIELD_TYPE] for d in input_view[keys.VIEW_FIELDS] ]
else:
input_field_names = []
for i, field in enumerate(output_fields_names):
if field in date_output_field_names:
# There is an expression defined on this field.
# There is nothing else to validate, move on to the next field.
pass
elif field in input_field_names:
# If output field name is the same as input field name.
# the field value will be transferred unchanged, so validate the type
input_field_type = input_field_types[input_field_names.index(field)]
output_field_type = output_fields_name_type[field]
if input_field_type != output_field_type:
err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
"Output view '%s' field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'." %
(output_view_name, field, output_field_type, input_view_name, field, input_field_type))
else:
# There is no expression defined for this output field, neither
# there is an input field with the same name: log an error.
err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
"Output view field '%s' does not have a corresponding date expression." % field)
def _cleanup(self):
"""
Clean up resources.
"""
if self._cur:
try:
self._cur.close()
except:
pass
if self._con:
try:
self._con.close()
except:
pass
def execute(self, input_views, output_views):
try:
self._out_view = output_views.values()[keys.SINGLE_VIEW]
except IndexError:
raise SnapComponentError("No output view connected.")
if input_views != {}:
try:
self._input_view = input_views[self.list_input_view_names()[keys.SINGLE_VIEW]]
except IndexError:
raise SnapComponentError("No input view connected.")
else:
self._input_view = {}
try:
self._execute()
finally:
self._cleanup()
def _execute(self):
# Using sqlite in this way provides us a safe "sandbox" for executing
# sqlite statements in-memory only during the lifetime of this component.
self._con = sqlite.connect( ":memory:" )
self._cur = self._con.cursor()
if self._input_view != {}:
input_field_names = self._input_view.field_names
input_field_types = self._input_view.field_types
else:
input_field_names = []
input_field_types = []
output_field_names = list(self._out_view.field_names)
output_field_types = list(self._out_view.field_types)
date_specs = self.get_property_value(DATE_SPECS)
self.log(snap_log.LEVEL_DEBUG, "DateMath date_specs: %s" % date_specs)
# Make a map of expressions corresponding to output field names,
# as well as output field types corresponding to output field names.
output_name_to_function = {}
output_name_to_type = {}
for spec in date_specs:
output_name_to_function[spec[OUTPUT_FIELD_NAME]] = spec[EXPRESSION]
output_name_to_type[spec[OUTPUT_FIELD_NAME]] = spec[OUTPUT_FIELD_TYPE]
self.log(snap_log.LEVEL_DEBUG, "DateMath: output_name_to_function: %s" % output_name_to_function)
self.log(snap_log.LEVEL_DEBUG, "DateMath: output_name_to_type: %s" % output_name_to_type)
# Make a list of common fields: fields with same names in the input and output views.
# These fields will be copied as is from input to output.
common_fields = (Set(output_field_names) - Set(output_name_to_function.keys())) & Set(input_field_names)
out_rec = self._out_view.create_record()
first = True
# vals_names stores an ordered list of input field names as they are encountered (as
# parameters) in the expressions. If the same input field is used in multiple properties,
# it will be repeated in this list.
vals_names = []
used_output_fields = []
used_output_field_types = []
# stmt collects all expressions into a single comma-separated sqlite statement
stmt = "SELECT "
for f in output_field_names:
if output_name_to_function.has_key(f):
# Save all used output fields and their types
used_output_fields.append(f)
index = output_field_names.index(f)
used_output_field_types.append(output_field_types[index])
expression = output_name_to_function[f]
type = output_name_to_type[f]
# Extract all tokens (i.e. input field names) wrapped inside '${token}'
flds = re.findall('\${(\w+)}', expression)
# Also extract (again) only the ones that are at the beginning or end of expression,
# or have whitespace around them. In other words, extract 'shipdate' and 'orderdate'
# from '${shipdate} - ${orderdate}', but not 'somenum' from '+${somenum}'.
fldsw = re.findall('^\${(\w+)}\s', expression)
fldsw.extend(re.findall('\s\${(\w+)}\s', expression))
fldsw.extend(re.findall('\s\${(\w+)}$', expression))
for fld in flds:
# Check if this is a field name in inputview
if fld in input_field_names:
# Substitute field name with '?', which will later (at the time of sqlite
# statement execution) be substituted with its value, using vals_names for lookup.
#
# In case expression contains '+${somenum} day', replacing it with '+? day' will
# not work: executing such sqlite statement will not substitute (bind) a value to
# "?" in this case because the ? is inside quotes (''). Instead, we use sql concat
# operator, ||, to break the quotes around ?, i.e. '+'||?||' day'
if fld in fldsw:
expression = re.sub('\${' + fld + '}', '?', expression)
else:
expression = re.sub('\${' + fld + '}', '\'||?||\'', expression)
vals_names.append(fld)
else:
raise SnapComponentError("Unexpected field %s that is not present in the input view encountered in expression %s" % (fld, expression))
# Substitute correct formatting string for the user-selected expression result type
ex = "strftime('%s'" % self.result_types[type]
expression = str(expression)
# Tokenize datetime math expression to figure out if it is of a type
# datetime_function('timestring', 'modifier', 'modifier' ...)
# or of a type
# datetime_function('timestring', 'modifier', ...) [+-] datetime_function('timestring', 'modifier', ...)
# Depending on this, "strftime('%s'" string has to be inserted into sqlite expression
# being built here either once, or multiple times.
lexer = shlex.shlex(expression)
# Since we use ||?|| in the expressions outside quotes (see comment above), we tell shlex to
# consider | and ? word characters so it doesn't tokenize them separately.
lexer.wordchars += '|'
lexer.wordchars += '?'
specialAppendFlag = False
while True:
t = lexer.get_token()
if not t:
break
self.log(snap_log.LEVEL_DEBUG, "DateMath: TOKEN: [%s]" % t)
if t == '+' or t == '-':
# This is arithmetic operation between multiple datetimes.
ex += ") %s" % t
ex += " strftime('%s'" % self.result_types[type]
elif specialAppendFlag:
ex += t
specialAppendFlag = False
elif t == "||?||\'":
# This is how shlex tokenizes expression like "'+'||?||' day'"
# (the preceding token in this case would be "'+'")
# We want to simply append this, as well as the next token, e.g. " day'".
ex += t
specialAppendFlag = True
else:
# This is just the next modifier; append them with commas between them.
ex += ", %s" % t
ex += ")"
self.log(snap_log.LEVEL_DEBUG, "DateMath: Expression: %s" % ex)
if not first:
stmt += ", "
else:
first = False
stmt += ex
self.log(snap_log.LEVEL_DEBUG, "DateMath: sqlite statement: %s" % stmt)
while True:
# Create output record
output_record = self._out_view.create_record()
# We may or may not have an input view
if self._input_view != {}:
# If there is an input view, read a record from it
# and use the fields to bind variables into the expression.
input_record = self._input_view.read_record()
if input_record is None:
# EOF
break
# vals list stores actual values for input fields which names were
# collected (above) from the expressions properties.
vals = []
for field_name in vals_names:
val = input_record[field_name]
if isinstance(val, Decimal):
vals.append(str(val))
else:
vals.append(val)
self.log(snap_log.LEVEL_DEBUG, "DateMath: input values list: %s" % vals)
self._cur.execute(stmt, vals)
# Transfer pass through fields, and fields matched on name
output_record.transfer_pass_through_fields(input_record)
output_record.transfer_matching_fields(input_record, common_fields)
else:
# If there is no input view, just execute the statement without any variables
self._cur.execute(stmt)
# Read the results of sqlite statement execution into records
for row in self._cur:
self.log(snap_log.LEVEL_DEBUG, "DateMath: result row: %s" % str(row))
i = 0
for field in used_output_fields:
if row[i] is None:
output_record[field] = row[i]
elif used_output_field_types[i] == SnapDateTime:
output_record[field] = datetime(*(time.strptime(row[i], DATE_FORMAT_STRING)[0:6]))
else:
# output type is either string or number, barring future new types
if output_name_to_type[field] == 'minutes':
res = float(row[i])/60
elif output_name_to_type[field] == 'hours':
res = float(row[i])/3600
else:
res = row[i]
if (used_output_field_types[i] == SnapNumber):
# NB: str() is necessary here to convert float to Decimal
output_record[field] = Decimal(str(res))
else: # output_field_types[i] == SnapString:
u = str(res).decode('utf-8')
output_record[field] = u
i += 1
self._out_view.write_record(output_record)
# If there are no input views, we execute only once
if self._input_view == {}:
break
self._out_view.completed()
def upgrade_1_0_to_1_1(self):
"""
Add source constraint to Field property
"""
# Save the property value.
# We need to recreate the property, which resets the value
property_value = self.get_property_value(DATE_SPECS)
output_field_name = prop.SimpleProp(OUTPUT_FIELD_NAME,
SnapString,
"What output field the result corresponds to",
{'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] },
True)
output_field_type = prop.SimpleProp(OUTPUT_FIELD_TYPE, SnapString, "Result Type",
{"lov": sorted(self.result_types.keys())}, True)
expression = prop.SimpleProp(EXPRESSION,
SnapString,
"Defines the date expression that is to be evaluated.",
None,
True)
date_spec = prop.DictProp(DATE_SPEC,
None,
"Date expression specification dictionary",
3,
3,
True,
True)
date_spec[OUTPUT_FIELD_NAME] = output_field_name
date_spec[OUTPUT_FIELD_TYPE] = output_field_type
date_spec[EXPRESSION] = expression
date_specs = prop.ListProp(DATE_SPECS,
date_spec,
"",
1,
resdef.UNLIMITED_ENTRIES,
True)
self.set_property_def(DATE_SPECS, date_specs)
# Restore the value
self.set_property_value(DATE_SPECS, property_value)
def upgrade_1_1_to_1_2(self):
"""
No-op upgrade only to change component doc URI during the upgrade
which will be by cc_info before calling this method.
"""
pass
|