# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: input_view.py 7261 2009-04-16 01:05:17Z dhiraj $
from snaplogic.common.data_tracing import DataTracer
from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import Record
class InputViewBase(object):
    """
    Base class holding the state and mode handling shared by input view implementations.

    Concrete input views derive from this class to provide the full reading interface. The
    hierarchy exists so that one derived class can be used by the CC and another by the
    "CC simulator" that is used to unit test components.

    A view is either in record mode (field definitions were supplied) or in binary mode
    (no field definitions). The methods that belong to the other mode are disabled per
    instance by shadowing them with a method that raises SnapObjTypeError.
    """

    def __init__(self, view_name, fields=None, content_type=None, is_pass_thru=False):
        """
        Initialize the input view object.

        @param view_name: Name of the input view
        @type view_name: str

        @param fields: sequence of 3 tuples, where each tuple consists of (name, type, field documentation).
            If None, the view is treated as a binary view.
        @type fields: sequence of tuples

        @param content_type: If binary view, this is set to the content types supported by the view.
        @type content_type: str

        @param is_pass_thru: Set to True, if the input view is pass through, False otherwise.
        @type is_pass_thru: bool
        """
        self.name = view_name
        self.is_pass_through = is_pass_thru
        self.record_count = 0
        self.byte_count = 0

        # The absence of field definitions is what marks a view as binary.
        self.is_binary = fields is None

        if self.is_binary:
            # Record reading makes no sense on a binary view: shadow it with an error raiser.
            self.read_record = self._error_binary
            self._blk_sz = 500
            self.content_type = content_type
        else:
            # Binary-only methods are shadowed on this instance so that calling them raises.
            self.read_binary = self._error_record
            self.get_binary_content_type = self._error_record
            self.set_binary_block_size = self._error_record

            self.fields = fields
            # Walk the definitions exactly once so that one-shot iterables keep working.
            names = []
            types = []
            for field_def in self.fields:
                names.append(field_def[0])
                types.append(field_def[1])
            self.field_names = tuple(names)
            self.field_types = tuple(types)
            self._python_types_map = Record.create_python_types_map(self.field_names, self.field_types)

        self._is_closed = False

    def _error_record(self, *args, **kargs):
        """Generate an error message for calling invalid method for record mode. The params are ignored."""
        raise SnapObjTypeError("This method is not supported in record mode view")

    def _error_binary(self, *args, **kargs):
        """Generate an error message for calling invalid method for binary mode. The params are ignored."""
        raise SnapObjTypeError("This method is not supported in binary mode view")

    def set_binary_block_size(self, num_bytes):
        """
        Set the number of bytes that define a binary block size.

        This becomes the default size that is read in from this binary view. This method is only
        valid for binary mode views; on a record mode view it raises SnapObjTypeError.

        @param num_bytes: The number of bytes in a binary block. Must convert to a positive int.
        @type num_bytes: int

        @raise SnapValueError: If the value is zero or negative.
        """
        num_bytes = int(num_bytes)
        # A zero-sized block is as unusable as a negative one, and the message promises "positive".
        if num_bytes <= 0:
            raise SnapValueError("The value for binary block size is not positive (%s)" % num_bytes)
        self._blk_sz = num_bytes

    def get_binary_content_type(self):
        """
        Return the content type that has been negotiated by the stream of this input view.

        This method is only valid for binary mode views. It reads self.snap_stream, which this
        base class does not assign; the derived class is expected to set it.

        @return: Content type that have been negotiated by the stream.
        @rtype: str
        """
        return self.snap_stream.content_type

    def _close(self):
        """
        Close and cleanup this input view.

        This method is meant for the internal use of the CC. The base implementation always
        raises; derived classes must override it.

        @raise SnapException: Always, since the base class provides no implementation.
        """
        raise SnapException("Internal Error - This method has not been implemented")
