# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id:component_runtime.py 1006 2008-01-25 04:06:55Z dhiraj $
import re
import time
import copy
import threading
import decimal
import datetime
from pprint import PrettyPrinter
from snaplogic import rp
from snaplogic import cc
from snaplogic.common.snap_exceptions import *
from snaplogic.common import snap_log
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.common import snap_http_lib
from snaplogic.common import runtime_table,snap_params
from snaplogic.common import resource_runtime
from snaplogic.common import snap_control
from snaplogic.common import snapstream
from snaplogic.common.snapstream.stream_driver import StreamDriver
from snaplogic.common.snapstream.url_reader import URLReader
from snaplogic.common.snapstream import rh_frontend
from snaplogic.common import uri_prefix
from snaplogic import snapi_base
from snaplogic.snapi_base import resdef
from snaplogic.snapi_base import keys
from snaplogic.cc.input_view import InputView
from snaplogic.cc.output_view import OutputView
from snaplogic.cc import registration
import snaplogic.cc.prop as prop
# Module-wide record validation switch. init() sets it from the
# "type_check_records" config entry ("yes"/"no"), and its value is handed
# to every OutputView created by ComponentThread.run().
validate_recs = True

# Global data tracing flags, set by init() from the config file entry named by
# snap_params.TRACE_DATA_CONFIG. While a resource is being prepared, these
# flags are OR-ed with the flags parsed from the per-pipeline
# snap_params.TRACE_DATA parameter.
glob_inp_trace_flag = False
glob_out_trace_flag = False

# Module-level pretty printer (4 space indent), kept around for debug output.
pp = PrettyPrinter(indent=4)
def init(conf):
    """
    Initialize the module level settings from the config dictionary.

    Two optional entries are consulted:
      - "type_check_records": must be "yes" or "no" and sets L{validate_recs}.
      - the entry named by snap_params.TRACE_DATA_CONFIG: a string (or a list of
        strings, which is joined with commas) that is parsed by L{get_trace_flags}
        to set the global input/output trace flags. If it cannot be parsed, a
        warning is logged and both flags are switched off.

    @param conf: Configuration entries.
    @type conf:  dict

    @raise SnapException: If "type_check_records" has a value other than yes/no.

    """
    global validate_recs, glob_inp_trace_flag, glob_out_trace_flag

    if "type_check_records" in conf:
        setting = conf["type_check_records"]
        if setting == "yes":
            validate_recs = True
        elif setting == "no":
            validate_recs = False
        else:
            raise SnapException(
                "Config file entry 'type_check_records' must have a yes/no value, found '%s'" % setting)

    if snap_params.TRACE_DATA_CONFIG not in conf:
        # Nothing configured for tracing, leave the global flags untouched.
        return

    trace_config = conf[snap_params.TRACE_DATA_CONFIG]
    if isinstance(trace_config, list):
        trace_config = ','.join(trace_config)

    parsed = get_trace_flags(trace_config)
    glob_inp_trace_flag = parsed[0]
    glob_out_trace_flag = parsed[1]
    err_msg = parsed[2]
    if err_msg:
        cc.log(snap_log.LEVEL_WARN,
               "Error parsing config option %s: %s, tracing will not be set" % (snap_params.TRACE_DATA_CONFIG, err_msg))
        # The flags are not trustworthy when parsing failed.
        glob_inp_trace_flag = glob_out_trace_flag = False
