# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: MergeSorter.py 10330 2009-12-24 22:13:38Z grisha $
"""
MergeSorter Module and Resource Definition.
This module provides functionality to merge sorted streams of
records. It is important to remember that this component has
been specifically designed to merge streams that are already sorted!
No attempt should be made to process non-sorted input streams.
The component merges on the fly: records are received from all input
views, merged and written out. Unlike other sort components, it does
not use a database. This allows the merge component to work on very
large data streams, for which the DB approach would be infeasible.
On the other hand, the merge component has to perform its own internal
buffering: if one of the streams is blocked while the other streams
continue to provide records, the records received so far are held in
a buffer until they can be written out in sorted order.
All views (input and output) have to be exactly the same. Fields have
to have the same names and types.
In theory, the definition of a sort order should not be necessary
(in conjunction with the definition of sort fields). After all, it
should be possible to derive the sort order of the streams by
examining the first records of any one of the streams. However,
in case of streams that contain equal records, or just one record,
this is not feasible. Therefore, the sort order is still a requirement.
"""
__docformat__ = "epytext en"
from snaplogic.common.data_types import Record
from snaplogic.common import version_info
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
import snaplogic.cc.prop as prop
from snaplogic import components
from snaplogic.common.snap_exceptions import *
from snaplogic.snapi_base import keys
# Public names
__all__ = [ "MergeSorter" ]
# Definitions for the sort specs.
# Key of the sort field name inside each sort spec dictionary.
SORT_FIELD = "Sort field"
# Key of the sort order ('asc' or 'desc') inside each sort spec dictionary.
SORT_ORDER = "Sort order"
# NOTE(review): not referenced by the code visible in this module.
SORT_SPEC = "Sort spec"
# Name of the list property holding all sort spec dictionaries.
SORT_SPECS = "Sort specs"
#
# Some helper functions
#
def _compare_records(rec1, rec2, sort_fields):
    """
    Compare two records field by field according to a sort descriptor.

    The fields listed in sort_fields are examined in order. The first field
    on which the two records differ decides the result. For a descending
    field, the result of the plain comparison is inverted.

    @param rec1:        First record to compare.
    @type  rec1:        L{SnapRecord}
    @param rec2:        Second record to compare.
    @type  rec2:        L{SnapRecord}
    @param sort_fields: Sort descriptor. Each tuple is (field name, flag);
                        a true flag means ascending order, a false flag
                        means descending order.
    @type  sort_fields: List of tuples.

    @return:            -1 if rec1 sorts before rec2, 1 if it sorts after
                        rec2, and 0 if both are equal on all listed fields.
    @rtype:             int
    """
    for field_name, ascending in sort_fields:
        left = rec1[field_name]
        right = rec2[field_name]
        if left < right:
            outcome = -1
        elif left > right:
            outcome = 1
        else:
            # Equal on this field: let the next sort field decide.
            continue
        # Descending fields simply flip the natural comparison result.
        return outcome if ascending else -outcome
    return 0
def _find_smallest_record(stream_records, merge_fields):
    """
    Find the stream that has seen the smallest last record.

    We need to keep track of which stream currently holds the smallest
    record. stream_records maps view/stream names to the last record
    received on that stream (None while nothing has been received yet).
    This function is called whenever a new record has been received, to
    determine which of the streams now holds the smallest record.

    @param stream_records:  Dictionary that maps view names to the last
                            record seen on that view, or None.
    @type  stream_records:  dict
    @param merge_fields:    Sort descriptor passed on to L{_compare_records}:
                            (field name, ascending flag) tuples.
    @type  merge_fields:    list of tuples

    @return:                The name of the view/stream with the smallest
                            record. If several streams hold equal smallest
                            records, the first one in the dictionary's
                            iteration order is returned.
    @rtype:                 string or None. None indicates that not every
                            view has delivered a record yet, or that
                            stream_records is empty.
    """
    # list() keeps this working whether keys() returns a list or a view.
    view_names = list(stream_records.keys())
    if not view_names:
        # No streams at all: there is no record that could bound a flush.
        return None
    smallest_name = view_names[0]
    smallest = stream_records[smallest_name]
    if smallest is None:
        return None
    for name in view_names[1:]:
        candidate = stream_records[name]
        if candidate is None:
            # If anyone else is still None (not initialized yet), then we
            # cannot reliably determine the smallest one yet.
            return None
        # Only a strictly smaller record replaces the current one, so on
        # ties the earlier stream is kept.
        if _compare_records(candidate, smallest, merge_fields) == -1:
            smallest = candidate
            smallest_name = name
    return smallest_name
