# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: __init__.py 3388 2008-06-19 21:11:37Z kurt $
from __future__ import with_statement
"""
Streaming package used within pipeline data flow to pass record or binary streamed data between components and
external client endpoints.
"""
from snaplogic.common.snap_exceptions import *
from snaplogic.common.snapstream import selectable_pipe
from snaplogic.common.snapstream import view_registry
from snaplogic.common.snapstream.pipe_writer import PipeWriter
from snaplogic.common.snapstream.pipe_reader import PipeReader
from snaplogic.common.snapstream.url_reader import URLReader
from snaplogic.common.snapstream import memory
from snaplogic.common.snap_exceptions import *
from snaplogic.common import snap_log
from snaplogic.common.config import snap_config
from snaplogic import cc
# Module-wide switch consulted by open_url() before it uses the in-process pipe
# transfer. Defaults to True; init() overwrites it from the "cc" config section.
OPTIMIZATION_ENABLED = True
# Key in the "cc" config section whose integer value init() assigns to
# memory.MAX_BUFFER_SIZE.
MAX_STREAM_BUFFER_SIZE_KEY = "max_stream_buffer_size"
# Key in the "cc" config section whose value init() parses as a boolean and
# stores in OPTIMIZATION_ENABLED.
OPTIMIZE_STREAMS_KEY = "optimize_streams"
def init(config):
    """
    Initialize the streaming package from the "cc" section of the configuration.

    Reads the maximum stream buffer size and stores it in memory.MAX_BUFFER_SIZE, then reads the stream
    optimization flag and stores it in the module-level OPTIMIZATION_ENABLED. A warning is logged through
    cc.log() when optimization ends up disabled.

    @param config: Configuration object providing get_section().
    @type config: object

    @raises SnapValueError: The buffer size parameter is not an integer, or is an integer that is not
                            greater than 0.

    """
    # Function-scope import: sys.exc_info() is used instead of binding the exception in the except
    # clause so that this code parses on both old (2.5) and newer Python interpreters.
    import sys

    global OPTIMIZATION_ENABLED

    cc_config = config.get_section("cc")
    error_message = ("Config parameter '%s' must be an integer greater than 0." %
                     MAX_STREAM_BUFFER_SIZE_KEY)

    try:
        value = int(cc_config[MAX_STREAM_BUFFER_SIZE_KEY])
    except ValueError:
        # Not parseable as an integer: keep the original parse error chained to the one we raise.
        raise SnapException.chain(sys.exc_info()[1], SnapValueError(error_message))

    if value <= 0:
        # The value parsed cleanly, so there is no underlying exception to chain here. Previously this
        # path referenced the unbound except-clause variable and failed with UnboundLocalError.
        raise SnapValueError(error_message)

    memory.MAX_BUFFER_SIZE = value

    OPTIMIZATION_ENABLED = snap_config.parse_bool(cc_config[OPTIMIZE_STREAMS_KEY])
    if not OPTIMIZATION_ENABLED:
        cc.log(snap_log.LEVEL_WARN, "Stream optimization disabled.")
def select_streams(stream_list, timeout=None):
    """
    Provide a select()-like call for SnapStream Reader objects.

    The pipe of every stream in stream_list is handed to selectable_pipe.pipe_select() together with
    timeout. The streams whose pipes come back as available are returned, in the order they appear in
    stream_list. If no pipe is reported available (for example because the timeout expired), the empty
    list is returned.

    If a stream encounters an error, it is expected to be reported as available; the error stays silent
    until the caller actually reads the stream.

    @param stream_list: A list of SnapStream Reader objects. Each must expose its pipe as _pipe.
    @type stream_list: list of objects.

    @param timeout: Number of seconds to wait before returning without available streams, passed through
                    to pipe_select(). None is documented to mean wait indefinitely.
    @type timeout: float

    @return: A list of the streams that are available for reading; empty if none are.
    @rtype: list

    @raises SnapValueError: Timeout is not a floating point value (presumably raised by pipe_select();
                            this function does not validate timeout itself).

    """
    pipes = [stream._pipe for stream in stream_list]
    avail_pipes = selectable_pipe.pipe_select(pipes, timeout)

    # Nothing became available: return early instead of indexing into an empty result, which used to
    # raise IndexError on timeout.
    if not avail_pipes:
        return []

    # This walk assumes that pipe_select() returns its pipes in the same relative order as the list it
    # was given, so a single forward cursor over avail_pipes is enough to pair pipes back to streams.
    avail_streams = []
    avail_count = len(avail_pipes)
    cursor = 0
    for stream, pipe in zip(stream_list, pipes):
        if pipe is avail_pipes[cursor]:
            avail_streams.append(stream)
            cursor += 1
            if cursor == avail_count:
                # Every available pipe has been matched.
                break

    return avail_streams
