server.py :  » Development » SnapLogic » snaplogic » server » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » server » server.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008-2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: server.py 9868 2009-11-21 17:50:45Z dhiraj $

"""
The SnapLogic server.

The SnapServer runs as a WSGI application. All interactions with the
server take place via a REST interface, all incoming requests are first
handled by a 'request handler' component. This module here implements
the WSGI front-end and the request handler. 

The request handler knows which of our modules need to be invoked when a
new request arrives. It takes the request and translates it into the
appropriate call to that module's API.

Every WSGI application has to have a dedicated function or object for
the initial handling of incoming requests. In our case, the request-handler
is this function (called 'application').

Any incoming request results in a new thread being created. This is done
by WSGI automatically, we don't have to take any extra steps for this. Once
our request-handler is called, it is already running inside of a new thread.

"""

from __future__ import with_statement

import functools
import urlparse
import time
import socket
import signal
import types
from threading import BoundedSemaphore
from decimal import Decimal
from datetime import datetime

from snaplogic.common.version_info import is_pe
from snaplogic.server.http_request import HttpRequest
from snaplogic.server.repository import rh_front_end
from snaplogic.server.resource_diff import rh_resource_diff
from snaplogic.server.config import set_common_config
from snaplogic import snapi_base
from snaplogic import server
from snaplogic.server import pipeline_rh_front_end,pipeline_manager,pipe_to_http,http_to_pipe,cc_list
from snaplogic.server import RhResponse,cc_proxy
from snaplogic.server import product_prefix,uri_checker
from snaplogic.common.snap_exceptions import *
from snaplogic.common import snap_crypt,runtime_table

from snaplogic.common import uri_prefix,snap_log,version_info,snap_stats
from snaplogic.common.config import rh_front_end
from snaplogic.common.config import snap_config,credentials_store
from snaplogic.common import snap_http_lib
from snaplogic.common.headers import *
from snaplogic.common.version_info import *

from snaplogic.snapi_base import keys

from snaplogic import rp

from snaplogic.server import auth

import snaplogic.server.component_list as comp_list
from snaplogic.server import cc_list

from string import replace

import os

if is_pe():
    from snaplogic.pe import snap_scheduler
    from snaplogic.pe import snap_notifier

pipe_to_http_uri_prefix = None
"""The URI prefix used by 'pipe to http' or 'http to pipe' services"""

# List of prefixes sorted by length in descending order. Used for finding longest match first.
prefixes = None

# We will translate incoming HTTP requests into calls to various API functions,
# using a dictionary as the jump table. This implements our REST CRUD call-matrix.
# The two dimensions of that matrix are going to be the prefix of the URI as well
# as the HTTP method.
handlers = {}

MAX_BAD_REQUESTS = 100
""" 
Maximum number of simultaneous bad requests.  
Bad request is defined as a request to which the server replies with an HTTP error. 

"""

MAX_BAD_REQUEST_LENGTH = 100 * 1024
""" Limit on bad request length """

error_sem = None
""" Semaphore limiting the maximum number of simultaneous bad requests """ 

def main_root(http_req):
    """
    Print the main page of the server.

    This is just a static listing of the various high-level URIs,
    which a user (or web spider) can browse in our server.

    @param http_req:        HTTP request object.
    @type  http_req:        L{HttpRequest}

    @return:                RhResponse object with data and code
                            to be written to client.
    @rtype:                 L{RhResponse}

    """
    serv = server.public_uri
    
    # All URIs are always visible, admin_mode or not.
    # However some of them will not work in admin_mode, e.g. returning a 
    # 503 Service Temporarily Unavailable error.
    
    root_uris = {
                  keys.SERVER_COMPONENT_LIST    : dict(uri=serv + uri_prefix.COMPONENT_LIST,
                                                       description="List of available components."),
                  keys.SERVER_STATS             : dict(uri=serv + uri_prefix.STATS,
                                                       description="Server statistics."),
                  keys.SERVER_LOGFILES          : dict(uri=serv + uri_prefix.LOGS,
                                                       description="Links to various server log files."),
                  keys.SERVER_RESOURCE_LIST     : dict(uri=serv + uri_prefix.RESOURCE_LIST,
                                                       description="__???"),
                  keys.SERVER_RESOURCE_SUMMARY  : dict(uri=serv + uri_prefix.RESOURCE_SUMMARY,
                                                       description="List of available resources."),
                  keys.SERVER_RESOURCE_READ     : dict(uri=serv + uri_prefix.RESOURCE_READ,
                                                       description="__???"),
                  keys.RESOURCE_DIFF            : dict(uri=serv + uri_prefix.RESOURCE_DIFF,
                                                       description="Diff specified resource and its current image."),
                  keys.SERVER_PIPELINE_TEMPLATE : dict(uri=serv + uri_prefix.PIPELINE_RESOURCE_TEMPLATE,
                                                       description="__???"),
                  keys.SERVER_PIPELINE_SUGGEST  : dict(uri=serv + uri_prefix.PIPELINE_SUGGEST_RESOURCE_VALUES,
                                                       description="__???"),
                  keys.SERVER_PIPELINE_VALIDATE : dict(uri=serv + uri_prefix.PIPELINE_VALIDATE_RESOURCE, 
                                                       description="__???"),
                  keys.SERVER_RUNTIME_STATUS    : dict(uri=serv + uri_prefix.RUNTIME_STATUS,
                                                       description="Information about current and past pipeline runs."),
                  keys.SERVER_RUNTIME_INFO      : dict(uri=serv + uri_prefix.RUNTIME,
                                                       description="__???"),
                  keys.SERVER_AUTH              : dict(uri=serv + uri_prefix.AUTH,
                                                       description="Links to the authentication/authorization sub-system"),
                  keys.SERVER_INFO              : dict(uri=serv + uri_prefix.INFO, 
                                                       description="Basic information about the server build."),
                  keys.SERVER_RESOURCE_UPGRADE  : dict(uri=serv + uri_prefix.RESOURCE_UPGRADE, 
                                                       description="Upgrade resources."),
                }

    if is_pe():
        # Additional visible URIs for the PE edition
        root_uris[keys.SERVER_SCHEDULER]    = dict(uri=serv + uri_prefix.SCHEDULER, description="Scheduler sub-system.")
        root_uris[keys.SERVER_NOTIFICATION] = dict(uri=serv + uri_prefix.NOTIFICATION, description="Notification sub-system.")

    render_options = { 'title' : product_prefix.SNAPLOGIC_PRODUCT + ': Main page' }
    if server.config.get_section('main')['admin_mode']:
        render_options['mode_indicator'] = "Admin mode"

    return RhResponse(200, root_uris, None, render_options)