class ComponentThread(threading.Thread):
"""
Encapsulates the component class and provides thread interface.
The primary purpose of this class is to keep the component's backend code out of
the ComponentAPI codehus providing the component author with simple code to refer
to and also in the process, minimize chances of accidentally disrupting the backend
code in the derived component class.
"""
def __init__(self, comp_object, resource_name, context_name, rid, inp_trace_flag, out_trace_flag):
"""
Initialize the ComponentThread object.
@param comp_object: The object of a class derived from ComponentAPI.
@type comp_object: derived from L{ComponentAPI}
@param resource_name: The name given to the resource inside the pipeline that
is executing it. Or, a name given by a client that is directly eexecuting
the resource. The resource here is based on the component.
@type resource_name: str
@param trace_flag: If set to true, that record tracing should be enabled.
@type trace_flag: bool
"""
threading.Thread.__init__(self)
# The instantiated component object.
self.component = comp_object
"""The component object being executed by this thread."""
self.got_stopped = False
"""This flag is set to True when a STOP request is received."""
# Staging area for collecting output stream connections, as they trickle in.
# These will be organized under their respective output view objects, before
# the component is started.
self._output_streams = []
# This dictionary keeps track of input streams that are using HTTP POST method
# to send data.
self._posted_input_streams = {}
self.input_views = {}
"""Input views of the component that have got streams associated with them. Dict of L{InputView}"""
self.output_views = {}
"""Output views of the component that have got streams associated with them. Dict of L{OutputView}"""
self.inp_trace_flag = inp_trace_flag
"""Flag to indicate if input tracing is enabled or disabled."""
self.out_trace_flag = out_trace_flag
"""Flag to indicate if output tracing is enabled or disabled."""
self.top_pipeline_rid = rid.split(".")[0]
""" The rid of the topmost pipeline, inside which this resource runs."""
self.fully_qualified_resource_name = \
context_name + "." + resource_name if context_name is not None else resource_name
self.setName(resource_name)
self.rid = rid
cc.stats_group.get_stat("number_of_spawned_component_threads").inc()
def set_finished_state(self, runtime_status_uri, state):
"""
Change state of runtime resource.
This method locks and changes the state of resource runtime. It should NOT be called
if the caller has already got a lock the resource runtime object.
@param runtime_status_uri: The status URI of the resource runtime object that must be
modified.
@type runtime_status_uri: str
@param state: The new state to be set to.
@type state: str
"""
res_rt = runtime_table.get_status_uri_entry(runtime_status_uri)
res_rt.lock.acquire()
try:
res_rt.status.state = state
res_rt.exit_time = time.time()
# Remove any POST input URIs.
del_uri = [v.uri for v in res_rt.input_views.values() if v.http_method == "POST"]
if len(del_uri) > 0:
runtime_table.remove_runtime_view_uris(del_uri)
finally:
res_rt.lock.release()
def process_posted_input_stream(self, http_req, res_rt, view_name):
"""
Sending POST data by stream method (specifically in HTTP 1.0)
is a challenge, as one has to specify content length of data being sent upfront.
To deal with this issue, it was decided that CC will accept a sequence of POSTs,
to the same input URI, to allow data to be sent in chunks.
The dictionary below will indicate to followup POST requests, that a initial
POST request to the view has already been sent and has resulted in a stream and
view object being created and therefore there is no need to create a view object.
The followup request only needs to interact with the snap stream module to get
associated with the existing snap stream.
@param http_req: The HTTP POST request.
@type http_req: L{HttpRequest}
@param res_rt: Resource runtime which has that input view.
@type res_rt: L{resource_runtime.ServerResourceRuntime}
@param view_name: Name of the input view.
@type view_name: str
"""
res_rt.lock.acquire()
try:
if view_name not in self._posted_input_streams:
# This is the first POST to this input view, create an input view object for it.
if res_rt.status.state != RuntimeStatus.Prepared:
self.component.log(snap_log.LEVEL_ERR,
"Received initial POST on input view %s (resource %s) when not in Prepared state"
% (view_name, res_rt.resource_uri))
return (http_req.FORBIDDEN, "The resource is already running.")
ret = self._add_posted_input_view_object(http_req, res_rt, view_name)
if ret is None:
return (http_req.INTERNAL_SERVER_ERROR, None)
else:
(pipe, stream) = ret
# We use this dictionary, even though similar view information is stored in the ComponentAPI
# class, because there is always a chance that the data in ComponentAPI class may be accidentally
# messed up by component authors.
self._posted_input_streams[view_name] = stream
else:
# This is a followup POST.
if res_rt.resdef.input_is_record_mode(view_name):
content_types = None
else:
content_types = res_rt.resdef.get_input_view_content_types(view_name)
(pipe, stream) = request_handler.process_post(http_req, content_types, view_name)
# Just do a sanity check to make sure snap stream layer has returned the same stream
# object as the last time.
if stream != self._posted_input_streams[view_name]:
self.component.log(snap_log.LEVEL_ERR,
"The stream object for followup POST to input view %s (%s) does not match %s" %
(view_name, res_rt.resource_uri, http_req.snap_stream_continue))
return (http_req.INTERNAL_SERVER_ERROR, None)
finally:
res_rt.lock.release()
# Finally, we let the thread go into a loop, reading from the POST.
request_handler.stream_input(http_req, pipe)
def _add_posted_input_view_object(self, http_req, res_rt, view_name):
"""
Create a input view object for the first POST to an input view URI.
@param http_req: The HTTP POST request.
@type http_req: L{HttpRequest}
@param res_rt: Resource runtime which has that input view.
@type res_rt: L{resource_runtime.ServerResourceRuntime}
@param view_name: Name of the input view.
@type view_name: str
@return: A sequence (pipe, snap stream) if successful, else None
@rtype: 2-tuple or None.
"""
if res_rt.resdef.input_is_record_mode(view_name):
is_pass_thru = res_rt.input_views[view_name].is_pass_through
(field_mapper, field_numbers, null_dest_field_indices) = _process_field_links(res_rt, res_rt.resdef,
view_name)
fields = res_rt.resdef.list_input_view_fields(view_name)
# User defined content types is only a feature of binary mode views.
content_types = None
else:
content_types = res_rt.resdef.get_input_view_content_types(view_name)
# Fields and pass-through is only a feature of record mode views.
is_pass_thru = False
fields = None
field_mapper = None
field_numbers = None
null_dest_field_indices = None
(pipe, stream) = request_handler.process_post(http_req, content_types, view_name)
if stream is None:
res_rt.log(snap_log.LEVEL_ERR, "Failed to create stream for POST request to %s" % http_req.path)
return None
inp_view = InputView(view_name, stream, fields, field_mapper, null_dest_field_indices,
is_pass_thru, self.inp_trace_flag, self.fully_qualified_resource_name,
self.top_pipeline_rid)
if view_name in self.input_views:
# This should never happen.
res_rt.log(snap_log.LEVEL_ERR, "CC has strange mismatch for resource %s, POST-ed view %s URI %s" %
(res_rt.resource_uri, view_name, http_req.path))
return None
self.input_views[view_name] = inp_view
return (pipe, stream)
def setup_input_views(self, res_def, res_rt):
"""
Establishes snap stream with all the input sources by calling GET on them. For Input views, that
have been requested to provide a URI for POST-ing, a URI is setup. In this case, The actual stream
for that view be setup when the POST request arrives.
@returns: Returns the list of input view objects created.
@rtype: list of L{InputView}
"""
input_views = []
for (inp_name, view) in res_rt.input_views.iteritems():
if view.http_method == "POST":
# We don't create a stream for this. We register a URI for it.
u = runtime_table.add_runtime_view_entry(res_rt.resource_uri, view.is_record,
(res_rt.runtime_status_uri, False, inp_name))
res_rt.input_views[inp_name].uri = snap_http_lib.concat_paths(cc.my_process_uri, u)
continue
if res_def.input_is_record_mode(inp_name):
(field_mapper, field_numbers, null_dest_field_indices) = _process_field_links(res_rt, res_def, inp_name)
fields = res_def.list_input_view_fields(view.name)
if view.is_pass_through:
uri = view.uri
else:
# Does not accept all source view fields, as it isn't a pass-through input view.
# For efficiency reasons, tell the source view what fields we are interested in,
# while making the GET request
uri = _append_params_to_view_uri(view.uri, field_numbers)
# Record mode views don't invlove user defined content types.
content_types = None
res_rt.log(snap_log.LEVEL_DEBUG, "Create input stream: view <%s> field_links <%s> uri <%s>"
% (view.name, view.field_links, uri))
else:
uri = view.uri
content_types = res_def.get_input_view_content_types(inp_name)
# Binary mode views don't involve fields and pass-through
fields = None
field_mapper = None
field_numbers = None
null_dest_field_indices = None
try:
stream = snapstream.open_url(uri, content_types, view.name)
stream.field_numbers = field_numbers
inp_view = InputView(view.name, stream, fields, field_mapper, null_dest_field_indices,
view.is_pass_through, self.inp_trace_flag, self.fully_qualified_resource_name,
self.top_pipeline_rid)
input_views.append(inp_view)
# We only need to start the stream driver if we're reading from the URL
if isinstance(stream, URLReader):
driver = StreamDriver(stream, res_rt)
driver.start()
except SnapException, e:
enew = SnapIOError("Failed to setup input streams for resource %s (%s). Target URI %s" %
(res_rt.resource_uri, res_rt.resource_name, uri))
raise SnapException.chain(enew, e)
# Setup the input streams as we must have got input URLs from the prepare request
for inp_view in input_views:
self.input_views[inp_view.name] = inp_view
def run(self):
res_rt = runtime_table.get_status_uri_entry(self.component.runtime_status_uri)
res_rt.lock.acquire()
try:
output_uris = []
for v in res_rt.output_views.values():
local_outputs = snapstream.open_local_writers(v.uri)
self._output_streams += local_outputs
output_uris.append(v.uri)
snapstream.unregister_output_views(output_uris)
finally:
res_rt.lock.release()
view_dict = {}
for out in self._output_streams:
if out.view_name not in view_dict:
view_dict[out.view_name] = [out]
else:
view_dict[out.view_name].append(out)
resdef = self.component._resdef
for view_name in view_dict:
if resdef.output_is_record_mode(view_name):
fields = resdef.list_output_view_fields(view_name)
if view_name in resdef.list_pass_through_output_views():
pass_through_inputs = resdef.get_output_view_pass_through(view_name)[keys.INPUT_VIEWS]
else:
pass_through_inputs = None
else:
fields = None
pass_through_inputs = None
view = resdef.get_output_view(view_name)
self.output_views[view_name] = OutputView(view_name, view_dict[view_name], fields,
pass_through_inputs, validate_recs, self.out_trace_flag,
self.fully_qualified_resource_name, self.top_pipeline_rid)
for inp in self.input_views.values():
self.component._stream_to_input_view[inp.snap_stream] = inp
exec_complete = False
try:
self.component.execute(dict(self.input_views), dict(self.output_views))
exec_complete = True
except SnapStreamError, e:
if self.got_stopped:
self.component.log(snap_log.LEVEL_INFO, "Component %s (resource name %s) was stopped" %
(self.component.__class__.__name__, self.getName()))
else:
# Log specifically as SNAPSTREAM error. This will allow us to filter out snap stream
# messages using the facility ID.
(ignore_log, ss_elog, ignore_rlog) = cc.logger.make_specific_loggers(snap_log.LOG_SNAPSTREAM,
self.rid, self.getName(),
self.component._invoker_username)
ss_elog(e, "Component %s execution failed with snapstream exception" % self.component.__class__.__name__)
except Exception, e:
self.component.elog(e, "Component execution failed")
finally:
if self.got_stopped:
self.set_finished_state(self.component.runtime_status_uri, RuntimeStatus.Stopped)
elif exec_complete:
# We had a graceful return from the process() method and no one tried stopped the component
self.set_finished_state(self.component.runtime_status_uri, RuntimeStatus.Completed)
else:
# The component exited with a failure.
self.set_finished_state(self.component.runtime_status_uri, RuntimeStatus.Failed)
for inp in self.input_views.values():
try:
inp._close()
except:
pass
for out in self.output_views.values():
try:
out._close()
except:
pass
if self.component.notification_uri is not None:
try:
res_rt = runtime_table.get_status_uri_entry(self.component.runtime_status_uri)
res_rt.lock.acquire()
try:
res_rt.status.statistics = _gather_statistics(self)
notify_req = snap_control.create_notification_request(res_rt.resource_name, res_rt.status)
finally:
res_rt.lock.release()
snapi_base.send_req("PUT", self.component.notification_uri, notify_req)
except Exception, e:
# Just log and move on.
self.component.elog(e, "Component resource notification send failed")
def stop(self):
"""Stop the running component."""
self.got_stopped = True
self.component.got_stopped = True
# Close the streams of the component. That should get the attention of the component.
for inp in self.input_views.values():
try:
inp._close()
except:
pass
for out in self.output_views.values():
try:
out._close()
except:
pass
def get_trace_flags(trace_option):
    """
    Translate a trace option string into input/output trace flags.

    The option is a comma separated list of the values L{snap_params.VALUE_INPUT} and
    L{snap_params.VALUE_OUTPUT}. Matching is case insensitive, surrounding white space
    is ignored and so are empty entries. None and blank strings mean "no tracing".

    @param trace_option: option setting trace - L{snap_params.VALUE_INPUT}, L{snap_params.VALUE_OUTPUT},
                         or both, comma-separated
    @type trace_option:  str

    @return: a tuple with 3 elements:
             1. input_trace flag (bool)
             2. output_trace flag (bool)
             3. error message (if this is not None, the above flags should be ignored -
                we could not parse the trace_option)
    @rtype:  tuple

    """
    if trace_option is None:
        return (False, False, None)

    if not (isinstance(trace_option, str) or isinstance(trace_option, unicode)):
        return (False, False, "String or unicode expected, received %s" % type(trace_option))

    stripped_option = trace_option.strip()
    if not stripped_option:
        return (False, False, None)

    trace_input = False
    trace_output = False
    problem = None
    tokens = [raw.strip().lower() for raw in stripped_option.split(",")]
    for token in tokens:
        if not token:
            # Tolerate empty entries, like the one produced by a trailing comma.
            continue
        if token == snap_params.VALUE_INPUT.lower():
            trace_input = True
            continue
        if token == snap_params.VALUE_OUTPUT.lower():
            trace_output = True
            continue
        # Unknown value. Stop parsing at the first one found.
        problem = 'Expected one of: "%s", "%s", "%s,%s"; received %s' % (snap_params.VALUE_INPUT, snap_params.VALUE_OUTPUT, snap_params.VALUE_INPUT, snap_params.VALUE_OUTPUT, stripped_option)
        break

    return (trace_input, trace_output, problem)
