# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: rh_frontend.py 6314 2009-02-11 01:07:59Z grisha $
from __future__ import with_statement
"""
Request Handler functionality for dealing with SnapStream HTTP requests.
This module provides convenience functions for moving data between HTTP request streams and the
SelectablePipes that back SnapStream Reader and Writer objects.
"""
import httplib
from threading import Lock
from snaplogic import rp
from snaplogic.common.snap_exceptions import *
from snaplogic.common.snapstream.pipe_writer import PipeWriter
from snaplogic.common.snapstream.pipe_reader import PipeReader
from snaplogic.common.snapstream.multi_pipe import MultiPipeAggregatorThread
from snaplogic.common.snapstream.selectable_binary_pipe import SelectableBinaryPipe
from snaplogic.common.snapstream.selectable_rp_pipe import *
from snaplogic.common.snapstream.memory import DEFAULT_BLOCK_SIZE
_post_streams = {}
_mutex = Lock()
def _negotiate(http_req, accept_string, content_types):
    """
    Negotiate the HTTP content type for a request.

    The comma-separated accept string is split into candidates and matched by rp.select_content_type
    against content_types. On success the matched content type name (e.g. application/x-snap-asn1) is
    returned. On failure an UNSUPPORTED MEDIA TYPE (415) response header is sent on the connection and
    None is returned.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}
    @param accept_string: An HTTP accept string such as the value of an Accept header. An empty or
        missing value is treated as "*/*".
    @type accept_string: str
    @param content_types: Content type names to consider, forwarded unchanged to rp.select_content_type.
        None means the RP layer's supported content types are used.
    @type content_types: list
    @return: The negotiated content type name, or None if negotiation fails.
    @rtype: str
    """
    # A blank Accept value means the client will take anything.
    candidates = (accept_string or "*/*").split(',')
    content_type = rp.select_content_type(candidates, content_types)
    if content_type is not None:
        return content_type
    # XXX log
    http_req.send_response_headers(httplib.UNSUPPORTED_MEDIA_TYPE, None, False)
    return None
def process_get(http_req, content_types=None, view_name="Unspecified"):
    """
    Process an HTTP GET request for a SnapStream output.

    First, content negotiation occurs using the request's Accept header as the list of content types
    supported by the client. If negotiation fails, the Unsupported Media Type (415) error response is sent
    and the function returns (None, None).

    The content_types parameter specifies the content types supported by the component. If this value is None,
    the list of content types supported by the RP layer is used by default. Otherwise, content_types must be
    a list of content type names the component supports for binary mode. No RP translation occurs when
    content_types is not None, even if the list is a subset of the RP's supported list.

    When negotiation succeeds, a L{SelectableRPWriterPipe} (record mode) or L{SelectableBinaryPipe}
    (binary mode) is created. That pipe is used to initialize a L{PipeWriter} stream object, which serves as
    the component output. The pipe's content_type is set to the stream's content type, and the tuple
    (pipe, stream) is returned.

    The pipe object should then be passed on to L{stream_output}, so that data the component writes into the
    stream is sent out to the HTTP connection.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}
    @param content_types: A list of component-supported content types, or None to use the RP layer.
    @type content_types: list
    @param view_name: Output view name used for debugging/error messages.
    @type view_name: str
    @return: A tuple of (pipe, stream): the pipe to give to L{stream_output} and the stream object to use for
        the output view. Both are None if negotiation failed.
    @rtype: tuple
    """
    matched_ct = _negotiate(http_req, http_req.http_accept, content_types)
    if matched_ct is None:
        # _negotiate has already sent the 415 response.
        return (None, None)
    if content_types is None:
        # Record stream: wrap the RP registered for the negotiated content type.
        pipe = SelectableRPWriterPipe(rp.get_rp(matched_ct))
        mode = PipeWriter.RECORD_STREAM
    else:
        # Binary stream: no RP involved.
        pipe = SelectableBinaryPipe()
        mode = PipeWriter.BINARY_STREAM
    stream = PipeWriter(pipe, matched_ct, mode, http_req.raw_uri, view_name)
    # stream_output reads pipe.content_type to build the Content-Type response header.
    pipe.content_type = stream.content_type
    # NOTE: order is (pipe, stream); callers unpack it this way.
    return (pipe, stream)