class __MyStream(object):
    """
    A small helper class, which allows us to assemble
    streamed output (expected to be written to an object
    with a write method) into a list.

    """
    def __init__(self):
        self.reset()

    def write(self, buf):
        self.buf_list.append(buf)

    def get(self):
        return self.buf_list

    def reset(self):
        self.buf_list = []


def get_logs(http_req):
    """
    Return server log info.

    @param http_req:        HTTP request object.
    @type  http_req:        L{HttpRequest}

    @return:                RhResponse object with data and code
                            to be written to client.
    @rtype:                 L{RhResponse}
    
    """
    # If the requestor has READ permissions and this is a request
    # for a specific pipeline log then we need to gather relevant
    # information from a number of log files from the various CCs
    # and present all of this as one cohesive output.
    if http_req.permissions & auth.PERM_READ:
        logname = snap_log.get_log_name(http_req)
        params = {}
        ret = snap_log.process_log_params(logname, http_req, params)
        if ret is not None:
            return ret
        if logname == "pipeline":
            if "rid" not in params:
                return RhResponse(400, "Bad request: URI must specify runtime id with 'rid' parameter")
            # Yes, this is the kind of request we are looking for.
            # Let the work begin.
            # We request the various logs that we need as JSON
            # objects and then create a single output from them.

            out_list = [ "=== Log output of main server ===\n" ]
            my_stream = __MyStream()
            server.logger.write_log_out("main", my_stream, params) 
            main_out_list = my_stream.get()
            if main_out_list:
                out_list.append("== Main log ==\n")
                out_list.extend(main_out_list)

            # And also from our main server exception log
            my_stream = __MyStream()
            server.logger.write_log_out("exception", my_stream, params)
            ex_out_list = my_stream.get()
            if ex_out_list:
                out_list.append("\n")
                out_list.append("== Exception log ==\n")
                out_list.extend(ex_out_list)

            # ------------------------------------------------
            # A small helper function which gets information
            # from a CC log file into a list.
            def get_cc_log_data(logname):
                log_list = []
                cc_params = {}
                for p in snap_log.log_uri_params:
                    if p in http_req.params:
                        cc_params[p] = http_req.params[p]
                        
                log_uri = snap_http_lib.add_params_to_uri("%s%s/%s" % (cc_base_uri, uri_prefix.LOGS, logname),
                                                          cc_params)
                try :
                    resp    = snap_http_lib.urlopen("GET", log_uri, None,
                                                    { CC_TOKEN_HEADER : cc_token, 'accept' : 'application/json' } )
                except Exception, e:
                     server.serv_elog(e, "Failed to contact CC %s to get log messages" % cc_name)
                     return []
                if resp.getStatus() == 200:
                    resp_headers = resp.getHeaders()
                    jrp = rp.get_rp('application/json')
                    jrp_reader = jrp.Reader(resp)
                    try:
                        while True:
                            n = jrp_reader.next()
                            if n:
                                log_list.append(n)
                            else:
                                break
                    except Exception, e:
                        # Didn't get anything back?
                        pass
                return log_list
            # ------------------------------------------------

            # Now get the lines from the various CC logs
            for cc_name in cc_list.cc_map:
                cc_token    = cc_list.cc_map[cc_name]['cc_token']
                cc_base_uri = cc_list.cc_map[cc_name]['uri']

                # The CC exception log
                main_log_list      = get_cc_log_data("main")
                exception_log_list = get_cc_log_data("exception")
                if main_log_list or exception_log_list:
                    out_list.append("\n")
                    out_list.append("=== Log output of component container '%s' (%s) ===\n" % (cc_name, cc_base_uri))

                    if main_log_list:
                        out_list.append("\n")
                        out_list.append("== Main log ==\n")
                        out_list.extend(main_log_list)

                    if exception_log_list:
                        out_list.append("\n")
                        out_list.append("== Exception log ==\n")
                        out_list.extend(exception_log_list)

            snap_log.send_headers_for_logs(http_req, logname)
            # A line starting with some '=' characters indicates a headline. We pass that
            # fact to the HTML-RP by setting the 'hlevel' option accordingly.
            options = {}
            for l in out_list:
                if l.startswith("="):
                    if l.startswith("==="):
                        options['hlevel'] = 2
                    elif l.startswith("=="):
                        options['hlevel'] = 3
                    else:
                        options['hlevel'] = 4
                    http_req.output.write(l, options)
                else:
                    http_req.output.write(l)

            http_req.output.end()
            server.rlog(http_req, 200)
            return None

    # We are merely calling the generic get_log() function of the snap_log module
    # with the process specific parameters.
    return snap_log.get_logs(http_req, server.public_uri, server.logger, server.rlog, server.elog)


