# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id:exec_interface.py 1764 2008-03-21 04:27:56Z dhiraj $
"""
This module provides a set of functions that can be used by a Python based client to
start/stop and monitor a resource.
"""
import StringIO
import time
from snaplogic.common.snap_exceptions import *
from snaplogic.common import headers
from snaplogic.common import snap_http_lib,snap_params
from snaplogic.common import snap_control
import snaplogic.common.resource_runtime as resource_runtime
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic import rp
from snaplogic.snapi_base import keys
from snaplogic.snapi_base.exceptions import SnapiException,SnapiHttpException
from snaplogic import snapi_base
class WriteStream(object):
"""
This class provides the interface for writing to the input view of a resource.
If the view is in record mode, then write_record must be called, if it is in
binary mode, the write_binary() must be called. The attribute is_record will
be True if the view is in record mode and False if it is binary mode.
"""
def __init__(self, name, is_record, uri, content_type):
"""
Initialize the stream
@param name: Name of the input view.
@type name: str
@param is_record: True if view is record mode, False otherwise.
@param is_record: bool
@param uri: The URI of the input view.
@type uri: str
@param content_type: Content type being POSTed.
@type content_type: str
"""
self.uri = uri
"""The URI for the POST request."""
self.view_name = name
self.content_type = content_type
""" The content type of the POST."""
self.is_record = is_record
""" True if stream is in record mode. False, if it is in binary mode."""
self.buffer_size = 1024
"""This is the amount of bytes that will be buffered before data is written out in POST request."""
self._stream = StringIO.StringIO()
self.continued_count = 0
"""This number is incremented, every time data is POST-ed for this stream session."""
self.field_names = None
"""If this is a record mode view, then this attribute will hold the field names of the view."""
self.fields = None
"""
If this is a record mode view, then this attribute will hold the field definitions in this format -
((field_name, type, description), (field_name, type, description), ...
"""
self.dictionary_format_record = False
"""
If set to True, the records are represented as dictionaries with field name as key and field
value as dict value.
"""
if self.is_record:
# For record mode view, locate the readily available RPs.
if content_type is None:
# Default is ASN1 for record mode
content_type = rp.SNAP_ASN1_CONTENT_TYPE
requested_rp = rp.get_rp(content_type)
if requested_rp is None:
raise SnapiException("Content type %s cannot be handled by this client" % content_type)
self._writer = requested_rp.Writer(self._stream)
self._writer.initialize()
else:
if content_type is None:
# For binary mode, default is to take whatever server will provide.
raise SnapiException("For binary input view %s no content type has been specified" % self.view_name)
self._hdr = {}
self._hdr["Content-Type"] = content_type
self._hdr[headers.SNAPSTREAM_CONTINUED] = str(self.continued_count)
# Now make an initial connection.
self.flush()
def flush(self, end_stream = False):
"""
POST whatever data we have buffered since the last POST.
@param end_stream: This is set to True if no further data is going to be sent.
@type end_stream: bool
"""
if end_stream:
del self._hdr[headers.SNAPSTREAM_CONTINUED]
else:
self._hdr[headers.SNAPSTREAM_CONTINUED] = str(self.continued_count + 1)
response = snap_http_lib.sendreq("POST", self.uri, self._stream.getvalue(), self._hdr)
response.getResponse()
if response.getStatus() != 200:
response.close()
raise SnapiHttpException(response.getStatus(), response.getReason(), None,
"Got HTTP Error code %s for sending data to view %s" % (response.getStatus(),
self.view_name))
response.close()
self._stream.truncate(0)
def write_record(self, rec):
"""
Write a raw record to the stream.
@param data: A raw record to be written to stream. Raw record is bascially
a sequence of field values.
@type raw_rec: list or tuple.
"""
if self.is_record:
if self.dictionary_format_record:
rec = [rec.get(fname) for fname in self.field_names]
self._writer.write(rec)
else:
raise SnapiException("This is a binary mode view")
if len(self._stream.getvalue()) > self.buffer_size:
self.flush()
def write_binary(self, data):
"""
Write a raw record to the stream.
@param data: A raw record to be written to stream. Raw record is bascially
a sequence of field values.
@type raw_rec: list or tuple.
"""
if self.is_record:
raise SnapiException("This is a record mode view")
else:
self._stream.write(data)
if len(self._stream.getvalue()) > self.buffer_size:
self.flush()
def close(self):
"""Close the stream."""
if self.is_record:
self._writer.end()
self.flush(True)
self._stream.close()
class ReadStream(object):
"""
This class provides a streaming interface to the
output view of a resource.
"""
def __init__(self, view_name, is_record, uri, accept_type):
"""
Initialize the stream
@param name: Name of the output view.
@type name: str
@param is_record: True if view is record mode, False otherwise.
@param is_record: bool
@param uri: The URI of the output view.
@type uri: str
@param accept_type: Content type being requested.
@type accept_type: str
"""
self.uri = uri
"""URI for the output view."""
self.view_name = view_name
"""Name of the output view being read."""
self.content_type = None
"""The content type negotiated on."""
self.accept_type = accept_type
"""Content type requested by the GET request."""
self.is_record = is_record
""" True if stream is in record mode. False, if it is in binary mode."""
self.field_names = None
"""If this is a record mode view, then this attribute will hold the field names of the view."""
self.fields = None
"""
If this is a record mode view, then this attribute will hold the field definitions in this format -
((field_name, type, description), (field_name, type, description), ...
"""
self.dictionary_format_record = False
"""
If set to True, the records are represented as dictionaries with field name as key and field
value as dict value.
"""
hdr = {}
if accept_type is None:
if is_record:
# Default is ASN1 for record mode
hdr["Accept"] = rp.SNAP_ASN1_CONTENT_TYPE
else:
# This is binary mode. Default is to take whatever server will provide.
hdr["Accept"] = "*/*"
else:
hdr["Accept"] = accept_type
self._resp = snap_http_lib.sendreq("GET", self.uri, None, hdr)
self._resp.getResponse()
if self._resp.getStatus() != 200:
self._resp.close()
raise SnapiHttpException(self._resp.getStatus(), self._resp.getReason(), None,
"Got HTTP error %s while connecting to output view %s URI %s" %
(self._resp.getStatus(), self.view_name, self.uri))
self.content_type = self._resp.getHeaders()['content-type']
if self.is_record:
requested_rp = rp.get_rp(self.content_type)
if requested_rp is None:
self._resp.close()
raise SnapValueError("Content type %s cannot be handled by this client" % self.content_type)
self._gen = requested_rp.Reader(self._resp)
def read_record(self):
"""
Read Snap records from record mode output view.
@return: A sequence of field values of the record that was just read. None, if no more records are
going to be sent in this output view.
@rtype: list
"""
if self.is_record:
try:
rec = self._gen.next()
except StopIteration:
return None
if self.dictionary_format_record:
# We need to convert the tuple into a dictionary format.
return dict(zip(self.field_names, rec))
else:
return rec
else:
raise SnapiException("This is a binary mode view")
def read_binary(self, size):
"""
Read binary data from binary mode output view.
@param size: Size to read in bytes.
@type size: int
@return: The binary data read. None, if no more data is available from the stream.
@rtype: str
"""
if self.is_record:
raise SnapiException("This is a record mode view")
ret = self._resp.read(size)
if ret == "":
return None
return ret
def close(self):
"""Closes the stream."""
self._resp.close()
class Handle(object):
"""
This class represents the handle returned when a resource is executed.
This handle can be used for monitoring and controlling the resource, as
it executes.
"""
def __init__(self, status_uri, rid, resource_name, resource_uri, creds=None, inp_dict=None, out_dict=None,
resdef=None, control_uri=None):
"""Initializes the object with URI of the executing resource."""
self.status_uri = status_uri
self.control_uri = control_uri
self.resource_uri = resource_uri
self.resource_name = resource_name
self.credentials = creds
self.rid = rid
if inp_dict is None:
inp_dict = {}
if out_dict is None:
out_dict = {}
self.inputs = inp_dict
self.outputs = out_dict
self.resdef = resdef
for inp_name in self.inputs:
if self.resdef.input_is_record_mode(inp_name):
self.inputs[inp_name].field_names = self.resdef.list_input_field_names(inp_name)
self.inputs[inp_name].fields = self.resdef.list_input_view_fields(inp_name)
for out_name in self.outputs:
if self.resdef.output_is_record_mode(out_name):
self.outputs[out_name].field_names = self.resdef.list_output_field_names(out_name)
self.outputs[out_name].fields = self.resdef.list_output_view_fields(out_name)
def get_current_status(self, detailed = False):
"""
Returns the status of the resource execution.
@param detailed: If set to True, then it returns the status of
the resources inside the pipeline
@type detailed: boolean
@return: Status of the session
@rtype: L{RuntimeStatus}
"""
return get_status(self.status_uri, detailed)
def stop(self):
"""Send STOP request to an executing resource."""
send_stop(self.control_uri)
def wait(self, polling_interval = 10, timeout = 0):
"""
Waits for the session represented by this handle to finish.
@param polling_interval: The number of seconds between each check for status
@type polling_interval: int
@param timeout: The amount of time to wait for the pipeline to complete. If 0, then block until
it completes.
@type timeout: int
@return: Status of the finished session or None, if timed out.
@rtype: L{RuntimeStatus}
"""
if timeout:
start_time = time.time()
while True:
ret = get_status(self.status_uri)
if ret.is_finished():
return ret
if timeout:
time_used = time.time() - start_time
if time_used > timeout:
# Exceeded timeout.
return None
elif polling_interval + time_used > timeout:
# Don't even bother sleeping, we will exceed timeout. Just return
return None
time.sleep(polling_interval)
def get_resource_logs(self, include_facilities=None, exclude_facilities=None):
"""
Fetch all the log messages related to this resource execution.
@return: List of strings.
@rtype: list
"""
return snapi_base.get_resource_logs(self.rid, self.resource_uri, include_facilities,
exclude_facilities, self.credentials)
def dictionary_format_record():
doc = "If set to True, the records are expected and provided in dictionary format (instead of tuple format)."
def fget(self):
return self._dict_format_record
def fset(self, value):
self._dict_format_record = value
for inp_name in self.inputs:
self.inputs[inp_name].dictionary_format_record = value
for out_name in self.outputs:
self.outputs[out_name].dictionary_format_record = value
return locals()
dictionary_format_record = property(**dictionary_format_record())
def exec_resource(resource_name, resource_uri, resdef, inputs = None, outputs = None, params = None, creds = None,
custom_header = None):
"""
Internally used function that executes a resource.
@param resource_name: User defined name for the resource. Could be any string the caller of this
method would like to use to identify the pipeline being invoked. This name does not have to
be unique.
@type resource_name: str
@param resource_uri: The URI of the resource to be executed.
@type resource_uri: str
@param resdef: The resource definition of the resource being executed (can be a shallow copy).
@type resdef: L{ResDef}
@param inputs: Dictionary with names of the input views as keys. The value can be requested content type or
can be None, if default content_type is fine.
@type inputs: dict
@param outputs: Dictionary with names of the output views as keys. The value can be requested content type or
can be None, if default content_type is fine.
@type outputs: dict
@param params: Dictionary with param name as key and param value as value.
@type params: dict
@param creds: A 2-tuple containing (username, password)
@type creds: tuple
@param custom_header: If not None, contains header entries that must be sent with the prepare request.
@type custom_header: dict
@return: Handle for the executing resource.
@rtype: L{Handle}
"""
if not inputs:
inputs = {}
if not outputs:
outputs = {}
if not params:
params = {}
out_stream_dict = {}
inp_stream_dict = {}
if resource_uri is None:
raise SnapiException("The resource definition needs a URI in order to be executed.")
res_rt = _send_prepare(resource_name, resource_uri, inputs.keys(), outputs.keys(), params, creds, custom_header)
for out in res_rt.output_views.values():
try:
out_stream_dict[out.name] = ReadStream(out.name, out.is_record, out.uri, outputs[out.name])
except Exception, e:
for s in out_stream_dict.values():
if s is not None:
s.close()
raise
for inp in res_rt.input_views.values():
try:
inp_stream_dict[inp.name] = WriteStream(inp.name, inp.is_record, inp.uri, inputs[inp.name])
except Exception, e:
for s in out_stream_dict.values():
s.close()
for s in inp_stream_dict.values():
if s is not None:
s.close()
raise
try:
_send_start(res_rt)
except:
for s in inp_stream_dict.values():
s.close()
for s in out_stream_dict.values():
s.close()
raise
return Handle(res_rt.runtime_status_uri, res_rt.rid, resource_name, resource_uri,
creds, inp_stream_dict, out_stream_dict, resdef, res_rt.runtime_control_uri)
def _send_prepare(resource_name, resource_uri, input_names = None, output_names = None, params = None, cred = None,
custom_header = None):
"""
Send PREPARE request to resource.
@param resource_name: Caller defined named for the resource. Does not have to be unique.
@type resource_name: str
@param input_names: Names of the input views, that the caller wishes to POST input to.
@type input_names: list
@param output_names: Names of the output views, that the caller wishes to GET output from.
@type output_names: list
@param params: Dictionary with param name as key and param value as value.
@type params: dict
@param cred: A 2-tuple containing (username, password)
@type cred: tuple
@param custom_header: If not None, contains header entries that must be sent with the prepare request.
@type custom_header: dict
@return: Runtime status URI of the prepared resource.
@rtype: L{resource_runtime.ResourceRuntime}
"""
res_rt = resource_runtime.ResourceRuntime(resource_uri)
res_rt.resource_name = resource_name
if input_names is not None:
for name in input_names:
rt_inp = resource_runtime.RuntimeInputView(name)
rt_inp.http_method = "POST"
rt_inp.field_links = resource_runtime.EXACT_MATCH
res_rt.input_views[name] = rt_inp
if output_names is not None:
for name in output_names:
rt_out = resource_runtime.RuntimeOutputView(name)
res_rt.output_views[name] = rt_out
res_rt.parameters = params
# Send prepare request
try:
prep_req = snap_control.create_prepare_request(res_rt)
except Exception, e:
raise SnapiException("Unable to send PREPARE request to resource %s. %s" %
(res_rt.resource_uri, str(e)))
resp_dict = snapi_base.send_req("POST", res_rt.resource_uri, prep_req, custom_header, cred)
snap_control.parse_prepare_response(resp_dict, res_rt)
return res_rt
def _send_start(res_rt):
"""
Send START request to a prepared resource.
@param res_rt: The resource runtime object for the prepared resource
@type res_rt: L{ResourceRuntime}
@raise SnapiException: If the request failed.
"""
try:
start_dict = snap_control.create_start_request()
resp_dict = snapi_base.send_req("PUT", res_rt.runtime_control_uri, start_dict)
snap_control.parse_start_stop_response(resp_dict, res_rt.status)
except SnapiException, e:
raise
except Exception, e:
raise SnapiException("Unable to send START document. %s " % str(e))
def send_stop(runtime_control_uri):
"""
Send STOP request to an executing resource.
@param runtime_control_uri: The control URI of the executing resource
@type runtime_control_uri: str
@raise SnapiException: If the request failed.
"""
status_obj = RuntimeStatus()
stop_dict = snap_control.create_stop_request()
try:
resp_dict = snapi_base.send_req("PUT", runtime_control_uri, stop_dict)
snap_control.parse_start_stop_response(resp_dict, status_obj)
except SnapiException, e:
raise
except Exception, e:
raise SnapiException("Unable to send STOP document. %s " % str(e))
def get_status(runtime_status_uri, is_detailed=False):
"""
Fetch status from a specified resource runtime.
@param runtime_status_uri: The status URI of the executing resource
@type runtime_status_uri: str
@param is_detailed: Request detailed status information from the executing resource. If the
resource is a pipeline, then setting this param to True will provide status information
of the resources inside the pipeline too.
@type is_detailed: bool
@return: The status object parsed from the GET response.
@rtype: L{RuntimeStatus}
@raise SnapiException: If the request failed.
"""
if is_detailed:
uri = snap_http_lib.add_params_to_uri(runtime_status_uri, {snap_params.LEVEL : snap_params.VALUE_DETAIL})
else:
uri = snap_http_lib.add_params_to_uri(runtime_status_uri, {snap_params.LEVEL : snap_params.VALUE_DESCRIBE})
status_dict = snapi_base.send_req("GET", uri)
return snap_control.parse_status_response(status_dict, is_detailed)
|