rh_frontend.py :  » Development » SnapLogic » snaplogic » common » snapstream » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » common » snapstream » rh_frontend.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008-2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: rh_frontend.py 6314 2009-02-11 01:07:59Z grisha $

from __future__ import with_statement

"""
Request Handler functionality for dealing with SnapStream HTTP requests.

This module provides convenience functions for transferring data in and out of SelectablePipes
associated with SnapStream Reader and Writer objects in and out of HTTP request streams.

"""

import httplib
from threading import Lock

from snaplogic import rp
from snaplogic.common.snap_exceptions import *
from snaplogic.common.snapstream.pipe_writer import PipeWriter
from snaplogic.common.snapstream.pipe_reader import PipeReader
from snaplogic.common.snapstream.multi_pipe import MultiPipeAggregatorThread
from snaplogic.common.snapstream.selectable_binary_pipe import SelectableBinaryPipe
from snaplogic.common.snapstream.selectable_rp_pipe import *
from snaplogic.common.snapstream.memory import DEFAULT_BLOCK_SIZE

# Multi-POST streams that are currently in progress, keyed by the raw URI of the request.
# Each value is the (stream, aggregator) tuple stored by process_post() for that URI.
_post_streams = {}
# Held by process_post() while it inspects or modifies _post_streams.
_mutex = Lock()

def _negotiate(http_req, accept_string, content_types):
    """
    Select a content type that satisfies the given accept string.

    The accept string is split on commas and passed, together with content_types, to
    L{rp.select_content_type}. A missing or empty accept string is treated as "*/*".

    When nothing can be selected, the UNSUPPORTED MEDIA TYPE (415) status is sent on the HTTP request
    and None is returned.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}

    @param accept_string: An HTTP accept string such as the value of an Accept header.
    @type accept_string: str

    @param content_types: A list of content type names to consider for satisfying the accept string.
                          None is handed through unchanged to L{rp.select_content_type}; callers use it
                          to mean "the content types supported by the RP layer".
    @type content_types: list

    @return: The negotiated content type name (e.g. application/x-snap-asn1) or None if negotiation fails.
    @rtype: str

    """
    candidates = (accept_string or "*/*").split(',')
    selected = rp.select_content_type(candidates, content_types)
    if selected is not None:
        return selected

    # XXX log
    http_req.send_response_headers(httplib.UNSUPPORTED_MEDIA_TYPE, None, False)
    return None
    
def process_get(http_req, content_types=None, view_name="Unspecified"):
    """
    Set up the output stream for an HTTP GET request on a SnapStream output.

    The request's Accept value (http_req.http_accept) is negotiated against content_types. If
    negotiation fails, the Unsupported Media Type (415) response has already been sent by the
    negotiation step and this function returns (None, None).

    content_types selects the stream mode:
      - None: record mode. The RP for the negotiated content type is looked up and wrapped in a
        L{SelectableRPWriterPipe}.
      - a list: binary mode. A L{SelectableBinaryPipe} is used and no RP translation takes place,
        even if the list is a subset of the content types the RP layer supports.

    The pipe is used to build a L{PipeWriter} that the component writes its output to. The pipe
    itself should be handed to L{stream_output}, which copies whatever the component writes to the
    HTTP connection.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}

    @param content_types: A list of component-supported content types or None to use the RP layer.
    @type content_types: list

    @param view_name: Output view name used for debugging/error messages.
    @type view_name: str

    @return: A tuple of (pipe, stream): the pipe to give to L{stream_output} and the stream object
             to use for the output view. (None, None) if content negotiation failed.
    @rtype: tuple

    """
    negotiated = _negotiate(http_req, http_req.http_accept, content_types)
    if negotiated is None:
        return (None, None)

    if content_types is not None:
        # The component named its own content types: plain binary stream, no RP involved.
        pipe = SelectableBinaryPipe()
        mode = PipeWriter.BINARY_STREAM
    else:
        # Record stream: records are written through the RP matching the negotiated type.
        pipe = SelectableRPWriterPipe(rp.get_rp(negotiated))
        mode = PipeWriter.RECORD_STREAM

    stream = PipeWriter(pipe, negotiated, mode, http_req.raw_uri, view_name)
    # The pipe carries the stream's content type so the response header can be produced from it.
    pipe.content_type = stream.content_type
    return (pipe, stream)