def get_stats(http_req):
    """
    Return server stats information.

    @param http_req:        HTTP request object.
    @type  http_req:        L{HttpRequest}

    @return:                RhResponse object with data and code
                            to be written to client.
    @rtype:                 L{RhResponse}
    
    """
    d = {}
    d["Main Server"] = server.stats.get_all_groups()

    if http_req.permissions & auth.PERM_READ:
        # If the requestor has READ permissions, we also get the stats from all the CCs.

        # --------------------------------------------------
        # Helper function to get stats from a CC 
        def get_cc_stats():
            try:
                resp = snap_http_lib.urlopen("GET", "%s%s" % (cc_base_uri, uri_prefix.STATS), None,
                                                 { CC_TOKEN_HEADER : cc_token, 'accept' : 'application/json' } )
                if resp.getStatus() == 200:
                    resp_headers = resp.getHeaders()
                    jrp = rp.get_rp('application/json')
                    jrp_reader = jrp.Reader(resp)
                    return jrp_reader.next()
            except:
                pass
            return None
        # ------------------------------------------------

        for cc_name in cc_list.cc_map:
            cc_token    = cc_list.cc_map[cc_name]['cc_token']
            cc_base_uri = cc_list.cc_map[cc_name]['uri']
            cc_stats    = get_cc_stats()
            if cc_stats:
                d["CC " + cc_name] = cc_stats
            else:
                d["CC " + cc_name] = "Could not retrieve stats!"

    return RhResponse(200, d, None, { 'title' : product_prefix.SNAPLOGIC_PRODUCT + ': Stats', 'first_field' : 'Main Server' })


server_info = None


def info(http_req):
    """
    Return server info.

    @param http_req:        HTTP request object.
    @type  http_req:        L{HttpRequest}

    @return:                RhResponse object with data and code
                            to be written to client.
    @rtype:                 L{RhResponse}
    
    """

    render_options = { 'title' : product_prefix.SNAPLOGIC_PRODUCT + ': Info' }

    if server.config.get_section('main')['admin_mode']:
        render_options['mode_indicator'] = "Admin mode"

    return RhResponse(200, server_info, None, render_options)


def serve_static(http_req):
    """
    Serve static files in the __static__ directory.

    @param http_req:        HTTP request object.
    @type  http_req:        L{HttpRequest}

    @return:                None, indicating that it has taken care of
                            the output stream already, or RhResponse
                            object with an error...
    
    """

    global main_conf

    try:
        static_dir = main_conf['static_dir']
        fname = os.path.join(static_dir, http_req.path[len(uri_prefix.STATIC)+1:])
        f = open(fname, "rb")
        d = f.read()
        http_req.send_response_headers(200, None, False)
        http_req._output.write(d)
        server.rlog(http_req, 200)
    except:
        return RhResponse(http_req.NOT_FOUND, "Not found")

        
    return None
   
def service_unavailable(http_req):
    """
    This handler sends back 503 service unavailable,
    which is used to tell the clients that some URIs aren't available 
    if server is running in admin mode.
    """
    error = "Server running in admin_mode cannot service %s" % http_req.path
    server.log(snap_log.LEVEL_ERR, error)  
    return RhResponse(http_req.SERVICE_UNAVAILABLE, error)        

