# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: writer.py 2428 2008-04-11 21:04:43Z kurt $
"""
Contains the SnapStream Writer class.
The SnapStream Writer defines an interface for writing to a SnapStream HTTP connection.
"""
from snaplogic.common.snap_exceptions import *
from snaplogic.common import snap_http_lib
from snaplogic.common.snapstream.stream_base import StreamBase
from snaplogic.common.snapstream.selectable_object_pipe import SelectableObjectPipe
class Writer(StreamBase):
def __init__(self, url='Unspecified', view_name='Unspecified'):
super(Writer, self).__init__(url, view_name)
self._pipe = None
self._end_of_records = False
self.field_numbers = self._process_uri_params(snap_http_lib.get_params_from_uri(url))
def _process_uri_params(self, params):
"""
Get field numbers specified as parameter in the request URL.
This is specified, when the downstream input view is requesting only a subset of the
output view's fields. This is an optimization to prevent an output view from sending
fields that are not even linked to the downstream input view.
@param params: A param dictionary that might have the fields list param.
@type params: dict
@return: List of field numbers that have been requested by down stream input view.
@rtype: list
"""
try:
strval = params["sn.field_numbers"]
except KeyError:
return None
values = strval.split(',')
num_list = []
for v in values:
if v.find('-') != -1:
vrange = v.split('-')
if len(vrange) != 2:
raise SnapStreamModeError("Field number arg <%s> has invalid range <%s>" %
(strval, v))
try:
lowval = int(vrange[0])
highval = int(vrange[1])
except ValueError:
raise SnapStreamModeError("Field number arg <%s> has non numeric range <%s>" %
(strval, v))
if highval <= lowval:
raise SnapStreamModeError("Field number arg <%s> has invalid range <%s>" %
(strval, v))
tmplist = range(lowval, highval + 1)
num_list.extend(tmplist)
else:
try:
val = int(v)
except ValueError:
raise SnapStreamModeError("Field number arg <%s> has non numeric value <%s>" %
(strval, v))
num_list.append(val)
num_list.sort()
return num_list
def write_record(self, record):
"""
Write a record to the stream. This call will block if the transmit buffer becomes full until a network
write frees up room.
@raises SnapStreamIOError: The TCP connection failed.
@raises SnapStreamModeError Stream is in binary mode.
"""
if self.stream_mode != self.RECORD_STREAM:
raise self._exception(SnapStreamModeError, "Attempt to write record to a binary stream.")
self._pipe.put(record)
self._transferred_data(1)
def write_bytes(self, data):
"""
Write raw bytes to a binary stream. This call will block if the transmit buffer becomes full until a
network write frees up room.
@raises SnapStreamIOError: The TCP connection failed.
@raises SnapStreamModeError: Stream is in record mode.
"""
if self.stream_mode != self.BINARY_STREAM:
raise self._exception(SnapStreamModeError, "Attempt to write binary data to a record stream.")
self._pipe.put(data)
self._transferred_data(len(data))
def end_of_records(self):
"""
This method signals the end of records to the underlying RP. It should only be called when all records
have been sent via Writer.write_record(). If this method is not called prior to Writer.close(), the
receiving endpoint will consider the close to be erroneous (i.e. the component failed). Calling this
method for a binary stream is an error.
@raises SnapStreamIOError: The TCP connection failed.
@raises SnapStreamModeError: Stream is in record mode.
"""
if self.stream_mode != self.RECORD_STREAM:
raise self._exception(SnapStreamModeError,
"Calling end_of_records() on a binary stream is not allowed.")
self._end_of_records = True
def close(self):
"""
Close the underlying connection. In the case of a record-mode Writer stream, the
Writer.end_of_stream() method should be called prior to this method to indicate a successful
close. Otherwise, the receiving endpoint will consider it an erroneous close such as a Component run
failure.
"""
if self.stream_mode is not self.RECORD_STREAM or self._end_of_records:
self._pipe.close()
else:
self._pipe.abort()
|