# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: selectable_pipe.py 7923 2009-06-19 23:49:04Z dmitri $
from __future__ import with_statement
"""
A pipe (one producer, one consumer), which supports non-blocking
operation and a pipe_select() statement, specifically for those pipes.
This is a means of communication between two threads. A single
consumer thread may have multiple pipes open from a number of
producers. The consumer then can use the provided pipe_select() function
to wait until data is available in any one of the pipes.
Data may also be fetched directly from any given pipe via the
get() function.
pipe_select() and get() also support timeout parameters.
VERY IMPORTANT: These are not multi-consumer queues of any sort! Only a
single consumer is allowed for each pipe. The pipe_select() function may only
be called from that one consumer.
"""
from Queue import Queue,Empty
from threading import Thread,RLock,Condition
import time
from snaplogic.common.snap_exceptions import *
class SelectablePipeException(SnapException):
    """Common parent of the exceptions defined for selectable pipes."""

    def __init__(self, *args):
        # Put the generic pipe-error prefix in front of whatever the caller
        # supplied and hand the combined arguments to SnapException.
        prefixed = ("Selectable pipe error: ",) + args
        SnapException.__init__(self, *prefixed)
class SelectablePipeEmpty(SelectablePipeException):
    """Signals that an item was requested from a pipe that currently holds none."""

    def __init__(self, *args):
        # SnapException.__init__ is called directly (not the parent's
        # __init__), so only this class's own prefix is prepended.
        prefixed = ("Selectable pipe empty error: ",) + args
        SnapException.__init__(self, *prefixed)
class SelectablePipeTimeout(SelectablePipeException):
    """Signals that a blocking pipe operation ran out of time."""

    def __init__(self, *args):
        # SnapException.__init__ is called directly (not the parent's
        # __init__), so only this class's own prefix is prepended.
        prefixed = ("Selectable pipe timeout error: ",) + args
        SnapException.__init__(self, *prefixed)
