# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: pipeline_manager.py 10295 2009-12-22 23:27:28Z dhiraj $
from __future__ import with_statement
"""
The pipeline manager (PM) module provides the functionality needed to
run a pipeline.
Terms:
agent - This could be the PM of a pipeline which encloses this pipeline
or it could be a client which is directly invoking this pipeline.
Description:
It provides the interfaces to Prepare, Start, Stop and Monitor a
pipeline resource. The module typically resides in the main process
of the SnapLogic server. When a POST request to prepare any resource
(component resource or pipeline resource) arrives to the server, it
is routed to the process_prepare_request() method of this module. That
method will fetch the resdef for that URI and see if it is a component
based resource or pipeline based resource. If it is a component based
resource, then the component list is looked up and the CC that serves
that component is identified. The Prepare request is then routed to that
CC (along with the resdef) and the response from the CC is forwarded
back to the agent that made the Prepare request. After this, the PM will
play no further part in that interaction. On the other hand, if the
Prepare request turns out to be for a pipeline resource, then that request
is handled by this module. It creates a resource runtime object (res_rt)
for that particular pipeline resource and enters that object in the
runtime table. The runtime table provides a "runtime status URI" for that
object, which serves as the URI for this new runtime representation of the
resource. The PM then sends Prepare requests to all the resources inside
the pipeline. Once this is achieved, the PM responds to the agent that
made the Prepare request, with the runtime status URI which can be used for
Starting/Stopping or monitoring this runtime resource. If the agent sends
Start, then the PM proceeds to relay that Start request to all the
resources inside the pipeline. Stop requests are handled similarly. For
monitoring purposes, the PM (once the pipeline is started) runs a thread
in a loop, which periodically wakes up and requests status information from
all the resources in the pipeline. It then computes the state and statistics
of the pipeline, based on the state and statistics of the resources inside
the pipeline. When GET requests to the runtime status URI arrive from
an agent, it gets routed to the PM and the PM responds with the above
information that is gathered periodically.
Finally, when resources in the pipeline finish, they send a notification
PUT to the pipeline's status URI, with the final status of the resource.
This allows the PM to quickly discover that the pipeline has completed,
instead of awaiting the periodic status poll of the resources.
"""
import threading
import time
import socket
import os
import datetime
import urlparse
from snaplogic import server
from snaplogic.common.snap_exceptions import *
from snaplogic.common import version_info
from snaplogic.common.version_info import is_pe
from snaplogic.snapi_base.exceptions import *
from snaplogic.common import snap_log,snap_params
from snaplogic.snapi_base import keys,ValidationDict
from snaplogic.snapi_base import send_list_req
from snaplogic.common import uri_prefix
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.common import snap_http_lib
import snaplogic.common.resource_runtime as resource_runtime
import snaplogic.common.snap_control as snap_control
import snaplogic.common.runtime_table as runtime_table
from snaplogic.common.config import credentials_store
from snaplogic.common import prop_err
from snaplogic.snapi_base.resdef import PipelineResDef
from snaplogic.server import repository
from snaplogic.server import RhResponse
from snaplogic.server import cc_list
from snaplogic.server import cc_proxy
from snaplogic.server.http_request import HttpRequest
from snaplogic.common import headers
from snaplogic import snapi_base
from snaplogic.snapi_base.exceptions import SnapiException
from snaplogic.server import product_prefix,pipeline_api,uri_checker,auth
# Generic message for a failed resource execution (not referenced in the visible part of this module).
EXEC_ERR_STR = "Resource execution failed"
# Error message returned when a resdef fails validation; formatted with (resource URI, resource name).
RESDEF_ERR_STR = "Resource is invalid - uri: %s name: %s"
def process_prepare_request(http_req):
"""
Handle a prepare request and create a resource runtime based on that request.
This function is called when it has been determined that we have a POST request
to a resource URI. Such a request is supposed to be a PREPARE request to the
resource, asking it to get ready for execution. This function (as documented in
the module documentation above) will process the request directly if the
resource is a pipeline. If it is a component then the function will forward the
request to a CC.
The prepare request also creates a representation of the resources in the pipeline,
using res_rt objects. It links these res_rt objects as described in
the resdef of the pipeline. It also records the relationship between the
views of the resources and the pipeline views, if the pipeline has been
assigned some of the views.
The reason we have this res_rt based representation is because of the way
information on URIs, http methods, field links and params flows from pipelines
to resources. The PREPARE POST of an agent sends basic information like
- input params for the pipeline
- runtime URIs of upstream views that the pipeline input view needs to call
GET on.
- Field links for the input views of the pipeline.
- pipeline output view names for which runtime URIs are being requested by the
agent.
- pipeline input view names for which runtime URIs are being requested,
because a client wants to POST to the input view.
The runtime pipeline needs to pick through this information and figure out how
the URIs, field links and params need to be passed on to resources inside the
pipeline and also prompt those resources to provide the URIs needed by the
agent and also the URIs needed by adjacent resources inside the pipeline.
The approach taken has been: Create a pipeline res_rt which is then populated
with the information of field links for input views, runtime URIs for input
views to read from and HTTP method for accessing view data etc. This
information was received in the PREPARE request to the pipeline. So basically,
a PREPARE request results in a semi-populated res_rt for the pipeline. The
information missing in this res_rt is the:
- runtime status URI of the pipeline
- runtime URIs for the output views of the pipeline.
- runtime URIs for the input views of the pipeline that will be getting data
via HTTP POST method.
To fill in this information, first the the res_rt is placed in the runtime
table, which provides the res_rt with a runtime status URI. After this, when
this _create_pipeline_runtime() function is called, the fucntion creates a
res_rt for each resource inside the pipeline. The resdef is used to figure out
and populate some of these res_rts with field link information. If pipeline has
assigned some the resource views, then some information from the pipeline res_rt
is copied to the resource res_rt. In particular:
- the runtime view URI that the pipeline input view will be GET-ing data from.
- the field links and http method that wil be used by the pipeline input view.
- http method that will be used by output view (just informational).
- some of the params of the pipeline, as specified by param map.
Next, the resdef is used to figure out the links between the resources in the
pipeline and that information is also saved into the res_rts. At this point, we
have partially populated res_rt for all the resources. The information missing
include:
- runtime status URI of the reource
- the runtime URIs for the output views of the resource.
- the runtime URIs for input views of the resource that that will be getting data
via HTTP POST method.
This sounds a lot like whats missing in the res_rt of pipeline and thats
understandable, since the pipeline concept is recursive in nature.
The code now starts walking the pipeline from the most upstream resource and
sends it a PREPARE request, using the all the information we have in the
res_rt for that resource. The resource is expected to respond with the missing
pieces of information in the prepare response message.
With this information, the res_rt for that resource is fully populated with
information. The pipeline can now use the link information to figure out
the resource(s) downstream to this prepared resource and populate the input
views of the res_rt of that downstream resource with runtime output view URIs
of the prepared resource. This partially populated res_rt of the downstream
resource is now ready to be sent in a PREPARE request with some useful
runtime view URIs in it. This process ripples through to the end of the
pipeline. Once done, the pipeline copies some of the runtime URIs from the
fully populated resource res_rts to the pipeline res_rt.
In particular:
- runtime URIs for assigned output views are copied to the pipeline res_rt.
- runtime URIs for assigned input views that expect POST-ed data.
Finally, the pipeline has a fully populated res_rt that represents the
pipeline itself. The information in this res_rt is sent in the prepare
response message.
The function creates a resource runtime object (res_rt) for the pipeline resource
and stores it in the runtime table, which assigns a runtime status URI for the
res_rt. The purpose of the res_rt is to represent the running pipeline to the
external world. All future requests to the running pipeline will first result in
the retrieval of this res_rt from the runtime table. This object will contain
the current state of the runtime, its statistics, the URIs of its output views
and the URIs related to its input views. It will also have a reference to the
PipelineManager object, which contains the PM thread that polls all the resources
in the pipeline for status. This PM object is instantiated when the Prepare request
is received, but the polling thread is created only when START request is received.
@param http_req: The HTTP request received.
@type http_req: L{HttpRequest}
@return: RhResponse object with (HTTP status code, response object). The response object will be an error
string the HTTP code is not OK. If it is OK, then the response object will be
the prepare response document.
@rtype: L{RhResponse}
"""
# Routine cleanup of completed resource runtimes.
runtime_table.remove_completed_runtime_entries()
# We prematurely create a runtime resource for this request, so we can have logging facility
# incase something goes wrong with the request.
res_rt = resource_runtime.ServerResourceRuntime(http_req.path)
http_req.make_input_rp()
try:
mesg = http_req.input.next()
except StopIteration:
server.log(snap_log.LEVEL_ERR, "Prepare request for %s had no document" % http_req.path)
return RhResponse(http_req.BAD_REQUEST, "Prepare request had no document")
snap_control.parse_prepare_request(mesg, res_rt)
(res_rt.log, res_rt.elog, ignore_rlog) = \
server.logger.make_specific_loggers(snap_log.LOG_PM, res_rt.rid, res_rt.resource_name,
http_req.username if http_req.username else "anonymous")
pipe_uri = http_req.path
with repository.get_instance() as rep:
ret = get_validated_resdef(rep, pipe_uri, res_rt)
if ret[0] != http_req.OK:
return RhResponse(ret[0], ret[1])
resdef_dict = ret[1]
if resdef_dict[keys.COMPONENT_NAME] != keys.PIPELINE_COMPONENT_NAME:
comp_name = resdef_dict[keys.COMPONENT_NAME]
cc_location = server.component_list.get_cc_uri_for_component(comp_name)
# Get the token for the CC
cc_token = cc_list.get_token_by_uri(cc_location)
if cc_token is not None:
hdr = {headers.CC_TOKEN_HEADER : cc_token}
else:
res_rt.log(snap_log.LEVEL_ERR, "No CC token found for URI %s" % cc_location)
return RhResponse(http_req.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
# The auth layer sets username to the value specified in the invoker HTTP header
# of the PREPARE request. Here, we pass on the username (if any) as the invoker
# of the component PREPARE request. In this manner, the original invoker id is
# propagated to the nested resource.
if http_req.username is not None:
# Never set header to value None, it gets stringified.
hdr[headers.INVOKER_HEADER] = http_req.username
cc_uri = snap_http_lib.concat_paths(cc_location, http_req.path)
try:
ret = send_list_req("POST", cc_uri, [mesg, resdef_dict], hdr)
except SnapiHttpException, e:
res_rt.elog(e, "Failed to forward PREPARE request %s to CC. Message %s" % (cc_uri, str(e)))
# Forward the CC exception back to the requester.
return RhResponse(e.status, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
except Exception, e:
res_rt.elog(e, "Failed to forward PREPARE request %s to CC" % cc_uri)
return RhResponse(http_req.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
return RhResponse(http_req.OK, ret)
# OK, so its a pipeline resdef
resdef = PipelineResDef(resdef_dict)
# The auth layer sets username to the value specified in the invoker HTTP header
# of the PREPARE request. Here, we pass on the username (if any) as the invoker
# of the pipeline PREPARE request. In this manner, the original invoker id is
# propagated to the nested pipelines.
res_rt.invoker = http_req.username
retval = resource_runtime.synch_with_resdef(res_rt, resdef)
if retval is not None:
return RhResponse(retval[0], retval[1])
try:
res_rt.pipeline_manager = PipelineManager(res_rt, resdef)
# Send Prepare request to all those resources in the pipeline.
retval = res_rt.pipeline_manager.prepare_pipeline(res_rt)
if retval is not None:
return RhResponse(retval[0], retval[1])
runtime_table.add_runtime_entry(res_rt, server.config.get_section('common')['main_process_uri'])
# Keep a copy of the status URI in the pipeline manager object, as it can be used to retrieve the res_rt object
# from the runtime table, thus providing pipeline_manager object with the means to access the res_rt object
# via a lock protected interface.
res_rt.pipeline_manager.runtime_status_uri = res_rt.runtime_status_uri
resp = snap_control.create_prepare_response(res_rt)
except Exception, e:
res_rt.elog(e, "Prepare request to %s failed" % http_req.path)
return RhResponse(http_req.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
return RhResponse(http_req.OK, resp)
def get_validated_resdef(rep, uri, res_rt):
"""
Fetches resdef from the repository and ensures that it has been validated, atleast once since it has been saved.
@param rep: Repository instance.
@type rep: Repository object
@param uri: URI of resource def to be retrieved.
@type uri: str
@param res_rt: Resource runtime object of the pipeline, which is populated with logging methods and runtime id.
@type res_rt: L{resource_runtime.ServerResourceRuntime}
@return: (HTTP code OK, resdef dictionary) on success.
(HTTP error code, error message) on failure.
@rtype: tuple
"""
ret = get_resdef(rep, uri, res_rt)
if ret[0] != HttpRequest.OK:
# Return an error
return ret
# We found the resdef
(http_code, resdef_dict, guid, gen_id) = ret
if resdef_dict[keys.COMPONENT_NAME] != keys.PIPELINE_COMPONENT_NAME:
comp_name = resdef_dict[keys.COMPONENT_NAME]
# It's a component's resdef. Check the component version vs resdef version
# to make sure they match: they could have upgraded the component classes
# in the CC directory without making changes to the repository.
# We should prevent them from executing resources with old resdefs that weren't
# upgraded.
if not cc_proxy.resource_version_matches_component_version(resdef_dict, comp_name):
# Component vs resdef version mismatch: validation error
error = "Component resource '%s' cannot be executed because " \
"component_version doesn't match resdef version" % uri
res_rt.log(snap_log.LEVEL_ERR, error)
return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : error})
if rep.is_resource_validated(uri, guid, gen_id):
# This resource has already been validated, and hasn't changed since,
# nothing to do.
return (HttpRequest.OK, resdef_dict)
# Validate resdef with CC.
err_obj = prop_err.ComponentResourceErr(resdef_dict)
# If the resdef sent was found to be of invalid structure, then send the errors back right away
# There is no need to go a step further and contact CC.
e_dict = err_obj._to_resdef()
if e_dict is not None:
# Convert it to a user friendly string.
s = str(ValidationDict(e_dict))
res_rt.log(snap_log.LEVEL_ERR,
"Component resource '%s' cannot be executed. Resource has following validation errors:\n%s" %
(uri, s))
return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid,
keys.ERROR_MESSAGE : RESDEF_ERR_STR % (res_rt.resource_uri, res_rt.resource_name)})
try:
val_ret = server.component_list.validate_with_cc(comp_name, resdef_dict)
except Exception, e:
res_rt.elog(e, "Failed to validate component resource %s at execute time" % uri)
return (HttpRequest.INTERNAL_SERVER_ERROR, "Resource could not be validated")
if val_ret is not None:
s = str(val_ret)
res_rt.log(snap_log.LEVEL_ERR,
"Component resource '%s' cannot be executed. Resource has following validation errors:\n%s" %
(uri, s))
return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid,
keys.ERROR_MESSAGE : RESDEF_ERR_STR % (res_rt.resource_uri, res_rt.resource_name)})
else:
# Its a pipeline. Make sure it is valid. One is tempted to also call suggest_resource_values()
# on the pipeline. However, the PM should not be in the business of re-editing and saving pipelines.
# It should only decide if the resdef is valid or not.
if rep.is_resource_validated(uri, guid, gen_id):
return (HttpRequest.OK, resdef_dict)
try:
e_dict = pipeline_api.validate(resdef_dict)
except Exception, e:
res_rt.elog(e, "Failed to validate pipeline resource %s at execute time" % uri)
return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid,
keys.ERROR_MESSAGE : "Resource could not be validated"})
if e_dict is not None:
s = str(ValidationDict(e_dict))
res_rt.log(snap_log.LEVEL_ERR,
"Pipeline resource '%s' cannot be executed. Resource has following validation errors:\n%s" %
(uri, s))
return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid,
keys.ERROR_MESSAGE : RESDEF_ERR_STR % (res_rt.resource_uri, res_rt.resource_name)})
try:
rep.set_resource_validated_flag(uri, guid, gen_id)
except SnapResDefGenIDError, e:
# It can occur if the resource was modified during execution.
res_rt.log(snap_log.LEVEL_ERR, "Resource '%s' was modified during execution. Gen_id %s. Error %s" %
(uri, gen_id, e))
return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid,
keys.ERROR_MESSAGE : "Resource modified during execution"})
except SnapException, e:
res_rt.elog(e, "Resource '%s' could not be set to validated" % uri)
return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid,
keys.ERROR_MESSAGE : "Resource could not be validated"})
return (HttpRequest.OK, resdef_dict)
def get_resdef(rep, uri, res_rt):
    """
    Read a single resource definition from the local repository.

    @param rep: Repository instance.
    @type rep:  Repository object

    @param uri: URI of the resource def to retrieve.
    @type uri:  str

    @param res_rt: Resource runtime object of the pipeline, which provides the logging methods and runtime id.
    @type res_rt:  L{resource_runtime.ServerResourceRuntime}

    @return: (HTTP code OK, resdef dictionary, resdef guid, resdef gen_id) on success.
             (HTTP error code, {keys.RUNTIME_ID : rid, keys.ERROR_MESSAGE : message}) on failure;
             the message is also logged at error level.
    @rtype:  tuple
    """
    result = rep.read_resources([uri])
    successes = result[keys.SUCCESS]
    failures = result[keys.ERROR]

    def _fail(http_code, msg):
        # Log the problem and package it as an error tuple.
        res_rt.log(snap_log.LEVEL_ERR, msg)
        return (http_code, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : msg})

    if uri in failures:
        reason = failures[uri]
        if reason is None:
            # A None reason means the resource simply does not exist.
            return _fail(HttpRequest.NOT_FOUND, "Resource with URI %s not found in the repository" % uri)
        # NOTE(review): other repository errors are reported as NOT_FOUND too; a different
        # status may be more appropriate.
        return _fail(HttpRequest.NOT_FOUND, "Error retrieving resource %s: %s" % (uri, reason))

    if uri not in successes:
        # Not in the error list should imply it is in the success list; something strange happened.
        return _fail(HttpRequest.INTERNAL_SERVER_ERROR, "Unknown error retrieving %s from the repository" % uri)

    found = successes[uri]
    return (HttpRequest.OK, found[keys.RESDEF], found[keys.GUID], found[keys.GEN_ID])
