# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: output_view.py 7261 2009-04-16 01:05:17Z dhiraj $
from snaplogic.common.data_tracing import DataTracer
from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import Record
class OutputViewBase(object):
    """
    Common part of the interface a component uses to write to an output view.

    This class is meant to be derived from in order to implement the full interface needed by
    components. The hierarchy exists so that one derived class can be implemented for use by the CC
    and another derived class can be used by the "CC simulator" that can be used to unit test
    components.

    A view is in binary mode when it is created without a field definition, otherwise it is in
    record mode. The methods that belong to the other mode are replaced, per instance, by
    error_binary() or error_record(), so calling them raises SnapObjTypeError.

    """

    def __init__(self, view_name, fields = None, pass_through_inputs = None, validate_recs = False):
        """
        Initialize output view object.

        @param view_name: Name of the output view
        @type view_name:  str

        @param fields: sequence of 3 tuples, where each tuple consists of (name, type, field documentation).
            When this is None, the view is a binary mode view.
        @type fields:  sequence of tuples

        @param pass_through_inputs: List of input view names, specified for this output view (if any).
            Only stored for record mode views.
        @type pass_through_inputs:  List of str

        @param validate_recs: Set to True, if outgoing record must be validated.
            Only stored for record mode views.
        @type validate_recs:  bool

        """
        self.name = view_name
        self.record_count = 0
        self.byte_count = 0

        # A view that has no field definition is a binary mode view.
        self.is_binary = fields is None

        if self.is_binary:
            # Mark the record mode methods as not usable in a binary mode view.
            self.write_record = self.error_binary
            self.end_record_stream = self.error_binary
        else:
            self.fields = fields
            # Walk the field definitions once, so that one-shot iterables are handled too.
            names = []
            types = []
            for field_def in fields:
                names.append(field_def[0])
                types.append(field_def[1])
            self._python_types_map = Record.create_python_types_map(names, types)
            # Python types in field order, used for positional validation of outgoing records.
            self._python_types_list = [self._python_types_map[fname] for fname in names]
            self.field_names = tuple(names)
            self.field_types = tuple(types)
            self.pass_through_inputs = pass_through_inputs
            # Materialized explicitly, since range() is not a list on every Python version.
            self._idx_list = list(range(len(self.field_names)))
            self._validate_field_type = validate_recs
            # Mark write_binary and list_binary_content_types methods as not usable in record mode.
            self.write_binary = self.error_record
            self.list_binary_content_types = self.error_record

        self._is_closed = False

    def error_record(self, *args, **kargs):
        """
        Generates an error message for calling invalid method for record mode. The params are ignored.

        @raise SnapObjTypeError: Always.

        """
        raise SnapObjTypeError("This method is not supported in record mode view")

    def error_binary(self, *args, **kargs):
        """
        Generates an error message for calling invalid method for binary mode. The params are ignored.

        @raise SnapObjTypeError: Always.

        """
        raise SnapObjTypeError("This method is not supported in binary mode view")

    def list_binary_content_types(self):
        """
        Return the list of content types that have been negotiated by the streams of this output view.

        This method is only valid for binary mode views. It reads the _content_type_map attribute,
        which this base class does not create itself; a derived class has to provide it.

        @return: List of content types that have been negotiated by the streams.
        @rtype:  list

        """
        # keys() is not a list on every Python version, so build the documented list explicitly.
        return list(self._content_type_map.keys())

    def create_record(self):
        """
        Create a record that matches the fields defined by this output view.

        @return: Empty record with appropriate structure for this output view.
        @rtype:  L{Record}

        """
        return Record(self.name, self.field_names, self._python_types_map)