def _init_handlers(pipe_to_http_uri_prefix):
    """
    Initialize the L{handlers} matrix.

    @param pipe_to_http_uri_prefix:     The prefix for the invocation of the pipe-to-http handler,
                                        which allows a direct browser request to a resource.
    @type  pipe_to_http_uri_prefix:     string
    
    """    

    global handlers
    global prefixes


    # Each handler function is specified together with a permissions mask.
    # The HTTP request has to carry at least one of the permissions that
    # are specified here, or else the RH will return a 401 error, before
    # even calling the request handler function.
    handlers = {
        uri_prefix.ROOT           : { 
                                     'GET'  : (main_root, auth.PERM_READ)
                                    },
        uri_prefix.INFO           : { 
                                     'GET'  : (info, auth.PERM_READ)
                                    },
        uri_prefix.LOGS           : { 
                                         'GET'  : (get_logs, auth.PERM_EXEC | auth.PERM_READ)
                                    },
        uri_prefix.CC_LOGS        : { 
                                         'GET'  : (cc_proxy.cc_log_uri_mapper, auth.PERM_READ | auth.PERM_EXEC),
                                    },
        uri_prefix.STATS          : { 
                                         'GET'  : (get_stats, auth.PERM_EXEC | auth.PERM_READ)
                                    },
        uri_prefix.SELF_CHECK     : { 
                                         'POST'  : (uri_checker.process_self_check, auth.PERM_EXEC)
                                    },
                                    
        uri_prefix.URI_CHECK      : { 
                                         'POST'  : (uri_checker.process_uri_check, auth.PERM_EXEC)
                                    },
        uri_prefix.STATIC         : { 
                                         'GET'  : (serve_static, auth.PERM_READ | auth.PERM_EXEC)
                                    },
        pipe_to_http_uri_prefix   : { 
                                         'GET'  : (pipe_to_http.rh_pipe_to_http, auth.PERM_READ),
                                         'POST' : (http_to_pipe.rh_http_to_pipe, auth.PERM_READ | auth.PERM_EXEC)
                                    },

        uri_prefix.RESOURCE_LIST  : { 
                                         'GET'  : (repo_rh_front_end.rh_list_resources, auth.PERM_READ | auth.PERM_EXEC),
                                         'POST' : (repo_rh_front_end.rh_list_resources, auth.PERM_READ | auth.PERM_EXEC),
                                    },
        uri_prefix.RESOURCE_SUMMARY : { 
                                         'GET'  : (repo_rh_front_end.rh_summarize_resources, auth.PERM_READ | auth.PERM_EXEC),
                                         'POST' : (repo_rh_front_end.rh_summarize_resources, auth.PERM_READ | auth.PERM_EXEC),
                                    },
        uri_prefix.RESOURCE_READ  : {
                                         'POST' : (repo_rh_front_end.rh_read_resources, auth.PERM_READ | auth.PERM_EXEC)
                                    },
        uri_prefix.AUTOGEN      :   { 
                                         'PUT'    : (repo_rh_front_end.rh_update_resource, auth.PERM_WRITE),
                                         'GET'    : (repo_rh_front_end.rh_read_resources, auth.PERM_READ | auth.PERM_EXEC),
                                         'POST'   : (pipeline_manager.process_prepare_request, auth.PERM_EXEC),
                                         'DELETE' : (repo_rh_front_end.rh_delete_resource, auth.PERM_WRITE)
                                    },
        uri_prefix.RESOURCE_DIFF  : {
                                         'POST'   : (rh_resource_diff, auth.PERM_WRITE | auth.PERM_EXEC) 
                                    },
        uri_prefix.PIPELINE_RESOURCE_TEMPLATE : {
                                                    'POST' : (None, None) #TODO
                                                },
        uri_prefix.PIPELINE_SUGGEST_RESOURCE_VALUES : {
                                                        'POST' :  (pipeline_rh_front_end.rh_suggest_resource_values, auth.PERM_WRITE | auth.PERM_EXEC)
                                                      },
        uri_prefix.PIPELINE_VALIDATE_RESOURCE : {
                                                    'POST' : (pipeline_rh_front_end.rh_validate, auth.PERM_WRITE | auth.PERM_EXEC)
                                                },
        uri_prefix.COMPONENT_LIST : {
                                         'GET'  : (comp_list.rh_get_component_list, auth.PERM_READ)
                                    },
        uri_prefix.RUNTIME_STATUS : { 
                                         'GET'  : (pipeline_manager.process_runtime_get_request, auth.PERM_READ | auth.PERM_EXEC),
                                         'PUT'  : (pipeline_manager.process_runtime_put_request, auth.PERM_EXEC)
                                    },
        uri_prefix.COMPONENT      : {
                                        'GET'     : (cc_proxy.cc_uri_mapper, auth.PERM_READ | auth.PERM_EXEC),
                                        'POST'    : (cc_proxy.cc_uri_mapper, auth.PERM_READ | auth.PERM_EXEC),
                                        'PUT'     : (cc_proxy.cc_uri_mapper, auth.PERM_READ | auth.PERM_EXEC)
                                    },
        uri_prefix.CC_REGISTER    : {
                                        'POST'    : (cc_list.cc_register, auth.PERM_WRITE)
                                    },

        uri_prefix.AUTH:            {
                                        'GET'   : ( auth.auth_get_handler, auth.PERM_READ | auth.PERM_EXEC )
                                    },
        uri_prefix.AUTH_CHECK:      {
                                        'GET'   : ( auth.auth_check, auth.PERM_READ | auth.PERM_EXEC )
                                    },

        uri_prefix.AUTH_USER_LIST:  {
                                        'GET'   : ( auth.user_list, auth.PERM_READ | auth.PERM_EXEC )
                                    },
        uri_prefix.AUTH_USER_ENTRY: {
                                        'GET'   : ( auth.user_entry_get, auth.PERM_READ | auth.PERM_EXEC ),
                                        'POST'  : ( auth.user_entry_post, auth.PERM_READ | auth.PERM_EXEC ),
                                        'PUT'   : ( auth.user_entry_put, auth.PERM_READ | auth.PERM_EXEC ),
                                        'DELETE': ( auth.user_entry_delete, auth.PERM_READ | auth.PERM_EXEC )
                                    },
        uri_prefix.AUTH_GROUP_LIST: {
                                        'GET'   : ( auth.group_list, auth.PERM_READ | auth.PERM_EXEC ),
                                    },
        uri_prefix.AUTH_GROUP_ENTRY: {
                                        'GET'   : ( auth.group_entry_get, auth.PERM_READ | auth.PERM_EXEC ),
                                        'POST'  : ( auth.group_entry_post, auth.PERM_READ | auth.PERM_EXEC ),
                                        'PUT'   : ( auth.group_entry_put, auth.PERM_READ | auth.PERM_EXEC ),
                                        'DELETE': ( auth.group_entry_delete, auth.PERM_READ | auth.PERM_EXEC )
                                    },
        uri_prefix.AUTH_ACL_LIST: {
                                        'GET'   : ( auth.acl_list, auth.PERM_READ | auth.PERM_EXEC ),
                                    },
        uri_prefix.AUTH_ACL_ENTRY: {
                                        'GET'   : ( auth.acl_entry_get, auth.PERM_READ | auth.PERM_EXEC ),
                                        'POST'  : ( auth.acl_entry_post, auth.PERM_READ | auth.PERM_EXEC ),
                                        'PUT'   : ( auth.acl_entry_put, auth.PERM_READ | auth.PERM_EXEC ),
                                        'DELETE': ( auth.acl_entry_delete, auth.PERM_READ | auth.PERM_EXEC )
                                    },
        uri_prefix.RESOURCE_UPGRADE:{
                                        'POST'  : (cc_proxy.upgrade_resources, auth.PERM_READ | auth.PERM_WRITE) 
                                    },
        # This will be explicitly called for a URI that does not match
        # any of the above.
        # Some requests will not have specific URI prefixes. We need to assume that
        # they are for user-named resources, which can live anywhere in the namespace
        # (except under the well-known prefixes).
        None                      : { 
                                         'PUT'    : (repo_rh_front_end.rh_update_resource, auth.PERM_WRITE),
                                         'GET'    : (repo_rh_front_end.rh_get_handler, auth.PERM_READ | auth.PERM_EXEC),
                                         'POST'   : (pipeline_manager.process_prepare_request, auth.PERM_EXEC),
                                         'DELETE' : (repo_rh_front_end.rh_delete_resource, auth.PERM_WRITE)
                                    }
            }

    if is_pe():
        # Additional request handlers for PE edition modules
        handlers[uri_prefix.SCHEDULER] =  { 
                                     'GET'    : (snap_scheduler.list_events, auth.PERM_READ),
                                     'POST'   : (snap_scheduler.create_event, auth.PERM_WRITE),
                                     'PUT'    : (snap_scheduler.update_event, auth.PERM_WRITE),
                                     'DELETE' : (snap_scheduler.delete_event, auth.PERM_WRITE),
                                    }
        handlers[uri_prefix.NOTIFICATION] = { 
                                     'GET'    : (snap_notifier.show_capabilities, auth.PERM_READ | auth.PERM_EXEC),
                                    }

    # If in admin_mode only some URIs work, others should return 503 service temporarily unavailable. 
    if server.config.get_section('main')['admin_mode']:
        uris_allowed_in_admin_mode = (
                                      uri_prefix.ROOT, 
                                      uri_prefix.INFO,
                                      uri_prefix.SELF_CHECK,
                                      uri_prefix.URI_CHECK,
                                      uri_prefix.STATIC, 
                                      uri_prefix.RESOURCE_LIST, 
                                      uri_prefix.RESOURCE_UPGRADE, 
                                      uri_prefix.RESOURCE_SUMMARY, 
                                      uri_prefix.CC_REGISTER, 
                                      uri_prefix.AUTH_CHECK,
                                      None 
                                     )
        for uri in handlers:
            if uri not in uris_allowed_in_admin_mode:
                # This URI isn't allowed in admin_mode.
                # Change the handler to service_unavailable
                handler_dict = handlers[uri]
                for method in handler_dict:
                    # For each method, change method to service_unavailable 
                    (handler, perm) = handler_dict[method]
                    handler_dict[method] = (service_unavailable, perm)
    
    # Get the keys of handlers dict, which is subsequently sorted by the length
    # of each list element (longest first). This allows us to do a longest-match
    # first search later on.
    #prefixes = [uri_prefix.__dict__[p] for p in uri_prefix.__dict__.keys() if not p.startswith('__')]
    prefixes = [ k for k in handlers if k is not None]
    prefixes.sort(lambda x, y: -cmp(len(x), len(y)))


