# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: resource_runtime.py 10183 2009-12-16 23:58:55Z grisha $
import threading
import sys
import random
import time
import re
import urlparse
import urllib
from snaplogic.common import snap_log,snap_params,snap_http_lib
from snaplogic.common.snap_exceptions import *
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.server.http_request import HttpRequest
# Module-level string constant "exact_match".
# NOTE(review): not referenced anywhere in this part of the module; presumably a
# match-mode token consumed by other code -- confirm before relying on its meaning.
EXACT_MATCH = "exact_match"
def synch_with_resdef(res_rt, resdef):
    """
    Synchronize the resource runtime received from a PREPARE request with the ResDef of that resource.

    The initial ResourceRuntime object is created from the information provided in the
    PREPARE request. This function fills in the view settings that come from the ResDef
    (input pass-through, input/output record mode). It is also an important runtime
    validation: pipelines hold the description of a resource as it existed when the
    resource was added to the pipeline. If that description has since changed in an
    incompatible way, this function should catch it.

    Checks, in order (the first failure is logged via res_rt.log and returned):
      1. Every input view named in res_rt exists in the ResDef.
      2. Every output view named in res_rt exists in the ResDef. Using fewer output views
         than the ResDef defines is fine.
      3. Every ResDef parameter whose default value is None has a non-None value in
         res_rt.parameters.
      4. res_rt.parameters contains no names other than the ResDef parameters and
         snap_params.TRACE_DATA.

    @param res_rt: Runtime resource built from the PREPARE request. Its input and output
        view objects are updated in place. It must have a callable C{log} attribute,
        which is used to report every validation error.
    @type res_rt: L{ServerResourceRuntime} (or any object with the attributes used here)
    @param resdef: Resource definition of the resource.
    @return: None if the function succeeds, (Http code, err message) if there was an error.
    @rtype: 2-tuple
    """
    # TODO: XXX
    # 1) validate the mode specified for input and output views.
    # 2) If the output talks about pass through, confirm that pass through setting
    #    matches it.
    # 3) Should we validate destination fields in field links here? I think...yes!!!
    # 4) Validate that field_links is None if resdef says the view is binary mode.
    # 5) validate params here.
    output_names = resdef.list_output_view_names()
    input_names = resdef.list_input_view_names()

    for inp_name in res_rt.input_views:
        if inp_name not in input_names:
            res_rt.log(snap_log.LEVEL_ERR,
                       "Resource '%s' does not have input view '%s' specified in PREPARE document" %
                       (res_rt.resource_uri, inp_name))
            return (HttpRequest.BAD_REQUEST, "Input view '%s' not found in the resource '%s'" %
                    (inp_name, res_rt.resource_uri))
        inp_view = res_rt.input_views[inp_name]
        inp_view.is_pass_through = resdef.input_is_pass_through(inp_name)
        inp_view.is_record = resdef.input_is_record_mode(inp_name)

    # We don't mind if fewer output views are specified in the runtime resource than
    # the ResDef defines, because a pipeline need not use all the output views
    # of a member resource.
    for out_name in res_rt.output_views:
        if out_name not in output_names:
            res_rt.log(snap_log.LEVEL_ERR,
                       "Resource '%s' does not have output view '%s' specified in PREPARE document" %
                       (res_rt.resource_uri, out_name))
            return (HttpRequest.BAD_REQUEST, "Output view '%s' not found in the resource '%s'" %
                    (out_name, res_rt.resource_uri))
        res_rt.output_views[out_name].is_record = resdef.output_is_record_mode(out_name)

    # Fetch the parameter names once; they drive both the required-value check and the
    # unexpected-parameter check below.
    param_names = list(resdef.list_param_names())

    for param_name in param_names:
        if resdef.get_param_default_value(param_name) is None:
            # No default value means this is a required parameter, so the posted runtime
            # information must supply a (non-None) value for it.
            if res_rt.parameters.get(param_name) is None:
                res_rt.log(snap_log.LEVEL_ERR,
                           "Resource %s (%s) does not have value for required parameter %s" %
                           (res_rt.resource_uri, res_rt.resource_name, param_name))
                return (HttpRequest.BAD_REQUEST,
                        "Request did not specify required parameter '%s' for resource '%s'" %
                        (param_name, res_rt.resource_uri))

    # Make sure no unexpected params have been passed. TRACE_DATA is always accepted.
    expected_params = set(param_names)
    expected_params.add(snap_params.TRACE_DATA)
    for p in res_rt.parameters:
        if p not in expected_params:
            # Log like every other validation failure in this function.
            res_rt.log(snap_log.LEVEL_ERR,
                       "Resource %s (%s) was given unexpected parameter %s" %
                       (res_rt.resource_uri, res_rt.resource_name, p))
            return (HttpRequest.BAD_REQUEST, "Parameter '%s' not found in the resource '%s'" %
                    (p, res_rt.resource_uri))
    return None