class SelectablePipe(object):
    """
    Single producer, single consumer non-blocking, selectable pipe.

    Provides a pipe capability between a single producer and consumer.
    Items are placed on the pipe, and can be read from the pipe. A
    consumer may choose to collect multiple pipes into a list and then
    use this list as a parameter to the pipe_select() function that is
    provided with this module. Thus, the name: selectable pipe.

    This class only implements the locking, blocking, close/abort and select
    bookkeeping. The actual storage policy is supplied by a subclass through
    _available_write(), _available_read(), _get() and _put(), all of which
    raise NotImplementedError here.

    """

    def __init__(self):
        """Initialize a selectable pipe in the open, non-aborted state."""
        # This base class only creates the list; presumably concrete
        # subclasses use it as their item buffer.
        self._pipe = []
        self._mutex = RLock()
        # All state changes and all waiting go through this one condition.
        self._cond = Condition(self._mutex)

        # The _select_queue variable is used to hold a reference to the
        # queue that we will use to communicate with the pipe_select() statement,
        # if we are ever used that way.
        self._select_queue = None

        self._open = True
        self._aborted = False

    def _available_write(self):
        """
        Internal method to determine if a pipe is available for writing.

        This method should be implemented by children of this class.

        @return: True if the pipe can be written to. Otherwise, False.
        @rtype:  bool

        """
        raise NotImplementedError()

    def _available_read(self):
        """
        Internal method to determine if a pipe is available for reading.

        This method should be implemented by children of this class.

        @return: True if an item is available to read. Otherwise, False.
        @rtype:  bool

        """
        raise NotImplementedError()

    def _get(self):
        """
        Internal method to get an item from the pipe.

        This method should be implemented by children of this class. get()
        only calls it after _available_read() has reported an item, and while
        holding the pipe's lock.

        @return: Next item in the pipe.

        """
        raise NotImplementedError()

    def _put(self, item):
        """
        Internal method to put an item into the pipe.

        This method should be implemented by children of this class. put()
        only calls it after _available_write() has reported room, and while
        holding the pipe's lock.

        @param item: An item to place into the pipe.

        """
        raise NotImplementedError()

    def _check_pipe_status(self):
        """
        Checks the status of the stream before a get().

        This method captures the internal checks for closed/aborted pipes. If False is returned, it means the
        pipe was closed normally. True means that get() should attempt to look for an item. An exception is
        raised if the pipe was aborted.

        Buffered items win over the closed/aborted state: as long as _available_read() reports an item,
        True is returned even if the pipe is no longer open.

        @return: True if get() should check for an item. False means the stream is closed.
        @rtype:  bool

        @raise SnapStreamIOError: The pipe was aborted via abort().

        """
        if not self._open and not self._available_read():
            if self._aborted:
                raise SnapStreamIOError("SelectablePipe aborted by data source.")
            return False
        return True

    def _notify(self):
        """
        Notify that the pipe is in a ready state.

        When data becomes available or a close/abort event arrives, this method will ensure that pipe_select()
        and blocked get() calls are aware of the event. Callers in this class invoke it with the
        pipe's lock held.

        """
        if self._select_queue is not None:
            # If this pipe is currently being waited upon via the pipe_select()
            # function, we need to communicate to the select that we are
            # ready. This is done by putting ourselves into the queue,
            # which was given to us by pipe_select().
            self._select_queue.put(self)

            # To prevent inserting ourselves multiple times into the select
            # queue, we are removing our reference to that queue once we have
            # placed ourselves into it.
            self._select_queue = None

        # If anyone is waiting for us to have data, we can now wake them up.
        self._cond.notifyAll()

    def put(self, item):
        """
        Place a new item into the pipe.

        The item can be any Python object but not the value None. The call
        blocks, without a timeout, for as long as _available_write() reports
        that the pipe cannot be written to.

        @param item: A Python object to be pushed into the pipe.
        @type  item: Any Python object.

        @raises SnapStreamIOError: Pipe was already closed, or was closed while waiting for room.
        @raises SnapObjTypeError:  Attempt to put None into the pipe.

        """
        if item is None:
            # None is what get() returns for end-of-stream, so it cannot be a payload.
            raise SnapObjTypeError("None value is not allowed in SelectablePipe.")

        with self._cond:
            if not self._open:
                raise SnapStreamIOError("Unable to put data into a closed SelectablePipe.")

            while not self._available_write():
                self._cond.wait()
                # close()/abort() also wake us up; re-check before writing.
                if not self._open:
                    raise SnapStreamIOError("Unable to put data into a closed SelectablePipe.")

            self._put(item)
            if self._available_read():
                self._notify()

    def get(self, block=True, timeout=None):
        """
        Retrieve an item from the pipe.

        By default, this will block until data is available in the pipe.
        However, if 'block' has been specified as False, then the call
        will always return right away. If there is data, it will return
        with the data item, if there is no data, a 'SelectablePipeEmpty'
        exception will be raised.

        It is also possible to block only for a specific time. The 'timeout'
        parameter is specified in seconds (floating point number). It is
        ignored if blocking is set to 'False'. If it is a blocking call,
        however, then we will not block more than whatever was specified as
        the timeout. If we time out, then a 'SelectablePipeTimeout' exception
        is raised. The timeout bounds the whole call, not each individual
        wake-up of the waiting thread.

        When no more data is available because the pipe has been closed and the buffer exhausted,
        None is returned.

        @param block:   Flag indicating whether this should be a blocking
                        operation (True, default) or not (False).
        @type  block:   boolean

        @param timeout: Maximum block time in seconds, before returning with
                        a 'SelectablePipeTimeout' exception.
        @type  timeout: float

        @return:        If successful, the next data item from the pipe.

        @raise SelectablePipeTimeout: If a timeout has occurred.
        @raise SelectablePipeEmpty:   If a non-blocking call returns without data.
        @raise SnapStreamIOError:     If the producer side used abort() to abort the pipe.

        """
        with self._cond:
            if not self._check_pipe_status():
                return None

            if block:
                # One overall deadline: a wait that returns without data must not
                # restart the full timeout, otherwise the call could block forever.
                if timeout is None:
                    deadline = None
                else:
                    deadline = time.time() + timeout

                while not self._available_read():
                    if deadline is None:
                        self._cond.wait()
                    else:
                        remaining = deadline - time.time()
                        if remaining <= 0:
                            raise SelectablePipeTimeout()
                        self._cond.wait(remaining)
                    # We may have been woken by close()/abort() rather than by data.
                    if not self._check_pipe_status():
                        return None
            elif not self._available_read():
                raise SelectablePipeEmpty()

            item = self._get()
            if self._available_write():
                # Taking an item may have made room; wake a producer blocked in put().
                self._cond.notifyAll()
            return item

    # XXX TODO: This was commented out because it's not used and untested yet, but could provide a much more
    # XXX       efficient method of transferring blocks of data between pipes.
    # NOTE(review): the draft below sets self._abort, while the rest of the class uses self._aborted;
    #               fix that before re-enabling it.
    #    def transfer(self, src_pipe, block=True):
    #        """
    #        Transfer data from one pipe to another.
    #
    #        Attempts to transfer all data from src_pipe to this pipe. If block is True, then all data will transfer
    #        before this method returns unless there is an error. This means the method will not return until src_pipe
    #        is closed or aborted since more data may arrive until that point. If block is False, then only as much data
    #        as can be immediately moved will be transferred.
    #
    #        If the src_pipe is aborted before or during this call, this pipe will also abort itself.
    #
    #        @param src_pipe: The source pipe of data to transfer.
    #        @type  src_pipe: L{SelectablePipe}
    #
    #        @param block: Flag specifying if the code should block until all data can be transferred from the pipe.
    #        @type  block: bool
    #
    #        """
    #        with self._cond:
    #            if not self._open:
    #                raise SnapStreamIOError("Unable to put data into a closed SelectablePipe.")
    #
    #            try:
    #                did_transfer = False
    #                with src_pipe._cond:
    #                    while True:
    #                        if src_pipe._check_pipe_status():
    #                            # Data is available to read in src_pipe.
    #                            if not self._available_write():
    #                                if block:
    #                                    self._cond.wait()
    #                                else:
    #                                    return
    #                            self._put(src_pipe._get())
    #                            did_transfer = True
    #                        elif src_pipe._open and block:
    #                            # No data left in src_pipe but still open. Need to wait for more data.
    #                            src_pipe._cond.wait()
    #                        else:
    #                            break
    #
    #                if did_transfer:
    #                    self._notify()
    #            except SnapStreamIOError:
    #                self._open = False
    #                self._abort = True
    #                self._notify()
    #                raise

    def close(self):
        """
        Close the pipe (on the producer side).

        Closes the pipe such that once the pipe becomes unavailable, any further attempts to use get() will
        result in a None value. The check for availability is done so that any buffered data may be properly
        exhausted first.

        If the pipe was previously closed or aborted via abort(), this will have no effect.

        """
        with self._cond:
            if self._open:
                self._open = False
                # Wake blocked get()/put() callers and any pending pipe_select().
                self._notify()

    def abort(self):
        """
        Aborts the pipe.

        Aborting closes the pipe from further input or output. In addition, once the buffer becomes unavailable, any
        further attempt to use get() or put() will result in a SnapStreamIOError indicating the pipe was aborted.

        If the pipe was previously closed via close(), this will have no effect.

        """
        with self._cond:
            if self._open:
                self._open = False
                self._aborted = True
                # Wake blocked get()/put() callers and any pending pipe_select().
                self._notify()

    def _join_select_queue(self, squeue):
        """
        Prepare a pipe for usage by pipe_select().

        This is a private function of this module, and should not be used
        directly by the user of a pipe.

        A thread using pipe_select() to wait until data becomes available in
        one or more pipes uses a blocking thread-safe queue (the Queue
        module) to wait for any pipes notifying it of arrived data.

        Thus, such a queue has been created and is now passed to the pipe,
        which will remember it and use it if any new data arrives.

        @param squeue:  A queue on which a pipe can place itself once data arrives.
        @type  squeue:  Queue

        """
        with self._cond:
            if self._available_read() or not self._open:
                # If we have data already pending or hit EOF then we can just place ourselves
                # on this queue and don't have to store a reference to it.
                squeue.put(self)
            else:
                # No data yet, so we remember a reference to that queue. Once new
                # data arrives, we will place ourselves onto the queue (see _notify()).
                self._select_queue = squeue

    def _leave_select_queue(self):
        """
        Clean up after pipe_select() is done with a pipe.

        See comment for _join_select_queue() for details. After pipe_select() is done,
        this function is called on the pipes that pipe_select() was waiting for, and
        the reference to the queue is removed from the pipe.

        We also return an indication as to whether this pipe is ready at this point
        (it has data, or it is no longer open), so that pipe_select() can choose to add
        this pipe to its internal list of ready pipes, that it will return.

        @return: True if the pipe has an item to read or is closed/aborted. Otherwise, False.
        @rtype:  bool

        """
        with self._cond:
            ready = bool(self._available_read() or not self._open)
            self._select_queue = None
        return ready