def is_ready():
    """
    Return True of the server is ready to go.

    This is needed because the scheduler will start to fire
    off events, even if the scheduler has not been able to
    make contact to any CCs yet.

    The scheduler will be given a handle to this function.
    As long as it returns False, it will not attempt to
    run any events.

    @return:    A flag indicating readiness of the server.
    @rtype:     bool

    """
    return cc_list.is_ready()


def initialize():
    """
    Perform any necessary initialization.

    Whichever modules requires an initialization step: Add a call to your
    initialization function here.

    For the request handler, it is useful to have its initialization done
    here, rather than in the application() function, because that function
    is called for every request. Whatever we initialize for it here will
    be available to application() as a global.

    """
    
    global main_conf, pipe_to_http_uri_prefix, error_sem
    
    # Set up the semaphore that limits bad requests
    error_sem = BoundedSemaphore(value = MAX_BAD_REQUESTS)

    # Process the command line
    import sys
    import getopt
    config_filename = None
    
    """
    Admin mode can be set with a command line option and disables certain features and URIs.
    An example of when this can be used is during an upgrade:
    When repository is being upgraded from one version to another we need
    to prevent concurrent modifications, e.g. saving the resources and 
    execution of pipelines.
    """
    admin_mode = False

    try:
        opts, xargs = getopt.getopt(sys.argv[1:], "c:A", ["config=", "admin_mode"])
    except getopt.GetoptError, e:
        sys.exit("Option(s) specified are not valid %s" % str(e))
    for o, a in opts:
        if o in ("-c", "--config"):
            config_filename = a
        if o in ("-A", "--admin_mode"):
            admin_mode = True

    # If a config file has been specified, then use it.
    if config_filename is not None:
        try:
            conf = snap_config.SnapConfig(config_filename, None, snap_config.MAIN_SERVER_DEFAULT,
                                          exclude_list=["component_container"])
        except Exception, e:
            sys.exit("Config file %s failed to parse: %s" % (config_filename, str(e)))
    else:
        conf = snap_config.SnapConfig(None, None, snap_config.MAIN_SERVER_DEFAULT)

    server.config         = conf

    # Initialization for the request handler. We need to set up logging.
    # So, we create the log object as well as facility specific logger functions.
    # Those two functions will be stored as module globals, so that the
    # application() function has easy access to them.

    main_conf = conf.get_section("main")
    
    # Store admin_mode in the config object
    main_conf['admin_mode'] = admin_mode

    cr = 'no'
    try:
        cr = main_conf["console_output"]
        to_console = cr == 'yes'
    except:
        to_console = False

    if cr not in [ 'yes', 'no' ]:
        raise SnapException("Config file entry 'console_output' must be either 'yes' or 'no', found '%s' instead." % cr)

    # The server name is mandatory, but only if the proxy URI is not defined.
    if not main_conf["server_proxy_uri"]  and  not main_conf["server_hostname"]:
        raise SnapException("Config file entry 'server_hostname' is mandatory if 'server_proxy_uri' is undefined.")

    server.logger         = snap_log.SnapLog("main_process", main_conf["log_dir"], to_console,
                                             main_conf["disable_logging"], main_conf["log_level"],
                                             main_conf["max_log_size"], main_conf["log_backup_count"])
    try:
        log_level = main_conf["log_level"]
        if not snap_log._level_lookup.has_key(log_level):
            sys.exit("Config file %s failed to parse: Illegal log-level '%s'" % (config_filename, log_level))
    except:
        log_level = snap_log.LEVEL_INFO

    ( log,  elog, rlog )  = server.logger.make_specific_loggers(snap_log.LOG_RH)
    server.log            = log
    server.elog           = elog
    server.rlog           = rlog

    ( serv_log,  serv_elog, serv_rlog )  = server.logger.make_specific_loggers(snap_log.LOG_SERVER)
    server.serv_log       = serv_log
    server.serv_elog      = serv_elog
    server.serv_rlog      = serv_rlog

    # Set up the snap_stats object for this process, and initialize the stats-group
    # for the request handler module.
    server.stats = snap_stats.SnapStats()
    server.stats_group  = snap_stats.SnapStatsGroup("Request Handler")
    server.stats.add_stats_group(server.stats_group)
    server.stats_group.add_field_stat("alive_since", "number", time.time())
    server.stats_group.add_counter_stat("number_of_requests")
    server.stats_group.add_counter_stat("number_of_failed_requests")
    server.stats_group.add_counter_stat("number_of_unauthorized_requests")
    server.stats_group.add_counter_stat("number_of_get_requests")
    server.stats_group.add_counter_stat("number_of_post_requests")
    server.stats_group.add_counter_stat("number_of_put_requests")
    server.stats_group.add_counter_stat("number_of_delete_requests")
    server.stats_group.add_counter_stat("number_of_invalid_method_requests")
    
    # Add pipeline manager stats group
    server.pm_stats_group  = snap_stats.SnapStatsGroup("Pipeline Manager")
    server.stats.add_stats_group(server.pm_stats_group)
    server.pm_stats_group.add_counter_stat("number_of_running_pipelines")
    server.pm_stats_group.add_counter_stat("number_of_completed_pipelines")
    server.pm_stats_group.add_counter_stat("number_of_failed_pipelines")
    server.pm_stats_group.add_counter_stat("number_of_stopped_pipelines")
    
    # Add a couple of computed parameters to config, so that they become available
    # to everyone...
    set_common_config(server.config)

    # Print the banner on stdout
    buf = product_prefix.SNAPLOGIC_PRODUCT + " started"
    bannerout = "%s\n%s\n%s" % (server_banner, server_copyright, buf)
    if admin_mode:
        bannerout += " in admin mode!"

    server.serv_log(snap_log.LEVEL_INFO, bannerout)

    # Convenient shortcut to our public URI
    server.public_uri = server.config.get_section('common')['main_process_uri']

    for token in ('server_hostname', 'server_address'):
        if snap_http_lib.is_localhost(server.config.get_section('main')[token]):
            server.log(snap_log.LEVEL_WARN,
                       "Server configuration parameter '%s' is set to '%s'. Network capabilities will be restricted." %
                       (token, server.config.get_section('main')[token]))

    # Initialize the repository
    repo_rh_front_end.rh_init()
    
    if "credentials_store" in main_conf  and  main_conf["credentials_store"]:
        credentials_store.initialize(main_conf["credentials_store"])
        
    runtime_table.initialize(main_conf['runtime_entry_timeout'])

    cc_list.initialize()
    # We won't initiate the population of the list yet, because
    # the CCs most likely aren't ready anyway. But we can already
    # provide a mapping for the access to the logfiles of the CCs
    # via the main server.
    cc_log_map = {}
    # Add mappings for each CC
    for cc_name in cc_list.cc_map:
        # Add a mapping for each of the well-known log types
        for lpath in [ "access", "exception", "main" ]:
            cc_log_map["%s/%s/%s" % (uri_prefix.CC_LOGS, cc_name, lpath)] = "%s%s/%s" % (cc_list.cc_map[cc_name]['uri'], uri_prefix.LOGS, lpath)

    server.component_list = comp_list
    comp_list.initialize(cc_log_map)
    pipe_to_http.initialize()
    http_to_pipe.initialize()
    pipeline_manager.initialize(server)
    
    pipe_to_http_uri_prefix = main_conf["pipe_to_http_uri_prefix"]

    _init_handlers(pipe_to_http_uri_prefix)

    global server_info
    server_info = {}
    
    # If we are in admin_mode add admin_mode:True to 
    # the server_info structure so Designer knows we are in this restricted
    # maintenance-only mode.
    if admin_mode:
        server_info['admin_mode'] = True
        
    vinfo_dict = version_info.__dict__
    for key in vinfo_dict.keys():
        val = vinfo_dict[key] 
        if not key.startswith("__") and not type(val) == types.FunctionType:
            server_info[key] = val

    try:
        if main_conf.has_key('auth_file_config')  and  main_conf.has_key('auth_file_passwords'):
            auth_type = 'file'
            db        = main_conf['auth_file_config']
            passwds   = main_conf['auth_file_passwords']
        else:
            sys.exit("Config file %s does not specify auth_file_config or auth_file_passwords." % (config_filename))
            auth_type = None
            db        = None
            passwds   = None

        auth.initialize(auth_type, db, passwds)

    except Exception, e:
        elog(e, str(e))
        sys.exit("Problem reading authentication information...")

    if is_pe():
        # Initializations for the professional edition
        #
        #   * notifier
        #   * scheduler
        #
        try:
            snap_notifier.initialize(conf.get_section("notification"))
        except SnapObjNotFoundError, e:
            # The notification section is optional...
            pass
        
        # Since the scheduler could try to run pipelines on its own,
        # we initialize it last. The rest of the server should be
        # in place before this one starts to work.
        if not admin_mode:
            snap_scheduler.initialize(is_ready)
            