def process_prepare_request(http_req):
"""
Prepare a component for execution.
This PREPARE request results in the creation of a resource runtime object (res_rt)
containing information from the PREPARE request like:
- pipeline's runtime id (if any)
- resource name of the component in a pipeline or the naem given by a client
- URIs of output views that the component resource is linked to.
- HTTP methods used to access input views (POST or GET)
- Params for the resource.
The component now needs to provide the following missing information in the res_rt.
- Runtime status URI for the resource runtime that was just created. This is made
available by the runtime table, when the res_rt is added to it.
- Runtime output view URIs for the output views of the component. These URIs are
created by adding entries to the view runtime table
- Runtime input view URIs, if the agent wants to use http method POST to send data
to the input view. Again, these URIs are created by adding entries to the view
runtime table
Once this information is generated by this function, it is returned in the PREPARE
response document.
@param http_req: The HTTP request that initiated the PREPARE request.
@type http_req: L{HttpRequest}
@return: (HTTP status code, response object). The response object will be an error
string the HTTP code is not OK. If it is OK, then the response object will be
the PREPARE response document.
@rtype: 2-tuple
"""
resource_uri = http_req.path
http_req.make_input_rp()
try:
mesg = http_req.input.next()
except StopIteration:
cc.log(snap_log.LEVEL_ERR, "POST to resource URI %s was missing prepare message" % resource_uri)
return (http_req.BAD_REQUEST, "Resource definition missing")
#print pp.pformat(mesg)
try:
resdef_dict = http_req.input.next()
except StopIteration:
return (http_req.BAD_REQUEST, "Resource definition missing")
try:
# Routine cleanup of runtime table.
runtime_table.remove_completed_runtime_entries()
res_rt = resource_runtime.ServerResourceRuntime(resource_uri)
snap_control.parse_prepare_request(mesg, res_rt)
res_def = resdef_module.ResDef(resdef_dict)
(res_rt.log, res_rt.elog, ignore_rlog) = cc.logger.make_specific_loggers(res_def.get_component_name(),
res_rt.rid, res_rt.resource_name, http_req.invoker)
except Exception, e:
cc.elog(e, "Prepare request to %s failed" % resource_uri)
return (http_req.INTERNAL_SERVER_ERROR, None)
try:
retval = resource_runtime.synch_with_resdef(res_rt, res_def)
if retval is not None:
return retval
comp_name = res_def.get_component_name()
if comp_name is None:
cc.log(snap_log.LEVEL_ERR, "POST to resource URI %s had a resdef without a component name." % resource_uri)
return (http_req.BAD_REQUEST, "Resource definition has not component name in it")
res_rt.log(snap_log.LEVEL_DEBUG, "Doing prepare for resource name \"%s\", component \"%s\"" %
(res_rt.resource_name, comp_name))
comp_class = registration.get_component_class(comp_name)
if comp_class is None:
res_rt.log(snap_log.LEVEL_ERR, "Component %s specified in resource definition %s was not found."
%(comp_name, resource_uri))
return (http_req.INTERNAL_SERVER_ERROR, "Resource definition for %s was invalid" % resource_uri)
# Create and populate the component object.
component = comp_class()
config_dict = registration.get_component_config(comp_name)
r = res_def._check_param_values(res_rt.parameters)
if r is not None:
return (http_req.BAD_REQUEST, r)
params = res_def._update_param_values_with_defaults(res_rt.parameters)
# Remove the snaplagic specific param, as it should not be passed onto the component user space.
trace_param = params.pop(snap_params.TRACE_DATA, None)
(inp_trace_flag, out_trace_flag, err_msg) = get_trace_flags(trace_param)
if err_msg:
return (http_req.BAD_REQUEST, "Invalid value for %s: %s" % (snap_params.TRACE_DATA, err_msg))
inp_trace_flag = inp_trace_flag or glob_inp_trace_flag
out_trace_flag = out_trace_flag or glob_out_trace_flag
# This substitutes param values into resdef properties.
prop.substitute_params_in_resdef(res_def, params)
component._init_comp(config_dict, res_def, res_rt.resource_name, res_rt.rid, False, http_req.invoker,
res_rt.resource_ref)
component._set_parameters(params)
res_rt.status.state = RuntimeStatus.Prepared
res_rt.resdef = res_def
runtime_table.add_runtime_entry(res_rt, cc.my_process_uri)
res_rt.lock.acquire()
try:
component.runtime_status_uri = res_rt.runtime_status_uri
res_rt.comp_thread = ComponentThread(component, res_rt.resource_name, res_rt.context_name,
res_rt.rid, inp_trace_flag, out_trace_flag)
res_rt.comp_thread.setup_input_views(res_def, res_rt)
# Populate the output view URIs
_setup_output_view_uris(res_rt)
# Create the prepare response document and send it out.
resp = snap_control.create_prepare_response(res_rt)
finally:
res_rt.lock.release()
return (http_req.OK, resp)
except Exception, e:
res_rt.elog(e, "Prepare request to %s failed" % resource_uri)
return (http_req.INTERNAL_SERVER_ERROR, None)
def _setup_output_view_uris(res_rt):
    """
    Creates and sets output URIs for output views in the runtime table.

    For every output view of the resource runtime, an entry is added to the runtime
    view table, the resulting URI is stored in the view and the view is registered
    with snapstream. Content types are only provided for views that are not in
    record mode.

    @param res_rt: Resource runtime, whose output views need URIs.
    @type res_rt:  L{resource_runtime.ServerResourceRuntime}

    """
    for out_name in res_rt.output_views:
        content_types = None
        if not res_rt.resdef.output_is_record_mode(out_name):
            content_types = res_rt.resdef.get_output_view_content_types(out_name)

        out_view = res_rt.output_views[out_name]
        view_entry = (res_rt.runtime_status_uri, True, out_name)
        relative_uri = runtime_table.add_runtime_view_entry(res_rt.resource_uri, out_view.is_record, view_entry)
        out_view.uri = snap_http_lib.concat_paths(cc.my_process_uri, relative_uri)
        snapstream.register_output_view(out_view.uri, content_types, out_name)