class InputView(InputViewBase):
    """
    This class provides the interface needed by a component to read from an input view.

    An input view can have only one stream associated with it. The view can be in record
    mode or binary mode. By default, views are in record mode and data is read from them
    using the read_record() method which returns a Record object. In binary mode,
    the data is returned as string containing binary data and read_binary() method is
    used for this purpose.

    NOTE: A view at its definition time is defined as record or binary and this does not
    change at runtime.
    """

    def __init__(self, view_name, sstream, fields=None, field_mapper=None, null_field_indices=None,
                 is_pass_thru=False, trace_flag=False, fq_res_name=None, top_pipe_id=None):
        """
        Initialize the input view object.

        @param view_name: Name of the input view
        @type view_name: str

        @param sstream: SnapStream object for that input view object
        @type sstream: L{SnapStream}

        @param fields: sequence of 3 tuples, where each tuple consists of (name, type, field documentation)
        @type fields: sequence of tuples

        @param field_mapper: Sequence indexed by position in the raw record read from the stream. Each
            entry is either None (the raw field is not linked to any field of this view) or a sequence
            of indices into the record's field names that receive the raw value. Only used in record
            mode. None is treated as an empty mapping.
        @type field_mapper: sequence

        @param null_field_indices: List of index numbers of fields that need to be hard coded to None.
        @type null_field_indices: list

        @param is_pass_thru: Set to True, if the input view is pass through, False otherwise.
        @type is_pass_thru: bool

        @param trace_flag: If set to True, then data read in will be dumped into a trace file. Default is False.
        @type trace_flag: bool

        @param fq_res_name: Fully qualified resource name (the names of enclosing pipelines prefixed to it
            in dotted notation format).
        @type fq_res_name: str

        @param top_pipe_id: Runtime id of the topmost enclosing pipeline which triggered the execution of
            this resource
        @type top_pipe_id: str
        """
        super(InputView, self).__init__(view_name, fields, sstream.content_type, is_pass_thru)
        self.snap_stream = sstream
        self.tracer = DataTracer() if trace_flag else None

        if self.is_binary:
            if self.tracer is not None:
                self.tracer.begin_binary_tracing(top_pipe_id, fq_res_name, "in", self.name)
            self.content_type = sstream.content_type
        else:
            # Without a mapper there is nothing linked; an empty tuple avoids len(None) blowing up.
            self._field_mapper = field_mapper if field_mapper is not None else ()
            # Cached once because read_record() consults it for every raw field.
            self._mapper_len = len(self._field_mapper)
            self._null_field_indices = null_field_indices if null_field_indices is not None else []
            if self.tracer is not None:
                self.tracer.begin_record_tracing(top_pipe_id, fq_res_name, "in", self.name)

    def read_record(self):
        """
        Read a record from the input view.

        This method can only be called for a record mode view. The method blocks until the record is
        read or the stream ends. If the stream ends, then None is returned back to the caller.

        The raw record coming from the stream is redistributed into a new Record according to the
        field mapper. For pass through views, raw fields beyond the mapper are stored as external
        pass through values and raw fields marked as unlinked (None) in the mapper are stored as
        internal pass through values. For other views, unlinked raw fields are dropped.

        @return: Record or None.
        @rtype: L{Record} or None.
        """
        raw_rec = self.snap_stream.read_record()
        if raw_rec is None:
            return None

        if self.tracer is not None:
            self.tracer.trace_record(raw_rec)

        record = Record(self.name, self.field_names, self._python_types_map)

        # Set to None all fields that are supposed to be hard coded to None.
        for null_idx in self._null_field_indices:
            record[record.field_names[null_idx]] = None

        if self.is_pass_through:
            external_pt = record._external_pass_through[self.name] = []
            internal_pt = record._internal_pass_through[self.name] = []
            for pos, raw_field in enumerate(raw_rec):
                if pos >= self._mapper_len:
                    # The mapper knows nothing about this field: external pass through.
                    external_pt.append(raw_field)
                    continue
                targets = self._field_mapper[pos]
                if targets is None:
                    # It is mentioned in the mapper, but mentioned as unlinked, hence, internal pass through.
                    internal_pt.append(raw_field)
                    continue
                for out_idx in targets:
                    record[record.field_names[out_idx]] = raw_field
        else:
            for pos, targets in enumerate(self._field_mapper):
                if targets is None:
                    # Unlinked raw field and no pass through storage: nothing to copy.
                    continue
                for out_idx in targets:
                    record[record.field_names[out_idx]] = raw_rec[pos]

        self.record_count += 1
        return record

    def read_binary(self, num_bytes=None):
        """
        Read binary data from the input view.

        This method can only be called for a binary mode view. The method blocks until the requested size is
        read or the stream ends. If the stream ends, then None is returned back to the caller. If the caller
        has not specified a size, then the size of a binary block is read. This size is 500 by default and
        can be adjusted by the method set_binary_block_size().

        @param num_bytes: The number of bytes of binary data to read (optional)
        @type num_bytes: int or None

        @return: Binary string or None.
        @rtype: str or None.
        """
        size = self._blk_sz if num_bytes is None else num_bytes
        bin_str = self.snap_stream.read_bytes(size)
        if bin_str is not None:
            self.byte_count += len(bin_str)
            if self.tracer is not None:
                self.tracer.trace_bytes(bin_str)
        return bin_str

    def _close(self):
        """
        Close and cleanup this input view.

        Closes the underlying stream and marks the view as closed.
        This method is meant for the internal use of the CC.
        """
        self.snap_stream.close()
        self._is_closed = True
|