# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
#$Id: pipeline_api.py 7261 2009-04-16 01:05:17Z dhiraj $
"""
Provides functionality for authoring and validating pipeline resource definition.
"""
from copy import deepcopy
import re
from snaplogic.common.snap_exceptions import *
from snaplogic.snapi_base import keys
from snaplogic.snapi_base.resdef import PipelineResDef
from snaplogic.common import prop_err
from snaplogic.common import pipeline_prop_defs
from snaplogic import rp
# Public names
__all__ = [ "PipelineAPI"]
RESOURCE_NAME_PATTERN = re.compile(r'[^a-zA-Z0-9_ ]+')
class Pipeline(object):
view_mode_str = { True:"record mode", False:["binary mode"]}
def __init__(self, resdef):
self._resdef = resdef
def basic_validation(self, err_obj):
"""
Make sure the pipeline is sufficiently correct to run suggest_resource_values()
In order for a pipeline to be completely valid, we need to call suggest_resource_values(),
which will fill in the missing information. However, for suggest_resource_values() to be run,
the pipeline resdef must have some basic information correct. This method checks for that
information.
"""
links = self._resdef.dict[keys.VIEW_LINKS]
res_names = self._resdef.list_resource_names()
#
# Make sure resources and views mentioned in links exist, otherwise we cannot
# run output view modification algorithm in suggest_resource_values()
#
for (i, lnk_entry) in enumerate(links):
link_err = err_obj[keys.VIEW_LINKS][i]
if len(lnk_entry) != 5:
link_err[0].set_message("Link Error: The link should have five values, found %s" % len(lnk_entry))
continue
(src_name, src_viewname, dest_name, dest_viewname, field_links) = lnk_entry
if src_name not in res_names:
link_err[0].set_message("Link Error: Source resource '%s' does not exist in this pipeline" % src_name)
continue
src_res = self._resdef.get_resource(src_name)
if src_viewname not in src_res.list_output_view_names():
link_err[1].set_message("Link Error: Source resource '%s' does not have output view '%s'" %
(src_name, src_viewname))
continue
src_view = src_res.get_output_view(src_viewname)
if self._resdef.check_input_view_assignment(src_name, src_viewname) is not None:
link_err[1].set_message("Link Error: Source view '%s' of resource '%s' has been assigned as input view '%s' "
"of pipeline" % (src_name, src_viewname))
continue
if dest_name not in res_names:
link_err[2].set_message("Link Error: Destination resource '%s' does not exist in this pipeline" % dest_name)
continue
dest_res = self._resdef.get_resource(dest_name)
if dest_viewname not in dest_res.list_input_view_names():
link_err[3].set_message("Link Error: Destination resource '%s' does not have input view '%s'" %
(dest_name, dest_viewname))
continue
dest_view = dest_res.get_input_view(dest_viewname)
if src_view[keys.VIEW_IS_RECORD] != dest_view[keys.VIEW_IS_RECORD]:
# The two views are in differing modes
link_err.set_message("Linking not possible. Source view is in '%s', destination view is in '%s'" %
(self.view_mode_str[src_view[keys.VIEW_IS_RECORD]],
self.view_mode_str[dest_view[keys.VIEW_IS_RECORD]]))
continue
if src_view[keys.VIEW_IS_RECORD]:
# Do record mode view related checks.
# Make sure the destination fields in field_links are valid.
if field_links is None:
link_err.set_message("Linking of record mode views ('%s' to '%s') does not specify field links" %
(src_viewname, dest_viewname))
continue
field_link_err = link_err[4]
dest_field_names = dest_res.list_input_field_names(dest_viewname)
linked_dest_fields = []
for (j, field_lnk_entry) in enumerate(field_links):
if len(field_lnk_entry) != 2:
field_link_err[j][0].set_message(
"Field link is supposed to have two values (source field, dest field), received '%s' values" %
len(field_lnk_entry))
continue
(src_field_name, dest_field_name) = field_lnk_entry
if dest_field_name not in dest_field_names:
field_link_err[j][0].set_message(
"Destination field '%s' in the field link does not exist in the destination view '%s' of resource '%s'" %
(dest_field_name, dest_viewname, dest_name))
continue
if dest_field_name in linked_dest_fields:
field_link_err[j][0].set_message("Destination field '%s' of destination view '%s' resource '%s' "
"has been linked more than once in field level linking" %
(dest_field_name, dest_viewname, dest_name))
continue
else:
# This is linking of binary views. Shouldn't involve any field linking
if field_links != None:
link_err.set_message("Linking of binary mode views ('%s %s' to '%s %s') has field linking information" %
(src_name, src_viewname, dest_name, dest_viewname))
dest_ctypes = dest_res.get_input_view_content_types(dest_viewname)
src_ctypes = src_res.get_output_view_content_types(src_viewname)
negotiated_ctype = rp.select_content_type(dest_ctypes, src_ctypes)
if negotiated_ctype is None:
link_err.set_message("Linking Error: Binary source view '%s' (%s) and destination view '%s' (%s) have "
"incompatible content types" %(src_viewname, src_name, dest_viewname, dest_name))
#
# Make sure resource names and views in view assignment exist. Otherwise, we cannot
# populate the pipeline views in suggest_resource_values()
#
for pipe_viewname in self._resdef.list_input_assignments():
(res_name, res_viewname) = self._resdef.get_input_assignment(pipe_viewname)
if res_name not in res_names:
err_obj[keys.INPUT_VIEW_ASSIGNMENT][pipe_viewname][keys.RESOURCE_NAME].set_message(
"Input view assignment '%s' has unknown resource '%s'" %
(pipe_viewname, res_name))
continue
res = self._resdef.get_resource(res_name)
if res_viewname not in res.list_input_view_names():
err_obj[keys.INPUT_VIEW_ASSIGNMENT][pipe_viewname][keys.RESOURCE_VIEW_NAME].set_message(
"Input view assignment '%s': resource '%s' has no input view '%s'" %
(pipe_viewname, res_name, res_viewname))
for pipe_viewname in self._resdef.list_output_assignments():
(res_name, res_viewname) = self._resdef.get_output_assignment(pipe_viewname)
if res_name not in res_names:
err_obj[keys.OUTPUT_VIEW_ASSIGNMENT][pipe_viewname][keys.RESOURCE_NAME].set_message(
"Output view assignment '%s' has unknown resource '%s'" %
(pipe_viewname, res_name))
continue
res = self._resdef.get_resource(res_name)
if res_viewname not in res.list_output_view_names():
err_obj[keys.OUTPUT_VIEW_ASSIGNMENT][pipe_viewname][keys.RESOURCE_VIEW_NAME].set_message(
"Output view assignment '%s': resource '%s' has no output view '%s'" %
(pipe_viewname, res_name, res_viewname))
def validate(self, err_obj):
"""Validates the pipeline resource definition."""
self.basic_validation(err_obj)
if err_obj._to_resdef() is not None:
return
if not len(self._resdef.dict[keys.PIPELINE_RESOURCES]):
# OK, some wise guy actually tried this corner case.
err_obj[keys.PIPELINE_RESOURCES].set_message("Pipeline has no resources")
#
# Validate links
#
links = self._resdef.dict[keys.VIEW_LINKS]
res_names = self._resdef.list_resource_names()
for n in res_names:
m = RESOURCE_NAME_PATTERN.search(n)
if m is not None:
err_obj[keys.PIPELINE_RESOURCES][n].set_message("Resource name '%s' has the invalid character(s) '%s'"
% (n, m.group(0)))
continue
if self._resdef.get_resource_uri(n) is None:
err_obj[keys.PIPELINE_RESOURCES][n][keys.URI].set_message(
"Resource '%s' in pipeline has no URI specified" % n)
for (i, (src_name, src_viewname, dest_name, dest_viewname, field_links)) in enumerate(links):
src_res = self._resdef.get_resource(src_name)
src_view = src_res.get_output_view(src_viewname)
dest_res = self._resdef.get_resource(dest_name)
dest_view = dest_res.get_input_view(dest_viewname)
if src_view[keys.VIEW_IS_RECORD]:
# Do record mode view related checks.
# Make sure the field linking is correct.
link_err = err_obj[keys.VIEW_LINKS][i]
field_link_err = link_err[4]
src_field_names = src_res.list_output_field_names(src_viewname)
dest_field_names = dest_res.list_input_field_names(dest_viewname)
linked_dest_fields = []
for (j, (src_field_name, dest_field_name)) in enumerate(field_links):
dest_field_type = dest_res.get_input_field_type(dest_viewname, dest_field_name)
# It is possible that source field name has been set to None. This happens when
# during linking, the user wants to hard code the value of certain destination
# fields to None. They achieve this by setting a value of None, where normally
# a source field name is expected. So, check to see if a non-None source field
# name is specified, before attempting to validate it.
if src_field_name is not None:
src_field_type = src_res.get_output_field_type(src_viewname, src_field_name)
# Make sure field types match.
if src_field_type != dest_field_type:
field_link_err[j].set_message(
"Link '%s' (%s) to '%s' (%s): Types of field '%s' and '%s' don't match" %
(src_name, src_viewname, dest_name, dest_viewname, src_field_name,
dest_field_name))
if src_field_name not in src_field_names:
field_link_err[j][0].set_message(
"Source field '%s' in the field link does not exist in the source "
"view '%s' of resource '%s'" % (src_field_name, src_viewname, src_name))
if dest_field_name not in dest_field_names:
if dest_field_name in linked_dest_fields:
field_link_err[j][0].set_message(
"Destination field '%s' of destination view '%s' resource '%s' "
"has been linked more than once in field level linking" %
(dest_field_name, dest_viewname, dest_name))
continue
field_link_err[j][0].set_message(
"Destination field '%s' in the field link does not exist in the "
"destination view '%s' of resource '%s'" %
(dest_field_name, dest_viewname, dest_name))
else:
linked_dest_fields.append(dest_field_name)
# Now, make sure that all the dest fields have been mentioned in field level linking.
if len(linked_dest_fields) < len(dest_field_names):
unlinked_fields = [f for f in dest_field_names if f not in linked_dest_fields]
field_link_err.set_message("Fields '%s' in destination view '%s' of resource '%s' are unlinked" %
(str(unlinked_fields), dest_viewname, dest_name))
#
# Now we test and see if the client has populated the pipeline views and params and make them match
# the view assignments and pipeline param settings.
#
assigned_views = set(self._resdef.list_input_assignments())
actual_views = set(self._resdef.list_input_view_names())
d1 = assigned_views - actual_views
for pipe_viewname in d1:
err_obj[keys.INPUT_VIEW_ASSIGNMENT][pipe_viewname].set_message(
"Input view assignment '%s' has not populated into "
"input view section of the pipeline resource." % pipe_viewname)
d2 = actual_views - assigned_views
for pipe_viewname in d2:
err_obj[keys.INPUT_VIEWS][pipe_viewname].set_message(
"'%s' present in input view section of resource, but no corresponding entry found "
"in view assignment section of the resource." % pipe_viewname)
# If the views look like they are in synch, then dig deeper and see if their
# view definitions match.
if len(d1) == 0 and len(d2) == 0:
for pipe_viewname in self._resdef.list_input_assignments():
(res_name, res_viewname) = self._resdef.get_input_assignment(pipe_viewname)
res_view = self._resdef.get_resource(res_name).get_input_view(res_viewname)
pipe_view = self._resdef.get_input_view(pipe_viewname)
for k in [keys.VIEW_IS_RECORD, keys.INPUT_VIEW_IS_PASSTHROUGH, keys.VIEW_FIELDS,
keys.VIEW_CONTENT_TYPES, keys.DESCRIPTION]:
if (k in res_view) != (k in pipe_view):
err_obj[keys.INPUT_VIEW_ASSIGNMENT][pipe_viewname].set_message(
"Definition of pipeline input view '%s' does not match resource '%s' input view '%s'. "
"Key mismatch '%s'" % (pipe_viewname, res_name, res_viewname, k))
elif (k not in res_view) and (k not in pipe_view):
continue
if res_view[k] != pipe_view[k]:
err_obj[keys.INPUT_VIEW_ASSIGNMENT][pipe_viewname].set_message(
"Definition of pipeline input view '%s' does not match resource '%s' input view '%s'. "
"Key '%s'" % (pipe_viewname, res_name, res_viewname, k))
assigned_views = set(self._resdef.list_output_assignments())
actual_views = set(self._resdef.list_output_view_names())
d1 = assigned_views - actual_views
for pipe_viewname in d1:
err_obj[keys.OUTPUT_VIEW_ASSIGNMENT][pipe_viewname].set_message(
"Output view assignment '%s' has not populated into "
"output view section of the pipeline resource." % pipe_viewname)
d2 = actual_views - assigned_views
for pipe_viewname in d2:
err_obj[keys.OUTPUT_VIEWS][pipe_viewname].set_message(
"'%s' present in output view section of resource, but no corresponding entry found "
"in view assignment section of the resource." % pipe_viewname)
# If the views look like they are in synch, then dig deeper and see if their
# view definitions match.
if len(d1) == 0 and len(d2) == 0:
for pipe_viewname in self._resdef.list_output_assignments():
(res_name, res_viewname) = self._resdef.get_output_assignment(pipe_viewname)
res_view = self._resdef.get_resource(res_name).get_output_view(res_viewname)
pipe_view = self._resdef.get_output_view(pipe_viewname)
for k in [keys.VIEW_IS_RECORD, keys.VIEW_FIELDS, keys.VIEW_CONTENT_TYPES, keys.DESCRIPTION]:
if (k in res_view) != (k in pipe_view):
err_obj[keys.OUTPUT_VIEW_ASSIGNMENT][pipe_viewname].set_message(
"Definition of pipeline output view '%s' does not match resource '%s' output view '%s'. "
"Key mismatch '%s'" % (pipe_viewname, res_name, res_viewname, k))
elif k not in res_view and k not in pipe_view:
continue
if res_view[k] != pipe_view[k]:
err_obj[keys.OUTPUT_VIEW_ASSIGNMENT][pipe_viewname].set_message(
"Definition of pipeline output view '%s' does not match resource '%s' output view '%s'. "
"Key '%s'" % (pipe_viewname, res_name, res_viewname, k))
#
# Validate pass-through settings.
#
output_view_names = self._resdef.list_output_view_names()
input_view_names = self._resdef.list_input_view_names()
for out_name in self._resdef.list_pass_through_output_views():
input_views = self._resdef.get_output_view_pass_through(out_name)[keys.INPUT_VIEWS]
pass_err = err_obj[keys.OUTPUT_VIEW_PASSTHROUGH][out_name]
if out_name not in output_view_names:
pass_err.set_message("Output view '%s' specified in pass-through does not exist" % out_name)
continue
view = self._resdef.get_output_view(out_name)
if not view[keys.VIEW_IS_RECORD]:
pass_err.set_message("Output view '%s' specified in pass-through is not a record mode view" % out_name)
if len(input_views) == 0:
pass_err.set_message("Input view names list in pass-through entry for output view '%s' is empty" % out_name)
continue
for (i, inp_name) in enumerate(input_views):
if inp_name not in input_view_names:
pass_err[keys.PT_INPUT_VIEWS][i].set_message("Input view '%s' in pass-through entry for output "
"view '%s' does not exist" % (inp_name, out_name))
continue
#
# Validate parameter map.
#
#
# Make sure resources and resource params in param maps are valid.
#
param_err = err_obj[keys.PIPELINE_PARAM_MAP]
param_map = self._resdef.get_param_map()
# First: Figure out all the required params in the resources of the pipeline.
required_params = {}
for name in self._resdef.list_resource_names():
required_params[name] = []
res = self._resdef.get_resource(name)
for param_name in res.list_param_names():
param_def = res.get_param_def(param_name)
if param_def[keys.DEFAULT_VALUE] is None:
# It has no default value. It is a required param.
required_params[name].append(param_name)
for (i, (pipe_param, res_name, res_param_name, default_value)) in enumerate(param_map):
if pipe_param is None and default_value is None:
param_err[i][1].set_message("Parameter map entry number '%s' has neither pipeline "
"param name, nor default value specified" % i)
if res_name not in res_names:
param_err[i][2].set_message("Parameter map has unknown resource name '%s'" % res_name)
if res_param_name not in self._resdef.get_resource(res_name).list_param_names():
param_err[i][3].set_message("Parameter map has unknown param name '%s' for resource '%s'" %
(res_param_name, res_name))
# If this was a required param for the resource in the pipeline, then remove it from the required list.
# The removal indicates that the require parameter is being propery mapped by param map.
if res_name in required_params and res_param_name in required_params[res_name]:
required_params[res_name].remove(res_param_name)
# Now, report error on all the required params that have not been mapped.
for res_name in required_params:
if len(required_params[res_name]) > 0:
for param_name in required_params[res_name]:
err_obj[keys.PIPELINE_RESOURCES][res_name][keys.RESDEF][keys.PARAMS][param_name].set_message(
"Required param '%s' of resource '%s' is not in the parameter map" %
(param_name, res_name))
param_names_list = self._resdef.list_param_names()
# Collect the names of the pipeline parameters
found_pipe_params = []
for (i, (pipe_param, res_name, res_param_name, default_value)) in enumerate(param_map):
if pipe_param is not None:
if pipe_param not in param_names_list:
param_err[i][1].set_message("Parameter map has non existent pipeline param '%s'" % pipe_param)
else:
found_pipe_params.append(pipe_param)
# Make sure that the pipeline parameters mentioned in the map and in the paramater list are identical.
if len(found_pipe_params) < len(param_names_list):
extra_params = [p for p in param_names_list if p not in found_pipe_params]
for p in extra_params:
err_obj[keys.PARAMS][p].set_message("Pipeline parameter '%s' is not specified in the parameter map" % p)
def suggest_resource_values(self, err_obj):
"""
Suggests resource definition values for the pipeline.
This method does the following:
- Copies "assigned" view definitions from member resources to the pipeline.
- Creates pipeline param definitions based on the param map.
- For pass-through output views in member resources, the output view definition gets added fields,
if the linking makes this necessary.
"""
self.basic_validation(err_obj)
if err_obj._to_resdef() is not None:
return
links = self._resdef.dict[keys.VIEW_LINKS]
(status, sorted_list) = self._sort_resources()
if not status:
err_obj.set_message("The pipeline found the following resources to have a loop among them: %s" %
", ".join(sorted_list))
return
for res_name in sorted_list:
res = self._resdef.get_resource(res_name)
pt_output_map = res.dict[keys.OUTPUT_VIEW_PASSTHROUGH]
pt_inputs = {}
for out_name in pt_output_map:
for inp_name in pt_output_map[out_name][keys.INPUT_VIEWS]:
pt_inputs[inp_name] = []
self.compute_input_view_fields(res_name, res, pt_inputs)
for out_name in pt_output_map:
self.modify_output_view(out_name, pt_output_map[out_name][keys.INPUT_VIEWS], pt_inputs, res)
# Copy all param map entries to parameter list
self._resdef.copy_param_map_to_params()
# Copy all view definitions for assigned views.
self._resdef.copy_assigned_views()
def _sort_resources(self):
upstream_dict = dict.fromkeys(self._resdef.list_resource_names())
# First, create a dictionary which has resources as keys and the value
# is a list of resources upstream to that resource.
for (src_name, src_viewname, dest_name, dest_viewname, field_links) in self._resdef.dict[keys.VIEW_LINKS]:
if upstream_dict[dest_name] is None:
upstream_dict[dest_name] = []
# Need to do this check, since the downstream component might have more thn one link to
# upstream component.
if src_name not in upstream_dict[dest_name]:
upstream_dict[dest_name].append(src_name)
# Now, the above dictionary is used in coming up with a semi-sorted list, where the
# upstream resources are listed before their downstream resources. As an example,
# consider a pipeline like this
# CsvRead1 ----
# | --- Join -- CsvWrite
# CsvRead2 ----
# We do this by walking the dictionary and in each iteration, finding one resource
# (CsvRead1) that has no upstream resources. We remove all references
# to CsvRead1 in the dictionary and place it in the sorted list. The next iteration
# would remove CsvRead2. This allows the next iteration to find Join (since CsvRead1 and
# CsvRead2 are gone from the dictionary). Finally, CsvWrite is found.
# This approach would fail, if there was an illegal loop in the pipeline. The loop is
# assumed to be found, if an iteration fails to find a candidate to be removed from the
# dictionary.
sorted_list = []
while len(upstream_dict):
found = None
for res_name in upstream_dict:
if upstream_dict[res_name] is None or len(upstream_dict[res_name]) == 0:
found = res_name
break
if found is not None:
sorted_list.append(found)
del upstream_dict[found]
for res_name in upstream_dict:
if (upstream_dict[res_name] is not None) and (found in upstream_dict[res_name]):
upstream_dict[res_name].remove(found)
else:
# There must a loop in this linking.
return (False, upstream_dict.keys())
return (True, sorted_list)
def compute_input_view_fields(self, res_name, res, pt_inputs):
"""
Determine pass-through fields being provided by each of the input views.
@param res: Resource inside the pipeline
@type res: ResDef
@param res_name: Name of the resource in the pipeline.
@type res_name: str
@param pt_inputs: Dictionary, with pass-through input view names as keys. Values are empty.
When the method, returms, those values will be set to the names of the fields being
passed through by each view, as an ordered list.
@type pt_inputs: dict
"""
links = self._resdef.dict[keys.VIEW_LINKS]
for inp_name in pt_inputs:
view = res.get_input_view(inp_name)
pt_fields = []
for (src_name, src_viewname, dest_name, dest_viewname, field_links) in links:
if res_name == dest_name and inp_name == dest_viewname:
src_res = self._resdef.get_resource(src_name)
src_fields = src_res.get_output_view(src_viewname)[keys.VIEW_FIELDS]
for s in src_fields:
found = False
for l in field_links:
if s[0] == l[0]:
found = True
break
if not found:
pt_fields.append(s)
pt_inputs[inp_name] = pt_fields
def _compute_name(self, colliding_name, used_list):
"""
Compute a new name for a field, that has collision with an exisitng field name in used_list.
@param colliding_name: Name that had the collision.
@type colliding_name: str
@param used_list: List of existing names.
@type used_list: list
@return: New name.
@rtype: str
"""
new_name = colliding_name
i = 1
while new_name in used_list:
if i > 1000:
raise SnapResDefError("Unable to compute a new name for pass through field '%s' in list '%s'" %
(colliding_name, str(used_list)))
new_name = "%s.%s" % (colliding_name, i)
i += 1
return new_name
def modify_output_view(self, out_name, inp_name_list, pt_inputs, res):
out_view = res.get_output_view(out_name)
# Get the fieldnames that resource originally had (which excludes pass-through fields)
original_fields = deepcopy(out_view[keys.ORIGINAL_OUTPUT_FIELDS])
# Create entry which will hold this output view's pass-through fields (if it doesn't exist).
# This entry holds names computed for pass-through fields in a previous run. For example:
# out_view[keys.RENAMED_FIELDS_MAP]["inp1"]["first_name"] = "first_name.1"
# In that example, the field "first_name", passed through by input view "inp1" was renamed to
# "first_name.1" in the output view (probably due to some name collision). We remember this
# renamings, so we do not change the new name, every time we do this view modification.
if keys.RENAMED_FIELDS_MAP not in out_view:
out_view[keys.RENAMED_FIELDS_MAP] = {}
# This list keeps track of field names that have been used in the output view. This list
# is used to avoid field name collisions.
taken_field_names = [f[0] for f in original_fields]
for inp_name in inp_name_list:
# At the end of this iteration, this dictionary will map pass through fields of this
# input view to their name in the output view.
updated_renamed_fields_map = {}
pt_fields = pt_inputs[inp_name]
if inp_name in out_view[keys.RENAMED_FIELDS_MAP]:
renamed_fields_map = out_view[keys.RENAMED_FIELDS_MAP][inp_name]
else:
renamed_fields_map = {}
new_pt_fields = []
# First, try to see if previously used names for pass-through fields can be retained. The aim is
# to stick with a field name once it has been previously used. Because, that name might have been
# used in linking downstream. Of course, if it is not possible to retain the name due to a name
# clash, then we are forced to use a new name for the pass-through field.
for f in pt_fields:
if f[0] in renamed_fields_map:
out_field_name = renamed_fields_map[f[0]]
if out_field_name in taken_field_names:
# We have a collision.
out_field_name = self._compute_name(out_field_name, taken_field_names)
updated_renamed_fields_map[f[0]] = out_field_name
taken_field_names.append(out_field_name)
else:
new_pt_fields.append(f)
# Now, see if there are any new pass-through field, the ones that were not seen
# in a previous run of suggest_resource_values() and give them names.
for f in new_pt_fields:
if f[0] in taken_field_names:
new_name = self._compute_name(f[0], taken_field_names)
else:
new_name = f[0]
updated_renamed_fields_map[f[0]] = new_name
taken_field_names.append(new_name)
# Set output with the updated names map.
out_view[keys.RENAMED_FIELDS_MAP][inp_name] = updated_renamed_fields_map
# Finally, we are ready to return the completed set of fields provided by the output view.
modified_field_list = list(original_fields)
for inp_name in inp_name_list:
pt_fields = pt_inputs[inp_name]
for f in pt_fields:
field_name = out_view[keys.RENAMED_FIELDS_MAP][inp_name][f[0]]
modified_field_list.append((field_name, f[1], f[2]))
out_view[keys.VIEW_FIELDS] = modified_field_list
res._set_output_view(out_name, out_view)
def suggest_values(resdef_dict):
pipe_resdef = PipelineResDef(resdef_dict)
pipe = Pipeline(pipe_resdef)
#pp = PrettyPrinter(indent=4)
#print "PRE_SUGGEST: "
#pp.pprint(resdef_dict)
err_obj = prop_err.create_err_obj(resdef_dict, [False], pipeline_prop_defs.PIPELINE_RESOURCE_PROP)
e_dict = err_obj._to_resdef()
if e_dict is not None:
return (e_dict, None)
pipe.suggest_resource_values(err_obj)
e_dict = err_obj._to_resdef()
if e_dict is not None:
return (e_dict, None)
return (None, pipe._resdef.dict)
def validate(resdef_dict):
pipe_resdef = PipelineResDef(resdef_dict)
pipe = Pipeline(pipe_resdef)
#pp = PrettyPrinter(indent=4)
#print "VALIDATE: "
#pp.pprint(resdef_dict)
err_obj = prop_err.create_err_obj(resdef_dict, [False], pipeline_prop_defs.PIPELINE_RESOURCE_PROP)
e_dict = err_obj._to_resdef()
if e_dict is not None:
return e_dict
pipe.validate(err_obj)
e_dict = err_obj._to_resdef()
if e_dict is not None:
return e_dict
return None
|