def process_post(http_req, content_types=None, view_name="Unspecified"):
    """
    Process an HTTP POST request for a SnapStream input.

    First, content negotiation occurs using the request's Content-Type header as the content type offered by
    the client. If negotiation fails, the Unsupported Media Type (415) error response is sent and the function
    returns (None, None).

    The content_types parameter specifies the content types supported by the component. If this value is None,
    the list of content types supported by the RP layer is used by default. Otherwise, content_types must be
    a list of content type names the component supports for binary mode. No RP translation occurs when
    content_types is not None, even if the list is a subset of the RP's supported list.

    When negotiation succeeds, a L{SelectableRPReaderPipe} (record mode) or L{SelectableBinaryPipe}
    (binary mode) is created for this request. A L{PipeReader} stream object is used as the component input.
    The pipe's content_type is set to the stream's content type, and the tuple (pipe, stream) is returned.

    Multi-POST protocol: if the request has the SnapStream-Continue header set (http_req.snap_stream_continue),
    it is one chunk of a series of POST requests aggregated into a single input stream.
      - The first such POST for a given input view URL creates a MultiPipeAggregatorThread and a PipeReader
        over the aggregator's destination pipe, records them under the URL, and starts the aggregator.
      - Each chunk's own pipe is queued on the aggregator.
      - A later POST to the same URL without the SnapStream-Continue header is the final chunk. Its pipe is
        queued as the last one and the URL's entry is removed, so the input view receives EOF once that
        request's data has been read.
    Every chunk of a series returns the same aggregated stream together with its own pipe.

    If a later chunk negotiates a content type different from the series' original one, a CONFLICT (409)
    response is sent with the original content type as its body. The aggregated stream is then aborted and
    forgotten, and (None, None) is returned.

    The pipe object should then be passed on to L{stream_input}, so that data arriving on the HTTP connection
    is written into it.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}
    @param content_types: A list of component-supported content types, or None to use the RP layer.
    @type content_types: list
    @param view_name: Input view name used for debugging/error messages.
    @type view_name: str
    @return: A tuple of (pipe, stream): the pipe to give to L{stream_input} and the stream object to use for
        the input view. Both are None if negotiation failed or a multi-POST content type conflict occurred.
    @rtype: tuple
    """
    matched_ct = _negotiate(http_req, http_req.http_content_type, content_types)
    if matched_ct is None:
        # _negotiate has already sent the 415 response.
        return (None, None)

    record_mode = content_types is None
    if record_mode:
        # Record stream: wrap the RP registered for the negotiated content type.
        pipe = SelectableRPReaderPipe(rp.get_rp(matched_ct))
        mode = PipeReader.RECORD_STREAM
    else:
        # Binary stream: no RP involved.
        pipe = SelectableBinaryPipe()
        mode = PipeReader.BINARY_STREAM

    uri = http_req.raw_uri
    with _mutex:
        if http_req.snap_stream_continue or uri in _post_streams:
            if uri not in _post_streams:
                # First chunk of a multi-POST series: build the aggregated stream and start the aggregator.
                if record_mode:
                    dest_pipe = SelectableObjectPipe()
                else:
                    dest_pipe = SelectableBinaryPipe()
                aggregator = MultiPipeAggregatorThread(dest_pipe, "MultiPipe Aggregator: " + uri)
                stream = PipeReader(dest_pipe, matched_ct, mode, uri, view_name)
                _post_streams[uri] = (stream, aggregator)
                aggregator.start()
            else:
                (stream, aggregator) = _post_streams[uri]
                # Make sure that the new content type is the same as the original
                if stream.content_type != matched_ct:
                    # XXX log
                    try:
                        http_req.send_response_headers(httplib.CONFLICT)
                        http_req.output.write(stream.content_type)
                    finally:
                        # Always tear down the series, even if responding to the client fails,
                        # so no stale entry or live aggregator is left behind.
                        del _post_streams[uri]
                        stream._pipe.abort()
                    return (None, None)
            if http_req.snap_stream_continue:
                aggregator.add_pipe(pipe)
            else:
                # Last chunk: forget the series and queue this pipe as the final one.
                del _post_streams[uri]
                aggregator.add_pipe(pipe, True)
        else:
            # Ordinary single POST: the component reads straight from this request's pipe.
            stream = PipeReader(pipe, matched_ct, mode, uri, view_name)
    # stream_input reads pipe.content_type to build the Content-Type response header.
    pipe.content_type = stream.content_type
    # NOTE: order is (pipe, stream); callers unpack it this way.
    return (pipe, stream)
def stream_output(http_req, pipe):
    """
    Stream the output of a pipe into an HttpRequest object.

    Sends a 200 response whose Content-Type is pipe.content_type. It then consumes data from a pipe created by
    L{process_get} and writes it to the HTTP connection until pipe.get() returns None, which signals the pipe
    is closed. This call blocks until then.

    Failures are handled best-effort and never propagate out of the streaming loop:
      - SnapStreamIOError raised by the pipe ends streaming quietly. The other side of the pipe is expected
        to have reported the problem.
      - Any other exception, typically a write to a prematurely closed connection, aborts the pipe so the
        producing side stops.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}
    @param pipe: A pipe object to read from. Created by L{process_get}.
    @type pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
    """
    http_req.send_response_headers(200, [('Content-Type', pipe.content_type)], False)
    write = http_req._output.write
    # XXX Write an empty string to force WSGI to send the headers
    write('')
    try:
        # pipe.get() returns None once the pipe has been closed.
        for data in iter(pipe.get, None):
            write(data)
    except SnapStreamIOError:
        # The other side of the pipe had issues and presumably logged something about it.
        pass
    except Exception:
        # Most likely the output socket was closed prematurely, which usually means the downstream
        # component failed. Abort the pipe so the component writing into it stops instead of blocking.
        pipe.abort()
def stream_input(http_req, pipe):
    """
    Stream input from an HTTP POST request into a pipe.

    Sends a 200 response header, then reads up to http_req.content_length bytes of request body from the HTTP
    connection. The body is read in chunks of at most DEFAULT_BLOCK_SIZE bytes, and each chunk is written into
    the pipe created by L{process_post}. Reading stops early if the connection reports EOF. The call blocks
    until reading finishes, and the pipe is always closed afterwards.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}
    @param pipe: A pipe object to write to. Created by L{process_post}.
    @type pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
    @raise Exception: Any error while reading the request or writing the pipe is re-raised after the pipe
        has been aborted (and then closed).
    """
    http_req.send_response_headers(200, [('Content-Type', pipe.content_type)], False)
    try:
        remaining = http_req.content_length
        while remaining:
            data = http_req._input.read(min(DEFAULT_BLOCK_SIZE, remaining))
            if not data:
                # The connection hit EOF before the declared length arrived.
                break
            # Count what was actually received: read() may return fewer bytes than requested,
            # and the rest of the body must still be read.
            remaining -= len(data)
            pipe.put(data)
    except Exception:
        # XXX log exception
        pipe.abort()
        # Bare raise keeps the original traceback, unlike "raise e".
        raise
    finally:
        pipe.close()
|