# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Mixer.py 10330 2009-12-24 22:13:38Z grisha $
"""
Component to combine multiple identical input views into a single output view.
Each input view is combined into a single output view. There is no defined order in the output stream.
Whichever input stream has an available record first will be sent to the output stream.
All input and output views must be identical, meaning that they contain fields of the same type and name in
the same order.
"""
__docformat__ = "epytext en"
from snaplogic.snapi_base import keys
from snaplogic.common.data_types import Record
from snaplogic.common import version_info
from snaplogic.cc.component_api import ComponentAPI
import snaplogic.cc.prop as prop
from snaplogic import components
from snaplogic.common.snap_exceptions import *
# Public names
__all__ = [ "Mixer" ]
class Mixer(ComponentAPI):
    """
    A component to merge multiple input streams into a single output stream.

    Records are forwarded to the single output view in whatever order the
    input views make them available; no ordering across inputs is imposed.

    """

    api_version = '1.0'
    component_version = '1.1'

    # At least two inputs are required to mix; exactly one output is produced.
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT  : 2,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT  : ComponentAPI.UNLIMITED_VIEWS,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
    }

    component_description = "Merges multiple, input streams into a single output stream."
    component_label = "Mixer"
    component_doc_uri = ("https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/Mixer"
                         % version_info.doc_uri_version)

    def create_resource_template(self):
        """
        Create Mixer resource definition template.

        No parameters are needed, so there is nothing to set up here.

        """
        pass

    def validate(self, err_obj):
        """
        Check the validity of a proposed resdef for this component.

        The only requirement is that every input view has the same definition
        as the output view: the same number of fields, with the same names and
        types in the same order. Field descriptions are not compared.

        For each input view, a field count mismatch is reported on the view
        itself. Otherwise each field is compared positionally and a name
        mismatch (or, if the names agree, a type mismatch) is reported on that
        field.

        @param err_obj: Error object on which problems are recorded. Only
                        get_input_view_err() is used here, indexed by view name
                        and then by keys.VIEW_FIELDS and field position.

        """
        input_view_names = self.list_input_view_names()
        output_view_name = self.list_output_view_names()[keys.SINGLE_VIEW]

        # The output view is the reference that every input view is compared to.
        output_view_fields = self.get_output_view_def(output_view_name)[keys.VIEW_FIELDS]
        output_field_count = len(output_view_fields)

        for input_view_name in input_view_names:
            input_fields = self.get_input_view_def(input_view_name)[keys.VIEW_FIELDS]

            # Make sure the field count matches before comparing field by field.
            if len(input_fields) != output_field_count:
                err_obj.get_input_view_err()[input_view_name].set_message(
                    "Output view '%s' field count '%d' does not match corresponding input view '%s' field count '%d'."
                    % (output_view_name, output_field_count, input_view_name, len(input_fields)))
                continue

            # Counts are equal here, so zip() pairs up every field.
            for i, (input_field, output_field) in enumerate(zip(input_fields, output_view_fields)):
                input_field_name = input_field[keys.FIELD_NAME]
                input_field_type = input_field[keys.FIELD_TYPE]
                output_field_name = output_field[keys.FIELD_NAME]
                output_field_type = output_field[keys.FIELD_TYPE]

                if input_field_name != output_field_name:
                    err_obj.get_input_view_err()[input_view_name][keys.VIEW_FIELDS][i].set_message(
                        "Output view '%s' field name '%s' does not match corresponding input view '%s' field name '%s'."
                        % (output_view_name, output_field_name, input_view_name, input_field_name))
                elif input_field_type != output_field_type:
                    err_obj.get_input_view_err()[input_view_name][keys.VIEW_FIELDS][i].set_message(
                        "Output view '%s' field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'."
                        % (output_view_name, output_field_name, output_field_type,
                           input_view_name, input_field_name, input_field_type))

    def execute(self, input_views, output_views):
        """
        Copy records from all input views to the single output view.

        Repeatedly asks select_input_view() which of the still-active input
        views have data, reads one record from each of those, and writes it to
        the output view. An input view is dropped from the active set once it
        yields an empty read. When no active input views remain, the output
        view is marked completed.

        @param input_views: Mapping whose values are the connected input view
                            objects (each providing read_record()).
        @param output_views: Mapping whose values are the connected output view
                             objects; the one at keys.SINGLE_VIEW is used.

        @raise SnapComponentError: If no input view or no output view is
                                   connected.

        """
        # Some initial sanity checking.
        # list(...) gives a private, mutable copy. It also keeps this working
        # where dict.values() returns a view rather than a list (Python 3).
        active_input_views = list(input_views.values())
        if not active_input_views:
            raise SnapComponentError("No input view connected.")

        try:
            output_view = list(output_views.values())[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")

        while active_input_views:
            # select_input_view() is not defined in this class; presumably it
            # is provided by ComponentAPI and returns the views that are ready.
            views_with_data = self.select_input_view(active_input_views)
            finished_view_list = []

            # Get one record from each of the views that currently have data.
            for view in views_with_data:
                record = view.read_record()
                if not record:
                    # We get None as a record if the view is closing on the
                    # other side. Only remember the view here: the active list
                    # is pruned after this round rather than in the middle of
                    # it.
                    finished_view_list.append(view)
                    continue
                output_view.write_record(record)

            # Drop any views that finished in this round from the active list.
            for view in finished_view_list:
                active_input_views.remove(view)

        output_view.completed()

    def upgrade_1_0_to_1_1(self):
        """
        No-op upgrade, present only so that the component doc URI is changed
        during the upgrade. That change is done by cc_info before this method
        is called.

        """
        pass
|