def read_ignore_request(http_req, ret, server):
    """
    Read all the data from the HTTP socket and ignore it.
    This is usually used when we are about to deny the request,
    e.g. because of insufficient permissions.
    However before we reject the request we make sure to read in 
    all the data the client has to send.  This makes it easier
    to handle the response in the client.
    
    Had we not done this, a race condition may have occurred
    where depending on how much data was sent or received 
    the client could have gotten a connection reset error.
    
    Usually this happens under the following circumstances:
    1) Client hits a URL and starts sending data.
    2) Server reads the headers from the request such as username/password
       and decides to deny it.
    3) Server sends back the HTTP error code, and the HTTP message
       and closes the connection.
    4) Client is still sending data while connection is closed.
    5) Client fails.  Depending on how the client is implemented
       the connection may be in a bad state so that reading from it
       is no longer possible.  
    """

    # Is this a "bad" request?
    if ret is not None and ret.http_code != 200:
        # Acquire the semaphore in a non-blocking mode.
        # This will succeed if there aren't too many simulatenous requests.
        # (less than the MAX_BAD_REQUESTS value that the semaphore was created with).
        if error_sem.acquire(blocking = False):
            try:
                # Read in the request completely
                while http_req._input.read(MAX_BAD_REQUEST_LENGTH):
                    pass
            except Exception, e:
                server.elog(e, 'Error reading request.')
            finally:
                # Release the semaphore
                error_sem.release()
        else:
            # We couldn't acquire the sempahore because there were too many 
            # simultaneous bad requests.
            server.log(snap_log.LEVEL_WARN, 'Too many simultatenous bad requests')  

