# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: runtime_table.py 9099 2009-09-25 21:53:48Z dhiraj $
import os
import base64
import time
import urlparse
import threading
from snaplogic.common.snap_exceptions import *
from snaplogic.common import uri_prefix
from snaplogic.common import snap_http_lib
"""
This module provides methods that manipulate runtime tables.
The runtime table provides a location for storing and looking up entries
based on a runtime URI. There are two kinds of tables supported:
1) Table of resource runtime objects, with runtime status URI as the key.
2) Table of runtime view entries, with runtime view URI as the key.
In both cases, the runtime URI is created by the table at the time when the
entry is added. These tables provide a way of locating the related entries
when HTTP requests to the runtime URIs arrive.
"""
# Default retention period in seconds; initialize() replaces this value.
status_lifetime = 300
"""The amount of time (in seconds) status information of a completed resource is kept around"""
# Keyed by the random key component generated in add_runtime_entry() (the
# key embedded in the status URI), not by the full URI string.
runtime_status_uri_table = {}
"""This table maps a runtime status URI to the runtime resource object."""
# Keyed by the view URI path returned from add_runtime_view_entry(); each
# value is the entry sequence supplied by the caller of that function.
runtime_view_uri_table = {}
"""This table maps a runtime view uri to the runtime resource object that serves the view and view name."""
runtime_table_lock = threading.Lock()
"""This lock is used for locking both the status and view runtime URI tables."""
def get_path_elements(u):
    """
    Split the path component of a URI into its non-empty elements.

    Only the path portion of the URI is considered (scheme, host, query and
    fragment are ignored). Each '/'-separated element is stripped of
    surrounding whitespace, and elements that end up empty are dropped.

    @param u: URI (or bare path) to split.
    @type u: str

    @return: The path elements, in order of appearance.
    @rtype: list of str
    """
    elements = []
    for segment in urlparse.urlparse(u).path.split('/'):
        trimmed = segment.strip()
        if trimmed:
            elements.append(trimmed)
    return elements
STATUS_URI_PATH_COUNT = len(get_path_elements(uri_prefix.RUNTIME_STATUS_ENTRY))
"""Precomputed number of path elements in the status URI prefix."""
def initialize(runtime_entry_timeout):
    """
    Configure the runtime table for this process.

    Stores the retention period used by remove_completed_runtime_entries()
    in the module-level status_lifetime variable.

    @param runtime_entry_timeout: Retention period in seconds, given either as
        an int or as a string that int() can parse.
    @type runtime_entry_timeout: int/str
    """
    global status_lifetime
    # Convert first so a bad value raises before the global is touched.
    timeout_seconds = int(runtime_entry_timeout)
    status_lifetime = timeout_seconds
def _random():
    """
    Generate a random, URL-safe token.

    30 bytes are read from os.urandom() and encoded with the URL-safe base64
    alphabet. Because 30 is a multiple of 3, the result is always 40
    characters long and carries no '=' padding.

    @return: The encoded random token.
    @rtype: str
    """
    entropy = os.urandom(30)
    return base64.urlsafe_b64encode(entropy)
def add_runtime_entry(res_rt, host_uri):
    """
    Register a resource runtime object and assign its runtime URIs.

    A fresh random key is generated and the object is stored in
    runtime_status_uri_table under that key. Three attributes are then set
    on the object, each built from host_uri and the runtime status prefix:
      - runtime_status_uri:  <prefix>/<key>/<resource_uri>
      - runtime_control_uri: <prefix>/control/<key>/<random>/<resource_uri>
      - my_notification_uri: <prefix>/notification/<key>/<random>/<resource_uri>
    The whole operation runs while holding runtime_table_lock.

    @param res_rt: Resource runtime object to register. Its resource_uri
        attribute is read; the three URI attributes above are written.
    @param host_uri: Base URI that the generated runtime URIs are appended to.
    @type host_uri: str

    @raise SnapException: If a generated key collides with an existing
        table key more than 10 times in a row.
    """
    runtime_table_lock.acquire()
    try:
        # Keep drawing keys until one is unused, giving up after too many collisions.
        collisions = 0
        while True:
            key = _random()
            if key not in runtime_status_uri_table:
                break
            collisions += 1
            if collisions > 10:
                raise SnapException("Failed to generate random number despite several attempts (%s)" % key)

        runtime_status_uri_table[key] = res_rt

        status_prefix = uri_prefix.RUNTIME_STATUS_ENTRY
        res_rt.runtime_status_uri = snap_http_lib.concat_paths(
            host_uri, status_prefix, key, res_rt.resource_uri)
        # Control and notification URIs each carry their own extra random
        # element, which get_status_uri_entry() later compares against.
        res_rt.runtime_control_uri = snap_http_lib.concat_paths(
            host_uri, status_prefix, "/control/", key, _random(), res_rt.resource_uri)
        res_rt.my_notification_uri = snap_http_lib.concat_paths(
            host_uri, status_prefix, "/notification/", key, _random(), res_rt.resource_uri)
    finally:
        runtime_table_lock.release()