class MergeSorter(ComponentAPI):
    """
    A component to merge multiple already sorted streams into a single, sorted output stream.

    All input views and the output view must have identical field definitions.
    Records are merged as they arrive. Records that cannot be written yet,
    because a still active stream might deliver a smaller record, are kept in
    an internal buffer that is sorted according to the configured sort specs.
    """

    api_version = '1.0'
    component_version = '1.3'

    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT : 2,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT : ComponentAPI.UNLIMITED_VIEWS,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
        ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH : False
    }

    component_description = "Merges multiple, sorted streams into a single, sorted output stream."
    component_label = "Merge Sorter"
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/MergeSorter" % \
                        version_info.doc_uri_version

    def _define_sort_specs_property(self, field_description, field_lov_constraint):
        """
        (Re)create the definition of the SORT_SPECS property.

        The property is a list of small dictionaries. Each dictionary holds the
        name of a sort field and its sort order ('asc' or 'desc').
        create_resource_template() and the upgrade methods all build this same
        structure. They differ only in the description and the LOV constraint
        of the sort field.

        Setting the definition resets the property value. Callers that need to
        keep an existing value must save and restore it themselves.

        @param field_description:    Description of the sort field property.
        @type  field_description:    string
        @param field_lov_constraint: LOV constraint for the sort field, e.g.
                                     keys.CONSTRAINT_LOV_OUTPUT_FIELD.
        """
        sort_field = prop.SimpleProp(SORT_FIELD, "string", field_description,
                                     {'lov': [ field_lov_constraint ] }, required=True)
        sort_order = prop.SimpleProp(SORT_ORDER, "string", "Sort order (asc or desc)",
                                     { "lov" : [ 'asc', 'desc' ] }, required=True)
        # Label and description are kept exactly as in earlier component
        # versions, so new and upgraded definitions stay identical.
        sort_spec = prop.DictProp(SORT_SPECS, sort_field, "describe me", 2, required=True)
        sort_spec[SORT_FIELD] = sort_field
        sort_spec[SORT_ORDER] = sort_order
        sort_specs = prop.ListProp("Sort specifications", sort_spec, required=True)
        self.set_property_def(SORT_SPECS, sort_specs)

    def create_resource_template(self):
        """
        Create MergeSorter resource definition template.

        We define a 'sort spec'. An individual sort field is specified as a
        dictionary containing the name of the field as well as its sort order.
        These small dictionaries are then arranged into a list, so that more
        than one sort field can be specified.
        """
        self._define_sort_specs_property("Output field to sort on",
                                         keys.CONSTRAINT_LOV_OUTPUT_FIELD)

    @staticmethod
    def _strip_view_prefix(field_name):
        """
        Remove a fully qualified view prefix from a sort field name.

        For example, 'input1.foobar' becomes 'foobar'. Names that do not split
        into exactly two dot-separated parts are returned unchanged.

        @param field_name: Sort field name, possibly qualified with a view name.
        @type  field_name: string
        @return:           The unqualified field name.
        @rtype:            string
        """
        parts = field_name.split(".")
        if len(parts) == 2:
            return parts[1]
        return field_name

    def validate(self, err_obj):
        """
        Check the validity of a proposed resdef for this component.

        The following conditions are checked:
          - all input views have the same field definition,
          - the output view has the same field definition as the input views,
          - every sort field named in the sort specs exists in the views.

        @param err_obj: Error object to which validation errors are reported.
        """
        in_view_names = self.list_input_view_names()
        out_view_names = self.list_output_view_names()

        # All input views must be identical
        first_view = self.get_input_view_def(in_view_names[0])
        inputs_identical = True
        for view_name in in_view_names[1:]:
            view = self.get_input_view_def(view_name)
            if view['fields'] != first_view['fields']:
                err_obj.get_input_view_err().set_message("All input views must have the same field definition.")
                inputs_identical = False
                break

        if inputs_identical:
            # The output view must be the same as the input views. Compare
            # against first_view: the loop variable above is undefined when
            # there is only a single input view.
            out_view = self.get_output_view_def(out_view_names[0])
            if first_view['fields'] != out_view['fields']:
                err_obj.get_output_view_err().set_message("Output view must be identical to input views.")

        # The views must contain all the fields that are specified in the sort
        # specs. has_param() presumably detects unresolved parameter references;
        # only literal values can be checked here.
        sort_specs = self.get_property_value(SORT_SPECS)
        if not component_api.has_param(sort_specs):
            sfspec_names = [ s[SORT_FIELD] for s in sort_specs ]
            if not component_api.has_param(sfspec_names):
                view_field_names = [ f[0] for f in first_view['fields'] ]
                # execute() strips a 'view.' prefix from sort field names, so
                # accept a name if either its raw or its stripped form exists.
                not_in = [ sf for sf in sfspec_names
                           if sf not in view_field_names and
                              self._strip_view_prefix(sf) not in view_field_names ]
                if not_in:
                    err_obj.get_input_view_err(in_view_names[0]).set_message(
                        "Input view does not contain these sort field(s) specified in the sort spec: %s." %
                        ', '.join(not_in))

    def _get_merge_fields(self):
        """
        Build the sort descriptor used for record comparisons.

        Any fully qualified view prefix is removed from the sort field names.
        The sort-order string is replaced by a boolean (True for ascending) so
        that comparisons later on do not have to re-parse it.

        @return: List of (field name, ascending flag) tuples.
        @rtype:  list
        """
        merge_fields = []
        for spec in self.get_property_value(SORT_SPECS):
            field_name = self._strip_view_prefix(spec[SORT_FIELD])
            asc_order_flag = spec[SORT_ORDER].upper() == 'ASC'
            merge_fields.append((field_name, asc_order_flag))
        return merge_fields

    def _find_insert_position(self, record):
        """
        Return the index at which record must be inserted into the buffer.

        The buffer is kept sorted according to self._merge_fields, so a binary
        search is used. The returned index lies after all buffered records that
        compare equal to record, so equal records keep their arrival order.

        @param record: The record to place.
        @type  record: L{SnapRecord}
        @return:       Insertion index into self._buffer.
        @rtype:        int
        """
        lo = 0
        hi = len(self._buffer)
        while lo < hi:
            mid = (lo + hi) // 2
            if _compare_records(record, self._buffer[mid], self._merge_fields) == -1:
                hi = mid
            else:
                lo = mid + 1
        return lo

    def _merge_record(self, view_name, record):
        """
        Add a record received on an input view to the merge buffer.

        The record is checked against the previous record of the same view.
        It is then inserted into the sorted buffer, and as much of the buffer
        as possible is written out.

        @param view_name: Name of the view the record was read from.
        @type  view_name: string
        @param record:    The record that was read.
        @type  record:    L{SnapRecord}
        @raise SnapComponentError: If the record sorts before the previous
                                   record of the same view.
        """
        previous = self._last_records[view_name]
        # Ensure that the records for that view are sorted. This component
        # does not perform actual sorting itself.
        if previous is not None and _compare_records(previous, record, self._merge_fields) == 1:
            raise SnapComponentError("Input records in view '%s' are not sorted." % view_name)
        self._last_records[view_name] = record
        name_of_smallest = _find_smallest_record(self._last_records, self._merge_fields)
        self._buffer.insert(self._find_insert_position(record), record)
        if name_of_smallest is not None:
            self._flush_buffer(self._last_records[name_of_smallest])

    def execute(self, input_views, output_views):
        """
        Execute the merge sorting of multiple sorted input streams.

        Records are read from whichever input views have data. Each record is
        verified to be in order for its stream and is merged into the sorted
        output. Any records still buffered once the streams are exhausted are
        written out before the output view is completed.

        @param input_views:  Mapping of input view names to input view objects.
        @param output_views: Mapping of output view names to output view objects.
        @raise SnapComponentError: If no output view or not all input views are
                                   connected, or if an input stream is not sorted.
        """
        # Some initial sanity checking
        try:
            self._output_view = list(output_views.values())[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")

        # Membership tests cannot raise, so no try/except is needed here.
        connected_input_views = input_views.keys()
        for view_name in self.list_input_view_names():
            if view_name not in connected_input_views:
                raise SnapComponentError("Not all input views connected.")

        # Preparation for the processing task.
        self._input_views = list(input_views.values())
        self._merge_fields = self._get_merge_fields()
        self._buffer = []
        # Maps each input view name to the last record read from it (None until
        # the first record arrives). A view is removed once its stream has ended.
        self._last_records = {}
        for view_name in self.list_input_view_names():
            self._last_records[view_name] = None

        # Start processing the input values from any input view
        still_active_views = list(self._input_views)
        while still_active_views:
            # Wait until one or more views have some data
            views_with_data = self.select_input_view(still_active_views)
            if not views_with_data:
                # Normally we block until data is available; an empty result
                # means there is nothing more to read.
                break
            # Read one record from each of the views that currently have some data
            finished_views = []
            for view in views_with_data:
                record = view.read_record()
                if not record:
                    # This input stream has come to an end (an ended stream is
                    # included in the result list of a select).
                    del self._last_records[view.name]
                    # Remove it from the active list after this loop.
                    finished_views.append(view)
                    continue
                self._merge_record(view.name, record)
            for view in finished_views:
                still_active_views.remove(view)

        # No more records will arrive: write out everything still buffered.
        # Doing this after the loop also covers the early 'break' above, which
        # previously left buffered records unwritten.
        for rec in self._buffer:
            self._output_view.write_record(rec)
        self._buffer = []
        self._output_view.completed()

    def _flush_buffer(self, smallest_record):
        """
        Flush as much of our record buffer as possible.

        This writes as many entries out of the buffer as possible, which also
        reduces the buffer size. The buffer is a sorted list, so entries are
        written and removed from the start of the list.

        How much can be written depends on the smallest last record seen from
        a still active input stream. The input streams themselves are sorted,
        so no stream will later deliver a value smaller than that record.
        Therefore all buffered entries that are smaller than or equal to it
        can be written.

        @param smallest_record: The record up to which (inclusive) the buffer
                                can be flushed.
        @type  smallest_record: L{SnapRecord}
        """
        for (idx, rec) in enumerate(self._buffer):
            if _compare_records(rec, smallest_record, self._merge_fields) == 1:
                # We have reached larger records, thus we can stop here
                del self._buffer[0:idx]
                return
            self._output_view.write_record(rec)
        # Looks like we were able to flush the whole thing
        self._buffer = []

    def upgrade_1_0_to_1_1(self):
        """
        Add source constraint to Field property.

        The sort field is constrained to input field names.
        """
        # Save the property value: recreating the property resets the value.
        property_value = self.get_property_value(SORT_SPECS)
        self._define_sort_specs_property("Input field to sort on",
                                         keys.CONSTRAINT_LOV_INPUT_FIELD)
        # Restore the value
        self.set_property_value(SORT_SPECS, property_value)

    def upgrade_1_1_to_1_2(self):
        """
        Change constraint on the sort field to reference the output field.

        This is possible because all views of this component must match.
        """
        # Save the property value: recreating the property resets the value.
        property_value = self.get_property_value(SORT_SPECS)
        self._define_sort_specs_property("Output field to sort on",
                                         keys.CONSTRAINT_LOV_OUTPUT_FIELD)
        # Restore the value
        self.set_property_value(SORT_SPECS, property_value)

    def upgrade_1_2_to_1_3(self):
        """
        No-op upgrade, present only so that the component doc URI is changed.

        The doc URI itself is updated by cc_info before this method is called.
        """
        pass
|