def _process_field_links(res_rt, res_def, inp_view_name):
    """
    Process the field links received in the prepare request.

    The field links are received in the prepare request in the following format:
    [
     ["src_field1", ["dest_field1", "dest_field2"]],
     ["src_field2", []],
     ["src_field3", ["dest_field3"]],
     [None, ["dest_field4", "dest_field5"]] # Destination fields that have been hard coded to the value None.
    ]

    In this format, every source field name is specified (whether it is linked or not) in the order
    defined by the output view. The destination field names linked to the source field are specified as a list,
    since there can be more than one. If the source field is not linked, then an empty list should be specified.

    @param res_rt: Resource runtime which has the input view.
    @type res_rt:  L{resource_runtime.ServerResourceRuntime}

    @param res_def: Resource definition of the resource.
    @type res_def:  ResDef

    @param inp_view_name: Name of the input view.
    @type inp_view_name:  str

    @return: A tuple with 3 elements:
        1. field mapper: one entry per source field that is used. The entry is the list of positions
           of the destination fields in the input view, or None for an unlinked source field of a
           pass-through view.
        2. positions of the source fields that are needed from the source view, or None if all of
           them are needed (always None for pass-through views).
        3. positions in the input view of the destination fields that are hard coded to None.
    @rtype:  tuple

    @raise SnapValueError: If a linked destination field does not exist in the input view, is linked
        more than once, or if an input view field has been left unlinked.

    """
    view = res_rt.input_views[inp_view_name]
    input_field_names = res_def.list_input_field_names(inp_view_name)
    null_dest_field_indices = []
    if view.field_links == resource_runtime.EXACT_MATCH:
        # EXACT_MATCH means that the requestor has not bothered to provide field link information,
        # but has assured the input view that data sent will exactly match the fields described by
        # the input view. For this reason, we cookup a field link here, which exactly matches
        # the fields of the input view. This situation typically happens when the agent sending
        # data to the input view is not another component in a pipeline, but some client using
        # snapi interface to POST data to the input view. Such a client does not have ability to
        # create field links.
        field_links = [(f, [f]) for f in input_field_names]
    else:
        # Remove the nullified dest fields information from field_links and place it in a separate list.
        field_links = []
        unmapped_fields = list(input_field_names)
        for (f, l) in view.field_links:
            for dest_name in l:
                if dest_name not in input_field_names:
                    raise SnapValueError(
                        "Resource %s (%s) input view \"%s\" does not have field \"%s\" specified in field linking" %
                        (res_rt.resource_uri, res_rt.resource_name, inp_view_name, dest_name))
                if dest_name not in unmapped_fields:
                    # If dest field_name was in input_field_names list, but not in unmapped_fields, then it must be
                    # because the field has been linked to more than one source field.
                    raise SnapValueError("Resource %s (%s) input view \"%s\" has the input field \"%s\" linked twice" %
                                         (res_rt.resource_uri, res_rt.resource_name, inp_view_name, dest_name))
                unmapped_fields.remove(dest_name)
            if f is None:
                # Accumulate, so that the fields of every None entry are kept and not just
                # the fields of the last one.
                null_dest_field_indices.extend([input_field_names.index(dname) for dname in l])
            else:
                field_links.append((f, l))
        if len(unmapped_fields) != 0:
            raise SnapValueError("Resource %s (%s) input view \"%s\" has unlinked fields %s" %
                                 (res_rt.resource_uri, res_rt.resource_name, inp_view_name, unmapped_fields))

    src_field_view_size = len(field_links)
    src_field_position_nums = None
    if not view.is_pass_through:
        # If this view is not pass through, then filter out the source fields that are not manually linked.
        src_field_position_nums = []
        new_links = []
        for (src_position, (src_field_name, dest_field_names)) in enumerate(field_links):
            if len(dest_field_names) > 0:
                src_field_position_nums.append(src_position)
                new_links.append((src_field_name, dest_field_names))
        field_links = new_links

    field_mapper = []
    for (src_field_name, dest_field_names) in field_links:
        if len(dest_field_names) == 0:
            # Pass through field
            field_mapper.append(None)
            continue
        # Set the indices of these destination fields in the field mapper
        field_mapper.append([input_field_names.index(dest_name) for dest_name in dest_field_names])

    if (src_field_position_nums is not None) and (len(src_field_position_nums) == src_field_view_size):
        # If we are using all the source field names, then there is no need to return the source field
        # positions needed, we want all of them from the source.
        src_field_position_nums = None

    return (field_mapper, src_field_position_nums, null_dest_field_indices)