def process_post(http_req, content_types=None, view_name="Unspecified"):
    """
    Process an HTTP POST request for a SnapStream input.

    First, content negotiation occurs using the request's content type (http_req.http_content_type)
    against the supported content types. If negotiation fails, the Unsupported Media Type (415) error
    response is sent and the function returns (None, None).

    The content_types parameter specifies the content types supported by the component. If this value is None,
    the list of content types supported by the RP layer are used by default. Otherwise, content_types must be
    a list of content type names the component supports for binary mode. No RP translation will occur when
    content_types is not None, even if the list is a subset of the RP's supported list.

    When negotiation succeeds, a L{SelectableRPReaderPipe} or L{SelectableBinaryPipe} is created
    depending on if the stream is record or binary mode. For a plain (single) POST this pipe is used to
    initialize a L{PipeReader} stream object to be used as the component input.

    In the case that a special header, SnapStream-Continue, exists in the HTTP headers (exposed here as
    http_req.snap_stream_continue), the request is considered part of a multi-POST protocol that allows
    multiple HTTP POST requests to be aggregated together into a single stream. When the first POST request
    arrives for a particular input view URL with the continue flag set, a destination pipe, a
    L{MultiPipeAggregatorThread} and a L{PipeReader} on the destination pipe are created and remembered under
    the request's raw URI. As more POSTs arrive for that URI, their pipes are added to the same aggregator
    and the same stream object is returned. A final POST request to this URL without the continue flag
    signals that this request is the last in the series; the URI is then forgotten.

    If a later POST of a multi-POST series negotiates a different content type than the first one, a
    conflict response carrying the original content type is sent, the stream's pipe is aborted, the URI
    is forgotten and (None, None) is returned.

    The returned pipe should then be passed on to L{stream_input} so that data read from the HTTP request
    is fed into the stream consumed by the component.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}

    @param content_types: A list of component-supported content types or None to use the RP layer.
    @type content_types: list

    @param view_name: Input view name used for debugging/error messages.
    @type view_name: str

    @return: A tuple of (pipe, stream): the pipe to give to L{stream_input} and the stream object to use
             for the input view. (None, None) if negotiation failed or the content type conflicted.
    @rtype: tuple

    """
    matched_ct = _negotiate(http_req, http_req.http_content_type, content_types)
    if matched_ct is None:
        return (None, None)

    if content_types is None:
        # Record stream. Need an RP
        selected_rp = rp.get_rp(matched_ct)
        pipe = SelectableRPReaderPipe(selected_rp)
        mode = PipeReader.RECORD_STREAM
    else:
        # Binary stream. No RP used.
        pipe = SelectableBinaryPipe()
        mode = PipeReader.BINARY_STREAM

    # _post_streams is shared between request handlers, so all access happens under _mutex.
    with _mutex:
        # Multi-POST handling applies when this request asks to continue, or when an earlier
        # request for the same URI already opened a series (this one may be its final chunk).
        if http_req.snap_stream_continue or http_req.raw_uri in _post_streams:
            if http_req.raw_uri not in _post_streams:
                # First chunk of a new series: build the shared destination pipe and its aggregator.
                if mode is PipeReader.RECORD_STREAM:
                    dest_pipe = SelectableObjectPipe()
                else:
                    dest_pipe = SelectableBinaryPipe()

                aggregator = MultiPipeAggregatorThread(dest_pipe, "MultiPipe Aggregator: " + http_req.raw_uri)
                # The component reads from the destination pipe, not from this request's own pipe.
                stream = PipeReader(dest_pipe, matched_ct, mode, http_req.raw_uri, view_name)
                _post_streams[http_req.raw_uri] = (stream, aggregator)
                aggregator.start()
            else:
                # Follow-up chunk: reuse the stream and aggregator created by the first chunk.
                (stream, aggregator) = _post_streams[http_req.raw_uri]

                # Make sure that the new content type is the same as the original
                if stream.content_type != matched_ct:
                    # XXX log
                    # NOTE(review): the rest of this module takes status codes from httplib and passes
                    # three arguments to send_response_headers -- confirm that HttpRequest defines
                    # CONFLICT and that the remaining arguments are optional.
                    http_req.send_response_headers(http_req.CONFLICT)
                    # NOTE(review): stream_output()/stream_input() use http_req._output/_input; confirm
                    # that the public 'output' attribute exists on HttpRequest.
                    http_req.output.write(stream.content_type)
                    # Abort the shared stream and forget the series.
                    # NOTE(review): the aggregator thread is not stopped explicitly here; presumably
                    # aborting the destination pipe ends it -- confirm.
                    stream._pipe.abort()
                    del _post_streams[http_req.raw_uri]
                    return (None, None)

            # If this is the last chunk, remove from global dictionary
            if not http_req.snap_stream_continue:
                del _post_streams[http_req.raw_uri]
                # Presumably the second argument marks this pipe as the final one -- verify against
                # MultiPipeAggregatorThread.add_pipe.
                aggregator.add_pipe(pipe, True)
            else:
                aggregator.add_pipe(pipe)
        else:
            # Ordinary single POST: the component reads directly from this request's pipe.
            stream = PipeReader(pipe, matched_ct, mode, http_req.raw_uri, view_name)

    pipe.content_type = stream.content_type
    return (pipe, stream)
        