# time.time() of the last license check that found a valid license not close to expiry.
# It starts 3700 seconds in the past, which is older than the 3600-second window tested in
# initialize(), so the first call always performs the check.
last_refreshed = time.time() - 3700
# Held by initialize() while it performs a license check.
refresh_lock = threading.Lock()
def initialize(logger):
    """
    Check the product license, re-checking at most once per hour while it stays valid.

    Despite its name, this is the license check that process_runtime_put_request() runs before
    starting a pipeline. The string literals in this function are rot13-encoded (the hex-escaped
    codec name spells "rot13") and are decoded with str.encode() at run time.

    When more than 3600 seconds have passed since last_refreshed, the check runs while holding
    refresh_lock:
      - The license file named by the 'license_file' option of the 'main' config section is
        read, stripped and deobfuscated.
      - Its last 8 characters are parsed as the expiry date (YYYYMMDD).
      - The second-to-last '|'-separated field is the license type ("Trial" or other).
      - The third-to-last field, if it equals "PE", marks a Professional Edition license.
      - The type and expiry date are stored as version_info.etype and version_info.edate.
    Outcomes:
      - Professional Edition install (is_pe()) without a PE license: returns an error message.
      - Not expired, but expiring within 5 days (trial) or 30 days (otherwise): logs a warning
        through logger and returns None.
      - Expired: a trial license returns an error message; other licenses log the message and
        return None.
      - Valid and not close to expiry: updates last_refreshed to now and returns None. Only this
        outcome suppresses re-checking for the next hour.
      - Any exception while reading or parsing the license: returns an "invalid license" message.
    Within the 3600-second window, returns None without checking.

    NOTE(review): the window test runs before refresh_lock is acquired and is not repeated
    afterwards, so concurrent callers may each perform the check. The license file is opened
    without being explicitly closed.

    @param logger: Object with a log(level, message) method; the caller in this module passes a res_rt.

    @return: None if execution may proceed, otherwise an error message string.
    @rtype:  str or None
    """
    # Imports deobfuscate() from snaplogic.common.snap_crypt into this function's namespace.
    exec 'sebz fancybtvp.pbzzba.fanc_pelcg vzcbeg qrboshfpngr'.encode('\x72\x6f\x74\x31\x33')
    global last_refreshed, refresh_lock
    if time.time() - last_refreshed > 3600:
        try:
            refresh_lock.acquire()
            # Resolves server.config.get_section('main')['license_file'].
            _ref = getattr(server, 'pbasvt'.encode('\x72\x6f\x74\x31\x33'))
            _func = getattr(_ref, 'trg_frpgvba'.encode('\x72\x6f\x74\x31\x33'))
            fname = _func('znva'.encode('\x72\x6f\x74\x31\x33'))['yvprafr_svyr'.encode('\x72\x6f\x74\x31\x33')]
            s = open(fname).read()
            s = s.strip('\r\n\t ')
            # Evaluates deobfuscate(s).
            s = eval('qrboshfpngr(f)'.encode('\x72\x6f\x74\x31\x33'))
            # Expiry date: the last 8 characters, as YYYYMMDD.
            z = s[-8:]
            z = datetime.datetime(int(z[:4]), int(z[4:6]), int(z[6:]))
            f = s.split('|')
            # License type: the second-to-last '|'-separated field.
            t = f[-2:-1][0]
            # e: is this a "Trial" license?  w: warning window (days before expiry).
            e = t == 'Gevny'.encode('\x72\x6f\x74\x31\x33')
            w = 5 if e else 30
            # x: truthy when the third-to-last field is "PE" (Professional Edition license).
            x = len(f) >= 3 and f[-3] and f[-3] == 'CR'.encode('\x72\x6f\x74\x31\x33')
            # Sets version_info.etype = t and version_info.edate = z.
            exec 'irefvba_vasb.rglcr = g'.encode('\x72\x6f\x74\x31\x33')
            exec 'irefvba_vasb.rqngr = m'.encode('\x72\x6f\x74\x31\x33')
            n = datetime.datetime.now()
            d = z - n
            # g: the license has not expired yet.
            g = n < z
            # gg: Professional Edition install without a PE license.
            gg = not x and is_pe()
            # Every non-success branch rewinds last_refreshed so the next call re-checks.
            if gg:
                last_refreshed = time.time() - 3700
                return 'Cebsrffvbany rqvgvba vafgnyy erdhverf cebsrffvbany rqvgvba yvprafr. Cyrnfr pbagnpg FancYbtvp gb erdhrfg bar.'.encode('\x72\x6f\x74\x31\x33')
            elif g and d < datetime.timedelta(days=w):
                # Close to expiry: warn, but allow execution.
                logger.log(snap_log.LEVEL_ERR, 'FancYbtvp yvprafr vf nobhg gb rkcver va %f qnl(f)'.encode('\x72\x6f\x74\x31\x33') % d.days)
                last_refreshed = time.time() - 3700
                return None
            elif not g:
                # Expired: trial licenses block execution; other licenses only log the message.
                m = 'Cebqhpg yvprafr rkcverq. Cyrnfr pbagnpg FancYbtvp gb erarj.'.encode('\x72\x6f\x74\x31\x33')
                if not e:
                    logger.log(snap_log.LEVEL_ERR, m)
                last_refreshed = time.time() - 3700
                return m if e else None
            else:
                # Valid and not close to expiry: skip re-checking for the next hour.
                last_refreshed = time.time()
                return None
        except Exception, e:
            # Any failure reading or parsing the license is reported as an invalid license.
            last_refreshed = time.time() - 3700
            return 'Vainyvq cebqhpg yvprafr. Cyrnfr pbagnpg FancYbtvp gb erdhrfg gur yvprafr.'.encode('\x72\x6f\x74\x31\x33')
        finally:
            refresh_lock.release()
    else:
        return None