def _append_params_to_view_uri(uri, src_field_nums):
    """
    Appends field numbers specified as parameter in the URL args.

    The field numbers are written in ascending order, with runs of consecutive numbers
    collapsed into a range. For example, [0, 1, 2, 5] is written as "0-2,5". The list
    passed in is not modified.

    @param uri: The view URI to which the param must be added.
    @type uri:  str

    @param src_field_nums: Positions of the source fields that are needed. If None, no
        field numbers param is added.
    @type src_field_nums:  list of int

    @return: The value returned by snap_http_lib.add_params_to_uri() for the URI and params.

    """
    params = {}
    if src_field_nums is not None:
        ranges = []
        first = None
        last = None
        # sorted() works on a copy, the caller's list must not be reordered as a side effect.
        for num in sorted(src_field_nums):
            if last is not None and last + 1 == num:
                # It is contiguous, instead of writing 1,2,3 the aim is to write 1-3
                last = num
                continue
            if first is not None:
                ranges.append((first, last))
            first = last = num
        if first is not None:
            ranges.append((first, last))

        entries = []
        for (low, high) in ranges:
            if low == high:
                entries.append(str(low))
            else:
                entries.append("%s-%s" % (low, high))
        params[snap_params.FIELD_NUMBERS] = ",".join(entries)

    return snap_http_lib.add_params_to_uri(uri, params)