def stream_output(http_req, pipe):
    """
    Copy everything a pipe produces into the response of an HttpRequest object.

    Sends a 200 response whose Content-Type is the pipe's content type, then repeatedly takes data
    from the pipe (as created by L{process_get}) and writes it to the HTTP connection. The loop ends
    when the pipe returns None, so this call blocks until the stream is closed.

    A L{SnapStreamIOError} raised while copying is ignored. Any other exception causes the pipe to be
    aborted. In neither case is the exception propagated to the caller.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}

    @param pipe: A pipe object to read from. Created by L{process_get}.
    @type pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}

    """
    http_req.send_response_headers(200, [('Content-Type', pipe.content_type)], False)
    # XXX Write an empty string to force WSGI to send the headers
    http_req._output.write('')

    try:
        chunk = pipe.get()
        while chunk is not None:
            http_req._output.write(chunk)
            chunk = pipe.get()
    except SnapStreamIOError:
        # The other side of the pipe had issues and presumably logged something about it.
        pass
    except Exception:
        # Some other exception occurred. Most likely this is due to the output socket being closed
        # prematurely, which usually means the downstream component failed. So we'll abort the pipe.
        pipe.abort()
        
def stream_input(http_req, pipe):
    """
    Stream input from HTTP POST to a pipe.

    Sends a 200 response whose Content-Type is the pipe's content type, then consumes up to
    http_req.content_length bytes from the HTTP connection in blocks of at most DEFAULT_BLOCK_SIZE
    and writes them into the pipe created by L{process_post}. Reading stops early if the connection
    yields no more data. The pipe is always closed before this function returns or raises. The call
    blocks until all data has been read.

    @param http_req: An HTTP request object.
    @type http_req: L{snaplogic.server.http_request.HttpRequest}

    @param pipe: A pipe object to write to. Created by L{process_post}.
    @type pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}

    @raise Exception: Any exception raised while reading or writing is re-raised, with its original
                      traceback, after the pipe has been aborted.

    """
    http_req.send_response_headers(200, [('Content-Type', pipe.content_type)], False)

    try:
        remaining = http_req.content_length
        while remaining:
            data = http_req._input.read(min(DEFAULT_BLOCK_SIZE, remaining))
            if not data:
                # The connection ran dry before content_length bytes arrived.
                break

            pipe.put(data)
            # Count the bytes actually received rather than the bytes requested: read() may return
            # fewer than asked for, and the rest of the body must still be consumed.
            remaining -= len(data)
    except Exception:
        # XXX log exception
        pipe.abort()
        # Bare raise keeps the traceback of the original failure.
        raise
    finally:
        pipe.close()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.