ExecResource.py :  » Development » SnapLogic » snaplogic » components » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » components » ExecResource.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

#    $Id: ExecResource.py 10330 2009-12-24 22:13:38Z grisha $

"""
ExecResource component.

This component executes a specified SnapLogic resource. The component is
designed to execute a target resource as many times as there are records in
the input view 'ExecRequests'. The records in the input view specify
a user defined id for the executing the target resource and also the params
needed for the execution. For each param that is needed to execute the target
resource, there should be a corresponding input field in 'ExecRequests'.
The name of the field derived by prefixing 'param_' to the param name.
For example, if the  resource had a parameter named 'port_number', then it
would be represented by the input field name 'param_port_number' in this view.


The ExecResource can be configured via the property 'WaitTime' to 

1) Wait until each resource completes execution before the next one is executed.
2) Wait for a certain timeout and then move to the next execution.
3) Move to the next execution, without waiting.


The output view 'ExecStatus' provides the status of each execution.

LIMITATION:
ExecResource can only execute a target resource if the target resource's input
and output views do not require to be connected.

"""

from snaplogic.common.snap_exceptions import SnapObjTypeError

from decimal import Decimal
from datetime import datetime
import time
from urlparse import urlparse

from snaplogic.common import snap_log
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.common.data_types import SnapString,SnapNumber,SnapDateTime
from snaplogic.common import version_info
from snaplogic.common.snap_exceptions import SnapComponentError

from snaplogic.snapi_base import keys
from snaplogic.snapi_base.exceptions import SnapiException,SnapiHttpException

from snaplogic.server.http_request import HttpRequest

import snaplogic.components as components
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI,has_param
from snaplogic.cc import prop


INPUT_VIEW = "ExecRequests"
OUTPUT_VIEW = "ExecStatus"
ID_FIELD = 'id'
 # Predefined a skeletal input view.
INPUT_FIELDS = ((ID_FIELD, SnapString, "User defined id for the executing resource. Must be either NULL or unique "
                                       "across all input records."),)
INPUT_VIEW_DESC = "Each input record represents a request to execute the resource. Field names "\
                  "that represent parameters have the prefix 'param_'. For example, if the "\
                  "target resource had a parameter named 'port_number', then it would be represented by "\
                  "the field name 'param_port_number' in this view."
RESOURCE_URI_PROP = "ResourceURI"
WAIT_TIME_PROP = "WaitTime"
POLL_PROP = "PollingInterval"
SNAPLOGIC_USER_PROP = "SnapLogicUser"
SNAPLOGIC_PASSWD_PROP = "SnapLogicPassword"

PARAM_PREFIX = "param_"

