pipe_to_http.py :  » Development » SnapLogic » snaplogic » server » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » server » pipe_to_http.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008-2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: pipe_to_http.py 9868 2009-11-21 17:50:45Z dhiraj $
__docformat__ = "epytext en"
"""

Launches a specified pipeline and serves the output data from the pipeline.

This component receives GET request for a URI. The URI (described below)
identifies the resource to be launched and the output view to be read.
The first field of each output record is returned in the GET request stream.

URI structure:
Like all components, PipeToHttp component will be registered under a URI. Let's
say this registered URI is /pipe. The GET requests should be made to URIs
with the following structure:
http://example.com:8080/pipe/uri-of-the-resource/output-view-name.
For example, if the resource URI is /res/financialinfo and the output
view is output1, then the URI advertised would be:
http://example.com:8080/pipe/res/financialinfo/output1

If the resource takes parameters like  currency (currency) and minimum earnings
(min_earnings), then these parameters can be specified in the query string as follows:
http://example.com:8080/pipe/res/financialinfo/output1?currency=dollars&min_earnings=50000

Apart from these custom parameters that resources define, PipeToHttp recognizes
some special parameters of its own, like sn.start, sn.limit and sn.count.

If you want to request data starting from a particular record number in the stream, then
you can do so as follows:
http://example.com:8080/pipe/res/financialinfo/output1?sn.start=100
This sn.start value has to be 1 or higher.

You can also specify an upper limit on the number of records to return:
http://example.com:8080/pipe/res/financialinfo/output1?sn.limit=1000

sn.start and sn.limit parameters can be combined in a request. Specifying
sn.count=records, returns the total number of records available from the stream.
This is a point in time count of records. It can change, if the data being retrieved is
changing. This parameter cannot be combined with sn.start and sn.limit.

sn.content_type is an optional argument which specifies the format in which content should
be returned. Currently, application/json and application/javascript are supported.

sn.stream_header can be specified to have a special header returned before the data is sent.
sn.stream_footer similarly, specifies a footer that can be written out after all the data is
written. These params are currently used only in conjunction with application/javascript
content_type to provide output in JSONP format. They are ignored in other circumstances.

"""

import os.path
import urlparse
import urllib
import time
import sys
import traceback
from cStringIO import StringIO

from snaplogic.common.config import snap_config
from snaplogic.common import snap_http_lib,snap_log,headers,parse_size_param
from snaplogic.rp import get_rp,select_content_type,SNAP_ASN1_CONTENT_TYPE
from snaplogic import server
from snaplogic.server import RhResponse
from snaplogic.server import data_cache
from snaplogic.server import repository
from snaplogic.server import auth
from snaplogic.server.http_pipe_common import PipeSessionInfo,write_rec_to_http,write_http_output,end_output,\
                                              process_http_uri
from snaplogic.snapi_base import keys,exec_interface
from snaplogic.snapi_base.resdef import ResDef
from snaplogic.common.snap_exceptions import *

#TODO: XXXXX fix timeout
# Seconds a committed cache entry stays valid (added to time.time() at commit
# in rh_pipe_to_http()); 0 disables the cache lookup path. Set by initialize().
cache_timeout = 0
# URI of the main server process; set from the "common" config section by initialize().
main_process_uri = None
# URI prefix under which this service is registered; set from the "main" config
# section by initialize() and used to parse incoming request paths.
pipe_to_http_uri_prefix = None