def application(environ, start_response):
    """
    Handle a new HTTP request.

    This is the core function of our WSGI application.

    @param  environ:        The environment object of the WSGI request.
                            It contains all HTTP headers.
    @type   environ:        dictionary

    @param  start_response: A callable provided by WSGI, which can be used
                            to write the response.
    @type   start_response: callable

    """

    server.stats_group.get_stat("number_of_requests").inc()
    http_req = HttpRequest(environ, start_response)

    if http_req.method == "GET":
        server.stats_group.get_stat("number_of_get_requests").inc()
    elif http_req.method == "POST":
        server.stats_group.get_stat("number_of_post_requests").inc()
    elif http_req.method == "PUT":
        server.stats_group.get_stat("number_of_put_requests").inc()
    elif http_req.method == "DELETE":
        server.stats_group.get_stat("number_of_delete_requests").inc()

    #print environ
    #print environ['HTTP_USER_AGENT']
    #print environ['HTTP_ACCEPT']
    #server.log(snap_log.LEVEL_INFO, "req: %s [%s]" % (http_req.raw_uri, http_req.client_address))

    """

    # I'm leaving this comment block in here, as a documentation on
    # how to use the req object, and how to handle exceptions and
    # logging and such. All of that will be removed in the future,
    # of course...

    response_headers = [('Content-type', 'text/plain'),]
    http_req.send_response_headers(200, response_headers)
    http_req.output.write("This is another test...")
    d = http_req.input.read(http_req.content_length)
    print "@@@@@@ ", d
    server.log(snap_log.LEVEL_INFO, "Received request: %s" % http_req.path)
    try:
        try:
            raise SnapGeneralError("GENERAL")
        except Exception, e:
            raise SnapException.chain(e, SnapNativeError("NATIVE"))
    except Exception, e:
        server.elog(e, "This is my high-level message...")
    """


    """
    if http_req.method == "POST":
        print "\n\n#### POST ####\n"
        http_req.make_input_rp()
        for d in http_req.input:
            print d
    """

    if server.config.get_section('main')['admin_mode']:
        render_options = { 'mode_indicator' : "Admin mode" }
    else:
        render_options = None

    try:
        if http_req.method in [ "POST", "PUT" ] and not http_req.path.startswith(pipe_to_http_uri_prefix):
            http_req.make_input_rp()
        
        path = http_req.path
        ret = None
        for p in prefixes:
            if http_req.path == p or http_req.path.startswith(p + "/"):
                if handlers.has_key(p):
                    handler_dict = handlers[p]
                    if handler_dict.has_key(http_req.method):
                        (handler, permissions) = handler_dict[http_req.method]
                        if not http_req.permissions & permissions:
                            ret = RhResponse(http_req.UNAUTHORIZED, "Insufficient permissions for method '%s' on '%s'" % (http_req.method, path), None, render_options)
                            server.stats_group.get_stat("number_of_unauthorized_requests").inc()
                        else:
                            ret = handler(http_req)
                        break
                    else:
                        ret = RhResponse(http_req.METHOD_NOT_ALLOWED, "Method '%s' not allowed for '%s'" % (http_req.method, path), None, render_options)
                        server.stats_group.get_stat("number_of_invalid_method_requests").inc()
                        break
        else:
            # This only gets executed if the for-loop ran to completion,
            # and this will only be the case if the request doesn't match
            # any of the well-known prefixes. In that case, we need to
            # assume that this is for user-named resources, which can live
            # anywhere in the namespace, outside of the well-known prefixes.
            handler_dict = handlers[None]
            if handler_dict.has_key(http_req.method):
                (handler, permissions) = handler_dict[http_req.method]
                if not http_req.permissions & permissions:
                    ret = RhResponse(http_req.UNAUTHORIZED, "Insufficient permissions for method '%s' on '%s'" % (http_req.method, path), None, render_options)
                    server.stats_group.get_stat("number_of_unauthorized_requests").inc()
                else:
                    ret = handler(http_req)
            else:
                ret = RhResponse(http_req.METHOD_NOT_ALLOWED, "Method '%s' not allowed for '%s'" % (http_req.method, path), None, render_options)
                server.stats_group.get_stat("number_of_invalid_method_requests").inc()

    except Exception, e:
        # An exception? Shouldn't happen, so all we can do is indicate
        # an internal server error...
        server.elog(e, str(e))
        ret = RhResponse(500, "Internal server error", None, render_options)
        server.stats_group.get_stat("number_of_failed_requests").inc()

    # Make sure to read in the request completely before sending the response.
    read_ignore_request(http_req, ret, server)

    # Writing the response back to the client, if there is one
    if ret:
        try:
            headers = []
            if ret.status:
                # Some handlers may have set a special Snapi return status. That
                # is communicated as an X-header.
                headers.append( (STATUS_HEADER, str(ret.status) ) )
            elif ret.headers:
                # In other cases there might be additional headers as well. These
                # can be set by the RhResponse object itself as required by certain
                # response codes. For example, the "WWW-Authenticate" header needs
                # to be set for the 401 response.
                for h in ret.headers:
                    headers.append(h)
            if ret.data is not None  and  ret.http_code != HttpRequest.UNSUPPORTED_MEDIA_TYPE:
                initialize_out_stream = True
            else:
                initialize_out_stream = False
            http_req.send_response_headers(ret.http_code, headers, initialize_out_stream, ret.options)
            if ret.data is not None:
                try:
                    if ret.http_code != HttpRequest.UNSUPPORTED_MEDIA_TYPE:
                        http_req.output.write(ret.data)
                        http_req.output.end()
                    else:
                        http_req._output.write(ret.data)
                except socket.error, e:
                    # We are ignoring those errors quietly, since they are often caused
                    # by the client (outside of our control) doing something strange, such
                    # as just closing its connection. This can happen, and we don't need
                    # to report this as a big exception.
                    pass
            server.rlog(http_req, ret.http_code)
            # send_response_headers() can raise an exception if the media type
            # for output is not known. We silently ignore that, since it will
            # have sent the right HTTP status code for that already.
        except Exception, e:
            server.elog(e)
            pass
    
    return []