def list_runtime_entries():
    """
    Summarize every resource runtime currently held in the status table.

    The table is read while holding runtime_table_lock.

    @return: One dictionary per resource runtime with the keys
        "runtime_status_uri", "runtime_control_uri", "runtime_id",
        "resource_uri", "resource_name", "state" and "owner". "state" is
        None when the runtime has no status object.
    @rtype: list of dict
    """
    summaries = []
    runtime_table_lock.acquire()
    try:
        for runtime in runtime_status_uri_table.values():
            # A runtime without a status object reports no state.
            state = None
            if runtime.status is not None:
                state = runtime.status.state
            summary = {
                "runtime_status_uri": runtime.runtime_status_uri,
                "runtime_control_uri": runtime.runtime_control_uri,
                "runtime_id": runtime.rid,
                "resource_uri": runtime.resource_uri,
                "resource_name": runtime.resource_name,
                "state": state,
                "owner": runtime.invoker,
            }
            summaries.append(summary)
    finally:
        runtime_table_lock.release()
    return summaries
def remove_completed_runtime_entries():
    """
    Remove finished and exited resource runtimes from the table.

    An entry is removed when its exit_time is set and more than
    status_lifetime seconds have elapsed since then. Entries whose exit_time
    is None are left alone. The table is modified while holding
    runtime_table_lock.

    This should be called whenever there is a need to reduce the number of
    old entries in the resource runtime table.
    """
    runtime_table_lock.acquire()
    try:
        # Judge every entry against the same instant.
        now = time.time()
        # Iterate over an explicit snapshot of the keys: entries are deleted
        # inside the loop, and iterating a live key view while deleting
        # raises RuntimeError on interpreters where keys() is not a list.
        for key in list(runtime_status_uri_table.keys()):
            exit_time = runtime_status_uri_table[key].exit_time
            if exit_time is not None and (now - exit_time) > status_lifetime:
                del runtime_status_uri_table[key]
    finally:
        runtime_table_lock.release()
def get_status_uri_entry(u, http_method=None):
    """
    Fetch the resource runtime object corresponding to a runtime URI.

    Three URI shapes are recognized (the prefix being the runtime status
    prefix, followed by at least one further path element):
      - status:       <prefix>/<key>/...
      - control:      <prefix>/control/<key>/<sub_key>/...
      - notification: <prefix>/notification/<key>/<sub_key>/...
    Whatever the shape, the lookup uses only <key>. For control and
    notification URIs the <sub_key> must also match the one in the URI
    that was assigned to the resource runtime.

    @param u: Runtime status, control or notification URI.
    @type u: str

    @param http_method: HTTP method of the request, or None to skip the
        method check. Status URIs accept only GET; control and notification
        URIs accept only PUT. The comparison is case-insensitive.
    @type http_method: str

    @return: Resource runtime object found
    @rtype: L{ServerResourceRuntime}

    @raise SnapHttpErrorCode: 404 if the URI is too short, the key is not
        in the table, or the sub-key does not match; 405 if the HTTP
        method is not supported for that kind of URI.
    """
    parts = get_path_elements(u)
    base = STATUS_URI_PATH_COUNT

    # There must be a key plus at least one more element after the prefix.
    if len(parts) <= base + 1:
        raise SnapHttpErrorCode(404, "Runtime URI (%s) not found" % u)

    # Even when a control or notification URI is given, the lookup is done
    # with the status key; the type-specific elements are only validated.
    uri_type = parts[base]
    if uri_type in ('control', 'notification'):
        if len(parts) <= base + 3:
            raise SnapHttpErrorCode(404, "Runtime URI (%s) not found" % u)
        key = parts[base + 1]
        sub_key = parts[base + 2]
    else:
        uri_type = 'status'
        key = parts[base]

    runtime_table_lock.acquire()
    try:
        try:
            res_rt = runtime_status_uri_table[key]
            method_given = http_method is not None
            if uri_type == 'control':
                registered = get_path_elements(res_rt.runtime_control_uri)
                if registered[base + 2] != sub_key:
                    raise SnapHttpErrorCode(404, "Control URI (%s) not found" % u)
                if method_given and http_method.upper() != 'PUT':
                    raise SnapHttpErrorCode(405, "Control URI (%s) does not support HTTP method '%s'" %
                                            (u, http_method))
            elif uri_type == 'notification':
                registered = get_path_elements(res_rt.my_notification_uri)
                if registered[base + 2] != sub_key:
                    raise SnapHttpErrorCode(404, "Notification URI (%s) not found" % u)
                if method_given and http_method.upper() != 'PUT':
                    raise SnapHttpErrorCode(405, "Notification URI (%s) does not support HTTP method '%s'" %
                                            (u, http_method))
            elif method_given and http_method.upper() != 'GET':
                raise SnapHttpErrorCode(405, "Status URI (%s) does not support HTTP method '%s'" %
                                        (u, http_method))
            return res_rt
        finally:
            runtime_table_lock.release()
    except KeyError:
        # Raised by the table lookup when the key is unknown.
        raise SnapHttpErrorCode(404, "No such Resource runtime found (%s)" % u)