def process_runtime_put_request(http_req):
    """
    Process a PUT request to a runtime status URI.

    The PUT request can be:
      1) A START request to a pipeline runtime that is in Prepared state. The URI is the status
         URI previously returned in the prepare response. A license check (initialize()) runs first.
      2) A STOP request to a pipeline runtime. The URI is the status URI previously returned in
         the prepare response.
         - In Prepared state, the stop is relayed to the member resources immediately.
         - In Started state, it is registered with the PM thread.
         - If the pipeline has already finished (Completed, Failed or Stopped), the final status
           is returned.
      3) A notification sent by one of the runtime resources inside the pipeline, telling the PM
         that it has finished (Completed, Failed or Stopped) executing.

    @param http_req: The HTTP request received.
    @type http_req:  L{HttpRequest}

    @return: RhResponse object with (HTTP status code, response object). On error, the response
             object is a dict holding the runtime id and an error message. On success, it is the
             start/stop response document for START and STOP requests, and None for notification
             requests.
    @rtype:  L{RhResponse}
    """
    rt_status_uri = http_req.path
    res_rt = runtime_table.get_status_uri_entry(rt_status_uri, http_req.method)
    http_req.make_input_rp()
    try:
        mesg = http_req.input.next()
    except StopIteration:
        res_rt.log(snap_log.LEVEL_ERR, "PUT request for %s had no document" % http_req.path)
        return RhResponse(http_req.BAD_REQUEST, {keys.RUNTIME_ID : res_rt.rid,
                                                 keys.ERROR_MESSAGE : "PUT request had no document"})
    response = None
    (req, status_data) = snap_control.parse_put_request(mesg)
    res_rt.lock.acquire()
    try:
        state = res_rt.status.state
        if req == "start":
            # License check: a non-empty result is an error message for the agent.
            e = initialize(res_rt)
            if e:
                # 399 + 4 == 403 (Forbidden).
                return RhResponse(399 + 4, {keys.RUNTIME_ID : res_rt.rid,keys.ERROR_MESSAGE : e})
            if state == RuntimeStatus.Prepared:
                # For START, the request document carries the URI of the parent to notify.
                res_rt.pipeline_manager.parent_notification_uri = status_data
                retval = res_rt.pipeline_manager.start_pipeline(res_rt.my_notification_uri)
                if retval is not None:
                    return RhResponse(retval[0], retval[1])
                RuntimeStatus.compute_pipeline_state(res_rt.status, res_rt.pipeline_manager.members.values(), False)
                # Launch the PM polling thread.
                res_rt.pipeline_manager.start()
                response = snap_control.create_start_stop_response(res_rt.status)
            else:
                res_rt.log(snap_log.LEVEL_ERR, ("Cannot send start request to resource %s (%s) that is in state: %s" %
                                                (res_rt.resource_uri, res_rt.resource_name, state)))
                return RhResponse(http_req.FORBIDDEN, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
        elif req == "stop":
            # The original `state in (RuntimeStatus.Prepared)` was not a tuple membership test
            # (the parentheses are a no-op); compare for equality as intended.
            if state == RuntimeStatus.Prepared:
                # No PM thread is running yet, so relay the stop directly.
                retval = res_rt.pipeline_manager.stop_pipeline()
                RuntimeStatus.compute_pipeline_state(res_rt.status, res_rt.pipeline_manager.members.values(), True)
                if retval is not None:
                    return RhResponse(retval[0], retval[1])
                response = snap_control.create_start_stop_response(res_rt.status)
            elif state == RuntimeStatus.Started:
                # The running PM thread picks up the stop request and relays it.
                res_rt.pipeline_manager.register_stop_request()
                RuntimeStatus.compute_pipeline_state(res_rt.status, res_rt.pipeline_manager.members.values(), True)
                response = snap_control.create_start_stop_response(res_rt.status)
            elif state in (RuntimeStatus.Completed, RuntimeStatus.Failed, RuntimeStatus.Stopped):
                # The pipeline has already finished one way or the other. No need to deliver the
                # stop request; just return the finished status to the requester.
                response = snap_control.create_start_stop_response(res_rt.status)
            else:
                res_rt.log(snap_log.LEVEL_ERR, "Cannot send stop request to resource %s (%s) that is in state: %s" %
                           (res_rt.resource_uri, res_rt.resource_name, state))
                return RhResponse(http_req.BAD_REQUEST, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
        elif req == "notification":
            (res_name, status_obj) = snap_control.parse_notification_request(status_data)
            res_rt.pipeline_manager.register_notification(res_name, status_obj)
        else:
            res_rt.log(snap_log.LEVEL_ERR, "Received unknown PUT request for resource %s (%s): %s" %
                       (res_rt.resource_uri, res_rt.resource_name, req))
            return RhResponse(http_req.BAD_REQUEST, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
    finally:
        res_rt.lock.release()
    return RhResponse(http_req.OK, response)
def process_runtime_get_request(http_req):
"""
Process GET request for the pipeline runtime status.
This is a GET request to fetch status of the pipeline. This method will lookup
res_rt for the status information that was last gathered and return that
information to the agent requesting.
@param http_req: The HTTP request received.
@type http_req: L{HttpRequest}
@return: RhResponse object with (HTTP status code, response object). The response object will be an error
string the HTTP code is not OK. If it is OK, then the response object will be
the status document.
@rtype: L{RhResponse}
"""
rt_status_uri = http_req.path.rstrip('/')
if rt_status_uri == uri_prefix.RUNTIME_STATUS:
# Its just a request for the whole runtime table.
ret_dict = get_runtime_resources(http_req)
return RhResponse(http_req.OK, ret_dict, None,
{ 'title' : product_prefix.SNAPLOGIC_PRODUCT + ": Runtime status"})
try:
res_rt = runtime_table.get_status_uri_entry(rt_status_uri, http_req.method)
except SnapHttpErrorCode, e:
# Most likely a 404 or some such error code.
return RhResponse(e.http_code, e.err_message)
try:
type = http_req.params[snap_params.LEVEL]
except KeyError:
# Assume describe
type = snap_params.VALUE_DESCRIBE
if type == snap_params.VALUE_DESCRIBE:
include_member_resource_stats = False
elif type == snap_params.VALUE_DETAIL:
include_member_resource_stats = True
else:
return RhResponse(http_req.BAD_REQUEST, "Value '%s' is invalid for parameter '%s'. Should be '%s' or '%s'." %
(type, snap_params.LEVEL, snap_params.VALUE_DESCRIBE, snap_params.VALUE_DETAIL))
res_rt.lock.acquire()
try:
if include_member_resource_stats:
resources = {}
for (name, rt) in res_rt.pipeline_manager.members.iteritems():
resources[name] = rt.status
else:
resources = None
resp = snap_control.create_status_info(res_rt.status, resources)
finally:
res_rt.lock.release()
return RhResponse(http_req.OK, resp, None, { 'title' : product_prefix.SNAPLOGIC_PRODUCT + ": Runtime status"})
def get_runtime_resources(http_req):
    """
    Return the runtime table entries that the requester is allowed to see.

    - The owner of an entry, and the admin user, always see the full entry.
    - Any other requester sees an entry only if they have exec permission on the executed
      resource's URI. For these requesters the entry's 'runtime_control_uri' is set to None
      before the permission check, so control information is never exposed.

    @param http_req: The HTTP request received; its username and groups identify the requester.
    @type http_req:  L{HttpRequest}

    @return: List of visible runtime table entries.
    @rtype:  list
    """
    visible = []
    for entry in runtime_table.list_runtime_entries():
        requester = http_req.username
        is_privileged = requester is not None and (requester == entry['owner'] or
                                                   requester == auth.ADMIN_USER)
        if is_privileged:
            visible.append(entry)
            continue
        # Neither owner nor super user: strip the control URI from the entry.
        entry['runtime_control_uri'] = None
        permissions = auth.check_uri(entry['resource_uri'], http_req.username, http_req.groups)
        if auth.can_exec(permissions):
            # Exec permission on the resource is enough to see its runtime information.
            visible.append(entry)
    return visible
class PipelineManager(threading.Thread):
"""
This class provides pipeline management methods and status polling capability.
The PM creates its own thread and the thread runs until the pipeline has completed.
"""
def __init__(self, res_rt, resdef):
    """
    Set up the pipeline manager for a pipeline that is being prepared.

    NOTE: The PipelineManager (PM) deliberately keeps no reference to res_rt. res_rt already
    points to the PM, so a back-reference would form a cycle that the original authors
    observed to leak. Everything the PM needs is copied out of res_rt here. When res_rt itself
    is needed, it can be retrieved from the runtime table using runtime_status_uri.

    Finally, a res_rt is created for each resource in the pipeline via _create_pipeline_runtime().

    @param res_rt: The resource runtime object of the pipeline.
    @type res_rt:  L{resource_runtime.ServerResourceRuntime}

    @param resdef: The resource definition of the pipeline.
    @type resdef:  L{PipelineResDef}
    """
    threading.Thread.__init__(self)
    # Name the thread after the resource; useful while looking at threads in the debugger.
    self.setName(res_rt.resource_name)

    # Identity of this pipeline, copied from res_rt.
    # Runtime id of this pipeline resource.
    self.rid = res_rt.rid
    # Status URI of this pipeline resource. process_prepare_request() assigns it again once the
    # res_rt has been added to the runtime table.
    self.runtime_status_uri = res_rt.runtime_status_uri
    # Name of this pipeline: the name given to it inside an enclosing pipeline, or the name given
    # by the client that is directly launching it. Makes debug and error messages more useful.
    self.resource_name = res_rt.resource_name
    # URI of the resource definition for this pipeline.
    self.resource_uri = res_rt.resource_uri
    # If this pipeline is inside another pipeline, the enclosing pipeline's rid.
    self.pipeline_rid = res_rt.pipeline_rid
    # Message log and exception log for pipeline specific issues.
    self.log = res_rt.log
    self.elog = res_rt.elog

    # Member resources: resource name -> res_rt of that resource.
    self.members = {}
    self._input_view_assignment = {}
    self._output_view_assignment = {}

    # Lock and a condition variable built on it, for coordinating with the PM thread.
    self._lock = threading.Lock()
    self._cond = threading.Condition(self._lock)

    # Set to True when the pipeline has received STOP from the agent; influences the
    # computation of the pipeline state.
    self.got_stop = False
    # Set to True by an external STOP request and reset once the PM thread picks it up.
    self._stop_request = False
    # URI of a monitoring parent that must be notified when this PM has finished running.
    self.parent_notification_uri = None
    # Filled in by incoming notify requests, keyed by the name of the notifying resource.
    self.notifying_resources = {}

    # Set up a res_rt for each resource in the pipeline.
    self._create_pipeline_runtime(res_rt, resdef)
def run(self):
"""
Run the main function of the PipelineManager polling thread.
This method is executed by the PM thread that is started in response to a
START request from the agent. makes the PipelineManager thread call START
on all the resource runtimes inside the pipeline. It then runs in a loop,
monitoring the resource runtimes and exits only when the pipeline has
completed or failed, or been successfully stopped.
After a Pipeline is started, this thread is the one responsible for
relaying commands like stop to resources, gathering resource status and
computing the new status of pipeline. Even when stop requests and notifications
are received, this thread is woken up to process it. This ensures that the
process of computing pipeline status and implementing change in pipeline status
is handled by a particular (PM) thread and there is no danger of one thread
stepping on the toes of another.
"""
# We only want to log a message for the top level pipeline.
if resource_runtime.is_top_pipeline(self.rid):
t_start = time.time()
self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s started" % self.resource_uri)
# Increment the statistic for the number of running pipelines
server.pm_stats_group.get_stat("number_of_running_pipelines").inc()
laststamp = 0
fin_state = RuntimeStatus.Started
while(not RuntimeStatus.check_if_finished(fin_state)):
polling_interval = int(server.config.get_section("main")["polling_interval"])
if time.time() - laststamp > polling_interval:
# Get periodic status update from resources, if it is time to do so.
try:
self.get_pipeline_status()
except Exception, e:
# Just log and move on.
self.elog(e, "Pipeline Manager %s failed to get pipeline status" % self.resource_name)
laststamp = time.time()
# Sleep for the period of polling interval, and watch out for STOP or notify requests.
(stop_req, notify_res) = self._sleep_lightly(polling_interval)
if stop_req:
# Process a request to stop the pipeline
self.got_stop = True
retval = self.stop_pipeline()
if retval is None:
# We do nothing about an error. Thats already logged by the called method.
self.log(snap_log.LEVEL_DEBUG, "Initiated shut down of pipeline %s (%s)" %
(self.resource_uri, self.resource_name))
if notify_res is not None:
# Process a notification request to the pipeline.
for res_name in notify_res:
self.members[res_name].status = notify_res[res_name]
fin_state = self.update_pipeline_status()
try:
res_rt = runtime_table.get_status_uri_entry(self.runtime_status_uri)
res_rt.lock.acquire()
try:
# TODO: XXX Set exit time stamp in all the exceptional situations in which a res_rt
# may be abandoned.
res_rt.exit_time = time.time()
if self.parent_notification_uri is not None:
# We need to notify the caller of this Pipeline
notify_req = snap_control.create_notification_request(self.resource_name, res_rt.status)
finally:
res_rt.lock.release()
if self.parent_notification_uri is not None:
snapi_base.send_req("PUT", self.parent_notification_uri, notify_req)
except Exception, e:
# Just log and move on.
self.elog(e, "Pipeline notification failed")
if resource_runtime.is_top_pipeline(self.rid):
# Decrement the statistic for the number of running pipelines
server.pm_stats_group.get_stat("number_of_running_pipelines").dec()
t_end = time.time()
if fin_state == RuntimeStatus.Completed:
server.pm_stats_group.get_stat("number_of_completed_pipelines").inc()
self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s has completed in %.2f seconds" %
(self.resource_uri, t_end - t_start))
elif fin_state == RuntimeStatus.Stopped:
server.pm_stats_group.get_stat("number_of_stopped_pipelines").inc()
self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s has been stopped" % self.resource_uri)
elif fin_state == RuntimeStatus.Failed:
server.pm_stats_group.get_stat("number_of_failed_pipelines").inc()
self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s has failed" % self.resource_uri)
else:
self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s finished with state %s" %
(self.resource_uri, fin_state))
def update_pipeline_status(self):
    """
    Recompute the pipeline's state and view statistics from its member resources.

    The statistics of the pipeline views are taken over from the member resource views
    that were assigned as pipeline views, and the pipeline state is then derived from
    the states of all member resource runtimes (taking into account whether a STOP was
    received). Both updates are applied to the status held in the pipeline's own res_rt
    while that res_rt's lock is held.

    Typically called after resource status has been received, either through a
    notification or through the GET requests made by the polling thread.

    @return: The newly computed state of the pipeline (the C{state} attribute of the
        pipeline's status object).
    @rtype: L{RuntimeStatus} state value
    """
    pipeline_rt = runtime_table.get_status_uri_entry(self.runtime_status_uri)
    pipeline_rt.lock.acquire()
    try:
        # Move statistics from the assigned member views into the pipeline status.
        RuntimeStatus.transfer_statistics(pipeline_rt.status,
                                          self.members,
                                          self._input_view_assignment,
                                          self._output_view_assignment)
        # Derive the overall pipeline state from the member runtimes.
        RuntimeStatus.compute_pipeline_state(pipeline_rt.status, self.members.values(), self.got_stop)
        return pipeline_rt.status.state
    finally:
        pipeline_rt.lock.release()
def register_stop_request(self):
    """
    Register a request to stop a pipeline that has already been started by a START request.

    This method is called in response to a network request to stop the pipeline.
    The call has no synchronous impact on the pipeline: the request is merely recorded
    in self._stop_request and every thread waiting on self._cond is notified. The PM
    thread wakes up and then processes this STOP request to stop all the resources
    inside the pipeline.
    """
    # The with statement releases the condition's lock even if something raises while
    # it is held, the same guarantee register_notification() gets from try/finally.
    with self._cond:
        self._stop_request = True
        self._cond.notifyAll()
def register_notification(self, resource_name, status_obj):
    """
    Record the final status sent by a finished member resource and wake the PM thread.

    Nothing is processed synchronously here. The status is stored, keyed by the
    resource name, in self.notifying_resources, and every thread waiting on
    self._cond is notified. The PM thread later collects the stored entries and uses
    them when it recomputes the pipeline status.

    @param resource_name: Name of the member resource that finished and sent the
        notification.
    @type resource_name: str
    @param status_obj: The final status (state and statistics info) carried by the
        notification.
    @type status_obj: L{RuntimeStatus}
    """
    with self._cond:
        pending = self.notifying_resources
        pending[resource_name] = status_obj
        self._cond.notifyAll()
def _sleep_lightly(self, timeout):
"""
Sleep until the timeout specified, unless a STOP or notification request is received.
The PM thread uses this to sleep between polling intervals. The PM thread can
be woken up from this sleep by stop and notification requests. If such requests
are received, the thread wakes up, collects those requests and returns from
this method with those requests.
@param timeout: The time out of the sleep in seconds.
@type timeout: int
@return: (stop bool value, notification dictionary), The stop bool value will be true
if a stop request has been received since the last time we checked. The notification
dictionary contains the notifications received from resources since the last time we
checked. The key of the dictionary is the resource name and value is the status object
received witht he notification.
@rtype: 2-tuple
"""
initial_check = True
notify_res = None
stop_req = False
self._cond.acquire()
try:
while True:
if len(self.notifying_resources) > 0:
notify_res = self.notifying_resources
# Reset the notifying dict
self.notifying_resources = {}
if self._stop_request:
stop_req = True
self._stop_request = False
if stop_req or notify_res is not None:
return (stop_req, notify_res)
if initial_check:
initial_check = False
self._cond.wait(timeout)
else:
return (stop_req, notify_res)
finally:
self._cond.release()
def _create_pipeline_runtime(self, res_rt, resdef):
    """
    Create a representation of the pipeline in terms of res_rt objects.

    The structure and purpose of this representation has already been explained
    in the documentation of process_prepare_request(), so it will not be repeated
    here, other than to mention that this method creates the res_rt structures and
    their inter-relationships.

    The work is done in four passes:
      1. Create a ServerResourceRuntime for every member resource of the pipeline,
         with its input views, output views and parameter values, and store it in
         self.members under its resource name.
      2. Resolve resource references whose value names another pipeline member
         (rather than a URI) into that member's URI and parameters.
      3. Map the pipeline input/output views requested by the executor of this
         pipeline (those present in res_rt) onto member views.
      4. Link member views together as described by the links in the resdef.

    @param res_rt: Partially populated resource runtime object of the pipeline.
    @type res_rt: L{resource_runtime.ServerResourceRuntime}
    @param resdef: Resource definition of the pipeline.
    @type resdef: L{PipelineResDef}
    @raise SnapObjNotFoundError: If a member has no URI, a referenced member does not
        exist, an assigned member or member view does not exist, or a GET pipeline
        input view has no URI.
    @raise SnapObjExistsError: If a pipeline input or output view is assigned twice.
    @raise SnapValueError: If a pipeline output view does not use GET or already has a
        URI; also propagated from link() and _format_field_links().
    """
    # Maps member resource name -> that member's resdef; reused by the later passes.
    res_resdef_map = {}
    # Create pipeline component object for each pipeline resource
    for res_name in resdef.list_resource_names():
        u = resdef.get_resource_uri(res_name)
        if u is None:
            raise SnapObjNotFoundError("No uri found for resource %s in pipeline" % res_name)
        # The runtime object is built from the URI returned by to_communication_uri()
        # (given the configured main process URI), not from the raw resdef URI.
        comm_url = resource_runtime.to_communication_uri(u, server.config.get_section('common')['main_process_uri'])
        rt = resource_runtime.ServerResourceRuntime(comm_url)
        rt.resource_name = res_name
        rt.pipeline_rid = res_rt.rid
        # Setup the context name: the pipeline's own context name (if any) followed by
        # the pipeline's resource name.
        if res_rt.context_name is not None:
            rt.context_name = res_rt.context_name + "." + res_rt.resource_name
        else:
            rt.context_name = res_rt.resource_name
        self.members[rt.resource_name] = rt
        res_resdef_map[res_name] = res_resdef = resdef.get_resource(res_name)
        for inp_name in res_resdef.list_input_view_names():
            rt_inp = resource_runtime.RuntimeInputView(inp_name)
            rt_inp.is_pass_through = res_resdef.input_is_pass_through(inp_name)
            rt.input_views[rt_inp.name] = rt_inp
        for out_name in res_resdef.list_output_view_names():
            rt_out = resource_runtime.RuntimeOutputView(out_name)
            rt.output_views[rt_out.name] = rt_out
        rt.parameters = self._populate_resource_params(resdef.get_param_map(),
                                                       rt.resource_name,
                                                       res_rt.parameters)
    # Now, setup resource reference changes.
    # This runs after the first pass, so every member already exists and can be referenced.
    for (member_name, rt) in self.members.items():
        for ref_name in res_resdef_map[member_name].list_resource_ref_names():
            # Call get_member_resource_ref, so that overridden value in pipeline (if any) is
            # accounted for when resource value is returned.
            val = resdef.get_member_resource_ref(member_name, ref_name)
            if val is None:
                continue
            parsed_uri = urlparse.urlparse(val)
            if parsed_uri.scheme or val.startswith('/'):
                # This is a URI, not a member name.
                continue
            # Ok, so this is supposed to be a resource member
            referred_rt = self.members.get(val)
            if referred_rt is None:
                raise SnapObjNotFoundError("Pipeline resource '%s' mentioned as "
                                           "resource reference in '%s' not found" % (val, member_name))
            rt.resource_ref[ref_name] = {keys.URI: referred_rt.resource_uri, keys.PARAMS: referred_rt.parameters}
    # Assign inputs of pipeline to input views of pipeline components
    for pipe_view in resdef.list_input_assignments():
        (res_name, res_view_name) = resdef.get_input_assignment(pipe_view)
        if pipe_view not in res_rt.input_views:
            # If the executor of this pipeline has not requested this input view, then don't set it up.
            continue
        if pipe_view in self._input_view_assignment:
            raise SnapObjExistsError("The pipeline %s input view \"%s\" has already been assigned to %s" %
                                     (res_rt.resource_uri, pipe_view, self._input_view_assignment[pipe_view]))
        if res_name not in self.members:
            raise SnapObjNotFoundError(
                "Resource name \"%s\" in the definition of pipeline input view \"%s\" not found" %
                (res_name, pipe_view))
        if res_view_name not in self.members[res_name].input_views:
            raise SnapObjNotFoundError("Input view \"%s\" (resource \"%s\") in the definition of pipeline "
                                       "input view \"%s\" was not found" % (res_view_name, res_name, pipe_view))
        # Mark the member view as assigned (link() refuses to link such views) and
        # copy the executor's HTTP method and field links onto it.
        self.members[res_name].input_views[res_view_name].assignment = pipe_view
        self._input_view_assignment[pipe_view] = (res_name, res_view_name)
        self.members[res_name].input_views[res_view_name].http_method = res_rt.input_views[pipe_view].http_method
        self.members[res_name].input_views[res_view_name].field_links = res_rt.input_views[pipe_view].field_links
        if res_rt.input_views[pipe_view].http_method == "GET":
            # The input reads via a GET request, so a URI must have also been provided by the pipeline to do
            # the GET on. If it is a POST request, then no URI will be provided, the resource in the pipeline
            # should provide one.
            if res_rt.input_views[pipe_view].uri is None:
                raise SnapObjNotFoundError("The pipeline %s (%s) input view \"%s\" had no uri" %
                                           (res_rt.resource_uri, res_rt.resource_name, pipe_view))
            self.members[res_name].input_views[res_view_name].uri = res_rt.input_views[pipe_view].uri
    # Assign outputs of pipeline to output views of pipeline components
    for pipe_view in resdef.list_output_assignments():
        (res_name, res_view_name) = resdef.get_output_assignment(pipe_view)
        if pipe_view not in res_rt.output_views:
            # If the executor of this pipeline has not requested this output view, then don't set it up.
            continue
        if pipe_view in self._output_view_assignment:
            raise SnapObjExistsError("The pipeline %s output view \"%s\" has already been assigned to %s" %
                                     (res_rt.resource_uri, pipe_view, self._output_view_assignment[pipe_view]))
        if res_name not in self.members:
            raise SnapObjNotFoundError(
                "Resource name \"%s\" in the definition of pipeline output view \"%s\" not found" %
                (res_name, pipe_view))
        if res_view_name not in self.members[res_name].output_views:
            raise SnapObjNotFoundError("Output view \"%s\" (resource \"%s\") in the definition of pipeline "
                                       "output view \"%s\" was not found" % (res_view_name, res_name, pipe_view))
        self.members[res_name].output_views[res_view_name].assignment = pipe_view
        self._output_view_assignment[pipe_view] = (res_name, res_view_name)
        # The method must be GET
        if res_rt.output_views[pipe_view].http_method != "GET":
            raise SnapValueError("Pipeline %s (%s) output view \"%s\" had invalid HTTP method \"%s\"" %
                                 (res_rt.resource_uri, res_rt.resource_name, pipe_view,
                                  res_rt.output_views[pipe_view].http_method))
        # The URI should be empty; _prepare_resource() fills it in from the member's output view.
        if res_rt.output_views[pipe_view].uri is not None:
            raise SnapValueError("Pipeline %s (%s) output view \"%s\" had URI already set to \"%s\"" %
                                 (res_rt.resource_uri, res_rt.resource_name, pipe_view,
                                  res_rt.output_views[pipe_view].uri))
    # Link the pipeline components together.
    for (src_name, src_view_name, dest_name, dest_view_name, field_links) in resdef.get_links():
        src_resdef = res_resdef_map[src_name]
        # Field-level links are only computed when the source output view is in record mode;
        # otherwise the link carries no field links (None).
        if src_resdef.output_is_record_mode(src_view_name):
            src_fields = src_resdef.list_output_view_fields(src_view_name)
            dest_resdef = res_resdef_map[dest_name]
            dest_fields = dest_resdef.list_input_view_fields(dest_view_name)
            dest_is_pass_through = dest_resdef.input_is_pass_through(dest_view_name)
            rt_field_links = self._format_field_links(src_view_name, src_fields, dest_view_name, dest_fields,
                                                      field_links, dest_is_pass_through)
        else:
            rt_field_links = None
        self.link(src_name, src_view_name, dest_name, dest_view_name, rt_field_links)
def _populate_resource_params(self, param_map, target_name, params):
    """
    Set values for parameters of a resource in a pipeline at runtime.

    This method looks up the parameter map to copy pipeline parameter values to the
    appropriate resource parameters. It also sets resource parameters to any constant
    values specified in the parameter map. A mapped pipeline parameter that has
    neither a runtime value nor a default value is left out of the result; no error
    is raised for it here.

    @param param_map: Sequence of (pipe_param, res_name, res_param, default_value)
        4-tuples. When pipe_param is None, the entry assigns the constant
        default_value to res_param of resource res_name.
    @type param_map: sequence of 4-tuples
    @param target_name: Name of the member resource whose parameter values are being
        processed.
    @type target_name: string
    @param params: Contains pipeline param names and their values. May be None.
    @type params: dict
    @return: A dictionary populated with param names and param values for the specified
        member resource. If params contains snap_params.TRACE_DATA, that entry is
        passed through to the member as well.
    @rtype: dict
    """
    if params is None:
        params = {}
    out_dict = {}
    for (pipe_param, res_name, res_param, default_value) in param_map:
        if res_name != target_name:
            continue
        if pipe_param is None:
            # This is a constant -> resource param mapping.
            out_dict[res_param] = default_value
        elif pipe_param in params:
            # The runtime arguments have a value for this parameter, so use that.
            out_dict[res_param] = params[pipe_param]
        elif default_value is not None:
            # No runtime value, but the mapping provides a default.
            out_dict[res_param] = default_value
        # Otherwise the resource parameter is left unset.
    # Now, handle some SnapLogic specific param names
    if snap_params.TRACE_DATA in params:
        out_dict[snap_params.TRACE_DATA] = params[snap_params.TRACE_DATA]
    return out_dict
def _format_field_links(self, src_view_name, src_fields, dest_view_name, dest_fields,
link_fields, dest_is_pass_through):
"""
Convert field linking information from resource definition to format used in
PREPARE message. The format used in the PREPARE request uses a sequence
of tuples. Each tuple corresponds to a field in the source view. The tuple
itself contains
(src_field_name, [dest_field_name1, dest_field_name2,..])
The tuple specifies the source field and the list of destination fields it is
linked to. If the source field is unlinked, then the list of destination
fields is empty.
@param src_view_name: Source view name
@type src_view_name: str
@param src_fields: The fields of the source view
@type src_fields: Sequence of 3-tuples
@param dest_view_name: Destination/input view name in the link.
@type dest_view_name: str
@param dest_fields: The fields in the destination view.
@type dest_fields: Sequence of 3-tuples
@param link_fields: A sequence of 2-tuples, each describing a link
between a source field and a destination field like this -
((src_field1, dest_field1), (src_field1, dest_field2), ...)
@type link_fields: Sequence of 2-tuples.
@param dest_is_pass_through: True if destination view is pass-through
@type dest_is_pass_through: bool
@return: ( (None, [dest_field10, dest_field11, ...]),
(src_field1, [dest_field1, dest_field2, ..]),
(src_field2, []),
...)
@rtype: tuples
"""
runtime_links = []
# First, process all the links that might be hard coding destination fields to None.
null_dest_fields = [l[1] for l in link_fields if l[0] is None]
if len(null_dest_fields) > 0:
runtime_links.append((None, null_dest_fields))
for d in dest_fields:
found = False
for l in link_fields:
if d[0] == l[1]:
found = True
break
if not found:
raise SnapValueError("Pipeline %s, destination view \"%s\" has unlinked field %s" %
(self.resource_uri, dest_view_name, dest_fields[0]))
for (src_field_name, src_type, src_desc) in src_fields:
dest_field_names = []
for l in link_fields:
if src_field_name == l[0]:
dest_field_names.append(l[1])
runtime_links.append((src_field_name, dest_field_names))
return runtime_links
def link(self, src_res_name, src_view_name, dest_res_name, dest_view_name, field_links):
    """
    Connect an output view of one member resource to an input view of another.

    The destination input view records the (source resource, source view) pair and
    the field links, and the (destination resource, destination view) pair is
    appended to the source output view's list of links. An input view that has been
    assigned as a pipeline input view cannot also be the target of a link.

    @param src_res_name: Source resource name in the link
    @type src_res_name: str
    @param src_view_name: Source view name in the link
    @type src_view_name: str
    @param dest_res_name: Destination resource name in the link
    @type dest_res_name: str
    @param dest_view_name: Destination view name in the link.
    @type dest_view_name: str
    @param field_links: The field level links in the format used in PREPARE request.
    @type field_links: Sequence of tuples in the following format:
        ((src_field1, [dest_field1, dest_field2, ..]),
         (src_field2, []),
         ...)
    @raise SnapValueError: If the destination input view is assigned as a pipeline view.
    """
    target_view = self.members[dest_res_name].input_views[dest_view_name]
    if target_view.assignment is not None:
        raise SnapValueError("Pipeline %s, destination resource \"%s\"'s view \"%s\" has been assigned as "
                             "pipeline view \"%s\". It cannot be linked to resource \"%s\"'s view \"%s\"" %
                             (self.resource_uri, dest_res_name, dest_view_name,
                              target_view.assignment,
                              src_res_name, src_view_name))
    source_view = self.members[src_res_name].output_views[src_view_name]
    source_view.linked_to.append((dest_res_name, dest_view_name))
    target_view.linked_to = (src_res_name, src_view_name)
    target_view.field_links = field_links
def prepare_pipeline(self, pipeline_res_rt):
    """
    Send PREPARE request to all resources inside the pipeline.

    The method starts with the most upstream resources in the pipeline and sends a
    PREPARE request to each of them, using the res_rt for that resource. The
    PREPARE response is used to populate the res_rt with runtime URIs in the
    response. The function then figures out which downstream resources can now be
    prepared. A downstream resource can be prepared, if all its upstream resources
    have been prepared. Successfully prepared resources are recorded, in order, in
    self.prep_list.

    If a PREPARE fails, or a round finds no resource ready to be prepared (which
    indicates a loop in the pipeline), STOP is sent to every resource prepared so far
    and the error is returned.

    @param pipeline_res_rt: The resource runtime object for the pipeline.
    @type pipeline_res_rt: L{resource_runtime.ServerResourceRuntime}
    @return: None if the function succeeds, (Http code, err message) if there was an error.
    @rtype: tuple
    """
    rt_list = self.members.values()
    self.prep_list = []
    while rt_list:
        tmp_list = []
        found = False
        retval = None
        for rt in rt_list:
            if self._is_runtime_ready_for_prepare_req(rt):
                found = True
                retval = self._prepare_resource(rt, pipeline_res_rt)
                if retval is not None:
                    break
                self.prep_list.append(rt)
            else:
                self.log(snap_log.LEVEL_DEBUG, "Resource runtime '%s' not yet ready for prepare request" %
                         ((rt.resource_name)))
                tmp_list.append(rt)
        # If there was an error, then abort PREPARE and call stop on all those that are already PREPARED.
        if retval is not None:
            # One of the prepare calls failed. Undo the prepared comps
            self.stop_pipeline(self.prep_list)
            return retval
        if not found:
            # Not a single resource was eligible for PREPARE, there must be a loop somewhere.
            self.log(snap_log.LEVEL_ERR, "Pipeline Manager detected a loop in the pipeline %s: resources %s" %
                     (self.resource_uri, [r.resource_name for r in rt_list]))
            retval = (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : RESDEF_ERR_STR %
                            (pipeline_res_rt.resource_uri, pipeline_res_rt.resource_name)})
            # Treat this like a failed PREPARE: undo what was prepared and return the error
            # instead of falling through to the success path below.
            self.stop_pipeline(self.prep_list)
            return retval
        rt_list = tmp_list
    # Compute the pipeline state, based on the state of all the resource runtimes, that was
    # sent in the prepare response document.
    RuntimeStatus.compute_pipeline_state(pipeline_res_rt.status, self.members.values(), False)
    return None
