thread_handler.py :  » Web-Frameworks » Zope » Zope-2.6.0 » ZServer » medusa » thread » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Zope 
Zope » Zope 2.6.0 » ZServer » medusa » thread » thread_handler.py
# -*- Mode: Python; tab-width: 4 -*-

import re
import string
import StringIO
import sys

import os
import sys
import time

import counter
import select_trigger
import producers

from default_handler import split_path,unquote,get_header

import fifo

import threading

class request_queue:

    def __init__ (self):
        self.mon = threading.RLock()
        self.cv = threading.Condition (self.mon)
        self.queue = fifo.fifo()
        
    def put (self, item):
        self.cv.acquire()
        self.queue.push (item)
        self.cv.notify()
        self.cv.release()
        
    def get(self):
        self.cv.acquire()
        while not self.queue:
            self.cv.wait()
        result = self.queue.pop()
        self.cv.release()
        return result
        
header2env= {
        'Content-Length'  : 'CONTENT_LENGTH',
        'Content-Type'    : 'CONTENT_TYPE',
        'Referer'      : 'HTTP_REFERER',
        'User-Agent'    : 'HTTP_USER_AGENT',
        'Accept'      : 'HTTP_ACCEPT',
        'Accept-Charset'  : 'HTTP_ACCEPT_CHARSET',
        'Accept-Language'  : 'HTTP_ACCEPT_LANGUAGE',
        'Host'        : 'HTTP_HOST',
        'Connection'    : 'CONNECTION_TYPE',
        'Authorization'    : 'HTTP_AUTHORIZATION',
        'Cookie'      : 'HTTP_COOKIE',
        }

# convert keys to lower case for case-insensitive matching
for (key,value) in header2env.items():
    del header2env[key]
    key=string.lower(key)
    header2env[key]=value
    
class thread_output_file (select_trigger.trigger_file):

    def close (self):
        self.trigger_close()
        
class script_handler:

    def __init__ (self, queue, document_root=""):
        self.modules = {}
        self.document_root = document_root
        self.queue = queue
        
    def add_module (self, module, *names):
        if not names:
            names = ["/%s" % module.__name__]
        for name in names:
            self.modules['/'+name] = module
            
    def match (self, request):
        uri = request.uri
        
        i = string.find(uri, "/", 1)
        if i != -1:
            uri = uri[:i]
            
        i = string.find(uri, "?", 1)
        if i != -1:
            uri = uri[:i]
            
        if self.modules.has_key (uri):
            request.module = self.modules[uri]
            return 1
        else:
            return 0
            
    def handle_request (self, request):
    
        [path, params, query, fragment] = split_path (request.uri)
        
        while path and path[0] == '/':
            path = path[1:]
            
        if '%' in path:
            path = unquote (path)
            
        env = {}
        
        env['REQUEST_URI'] = "/" + path
        env['REQUEST_METHOD']  = string.upper(request.command)
        env['SERVER_PORT']   = str(request.channel.server.port)
        env['SERVER_NAME']   = request.channel.server.server_name
        env['SERVER_SOFTWARE'] = request['Server']
        env['DOCUMENT_ROOT']   = self.document_root
        
        parts = string.split(path, "/")
        
        # are script_name and path_info ok?
        
        env['SCRIPT_NAME']  = "/" + parts[0]
        
        if query and query[0] == "?":
            query = query[1:]
            
        env['QUERY_STRING']  = query
        
        try:
            path_info = "/" + string.join(parts[1:], "/")
        except:
            path_info = ''
            
        env['PATH_INFO']    = path_info
        env['GATEWAY_INTERFACE']='CGI/1.1'           # what should this really be?
        env['REMOTE_ADDR']    =request.channel.addr[0]
        env['REMOTE_HOST']    =request.channel.addr[0]  # TODO: connect to resolver
        
        for header in request.header:
            [key,value]=string.split(header,": ",1)
            key=string.lower(key)
            
            if header2env.has_key(key):
                if header2env[key]:
                    env[header2env[key]]=value
            else:
                key = 'HTTP_' + string.upper(
                        string.join(
                                string.split (key,"-"),
                                "_"
                                )
                        )
                env[key]=value
                
                ## remove empty environment variables
        for key in env.keys():
            if env[key]=="" or env[key]==None:
                del env[key]
                
        try:
            httphost = env['HTTP_HOST']
            parts = string.split(httphost,":")
            env['HTTP_HOST'] = parts[0]
        except KeyError: 
            pass
            
        if request.command in ('put', 'post'):
                # PUT data requires a correct Content-Length: header
                # (though I bet with http/1.1 we can expect chunked encoding)
            request.collector = collector (self, request, env)
            request.channel.set_terminator (None)
        else:
            sin = StringIO.StringIO ('')
            self.continue_request (sin, request, env)
            
    def continue_request (self, stdin, request, env):
        stdout = header_scanning_file (
                request,
                thread_output_file (request.channel)
                )
        self.queue.put (
                (request.module.main, (env, stdin, stdout))
                )
        
