mgmt_server.py :  » Development » SnapLogic » snaplogic » mgmtserver » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » mgmtserver » mgmt_server.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008-2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# $
#
# $Id: mgmt_server.py 9267 2009-10-14 23:28:34Z darin $

import sys
import os
import getopt
import types
import time
import cgi
from xmlrpclib import Marshaller,Unmarshaller
import xmlrpclib
from StringIO import StringIO
from decimal import Decimal
from time import strptime
from datetime import datetime
import logging
import cherrypy
from cherrypy._cptools import XMLRPCController
from cherrypy import request


from snaplogic.common.snap_exceptions import SnapValueError,SnapFormatError
from snaplogic.snapi import exec_interface,snapi_base,keys
from snaplogic.common.config import snap_config
from snaplogic.common.config.snap_config import SnapConfig
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.snapi_base.exceptions import SnapiHttpException
from snaplogic.common import version_info

from snaplogic import rp
from snaplogic.rp import json_rp,streaming_json_rp,SNAP_ASN1_CONTENT_TYPE
from snaplogic.common.snap_http_lib import *

snap_server_port = None

log = None
elog = None

STATIC_WEB_ROOT ='/'

RPC_WEB_ROOT ='/'

DEFAULT_FAVICON = 'SnapLogic_gear.gif'

FAULT_CODE_TEMPLATE = """<?xml version="1.0"?>
<methodResponse>
  <fault>
    <value>
      <struct>
        <member>
          <name>faultCode</name>
          <value><int>%s</int></value>
        </member>
        <member>
          <name>faultString</name>
          <value><string>%s</string></value>
        </member>
      </struct>
    </value>
  </fault>
</methodResponse>
"""

_orig_loads = xmlrpclib.loads

def _my_loads(data, use_datetime=1):
    return _orig_loads(data, use_datetime)

xmlrpclib.loads = _my_loads

_orig_datetime_type = xmlrpclib._datetime_type

def _my_datetime_type(data):
    t = strptime(data, "%Y-%m-%dT%H:%M:%S")
    return datetime(*tuple(t)[:6])

xmlrpclib._datetime_type = _my_datetime_type