def pipe_select(pipe_list, timeout=None):
    """
    Wait until at least one of the pipes in the specified list has data.

    A list of pipes to be monitored is passed in. Unless a timeout
    is specified (as (fractions of) seconds) the function will block the
    caller until data becomes available on at least one of those pipes.

    A list is returned, which contains only the pipes that now have
    data (or that have been closed/aborted, so that the caller can observe
    the end of the stream via get()). Thus, the return is a subset of the
    list that was passed in.

    If a timeout was specified and expired before any data has arrived,
    a 'SelectablePipeTimeout' exception is raised. Before raising, every pipe
    is detached from the internal queue again, so that no pipe keeps a
    reference to a queue nobody is listening on.

    By the time we return there might be additional pipes that have data,
    and which are not listed in the result, because the data arrived after
    pipe_select() was done with its work.

    The caller does not have to process all the pipes that are returned
    by pipe_select(). If pipe_select() is called again then any pipes that were not
    processed (and thus still have data) will simply appear again in the
    result returned by pipe_select().

    Note that if there are any pipes with pending data when pipe_select() is called,
    the function will return immediately, containing those pipes in the result
    list.

    @param pipe_list:   List of pipes that should be monitored.
    @type  pipe_list:   list

    @param timeout:     Maximum blocking time in seconds. None if no timeout
                        is desired.
    @type  timeout:     float

    @return:            List of pipes that contain data.

    @raise SelectablePipeTimeout: When a timeout has occurred.

    """
    # We create a queue for select, on which all the pipes that
    # have data can place themselves.
    select_queue = Queue()
    for pipe in pipe_list:
        pipe._join_select_queue(select_queue)

    # Wait until anyone has any data. The queue may already have
    # some pipes in it at this point.
    try:
        first_ready = select_queue.get(True, timeout)
    except Empty:
        # Detach every pipe so none of them keeps pointing at this
        # soon-to-be-discarded queue. A pipe may have become ready in the
        # short window after the wait expired; report it rather than
        # claiming a timeout.
        late_ready = [pipe for pipe in pipe_list if pipe._leave_select_queue()]
        if late_ready:
            return late_ready
        raise SelectablePipeTimeout()

    # Remove all pipes from the select queue and add them
    # to our own list if they have data. We have at least
    # one element with data already.
    with_data = [first_ready]
    for pipe in pipe_list:
        if pipe is first_ready:
            # Don't need to add that one, since we have it already in the
            # list. It also holds no queue reference any more: a pipe drops
            # it when it places itself on the queue.
            continue
        if pipe._leave_select_queue():
            with_data.append(pipe)

    return with_data
|