# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: view_registry.py 5239 2008-11-14 20:12:24Z sasha $
from __future__ import with_statement
"""
A registry for SnapStreams used for optimizing data transfer between component threads of the same process.
"""
from threading import Lock
from snaplogic import rp
from snaplogic.common.snap_exceptions import *
from snaplogic.common.snapstream.selectable_object_pipe import SelectableObjectPipe
from snaplogic.common.snapstream.selectable_binary_pipe import SelectableBinaryPipe
from snaplogic import rp
_registered_output_views = {}
"""
A dictionary of registered output views.
Maps the upstream output view URL to its acceptable content types and list of internal reader pipes
linked to it.
"""
_reg_lock = Lock()
"""A lock for accessing class global data."""
class RegisteredView(object):
    """
    Registration record for one output view.

    Stores the content types the output view was registered with, the view name, and a list of
    L{Link} entries, one for every successful call to L{connect}.
    """

    class Link(object):
        """Book-keeping entry describing one pipe handed out by L{RegisteredView.connect}."""

        def __init__(self, url, content_type, pipe):
            """
            Remember the details of a single connection.

            @param url: The URL that was passed to connect().
            @type url: string

            @param content_type: Negotiated content type, or None for record streams.
            @type content_type: string

            @param pipe: The pipe object created for this connection.
            """
            self.pipe = pipe
            self.content_type = content_type
            self.url = url

    def __init__(self, accept_types, view_name):
        """
        Initialization.

        @param accept_types: A list of acceptable content type specifiers, or None for record-based streams.
        @type accept_types: list

        @param view_name: Name of the output view. Used in error messages.
        @type view_name: string
        """
        self.name = view_name
        self.output_accept_types = accept_types
        # Filled in by connect(); starts out empty.
        self.linked_pipes = []

    def connect(self, url, input_accept_types):
        """
        Connect a reader to this registered view.

        The stream mode of both ends must agree: either both sides use record streams (accept types are None)
        or both sides use binary streams (accept types are lists). For binary streams the content type is
        negotiated via rp.select_content_type() between input_accept_types and the content types this view
        was registered with.

        On success a new pipe is created, recorded in linked_pipes together with the URL and content type,
        and returned to the caller. The pipe is a SelectableObjectPipe for record streams and a
        SelectableBinaryPipe for binary streams. The whole operation runs while holding the module lock.

        @param url: URL recorded with the link and reported in negotiation errors.
        @type url: string

        @param input_accept_types: A list of acceptable content types or None for record-based streams.
        @type input_accept_types: list

        @return: A tuple of (content_type, pipe). content_type is None in record stream mode.
        @rtype: tuple

        @raise SnapStreamNegotiationError: Stream modes differ or no common content type exists.
        """
        with _reg_lock:
            reader_uses_records = input_accept_types is None
            writer_uses_records = self.output_accept_types is None

            # One end is record-based and the other binary: nothing to negotiate.
            # NOTE: the message text (including its spelling) is kept exactly as callers have always seen it.
            if reader_uses_records != writer_uses_records:
                raise SnapStreamNegotiationError("Stream type (binary/record) mistmatch.",
                                                 ("URL", url), ("View", self.name))

            if reader_uses_records:
                # Record streams on both ends need no content type.
                content_type = None
                pipe = SelectableObjectPipe()
            else:
                # Binary streams on both ends: pick a content type both can handle.
                content_type = rp.select_content_type(input_accept_types, self.output_accept_types)
                if content_type is None:
                    raise SnapStreamNegotiationError("No compatible content type found.",
                                                     ("URL", url), ("View", self.name))
                pipe = SelectableBinaryPipe()

            new_link = self.Link(url, content_type, pipe)
            self.linked_pipes.append(new_link)
            return (content_type, pipe)
def register_output_view(url, accept_types=None, view_name='Unspecified'):
    """
    Register an output view for use with SnapStream.

    Creates a L{RegisteredView} for the given content types and view name and stores it in the module
    registry under the given URL, replacing any entry already stored under that URL. Registration exists
    so that data can be transferred efficiently between component threads of the same process.

    When accept_types is omitted or None the view uses record-based streams. Otherwise it must be a list
    of content type specification strings describing the encodings the output view's component supports.

    @param url: URL of the output view.
    @type url: string

    @param accept_types: A list of acceptable content type specifiers or None for record-based streams.
    @type accept_types: list

    @param view_name: Name of output view. Used for debugging/error messages.
    @type view_name: string
    """
    new_view = RegisteredView(accept_types, view_name)
    # Only the registry update itself needs to happen under the lock.
    with _reg_lock:
        _registered_output_views[url] = new_view
def unregister_output_view(url):
    """
    Unregister an output view from SnapStream.

    Removes the registry entry stored under the given URL. Unregistering a URL that is not (or no longer)
    registered is not an error and is silently ignored.

    In a successful run this should only be called in the START phase of pipeline execution: readers may
    link to the output view at any point during PREPARE but are guaranteed to be done by START. In an error
    case it may be called at any time; readers that try to attach afterwards will fail much as if they had
    tried to connect to a resource that is no longer running.

    @param url: URL of output view previously registered.
    @type url: string
    """
    with _reg_lock:
        # pop() with a default tolerates a missing key, so repeated unregistration is harmless.
        _registered_output_views.pop(url, None)
def get_view(url):
    """
    Look up the registration object for an output view URL.

    Anything from the first '?' onwards is ignored, so a URL carrying query parameters resolves to the
    same registration as the bare URL.

    @param url: An output view URL to look up.
    @type url: string

    @return: The L{RegisteredView} stored for the URL, or None if the URL is not registered.
    @rtype: L{RegisteredView}
    """
    # Query parameters play no part in the registry key; keep only what precedes the first '?'.
    registry_key = url.partition('?')[0]
    with _reg_lock:
        return _registered_output_views.get(registry_key)
|