# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: reader.py 2428 2008-04-11 21:04:43Z kurt $
"""
Contains the SnapStream Reader class.
"""
from snaplogic.common.snap_exceptions import *
from snaplogic.common.snapstream.stream_base import StreamBase
class Reader(StreamBase):
    """
    Base class for SnapStream reader classes.

    Reader is a base class for implementing the receiving side protocol of SnapStream for reading record-based
    or binary data from a remote location for both inter-component and external client communication.

    The constructor leaves self._pipe set to None, while every read method and the block_size property
    dereference it.
    NOTE(review): the original sentence describing what children of this class should implement was
    truncated; presumably subclasses are responsible for attaching the pipe -- confirm.
    """

    def __init__(self, url='Unspecified', view_name='Unspecified'):
        """
        Initialize the reader without a pipe attached.

        @param url: Passed through unchanged to the StreamBase constructor.
        @type url: string

        @param view_name: Passed through unchanged to the StreamBase constructor.
        @type view_name: string
        """
        super(Reader, self).__init__(url, view_name)
        # No pipe yet; it has to be assigned before any read or block_size access.
        self._pipe = None

    def _get_block_size(self):
        """Return the block size currently configured on the underlying pipe."""
        return self._pipe.block_size

    def _set_block_size(self, size):
        """
        Record the new block size on this reader and push it down to the pipe.

        @param size: The new block size.
        @type size: int
        """
        self._block_size = size
        self._pipe.block_size = size

    # Reads always reflect the pipe's current value; writes update both the reader and the pipe.
    block_size = property(_get_block_size, _set_block_size)

    def read_record(self):
        """
        Read a record from the stream.

        This call will block if a record is not immediately available.

        @return: A list of field values for the next record or None for EOF.
        @rtype: list

        @raises SnapStreamIOError: The TCP connection failed.
        @raises SnapStreamModeError: Stream is in binary mode.
        """
        if self.stream_mode != self.RECORD_STREAM:
            raise self._exception(SnapStreamModeError, "Attempt to read record from a binary stream.")

        # XXX Deal with aborted stream
        record = self._pipe.get()
        if record is not None:
            # Account for exactly one transferred record; None (EOF) is not counted.
            self._transferred_data(1)
        return record

    def read_bytes(self, size=None):
        """
        Read raw bytes from a binary stream.

        This call will block if there is not enough data to fill a buffer of size bytes. However, if the end
        of stream is reached before size bytes are read, whatever was successfully read is returned. If no
        bytes remain, None is returned.

        When size is given it only overrides the pipe's block size for the duration of this call; the
        previous block size is put back afterwards, even if the read raises.

        @param size: The number of bytes to read or None to use the current block size.
        @type size: int

        @return: A buffer of size bytes or less if EOF is reached. Once no data remains, None is returned.
        @rtype: string

        @raises SnapStreamIOError: The TCP connection failed.
        @raises SnapStreamModeError: Stream is in record mode.
        """
        if self.stream_mode != self.BINARY_STREAM:
            raise self._exception(SnapStreamModeError, "Attempt to read binary data from a record stream")

        override = size is not None
        if override:
            # Remember the pipe's block size *before* overriding it. Reading self.block_size after the
            # override would just return the temporary value, so it cannot be used to restore.
            previous_size = self._pipe.block_size
            self._pipe.block_size = size

        try:
            data = self._pipe.get()
            if data is not None:
                self._transferred_data(len(data))
        finally:
            if override:
                # Put the original block size back, whether or not the read succeeded.
                self._pipe.block_size = previous_size

        return data

    def close(self):
        """
        Close the underlying connection.

        In the case of a record-mode Writer stream, the Writer.end_of_stream() method should be called prior
        to this method to indicate a successful close. Otherwise, the receiving endpoint will consider it an
        erroneous close such as a Component run failure.

        This is currently a no-op for Reader.
        """
        # XXX Currently this doesn't do anything since the pipe is attached to a WSGI request that doesn't
        # XXX close until the request thread terminates.
        pass
|