# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: selectable_rp_pipe.py 7930 2009-06-22 22:00:28Z dmitri $
"""
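Contains the classes SelectableRPReaderPipe and SelectableRPWriterPipe.
Both are implementations of SelectablePipe that not only transfer data between streams
but also conveniently perform an RP translation on it: the reader pipe turns RP-parseable
binary data into objects, and the writer pipe turns objects into their binary RP form.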
"""
from snaplogic.common.snap_exceptions import *
from snaplogic.common.snapstream.string_buffer import StringBuffer
from snaplogic.common.snapstream.selectable_object_pipe import SelectableObjectPipe
from snaplogic.common.snapstream.selectable_binary_pipe import SelectableBinaryPipe
from snaplogic.common.snap_exceptions import *
class SelectableRPReaderPipe(SelectableObjectPipe):
    """
    Object pipe that is fed binary data and parses it with an RP Reader.

    Binary data put into the pipe is appended to an internal StringBuffer. An RP Reader,
    created from the RP given at construction time, reads from that buffer, and whatever
    its non-blocking read returns is appended to the pipe's object buffer. Consumers of
    the pipe therefore see objects, becoming available as enough binary data has arrived
    to make up a full object.

    """

    def __init__(self, rp):
        """
        Create the pipe and bind an RP Reader to its internal input buffer.

        @param rp: RP providing a Reader class. The Reader must report support for
                   non-blocking reads via supports_non_blocking_read().

        @raise SnapObjTypeError: If the RP's Reader does not support non-blocking reads.

        """
        super(SelectableRPReaderPipe, self).__init__()

        # Data is parsed as it arrives in _put(), so the reader must be usable
        # without blocking for more input.
        non_blocking_ok = rp.Reader.supports_non_blocking_read()
        if not non_blocking_ok:
            raise SnapObjTypeError("SelectableRPReaderPipe requires a non-blocking RP Reader object.")

        # The reader consumes from the same buffer that _put() appends to.
        raw_input = StringBuffer()
        self._input_buffer = raw_input
        self._reader = rp.Reader(raw_input)

    def _put(self, data):
        """
        Feed binary data to the RP Reader and queue whatever it parsed.

        @param data: Binary data to append to the reader's input buffer.

        """
        self._input_buffer.write(data)
        # read_nb() is expected to return the objects that could be completed so far
        # (possibly none); they are appended to the pipe's object buffer.
        parsed = self._reader.read_nb()
        self._buffer.extend(parsed)
class SelectableRPWriterPipe(SelectableBinaryPipe):
    """
    Binary pipe that is fed objects and serializes them with an RP Writer.

    An RP Writer, created from the RP given at construction time, is attached directly
    to the pipe's buffer. Every object put into the pipe is handed to that writer, so
    consumers of the pipe read the binary RP form of the objects written so far.

    """

    def __init__(self, rp):
        """
        Create the pipe and attach an initialized RP Writer to its buffer.

        @param rp: RP providing a Writer class that can be constructed on the pipe's buffer.

        """
        super(SelectableRPWriterPipe, self).__init__()

        # The writer targets the pipe's own buffer; it is stored on the instance
        # before initialize() is called on it.
        rp_writer = rp.Writer(self._buffer)
        self._writer = rp_writer
        rp_writer.initialize()

    def _put(self, item):
        """
        Hand one object to the RP Writer.

        @param item: Object to be written through the RP Writer.

        """
        self._writer.write(item)

    def close(self):
        """
        End the RP Writer if the pipe is still open, then close the pipe itself.

        """
        # Like the other "public" pipe methods, closing has to happen under the pipe's
        # lock; otherwise one thread could be closing the pipe while another one is
        # still reading from or writing to it.
        lock = self._cond
        lock.acquire()
        try:
            still_open = self._open
            if still_open:
                self._writer.end()

            # The parent's close() obtains the same lock again, which is why the
            # lock in use is a reentrant one.
            super(SelectableRPWriterPipe, self).close()
        finally:
            lock.release()
|