# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Splitter.py 10330 2009-12-24 22:13:38Z grisha $
"""
Component to take a single input and write it to two or more outputs.
Each input view record is written to all connected output views. The order in which the output views are written
to is not defined, but record order is preserved.
All input and output views must be identical, meaning that they contain fields of the same type and name in
the same order.
This component supports unconnected output views, but at least one output view must be connected.
"""
# Declares the markup language (epytext) and natural language (English) used
# by the docstrings in this module, for documentation tools that honour it.
__docformat__ = "epytext en"
from snaplogic.snapi_base import keys
from snaplogic.common.data_types import Record
from snaplogic.common import version_info
from snaplogic.cc.component_api import ComponentAPI
import snaplogic.cc.prop as prop
from snaplogic import components
from snaplogic.common.snap_exceptions import *
# Public names: the only name exported by a star-import of this module is the
# Splitter component class.
__all__ = [ "Splitter" ]
class Splitter(ComponentAPI):
    """
    A component to duplicate input records across multiple output streams.

    Every record read from the single input view is written, unchanged, to
    each of the output views handed to L{execute}.
    """

    api_version = '1.0'
    component_version = '1.1'

    # Exactly one input view, at least two output views (no upper bound), and
    # pass-through enabled.
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 2,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : ComponentAPI.UNLIMITED_VIEWS,
        ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH        : True
    }

    component_description = "Create multiple output streams from a single input stream."
    component_label = "Splitter"
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/Splitter" % \
                        version_info.doc_uri_version

    def create_resource_template(self):
        """
        Create Splitter resource definition template.

        This component has no properties, so there is nothing to define.
        """
        pass

    def validate(self, err_obj):
        """
        Check the validity of a proposed resdef for this component.

        The only requirement is that the input view and output views
        all have the same definition: the same number of fields, with the
        same names and types in the same order. Field descriptions are not
        compared.

        @param err_obj: Error object on which validation failures are recorded.
                        Messages are attached per output view (field count
                        mismatch) or per output view field (name or type
                        mismatch) through C{get_output_view_err()}.
        """
        input_view_names = self.list_input_view_names()
        output_view_names = self.list_output_view_names()

        # All output views and input views must match field names and types. We
        # don't care about descriptions.
        input_view_name = input_view_names[keys.SINGLE_VIEW]
        input_view = self.get_input_view_def(input_view_name)
        input_view_fields = input_view[keys.VIEW_FIELDS]

        for output_view_name in output_view_names:
            output_view = self.get_output_view_def(output_view_name)
            output_view_fields = output_view[keys.VIEW_FIELDS]

            # Make sure the field count matches before we iterate over the fields.
            if len(output_view_fields) != len(input_view_fields):
                err_obj.get_output_view_err()[output_view_name].set_message(
                    "Output view '%s' field count '%d' does not match corresponding input view '%s' field count '%d'." \
                    % (output_view_name, len(output_view_fields),
                       input_view_name, len(input_view_fields)))
                continue

            # Field counts are equal here, so zip() pairs every output field
            # with the input field at the same position.
            for i, (output_field, input_field) in enumerate(zip(output_view_fields, input_view_fields)):
                output_field_name = output_field[keys.FIELD_NAME]
                output_field_type = output_field[keys.FIELD_TYPE]
                input_field_name = input_field[keys.FIELD_NAME]
                input_field_type = input_field[keys.FIELD_TYPE]

                # A name mismatch takes precedence; the type is only reported
                # when the names agree.
                if output_field_name != input_field_name:
                    err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
                        "Output view '%s' field name '%s' does not match corresponding input view '%s' field name '%s'." \
                        % (output_view_name, output_field_name,
                           input_view_name, input_field_name))
                elif output_field_type != input_field_type:
                    err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
                        "Output view '%s' field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'." \
                        % (output_view_name, output_field_name, output_field_type,
                           input_view_name, input_field_name, input_field_type))

    def execute(self, input_views, output_views):
        """
        Execute: Send each record to every connected output view.

        Records are read from the input view until C{read_record()} returns
        C{None}; each record is written to every output view before the next
        one is read. Once the input is exhausted, C{completed()} is called on
        every output view.

        @param input_views: Mapping whose values are the input view objects.
        @param output_views: Mapping whose values are the output view objects.

        @raise SnapComponentError: If either mapping is empty.
        """
        # Some initial sanity checking
        # Need at least 1 input and 1 output.
        # list() copies the values into a real list on both Python 2 and 3;
        # "[] + dict.values()" raises TypeError where values() returns a view.
        active_input_views = list(input_views.values())
        if not active_input_views:
            raise SnapComponentError("No input views connected.")

        active_output_views = list(output_views.values())
        if not active_output_views:
            raise SnapComponentError("No output view connected.")

        # keys.SINGLE_VIEW is used as the index of the one and only input view.
        input_view = active_input_views[keys.SINGLE_VIEW]

        while True:
            record = input_view.read_record()
            if record is None:
                # End of input.
                break
            for output_view in active_output_views:
                output_view.write_record(record)

        for output_view in active_output_views:
            output_view.completed()

    def upgrade_1_0_to_1_1(self):
        """
        No-op upgrade only to change component doc URI during the upgrade,
        which will be done by cc_info before calling this method.
        """
        pass
|