def initialize():
    """
    Initialize the Pipe To Http service at server startup.

    Reads the server configuration and populates the module globals
    (main_process_uri, pipe_to_http_uri_prefix, cache_timeout), then
    initializes the data cache from the "data_cache" config section.

    @raise SnapValueError: If the configured cache size cannot be parsed.

    """
    global main_process_uri
    global cache_timeout
    global pipe_to_http_uri_prefix

    config = snap_config.get_instance()
    common_section = config.get_section("common")
    main_section = config.get_section("main")
    cache_section = config.get_section("data_cache")

    main_process_uri = common_section['main_process_uri']
    pipe_to_http_uri_prefix = main_section["pipe_to_http_uri_prefix"]

    size_str = cache_section['cache_size']
    cache_size = parse_size_param(size_str)
    if cache_size is None:
        raise SnapValueError("Cache size %s is an invalid value" % size_str)

    cache_timeout = int(cache_section['cache_timeout'])
    data_cache.init_cache(cache_section['cache_dir'],
                          cache_size,
                          int(cache_section['low_water_mark']),
                          int(cache_section['high_water_mark']),
                          server.log, server.elog)
    
class _NullWriteStream(object):
    """Used as a mock stream object."""
    def write(self, inp):
        """Discards whatever is written."""
        return

def rh_pipe_to_http(http_req):
    """
    Called at runtime to start the processing of GET request by this service.
    
    Tries the data cache first (when enabled); on a cache miss, executes the
    pipeline via _run_resource() and streams its output to the client while
    populating a newly write-opened cache entry.
    
    @param http_req:        HTTP request object.
    @type  http_req:        L{HttpRequest}
    
    @return: None when the response was streamed successfully, otherwise an
        L{RhResponse} describing the error.
    
    """
    if http_req.method != 'GET':
        return RhResponse(http_req.METHOD_NOT_ALLOWED, "HTTP method %s not supported" % http_req.method)
    
    pinfo = PipeSessionInfo(http_req, pipe_to_http_uri_prefix)
    
    # Remove the special params from the param list.
    # Everything prefixed "sn." is consumed by this service itself; only the
    # remaining params are handed to the pipeline as its parameters.
    args = {}
    for a in http_req.params:
        if a.startswith("sn."):
            continue
        args[a] = http_req.params[a]  
    new_entry = None
    
    ret = process_http_uri(http_req, pinfo)
    if ret is not None:
        # URI parsing/validation failed; propagate the error response.
        return ret
    
    # Check if the response to this request is in the cache.
    if data_cache.is_enabled() and cache_timeout > 0:
        try:
            new_entry = _lookup_cache(pinfo, args)
            if new_entry is None:
                # Cache hit, we are done. _lookup_cache() has already written
                # the cached data to the client.
                return
        except Exception, e:
            server.elog(e, "Failed to process request for resource %s." % http_req.path)
            return RhResponse(http_req.INTERNAL_SERVER_ERROR, "Failed to process request for resource %s." %
                               http_req.path)
        
    try:
        # Cache miss (or caching disabled): run the pipeline. new_entry, when
        # set, is a write-opened cache entry that _run_resource() populates.
        ret = _run_resource(pinfo, pinfo.resource_uri, pinfo.output_view_name, args, new_entry)
    except Exception, e:
        server.elog(e, "Failed to process request for resource %s" % http_req.path)
        if new_entry:
            # Drop the partially written cache entry.
            new_entry.discard()
        return RhResponse(http_req.INTERNAL_SERVER_ERROR, "Failed to process request for resource %s." % http_req.path)
    
    if ret is not None:
        # _run_resource() returned an error response; discard the partial cache entry.
        server.log(snap_log.LEVEL_ERR, "Failed to process request for resource %s" % http_req.path)
        if new_entry:
            new_entry.discard()
        return ret

    try:
        end_output(pinfo)
        if new_entry:
            # Publish the completed cache entry; it expires cache_timeout
            # seconds from now.
            new_entry.commit(time.time() + cache_timeout)
    except Exception, e:
        server.elog(e, "Failed to process request for resource %s." % http_req.path)
        return RhResponse(http_req.INTERNAL_SERVER_ERROR, "Failed to process request for resource %s." % http_req.path)
        