class OutputView(OutputViewBase):
    """
    This class provides the interface needed by a component to write to an output view.

    An output view can be in record mode or binary mode. By default, views are in record mode
    and send data in a record structure, via the method write_record(). In binary mode,
    the data is treated as a string containing binary data and are written via the method
    write_binary().

    NOTE: A view at its definition time is defined as record or binary and this does not
    change at runtime.

    """

    def __init__(self, view_name, snap_stream_list, fields = None, pass_through_inputs = None, validate_recs = False,
                 trace_flag = False, fq_res_name = None, top_pipe_id = None):
        """
        Initialize output view object.

        @param view_name: Name of the output view
        @type view_name:  str

        @param snap_stream_list: List of snap_stream objects associated with the output view. The list
            may be empty, in which case no stream mode validation is done.
        @type snap_stream_list:  list

        @param fields: sequence of 3 tuples, where each tuple consists of (name, type, field documentation)
        @type fields:  sequence of tuples

        @param pass_through_inputs: List of input view names, specified for this output view (if any).
        @type pass_through_inputs:  List of str

        @param validate_recs: Set to True, if outgoing record must be validated.
        @type validate_recs:  bool

        @param trace_flag: If set to True, then records written out will be dumped into a trace file. Default is False.
        @type trace_flag:  bool

        @param fq_res_name: Fully qualified resource name (the names of enclosing pipelines prefixed to it in dotted
            notation format).
        @type fq_res_name:  str

        @param top_pipe_id: Runtime id of the topmost enclosing pipeline which triggered the execution of this resource
        @type top_pipe_id:  str

        @raise SnapValueError: If the streams do not all have the same mode, or if their mode does not
            match the mode of the view.

        """
        super(OutputView, self).__init__(view_name, fields, pass_through_inputs, validate_recs)
        self.snap_stream_list = snap_stream_list
        self.tracer = DataTracer() if trace_flag else None

        mode = None
        # The mode constants are read from a stream object, so remember the last one seen explicitly
        # instead of relying on the loop variable, which is unbound when the list is empty.
        last_stream = None
        for sstream in snap_stream_list:
            if mode is not None:
                # Make sure all streams in this view are in the same mode.
                if mode != sstream.stream_mode:
                    raise SnapValueError("Output view \"%s\" has some streams as binary and others as record" %
                                         self.name)
            else:
                mode = sstream.stream_mode
            last_stream = sstream

        if self.is_binary:
            if last_stream is not None and mode != last_stream.BINARY_STREAM:
                raise SnapValueError("Output view \"%s\" is in binary mode, but the streams are not." % self.name)
            if self.tracer is not None:
                self.tracer.begin_binary_tracing(top_pipe_id, fq_res_name, "out", self.name)
            # Create a dictionary mapping the content types to the streams.
            self._content_type_map = {}
            for ss in self.snap_stream_list:
                self._content_type_map.setdefault(ss.content_type, []).append(ss)
        else:
            if last_stream is not None and mode != last_stream.RECORD_STREAM:
                raise SnapValueError("Output view \"%s\" is in record mode, but the streams are not." % self.name)
            if self.tracer is not None:
                self.tracer.begin_record_tracing(top_pipe_id, fq_res_name, "out", self.name)

    def write_record(self, record):
        """
        Writes the record to all the streams associated with the output view.

        This method is only valid for record mode views. The outgoing raw record consists of the
        field values of the record, followed by the internal and then the external pass-through
        values of every input view listed in pass_through_inputs.

        @param record: The record that needs to be written.
        @type record:  L{Record}

        @raise SnapObjTypeError: If validation is enabled and a field that is set does not have
            exactly the python type expected for it.

        """
        # We want to get None for fields that are not set.
        raw_rec = [record.get(f) for f in record.field_names]

        if self._validate_field_type:
            for i, value in enumerate(raw_rec):
                # The check is on the exact type on purpose, subclasses of the expected type are rejected.
                if value is not None and type(value) != self._python_types_list[i]:
                    raise SnapObjTypeError("Field '%s' for output view '%s' had wrong data type '%s'. Expected '%s'. "
                                           "Please refer to https://www.snaplogic.org/trac/ticket/1355 for more details."
                                           % (self.field_names[i], self.name, type(value).__name__,
                                              self._python_types_list[i].__name__))

        if self.pass_through_inputs is not None:
            # All internal pass-through values go first, then all external ones.
            for inp_name in self.pass_through_inputs:
                raw_rec += record._internal_pass_through[inp_name]
            for inp_name in self.pass_through_inputs:
                raw_rec += record._external_pass_through[inp_name]

        if self.tracer is not None:
            self.tracer.trace_record(raw_rec)

        for sstream in self.snap_stream_list:
            wanted = sstream.field_numbers
            if wanted is not None:
                # Specific field numbers have been requested by the downstream component
                out_rec = [field for (i, field) in enumerate(raw_rec) if i in wanted]
                sstream.write_record(out_rec)
            else:
                sstream.write_record(raw_rec)

        self.record_count += 1

    def write_binary(self, binary_data, content_type):
        """
        Write binary data to all the streams associated with the view that have negotiated to that content type.

        @param binary_data: The data to be written out.
        @type binary_data:  str

        @param content_type: The content type being written. Please make sure that only a content type in the
            list returned by list_binary_content_types() is used in this param.
        @type content_type:  str

        @raise KeyError: If the specified content_type does not exist in any of the streams. Please make sure that
            only a content type in the list returned by list_binary_content_types() is used to avoid this
            exception.

        """
        if self.tracer is not None:
            self.tracer.trace_bytes(binary_data)

        for sstream in self._content_type_map[content_type]:
            sstream.write_bytes(binary_data)

        self.byte_count += len(binary_data)

    def completed(self):
        """
        Output to the view has been successfully completed.

        While calling this in general after sending all the data is a good practice, it is actually
        a "must" to call this for a record mode output view, otherwise the downstream components will
        assume an error has occurred.

        For a record mode view, end_of_records() is called on every stream. After that the view is
        closed, in both modes.

        """
        if not self.is_binary:
            for sstream in self.snap_stream_list:
                sstream.end_of_records()
        self._close()

    def _close(self):
        """
        Close and do cleanup of the view object.

        This method is meant for internal use by the CC. Tracing (if any) is ended and every stream
        is closed. Calling it again on a view that has already been closed does nothing.

        """
        if self._is_closed:
            return

        if self.tracer is not None:
            self.tracer.end_tracing()

        for sstream in self.snap_stream_list:
            sstream.close()

        self._is_closed = True
|