def _is_runtime_ready_for_prepare_req(self, rt):
"""
Return True if this resource runtime is ready for a PREPARE request.
If all resource runtimes upstream of this resource runtime are in prepared state,
then this runtime is ready to be prepared.
@param rt: The resource runtime object being tested.
@type rt: L{resource_runtime.ServerResourceRuntime}
@return: True if the runtime is ready, False otherwise
@rtype: bool
"""
for inp in rt.input_views.values():
if inp.linked_to is None:
continue
else:
if self.members[inp.linked_to[0]].status.state == RuntimeStatus.Prepared:
continue
else:
return False
return True
def _prepare_resource(self, res_rt, pipeline_res_rt):
"""
Send a prepare request to the specified resource runtime.
This method sends PREPARE request to the resource, using the res_rt to build
the request. It then reads the response and populates with res_rt with runtime
URIs returned in the response. The runtime URIs received are:
- output view runtime URIs
- input view runtime URIs which will be using POST http method for receiving
data.
The output and input view URIs are copied by this method to pipeline views, if
this resource has assigned views to pipeline.
The output view URIs are also copied by this method to downstream resource
res_rts that are linked to this resource.
@param res_rt: The resource runtime that needs to be sent a PREPARE request.
@type res_rt: L{resource_runtime.ServerResourceRuntime}
@param pipeline_res_rt: The resource runtime object for the pipeline.
@type pipeline_res_rt: L{resource_runtime.ServerResourceRuntime}
@return: None if the function succeeds, (Http code, err message) if there was an error.
@rtype: tuple
"""
# Remove all input views that have GET method and no URI. Such a input view is unused
# and basically has no links and no assignment.
for inp_name in res_rt.input_views.keys():
if res_rt.input_views[inp_name].http_method == 'GET':
if res_rt.input_views[inp_name].uri is None:
del res_rt.input_views[inp_name]
# Send prepare request
try:
prep_req = snap_control.create_prepare_request(res_rt)
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to create PREPARE request for resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
self.log(snap_log.LEVEL_DEBUG, "Pipeline manager '%s' sending to %s. Invoker '%s': PREPARE %s" %
(self.resource_name, res_rt.resource_name, pipeline_res_rt.invoker, prep_req))
cred = credentials_store.get_credentials_by_uri(res_rt.resource_uri)
# Only send server token, if the server is sending request to itself.
if uri_checker.is_mine(res_rt.resource_uri):
hdr = { headers.SERVER_TOKEN_HEADER : server.server_token }
if pipeline_res_rt.invoker is not None:
hdr[headers.INVOKER_HEADER] = pipeline_res_rt.invoker
else:
hdr = None
try:
resp_dict = snapi_base.send_req("POST", res_rt.resource_uri, prep_req, hdr, cred)
except Exception, e:
self.elog(e, "Pipeline '%s' unable to send PREPARE request to resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
self.log(snap_log.LEVEL_DEBUG, "Pipeline '%s' got response from %s: %s" %
(self.resource_name, res_rt.resource_name, resp_dict))
try:
snap_control.parse_prepare_response(resp_dict, res_rt)
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to parse PREPARE response from resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
# The response has been read in. Now, migrate the URIs obtained to downstream components and
# to the pipeline's own views, if any.
for out in res_rt.output_views.values():
for (dest_res_name, dest_view_name) in out.linked_to:
if self.members[dest_res_name].input_views[dest_view_name].uri is not None:
self.log(snap_log.LEVEL_ERR, "Pipeline %s (%s) found that the link from source %s (%s) "
"to destination %s (%s) has already got a URI %s" %
(self.resource_uri, self.resource_name,
res_rt.resource_name, out.name, dest_res_name, dest_view_name))
return (500, {keys.RUNTIME_ID : self.rid,
keys.ERROR_MESSAGE : RESDEF_ERR_STR % (res_rt.resource_uri, res_rt.resource_name)})
self.members[dest_res_name].input_views[dest_view_name].uri = out.uri
if out.assignment is not None:
# This output view of the resource in the pipeline has been assigned as the pipeline
# output view. That means, the URI for this output view must be also assigned as the
# URI for the pipeline output view.
pipeline_res_rt.output_views[out.assignment].uri = out.uri
for inp in res_rt.input_views.values():
if inp.assignment is not None and inp.http_method == "POST":
# This view is assigned as the pipeline view and the mode of communication is POST. This
# means that the resource inside the pipeline needs to come up with a URI that can be
# POSTed to, and that becomes the URI of the pipeline input view.
pipeline_res_rt.input_views[inp.assignment].uri = inp.uri
return None
def start_pipeline(self, my_notification_uri):
    """
    Send START to every member resource, in the order in which they were prepared.

    Starting in PREPARE order is not strictly required. It lets upstream resources
    get going before the downstream ones. Each START request carries the
    notification URI of this pipeline manager, so that the resources can report
    their final status when they finish executing. As soon as one START fails, the
    resources started so far are sent STOP and the error is returned.

    @param my_notification_uri: The notification URI being provided by this pipeline
        manager instance to all its child resources. These child resources should POST
        to this URI when they finish running.
    @type my_notification_uri: str
    @return: None if the function succeeds, (Http code, err message) if there was an error.
    @rtype: tuple
    """
    started = []
    for member_rt in self.prep_list:
        self.log(snap_log.LEVEL_DEBUG, "Pipeline manager '%s' sending %s (%s) START document" %
                 (self.resource_name, member_rt.resource_uri, member_rt.resource_name))
        failure = self._start_resource_runtime(member_rt, my_notification_uri)
        if failure is not None:
            # Roll back only the resources that were started successfully.
            # NOTE(review): the resource that failed and any later prepared-but-not-started
            # resources are not sent STOP here; confirm the caller cleans those up.
            self.stop_pipeline(started)
            return failure
        started.append(member_rt)
    return None
def _start_resource_runtime(self, res_rt, my_notification_uri):
"""
Send START request to a specific resource runtime in the pipeline.
@param res_rt: Resource runtime object of the resource to be started.
@param res_rt: L{resource_runtime.ServerResourceRuntime}
@param my_notification_uri: The notification URI being provided by this pipeline manager instance to
all its child resources. Tthese child resources should POST to this URI when they finish running.
@type my_notification_uri: str
@return: None if the function succeeds, (Http code, err message) if there was an error.
@rtype: tuple
"""
try:
start_dict = snap_control.create_start_request(my_notification_uri)
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to create START document for resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
try:
resp_dict = snapi_base.send_req("PUT", res_rt.runtime_control_uri, start_dict)
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to send START document for resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
try:
snap_control.parse_start_stop_response(resp_dict, res_rt.status)
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to parse START response document from resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
return None
def stop_pipeline(self, stop_list = None):
    """
    Send STOP to the given member resources, or to every member if none are given.

    Every resource in the list is sent STOP even if an earlier one fails. Only the
    first failure is reported back.

    @param stop_list: Resource runtimes that must be stopped. Optional; when omitted,
        all resources in the pipeline are stopped.
    @type stop_list: List of L{resource_runtime.ServerResourceRuntime}
    @return: None if the function succeeds, (Http code, err message) of the first
        failure otherwise.
    @rtype: tuple
    """
    targets = stop_list
    if targets is None:
        targets = self.members.values()
    first_error = None
    for member_rt in targets:
        self.log(snap_log.LEVEL_DEBUG, "Pipeline manager \"%s\" sending resource %s (%s) stop request" %
                 (self.resource_name, member_rt.resource_uri, member_rt.resource_name))
        outcome = self._stop_resource_runtime(member_rt)
        if first_error is None:
            # Keeps None until the first failure, then never changes again.
            first_error = outcome
    return first_error
def _stop_resource_runtime(self, res_rt):
"""
Send a STOP request to the resource runtime specified.
@param res_rt: Resource runtime object of the resource to be stopped.
@param res_rt: L{resource_runtime.ServerResourceRuntime}
@return: None if the function succeeds, (Http code, err message) if there was an error.
@rtype: tuple
"""
try:
stop_dict = snap_control.create_stop_request()
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to create STOP document for resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
try:
resp_dict = snapi_base.send_req("PUT", res_rt.runtime_control_uri, stop_dict)
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to send STOP document to resource %s (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
try:
snap_control.parse_start_stop_response(resp_dict, res_rt.status)
except Exception, e:
self.elog(e, "Pipeline manager '%s' unable to parse STOP response from resource %s (%s): '%s'" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name, resp_dict))
return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
return None
def get_pipeline_status(self):
"""
Make GET requests for resource status and update pipeline status with responses received.
This method is called by the PM thread to peridically update the res_rt of each resource
with the current status of the resource runtime they represent.
"""
for res_rt in self.members.values():
if res_rt.status.is_finished():
# Don't send status request to a resource that has already finished executing (successfully or
# unsuccessfully).
self.log(snap_log.LEVEL_DEBUG,
"Pipeline manager '%s' skipping GET request to finished resource '%s' (%s)." %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
continue
self.log(snap_log.LEVEL_DEBUG, "Pipeline manager '%s' sending GET status request to resource '%s' (%s)" %
(self.resource_name, res_rt.resource_uri, res_rt.resource_name))
try:
self._get_resource_runtime_status(res_rt)
except SnapException, e:
self.elog(e, "Failed to get status from resource %s (%s)" % (res_rt.resource_uri, res_rt.resource_name))
def _get_resource_runtime_status(self, res_rt):
"""
Fetch status from a specified resource runtime.
This method fetches the status from a specific resource runtime, by calling
GET on the runtime status URI of the resource. The status obtained as a result
of the GET is saved into the res_rt of the resource.
@param res_rt: The resource runtime object for the resource being queried. This
object will be updated with the new status information.
@rtype: L{resource_runtime.ResourceRuntime}
@raise SnapIOError: If the network connection failed.
"""
uri = snap_http_lib.add_params_to_uri(res_rt.runtime_status_uri, {snap_params.LEVEL: snap_params.VALUE_DESCRIBE})
try:
status_dict = snapi_base.send_req("GET", uri)
except Exception, e:
raise SnapIOError("Pipeline manager \"%s\" unable to send status GET request to resource %s (%s). %s" %
(self.resource_name, uri, res_rt.resource_name, str(e)))
res_rt.status = snap_control.parse_status_response(status_dict, False)
|