# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: http_to_pipe.py 10195 2009-12-17 19:48:18Z grisha $
__docformat__ = "epytext en"
"""
Launches a specified resource based on a POST request.
This component receives a POST request for a URI. The URI (described below)
identifies the resource to be launched and the input view to be written into.
URI structure:
Like all components, the http_to_pipe component will be registered under a URI. Let's
say this registered URI is /pipe. The POST requests should be made to URIs
with the following structure:
http://example.com:8080/pipe/uri-of-the-resource/input-view-name
For example, if the resource URI is /res/financialinfo and the input
view is input1, then the URI advertised would be:
http://example.com:8080/pipe/res/financialinfo/input1
If the resource takes parameters like currency (currency) and minimum earnings
(min_earnings), then these parameters can be specified in the query string as follows:
http://example.com:8080/pipe/res/financialinfo/input1?currency=dollars&min_earnings=50000
Apart from these custom parameters that resources define, http_to_pipe recognizes the param
sn.output_view. The POST request can use this param to specify an output view in the
resource. The data read from this output view will be returned in the POST response. Much
like pipe_to_http, one can control and format the data returned by the output with these
parameters:
To request data starting from a particular record number in the stream, specify the sn.start
parameter with the number of the record at which to start returning records. This sn.start
value has to be 1 or higher and applies only for record mode output views. For example,
to start returning from record #100, specify:
http://example.com:8080/pipe/res/financialinfo/input1?sn.output_view=output1&sn.start=100
To specify an upper limit on the number of records returned by output stream, use sn.limit param.
For example:
http://example.com:8080/pipe/res/financialinfo/input1?sn.output_view=output1&sn.limit=1000
The sn.start and sn.limit parameters can be combined in a request. Specifying
sn.count=records returns the total number of records available from the output stream.
This value can change for successive pipeline executions if the data being retrieved changes
over time. This parameter cannot be combined with sn.start and sn.limit.
sn.content_type is an optional argument which specifies the format in which content should
be returned. Currently, application/json and application/javascript are supported.
sn.stream_header can be specified to have a special header returned before the data is sent.
sn.stream_footer similarly specifies a footer that can be written out after all the data is
written. These params are currently used only in conjunction with the application/javascript
content_type to provide output in JSONP format. They are ignored in other circumstances.
"""
import time
import sys
import traceback
from cStringIO import StringIO
from snaplogic.common.config import snap_config
from snaplogic.common import snap_http_lib,snap_log,headers
from snaplogic import rp
from snaplogic import server
from snaplogic.server import RhResponse
from snaplogic.snapi_base import keys,exec_interface
from snaplogic.snapi_base.exceptions import SnapiHttpException
from snaplogic.common.snap_exceptions import *
from http_pipe_common import process_http_uri,end_output,PipeSessionInfo,write_rec_to_http,write_http_output
# Module-level settings, filled in by initialize():
#   main_process_uri        - read from the "common" config section.
#   pipe_to_http_uri_prefix - read from the "main" config section.
# Both remain None until initialize() has been called.
main_process_uri = pipe_to_http_uri_prefix = None
def initialize():
    """
    Initialize the service at server startup time.

    Reads C{main_process_uri} from the "common" config section and
    C{pipe_to_http_uri_prefix} from the "main" config section, and stores
    both in the module-level globals of the same names.

    """
    global main_process_uri
    global pipe_to_http_uri_prefix

    conf = snap_config.get_instance()
    main_process_uri = conf.get_section("common")['main_process_uri']
    pipe_to_http_uri_prefix = conf.get_section("main")["pipe_to_http_uri_prefix"]