def to_communication_uri(resource_url, host_url):
    """
    Convert a resource URL to a URL format that can be used for an HTTP connection.

    A relative resource URL is escaped and appended to host_url. For an absolute resource
    URL, the scheme and location are kept and only the path is escaped. In both cases
    escaping uses urllib.quote_plus with "/" left unescaped.

    @param resource_url: URL of the resource.
    @type resource_url: string
    @param host_url: URL of the server host (e.g. http://example.com:8088/)
    @type host_url: string
    @return: The URL that is suitable for HTTP communication/requests
    @rtype: string
    @raise SnapFormatError: If resource_url is a relative URL of zero length.
    """
    # The assumption is that a resource URL has only scheme, host and path information,
    # with no other URL components (parameters, query string, fragment etc). The path may
    # contain special characters like space, #, ?, = that have not been URL-encoded.
    # parseHostAndScheme() is therefore relied on to split off only the scheme and
    # location, rather than a full urlparse().
    (scheme, loc, path) = snap_http_lib.parseHostAndScheme(resource_url)
    if scheme is None:
        # Not an absolute URL, a relative one.
        if not resource_url:
            raise SnapFormatError("Resource URL has zero length")
        new_url = host_url + urllib.quote_plus(resource_url, "/")
    else:
        # The URL has scheme and location information; only the path needs escaping.
        path = urllib.quote_plus(path, "/")
        new_url = urlparse.urlunparse((scheme, loc, path, "", "", ""))
    return new_url
def is_top_pipeline(rid):
    """
    Tell whether a runtime id belongs to a top level pipeline.

    A top level runtime id is non-empty and contains no period.

    @param rid: Runtime identifier (may be None or empty).
    @return: True for a top level pipeline id, False otherwise.
    @rtype: bool
    """
    if not rid:
        # None or empty ids never denote a top level pipeline.
        return False
    return rid.count(".") == 0
class RuntimeInputView(object):
    """
    Runtime description of one input view of a runtime resource.
    """

    def __init__(self, view_name):
        """
        Create an input view description with default settings.

        @param view_name: Name of the input view.
        @type view_name: string
        """
        # Name of this input view.
        self.name = view_name
        # HTTP method used to feed the input view. An input view can establish a stream
        # either by issuing a GET request or by providing a URI that accepts a POST
        # request. The PM tells a runtime resource at PREPARE time which approach to take.
        # POST is currently used only by alien clients; inside a pipeline everything is
        # GET based.
        self.http_method = "GET"
        # Either the URI the runtime resource must call GET on, or the URI that the
        # runtime resource provides for the remote end to call POST on.
        self.uri = None
        # If True, the view supports pass-through. Applies only to record mode views.
        self.is_pass_through = False
        # Field linking information the runtime input view needs in order to parse the
        # incoming data stream and populate the input records. The stream is organized
        # as the upstream output view defines it; this information is used to reorganize
        # it into the structure defined by this input view.
        # None if the view is in binary mode.
        # TODO: XXX Describe the field_links tuple here
        self.field_links = None
        # False if the view is binary.
        self.is_record = False
        # 2-tuple of (resource name, output view name) this input view is linked to, if any.
        self.linked_to = None
        # Name of the pipeline input view, if this input view has been assigned as one.
        self.assignment = None

    def __str__(self):
        """Return a printable one-line summary of this input view."""
        template = "Input view name %s: http_method %s, URI %s, is_record %s, is_pass_through %s"
        return template % (self.name, self.http_method, self.uri,
                           self.is_record, self.is_pass_through)