class ExecResource(ComponentAPI):
    """
    ExecResource.
    
    """
    
    api_version = '1.0'
    component_version = '1.1'
    
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH        : True
    }
    component_description = "Execute Resource"
    
    component_label = "Execute Resource"
    
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/ExecResource" % \
                                                        version_info.doc_uri_version

    def create_resource_template(self):
        """Create ExecResource resource template."""
        
        self.set_property_def(RESOURCE_URI_PROP,
                              prop.SimpleProp("Resource URI", 
                                              SnapString, 
                                              "The URI of the Resource which should be executed",
                                              None,
                                              True))
        
        self.set_property_def(SNAPLOGIC_USER_PROP,
                              prop.SimpleProp("SnapLogic Username", 
                                              SnapString, 
                                            "The user id (if any) to use while executing the SnapLogic Resource."))
        
        self.set_property_def(SNAPLOGIC_PASSWD_PROP,
                              prop.SimpleProp("SnapLogic Password", 
                                              SnapString, 
                                              "The SnapLogic Password. This is only needed if a username is specified.", 
                                              {keys.CONSTRAINT_OBFUSCATE : 0}))  

        self.set_property_def(WAIT_TIME_PROP,
                              prop.SimpleProp("Wait Time",
                                              SnapNumber,
                                              "Maximum amount of time (in seconds) to wait for a resource execution to "
                                              "complete. If no value is specified, then the component will not wait "
                                              "at all. If 0 is specified, then the component will wait as long as it "
                                              "takes for each execution to complete."))
        self.set_property_value(WAIT_TIME_PROP, 0)
        
        self.set_property_def(POLL_PROP,
                              prop.SimpleProp("Polling Interval",
                                              SnapNumber,
                                              "Polling interval (in seconds) in which the component checks for "
                                              "completion of resource execution. This value is needed only if a "
                                              "'wait time' is specified."))
        self.set_property_value(POLL_PROP, 10)
        
        self.add_record_input_view_def(INPUT_VIEW, INPUT_FIELDS, INPUT_VIEW_DESC)
        
        # Predefine the output view. Unlike the input view, this view is not modifiable.
        fields = ((ID_FIELD, SnapString, "User defined id for the executing resource"),
                  ("rid",    SnapString, "Runtime id for the resource that was executed"),
                  ("state",  SnapString, "State of the resource executed."),
                  ("status_uri",  SnapString, "Status URI (if any) of the resource executed."),
                  ("message",  SnapString, "Error message (if any) for a a failed execution of the resource."))
        # This view is uneditable for the client.
        self.add_record_output_view_def(OUTPUT_VIEW, fields, "Status of the execute request.", False)
        self.set_output_view_pass_through(OUTPUT_VIEW, [INPUT_VIEW,])

    def suggest_resource_values(self, err_obj):
        """
        Suggests fields for the input view.
        
        In particular, it sets the params of the resource being executed as input fields names in
        the input view. 
        
        @param err_obj: The error object used to report any errors found.
        @type err_obj:  error object
        
        """
        self.process_creds()
        
        uri = self.get_property_value(RESOURCE_URI_PROP)
        uri = uri.strip()
        if has_param(uri):
            err_obj.get_property_err(RESOURCE_URI_PROP).set_message(
                                "No value has been specified for parameter '%s' in URI property" % uri)
            return 
        
        resdef = self.uri_validation(err_obj, uri)
        if resdef is None:
            return
        
        fields = list(INPUT_FIELDS)
        pnames = resdef.list_param_names()
        for p in pnames:
            new_name = PARAM_PREFIX + p
            t = (new_name, SnapString, "Value for parameter %s" % new_name)
            fields.append(t)
        
        self.add_record_input_view_def(INPUT_VIEW, fields, INPUT_VIEW_DESC)
        
    def validate(self, err_obj):
        """Validate the fields of the input view and the property values."""
        
        self.process_creds()
        uri = self.get_property_value(RESOURCE_URI_PROP)
        uri = uri.strip()
        resdef = self.uri_validation(err_obj, uri)
        if resdef is None:
            return
        
        input_views = self.list_input_view_names()
        param_names = resdef.list_param_names()
        
        if INPUT_VIEW not in input_views:
            err_obj.get_input_view_err().set_message("Expected input view '%s' not found." % INPUT_VIEW)
        else:
            found_id = False
            input_view = self.get_input_view_def(INPUT_VIEW)[keys.VIEW_FIELDS]
            input_view_field_names = [ d[keys.FIELD_NAME] for d in input_view ]
            view_err = err_obj.get_input_view_err()[INPUT_VIEW][keys.VIEW_FIELDS]
            for (i, field_name) in enumerate(input_view_field_names):
                if field_name == ID_FIELD:
                    found_id = True
                    if input_view[i][keys.FIELD_TYPE] != SnapString:
                        view_err[i][keys.FIELD_TYPE].set_message("Field '%s' in view '%s' must be of type '%s'." %
                                                                 (field_name, INPUT_VIEW, SnapString))
                elif field_name.startswith(PARAM_PREFIX):
                    n = field_name[len(PARAM_PREFIX):]
                    if n not in param_names:
                        view_err[i][keys.FIELD_NAME].set_message(
                            "Field '%s' in view '%s' does not represent any parameter of resource '%s'. "
                            "The possible values are: %s." %
                            (field_name, INPUT_VIEW, uri, ", ".join([PARAM_PREFIX + name for name in param_names])))
                    elif input_view[i][keys.FIELD_TYPE] != SnapString:
                        view_err[i][keys.FIELD_TYPE].set_message("Field '%s' in view '%s' must be of type '%s'." %
                                                                 (field_name, INPUT_VIEW, SnapString))
                else:
                    view_err[i][keys.FIELD_NAME].set_message("Unexpected field '%s' in view '%s'." %
                                                             (field_name, INPUT_VIEW))
            
            if not found_id:
                view_err.set_message("Input view '%s' must have field '%s'." %(INPUT_VIEW, ID_FIELD))
        
        wait_time = self.get_property_value(WAIT_TIME_PROP)
        if (not has_param(wait_time)) and (wait_time is not None):
            poll_interval = self.get_property_value(POLL_PROP)
            if not has_param(poll_interval) and poll_interval is None:
                err_obj.get_property_err(POLL_PROP).set_message(
                        "A value must be specified for 'Polling Interval', when a value is specified for 'Wait Time'.")

    def uri_validation(self, err_obj, uri):
        """
        Validate the resource URI specified.
        
        This method validates that the URI specified looks valid and points to an actual resource definition.
        This method also fetches the resource (to validate access right) the returns the resdef pointed to.
        
        @param err_obj: The error object used for reporting validation errors.
        @type err_obj:  error object.
        
        @param uri: The URI pointing to the target resource.
        @type uri:  str
        
        @return: Resource definition of the target resource.
        @rtype:  L{ResDef}
        
        """
        if uri is None:
            err_obj.get_property_err(RESOURCE_URI_PROP).set_message("URI has not been specified.")
            return None
        if has_param(uri):
            return None
        
        v = urlparse(uri)
        if v.scheme and (v.scheme not in ('http', 'https')):
             err_obj.get_property_err(RESOURCE_URI_PROP).set_message("Resource URI '%s' has invalid scheme '%s'." % 
                                                                     (uri, v.scheme))
             return None
        
        if not v[2].startswith('/'):
            err_obj.get_property_err(RESOURCE_URI_PROP).set_message("Resource URI '%s' is not a valid URI." % uri)
            return None
            
        try:
            resdef = self.get_resource_object(uri, self.creds)
        except SnapiHttpException, e:
            if e.status == HttpRequest.NOT_FOUND:
                 err_obj.get_property_err(RESOURCE_URI_PROP).set_message("Resource '%s' does not exist."  % uri)
                 return None
            if e.status == HttpRequest.UNAUTHORIZED:
                err_obj.get_property_err(RESOURCE_URI_PROP).set_message("Resource '%s' access denied."  % uri)
                return None
            
            err_obj.get_property_err(RESOURCE_URI_PROP).set_message("Failed to get resource '%s' - %s." %
                                                                    (uri, unicode(e)))
            return None
        except Exception, e:
            err_obj.get_property_err(RESOURCE_URI_PROP).set_message("Failed to get resource '%s' - %s." %
                                                                    (uri, unicode(e)))
            return None
            
        return resdef
    
    def process_creds(self):
        """Read a cred information (if any) from the properties and set it as object attribute."""
        self.creds = None
        username = self.get_property_value(SNAPLOGIC_USER_PROP)
        if username is not None:
            password = self.get_property_value(SNAPLOGIC_PASSWD_PROP)
            self.creds = (username, password)
             
    def execute(self, input_views, output_views):
        """Execute the ExecResource component."""
        
        try:
            input_view = input_views[INPUT_VIEW]
        except KeyError:
            raise SnapComponentError("Input view '%s' must be connected." % INPUT_VIEW)
        try: 
            self.output_view = output_views[OUTPUT_VIEW]
        except KeyError:
            self.output_view = None
        
        uri = self.get_property_value(RESOURCE_URI_PROP)
        self.uri = uri.strip()
        self.wait_time = self.get_property_value(WAIT_TIME_PROP)
        if self.wait_time is not None:
            self.poll_interval = self.get_property_value(POLL_PROP)
            if self.poll_interval is None:
                raise SnapComponentError(
                        "A value must be specified for 'Polling Interval', when a value is specified for 'Wait Time'.")
        
        self.creds = None
        username = self.get_property_value(SNAPLOGIC_USER_PROP)
        if username is not None:
            password = self.get_property_value(SNAPLOGIC_PASSWD_PROP)
            self.creds = (username, password)
        
        input_view_def = self.get_input_view_def(INPUT_VIEW)
        input_view_field_names = [ d[keys.FIELD_NAME] for d in input_view_def[keys.VIEW_FIELDS] ]
        self.param_name_map = {}
        for fname in input_view_field_names:
            if fname.startswith(PARAM_PREFIX):
                p = fname[len(PARAM_PREFIX):]
                self.param_name_map[fname] = p
            
        rec = input_view.read_record()
        while rec is not None:
            self.process_resource_exec(rec)
            rec = input_view.read_record()
            
        if self.output_view is not None:
            self.output_view.completed()
       
    def process_resource_exec(self, rec):
        """
        Process each input record and execute the resource based on that information.
        
        @param rec: Input record
        @type rec:  L{Record}
        
        """
        id = rec[ID_FIELD]
        params = {}
        for (fname, pname) in self.param_name_map.items():
            params[pname] = rec[fname]
        
        rid = None
        state = None
        mesg = None
        status_uri = None
        try:
            h = self.exec_resource(self.uri, None, None, params, self.creds, id)
        except SnapiHttpException, e:
            if e.body is not None and hasattr(e.body, "get"):
                rid   = e.body.get(keys.RUNTIME_ID)
                mesg  = e.body.get(keys.ERROR_MESSAGE)
                state = RuntimeStatus.Failed
            else:
                mesg  = unicode(e)
                state = RuntimeStatus.Failed
        except Exception, e:
            mesg = unicode(e)
            state = RuntimeStatus.Failed
        else:
            rid = h.rid
            status_uri = h.status_uri
            if self.wait_time is not None:
                s = h.wait(polling_interval = self.poll_interval, timeout = self.wait_time)
            else:
                s = h.get_current_status()
            if s is not None:
                state = s.state
            else:
                # We could not get any state information.
                state = RuntimeStatus.NoUpdate
        
        if self.output_view is not None:
            out_rec = self.output_view.create_record()
            out_rec.transfer_pass_through_fields(rec)
            out_rec["rid"] = rid
            out_rec["state"] = unicode(state)
            out_rec["status_uri"] = status_uri
            out_rec["message"] = mesg
            self.output_view.write_record(out_rec)
    
    def upgrade_1_0_to_1_1(self):
        """
        No-op upgrade only to change component doc URI during the upgrade
        which will be by cc_info before calling this method.
        
        """
        pass
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.