# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: pipeline_manager.py 10295 2009-12-22 23:27:28Z dhiraj $

from __future__ import with_statement

"""
The pipeline manager (PM) module provides the functionality needed to
run a pipeline.

Terms:
agent - This could be the PM of a pipeline which encloses this pipeline
or it could be a client which is directly invoking this pipeline.

Description:
It provides the interfaces to Prepare, Start, Stop and Monitor a
pipeline resource. The module typically resides in the main process
of the SnapLogic server. When a POST request to prepare any resource
(component resource or pipeline resource) arrives at the server, it
is routed to the process_prepare_request() function of this module. That
function fetches the resdef for that URI and checks whether it is a
component-based or a pipeline-based resource. If it is a component-based
resource, then the component list is looked up and the CC that serves
that component is identified. The Prepare request is then routed to that
CC (along with the resdef) and the response from the CC is forwarded
back to the agent that made the Prepare request. After this, the PM will
play no further part in that interaction. On the other hand, if the
Prepare request turns out to be for a pipeline resource, then that request
is handled by this module. It creates a resource runtime object (res_rt)
for that particular pipeline resource and enters that object in the
runtime table. The runtime table provides a "runtime status URI" for that
object, which serves as the URI for this new runtime representation of the
resource. The PM then sends Prepare requests to all the resources inside
the pipeline. Once this is achieved, the PM responds to the agent that
made the Prepare request, with the runtime status URI which can be used for
Starting/Stopping or monitoring this runtime resource. If the agent sends
Start, then the PM proceeds to relay that Start request to all the
resources inside the pipeline. Stop requests are handled similarly. For
monitoring purposes, the PM (once the pipeline is started) runs a thread
in a loop, which periodically wakes up and requests status information from
all the resources in the pipeline. It then computes the state and statistics
of the pipeline, based on the state and statistics of the resources inside
the pipeline. When a GET request to the runtime status URI arrives from
an agent, it is routed to the PM, which responds with the most recently
gathered state and statistics.

Finally, when resources in the pipeline finish, they send a notification
PUT to the pipeline's status URI, with the final status of the resource.
This allows the PM to quickly discover that the pipeline has completed,
instead of awaiting the periodic status poll of the resources.

"""
import threading
import time
import socket
import os
import datetime
import urlparse

from snaplogic import server
from snaplogic.common.snap_exceptions import *
from snaplogic.common import version_info
from snaplogic.common.version_info import is_pe
from snaplogic.snapi_base.exceptions import *
from snaplogic.common import snap_log,snap_params
from snaplogic.snapi_base import keys,ValidationDict
from snaplogic.snapi_base import send_list_req
from snaplogic.common import uri_prefix
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.common import snap_http_lib
import snaplogic.common.resource_runtime as resource_runtime
import snaplogic.common.snap_control as snap_control
import snaplogic.common.runtime_table as runtime_table
from snaplogic.common.config import credentials_store
from snaplogic.common import prop_err
from snaplogic.snapi_base.resdef import PipelineResDef
from snaplogic.server import repository
from snaplogic.server import RhResponse
from snaplogic.server import cc_list
from snaplogic.server import cc_proxy
from snaplogic.server.http_request import HttpRequest
from snaplogic.common import headers
from snaplogic import snapi_base
from snaplogic.snapi_base.exceptions import SnapiException
from snaplogic.server import product_prefix,pipeline_api,uri_checker,auth

EXEC_ERR_STR = "Resource execution failed"
RESDEF_ERR_STR = "Resource is invalid - uri: %s name: %s"

def process_prepare_request(http_req):
    """
    Handle a prepare request and create a resource runtime based on that request.
    
    This function is called when it has been determined that we have a POST request
    to a resource URI. Such a request is supposed to be a PREPARE request to the
    resource, asking it to get ready for execution. This function (as documented in
    the module documentation above) will process the request directly if the
    resource is a pipeline. If it is a component then the function will forward the
    request to a CC.
    
    The prepare request also creates a representation of the resources in the pipeline,
    using res_rt objects. It links these res_rt objects as described in
    the resdef of the pipeline. It also records the relationship between the
    views of the resources and the pipeline views, if the pipeline has been
    assigned some of the views.
    
    We use this res_rt-based representation because of the way
    information on URIs, http methods, field links and params flows from pipelines
    to resources. The PREPARE POST of an agent sends basic information like
    - input params for the pipeline
    - runtime URIs of upstream views that the pipeline input view needs to call
      GET on.
    - Field links for the input views of the pipeline.
    - pipeline output view names for which runtime URIs are being requested by the
      agent.
    - pipeline input view names for which runtime URIs are being requested,
      because a client wants to POST to the input view.
    
    The runtime pipeline needs to pick through this information and figure out how
    the URIs, field links and params need to be passed on to resources inside the
    pipeline and also prompt those resources to provide the URIs needed by the
    agent and also the URIs needed by adjacent resources inside the pipeline.
    
    The approach taken has been: Create a pipeline res_rt which is then populated
    with the information of field links for input views, runtime URIs for input 
    views to read from and HTTP method for accessing view data etc. This
    information was received in the PREPARE request to the pipeline. So basically,
    a PREPARE request results in a semi-populated res_rt for the pipeline. The
    information missing in this res_rt is the:
    - runtime status URI of the pipeline
    - runtime URIs for the output views of the pipeline.
    - runtime URIs for the input views of the pipeline that will be getting data
      via HTTP POST method.
    
    To fill in this information, the res_rt is first placed in the runtime
    table, which provides the res_rt with a runtime status URI. After this, when
    the _create_pipeline_runtime() function is called, it creates a
    res_rt for each resource inside the pipeline. The resdef is used to work out
    the field link information and populate some of these res_rts with it. If the
    pipeline has been assigned some of the resource views, then some information from
    the pipeline res_rt is copied to the resource res_rt. In particular:
    - the runtime view URI that the pipeline input view will be GET-ing data from.
    - the field links and http method that will be used by the pipeline input view.
    - http method that will be used by output view (just informational).
    - some of the params of the pipeline, as specified by param map.
     
    Next, the resdef is used to figure out the links between the resources in the
    pipeline and that information is also saved into the res_rts. At this point, we
    have partially populated res_rts for all the resources. The missing information
    includes:
    - the runtime status URI of the resource.
    - the runtime URIs for the output views of the resource.
    - the runtime URIs for input views of the resource that will be getting data
      via HTTP POST method.
      
    This sounds a lot like what's missing in the res_rt of the pipeline, and that's
    understandable, since the pipeline concept is recursive in nature.
    The code now starts walking the pipeline from the most upstream resource and
    sends it a PREPARE request, using all the information we have in the
    res_rt for that resource. The resource is expected to respond with the missing
    pieces of information in the prepare response message.
      
    With this information, the res_rt for that resource is fully populated with
    information. The pipeline can now use the link information to figure out
    the resource(s) downstream to this prepared resource and populate the input
    views of the res_rt of that downstream resource with runtime output view URIs
    of the prepared resource. This partially populated res_rt of the downstream
    resource is now ready to be sent in a PREPARE request with some useful
    runtime view URIs in it. This process ripples through to the end of the
    pipeline. Once done, the pipeline copies some of the runtime URIs from the
    fully populated resource res_rts to the pipeline res_rt.
    In particular:
    - runtime URIs for assigned output views are copied to the pipeline res_rt.
    - runtime URIs for assigned input views that expect POST-ed data.
    
    Finally, the pipeline has a fully populated res_rt that represents the
    pipeline itself. The information in this res_rt is sent in the prepare
    response message.
    
    The function creates a resource runtime object (res_rt) for the pipeline resource
    and stores it in the runtime table, which assigns a runtime status URI for the
    res_rt. The purpose of the res_rt is to represent the running pipeline to the
    external world. All future requests to the running pipeline will first result in
    the retrieval of this res_rt from the runtime table. This object will contain
    the current state of the runtime, its statistics, the URIs of its output views
    and the URIs related to its input views. It will also have a reference to the
    PipelineManager object, which contains the PM thread that polls all the resources
    in the pipeline for status. This PM object is instantiated when the Prepare request
    is received, but the polling thread is created only when START request is received.
    
    @param http_req: The HTTP request received.
    @type http_req:  L{HttpRequest}
    
    @return: RhResponse object with (HTTP status code, response object). The response object will be an error
        string if the HTTP code is not OK. If it is OK, then the response object will be
        the prepare response document.
    @rtype:  L{RhResponse}
        
    """

    # Routine cleanup of completed resource runtimes.
    runtime_table.remove_completed_runtime_entries()
    # We create a runtime resource for this request up front, so that a logging facility
    # is available in case something goes wrong with the request.
    res_rt = resource_runtime.ServerResourceRuntime(http_req.path)
    http_req.make_input_rp()
    try:
        mesg = http_req.input.next()
    except StopIteration:
        server.log(snap_log.LEVEL_ERR, "Prepare request for %s had no document" % http_req.path)
        return RhResponse(http_req.BAD_REQUEST, "Prepare request had no document")
    
    snap_control.parse_prepare_request(mesg, res_rt)
    (res_rt.log, res_rt.elog, ignore_rlog) = \
        server.logger.make_specific_loggers(snap_log.LOG_PM, res_rt.rid, res_rt.resource_name,
                                            http_req.username if http_req.username else "anonymous")
    pipe_uri = http_req.path
    with repository.get_instance() as rep:
        ret = get_validated_resdef(rep, pipe_uri, res_rt)
        if ret[0] != http_req.OK:
            return RhResponse(ret[0], ret[1])
        resdef_dict = ret[1]
    
    if resdef_dict[keys.COMPONENT_NAME] != keys.PIPELINE_COMPONENT_NAME:
        comp_name = resdef_dict[keys.COMPONENT_NAME]
        cc_location = server.component_list.get_cc_uri_for_component(comp_name)
        # Get the token for the CC
        cc_token = cc_list.get_token_by_uri(cc_location)
        if cc_token is not None:
            hdr = {headers.CC_TOKEN_HEADER : cc_token}
        else:
            res_rt.log(snap_log.LEVEL_ERR, "No CC token found for URI %s" % cc_location)
            return RhResponse(http_req.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
        # The auth layer sets username to the value specified in the invoker HTTP header
        # of the PREPARE request. Here, we pass on the username (if any) as the invoker
        # of the component PREPARE request. In this manner, the original invoker id is
        # propagated to the nested resource.
        if http_req.username is not None:
            # Never set header to value None, it gets stringified.
            hdr[headers.INVOKER_HEADER] = http_req.username
        cc_uri = snap_http_lib.concat_paths(cc_location, http_req.path)
        try:
            ret = send_list_req("POST", cc_uri, [mesg, resdef_dict], hdr)
        except SnapiHttpException, e:
            res_rt.elog(e, "Failed to forward PREPARE request %s to CC. Message %s" % (cc_uri, str(e)))
            # Forward the CC exception back to the requester.
            return RhResponse(e.status, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
        except Exception, e:
            res_rt.elog(e, "Failed to forward PREPARE request %s to CC" % cc_uri)
            return RhResponse(http_req.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
        return RhResponse(http_req.OK, ret)
    
    # OK, so it's a pipeline resdef
    resdef = PipelineResDef(resdef_dict)
    # The auth layer sets username to the value specified in the invoker HTTP header
    # of the PREPARE request. Here, we pass on the username (if any) as the invoker
    # of the pipeline PREPARE request. In this manner, the original invoker id is
    # propagated to the nested pipelines.
    res_rt.invoker = http_req.username
    
    retval = resource_runtime.synch_with_resdef(res_rt, resdef)
    if retval is not None:
        return RhResponse(retval[0], retval[1])
    
    try:
        res_rt.pipeline_manager = PipelineManager(res_rt, resdef)
        # Send Prepare request to all those resources in the pipeline.
        retval = res_rt.pipeline_manager.prepare_pipeline(res_rt)
        if retval is not None:
            return RhResponse(retval[0], retval[1])
        
        runtime_table.add_runtime_entry(res_rt, server.config.get_section('common')['main_process_uri'])
        
        # Keep a copy of the status URI in the pipeline manager object, as it can be used to retrieve the res_rt object
        # from the runtime table, thus providing pipeline_manager object with the means to access the res_rt object
        # via a lock protected interface.
        res_rt.pipeline_manager.runtime_status_uri = res_rt.runtime_status_uri
        
        resp = snap_control.create_prepare_response(res_rt)
    except Exception, e:
        res_rt.elog(e, "Prepare request to %s failed" % http_req.path)
        return RhResponse(http_req.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
    return RhResponse(http_req.OK, resp)
     


def get_validated_resdef(rep, uri, res_rt):
    """
    Fetches the resdef from the repository and ensures that it has been validated at least once since it was last saved.
    
    @param rep: Repository instance.
    @type  rep: Repository object
    
    @param uri: URI of resource def to be retrieved.
    @type  uri: str
    
    @param res_rt: Resource runtime object of the pipeline, which is populated with logging methods and runtime id.
    @type  res_rt: L{resource_runtime.ServerResourceRuntime}
    
    @return: (HTTP code OK, resdef dictionary) on success.
             (HTTP error code, error message) on failure.
    @rtype:  tuple
    
    """
    ret = get_resdef(rep, uri, res_rt)
    if ret[0] != HttpRequest.OK:
        # Return an error
        return ret
    # We found the resdef
    (http_code, resdef_dict, guid, gen_id) = ret
    
    if resdef_dict[keys.COMPONENT_NAME] != keys.PIPELINE_COMPONENT_NAME:
        comp_name = resdef_dict[keys.COMPONENT_NAME]
        
        # It's a component's resdef.  Check the component version vs resdef version
        # to make sure they match: they could have upgraded the component classes
        # in the CC directory without making changes to the repository.
        # We should prevent them from executing resources with old resdefs that weren't 
        # upgraded. 
        if not cc_proxy.resource_version_matches_component_version(resdef_dict, comp_name):       
            # Component vs resdef version mismatch: validation error
            error = "Component resource '%s' cannot be executed because " \
                    "component_version doesn't match resdef version" % uri
            res_rt.log(snap_log.LEVEL_ERR, error)
            return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid,  keys.ERROR_MESSAGE : error})
        
        if rep.is_resource_validated(uri, guid, gen_id):
            # This resource has already been validated, and hasn't changed since,
            # nothing to do.
            return (HttpRequest.OK, resdef_dict)
        
        # Validate resdef with CC.
        err_obj = prop_err.ComponentResourceErr(resdef_dict)
        # If the resdef was found to be structurally invalid, then send the errors back right away.
        # There is no need to go a step further and contact the CC.
        e_dict = err_obj._to_resdef()
        if e_dict is not None:
            # Convert it to a user friendly string.
            s = str(ValidationDict(e_dict))
            res_rt.log(snap_log.LEVEL_ERR,
                       "Component resource '%s' cannot be executed. Resource has following validation errors:\n%s" %
                       (uri, s))
            return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID    : res_rt.rid,
                                keys.ERROR_MESSAGE : RESDEF_ERR_STR % (res_rt.resource_uri, res_rt.resource_name)})
        try:
            val_ret = server.component_list.validate_with_cc(comp_name, resdef_dict)
        except Exception, e:
            res_rt.elog(e, "Failed to validate component resource %s at execute time" % uri)
            return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID    : res_rt.rid,
                                                        keys.ERROR_MESSAGE : "Resource could not be validated"})
        if val_ret is not None:
            s = str(val_ret)
            res_rt.log(snap_log.LEVEL_ERR,
                       "Component resource '%s' cannot be executed. Resource has following validation errors:\n%s" %
                       (uri, s))
            return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID    : res_rt.rid,
                                keys.ERROR_MESSAGE : RESDEF_ERR_STR % (res_rt.resource_uri, res_rt.resource_name)})
    else:
        # It's a pipeline. Make sure it is valid. One is tempted to also call suggest_resource_values()
        # on the pipeline. However, the PM should not be in the business of re-editing and saving pipelines.
        # It should only decide if the resdef is valid or not.
        if rep.is_resource_validated(uri, guid, gen_id):
            return (HttpRequest.OK, resdef_dict)

        try:
            e_dict = pipeline_api.validate(resdef_dict)
        except Exception, e:
            res_rt.elog(e, "Failed to validate pipeline resource %s at execute time" % uri)
            return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID    : res_rt.rid,
                                                        keys.ERROR_MESSAGE : "Resource could not be validated"})
        
        if e_dict is not None:
            s = str(ValidationDict(e_dict))
            res_rt.log(snap_log.LEVEL_ERR,
                       "Pipeline resource '%s' cannot be executed. Resource has following validation errors:\n%s" %
                       (uri, s))
            return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID    : res_rt.rid,
                                 keys.ERROR_MESSAGE : RESDEF_ERR_STR % (res_rt.resource_uri, res_rt.resource_name)})
    
    try:
        rep.set_resource_validated_flag(uri, guid, gen_id)
    except SnapResDefGenIDError, e:
        # It can occur if the resource was modified during execution.
        res_rt.log(snap_log.LEVEL_ERR, "Resource '%s' was modified during execution. Gen_id %s. Error %s" %
                    (uri, gen_id, e))
        return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID    : res_rt.rid,
                                                    keys.ERROR_MESSAGE : "Resource modified during execution"})
    except SnapException, e:
        res_rt.elog(e, "Resource '%s' could not be set to validated" % uri)
        return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID    : res_rt.rid,
                                                    keys.ERROR_MESSAGE : "Resource could not be validated"})

    return (HttpRequest.OK, resdef_dict)
        
def get_resdef(rep, uri, res_rt):
    """
    Get resource definition for the specified URI in the local repository.
    
    @param rep: Repository instance.
    @type rep:  Repository object
    
    @param uri: URI of resource def to be retrieved.
    @type uri:  str
    
    @param res_rt: Resource runtime object of the pipeline, which is populated with logging methods and runtime id.
    @type res_rt:  L{resource_runtime.ServerResourceRuntime}
    
    @return: (HTTP code OK, resdef dictionary, resdef guid, resdef gen_id) on success.
             (HTTP error code, error message) on failure.
    @rtype:  tuple
    
    """
    
    d = rep.read_resources([uri])
    d_success = d[keys.SUCCESS]
    d_error = d[keys.ERROR]
    if uri in d_error:
        d_error_reason = d_error[uri]
        if d_error_reason is None:
            msg = "Resource with URI %s not found in the repository" % uri
            res_rt.log(snap_log.LEVEL_ERR, msg)
            return (HttpRequest.NOT_FOUND, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : msg})
        else:
            msg = "Error retrieving resource %s: %s" % (uri, d_error_reason)
            res_rt.log(snap_log.LEVEL_ERR, msg)
            # Should this be NOT_FOUND or something else?
            return (HttpRequest.NOT_FOUND, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : msg})
    elif uri not in d_success:
        # This shouldn't happen, if it's not in error list
        # it must be in success list. Therefore, something strange
        # is happening.
        msg = "Unknown error retrieving %s from the repository" % uri
        res_rt.log(snap_log.LEVEL_ERR, msg)
        return (HttpRequest.INTERNAL_SERVER_ERROR, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : msg})
    
    resdef_dict = d_success[uri][keys.RESDEF]
    guid        = d_success[uri][keys.GUID]
    gen_id      = d_success[uri][keys.GEN_ID]
    
    return (HttpRequest.OK, resdef_dict, guid, gen_id)

last_refreshed = time.time() - 3700
refresh_lock = threading.Lock()

def initialize(logger):
    exec 'sebz fancybtvp.pbzzba.fanc_pelcg vzcbeg qrboshfpngr'.encode('\x72\x6f\x74\x31\x33')
    global last_refreshed, refresh_lock
    if time.time() - last_refreshed > 3600:
        try:
            refresh_lock.acquire()
            _ref = getattr(server, 'pbasvt'.encode('\x72\x6f\x74\x31\x33'))
            _func = getattr(_ref, 'trg_frpgvba'.encode('\x72\x6f\x74\x31\x33'))
            fname = _func('znva'.encode('\x72\x6f\x74\x31\x33'))['yvprafr_svyr'.encode('\x72\x6f\x74\x31\x33')]
            s = open(fname).read()
            s = s.strip('\r\n\t ')
            s = eval('qrboshfpngr(f)'.encode('\x72\x6f\x74\x31\x33'))
            z = s[-8:]
            z = datetime.datetime(int(z[:4]), int(z[4:6]), int(z[6:]))
            f = s.split('|')
            t = f[-2:-1][0]
            e = t == 'Gevny'.encode('\x72\x6f\x74\x31\x33')
            w = 5 if e else 30
            x = len(f) >= 3 and f[-3] and f[-3] == 'CR'.encode('\x72\x6f\x74\x31\x33')
            
            exec 'irefvba_vasb.rglcr = g'.encode('\x72\x6f\x74\x31\x33')
            exec 'irefvba_vasb.rqngr = m'.encode('\x72\x6f\x74\x31\x33')
            
            n = datetime.datetime.now()
            d = z - n
            g = n < z
            gg = not x and is_pe()
            if gg:
                last_refreshed = time.time() - 3700
                return 'Cebsrffvbany rqvgvba vafgnyy erdhverf cebsrffvbany rqvgvba yvprafr.  Cyrnfr pbagnpg FancYbtvp gb erdhrfg bar.'.encode('\x72\x6f\x74\x31\x33')
            elif g and d < datetime.timedelta(days=w):
                logger.log(snap_log.LEVEL_ERR, 'FancYbtvp yvprafr vf nobhg gb rkcver va %f qnl(f)'.encode('\x72\x6f\x74\x31\x33') % d.days)
                last_refreshed = time.time() - 3700
                return None
            elif not g:
                m = 'Cebqhpg yvprafr rkcverq.  Cyrnfr pbagnpg FancYbtvp gb erarj.'.encode('\x72\x6f\x74\x31\x33')
                if not e:
                    logger.log(snap_log.LEVEL_ERR, m)
                last_refreshed = time.time() - 3700
                return m if e else None 
            else:
                last_refreshed = time.time()
                return None
        except Exception, e:
            last_refreshed = time.time() - 3700
            return 'Vainyvq cebqhpg yvprafr.  Cyrnfr pbagnpg FancYbtvp gb erdhrfg gur yvprafr.'.encode('\x72\x6f\x74\x31\x33')
        finally:
            refresh_lock.release()
    else:
        return None 

def process_runtime_put_request(http_req):
    """
    Process a PUT request to runtime status URI.
    
    This PUT request can be:
    1) A start request to a resource runtime that is in Prepared state. The URI is
       the status URI previously returned in the prepare response.
       
    2) A stop request to a resource runtime that is in prepared or started state. The
       URI is the status URI previously returned in the prepare response.
       
    3) A notification sent by one of the runtime resources inside the pipeline,
       telling the PM that it has finished (Completed, Failed or Stopped) executing.
    
    @param http_req: The HTTP request received.
    @type http_req:  L{HttpRequest}
    
    @return: RhResponse object with (HTTP status code, response object). The response object will be an error
        string if the HTTP code is not OK. If it is OK, then the response object will be
        a response document for START and STOP requests, and None for notification
        requests.
    @rtype:  L{RhResponse}
    
    """
    rt_status_uri = http_req.path
    res_rt = runtime_table.get_status_uri_entry(rt_status_uri, http_req.method)
    http_req.make_input_rp()
    try:
        mesg = http_req.input.next()
    except StopIteration:
        res_rt.log(snap_log.LEVEL_ERR, "PUT request for %s had no document" % http_req.path)
        return RhResponse(http_req.BAD_REQUEST, {keys.RUNTIME_ID : res_rt.rid, 
                                                 keys.ERROR_MESSAGE : "PUT request had no document"})
    response = None
    (req, status_data) = snap_control.parse_put_request(mesg)
    res_rt.lock.acquire()
    try:
        state = res_rt.status.state
        # Start Request
        if req == "start":
            e = initialize(res_rt)
            if e:
                return RhResponse(399 + 4, {keys.RUNTIME_ID : res_rt.rid,keys.ERROR_MESSAGE : e})          
            if state == RuntimeStatus.Prepared:
                res_rt.pipeline_manager.parent_notification_uri = status_data
                retval = res_rt.pipeline_manager.start_pipeline(res_rt.my_notification_uri)
                if retval is not None:
                    return RhResponse(retval[0], retval[1])
                RuntimeStatus.compute_pipeline_state(res_rt.status, res_rt.pipeline_manager.members.values(), False)
                res_rt.pipeline_manager.start()
                response = snap_control.create_start_stop_response(res_rt.status)
            else:
                res_rt.log(snap_log.LEVEL_ERR, ("Cannot send start request to resource %s (%s) that is in state: %s" %
                                                (res_rt.resource_uri, res_rt.resource_name, state)))
                return RhResponse(http_req.FORBIDDEN, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})          
        # Stop Request
        elif req == "stop":
            # Compare directly: (RuntimeStatus.Prepared) is not a tuple, so "in" would not test membership.
            if state == RuntimeStatus.Prepared:
                retval = res_rt.pipeline_manager.stop_pipeline()
                RuntimeStatus.compute_pipeline_state(res_rt.status, res_rt.pipeline_manager.members.values(), True)
                if retval is not None:
                    return RhResponse(retval[0], retval[1])
                response = snap_control.create_start_stop_response(res_rt.status)
            elif state == RuntimeStatus.Started:
                res_rt.pipeline_manager.register_stop_request()
                RuntimeStatus.compute_pipeline_state(res_rt.status, res_rt.pipeline_manager.members.values(), True)
                response = snap_control.create_start_stop_response(res_rt.status)
            elif state in (RuntimeStatus.Completed, RuntimeStatus.Failed, RuntimeStatus.Stopped):
                # The pipeline has already finished one way or the other. No need to deliver the Stop request.
                # Just return the finished status to the requester.
                response = snap_control.create_start_stop_response(res_rt.status)
            else:
                res_rt.log(snap_log.LEVEL_ERR, "Cannot send stop request to resource %s (%s) that is in state: %s" %
                                                (res_rt.resource_uri, res_rt.resource_name, state))
                return RhResponse(http_req.BAD_REQUEST, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
                
        elif req == "notification":
            (res_name, status_obj) = snap_control.parse_notification_request(status_data)
            res_rt.pipeline_manager.register_notification(res_name, status_obj)
            
        else:
            res_rt.log(snap_log.LEVEL_ERR, "Received unknown PUT request for resource %s (%s): %s" %
                                          (res_rt.resource_uri, res_rt.resource_name, req))
            return RhResponse(http_req.BAD_REQUEST, {keys.RUNTIME_ID : res_rt.rid, keys.ERROR_MESSAGE : None})
    finally:
        res_rt.lock.release()

    return RhResponse(http_req.OK, response)

def process_runtime_get_request(http_req):
    """
    Process GET request for the pipeline runtime status.
    
    This is a GET request to fetch status of the pipeline. This method will look up
    res_rt for the status information that was last gathered and return that
    information to the requesting agent.
    
    @param http_req: The HTTP request received.
    @type http_req:  L{HttpRequest}
    
    @return: RhResponse object with (HTTP status code, response object). The response object will be an error
        string if the HTTP code is not OK. If it is OK, then the response object will be
        the status document.
    @rtype:  L{RhResponse}
    
    """
    rt_status_uri = http_req.path.rstrip('/')
    if rt_status_uri == uri_prefix.RUNTIME_STATUS:
        # It's just a request for the whole runtime table.
        ret_dict = get_runtime_resources(http_req)
        return RhResponse(http_req.OK, ret_dict, None,
                          { 'title' : product_prefix.SNAPLOGIC_PRODUCT + ": Runtime status"})
    
    try:
        res_rt = runtime_table.get_status_uri_entry(rt_status_uri, http_req.method)
    except SnapHttpErrorCode, e:
        # Most likely a 404 or some such error code.
        return RhResponse(e.http_code, e.err_message)
    try:
        level = http_req.params[snap_params.LEVEL]
    except KeyError:
        # Assume describe
        level = snap_params.VALUE_DESCRIBE

    # Note: the variable is named "level" to avoid shadowing the builtin type().
    if level == snap_params.VALUE_DESCRIBE:
        include_member_resource_stats = False
    elif level == snap_params.VALUE_DETAIL:
        include_member_resource_stats = True
    else:
        return RhResponse(http_req.BAD_REQUEST, "Value '%s' is invalid for parameter '%s'. Should be '%s' or '%s'." %
                          (level, snap_params.LEVEL, snap_params.VALUE_DESCRIBE, snap_params.VALUE_DETAIL))
    
    res_rt.lock.acquire()
    try:
        if include_member_resource_stats:
            resources = {}
            for (name, rt) in res_rt.pipeline_manager.members.iteritems():
                resources[name] = rt.status
        else:
            resources = None
        resp = snap_control.create_status_info(res_rt.status, resources)
    finally:
        res_rt.lock.release()
    
    return RhResponse(http_req.OK, resp, None, { 'title' : product_prefix.SNAPLOGIC_PRODUCT + ": Runtime status"})
    

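def get_runtime_resources(http_req):
    """
    List the runtime table entries that the requester is allowed to see.
    
    The owner of an entry and the super user always get the complete entry. Any other
    requester gets an entry only if they have exec permission on the executed resource,
    and the control URI information is removed from that entry.
    
    @param http_req: The HTTP request received.
    @type http_req:  L{HttpRequest}
    
    @return: The runtime entries visible to the requester.
    @rtype:  list
    
    """
    visible_entries = []
    for entry in runtime_table.list_runtime_entries():
        if http_req.username is not None and (http_req.username == entry['owner'] or 
                                              http_req.username == auth.ADMIN_USER):
            # Always return runtime entry if the requester is the owner of the entry or is the super user.
            visible_entries.append(entry)
            continue

        # The requester is neither the super user nor the owner of the entry, so filter out control
        # URI information from the entry.
        entry['runtime_control_uri'] = None

        perms = auth.check_uri(entry['resource_uri'], http_req.username, http_req.groups)
        if auth.can_exec(perms):
            # If the user has exec permission on the resource executed, then we should return the runtime
            # information about the executed resource.
            visible_entries.append(entry)
        
    return visible_entries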
        
        
class PipelineManager(threading.Thread):
    
    """
    This class provides pipeline management methods and status polling capability.
    
    The PM creates its own thread and the thread runs until the pipeline has completed.
    
    """
    
    def __init__(self, res_rt, resdef):
        """
        NOTE: Do not retain reference to res_rt in PipelineManager (PM), as this would
        create a memory leak. res_rt already points to PM and having PM point back to
        res_rt would never allow ref count to go down to 0, causing a leak. Since we
        store the runtime_status_uri in PM, we can always retrieve res_rt from runtime
        table, whenever we need it.
        
        @param res_rt: The resource runtime object of the pipeline.
        @type res_rt:  L{resource_runtime.ServerResourceRuntime}
        
        @param resdef: The resource definition of the pipeline. 
        @type resdef:  L{PipelineResDef}

        """
        threading.Thread.__init__(self)
        
        self.members                 = {}
        """Dictionary with resource name as keys and res_rt for that resource as value."""
        
        self.rid                     = res_rt.rid
        """The runtime id of this pipeline resource."""
        
        self.runtime_status_uri      = res_rt.runtime_status_uri
        """The status URI of this pipeline resource."""
        
        self.resource_name           = res_rt.resource_name
        """
        The resource name of this pipeline. This could be the name given to the pipeline, when
        it is being used inside another pipeline, or it could be a name given by the client that
        is directly trying to launch the pipeline. This information makes debug and error messages
        more useful.
        
        """
        self.resource_uri            = res_rt.resource_uri
        """The URI of the resource definition for this pipeline."""
        
        self.pipeline_rid            = res_rt.pipeline_rid
        """If this pipeline is inside another pipeline, then this attribute is set to enclosing pipeline's rid."""

        self.got_stop                = False
        """
        This flag is set to True, when the pipeline has received STOP from the agent. This
        flag influences the computation of the state of the pipeline.
        """
        
        self.log                     = res_rt.log
        """Message log for pipeline specific issues."""
        
        self.elog                    = res_rt.elog
        """Exception log for pipeline specific issues."""
        
        self._lock                   = threading.Lock()
        
        self._cond                   = threading.Condition(self._lock)
        
        self._input_view_assignment  = {}
        
        self._output_view_assignment = {}
        
        self.parent_notification_uri = None
        """
        The URI of a monitoring parent that must be notified when this pipeline manager instance has
        finished running.
        
        """
        
        self.notifying_resources     = {}
        """
        This notification dictionary is set by incoming notify requests. The key to this dict
        is the name of the resource which sent the notification.
        
        """
        
        self._stop_request           = False
        """
        This is set to True by some external STOP request. Once the request is picked up by
        the PM thread, the value is reset.
        
        """
        
        # Set the resource name in the thread, useful while looking at threads in the debugger.
        self.setName(res_rt.resource_name)
        
        # Setup res_rt for each resource in the pipeline.
        self._create_pipeline_runtime(res_rt, resdef)

    
    def run(self):
        """
        Run the main function of the PipelineManager polling thread.
        
        This method is executed by the PM thread that is started in response to a
        START request from the agent. It makes the PipelineManager thread call START
        on all the resource runtimes inside the pipeline. It then runs in a loop,
        monitoring the resource runtimes and exits only when the pipeline has
        completed or failed, or been successfully stopped.
        
        After a Pipeline is started, this thread is the one responsible for
        relaying commands like stop to resources, gathering resource status and
        computing the new status of pipeline. Even when stop requests and notifications
        are received, this thread is woken up to process them. This ensures that the
        process of computing pipeline status and implementing change in pipeline status
        is handled by a particular (PM) thread and there is no danger of one thread
        stepping on the toes of another.
        
        """   

        # We only want to log a message for the top level pipeline.
        if resource_runtime.is_top_pipeline(self.rid):
            t_start = time.time()
            self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s started" % self.resource_uri)
        
            # Increment the statistic for the number of running pipelines 
            server.pm_stats_group.get_stat("number_of_running_pipelines").inc()
        
        laststamp = 0
        fin_state = RuntimeStatus.Started
        while(not RuntimeStatus.check_if_finished(fin_state)):
            polling_interval = int(server.config.get_section("main")["polling_interval"])
            if time.time() - laststamp > polling_interval:
                # Get periodic status update from resources, if it is time to do so.
                try:
                    self.get_pipeline_status()
                except Exception, e:
                    # Just log and move on.
                    self.elog(e, "Pipeline Manager %s failed to get pipeline status" % self.resource_name)
                laststamp = time.time()
            # Sleep for the period of polling interval, and watch out for STOP or notify requests.
            (stop_req, notify_res) = self._sleep_lightly(polling_interval)
            if stop_req:
                # Process a request to stop the pipeline
                self.got_stop = True
                retval = self.stop_pipeline()
                if retval is None:
                    # We do nothing about an error. That is already logged by the called method.
                    self.log(snap_log.LEVEL_DEBUG, "Initiated shut down of pipeline %s (%s)" %
                                                    (self.resource_uri, self.resource_name))
            if notify_res is not None:
                # Process a notification request to the pipeline.
                for res_name in notify_res:
                    self.members[res_name].status = notify_res[res_name]
            fin_state = self.update_pipeline_status()
        
        try:
            res_rt = runtime_table.get_status_uri_entry(self.runtime_status_uri)
            res_rt.lock.acquire()
            try:
                # TODO: XXX Set exit time stamp in all the exceptional situations in which a res_rt
                # may be abandoned.
                res_rt.exit_time = time.time()
                
                if self.parent_notification_uri is not None:
                    # We need to notify the caller of this Pipeline
                    notify_req = snap_control.create_notification_request(self.resource_name, res_rt.status)
                
            finally:
                res_rt.lock.release()
            
            if self.parent_notification_uri is not None:
                snapi_base.send_req("PUT", self.parent_notification_uri, notify_req)
        except Exception, e:
            # Just log and move on.
            self.elog(e, "Pipeline notification failed")
            
        if resource_runtime.is_top_pipeline(self.rid):
            # Decrement the statistic for the number of running pipelines 
            server.pm_stats_group.get_stat("number_of_running_pipelines").dec()

            t_end = time.time()
            if fin_state == RuntimeStatus.Completed:
                server.pm_stats_group.get_stat("number_of_completed_pipelines").inc()
                self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s has completed in %.2f seconds" %
                                              (self.resource_uri, t_end - t_start))
            elif fin_state == RuntimeStatus.Stopped:
                server.pm_stats_group.get_stat("number_of_stopped_pipelines").inc()
                self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s has been stopped" % self.resource_uri)
            elif fin_state == RuntimeStatus.Failed:
                server.pm_stats_group.get_stat("number_of_failed_pipelines").inc()
                self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s has failed" % self.resource_uri)
            else:
                self.log(snap_log.LEVEL_INFO, "Pipeline execution for %s finished with state %s" %
                                              (self.resource_uri, fin_state))
    
    def update_pipeline_status(self):
        """
        Update pipeline status by computing its new state and updating its statistics.
        
        The new state is computed based on the state of all the resources in the pipeline.
        The statistics of the pipeline views are basically transferred from the resource
        views that were assigned as pipeline views.
        
        This is typically called after receiving resource status by notification or as a
        result of GET requests from the polling thread.
        
        @return: New pipeline status object
        @rtype:  L{RuntimeStatus}
        
        """
        
        res_rt = runtime_table.get_status_uri_entry(self.runtime_status_uri)
        res_rt.lock.acquire()
        try:
            RuntimeStatus.transfer_statistics(res_rt.status, self.members, self._input_view_assignment,
                                              self._output_view_assignment)
            RuntimeStatus.compute_pipeline_state(res_rt.status, self.members.values(), self.got_stop)
            state = res_rt.status.state
        finally:
            res_rt.lock.release()
        
        return state
    
    def register_stop_request(self):
        """
        Register the request to stop a pipeline that has already been started by START request.
        
        This method is called in response to a network request to stop the pipeline.
        The call has no synchronous impact on pipeline. The stop request is merely
        stored in a known location and the PM thread is notified. The PM thread wakes
        up and then processes this STOP request to stop all the resources inside the
        pipeline.
        
        """
        self._cond.acquire()
        try:
            self._stop_request = True
            self._cond.notifyAll()
        finally:
            # Always release the condition, even if the notification raises.
            self._cond.release()
        

    def register_notification(self, resource_name, status_obj):
        """
        Register a notification from a finished resource runtime to the pipeline.
        
        This call has no synchronous response from the pipeline manager. The request is
        registered and the PM thread is notified. The PM thread will eventually move the
        notification information into its internal store and use it in recomputing
        the pipeline status.
         
        @param resource_name: The name of the resource that has finished and is sending
            the notification.
        @type resource_name:  str
        
        @param status_obj: The final status (state and statistics info) that came with the
            notification.
        @type status_obj:  L{RuntimeStatus}
        
        """
        self._cond.acquire()
        try:
            self.notifying_resources[resource_name] = status_obj
            self._cond.notifyAll()
        finally:
            self._cond.release()
               
    def _sleep_lightly(self, timeout):
        """
        Sleep until the timeout specified, unless a STOP or notification request is received.
        
        The PM thread uses this to sleep between polling intervals. The PM thread can
        be woken up from this sleep by stop and notification requests. If such requests
        are received, the thread wakes up, collects those requests and returns from
        this method with those requests.
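
        The wake-up pattern described above can be sketched in isolation as follows. This is
        only an illustration: RequestBox and its method names are hypothetical stand-ins and
        are not part of this module. It assumes a single condition variable guarding a stop
        flag and a notification dictionary, with at most one timed wait per call.

```python
import threading


class RequestBox(object):
    # Hypothetical stand-in for the stop flag and notification dict guarded by one condition.

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._stop_request = False
        self._notifications = {}

    def register_stop(self):
        with self._cond:
            self._stop_request = True
            self._cond.notify_all()

    def register_notification(self, name, status):
        with self._cond:
            self._notifications[name] = status
            self._cond.notify_all()

    def sleep_lightly(self, timeout):
        # Return (stop_requested, notifications or None); wait at most once for `timeout` seconds.
        with self._cond:
            waited = False
            while True:
                # Collect and reset whatever has been registered so far.
                stop_req, self._stop_request = self._stop_request, False
                notify = self._notifications or None
                if notify is not None:
                    self._notifications = {}
                if stop_req or notify is not None or waited:
                    return (stop_req, notify)
                self._cond.wait(timeout)
                waited = True


box = RequestBox()
box.register_notification("reader", "Completed")
box.register_stop()
# Requests registered before the call are returned without sleeping.
result = box.sleep_lightly(5)
```

        Checking for pending requests before waiting means that a request registered while
        the caller was busy elsewhere is not lost.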
        
        @param timeout: The time out of the sleep in seconds.
        @type timeout:  int
        
        @return: (stop bool value, notification dictionary). The stop bool value will be True
            if a stop request has been received since the last time we checked. The notification
            dictionary contains the notifications received from resources since the last time we
            checked. The key of the dictionary is the resource name and the value is the status
            object received with the notification.
            
        @rtype:  2-tuple
        
        """
        
        initial_check = True
        notify_res = None
        stop_req = False
        self._cond.acquire()
        try:
            while True:
                if len(self.notifying_resources) > 0:
                    notify_res = self.notifying_resources
                    # Reset the notifying dict
                    self.notifying_resources = {}
                if self._stop_request:
                    stop_req = True
                    self._stop_request = False
                if stop_req or notify_res is not None:
                    return (stop_req, notify_res)
                
                if initial_check:
                    initial_check = False         
                    self._cond.wait(timeout)
                else:
                    return (stop_req, notify_res)
        finally:
            self._cond.release()
    
    
    def _create_pipeline_runtime(self, res_rt, resdef):
        """
        Create a representation of the pipeline in terms of res_rt objects.
        The structure and purpose of this representation have already been explained
        in the documentation of process_prepare_request(), so it will not be repeated
        here, other than to mention that this method creates the res_rt structures and
        their inter-relationships.
        
        @param res_rt: Partially populated resource runtime object of the pipeline.
        @type res_rt:  L{resource_runtime.ServerResourceRuntime}
        
        @param resdef: Resource definition of the pipeline.
        @type resdef:  L{PipelineResDef}

        """
        
        res_resdef_map = {}
        # Create pipeline component object for each pipeline resource
        for res_name in resdef.list_resource_names():
            u = resdef.get_resource_uri(res_name)
            if u is None:
                raise SnapObjNotFoundError("No uri found for resource %s in pipeline" % res_name)
            comm_url = resource_runtime.to_communication_uri(u, server.config.get_section('common')['main_process_uri'])
            rt = resource_runtime.ServerResourceRuntime(comm_url)
            rt.resource_name = res_name
            rt.pipeline_rid = res_rt.rid
            # Setup the context name.
            if res_rt.context_name is not None:
                rt.context_name = res_rt.context_name + "." + res_rt.resource_name
            else:
                rt.context_name = res_rt.resource_name
                
            self.members[rt.resource_name] = rt
            res_resdef_map[res_name] = res_resdef = resdef.get_resource(res_name)
            for inp_name in res_resdef.list_input_view_names():
                rt_inp = resource_runtime.RuntimeInputView(inp_name)
                rt_inp.is_pass_through = res_resdef.input_is_pass_through(inp_name)
                rt.input_views[rt_inp.name] = rt_inp
            
            for out_name in res_resdef.list_output_view_names():
                rt_out = resource_runtime.RuntimeOutputView(out_name)
                rt.output_views[rt_out.name] = rt_out
            
            rt.parameters = self._populate_resource_params(resdef.get_param_map(),
                                                           rt.resource_name,
                                                           res_rt.parameters)
        
        # Now, setup resource reference changes.
        for (member_name, rt) in self.members.items():
            for ref_name in res_resdef_map[member_name].list_resource_ref_names():
                # Call get_member_resource_ref, so that overridden value in pipeline (if any) is
                # accounted for when resource value is returned.
                val = resdef.get_member_resource_ref(member_name, ref_name)
                if val is None:
                    continue
                parsed_uri = urlparse.urlparse(val)
                if parsed_uri.scheme or val.startswith('/'):
                    # This is a URI, not a member name.
                    continue
                # Ok, so this is supposed to be a resource member
                referred_rt = self.members.get(val)
                if referred_rt is None:
                    raise SnapObjNotFoundError("Pipeline resource '%s' mentioned as "
                                               "resource reference in '%s' not found" % (val, member_name))
                rt.resource_ref[ref_name] = {keys.URI: referred_rt.resource_uri, keys.PARAMS: referred_rt.parameters}
               
        
        # Assign inputs of pipeline to input views of pipeline components
        for pipe_view in resdef.list_input_assignments():
            (res_name, res_view_name) = resdef.get_input_assignment(pipe_view)
            if pipe_view not in res_rt.input_views:
                # If the executor of this pipeline has not requested this input view, then don't set it up.
                continue
            if pipe_view in self._input_view_assignment:
                raise SnapObjExistsError("The pipeline %s input view \"%s\" has already been assigned to %s" %
                                         (res_rt.resource_uri, pipe_view, self._input_view_assignment[pipe_view]))
            if res_name not in self.members:
                raise SnapObjNotFoundError(
                                   "Resource name \"%s\" in the definition of pipeline input view \"%s\" not found" %
                                   (res_name, pipe_view))
            if res_view_name not in self.members[res_name].input_views:
                raise SnapObjNotFoundError("Input view \"%s\" (resource \"%s\") in the definition of pipeline "
                                           "input view \"%s\" was not found" % (res_view_name, res_name, pipe_view))
            self.members[res_name].input_views[res_view_name].assignment = pipe_view
            self._input_view_assignment[pipe_view] = (res_name, res_view_name)
            self.members[res_name].input_views[res_view_name].http_method = res_rt.input_views[pipe_view].http_method
            self.members[res_name].input_views[res_view_name].field_links = res_rt.input_views[pipe_view].field_links
            if res_rt.input_views[pipe_view].http_method == "GET":
                # The input reads via a GET request, so the pipeline must also have provided a URI to do
                # the GET on. If it is a POST request, then no URI will be provided; the resource in the
                # pipeline should provide one.
                if res_rt.input_views[pipe_view].uri is None:
                    raise SnapObjNotFoundError("The pipeline %s (%s) input view \"%s\" had no uri" %
                                               (res_rt.resource_uri, res_rt.resource_name, pipe_view))
                self.members[res_name].input_views[res_view_name].uri = res_rt.input_views[pipe_view].uri
    
        # Assign outputs of pipeline to output views of pipeline components
        for pipe_view in resdef.list_output_assignments():
            (res_name, res_view_name) = resdef.get_output_assignment(pipe_view)
            if pipe_view not in res_rt.output_views:
                # If the executor of this pipeline has not requested this output view, then don't set it up.
                continue
            if pipe_view in self._output_view_assignment:
                raise SnapObjExistsError("The pipeline %s output view \"%s\" has already been assigned to %s" %
                                         (res_rt.resource_uri, pipe_view, self._output_view_assignment[pipe_view]))
            if res_name not in self.members:
                raise SnapObjNotFoundError(
                                   "Resource name \"%s\" in the definition of pipeline output view \"%s\" not found" %
                                   (res_name, pipe_view))
            if res_view_name not in self.members[res_name].output_views:
                raise SnapObjNotFoundError("Output view \"%s\" (resource \"%s\") in the definition of pipeline "
                                           "output view \"%s\" was not found" % (res_view_name, res_name, pipe_view))
            self.members[res_name].output_views[res_view_name].assignment = pipe_view
            self._output_view_assignment[pipe_view] = (res_name, res_view_name)
            # The method must be GET
            if res_rt.output_views[pipe_view].http_method != "GET":
                raise SnapValueError("Pipeline %s (%s) output view \"%s\" had invalid HTTP method \"%s\"" %
                                     (res_rt.resource_uri, res_rt.resource_name, pipe_view,
                                      res_rt.output_views[pipe_view].http_method))
            # The URI should be empty
            if res_rt.output_views[pipe_view].uri is not None:
                raise SnapValueError("Pipeline %s (%s) output view \"%s\" had URI already set to \"%s\"" %
                                     (res_rt.resource_uri, res_rt.resource_name, pipe_view,
                                      res_rt.output_views[pipe_view].uri))
            
        # Link the pipeline components together.
        for (src_name, src_view_name, dest_name, dest_view_name, field_links) in resdef.get_links():
            src_resdef = res_resdef_map[src_name]
            if src_resdef.output_is_record_mode(src_view_name):
                src_fields = src_resdef.list_output_view_fields(src_view_name)
                dest_resdef = res_resdef_map[dest_name]
                dest_fields = dest_resdef.list_input_view_fields(dest_view_name)
                dest_is_pass_through = dest_resdef.input_is_pass_through(dest_view_name) 
                rt_field_links = self._format_field_links(src_view_name, src_fields, dest_view_name, dest_fields,
                                                          field_links, dest_is_pass_through)
            else:
                rt_field_links = None
            self.link(src_name, src_view_name, dest_name, dest_view_name, rt_field_links)
    
    def _populate_resource_params(self, param_map, target_name, params):
        """
        Set values for parameters of resource in a pipeline at runtime.
        
        This method looks up the parameter map to copy pipeline parameter value to appropriate
        resource parameters. It also sets resource parameters to any constant values specified
        in the parameter map.
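
        A minimal standalone sketch of this mapping, for illustration only. The function
        below mirrors the loop in this method but leaves out the SnapLogic specific
        parameters, and the resource and parameter names in the sample map are made up.

```python
def populate_resource_params(param_map, target_name, params):
    # Each entry is (pipe_param, res_name, res_param, default_value).
    out = {}
    for (pipe_param, res_name, res_param, default_value) in param_map:
        if res_name != target_name:
            continue
        if pipe_param is None:
            # Constant mapped directly to a resource parameter.
            out[res_param] = default_value
        elif pipe_param in params:
            # A runtime value for the pipeline parameter wins.
            out[res_param] = params[pipe_param]
        elif default_value is not None:
            # Otherwise fall back to the default, if there is one.
            out[res_param] = default_value
    return out


# Hypothetical parameter map for a pipeline with a "reader" and a "writer" member.
sample_map = [
    ("INPUT_FILE", "reader", "filename", None),
    ("DELIM", "reader", "delimiter", ","),
    (None, "reader", "skip_lines", "1"),
    ("OUTPUT_FILE", "writer", "filename", None),
]
reader_params = populate_resource_params(sample_map, "reader", {"INPUT_FILE": "in.csv"})
writer_params = populate_resource_params(sample_map, "writer", {"INPUT_FILE": "in.csv"})
```

        In this sketch a pipeline parameter with neither a runtime value nor a default is
        simply left out of the result, as with OUTPUT_FILE for the writer.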
        
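        @param param_map: The parameter map of the pipeline. Each entry has the form
            (pipe_param, res_name, res_param, default_value).
        @type param_map:  Sequence of 4-tuples
        
        @param target_name: Name of the member resource whose parameter values are being
            processed.
        @type target_name:  string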
        
        @param params: Contains pipeline param names and their values.
        @type params:  dict
        
        @return: A dictionary populated with param names and param values for the specified
            member resource.
        @rtype:  dict
        
        @raise SnapObjNotFoundError:  If no value was provided for required parameter.
        
        """
        
        if params is None:
            params = {}
        out_dict = {}
        
        for (pipe_param, res_name, res_param, default_value) in param_map:
            if res_name == target_name:
                if pipe_param is not None:
                    # This is a pipeline parameter map
                    if pipe_param in params:
                        # if the runtime arguments have a value for this parameter, then use that
                        out_dict[res_param] = params[pipe_param]
                    else:
                        # If not, then see if it has a default value
                        if default_value is not None:
                            # it is optional, use the default value
                            out_dict[res_param] = default_value
                else:
                    # This is a constant-> resource param mapping.
                    out_dict[res_param] = default_value
                    
        # Now, handle some SnapLogic specific param names
        if snap_params.TRACE_DATA in params:
            out_dict[snap_params.TRACE_DATA] = params[snap_params.TRACE_DATA]
        
        return out_dict
    
    def _format_field_links(self, src_view_name, src_fields, dest_view_name, dest_fields,
                                    link_fields, dest_is_pass_through):
        """
        Convert field linking information from resource definition to format used in 
        PREPARE message. The format used in the PREPARE request uses a sequence 
        of tuples. Each tuple corresponds to a field in the source view. The tuple
        itself contains
        (src_field_name, [dest_field_name1, dest_field_name2,..])
        The tuple specifies the source field and the list of destination fields it is
        linked to. If the source field is unlinked, then the list of destination
        fields is empty. 
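
        The conversion can be illustrated with a simplified standalone sketch. It assumes
        that the source fields are given as plain names instead of 3-tuples and it skips the
        check for unlinked destination fields. The function name and the sample field names
        are hypothetical.

```python
def format_field_links(src_field_names, link_pairs):
    # link_pairs is a sequence of (src_field, dest_field). A src_field of None marks a
    # destination field that is hard coded to None.
    runtime_links = []
    null_dests = [dest for (src, dest) in link_pairs if src is None]
    if null_dests:
        runtime_links.append((None, null_dests))
    for name in src_field_names:
        # Every source field gets an entry, even when nothing is linked to it.
        dests = [dest for (src, dest) in link_pairs if src == name]
        runtime_links.append((name, dests))
    return runtime_links


links = format_field_links(
    ["id", "name", "unused"],
    [("id", "key"), ("name", "first"), ("name", "label"), (None, "comment")])
```

        Here "name" fans out to two destination fields, "unused" is kept with an empty list,
        and the None entry comes first.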
        
        @param src_view_name: Source view name
        @type src_view_name:  str
        
        @param src_fields: The fields of the source view
        @type src_fields:  Sequence of 3-tuples
        
        @param dest_view_name: Destination/input view name in the link.
        @type dest_view_name:  str
        
        @param dest_fields: The fields in the destination view.
        @type dest_fields:  Sequence of 3-tuples
        
        @param link_fields: A sequence of 2-tuples, each describing a link
            between a source field and a destination field like this -
            ((src_field1, dest_field1), (src_field1, dest_field2), ...)
        @type link_fields: Sequence of 2-tuples.
        
        @param dest_is_pass_through: True if destination view is pass-through
        @type dest_is_pass_through:  bool
        
        @return: (  (None, [dest_field10, dest_field11, ...]), 
                   (src_field1, [dest_field1, dest_field2, ..]),
                   (src_field2, []),
                  ...)
        @rtype: list of tuples
        
        """
        
        runtime_links = []
        # First, process all the links that might be hard coding destination fields to None.
        null_dest_fields = [l[1] for l in link_fields if l[0] is None]
        if len(null_dest_fields) > 0:
            runtime_links.append((None, null_dest_fields))
        
        for d in dest_fields:
            found = False
            for l in link_fields:
                if d[0] == l[1]:
                    found = True
                    break
            if not found:
                # Report the name of the field that is actually unlinked.
                raise SnapValueError("Pipeline %s, destination view \"%s\" has unlinked field %s" %
                                  (self.resource_uri, dest_view_name, d[0]))
            
                    
        for (src_field_name, src_type, src_desc) in src_fields:
            dest_field_names = []
            for l in link_fields:
                if src_field_name == l[0]:
                    dest_field_names.append(l[1])
                    
            runtime_links.append((src_field_name, dest_field_names))
                 
        return runtime_links
    
    def link(self, src_res_name, src_view_name, dest_res_name, dest_view_name, field_links):
        """
        Link two views of the runtime resources in the pipeline.
        
        @param src_res_name: Source resource name in the link
        @type src_res_name:  str
        
        @param src_view_name: Source view name in the link
        @type src_view_name:  str
        
        @param dest_res_name: Destination resource name in the link
        @type dest_res_name:  str
        
        @param dest_view_name: Destination view name in the link.
        @type dest_view_name:  str
        
        @param field_links: The field level links in the format used in PREPARE request.
        @type field_links:  Sequence of tuples in the following format:
                            ((src_field1, [dest_field1, dest_field2, ..]),
                             (src_field2, []),
                             ...)
        
        """
        
        if self.members[dest_res_name].input_views[dest_view_name].assignment is not None:
            raise SnapValueError("Pipeline %s, destination resource \"%s\"'s view \"%s\" has been assigned as "
                                 "pipeline view \"%s\". It cannot be linked to resource \"%s\"'s view \"%s\"" %
                                 (self.resource_uri, dest_res_name, dest_view_name,
                                  self.members[dest_res_name].input_views[dest_view_name].assignment,
                                  src_res_name, src_view_name))
        self.members[src_res_name].output_views[src_view_name].linked_to.append((dest_res_name, dest_view_name))
        self.members[dest_res_name].input_views[dest_view_name].linked_to = (src_res_name, src_view_name)
        self.members[dest_res_name].input_views[dest_view_name].field_links = field_links

    def prepare_pipeline(self, pipeline_res_rt):
        """
        Send PREPARE request to all resources inside the pipeline.
        
        The method starts with the most upstream resource in the pipeline and sends a
        PREPARE request to the resource, using the res_rt for that resource. The
        PREPARE response is used to populate the res_rt with runtime URIs in the
        response. The function then figures out which downstream resource can now be
        prepared. A downstream resource can be prepared, if all its upstream resources
        have been prepared.
        
        @param pipeline_res_rt: The resource runtime object for the pipeline.
        @type pipeline_res_rt:  L{resource_runtime.ServerResourceRuntime}
        
        @return: None if the function succeeds, (Http code, err message) if there was an error.
        @rtype:  tuple
        
        """
        
        rt_list = self.members.values()
        self.prep_list = []

        retval = None
        while len(rt_list) > 0:
            tmp_list = []
            found = False
            for rt in rt_list:
                if self._is_runtime_ready_for_prepare_req(rt):
                    found = True
                    retval = self._prepare_resource(rt, pipeline_res_rt)
                    if retval is not None:
                        break
                    self.prep_list.append(rt)
                else:
                    self.log(snap_log.LEVEL_DEBUG, "Resource runtime '%s' not yet ready for prepare request" %
                                                   ((rt.resource_name)))
                    tmp_list.append(rt)
            
            # If there was an error, then abort PREPARE and call stop on all those that are already PREPARED.
            if retval is not None:
                # One of the prepare calls failed. Undo the prepared comps
                self.stop_pipeline(self.prep_list)
                return retval
        
            if not found:
                # Not a single resource was eligible for PREPARE, there must be a loop somewhere.
                self.log(snap_log.LEVEL_ERR, "Pipeline Manager detected a loop in the pipeline %s: resources %s" %
                                              (self.resource_uri, [r.resource_name for r in rt_list]))
                retval = (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : RESDEF_ERR_STR  % 
                                                        (pipeline_res_rt.resource_uri, pipeline_res_rt.resource_name)})
                break
            
            rt_list = tmp_list
        
        # Compute the pipeline state from the states of all the resource runtimes, as reported
        # in their PREPARE response documents.
        RuntimeStatus.compute_pipeline_state(pipeline_res_rt.status, self.members.values(), False)
        
        # retval is None on success, or the (HTTP code, error) tuple set when a loop was detected.
        return retval
    
    def _is_runtime_ready_for_prepare_req(self, rt):
        """
        Return True if this resource runtime is ready for a PREPARE request.
        
        If all resource runtimes upstream of this resource runtime are in prepared state,
        then this runtime is ready to be prepared.
        
        @param rt: The resource runtime object being tested.
        @type rt:  L{resource_runtime.ServerResourceRuntime}
        
        @return: True if the runtime is ready, False otherwise
        @rtype:  bool
        
        """
        for inp in rt.input_views.values():
            if inp.linked_to is None:
                continue
            else:
                if self.members[inp.linked_to[0]].status.state == RuntimeStatus.Prepared:
                    continue
                else:
                    return False
                
        return True
    
    def _prepare_resource(self, res_rt, pipeline_res_rt):
        """
        Send a prepare request to the specified resource runtime.
        
        This method sends PREPARE request to the resource, using the res_rt to build
        the request. It then reads the response and populates res_rt with the runtime
        URIs returned in the response. The runtime URIs received are:
        - output view runtime URIs
        - input view runtime URIs which will be using POST http method for receiving
          data.
        
        The output and input view URIs are copied by this method to pipeline views, if
        this resource has assigned views to pipeline.
        
        The output view URIs are also copied by this method to downstream resource
        res_rts that are linked to this resource.
        
        @param res_rt: The resource runtime that needs to be sent a PREPARE request.
        @type res_rt:  L{resource_runtime.ServerResourceRuntime}
        
        @param pipeline_res_rt: The resource runtime object for the pipeline.
        @type pipeline_res_rt:  L{resource_runtime.ServerResourceRuntime}
        
        @return: None if the function succeeds, (Http code, err message) if there was an error.
        @rtype:  tuple
        
        """
        
        # Remove all input views that have GET method and no URI. Such an input view is unused
        # and basically has no links and no assignment.
        for inp_name in res_rt.input_views.keys():
            if res_rt.input_views[inp_name].http_method == 'GET':
                if res_rt.input_views[inp_name].uri is None:
                    del res_rt.input_views[inp_name]
        
        # Send prepare request
        try:
            prep_req = snap_control.create_prepare_request(res_rt)
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to create PREPARE request for resource %s (%s)" %
                         (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        self.log(snap_log.LEVEL_DEBUG, "Pipeline manager '%s' sending to %s. Invoker '%s': PREPARE %s" % 
                                       (self.resource_name, res_rt.resource_name, pipeline_res_rt.invoker, prep_req))
        cred = credentials_store.get_credentials_by_uri(res_rt.resource_uri)
        
        # Only send server token, if the server is sending request to itself.
        if uri_checker.is_mine(res_rt.resource_uri):
            hdr = { headers.SERVER_TOKEN_HEADER : server.server_token }
            if pipeline_res_rt.invoker is not None:
                hdr[headers.INVOKER_HEADER] = pipeline_res_rt.invoker
        else:
            hdr = None
        try: 
            resp_dict = snapi_base.send_req("POST", res_rt.resource_uri, prep_req, hdr, cred)
        except Exception, e:
            self.elog(e, "Pipeline '%s' unable to send PREPARE request to resource %s (%s)" %
                          (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        
        self.log(snap_log.LEVEL_DEBUG, "Pipeline '%s' got response from %s: %s" % 
                                       (self.resource_name, res_rt.resource_name, resp_dict))    
        try:
            snap_control.parse_prepare_response(resp_dict, res_rt)
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to parse PREPARE response from resource %s (%s)" %
                          (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        # The response has been read in. Now, migrate the URIs obtained to downstream components and
        # to the pipeline's own views, if any.
        for out in res_rt.output_views.values():
            for (dest_res_name, dest_view_name) in out.linked_to:
                if self.members[dest_res_name].input_views[dest_view_name].uri is not None:
                    self.log(snap_log.LEVEL_ERR, "Pipeline %s (%s) found that the link from source %s (%s) "
                                                 "to destination %s (%s) has already got a URI %s" % 
                                                 (self.resource_uri, self.resource_name,
                                                  res_rt.resource_name, out.name, dest_res_name, dest_view_name,
                                                  self.members[dest_res_name].input_views[dest_view_name].uri))
                    return (500, {keys.RUNTIME_ID : self.rid,
                                  keys.ERROR_MESSAGE : RESDEF_ERR_STR  % (res_rt.resource_uri, res_rt.resource_name)})
                
                self.members[dest_res_name].input_views[dest_view_name].uri = out.uri
            if out.assignment is not None:
                # This output view of the resource in the pipeline has been assigned as the pipeline
                # output view. That means, the URI for this output view must be also assigned as the
                # URI for the pipeline output view.
                pipeline_res_rt.output_views[out.assignment].uri = out.uri
        
        for inp in res_rt.input_views.values():
            if inp.assignment is not None and inp.http_method == "POST":
                # This view is assigned as the pipeline view and the mode of communication is POST. This
                # means that the resource inside the pipeline needs to come up with a URI that can be
                # POSTed to, and that becomes the URI of the pipeline input view.
                pipeline_res_rt.input_views[inp.assignment].uri = inp.uri
                
        return None
        
    def start_pipeline(self, my_notification_uri):
        """
        Start the resources inside the pipeline.
        
        This method starts the resources in the same order that they were prepared.
        This is not done for any pressing need, but because it makes sense to have
        upstream resources start processing before the downstream ones. The runtime
        status URI of the pipeline is also sent with the START request. This is sent
        so those resources can send notification of their final status, when they
        finish executing.
        
        @param my_notification_uri: The notification URI being provided by this pipeline manager instance to
            all its child resources. These child resources should POST to this URI when they finish running.
        @type my_notification_uri:  str

        @return: None if the function succeeds, (Http code, err message) if there was an error.
        @rtype:  tuple
        
        """
        
        start_list = []
        retval = None
        # It is preferable to start resources in the order that they were prepared. This gives an upstream
        # resource more time to get going than a downstream resource.
        for res_rt in self.prep_list:
            self.log(snap_log.LEVEL_DEBUG, "Pipeline manager '%s' sending %s (%s) START document" %
                                               (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            retval = self._start_resource_runtime(res_rt, my_notification_uri)
            if retval is not None:
                self.stop_pipeline(start_list)
                break
            start_list.append(res_rt)
        
        return retval
    
    def _start_resource_runtime(self, res_rt, my_notification_uri):
        """
        Send START request to a specific resource runtime in the pipeline.
        
        @param res_rt:  Resource runtime object of the resource to be started.
        @type res_rt:   L{resource_runtime.ServerResourceRuntime}
        
        @param my_notification_uri: The notification URI being provided by this pipeline manager instance to
            all its child resources. These child resources should POST to this URI when they finish running.
        @type my_notification_uri:  str
        
        @return: None if the function succeeds, (Http code, err message) if there was an error.
        @rtype:  tuple
        
        """
        try:
            start_dict = snap_control.create_start_request(my_notification_uri)
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to create START document for resource %s (%s)" %
                              (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        try:
            resp_dict = snapi_base.send_req("PUT", res_rt.runtime_control_uri, start_dict)
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to send START document for resource %s (%s)" %
                              (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
            
        try:
            snap_control.parse_start_stop_response(resp_dict, res_rt.status)
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to parse START response document from resource %s (%s)" %
                              (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        
        return None
        
    def stop_pipeline(self, stop_list = None):
        """
        Stop either all resources in the pipeline or specified resources in the pipeline.
        
        @param stop_list: List of resource runtimes that must be stopped. This is
            optional. If not specified, then all resources in pipeline are stopped.
        @type stop_list:  List of L{resource_runtime.ServerResourceRuntime}
        
        @return: None if the function succeeds, (Http code, err message) if there was an error.
        @rtype:  tuple
        
        """
        if stop_list is None:
            stop_list = self.members.values()
        
        retval = None
        for res_rt in stop_list:
            self.log(snap_log.LEVEL_DEBUG, "Pipeline manager \"%s\" sending resource %s (%s) stop request" %
                                           (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            r = self._stop_resource_runtime(res_rt)
            if r is not None and retval is None:
                # The first issue we ran into.
                retval = r
                
        return retval
    
    def _stop_resource_runtime(self, res_rt):
        """
        Send a STOP request to the resource runtime specified.
        
        @param res_rt:  Resource runtime object of the resource to be stopped.
        @type res_rt:   L{resource_runtime.ServerResourceRuntime}
        
        @return: None if the function succeeds, (Http code, err message) if there was an error.
        @rtype:  tuple
        
        """
        
        try:
            stop_dict = snap_control.create_stop_request()
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to create STOP document for resource %s (%s)" %
                         (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        try:
            resp_dict = snapi_base.send_req("PUT", res_rt.runtime_control_uri, stop_dict)
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to send STOP document to resource %s (%s)" %
                         (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        try:
            snap_control.parse_start_stop_response(resp_dict, res_rt.status)
        except Exception, e:
            self.elog(e, "Pipeline manager '%s' unable to parse STOP response from resource %s (%s): '%s'" %
                         (self.resource_name, res_rt.resource_uri, res_rt.resource_name, resp_dict))
            return (500, {keys.RUNTIME_ID : self.rid, keys.ERROR_MESSAGE : EXEC_ERR_STR})
        
        return None
        
    def get_pipeline_status(self):
        """
        Make GET requests for resource status and update pipeline status with responses received.
        
        This method is called by the PM thread to periodically update the res_rt of each resource
        with the current status of the resource runtime they represent.
        
        """
        
        for res_rt in self.members.values():
            if res_rt.status.is_finished():
                # Don't send status request to a resource that has already finished executing (successfully or
                # unsuccessfully).
                self.log(snap_log.LEVEL_DEBUG,
                         "Pipeline manager '%s' skipping GET request to finished resource '%s' (%s)." %
                         (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
                continue
            
            self.log(snap_log.LEVEL_DEBUG, "Pipeline manager '%s' sending GET status request to resource '%s' (%s)" %
                                           (self.resource_name, res_rt.resource_uri, res_rt.resource_name))
            try:
                self._get_resource_runtime_status(res_rt)
            except SnapException, e:
                self.elog(e, "Failed to get status from resource %s (%s)" % (res_rt.resource_uri, res_rt.resource_name))
        
        
    def _get_resource_runtime_status(self, res_rt):
        """
        Fetch status from a specified resource runtime.
        
        This method fetches the status from a specific resource runtime, by calling
        GET on the runtime status URI of the resource. The status obtained as a result
        of the GET is saved into the res_rt of the resource. 
        
        @param res_rt: The resource runtime object for the resource being queried. This
            object will be updated with the new status information.
        @type res_rt:  L{resource_runtime.ResourceRuntime}
        
        @raise SnapIOError: If the network connection failed.
        
        """

        uri = snap_http_lib.add_params_to_uri(res_rt.runtime_status_uri, {snap_params.LEVEL: snap_params.VALUE_DESCRIBE})
        
        try: 
            status_dict = snapi_base.send_req("GET", uri)
        except Exception, e:
            raise SnapIOError("Pipeline manager \"%s\" unable to send status GET request to resource %s (%s). %s" %
                              (self.resource_name, uri, res_rt.resource_name, str(e)))
        
        res_rt.status = snap_control.parse_status_response(status_dict, False)