# Expose the view_registry registration functions directly at package level so
# callers can use them without importing view_registry themselves.
register_output_view = view_registry.register_output_view
"""Convenience alias for L{view_registry.register_output_view}."""
unregister_output_view = view_registry.unregister_output_view
"""Convenience alias for L{view_registry.unregister_output_view}."""
def unregister_output_views(url_list):
    """
    Unregister every output view URL in url_list.

    Each URL is passed, in order, to view_registry.unregister_output_view(). An empty url_list is a
    no-op.

    @param url_list: The output view URLs to unregister.
    @type url_list: list

    """
    for view_url in url_list:
        view_registry.unregister_output_view(view_url)
def open_local_writers(url):
    """
    Build one PipeWriter for every reader pipe linked to the output view registered at url.

    Part of the in-process optimization: the view is looked up in the view registry and each entry of
    its linked_pipes becomes a writer. A link whose content_type is None gets a record-stream writer;
    any other content type gets a binary-stream writer. If no view is registered for url, an empty list
    is returned.

    NOTE(review): earlier documentation stated that this call also unregisters the output view. Nothing
    in this function does so; confirm whether callers are expected to call L{unregister_output_view}
    themselves.

    @param url: The URL of a previously registered output view.
    @type url: string

    @return: A list of PipeWriter objects, one per pipe linked to the output view at url.
    @rtype: list

    @raises SnapStreamNegotiationError: Documented for negotiation failures between endpoints; not
                                        raised directly here, so presumably propagated from a callee.

    """
    view = view_registry.get_view(url)
    if view is None:
        # No registered view means no linked readers.
        return []

    writers = []
    for link in view.linked_pipes:
        if link.content_type is None:
            stream_mode = PipeWriter.RECORD_STREAM
        else:
            stream_mode = PipeWriter.BINARY_STREAM
        writers.append(PipeWriter(link.pipe, link.content_type, stream_mode, link.url, view.name))
    return writers
def open_url(url, accept_types=None, view_name='Unspecified'):
    """
    Create a SnapStream reader object for the output view at url.

    The view registry is consulted first. When it holds a view for url and OPTIMIZATION_ENABLED is set,
    the reader is connected through that view and a PipeReader is returned (the in-process path). In
    every other case a URLReader for url is returned (presumably an HTTP connection to the remote view).

    On the in-process path the stream mode follows the content type returned by view.connect(): None
    gives a record stream, anything else a binary stream.

    @param url: URL of output view to connect to.
    @type url: string

    @param accept_types: A list of acceptable content type specifiers, or None for record-based streams.
    @type accept_types: list

    @param view_name: The name of the input view this reader is associated with.
    @type view_name: string

    @return: A new SnapStream reader object (PipeReader or URLReader).
    @rtype: L{snaplogic.common.snapstream.reader.Reader}

    @raise SnapStreamNegotiationError: Negotiation failed between endpoints (not raised directly here).
    @raise SnapStreamConnectionError: Unable to open a connection to the given URL (not raised directly
                                      here).

    """
    view = view_registry.get_view(url)

    # Fall back to the URL-based reader when the view is not local or optimization is switched off.
    if view is None or not OPTIMIZATION_ENABLED:
        return URLReader(url, accept_types, view_name)

    negotiated_type, pipe = view.connect(url, accept_types)
    if negotiated_type is None:
        stream_mode = PipeReader.RECORD_STREAM
    else:
        stream_mode = PipeReader.BINARY_STREAM
    return PipeReader(pipe, negotiated_type, stream_mode, url, view_name)
|