def process_runtime_put_request(http_req):
    """
    Handle a PUT request made to a runtime status URI.

    The status URI is the one that was returned in the prepare response. Two kinds of
    requests are understood:

    1) "start": only allowed when the component runtime is in Prepared state. The component
       thread is started and the state changes to Started.

    2) "stop": a runtime in Prepared state is marked as Stopped right away. A runtime in
       Started state gets its component thread asked to stop and is marked as Stopping.
       The request is silently ignored for a runtime that is already finished or stopping.

    @param http_req: The HTTP PUT request.
    @type http_req:  L{HttpRequest}

    @return: (HTTP status code, response object). For a failed request the response
        object is an error string or None.
    @rtype:  2-tuple

    """
    res_rt = runtime_table.get_status_uri_entry(http_req.path, http_req.method)
    http_req.make_input_rp()
    try:
        mesg = http_req.input.next()
    except StopIteration:
        no_data = "PUT request did not have data"
        res_rt.log(snap_log.LEVEL_ERR, no_data)
        return (http_req.BAD_REQUEST, no_data)

    (req, req_data) = snap_control.parse_put_request(mesg)
    response = None
    ignorable_states = (RuntimeStatus.Completed, RuntimeStatus.Failed,
                        RuntimeStatus.Stopped, RuntimeStatus.Stopping)

    res_rt.lock.acquire()
    try:
        state = res_rt.status.state

        if req not in ("start", "stop"):
            res_rt.log(snap_log.LEVEL_ERR, "Unknown PUT request to resource %s (%s): %s" %
                       (res_rt.resource_uri, res_rt.resource_name, req))
            return (http_req.BAD_REQUEST, None)

        if req == "start":
            if state != RuntimeStatus.Prepared:
                res_rt.log(snap_log.LEVEL_ERR, "Cannot send start request to resource %s (%s) that is in state: %s" %
                           (res_rt.resource_uri, res_rt.resource_name, state))
                return (http_req.FORBIDDEN, "The resource is not in prepared state")
            runtime_table.remove_runtime_view_uris([out.uri for out in res_rt.output_views.values()])
            res_rt.comp_thread.component.notification_uri = req_data
            res_rt.comp_thread.start()
            res_rt.status.state = RuntimeStatus.Started
            response = snap_control.create_start_stop_response(res_rt.status)
        elif state == RuntimeStatus.Prepared:
            # Stop request for a runtime that has no component thread running yet. Drop its
            # view URIs, mark it as stopped and note down the exit time.
            output_uris = [out.uri for out in res_rt.output_views.values()]
            posted_input_uris = [inp.uri for inp in res_rt.input_views.values() if inp.http_method == "POST"]
            runtime_table.remove_runtime_view_uris(output_uris + posted_input_uris)
            snapstream.unregister_output_views(output_uris)
            res_rt.status.state = RuntimeStatus.Stopped
            res_rt.exit_time = time.time()
            response = snap_control.create_start_stop_response(res_rt.status)
        elif state == RuntimeStatus.Started:
            # Stop request for a running component thread. Ask the thread to stop, the
            # final state is set by the thread itself when it exits.
            res_rt.comp_thread.stop()
            res_rt.status.state = RuntimeStatus.Stopping
            response = snap_control.create_start_stop_response(res_rt.status)
        elif state not in ignorable_states:
            # A stop request is ignored if the runtime is finished or stopping already. In any
            # other state, the stop request should have never been sent.
            res_rt.log(snap_log.LEVEL_ERR, "Cannot send stop request to resource %s (%s) that is in state: %s" %
                       (res_rt.resource_uri, res_rt.resource_name, state))
            return (http_req.FORBIDDEN, "Resource in state %s cannot be stopped" % state)
    finally:
        res_rt.lock.release()

    return (http_req.OK, response)