class SnapController(XMLRPCController):
    """
    Dispatcher of the XmlRpc calls coming from the Designer to
    the appropriate methods in SnAPI.
    
    """
    
    def __reflect_methods(self, module):
        """
        Dynamically reflect all the methods in SnAPI.
        
        """
        for f in module.__dict__.keys():
            if f.startswith("_"):
                continue
            snapi_func = module.__dict__[f] 
            if callable(snapi_func):
                self.__dict__[f] = cherrypy.expose(snapi_func)
    
    def _dump_Decimal(self, value, write):
        write("<value><double>")
        write(str(value))
        write("</double></value>\n")
    Marshaller.dispatch[Decimal] = _dump_Decimal
    
    def _dump_string(self, value, write, escape=xmlrpclib.escape):
        """
        Replaces the original dump_string in XmlRpcLib's
        Marshaller with this version, which would properly
        produce CDATA if necessary
        
        """
        write("<value><string>")
        if value and len(value) != len(value.strip()): 
            # Do CDATA if we have leading/trailing
            # whitespace 
            values = value.split(']]>')
            first = True
            cdata = ''
            for value in values:
                if first:
                    first = False
                else:
                    cdata += '<![CDATA[]]]]><![CDATA[>]]>'
                cdata = cdata + '<![CDATA[' + value + ']]>'
            write(cdata)
        else:
            write(escape(value))
        write("</string></value>\n")
    Marshaller.dispatch[types.StringType] = _dump_string

    
    def _dump_unicode(self, value, write, escape=xmlrpclib.escape):
        value = value.encode(self.encoding)
        f = Marshaller.dispatch[types.StringType]
        f(self, value, write, escape)
    Marshaller.dispatch[types.UnicodeType] = _dump_unicode
    
    def _end_double(self, data):
        self.append(Decimal(data))
        self._value = 0
    Unmarshaller.dispatch["double"] = _end_double
    
    def __init__(self):
        """
        Initialize this dispatcher, by installing reflections of SnAPI methods
        (see L{self.__reflect_methods}).
        
        """
        super(SnapController, self).__init__()
        self.__reflect_methods(snapi_base)
        self.__reflect_methods(exec_interface)
        dict_dispatcher = Marshaller.dispatch[dict]
        # Maps Runtime URI to the RT object, for sending start requests
        # (for preview).
        # Because these will be guaranteed to be unique by the server,
        # we will not be locking this...
        self._rt_dict = {}
        # Maps URI of an output view to a Response object to read from
        # (for preview)
        # Because these will be guaranteed to be unique by the server,
        # we will not be locking this...
        self._view_conns = {}
        Marshaller.dispatch[RuntimeStatus] = dict_dispatcher
    
    @cherrypy.expose
    def execute(self, prepare_only, name, uri, inputs = None, outputs = None, params = None, cred = None):
        return self._execute(prepare_only, name, uri, inputs, outputs, params, cred)
    
    def _execute(self, prepare_only, name, uri, inputs = None, outputs = None, params = None, cred = None):
        """
        Execute resource.
        
        @param resource_name: User defined name for the resource. Could be any string the caller of this
            method would like to use to identify the pipeline being invoked. This name does not have to
            be unique.
        @type resource_name:  str
        
        @param resource_uri: The URI of the resource to be executed.
        @type resource_uri:  str
        
        @param inputs: input views to write to 
        @type inputs:  list
        
        @param outputs: output views to read from
        @type outputs:  list
        
        @param params: Dictionary with param name as key and param value as value.
        @type params:  dict 
        
        @param cred: A 2-tuple containing (username, password)
        @type cred:  tuple
        
        @return: Runtime status URI of the executing resource.
        @rtype:  str
        
        """
        if not inputs:
            inputs = []
        if not outputs:
            outputs = []
        if not params:
            params = {}
        
        retval = {}
        retval[keys.INPUT_VIEWS] = {}
        retval[keys.OUTPUT_VIEWS] = {}
        
        res_rt = exec_interface._send_prepare(name, uri, inputs, outputs, params, cred)
        for out in res_rt.output_views.values():
            retval[keys.OUTPUT_VIEWS][out.name] = out.uri
        for inp in res_rt.input_views.values():
            retval[keys.OUTPUT_VIEWS][inp.name] = inp.uri
    
        if not prepare_only:
            exec_interface._send_start(res_rt)
        else:
            self._rt_dict[res_rt.rid] = res_rt
        retval[keys.RUNTIME_STATUS_URI] = res_rt.runtime_status_uri
        retval[keys.RUNTIME_ID] = res_rt.rid
        retval[keys.RUNTIME_CONTROL_URI] = res_rt.runtime_control_uri
        return retval
    
    @cherrypy.expose
    def send_start(self, rid):
        res_rt = self._rt_dict[rid]
        self._rt_dict.__delitem__(rid)
        exec_interface._send_start(res_rt)
        return "OK"
    
    @cherrypy.expose
    def stop_exec(self, status_uri, control_uri, view_uris=None):
        st = exec_interface.get_status(status_uri)
        if st.state in ['Prepared', 'Started']:
            exec_interface.send_stop(control_uri)
        if not view_uris:
            # Nothing to close, we have sent the stop request
            # and we are done.
            return "OK"
        t0 = time.time()
        while True:
            st = exec_interface.get_status(status_uri)
            if st.state in ['Completed', 'Failed', 'Stopped']:
                break
            if time.time() - t0 > 30:
                # Too much time passed, we'll close responses
                # anyway...
                cherrypy.log("Timed out waiting for pipeline to stop", severity=logging.WARNING) 
                break
            time.sleep(1)
        for view_uri in view_uris:
            try:
                response = self._view_conns[view_uri]
                response.close()
                self._view_conns.__delitem__(view_uri)
            except Exception, e:
                cherrypy.log("Error closing view %s: %s" % (view_uri, e), severity=logging.WARNING)
        return "OK"
        
    @cherrypy.expose
    def connect_output_view(self, uri):
        """
        Connect to the URI of an output view. 
        """
        response = urlopen('GET', uri, None, {'Accept' : SNAP_ASN1_CONTENT_TYPE})
        self._view_conns[uri] = response
        return uri
    
    @cherrypy.expose
    def read_output_view(self, uri, limit=100):
        try:
            response = self._view_conns[uri]
        except KeyError, e:
            err_result = FAULT_CODE_TEMPLATE % (-1, "Runtime URI not found: " % uri)
            return err_result
        
        if limit <= 0:
            limit = 100
        
        try:
            headers = response.getHeaders()
            status = response.getStatus()
            reason = response.getReason()
            
            content_type = headers['content-type']
            rp_plugin = rp.get_rp(content_type)
            if rp_plugin == json_rp:
                rp_plugin = streaming_json_rp
            reader = rp_plugin.Reader(response)
            retval = []
            idx = 0
            for row in reader:
                if idx >= limit:
                    break
                retval.append(row)
                idx += 1
            retval = [uri, retval]
        except Exception, e:
            return self._makeFault(str(e), -1)
        if status >= 400:
            err_msg = "%s: %s" % (reason, retval)
            return self._makeFault(err_msg, status)
        return retval
        
    
    @cherrypy.expose
    def tunnel_http(self, method, uri, data=None, headers=None, cred=None):
        """
        Tunnel an HTTP request from client (if crossdomain.xml is not there).
        
        @param method: HTTP method to use (POST, PUT, GET, etc. -- see RFC 2616)
        @type method: str
        
        @param uri: URI to send the request to
        @type uri: str
        
        @param data: Data to send in the body of the request. Note that, in full compliance with RFC 
        2616, we allow data to be send even on GET and DELETE requests, while some implementations
        don't deal with this correctly. TODO.
        @type data: obj
        
        @param cred: A 2-tuple containing (username, password)
        @type cred:  tuple
        
        @return: Response data to the request as text
        @rtype:  python objects.
        
        """
        headers = {'Accept':'application/json'}
        response = urlopen(method, uri, data, headers, cred)
        headers = response.getHeaders()
        status = response.getStatus()
        reason = response.getReason()
        rsp = response.read()
        buf = StringIO()
        buf.write(rsp)
        buf.seek(0)
        response = buf
        
        try:
            content_type = headers['content-type']
            if content_type == 'text/html':
                retval = response.read()
            else:
                rp_plugin = rp.get_rp(content_type)
                
                reader = rp_plugin.Reader(response)
                retval = [obj for obj in reader]
                if len(retval) == 1:
                    retval = retval[0]
        except Exception, e:
            retval = response.read()
            return self._makeFault(str(e) + ": " + retval, -1)
        if status >= 400:
            err_msg = "%s: %s" % (reason, retval)
            return self._makeFault(err_msg, status)
        return retval

    
    def _makeFault(self, msg, faultCode=-1):
        """
        Create an XmlRpc Fault message.
        
        This used to work automagically in CherryPy 2, but apparently doesn't in 
        CherryPy 3. So this method is introduced. It just converts an exception into
        an XmlRpc fault message (the fault code is always -1). See also 
        http://groups.google.com/group/cherrypy-users/browse_thread/thread/1a05e386d8356ca6
        
        @param exc: Exception to convert into a fault message
        @type exc: Exception
        
        @return: The XmlRpc fault message
        @rtype: str 
        
        """
        result = FAULT_CODE_TEMPLATE % (faultCode, cgi.escape(msg))
        return result
    
    @cherrypy.expose
    def index(self, *args, **kwargs):
        if req.method == 'GET':
            raise cherrypy.InternalRedirect(STATIC_WEB_ROOT + '/index.html')
        else:
            result = None
            try:
                result = XMLRPCController.default(self, *args, **kwargs)
            except SnapiHttpException, se:
                result = [self._makeFault(se.__str__(), se.status)]
            except Exception, e:
                result = [self._makeFault(e.__str__())]
            return result
           
    @cherrypy.expose
    def validator(self, *args, **kwargs):
        try:
            return snapi_base.validate(*args, **kwargs)
        except SnapiHttpException, exc:
            if exc.status == 400:
                return exc.body
            else:
                raise

    @cherrypy.expose
    def get_snap_server_port(self, *args, **kwargs):
        try:
            return int(snap_server_port)
        except ValueError:
            err_result = FAULT_CODE_TEMPLATE % (-1, "Unable to determine port" % snap_server_port)
            return err_result
        

    @cherrypy.expose
    def suggest(self, *args, **kwargs):
        result = {}
        try:
            result['resdef'] = snapi_base.suggest_resource_values(*args, **kwargs)
        except SnapiHttpException, exc:
            if exc.status == 400:
                result['err_obj'] = exc.body
            else:
                raise
        return result
    
    @cherrypy.expose
    def pipeline_validator(self, *args, **kwargs):
        try:
            return snapi_base.validate_pipeline(*args, **kwargs)
        except SnapiHttpException, exc:
            if exc.status == 400:
                return exc.body
            else:
                raise
        

                