def _lookup_cache(pinfo, args):
    """
    
    Looks up the cache for a cache hit on the URL and args being requested.
    
    Please note that the args here are stripped off of any special SnapLogic specific args and that
    information is provided separately as method params. These method params include params to get
    a subset of the cached data and a param to return only the size of the cached data in terms
    of record count.
    
    This method should only be called if caching is enabled.
    
    @param pinfo: The current pipe session state information.
    @type pinfo:  L{PipeSessionInfo}
    
    @param args: The parameters for the pipeline.
    @type args:  dict
    
    @return: None if there was a cache hit. A newly "write opened" cache entry, if there was a cache
        miss.
    @rtype:  L{data_cache.CacheEntry}
    
    @raise SnapHttpErrorCode: If the cache lookup failed and the method wants to return an HTTP error
        code back to the client.
    
    """
    # Use a copy, as we might modify it.
    args = dict(args)
    
    if not pinfo.output_is_record:
        # For Binary output, we include the content type name in the URI.
        args['sn.content_type'] = pinfo.content_type
    
    # We need to make sure that the params are placed in a sorted manner in the URI, so that the lookup
    # uses the same URI string for every lookup.
    # NOTE: sorted() is used here instead of a local "keys" variable, which would
    # shadow the "keys" module imported from snaplogic.snapi_base at file scope.
    arg_str = ""
    if args:
        pairs = ["%s=%s" % (urllib.quote_plus(k), urllib.quote_plus(args[k]))
                 for k in sorted(args.keys())]
        arg_str = "&".join(pairs)
    url = urlparse.urlunparse(('', '', urllib.quote_plus(pinfo.remaining_path, "/"), '', arg_str, ''))
    
    # Do the lookup.
    entry = data_cache.open_entry(url, pinfo.guid, pinfo.gen_id, "r")
    if entry is None:
        # Cache miss: hand back a write-opened entry for the caller to populate.
        return data_cache.open_entry(url, pinfo.guid, pinfo.gen_id, "w", pinfo.output_is_record)

    # Cache hit: serve the response directly from the cache entry.
    try:
        if pinfo.count_req is not None:
            # Only the record count was requested (sn.count).
            write_http_output(pinfo, entry.row_count)
            end_output(pinfo)
            return None
        if pinfo.output_is_record:
            cached_stream = get_cached_stream(pinfo, entry)
            if cached_stream is None:
                # The specified range does not exist (starts past end of stream).
                end_output(pinfo)
                return None
            for r in cached_stream:
                pinfo.current_record_num += 1
                if write_rec_to_http(pinfo, r):
                    # Client-imposed limit reached; stop streaming.
                    break
        else:
            # Binary output: relay the cached bytes in fixed-size chunks.
            d = entry.read(500)
            while d:
                write_http_output(pinfo, d)
                d = entry.read(500)
        end_output(pinfo)
    finally:
        entry.close()

    return None

def get_cached_stream(pinfo, entry):
    """
    Returns an iterator object for reading records from the cache entry.
    
    @param pinfo: The current pipe session state information.
    @type pinfo:  L{PipeSessionInfo}
    
    @param entry: Cache entry returned by the cache.
    @type entry:  L{data_cache.CacheEntry}
    
    @return: iterator object or None, if the range of records requested goes past the end of stream.
    @rtype:  iterator or None.
    
    @raise SnapValueError: If the starting record cannot be located in the
        cache index, or the cache data content type has no RP handler.
    
    """

    if pinfo.starting_rec is not None:
        # BUG FIX: the None check must happen *before* subtracting 1 —
        # the original code computed "seek_nearest_record(...) - 1" first,
        # which raises TypeError when the record is not found, so the
        # intended SnapValueError was never raised.
        nearest = entry.seek_nearest_record(pinfo.starting_rec)
        if nearest is None:
            raise SnapValueError("Failed to find starting record number %s." % (pinfo.starting_rec))
        pinfo.current_record_num = nearest - 1
        
    cache_data_rp = get_rp(SNAP_ASN1_CONTENT_TYPE)
    if cache_data_rp is None:
        raise SnapValueError("Cache data content type %s was not handled by RP layer" % SNAP_ASN1_CONTENT_TYPE)
    cached_stream = cache_data_rp.Reader(entry)
    
    if pinfo.starting_rec is not None and (pinfo.current_record_num < pinfo.starting_rec - 1):
        # seek_nearest_record() only gets us as close as the indexing in cache allows us to the
        # starting record. We now need to move up to the exact point just before the starting record.
        for r in cached_stream:
            pinfo.current_record_num += 1
            if pinfo.current_record_num == pinfo.starting_rec - 1:
                return cached_stream
        # Ran off the end of the stream before reaching the starting record.
        return None
    else:
        return cached_stream
    
