# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: multi_pipe.py 1749 2008-03-20 22:34:26Z kurt $
from __future__ import with_statement
"""
Utilities for handling the multi-POST pseudo-stream protocol.
"""
from threading import Thread,Condition
from snaplogic.common.snap_exceptions import *
class MultiPipeAggregatorThread(Thread):
"""
Aggregator thread for serializing multiple POST requests into a single stream.
For the multi-POST protocol, a series of POSTs can occur to the same view URL allowing a pseudo-streaming
capability. This convoluted protocol is currently necessary due to paste WSGI not supporting the chunked
HTTP transfer encoding.
This class created a thread whose purpose is to read data in from the various pipes and serialize their output
into a destination pipe. The serialization is done by reading all the data from each pipe in order and transfering
it to the output pipe. When one pipe is exhausted and has been closed, the class moves on to the next pipe
in the queue. If there currently aren't any pipes in the queue, there are two possibilties. If the last pipe
has not been added to the queue yet, the class waits until a new pipe is added. If the last pipe has been found,
the destination pipe is closed and the thread exits.
"""
def __init__(self, dest_pipe, thread_name=None):
"""
Initialization.
@param dest_pipe: A pipe to serialize data out to.
@type dest_pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
@param thread_name: A name to assign the thread.
@type thread_name: string
"""
super(MultiPipeAggregatorThread, self).__init__(name=thread_name)
self._dest_pipe = dest_pipe
self._pipe_queue = []
self._cond = Condition()
self._last_pipe_flag = False
def add_pipe(self, pipe, last_flag=False):
"""
Add a pipe to the queue.
Adds the pipe to the queue for the transfer thread. If last_flag is given and True, this pipe is expected
to be the last pipe added. This will queue the thread that when the pipe queue is exhausted, it should exit.
@param pipe: A pipe to read as input.
@type pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
@param last_flag: A flag indicating this is the last pipe that will be added.
@type last_flag: bool
"""
with self._cond:
if self._last_pipe_flag:
raise SnapValueError("Error adding new POST pipe to aggregator: last POST already received.")
self._pipe_queue.append(pipe)
self._last_pipe_flag = last_flag
self._cond.notify()
def _get_next_pipe(self):
"""
Get the next pipe from the queue.
If there are no more pipes in the queue and the last pipe has not been seen, this call will block
until a pipe is available.
@return: The next pipe object in the queue.
@rtype: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
"""
with self._cond:
while not self._pipe_queue:
if self._last_pipe_flag:
return None
else:
self._cond.wait()
return self._pipe_queue.pop(0)
def _pipe_iterator(self):
"""
Get an iterator over the pipe queue.
@return: An iterator over the pipe queue.
@rtype: iterator
"""
while True:
current_pipe = self._get_next_pipe()
if current_pipe is not None:
yield current_pipe
else:
return
def run(self):
for pipe in self._pipe_iterator():
try:
while True:
item = pipe.get()
if item is not None:
self._dest_pipe.put(item)
else:
break
except Exception:
self._dest_pipe.abort()
pipe.abort()
for pipe in self._pipe_iterator():
pipe.abort()
raise
self._dest_pipe.close()
|