# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
# $
#
# $Id: mgmt_server.py 9267 2009-10-14 23:28:34Z darin $
import sys
import os
import getopt
import types
import time
import cgi
from xmlrpclib import Marshaller,Unmarshaller
import xmlrpclib
from StringIO import StringIO
from decimal import Decimal
from time import strptime
from datetime import datetime
import logging
import cherrypy
from cherrypy._cptools import XMLRPCController
from cherrypy import request
from snaplogic.common.snap_exceptions import SnapValueError,SnapFormatError
from snaplogic.snapi import exec_interface,snapi_base,keys
from snaplogic.common.config import snap_config
from snaplogic.common.config.snap_config import SnapConfig
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.snapi_base.exceptions import SnapiHttpException
from snaplogic.common import version_info
from snaplogic import rp
from snaplogic.rp import json_rp,streaming_json_rp,SNAP_ASN1_CONTENT_TYPE
from snaplogic.common.snap_http_lib import *
snap_server_port = None
log = None
elog = None
STATIC_WEB_ROOT ='/'
RPC_WEB_ROOT ='/'
DEFAULT_FAVICON = 'SnapLogic_gear.gif'
FAULT_CODE_TEMPLATE = """<?xml version="1.0"?>
<methodResponse>
<fault>
<value>
<struct>
<member>
<name>faultCode</name>
<value><int>%s</int></value>
</member>
<member>
<name>faultString</name>
<value><string>%s</string></value>
</member>
</struct>
</value>
</fault>
</methodResponse>
"""
_orig_loads = xmlrpclib.loads
def _my_loads(data, use_datetime=1):
return _orig_loads(data, use_datetime)
xmlrpclib.loads = _my_loads
_orig_datetime_type = xmlrpclib._datetime_type
def _my_datetime_type(data):
t = strptime(data, "%Y-%m-%dT%H:%M:%S")
return datetime(*tuple(t)[:6])
xmlrpclib._datetime_type = _my_datetime_type
class SnapController(XMLRPCController):
"""
Dispatcher of the XmlRpc calls coming from the Designer to
the appropriate methods in SnAPI.
"""
def __reflect_methods(self, module):
"""
Dynamically reflect all the methods in SnAPI.
"""
for f in module.__dict__.keys():
if f.startswith("_"):
continue
snapi_func = module.__dict__[f]
if callable(snapi_func):
self.__dict__[f] = cherrypy.expose(snapi_func)
def _dump_Decimal(self, value, write):
write("<value><double>")
write(str(value))
write("</double></value>\n")
Marshaller.dispatch[Decimal] = _dump_Decimal
def _dump_string(self, value, write, escape=xmlrpclib.escape):
"""
Replaces the original dump_string in XmlRpcLib's
Marshaller with this version, which would properly
produce CDATA if necessary
"""
write("<value><string>")
if value and len(value) != len(value.strip()):
# Do CDATA if we have leading/trailing
# whitespace
values = value.split(']]>')
first = True
cdata = ''
for value in values:
if first:
first = False
else:
cdata += '<![CDATA[]]]]><![CDATA[>]]>'
cdata = cdata + '<![CDATA[' + value + ']]>'
write(cdata)
else:
write(escape(value))
write("</string></value>\n")
Marshaller.dispatch[types.StringType] = _dump_string
def _dump_unicode(self, value, write, escape=xmlrpclib.escape):
value = value.encode(self.encoding)
f = Marshaller.dispatch[types.StringType]
f(self, value, write, escape)
Marshaller.dispatch[types.UnicodeType] = _dump_unicode
def _end_double(self, data):
self.append(Decimal(data))
self._value = 0
Unmarshaller.dispatch["double"] = _end_double
def __init__(self):
"""
Initialize this dispatcher, by installing reflections of SnAPI methods
(see L{self.__reflect_methods}).
"""
super(SnapController, self).__init__()
self.__reflect_methods(snapi_base)
self.__reflect_methods(exec_interface)
dict_dispatcher = Marshaller.dispatch[dict]
# Maps Runtime URI to the RT object, for sending start requests
# (for preview).
# Because these will be guaranteed to be unique by the server,
# we will not be locking this...
self._rt_dict = {}
# Maps URI of an output view to a Response object to read from
# (for preview)
# Because these will be guaranteed to be unique by the server,
# we will not be locking this...
self._view_conns = {}
Marshaller.dispatch[RuntimeStatus] = dict_dispatcher
@cherrypy.expose
def execute(self, prepare_only, name, uri, inputs = None, outputs = None, params = None, cred = None):
return self._execute(prepare_only, name, uri, inputs, outputs, params, cred)
def _execute(self, prepare_only, name, uri, inputs = None, outputs = None, params = None, cred = None):
"""
Execute resource.
@param resource_name: User defined name for the resource. Could be any string the caller of this
method would like to use to identify the pipeline being invoked. This name does not have to
be unique.
@type resource_name: str
@param resource_uri: The URI of the resource to be executed.
@type resource_uri: str
@param inputs: input views to write to
@type inputs: list
@param outputs: output views to read from
@type outputs: list
@param params: Dictionary with param name as key and param value as value.
@type params: dict
@param cred: A 2-tuple containing (username, password)
@type cred: tuple
@return: Runtime status URI of the executing resource.
@rtype: str
"""
if not inputs:
inputs = []
if not outputs:
outputs = []
if not params:
params = {}
retval = {}
retval[keys.INPUT_VIEWS] = {}
retval[keys.OUTPUT_VIEWS] = {}
res_rt = exec_interface._send_prepare(name, uri, inputs, outputs, params, cred)
for out in res_rt.output_views.values():
retval[keys.OUTPUT_VIEWS][out.name] = out.uri
for inp in res_rt.input_views.values():
retval[keys.OUTPUT_VIEWS][inp.name] = inp.uri
if not prepare_only:
exec_interface._send_start(res_rt)
else:
self._rt_dict[res_rt.rid] = res_rt
retval[keys.RUNTIME_STATUS_URI] = res_rt.runtime_status_uri
retval[keys.RUNTIME_ID] = res_rt.rid
retval[keys.RUNTIME_CONTROL_URI] = res_rt.runtime_control_uri
return retval
@cherrypy.expose
def send_start(self, rid):
res_rt = self._rt_dict[rid]
self._rt_dict.__delitem__(rid)
exec_interface._send_start(res_rt)
return "OK"
@cherrypy.expose
def stop_exec(self, status_uri, control_uri, view_uris=None):
st = exec_interface.get_status(status_uri)
if st.state in ['Prepared', 'Started']:
exec_interface.send_stop(control_uri)
if not view_uris:
# Nothing to close, we have sent the stop request
# and we are done.
return "OK"
t0 = time.time()
while True:
st = exec_interface.get_status(status_uri)
if st.state in ['Completed', 'Failed', 'Stopped']:
break
if time.time() - t0 > 30:
# Too much time passed, we'll close responses
# anyway...
cherrypy.log("Timed out waiting for pipeline to stop", severity=logging.WARNING)
break
time.sleep(1)
for view_uri in view_uris:
try:
response = self._view_conns[view_uri]
response.close()
self._view_conns.__delitem__(view_uri)
except Exception, e:
cherrypy.log("Error closing view %s: %s" % (view_uri, e), severity=logging.WARNING)
return "OK"
@cherrypy.expose
def connect_output_view(self, uri):
"""
Connect to the URI of an output view.
"""
response = urlopen('GET', uri, None, {'Accept' : SNAP_ASN1_CONTENT_TYPE})
self._view_conns[uri] = response
return uri
@cherrypy.expose
def read_output_view(self, uri, limit=100):
try:
response = self._view_conns[uri]
except KeyError, e:
err_result = FAULT_CODE_TEMPLATE % (-1, "Runtime URI not found: " % uri)
return err_result
if limit <= 0:
limit = 100
try:
headers = response.getHeaders()
status = response.getStatus()
reason = response.getReason()
content_type = headers['content-type']
rp_plugin = rp.get_rp(content_type)
if rp_plugin == json_rp:
rp_plugin = streaming_json_rp
reader = rp_plugin.Reader(response)
retval = []
idx = 0
for row in reader:
if idx >= limit:
break
retval.append(row)
idx += 1
retval = [uri, retval]
except Exception, e:
return self._makeFault(str(e), -1)
if status >= 400:
err_msg = "%s: %s" % (reason, retval)
return self._makeFault(err_msg, status)
return retval
@cherrypy.expose
def tunnel_http(self, method, uri, data=None, headers=None, cred=None):
"""
Tunnel an HTTP request from client (if crossdomain.xml is not there).
@param method: HTTP method to use (POST, PUT, GET, etc. -- see RFC 2616)
@type method: str
@param uri: URI to send the request to
@type uri: str
@param data: Data to send in the body of the request. Note that, in full compliance with RFC
2616, we allow data to be send even on GET and DELETE requests, while some implementations
don't deal with this correctly. TODO.
@type data: obj
@param cred: A 2-tuple containing (username, password)
@type cred: tuple
@return: Response data to the request as text
@rtype: python objects.
"""
headers = {'Accept':'application/json'}
response = urlopen(method, uri, data, headers, cred)
headers = response.getHeaders()
status = response.getStatus()
reason = response.getReason()
rsp = response.read()
buf = StringIO()
buf.write(rsp)
buf.seek(0)
response = buf
try:
content_type = headers['content-type']
if content_type == 'text/html':
retval = response.read()
else:
rp_plugin = rp.get_rp(content_type)
reader = rp_plugin.Reader(response)
retval = [obj for obj in reader]
if len(retval) == 1:
retval = retval[0]
except Exception, e:
retval = response.read()
return self._makeFault(str(e) + ": " + retval, -1)
if status >= 400:
err_msg = "%s: %s" % (reason, retval)
return self._makeFault(err_msg, status)
return retval
def _makeFault(self, msg, faultCode=-1):
"""
Create an XmlRpc Fault message.
This used to work automagically in CherryPy 2, but apparently doesn't in
CherryPy 3. So this method is introduced. It just converts an exception into
an XmlRpc fault message (the fault code is always -1). See also
http://groups.google.com/group/cherrypy-users/browse_thread/thread/1a05e386d8356ca6
@param exc: Exception to convert into a fault message
@type exc: Exception
@return: The XmlRpc fault message
@rtype: str
"""
result = FAULT_CODE_TEMPLATE % (faultCode, cgi.escape(msg))
return result
@cherrypy.expose
def index(self, *args, **kwargs):
if req.method == 'GET':
raise cherrypy.InternalRedirect(STATIC_WEB_ROOT + '/index.html')
else:
result = None
try:
result = XMLRPCController.default(self, *args, **kwargs)
except SnapiHttpException, se:
result = [self._makeFault(se.__str__(), se.status)]
except Exception, e:
result = [self._makeFault(e.__str__())]
return result
@cherrypy.expose
def validator(self, *args, **kwargs):
try:
return snapi_base.validate(*args, **kwargs)
except SnapiHttpException, exc:
if exc.status == 400:
return exc.body
else:
raise
@cherrypy.expose
def get_snap_server_port(self, *args, **kwargs):
try:
return int(snap_server_port)
except ValueError:
err_result = FAULT_CODE_TEMPLATE % (-1, "Unable to determine port" % snap_server_port)
return err_result
@cherrypy.expose
def suggest(self, *args, **kwargs):
result = {}
try:
result['resdef'] = snapi_base.suggest_resource_values(*args, **kwargs)
except SnapiHttpException, exc:
if exc.status == 400:
result['err_obj'] = exc.body
else:
raise
return result
@cherrypy.expose
def pipeline_validator(self, *args, **kwargs):
try:
return snapi_base.validate_pipeline(*args, **kwargs)
except SnapiHttpException, exc:
if exc.status == 400:
return exc.body
else:
raise
def __no_signals():
"""A no-op."""
pass
def main():
"""Main entry point of the Management Server."""
root = SnapController()
config_filename = None
try:
opts = getopt.getopt(sys.argv[1:], "c:", ["config="])[0]
except getopt.GetoptError, e:
sys.exit("Option(s) specified are not valid %s" % str(e))
for o, a in opts:
if o in ("-c", "--config"):
config_filename = a
if config_filename is None:
sys.exit('Usage: MgmtServer -c <config-file>')
try:
conf = SnapConfig(config_filename, default=snap_config.MANAGEMENT_DEFAULT)
main_conf = SnapConfig(config_filename, default=snap_config.MAIN_SERVER_DEFAULT, instance_only=True)
except Exception, e:
sys.exit("Config file %s failed to parse: %s" % (config_filename, str(e)))
main_conf = main_conf.get_section('main')
log_dir = main_conf['log_dir']
global snap_server_port
snap_server_port = main_conf['server_port']
mgmt_conf = conf.get_section('mgmt_server')
try:
port = int(mgmt_conf['port'])
except ValueError:
raise SnapValueError('Invalid value for port specified: %s' % port)
host = None
try:
host = mgmt_conf['hostname']
except KeyError:
print "Hostname not found, defaulting to 0.0.0.0"
host = "0.0.0.0"
static_dir = mgmt_conf['static_dir']
if not os.path.exists(static_dir):
raise SnapValueError("static_dir %s does not exist" % static_dir)
if not os.path.isdir(static_dir):
raise SnapValueError("static_dir %s is not a directory" % static_dir)
try:
favicon_path = mgmt_conf['favicon_path']
except KeyError:
favicon_path = "%s/%s" % (static_dir, DEFAULT_FAVICON)
cherrypy.engine._set_signals = __no_signals
config = {
'global' : {
'server.socket_host' : host,
'server.socket_port' : port,
'tools.xmlrpc.allow_none' : True,
'tools.xmlrpc.encoding' : 'utf-8',
'allow_none' : True,
'log.error_file' : "%s/mgmt_error.log" % log_dir,
'log.access_file' : "%s/mgmt_access.log" % log_dir,
'log.screen' : False
},
'/' : {
'tools.staticdir.root' : static_dir,
'tools.staticdir.dir' : '.',
'tools.xmlrpc.on' : True,
'tools.staticdir.on' : True,
},
'/favicon.ico' : {
'tools.staticfile.on' : True,
'tools.staticfile.filename' : favicon_path
}
}
print version_info.server_banner
print version_info.server_copyright
print "Starting Management Server..."
cherrypy.quickstart(root, '/', config)
if __name__ == "__main__":
main()
|