def _run_resource(pinfo, resource_uri, view_name, args, cache_entry = None):
    """
    Run the pipeline specified in the URL.
    
    Executes the resource via the server's exec interface, reads the named
    output view and streams it to the HTTP client, optionally teeing the
    data into a write-opened cache entry.
    
    @param pinfo: Information related to the current request session.
    @type pinfo:  L{PipeSessionInfo}
    
    @param resource_uri: The resource that should be executed.
    @type resource_uri:  str
    
    @param view_name: Name of the output view name to read.
    @type view_name:  str
    
    @param args: The params for the pipeline.
    @type args:  dict
    
    @param cache_entry: File object for the new entry in cache.
    @type cache_entry:  L{data_cache.CacheEntry}
    
    @return: None, if all goes well, else RhResponse object.
    @rtype: L{RhResponse}
    
    """
       
    resource_uri = snap_http_lib.concat_paths(main_process_uri, resource_uri)
    outputs = {view_name:None}
    # We use the server token as auth to make the resource execution request.
    # The invoker header is set to the username (if any) that was used to execute the GET request.
    custom_header = { headers.SERVER_TOKEN_HEADER : server.server_token }
    if pinfo.http_req.username is not None:
        custom_header[headers.INVOKER_HEADER] = pinfo.http_req.username
    h = exec_interface.exec_resource("PipeToHttp", resource_uri, pinfo.resdef, None, outputs, args, None, custom_header)
    res_output = h.outputs[view_name]
    
    try:
        if pinfo.output_is_record:
            # We need to first initialize the asn.1 stream that is being saved to cache.
            # The RP writer targets a _NullWriteStream because we only want the
            # encoded bytes it returns; those bytes are written to the cache entry.
            if cache_entry:
                rp = get_rp(SNAP_ASN1_CONTENT_TYPE)
                cache_stream = rp.Writer(_NullWriteStream())
                s = cache_stream.initialize()
                if s:
                    cache_entry.write(s)
            
            # Next, write records out to the client and to the cache.
            r = res_output.read_record()
            while r is not None:
                pinfo.current_record_num += 1
                if cache_entry:
                    if (pinfo.current_record_num % 100 == 0) or (pinfo.current_record_num == 1):
                        s = cache_stream.write(r)
                        # Make an index entry for the first record and every 100 records.
                        # The index enables seek_nearest_record() on later cache reads.
                        cache_entry.write(s, pinfo.current_record_num, 1)
                    else:
                        s = cache_stream.write(r)
                        cache_entry.write(s, None, 1)
                write_rec_to_http(pinfo, r)
                r = res_output.read_record()
            # Finally, end the asn.1 stream to the cache.
            if cache_entry:
                s = cache_stream.end()
                cache_entry.write(s)
        else:
            # Binary output: relay fixed-size chunks to the client and cache.
            d = res_output.read_binary(500)
            while d is not None:
                write_http_output(pinfo, d)
                if cache_entry:
                    cache_entry.write(d)
                d = res_output.read_binary(500)
            
    finally:
         res_output.close()
        
    if pinfo.count_req is not None:
        # sn.count request: report the total number of records streamed.
        write_http_output(pinfo, pinfo.current_record_num)
    end_output(pinfo)
    
    return None
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.