def __no_signals():
    """A no-op."""
    pass

def main():
    """Main entry point of the Management Server."""
    
    root = SnapController()
    config_filename = None
    try:
        opts = getopt.getopt(sys.argv[1:], "c:", ["config="])[0]
    except getopt.GetoptError, e:
        sys.exit("Option(s) specified are not valid %s" % str(e))
    for o, a in opts:
        if o in ("-c", "--config"):
            config_filename = a
    
    if config_filename is None:
        sys.exit('Usage: MgmtServer -c <config-file>') 
        
    try:
        conf = SnapConfig(config_filename, default=snap_config.MANAGEMENT_DEFAULT)
        main_conf = SnapConfig(config_filename, default=snap_config.MAIN_SERVER_DEFAULT, instance_only=True)            
    except Exception, e:
        sys.exit("Config file %s failed to parse: %s" % (config_filename, str(e)))

    main_conf = main_conf.get_section('main')
    log_dir = main_conf['log_dir']
    global snap_server_port
    snap_server_port = main_conf['server_port']
    mgmt_conf = conf.get_section('mgmt_server')
    try:
        port = int(mgmt_conf['port'])
    except ValueError:
        raise SnapValueError('Invalid value for port specified: %s' % port)
    
    host = None
    try:
        host = mgmt_conf['hostname']
    except KeyError:
        print "Hostname not found, defaulting to 0.0.0.0" 
        host = "0.0.0.0"
    
    
    static_dir = mgmt_conf['static_dir']
        
    if not os.path.exists(static_dir):
        raise SnapValueError("static_dir %s does not exist" % static_dir)

    if not os.path.isdir(static_dir):
        raise SnapValueError("static_dir %s is not a directory" % static_dir)
    
    try:
        favicon_path = mgmt_conf['favicon_path']
    except KeyError:
        favicon_path = "%s/%s" % (static_dir, DEFAULT_FAVICON)
    cherrypy.engine._set_signals = __no_signals
   
    config = {
               'global'       : {
                                  'server.socket_host'      : host,
                                  'server.socket_port'      : port,
                                  'tools.xmlrpc.allow_none' : True,
                                  'tools.xmlrpc.encoding'   : 'utf-8',
                                  'allow_none'              : True,
                                  'log.error_file'          : "%s/mgmt_error.log" % log_dir,
                                  'log.access_file'         : "%s/mgmt_access.log" % log_dir,
                                  'log.screen'              : False
                                },
               '/'            : {
                                  'tools.staticdir.root'        : static_dir,
                                  'tools.staticdir.dir'         : '.',
                                  'tools.xmlrpc.on'             : True,  
                                  'tools.staticdir.on'          : True,
                                 },
               '/favicon.ico'   : {
                                  'tools.staticfile.on'         :  True,
                                  'tools.staticfile.filename'   :  favicon_path
                                  }
             }
    print version_info.server_banner 
    print version_info.server_copyright
    print "Starting Management Server..."     
    cherrypy.quickstart(root, '/', config)
    
if __name__ == "__main__":
    main()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.