def add_runtime_view_entry(resource_uri, is_record, entry):
    """
    Store a runtime view entry and create the runtime view URI for it.

    The URI is built from the record or binary view prefix (chosen by
    is_record), a random element, resource_uri and the view name taken from
    entry[2]. The entry is stored in runtime_view_uri_table under that URI,
    so that future incoming HTTP requests to the URI can be routed to the
    particular view of the runtime resource. The table is modified while
    holding runtime_table_lock.

    @param resource_uri: URI of the resource that the view belongs to.
    @type resource_uri: str

    @param is_record: True, if the view is record mode, false otherwise.
    @type is_record: bool

    @param entry: The view entry consists of:
        (runtime status URI, True if output view/False for input, view name)
    @type entry: 3-tuple

    @return: The runtime view URI
    @rtype: str

    @raise SnapException: If a generated URI collides with an existing
        table key more than 10 times in a row.
    """
    pref = uri_prefix.RUNTIME_RECORD_VIEW if is_record else uri_prefix.RUNTIME_BINARY_VIEW

    runtime_table_lock.acquire()
    try:
        # Keep generating candidate URIs until one is unused.
        collisions = 0
        while True:
            view_uri = snap_http_lib.concat_paths(pref, _random(), resource_uri, entry[2])
            if view_uri not in runtime_view_uri_table:
                break
            collisions += 1
            if collisions > 10:
                raise SnapException("Failed to generate random number despite several attempts (%s)" % view_uri)

        runtime_view_uri_table[view_uri] = entry
        return view_uri
    finally:
        runtime_table_lock.release()
def set_stream_object_for_view_entry(uri):
    """
    Find view URI entry in table and set the stream object for the entry.

    NOTE: This function currently has no body other than this docstring, so
    calling it does nothing and returns None. The text below describes the
    intended behavior, not what the code does.

    This method is currently called for POST snap streams to input views, because unlike
    other snap streams, there could be a sequence of POST requests that actually feed
    this stream. By placing the stream object in the table, we indicate that the first
    HTTP request to this URI has actually happened. Followup requests will note that the
    first request has happened and won't do some of the view initialization. Instead, the
    effort will only be to connect the new request to the existing stream object.

    @param uri: The runtime view URI.
    @type uri: str

    NOTE(review): Earlier documentation also listed a "stream" parameter (the
    stream object created by the first request to the view), but the
    signature does not accept one.
    """
def get_view_uri_entry(u):
    """
    Look up a runtime view entry in the view URI table.

    Only the path component of the given URI is used as the table key. The
    lookup is done while holding runtime_table_lock.

    @param u: Runtime view URI.
    @type u: str

    @return: The view entry consists of a sequence of these values:
        (runtime status URI of res_rt, True for output view/False for input, view name)
    @rtype: 3-tuple

    @raise SnapObjNotFoundError: If the table has no entry for the path.
    """
    resource_path = urlparse.urlparse(u).path

    runtime_table_lock.acquire()
    try:
        found = resource_path in runtime_view_uri_table
        if found:
            entry = runtime_view_uri_table[resource_path]
    finally:
        runtime_table_lock.release()

    # The error is reported only after the lock has been released.
    if not found:
        raise SnapObjNotFoundError("No such Resource runtime view found: %s %s" %
                                   (resource_path, runtime_view_uri_table.keys()))
    return entry
def remove_runtime_view_uris(u_list):
    """
    Remove the specified view URIs from the view URI table.

    Only the path component of each URI is used as the table key. URIs are
    processed in order while holding runtime_table_lock; if one is missing,
    the error is raised immediately and the URIs removed before it stay
    removed.

    This is typically called when a resource moves from PREPARED to START state and does
    not want to receive any more new connections to the views.

    @param u_list: List of runtime view URIs.
    @type u_list: list

    @raise SnapObjNotFoundError: If a URI's path is not in the table.
    """
    runtime_table_lock.acquire()
    try:
        for view_uri in u_list:
            resource_path = urlparse.urlparse(view_uri).path
            if resource_path not in runtime_view_uri_table:
                raise SnapObjNotFoundError("Runtime view uri \"%s\" not found for deletion %s" %
                                           (resource_path, runtime_view_uri_table.keys()))
            del runtime_view_uri_table[resource_path]
    finally:
        runtime_table_lock.release()
|