def signal_handler(*args, **kwargs):
    sys.exit(1)
            
    
def main():
    """
    Do something when in standalone mode.

    We need this in a main() function, due to particularities of our installer.
    Normally, we would just put this code right at the very end, where we now
    have the call to main().

    """
    global main_conf

    try:
        signal.signal(signal.SIGINT, signal_handler)
    except ValueError:
        # In some of our test cases, the server is run as a thread. In that
        # case, we get an exception, because a signal handler can only be
        # installed in the main thread.
        pass

    from paste import httpserver
    
    server.server_token = snap_crypt.generate_random_string()
    server.local_host_ip_addresses = snap_http_lib.gather_local_host_ips([main_conf["server_hostname"]])
    server.server_port = main_conf["server_port"]
                                                                          
    httpserver.serve(auth.SnapAuthBasicHandler(application, server.auth_realm, auth.check_auth),
                                               host=main_conf["server_address"], port=main_conf["server_port"],
                                               use_threadpool=False)


# -------------------------------------------------------------------
# The initialization code needs to be called, even if the 'main' part
# below is not executed. This could be the case when we are running
# behind Apache via mod_wsgi, for example. That's why the call to
# initialize() takes place unconditionally right here.
# -------------------------------------------------------------------

initialize()

# --------------------------------------------------------------------
# If we are run as a stand alone application, we will use the HTTP and
# WSGI server that comes with the Paste package.
# --------------------------------------------------------------------
if __name__ == '__main__':
    main()

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.