def process_runtime_get_request(http_req):
    """
    Dispatch a GET request made to a runtime URI.

    The path of the request decides how it is handled:
    1) A path under uri_prefix.RUNTIME_STATUS_ENTRY is a request for the status of
       the component runtime.
    2) A path under uri_prefix.RUNTIME_VIEW is a request to start a snapstream to a
       runtime output view URI.
    Any other path is logged and answered with NOT_FOUND.

    @param http_req: The HTTP GET request.
    @type http_req:  L{HttpRequest}

    @return: For status requests and unknown URIs, a (HTTP status code, response object)
        tuple. For view requests, whatever the output view handler returned.

    """
    path = http_req.path

    if path.startswith(uri_prefix.RUNTIME_STATUS_ENTRY):
        return (http_req.OK, _status_get_request(http_req))

    if path.startswith(uri_prefix.RUNTIME_VIEW):
        # No status can be written after processing streaming data, so the
        # result of the handler is handed back untouched.
        return _output_view_get_request(http_req)

    cc.log(snap_log.LEVEL_ERR, "No such runtime uri found (%s)" % path)
    return (http_req.NOT_FOUND, "No such runtime uri found (%s)" % path)
def _status_get_request(http_req):
    """
    Build the status document of a component runtime.

    The view statistics of the runtime are refreshed from its component thread,
    before the status document is created. Both steps are done while holding the
    lock of the resource runtime.

    @param http_req: The HTTP GET request made to the runtime status URI.
    @type http_req:  L{HttpRequest}

    @return: The status document created by snap_control.create_status_info().

    """
    runtime_entry = runtime_table.get_status_uri_entry(http_req.path, http_req.method)
    runtime_entry.lock.acquire()
    try:
        current_status = runtime_entry.status
        current_status.statistics = _gather_statistics(runtime_entry.comp_thread)
        return snap_control.create_status_info(current_status)
    finally:
        runtime_entry.lock.release()