class RuntimeOutputView(object):
    """
    Runtime description of one output view of a runtime resource.
    """

    def __init__(self, view_name):
        """
        Create an output view description with default settings.

        @param view_name: Name of the output view.
        @type view_name: string
        """
        # Name of this output view.
        self.name = view_name
        # HTTP method used to establish a stream with the runtime output view.
        # The output view accepts only GET requests for establishing a stream.
        self.http_method = "GET"
        # URI the runtime output view provides for downstream resources to call GET on.
        self.uri = None
        # Information on the fields passed through to this view by input views.
        # Only valid for record mode views.
        # TODO: XXX Define this property value in more detail.
        self.pass_through_fields = None
        # False if the view is binary.
        self.is_record = True
        # List of 2-tuples, each (resource name, input view name) this output view is linked to.
        self.linked_to = []
        # Name of the pipeline output view, if this output view has been assigned as one.
        self.assignment = None

    def __str__(self):
        """Return a printable one-line summary of this output view."""
        template = "Output view name %s: http_method %s, URI %s, is_record %s, pass_through_fields %s"
        return template % (self.name, self.http_method, self.uri,
                           self.is_record, self.pass_through_fields)
class ResourceRuntime(object):
    """
    Runtime representation of a resource instance.
    """

    def __init__(self, resource_uri):
        """
        Initialize a new runtime resource object.

        @param resource_uri: URI for the resource
        @type resource_uri: string
        """
        # Statistics and state information of the resource runtime.
        self.status = RuntimeStatus()
        # URI of the resource definition this runtime resource is based on.
        self.resource_uri = resource_uri
        # Instance name given to this resource when it was added to a pipeline (if any).
        self.resource_name = None
        # "."-separated instance names of the pipelines enclosing this resource instance.
        # E.g. a resource inside pipeline "FilePipe", inside "SortedDataSource", inside
        # "StockInfo" has context name "StockInfo.SortedDataSource.FilePipe". Useful for
        # debug-friendly log messages and data trace output.
        self.context_name = None
        # URI of the runtime resource that can be used for getting status.
        self.runtime_status_uri = None
        # URI of the runtime resource that can be used for starting and stopping it.
        self.runtime_control_uri = None
        # URI this runtime resource provides to receive notifications from its child
        # resources. Only used when the resource is a pipeline.
        self.my_notification_uri = None
        # Runtime identifier for this resource: a dot separated alphanumeric value.
        self.rid = None
        # rid of the pipeline (if any) inside which this resource runtime was created.
        self.pipeline_rid = None
        # Input view name -> L{RuntimeInputView}.
        self.input_views = {}
        # Output view name -> L{RuntimeOutputView}.
        self.output_views = {}
        # Runtime parameters provided for the execution of this resource.
        self.parameters = {}
        # Value of the 'invoker' HTTP header of the PREPARE POST that created this object.
        self.invoker = None

    def __str__(self):
        """
        Return a printable summary: a header line, followed by str() of every input view
        and then of every output view, joined by newlines.
        """
        header = "rid %s: pipeline_rid %s, resource_name %s, resource_uri %s" % (
            self.rid, self.pipeline_rid, self.resource_name, self.resource_uri)
        view_lines = [str(view) for view in self.input_views.values()]
        view_lines.extend(str(view) for view in self.output_views.values())
        return "\n".join([header] + view_lines)
class ServerResourceRuntime(ResourceRuntime):
    """
    Resource runtime carrying the extra state needed only when the object is instantiated
    on the server (CC or main process) side. Client code need not set these attributes.
    """

    def __init__(self, resource_uri):
        """
        Initialize the base runtime resource state plus the server-side attributes.

        @param resource_uri: URI for the resource
        @type resource_uri: string
        """
        super(ServerResourceRuntime, self).__init__(resource_uri)
        # Log for resource runtime specific logging.
        self.log = None
        # Instantiated object for the component or PipelineManager.
        self.resource_object = None
        # May be used to lock any update related to the resource runtime.
        self.lock = threading.Lock()
        # When not None, the time at which the thread running inside the component or PM
        # exited; useful for getting rid of resource runtime entries after a while.
        self.exit_time = None
        # Dictionary of references. Each entry is itself a dictionary that can contain:
        #   keys.URI    - points to the resdef that must be fetched.
        #   keys.PARAMS - optional; present only if some params must be substituted
        #                 in the resdef fetched with the above URI.
        self.resource_ref = {}
|