threadedselectreactor.py :  » Business-Application » hylaPEx » hylapex » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Business Application » hylaPEx 
hylaPEx » hylapex » threadedselectreactor.py
# -*- test-case-name: twisted.test.test_internet -*-
# $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $
#
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

from __future__ import generators

"""Threaded select reactor

API Stability: unstable

Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>}
"""

from threading import Thread
from Queue import Queue,Empty
from time import sleep
import sys

from zope.interface import implements

from twisted.internet.interfaces import IReactorFDSet
from twisted.internet import error
from twisted.internet import posixbase
from twisted.python import log,components,failure,threadable
from twisted.persisted import styles
from twisted.python.runtime import platformType

import select
from errno import EINTR,EBADF

from twisted.internet.selectreactor import _select

# Exceptions that doSelect might return frequently
_NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')

def dictRemove(dct, value):
    try:
        del dct[value]
    except KeyError:
        pass

def raiseException(e):
    raise e

class ThreadedSelectReactor(posixbase.PosixReactorBase):
    """A threaded select() based reactor - runs on all POSIX platforms and on
    Win32.
    """
    implements(IReactorFDSet)

    def __init__(self):
        threadable.init(1)
        self.reads = {}
        self.writes = {}
        self.toThreadQueue = Queue()
        self.toMainThread = Queue(1)
        self.workerThread = None
        self.mainWaker = None
        posixbase.PosixReactorBase.__init__(self)
        self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)

    def wakeUp(self):
        # we want to wake up from any thread
        self.waker.wakeUp()

    def callLater(self, *args, **kw):
        tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
        self.wakeUp()
        return tple

    def _sendToMain(self, msg, *args):
        try:
         #print >>sys.stderr, 'sendToMain', msg, args
         self.toMainThread.put((msg, args))
         if self.mainWaker is not None:
             self.mainWaker()
        except:
         print '_sendtoamain'

    def _sendToThread(self, fn, *args):
        #print >>sys.stderr, 'sendToThread', fn, args
        self.toThreadQueue.put((fn, args))

    def _preenDescriptorsInThread(self):
        log.msg("Malformed file descriptor found.  Preening lists.")
        readers = self.reads.keys()
        writers = self.writes.keys()
        self.reads.clear()
        self.writes.clear()
        for selDict, selList in ((self.reads, readers), (self.writes, writers)):
            for selectable in selList:
                try:
                    select.select([selectable], [selectable], [selectable], 0)
                except:
                    log.msg("bad descriptor %s" % selectable)
                else:
                    selDict[selectable] = 1

    def _workerInThread(self):
        try:
            while 1:
                fn, args = self.toThreadQueue.get()
                #print >>sys.stderr, "worker got", fn, args
                fn(*args)
        except SystemExit:
            pass
        except:
            f = failure.Failure()
            self._sendToMain('Failure', f)
        #print >>sys.stderr, "worker finished"
    
    def _doSelectInThread(self, timeout):
        """Run one iteration of the I/O monitor loop.

        This will run all selectables who had input or output readiness
        waiting for them.
        """
        reads = self.reads
        writes = self.writes
        while 1:
            try:
                r, w, ignored = _select(reads.keys(),
                                        writes.keys(),
                                        [], timeout)
                break
            except ValueError, ve:
                # Possibly a file descriptor has gone negative?
                log.err()
                self._preenDescriptorsInThread()
            except TypeError, te:
                # Something *totally* invalid (object w/o fileno, non-integral
                # result) was passed
                log.err()
                self._preenDescriptorsInThread()
            except (select.error, IOError), se:
                # select(2) encountered an error
                if se.args[0] in (0, 2):
                    # windows does this if it got an empty list
                    if (not reads) and (not writes):
                        return
                    else:
                        raise
                elif se.args[0] == EINTR:
                    return
                elif se.args[0] == EBADF:
                    self._preenDescriptorsInThread()
                else:
                    # OK, I really don't know what's going on.  Blow up.
                    raise
        self._sendToMain('Notify', r, w)
        
    def _process_Notify(self, r, w):
        #print >>sys.stderr, "_process_Notify"
        reads = self.reads
        writes = self.writes
    
        _drdw = self._doReadOrWrite
        _logrun = log.callWithLogger
        for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)):
            for selectable in selectables:
                # if this was disconnected in another thread, kill it.
                if selectable not in dct:
                    continue
                # This for pausing input when we're not ready for more.
                _logrun(selectable, _drdw, selectable, method, dct)
        #print >>sys.stderr, "done _process_Notify"

    def _process_Failure(self, f):
        f.raiseException()

    _doIterationInThread = _doSelectInThread

    def ensureWorkerThread(self):
        if self.workerThread is None or not self.workerThread.isAlive():
            self.workerThread = Thread(target=self._workerInThread)
            self.workerThread.setDaemon(True)
            self.workerThread.start()

    def doThreadIteration(self, timeout):
        self._sendToThread(self._doIterationInThread, timeout)
        self.ensureWorkerThread()
        #print >>sys.stderr, 'getting...'
        msg, args = self.toMainThread.get()
        #print >>sys.stderr, 'got', msg, args
        getattr(self, '_process_' + msg)(*args)
    
    doIteration = doThreadIteration

    def mainLoopBegin(self):
        if self.running:
            self.runUntilCurrent()

    def _interleave(self):
        while self.running:
            #print >>sys.stderr, "runUntilCurrent"
            self.runUntilCurrent()
            t2 = self.timeout()
            t = self.running and t2
            self._sendToThread(self._doIterationInThread, t)
            #print >>sys.stderr, "yielding"
            yield None
            #print >>sys.stderr, "fetching"
            try:
                msg, args = self.toMainThread.get_nowait()
            except Empty:
                continue
            getattr(self, '_process_' + msg)(*args)

    def interleave(self, waker, *args, **kw):
        """
        waker(func) is a callable that will be called from
        some random thread.  Its job is to call func from
        the same thread that called this method.

        You should use this like a ninja master to integrate
        with foreign event loops.
        """
        self.startRunning(*args, **kw)
        loop = self._interleave()
        def mainWaker(waker=waker, loop=loop):
            #print >>sys.stderr, "mainWaker()"
            waker(loop.next)
        self.mainWaker = mainWaker
        loop.next()
        self.ensureWorkerThread()
    
    def _mainLoopShutdown(self):
        self.mainWaker = None
        if self.workerThread is not None:
            #print >>sys.stderr, 'getting...'
            self._sendToThread(raiseException, SystemExit)
            self.wakeUp()
            try:
                while 1:
                    msg, args = self.toMainThread.get_nowait()
                    #print >>sys.stderr, "ignored:", (msg, args)
            except Empty:
                pass
            self.workerThread.join()
            self.workerThread = None
        try:
            while 1:
                fn, args = self.toThreadQueue.get_nowait()
                if fn is self._doIterationInThread:
                    log.msg('Iteration is still in the thread queue!')
                elif fn is raiseException and args[0] is SystemExit:
                    pass
                else:
                    fn(*args)
        except Empty:
            pass

    def _doReadOrWrite(self, selectable, method, dict):
        try:
            why = getattr(selectable, method)()
            handfn = getattr(selectable, 'fileno', None)
            if not handfn:
                why = _NO_FILENO
            elif handfn() == -1:
                why = _NO_FILEDESC
        except:
            why = sys.exc_info()[1]
            log.err()
        if why:
            self._disconnectSelectable(selectable, why, method == "doRead")
    
    def addReader(self, reader):
        """Add a FileDescriptor for notification of data available to read.
        """
        self._sendToThread(self.reads.__setitem__, reader, 1)
        self.wakeUp()

    def addWriter(self, writer):
        """Add a FileDescriptor for notification of data available to write.
        """
        self._sendToThread(self.writes.__setitem__, writer, 1)
        self.wakeUp()

    def removeReader(self, reader):
        """Remove a Selectable for notification of data available to read.
        """
        self._sendToThread(dictRemove, self.reads, reader)

    def removeWriter(self, writer):
        """Remove a Selectable for notification of data available to write.
        """
        self._sendToThread(dictRemove, self.writes, writer)

    def removeAll(self):
        return self._removeAll(self.reads, self.writes)



def install():
    """Configure the twisted mainloop to be run using the select() reactor.
    """
    reactor = ThreadedSelectReactor()
    from twisted.internet.main import installReactor
    installReactor(reactor)
    return reactor

__all__ = ['install']
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.