HEADER_LINE = re.compile ('([A-Za-z0-9-]+): ([^\r\n]+)')

# A file wrapper that handles the CGI 'Status:' header hack
# by scanning the output.

class header_scanning_file:

    def __init__ (self, request, file):
        self.buffer = ''
        self.request = request
        self.file = file
        self.got_header = 0
        self.bytes_out = counter.counter()
        
    def write (self, data):
        if self.got_header:
            self._write (data)
        else:
                # CGI scripts may optionally provide extra headers.
                # 
                # If they do not, then the output is assumed to be
                # text/html, with an HTTP reply code of '200 OK'.
                # 
                # If they do, we need to scan those headers for one in
                # particular: the 'Status:' header, which will tell us
                # to use a different HTTP reply code [like '302 Moved']
                #
            self.buffer = self.buffer + data
            lines = string.split (self.buffer, '\n')
            # ignore the last piece, it is either empty, or a partial line
            lines = lines[:-1]
            # look for something un-header-like
            for i in range(len(lines)):
                li = lines[i]
                if (not li) or (HEADER_LINE.match (li) is None):
                        # this is either the header separator, or it
                        # is not a header line.
                    self.got_header = 1
                    h = self.build_header (lines[:i])
                    self._write (h)
                    # rejoin the rest of the data
                    d = string.join (lines[i:], '\n')
                    self._write (d)
                    self.buffer = ''
                    break
                    
    def build_header (self, lines):
        status = '200 OK'
        saw_content_type = 0
        hl = HEADER_LINE
        for line in lines:
            mo = hl.match (line)
            if mo is not None:
                h = string.lower (mo.group(1))
                if h == 'status':
                    status = mo.group(2)
                elif h == 'content-type':
                    saw_content_type = 1
        lines.insert (0, 'HTTP/1.0 %s' % status)
        lines.append ('Server: ' + self.request['Server'])
        lines.append ('Date: ' + self.request['Date'])
        if not saw_content_type:
            lines.append ('Content-Type: text/html')
        lines.append ('Connection: close')
        return string.join (lines, '\r\n')+'\r\n\r\n'
        
    def _write (self, data):
        self.bytes_out.increment (len(data))
        self.file.write (data)
        
    def writelines(self, list):
        self.write (string.join (list, ''))
        
    def flush(self):
        pass
        
    def close (self):
        if not self.got_header:
                # managed to slip through our header detectors
            self._write (self.build_header (['Status: 502', 'Content-Type: text/html']))
            self._write (
                    '<html><h1>Server Error</h1>\r\n'
                    '<b>Bad Gateway:</b> No Header from CGI Script\r\n'
                    '<pre>Data: %s</pre>'
                    '</html>\r\n' % (repr(self.buffer))
                    )
        self.request.log (int(self.bytes_out.as_long()))
        self.file.close()
        self.request.channel.current_request = None
        
        
class collector:

    "gathers input for PUT requests"
    
    def __init__ (self, handler, request, env):
        self.handler  = handler
        self.env = env
        self.request  = request
        self.data = StringIO.StringIO()
        
        # make sure there's a content-length header
        self.cl = request.get_header ('content-length')
        
        if not self.cl:
            request.error (411)
            return
        else:
            self.cl = string.atoi(self.cl)
            
    def collect_incoming_data (self, data):
        self.data.write (data)
        if self.data.tell() >= self.cl:
            self.data.seek(0)
            
            h=self.handler
            r=self.request
            
            # set the terminator back to the default
            self.request.channel.set_terminator ('\r\n\r\n')
            del self.handler
            del self.request
            
            h.continue_request (self.data, r, self.env)
            
            
class request_loop_thread (threading.Thread):

    def __init__ (self, queue):
        threading.Thread.__init__ (self)
        self.setDaemon(1)
        self.queue = queue
        
    def run (self):
        while 1:
            function, (env, stdin, stdout) = self.queue.get()
            function (env, stdin, stdout)
            stdout.close()
            
            # ===========================================================================
            #                 Testing
            # ===========================================================================
            
if __name__ == '__main__':

    import sys
    
    if len(sys.argv) < 2:
        print 'Usage: %s <worker_threads>' % sys.argv[0]
    else:
        nthreads = string.atoi (sys.argv[1])
        
        import asyncore
        import http_server
        # create a generic web server
        hs = http_server.http_server ('', 7080)
        
        # create a request queue
        q = request_queue()
        
        # create a script handler
        sh = script_handler (q)
        
        # install the script handler on the web server
        hs.install_handler (sh)
        
        # get a couple of CGI modules
        import test_module
        import pi_module
        
        # install the module on the script handler
        sh.add_module (test_module, 'test')
        sh.add_module (pi_module, 'pi')
        
        # fire up the worker threads
        for i in range (nthreads):
            rt = request_loop_thread (q)
            rt.start()
            
            # start the main event loop
        asyncore.loop()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.