# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: MergeSorter.py 10330 2009-12-24 22:13:38Z grisha $

"""
MergeSorter Module and Resource Definition.

This module provides functionality to merge sorted streams of
records. It is important to remember that this component has
been specifically designed to merge streams that are already sorted!
No attempt should be made to process non-sorted input streams.

The component attempts to merge on the go. Records are received from
all input views, merged and written out. Contrary to other sort
components, it does not use a database. This allows the merge component
to work on very large data streams, where it would be infeasible to
use the DB approach.

On the other hand, the merge component must perform its own internal
buffering when one of the streams is blocked while the other streams
continue to provide records.

All views (input and output) have to be exactly the same. Fields have
to have the same names and types.

In theory, the definition of a sort order should not be necessary
(in conjunction with the definition of sort fields). After all, it 
should be possible to derive the sort order of the streams by
examining the first records of any one of the streams. However,
in case of streams that contain equal records, or just one record,
this is not feasible. Therefore, the sort order is still a requirement.

"""
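
# Illustration only, not part of the component: a minimal sketch of why the
# sort order cannot always be derived from the data, as argued in the module
# docstring above. It assumes plain Python values instead of records, and the
# name infer_sort_order is hypothetical.

```python
def infer_sort_order(values):
    """Return 'asc', 'desc', or None when the values do not reveal an order."""
    for prev, cur in zip(values, values[1:]):
        if prev < cur:
            return "asc"
        if prev > cur:
            return "desc"
    # Zero or one value, or all values equal: the order is undecidable.
    return None


print(infer_sort_order([1, 2, 3]))  # asc
print(infer_sort_order([7, 7, 7]))  # None
print(infer_sort_order([7]))        # None
```

# The last two calls are the cases named in the docstring: equal records and
# a single record. Neither reveals a direction, so an explicit sort order in
# the sort spec is still needed.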

__docformat__ = "epytext en"

from snaplogic.common.data_types import Record
from snaplogic.common import version_info
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
import snaplogic.cc.prop as prop
from snaplogic import components
from snaplogic.common.snap_exceptions import *
from snaplogic.snapi_base import keys


# Public names 
__all__ = [ "MergeSorter" ]

# Definitions for the sort specs
SORT_FIELD = "Sort field"
SORT_ORDER = "Sort order"
SORT_SPEC  = "Sort spec"
SORT_SPECS = "Sort specs"

#
# Some helper functions
#

def _compare_records(rec1, rec2, sort_fields):
    """
    Compare two records according to the sort descriptor.

    Indicates whether rec1 is less (-1), equal (0) or greater (1)
    than rec2. The sort order is determined via the descriptor
    in sort_fields.

    @param rec1:        One of the two records to compare.
    @type  rec1:        L{SnapRecord}

    @param rec2:        One of the two records to compare.
    @type  rec2:        L{SnapRecord}

    @param sort_fields: Description of the relevant fields and the
                        desired sort order.
    @type  sort_fields: List of tuples, first element in tuple is the
                        name of the relevant field, second element is
                        a boolean flag indicating whether we should use
                        ascending order (True) for a given field or
                        descending order (False).

    @return:            -1, 0 or 1, indicating that rec1 is smaller, equal
                        or greater than rec2
    @rtype:             int

    """
    for (fname, order) in sort_fields:
        f1 = rec1[fname]
        f2 = rec2[fname]
        # Could use a closure, which specifies prepared return
        # values depending on the order, so that we can save
        # ourselves the comparison to 'order' every time. For
        # now I leave it as is for clarity's sake.
        if order:
            if f1 < f2:
                return -1
            elif f1 > f2:
                return 1
        else:
            if f1 < f2:
                return 1
            elif f1 > f2:
                return -1
    return 0
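

# Illustration only, not part of the component: a minimal sketch of the same
# multi-field comparison, assuming plain dicts stand in for SnapLogic records.
# The names compare_records, a, b and sort_fields are hypothetical.

```python
from functools import cmp_to_key


def compare_records(rec1, rec2, sort_fields):
    """Return -1, 0 or 1 for rec1 versus rec2 under the given sort fields."""
    for fname, ascending in sort_fields:
        f1, f2 = rec1[fname], rec2[fname]
        if f1 == f2:
            continue  # tie on this field, look at the next one
        result = -1 if f1 < f2 else 1
        # A descending field simply flips the sign of the result.
        return result if ascending else -result
    return 0


# Sort by city ascending, then by year descending.
sort_fields = [("city", True), ("year", False)]
a = {"city": "Oslo", "year": 2001}
b = {"city": "Oslo", "year": 1999}

print(compare_records(a, b, sort_fields))  # -1
ordered = sorted([b, a], key=cmp_to_key(
    lambda x, y: compare_records(x, y, sort_fields)))
```

# The cities tie, so the descending 'year' field decides: the record with the
# later year sorts first, and 'ordered' comes out as [a, b].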


def _find_smallest_record(stream_records, merge_fields):
    """
    Find the stream that has seen the smallest last record.

    We need to keep track of which stream has currently seen
    the smallest record. We have a dictionary that maps the
    view/stream names to stored records. This function here
    is called whenever a new record has been received. We check
    which of the streams is now holding the smallest record.

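    @param stream_records:  Dictionary that maps view names to
                            stored records.
    @type  stream_records:  Name/record pairs with the view name
                            as lookup key.

    @param merge_fields:    Description of the relevant fields and the
                            desired sort order, as accepted by
                            L{_compare_records}.
    @type  merge_fields:    List of (field name, ascending flag) tuples.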

    @return: The name of the view/stream with the smallest record.
    @rtype:  string or None, which indicates that we don't have
             entries for all views yet.

    """
    view_names = list(stream_records.keys())
    sname      = view_names[0]
    smallest   = stream_records[sname]
    if smallest is None:
        return None
    for k in view_names[1:]:
        brec = stream_records[k]
        if brec is None:
            # If any other stream is still None (not initialized yet),
            # then we cannot reliably determine the smallest one yet.
            return None
        if _compare_records(brec, smallest, merge_fields) == -1:
            smallest = brec
            sname = k

    return sname
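

# Illustration only, not part of the component: a minimal sketch of the
# "smallest last record" lookup. It assumes plain comparable values instead
# of records and ascending order only. The name find_smallest_stream is
# hypothetical, and returning None for an empty mapping is an added
# assumption.

```python
def find_smallest_stream(last_values):
    """Return the name of the stream holding the smallest value, or None."""
    if not last_values or any(v is None for v in last_values.values()):
        # At least one stream has not delivered a value yet, so the
        # smallest value cannot be determined reliably.
        return None
    return min(last_values, key=last_values.get)


print(find_smallest_stream({"input1": 5, "input2": 3}))     # input2
print(find_smallest_stream({"input1": 5, "input2": None}))  # None
```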


class MergeSorter(ComponentAPI):
    """
    A component to merge multiple already sorted streams into a single, sorted output stream.

    """

    api_version = '1.0'
    component_version = '1.3'
    
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 2,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : ComponentAPI.UNLIMITED_VIEWS,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH        : False
    }
    
    component_description = "Merges multiple, sorted streams into a single, sorted output stream."
    component_label       = "Merge Sorter"
    component_doc_uri     = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/MergeSorter" % \
                                                        version_info.doc_uri_version


    def create_resource_template(self):
        """
        Create MergeSorter resource definition template.

        We define a 'sort spec'. An individual sort field is specified as a dictionary,
        containing the name of the field as well as its sort order. These small dictionaries
        are then arranged into a list, so that more than one sort field can be specified.

        """
        sort_field = prop.SimpleProp(SORT_FIELD, "string", "Output field to sort on", 
                                     {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] }, required=True)
        sort_order = prop.SimpleProp(SORT_ORDER, "string", "Sort order (asc or desc)",
                                     { "lov" : [ 'asc', 'desc' ] }, required=True)
        sort_spec  = prop.DictProp(SORT_SPECS, sort_field, "describe me", 2, required=True)
        sort_spec[SORT_FIELD] = sort_field
        sort_spec[SORT_ORDER] = sort_order
        sort_specs = prop.ListProp("Sort specifications", sort_spec, required=True)
        self.set_property_def(SORT_SPECS, sort_specs)


    def validate(self, err_obj):
        """
        Check the validity of a proposed resdef for this component.

        """
        in_view_names  = self.list_input_view_names()
        out_view_names = self.list_output_view_names()
        
        # All input views must be identical
        first_view = self.get_input_view_def(in_view_names[0])
        inputs_identical = True
        for view_name in in_view_names[1:]:
            view = self.get_input_view_def(view_name)
            if view['fields'] != first_view['fields']:
                err_obj.get_input_view_err().set_message("All input views must have the same field definition.")
                inputs_identical = False
                break

        if inputs_identical:
            # The output view must be the same as the input views
            out_view = self.get_output_view_def(out_view_names[0])
            if first_view['fields'] != out_view['fields']:
                err_obj.get_output_view_err().set_message("Output view must be identical to input views.")
        
        # The views must contain all the fields that are specified in the sort specs
        sort_specs = self.get_property_value(SORT_SPECS)
        if not component_api.has_param(sort_specs):
            sfspec_names = [ s[SORT_FIELD] for s in sort_specs ]
            if not component_api.has_param(sfspec_names):
                view_field_names = [ f[0] for f in first_view['fields'] ]
                not_in = [ sf for sf in sfspec_names if sf not in view_field_names ]
                if not_in:
                    err_obj.get_input_view_err(in_view_names[0]).set_message(
                            "Input view does not contain these sort field(s) specified in the sort spec: %s." %
                            ', '.join(not_in))

    def execute(self, input_views, output_views):
        """
        Execute the merge sorting of multiple sorted input streams.

        """
        # Some initial sanity checking
        try:
            self._output_view = output_views.values()[keys.SINGLE_VIEW] 
        except IndexError:
            raise SnapComponentError("No output view connected.")

        # We don't need a try/except here because we will probe delicately.
        connected_input_views = input_views.keys()
        defined_input_views = self.list_input_view_names()
        for view in defined_input_views:
            if view not in connected_input_views:
                raise SnapComponentError("Not all input views connected.")

        # Preparation for the processing task.

        self._input_views = input_views.values()

        # Remove fully qualified prefix, which may be present in the sort field names.
        # For example: 'input1.foobar' becomes 'foobar'.
        self._merge_fields = []
        sort_specs = self.get_property_value(SORT_SPECS)
        tmp_merge_fields = [ (s[SORT_FIELD], s[SORT_ORDER]) for s in sort_specs ]
        for sf in tmp_merge_fields:
            fqn_field_name_path = sf[0].split(".")
            if len(fqn_field_name_path) == 2:
                field_name = fqn_field_name_path[1]
            else:
                field_name = sf[0]
            sort_order = sf[1]
            asc_order_flag = (sort_order.upper() == 'ASC')

            # We store the name of the sort field and its sort order in this handy
            # list here, so that we can do comparisons of records based on this information.
            # We have replaced the sort-order string with a boolean value, so that this
            # processing can be done more quickly later on.
            self._merge_fields.append( (field_name, asc_order_flag) )

        self._buffer           = []
        self._last_records     = {}
        # We initialize the last_records dictionary, because the non-presence
        # of a view in that mapping will be used as a sign that this view/stream
        # is finished.
        for v in self.list_input_view_names():
            self._last_records[v] = None

        # Start processing the input values from any input view
        still_active_views = list(self._input_views)
        while still_active_views:
            # Wait until one or more view has some data
            views_with_data = self.select_input_view(still_active_views)
            if not views_with_data:
                # I guess we are all done? Normally we block until data is
                # retrieved, but if we return here with an empty list then
                # there obviously is nothing more to do.
                break

            # Read one record from each of the views that currently have some data
            finished_views = []
            for view in views_with_data:
                record = view.read_record()
                if not record:
                    # This input stream has come to an end (an ended stream is included
                    # in the result list of a select). If all input views have finished,
                    # flush all remaining records to the output view.
                    del self._last_records[view.name]
                    if len(self._last_records) == 0:
                        for r in self._buffer:
                            self._output_view.write_record(r)
                    # Remember the ones that need to be deleted after we are done with
                    # this loop
                    finished_views.append(view)
                    continue

                # Ensure that the records for that view are sorted. This component does
                # not perform actual sorting itself.
                if self._last_records[view.name]  and  _compare_records(self._last_records[view.name], record, self._merge_fields) == 1:
                    raise SnapComponentError("Input records in view '%s' are not sorted." % view.name)

                # Place this new record into our merge buffer
                self._last_records[view.name] = record
                name_of_smallest = _find_smallest_record(self._last_records, self._merge_fields)
            
                # We do a binary search to find the right place for the new record in our sorted buffer.
                lo = 0
                hi = len(self._buffer)
                while lo < hi:
                    mid = (lo+hi)//2
                    if _compare_records(record, self._buffer[mid], self._merge_fields) == -1:
                        hi = mid
                    else:
                        lo = mid+1
                self._buffer.insert(lo, record)

                if name_of_smallest is not None:
                    self._flush_buffer(self._last_records[name_of_smallest])

            # Any views that have finished in the last round?
            for i in finished_views:
                still_active_views.remove(i)

        self._output_view.completed()


    def _flush_buffer(self, smallest_record):
        """
        Flush as much of our record buffer as possible.

        This writes as many entries out of the buffer as
        possible, thereby also reducing the buffer size.
        The buffer is a sorted list, and consequently, we
        are writing (and removing) entries from the start
        of the list.

        How much can be written? That depends on the smallest
        record we have seen so far from a still active input
        stream. Since the input streams themselves are sorted,
        we can assume that we will not see a new value in the
        future on any stream, which is smaller (!) than the
        smallest value we are currently seeing on any stream.

        Therefore, we can write all entries that are smaller
        or equal to the smallest one we have last seen from
        any input stream.

        The smallest last seen record is passed to us. We will
        write all buffer entries that are smaller or equal to
        that.

        @param smallest_record: The record up to which (and
                                including) we can flush the buffer.
        @type  smallest_record: L{SnapRecord}

        """

        for (idx, rec) in enumerate(self._buffer):
            if _compare_records(rec, smallest_record, self._merge_fields) == 1:
                # We have reached larger records, thus we can stop here
                del self._buffer[0:idx]
                return
            self._output_view.write_record(rec)

        # Looks like we were able to flush the whole thing
        self._buffer = []
        
    def upgrade_1_0_to_1_1(self):
        """
        Add source constraint to the sort field property.

        """
        
        # Save the property value.
        # We need to recreate the property, which resets the value
        property_value = self.get_property_value(SORT_SPECS)

        sort_field = prop.SimpleProp(SORT_FIELD, "string", "Input field to sort on", 
                                     {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] }, required=True)
        sort_order = prop.SimpleProp(SORT_ORDER, "string", "Sort order (asc or desc)",
                                     { "lov" : [ 'asc', 'desc' ] }, required=True)
        sort_spec  = prop.DictProp(SORT_SPECS, sort_field, "describe me", 2, required=True)
        sort_spec[SORT_FIELD] = sort_field
        sort_spec[SORT_ORDER] = sort_order
        sort_specs = prop.ListProp("Sort specifications", sort_spec, required=True)
        self.set_property_def(SORT_SPECS, sort_specs)
        
        # Restore the value
        self.set_property_value(SORT_SPECS, property_value)

    def upgrade_1_1_to_1_2(self):
        """
        Change the constraint on the sort field to reference the output field.
        This is possible because all views of this component must match.

        """
        
        # Save the property value.
        # We need to recreate the property, which resets the value
        property_value = self.get_property_value(SORT_SPECS)

        sort_field = prop.SimpleProp(SORT_FIELD, "string", "Output field to sort on", 
                                     {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] }, required=True)
        sort_order = prop.SimpleProp(SORT_ORDER, "string", "Sort order (asc or desc)",
                                     { "lov" : [ 'asc', 'desc' ] }, required=True)
        sort_spec  = prop.DictProp(SORT_SPECS, sort_field, "describe me", 2, required=True)
        sort_spec[SORT_FIELD] = sort_field
        sort_spec[SORT_ORDER] = sort_order
        sort_specs = prop.ListProp("Sort specifications", sort_spec, required=True)
        self.set_property_def(SORT_SPECS, sort_specs)
        
        # Restore the value
        self.set_property_value(SORT_SPECS, property_value)

    def upgrade_1_2_to_1_3(self):
        """
        No-op upgrade. Its only purpose is to change the component doc URI,
        which is done by cc_info before this method is called.

        """
        pass
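

# Illustration only, not part of the component: a minimal sketch of the
# buffer-and-flush idea used by execute() and _flush_buffer(). It assumes
# plain ascending values instead of records, and it replays a fixed list of
# arrivals instead of selecting on live input views. It swaps the hand-written
# binary search for the standard bisect module. The name merge_arrivals and
# the (stream_name, value) arrival format are hypothetical.

```python
import bisect


def merge_arrivals(stream_names, arrivals):
    """Merge values that arrive interleaved from several sorted streams.

    arrivals is a list of (stream_name, value) pairs in arrival order;
    a value of None marks the end of that stream.
    """
    last_seen = dict((name, None) for name in stream_names)
    buffer, output = [], []
    for name, value in arrivals:
        if value is None:
            del last_seen[name]      # this stream has finished
            if not last_seen:        # all streams finished: flush the rest
                output.extend(buffer)
                buffer = []
            continue
        bisect.insort_right(buffer, value)  # keep the buffer sorted
        last_seen[name] = value
        if all(v is not None for v in last_seen.values()):
            # No active stream can still deliver anything smaller than the
            # smallest value last seen on any of them, so everything up to
            # and including that value is safe to write out.
            watermark = min(last_seen.values())
            cut = bisect.bisect_right(buffer, watermark)
            output.extend(buffer[:cut])
            del buffer[:cut]
    return output


arrivals = [("a", 1), ("a", 4), ("b", 2), ("b", 3),
            ("a", 6), ("a", None), ("b", 9), ("b", None)]
print(merge_arrivals(["a", "b"], arrivals))  # [1, 2, 3, 4, 6, 9]
```

# Values 4 and 6 stay buffered while stream "b" has only reached 3; they are
# written once "a" has ended and "b" moves past them.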

