Mixer.py :  » Development » SnapLogic » snaplogic » components » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » components » Mixer.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: Mixer.py 10330 2009-12-24 22:13:38Z grisha $

"""
Component to combine multiple identical input views into a single output view.

Each input view is combined into a single output view. There is no defined order in the output stream.
Whichever input stream has an available record first will be sent to the output stream.

All input and output views must by identical, meaning that they contain fields of the same type and name in
the same order.

"""

__docformat__ = "epytext en"

from snaplogic.snapi_base import keys
from snaplogic.common.data_types import Record
from snaplogic.common import version_info
from snaplogic.cc.component_api import ComponentAPI
import snaplogic.cc.prop as prop
from snaplogic import components
from snaplogic.common.snap_exceptions import *


# Public names 
__all__ = [ "Mixer" ]


class Mixer(ComponentAPI):
    """
    A component to merge multiple input streams into a single output stream.

    """

    api_version = '1.0'
    component_version = '1.1'
    
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 2,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : ComponentAPI.UNLIMITED_VIEWS,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
    }
    
    component_description = "Merges multiple, input streams into a single output stream."
    component_label       = "Mixer"
    component_doc_uri     = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/Mixer" % \
                                                        version_info.doc_uri_version


    def create_resource_template(self):
        """
        Create Mixer resource definition template.

        No parameters are needed.

        """
        pass


    def validate(self, err_obj):
        """
        Check the validity of a proposed resdef for this component.

        The only requirement is that the input views and output view
        all have the same definition.

        """
        input_view_names  = self.list_input_view_names()
        output_view_names = self.list_output_view_names()
        output_view_name = output_view_names[keys.SINGLE_VIEW]
                
        # All output views and input views must match field names and types. We
        # don't care about descriptions.
        output_view = self.get_output_view_def(output_view_name)
        output_view_fields = output_view[keys.VIEW_FIELDS]

        # Make sure the field count matches before we iterate over the fields.
        for input_view_name in input_view_names:
            input_view = self.get_input_view_def(input_view_name)
            if len(input_view[keys.VIEW_FIELDS]) != len(output_view_fields):
                err_obj.get_input_view_err()[input_view_name].set_message(
                    "Output view '%s' field count '%d' does not match corresponding input view '%s' field count '%d'."
                    % (output_view_name, len(output_view_fields), input_view_name, len(input_view[keys.VIEW_FIELDS])))                                                                          
            else:
                # Each field name and type matches? 
                for i, input_field in enumerate(input_view[keys.VIEW_FIELDS]):
                    input_field_name = input_field[keys.FIELD_NAME]
                    input_field_type = input_field[keys.FIELD_TYPE]
                    output_field_name = output_view_fields[i][keys.FIELD_NAME]
                    output_field_type = output_view_fields[i][keys.FIELD_TYPE]
                    if input_field_name != output_field_name:
                        err_obj.get_input_view_err()[input_view_name][keys.VIEW_FIELDS][i].set_message(
                            "Output view '%s' field name '%s' does not match corresponding input view '%s' field name '%s'."
                            % (output_view_name, output_field_name, input_view_name, input_field_name))                                                                                                                                   
                    elif input_field_type != output_field_type:
                        err_obj.get_input_view_err()[input_view_name][keys.VIEW_FIELDS][i].set_message(
                            "Output view '%s' field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'."                                                                                                       
                            % (output_view_name, output_field_name, output_field_type,
                               input_view_name, input_field_name, input_field_type))

    def execute(self, input_views, output_views):
        """
        Execute the merge sorting of multiple sorted input streams.

        """
        # Some initial sanity checking
        
        active_input_views = [] + input_views.values()
        if not active_input_views:
            raise SnapComponentError("No input view connected.")

        try:
            output_view = output_views.values()[keys.SINGLE_VIEW] 
        except IndexError:
            raise SnapComponentError("No output view connected.")


        # Preparation for the processing task.
        while active_input_views:
            views_with_data = self.select_input_view(active_input_views)
            finished_view_list = []
            # Get the records from all the views that currently have data
            for view in views_with_data:
                record = view.read_record()
                if not record:
                    # We get None as a record, if the view is closing on the
                    # other side. We are keeping track of those that have
                    # closed in this round, and create a reverse sorted list
                    # of their indices in the active_input_view list.
                    # We can't just simply erase them right now out of the
                    # list since that would screw up our iteration over the list.
                    finished_view_list.append(view)
                    continue
                output_view.write_record(record)

            # Any views that have finished in the last round?
            for view in finished_view_list:
                # Now we can delete them (starting from the back) from the
                # list of active views.
                active_input_views.remove(view)
                
        output_view.completed()

    def upgrade_1_0_to_1_1(self):
        """
        No-op upgrade only to change component doc URI during the upgrade
        which will be by cc_info before calling this method.
        
        """
        pass
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.