# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: runtime_status.py 2035 2008-03-28 00:24:29Z dhiraj $
"""
SnapLogic runtime resource statistics module
"""
import time
from snaplogic.common.snap_exceptions import *
from snaplogic.snapi_base import keys
class RuntimeStatus(dict):
"""
A class representing statistics and state of a runtime resource.
"""
# Possible state values for a component
NoUpdate = 'NoUpdate'
Prepared = 'Prepared'
Started = 'Started'
Completed = 'Completed'
Failed = 'Failed'
Stopped = 'Stopped'
Stopping = 'Stopping'
def __init__(self):
"""RuntimeStatus Constructor."""
# Current status of the component
self['state'] = self.NoUpdate
# The state change time as a numeric value expressed in seconds since the epoch, in UTC.
self['state_ts'] = long(time.time())
# The timestamp for when the resource was created (expressed in seconds since the epoch, in UTC).
self['create_ts'] = self['state_ts']
# Statistics for this resource runtime
self['statistics'] = {keys.INPUT_VIEWS:{}, keys.OUTPUT_VIEWS:{}}
# Time stamp of the last time the stats were updated.
self['statistics_ts'] = int(time.time())
self['resources'] = {}
def __str__(self):
"""String representation of object."""
l = []
l.append(' Status: %s TimeStamp: %s Create TimeStamp %s\n' %
(self['state'], time.ctime(self['state_ts']), time.ctime(self['create_ts'])))
l.append(" Input Views: \n")
for (k, v) in self['statistics']["input_views"].iteritems():
l.append(" %s %s\n" % (k, v))
l.append(" Output Views: \n")
for (k, v) in self['statistics']["output_views"].iteritems():
l.append(" %s %s\n" % (k, v))
if len(self['resources']) > 0:
l.append("\n Resources:\n ==========\n")
for (k, v) in self['resources'].iteritems():
l.append(" %s\n%s\n" % (k, v))
return "".join(l)
def set_state(self, state):
"""Sets the state of the component."""
if not state in (self.NoUpdate, self.Prepared, self.Started, self.Completed,
self.Failed, self.Stopped, self.Stopping):
raise SnapValueError("%s is not a valid state value for component" % state)
if self['state'] != state:
self['state'] = state
self['state_ts'] = int(time.time())
def state():
doc = "State of this component."
def fget(self):
return self['state']
def fset(self, value):
self.set_state(value)
return locals()
state = property(**state())
def create_ts():
doc = "Create timestamp."
def fget(self):
return self['create_ts']
def fset(self, value):
self['create_ts'] = value
return locals()
create_ts = property(**create_ts())
def statistics():
doc = "Statistics."
def fget(self):
return self['statistics']
def fset(self, value):
self['statistics'] = value
return locals()
statistics = property(**statistics())
def get_state_with_ts(self):
"""Returns the state and the timestamp of that state change."""
return (self['state'], self['state_ts'])
def set_state_with_ts(self, state, ts):
"""Set state and timestamp. Called only when the object is being deserailized from XML or being copied."""
self['state'] = state
self['state_ts'] = ts
def is_finished(self):
"""Returns true if the session has finished."""
return self.check_if_finished(self.state)
@classmethod
def check_if_finished(cls, state):
"""Returns true if the specified state object is in a finished state."""
if state in (RuntimeStatus.Completed, RuntimeStatus.Failed, RuntimeStatus.Stopped):
return True
return False
@classmethod
def transfer_statistics(cls, status_obj, rt_list, input_assign, output_assign):
"""
Transfer statistics to the pipeline input and output views.
If a pipeline has assigned resource views as its own view, then we need to transfer those
resource view statistics to the pipeline view.
@param status_obj: The pipeline runtime status object, that will be updated with the results.
@type status_obj: L{RuntimeStatus}
@param rt_list: List of child status-es.
@type rt_list: List of L{ResourceRuntime} objects.
@param input_assign: The input assignment dictionary.
@type input_assign: dict
@param output_assign: The output assignment dictioary.
@type output_assign: dict
"""
for pipe_view_name in input_assign:
(res_name, view_name) = input_assign[pipe_view_name]
if view_name in rt_list[res_name].status.statistics[keys.INPUT_VIEWS]:
vstats = rt_list[res_name].status.statistics[keys.INPUT_VIEWS][view_name]
status_obj.statistics[keys.INPUT_VIEWS][pipe_view_name] = vstats
for pipe_view_name in output_assign:
(res_name, view_name) = output_assign[pipe_view_name]
if view_name in rt_list[res_name].status.statistics[keys.OUTPUT_VIEWS]:
vstats = rt_list[res_name].status.statistics[keys.OUTPUT_VIEWS][view_name]
status_obj.statistics[keys.OUTPUT_VIEWS][pipe_view_name] = vstats
@classmethod
def compute_pipeline_state(cls, status_obj, rt_list, got_stop_req):
"""
Computes overall state based on child component's state.
Placing the logic for computing state of pipeline in the module for convenience,
as it will be easy to modify and update if new states are added.
@param status_obj: The pipeline runtime status object, that will be updated with the results.
@type status_obj: L{RuntimeStatus}
@param rt_list: List of child status-es.
@type rt_list: List of L{ResourceRuntime} objects.
@param got_stop_req: True if this pipeline has got the stop request.
@type got_stop_req: bool
"""
state = None
for s in rt_list:
res_state = s.status.state
# If even one child is in NoUpdate state, then return parent
# state as NoUpdate.
if res_state == RuntimeStatus.NoUpdate:
status_obj.state = RuntimeStatus.NoUpdate
return
if state == None:
# First iteration, just set state to the child state.
state = res_state
elif state == RuntimeStatus.Prepared:
# If in Prepared state and child state is not NoUpdate, then
# stay with Prepared state.
continue
elif state == RuntimeStatus.Started:
# if child state is not in NoUpdate or Prepared state, stay with the Started state
if res_state == RuntimeStatus.Prepared:
state = RuntimeStatus.Prepared
elif state == RuntimeStatus.Completed:
# If child state is not in NoUpdate, Prepared, Started, Failed, Stopping or Stopped state,
# then stay with completed state.
if res_state in (RuntimeStatus.Prepared, RuntimeStatus.Started, RuntimeStatus.Failed,
RuntimeStatus.Stopping, RuntimeStatus.Stopped):
state = res_state
elif state == RuntimeStatus.Stopping:
# If some child is still in Prepared/Started state or in failed state,
# then set parent state to the same.
if res_state in (RuntimeStatus.Prepared, RuntimeStatus.Started, RuntimeStatus.Failed):
state = res_state
elif state == RuntimeStatus.Stopped:
# If some child is still in Prepared/Started state or in Stopping/Failed state,
# then set parent state to the same.
if res_state in (RuntimeStatus.Prepared, RuntimeStatus.Started,
RuntimeStatus.Failed, RuntimeStatus.Stopping):
state = res_state
elif state == RuntimeStatus.Failed:
# If some child is still in Prepared/Started state, then set state to the same.
if res_state in (RuntimeStatus.Prepared, RuntimeStatus.Started):
state = res_state
# A stop request can cause some components in the pipeline to fail. For example, parent calls
# stop on child component A, which closes all its streams. This causes downstream component B
# to fail, even before it receives a stop request.
if got_stop_req:
if state == RuntimeStatus.Completed:
status_obj.state = RuntimeStatus.Completed
return
elif cls.check_if_finished(state):
status_obj.state = RuntimeStatus.Stopped
return
else:
status_obj.state = RuntimeStatus.Stopping
return
status_obj.state = state
|