def _gather_statistics(comp_thread):
    """
    Collect the per-view counters of a component thread.

    @param comp_thread: Object exposing input_views and output_views, each
        mapping a view name to a view object with the attributes is_binary,
        byte_count and record_count.

    @return: Dictionary with the keys keys.INPUT_VIEWS and keys.OUTPUT_VIEWS.
        Each maps view names to a one-entry dictionary: the byte count for a
        binary view, the record count otherwise.
    @rtype: dict

    """
    def _view_counters(view):
        # Only the counter that matches the view's mode is reported.
        if view.is_binary:
            return {keys.STATISTICS_BYTE_COUNT: view.byte_count}
        return {keys.STATISTICS_RECORD_COUNT: view.record_count}

    input_stats = {}
    for name in comp_thread.input_views:
        input_stats[name] = _view_counters(comp_thread.input_views[name])

    output_stats = {}
    for name in comp_thread.output_views:
        output_stats[name] = _view_counters(comp_thread.output_views[name])

    statistics = {}
    statistics[keys.INPUT_VIEWS] = input_stats
    statistics[keys.OUTPUT_VIEWS] = output_stats
    return statistics
def _output_view_get_request(http_req):
"""
Add an output stream to the component which was created as a result of a GET request
"""
try:
field_numbers = _parse_view_uri_params(http_req.params)
(runtime_status_uri, is_output, view_name) = runtime_table.get_view_uri_entry(http_req.path)
res_rt = runtime_table.get_status_uri_entry(runtime_status_uri)
except Exception, e:
cc.elog(e, "Failed to process GET request to output view at %s" % http_req.path)
return (http_req.INTERNAL_SERVER_ERROR, None)
if not is_output:
res_rt.log(snap_log.LEVEL_ERR, "Input view \"%s\" (uri %s) received a GET request. Expecting a POST" %
(view_name, http_req.path))
return (http_req.METHOD_NOT_ALLOWED, None)
res_rt.lock.acquire()
try:
if res_rt.resdef.output_is_record_mode(view_name):
content_types = None
else:
content_types = res_rt.resdef.get_output_view_content_types(view_name)
finally:
res_rt.lock.release()
(pipe, stream) = request_handler.process_get(http_req, content_types, view_name)
if stream is None:
res_rt.log(snap_log.LEVEL_ERR, "Failed to create stream for GET request to %s" % http_req.path)
return (http_req.INTERNAL_SERVER_ERROR, None)
# Since each output view can have multiple streams, we need to keep field_numbers information on a
# per-stream basis.
stream.field_numbers = field_numbers
res_rt.lock.acquire()
try:
res_rt.comp_thread._output_streams.append(stream)
finally:
res_rt.lock.release()
# This handler method will go into a loop and the thread will return only when the stream is
# closed.
request_handler.stream_output(http_req, pipe)
return None
def _parse_view_uri_params(params):
    """
    Extract the field numbers requested through the view URL parameters.

    A downstream input view that is linked to only some of an output view's
    fields can name those fields in the request, so that the output view does
    not send fields nobody consumes. The value is a comma separated list whose
    items are either a single integer or a "low-high" range, where high must
    be greater than low; a range contributes every number from low to high
    inclusive.

    @param params: A param dictionary that might have the fields list param.
    @type params:  dict

    @return: Sorted list of the requested field numbers, or None if the
        parameter is not present.
    @rtype:  list

    @raise SnapFormatError: If an item is not numeric or a range is malformed.

    """
    raw = params.get(snap_params.FIELD_NUMBERS)
    if raw is None:
        return None

    field_numbers = []
    for item in raw.split(','):
        if '-' not in item:
            # Plain single field number.
            try:
                field_numbers.append(int(item))
            except ValueError:
                raise SnapFormatError("Field number arg <%s> has non numeric value <%s>" %
                                      (raw, item))
            continue

        bounds = item.split('-')
        if len(bounds) != 2:
            raise SnapFormatError("Field number arg <%s> has invalid range <%s>" %
                                  (raw, item))
        try:
            low = int(bounds[0])
            high = int(bounds[1])
        except ValueError:
            raise SnapFormatError("Field number arg <%s> has non numeric range <%s>" %
                                  (raw, item))
        if high <= low:
            raise SnapFormatError("Field number arg <%s> has invalid range <%s>" %
                                  (raw, item))
        # Both ends of the range are included.
        field_numbers.extend(range(low, high + 1))

    field_numbers.sort()
    return field_numbers
def process_runtime_post_request(http_req):
    """
    Handle a POST made by an external client to an input view URI.

    For a path starting with uri_prefix.RUNTIME_RECORD_VIEW an input RP is set
    up on the request first. The view URI is then resolved to its runtime
    entry and the posted stream is handed to the component thread. A POST
    addressed to an output view is logged and rejected.

    @param http_req: The HTTP request being processed.

    @return: None after the posted stream has been processed, or
        (METHOD_NOT_ALLOWED, None) if the URI names an output view.

    """
    # An RP is fetched for record mode views only, never for binary mode ones.
    if http_req.path.startswith(uri_prefix.RUNTIME_RECORD_VIEW):
        http_req.make_input_rp()

    (status_uri, is_output, view_name) = runtime_table.get_view_uri_entry(http_req.path)
    runtime = runtime_table.get_status_uri_entry(status_uri)

    if not is_output:
        runtime.comp_thread.process_posted_input_stream(http_req, runtime, view_name)
        return None

    runtime.log(snap_log.LEVEL_ERR,
                "Output view \"%s\" (resource %s uri %s) received a POST request. Expecting a GET"
                % (view_name, runtime.resource_name, runtime.resource_uri))
    return (http_req.METHOD_NOT_ALLOWED, None)
|