def rh_http_to_pipe(http_req):
    """
    Called at runtime to start the processing of a POST request by this service.

    Any method other than POST is rejected. Otherwise the request URI is
    processed, the resource is run with the request parameters that do not
    start with "sn.", and finally the output is ended.

    @param http_req: HTTP request object.
    @type http_req: L{HttpRequest}

    @return: A response object when the request is answered here (method not
        allowed, a non-None result from process_http_uri or _run_resource, or
        an internal error); None otherwise.
    @rtype: L{RhResponse}

    """
    if http_req.method != 'POST':
        return RhResponse(http_req.METHOD_NOT_ALLOWED, "HTTP method %s not supported" % http_req.method)

    pinfo = PipeSessionInfo(http_req, pipe_to_http_uri_prefix)

    # Params prefixed with "sn." are the special ones; only the remaining
    # params are handed to the resource.
    args = dict((name, http_req.params[name])
                for name in http_req.params
                if not name.startswith("sn."))

    uri_response = process_http_uri(http_req, pinfo)
    if uri_response is not None:
        return uri_response

    try:
        run_response = _run_resource(pinfo, args, http_req)
    except Exception as e:
        server.elog(e, "Failed to process request for resource %s" % http_req.path)
        return RhResponse(http_req.INTERNAL_SERVER_ERROR,
                          "Failed to process request for resource %s." % http_req.path)

    if run_response is not None:
        # _run_resource hands back a response object only when something went wrong.
        server.log(snap_log.LEVEL_ERR, "Failed to process request for resource %s" % http_req.path)
        return run_response

    try:
        end_output(pinfo)
    except Exception as e:
        server.elog(e, "Failed to process request for resource %s." % http_req.path)
        return RhResponse(http_req.INTERNAL_SERVER_ERROR,
                          "Failed to process request for resource %s." % http_req.path)

    return None
def _run_resource(pinfo, args, http_req = None):
    """
    Run the pipeline specified in the URL.

    The body of the POST request is written into the requested input view of
    the resource. If an output view was requested, the data read from it is
    written back to the HTTP client.

    @param pinfo: Information related to the current request session.
    @type pinfo: L{PipeSessionInfo}

    @param args: The params for the pipeline.
    @type args: dict

    @param http_req: HTTP request object used to build the error response.
        When omitted, C{pinfo.http_req} is used.
    @type http_req: L{HttpRequest}

    @return: None, if all goes well, else RhResponse object.
    @rtype: L{RhResponse}

    """
    if http_req is None:
        # The parameter is optional, but the failure path at the end of this
        # function needs a request object for the status code and the path.
        http_req = pinfo.http_req

    # Number of bytes moved per read in binary mode.
    chunk_size = 500

    resource_uri = snap_http_lib.concat_paths(main_process_uri, pinfo.resource_uri)

    # See if we need to send output from some output view
    if pinfo.output_view_name is not None:
        outputs = {pinfo.output_view_name : pinfo.content_type}
    else:
        outputs = None

    if pinfo.input_is_record:
        # presumably make_input_rp() prepares http_req.input for record
        # iteration -- TODO confirm against the HttpRequest implementation.
        pinfo.http_req.make_input_rp()
        client_inp = pinfo.http_req.input
        inputs = {pinfo.input_view_name : None}
    else:
        client_inp = pinfo.http_req._input
        inputs = {pinfo.input_view_name : pinfo.http_req.http_content_type}

    # We use the server token as auth to make the resource execution request.
    # The invoker header is set to the username (if any) that was used to execute the POST request.
    # NOTE(review): custom_header is built here but is not passed to
    # exec_resource() below -- confirm whether it should be.
    custom_header = {headers.SERVER_TOKEN_HEADER : server.server_token}
    if pinfo.http_req.username is not None:
        custom_header[headers.INVOKER_HEADER] = pinfo.http_req.username

    try:
        h = exec_interface.exec_resource("HttpToPipe", resource_uri, pinfo.resdef, inputs, outputs, args)
    except SnapiHttpException as e:
        return RhResponse(e.status, e.body)

    # Feed the request body into the input view of the resource.
    inp = h.inputs[pinfo.input_view_name]
    if pinfo.input_is_record:
        for rec in client_inp:
            inp.write_record(rec)
    else:
        d = client_inp.read(chunk_size)
        while len(d) > 0:
            inp.write_binary(d)
            d = client_inp.read(chunk_size)
    inp.close()

    if pinfo.output_view_name is not None:
        res_output = h.outputs[pinfo.output_view_name]
        try:
            if pinfo.output_is_record:
                # Next, write records out to the client.
                r = res_output.read_record()
                while r is not None:
                    pinfo.current_record_num += 1
                    write_rec_to_http(pinfo, r)
                    r = res_output.read_record()
            else:
                d = res_output.read_binary(chunk_size)
                while d is not None:
                    write_http_output(pinfo, d)
                    d = res_output.read_binary(chunk_size)

            if pinfo.count_req is not None:
                # A count was requested: send the number of records seen.
                write_http_output(pinfo, pinfo.current_record_num)

            end_output(pinfo)
        finally:
            res_output.close()

    s = h.wait(1)
    if s.state != s.Completed:
        return RhResponse(http_req.INTERNAL_SERVER_ERROR, "Failed to process request for resource %s (state %s)." %
                          (http_req.path, s.state))

    return None
|