test_poll.py :  » Language-Interface » ChinesePython » chinesepython2.1.3-0.4 » Lib » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Language Interface » ChinesePython 
ChinesePython » chinesepython2.1.3 0.4 » Lib » test » test_poll.py
# Test case for the os.poll() function

import sys, os, select, random
from test_support import verify,verbose,TestSkipped,TESTFN

try:
    select.poll
except AttributeError:
    raise TestSkipped, "select.poll not defined -- skipping test_poll"


def find_ready_matching(ready, flag):
    match = []
    for fd, mode in ready:
        if mode & flag:
            match.append(fd)
    return match

def test_poll1():
    """Basic functional test of poll object

    Create a bunch of pipe and test that poll works with them.
    """
    print 'Running poll test 1'
    p = select.poll()

    NUM_PIPES = 12
    MSG = " This is a test."
    MSG_LEN = len(MSG)
    readers = []
    writers = []
    r2w = {}
    w2r = {}

    for i in range(NUM_PIPES):
        rd, wr = os.pipe()
        p.register(rd, select.POLLIN)
        p.register(wr, select.POLLOUT)
        readers.append(rd)
        writers.append(wr)
        r2w[rd] = wr
        w2r[wr] = rd

    while writers:
        ready = p.poll()
        ready_writers = find_ready_matching(ready, select.POLLOUT)
        if not ready_writers:
            raise RuntimeError, "no pipes ready for writing"
        wr = random.choice(ready_writers)
        os.write(wr, MSG)

        ready = p.poll()
        ready_readers = find_ready_matching(ready, select.POLLIN)
        if not ready_readers:
            raise RuntimeError, "no pipes ready for reading"
        rd = random.choice(ready_readers)
        buf = os.read(rd, MSG_LEN)
        verify(len(buf) == MSG_LEN)
        print buf
        os.close(r2w[rd]) ; os.close( rd )
        p.unregister( r2w[rd] )
        p.unregister( rd )
        writers.remove(r2w[rd])

    poll_unit_tests()
    print 'Poll test 1 complete'

def poll_unit_tests():
    # returns NVAL for invalid file descriptor
    FD = 42
    try:
        os.close(FD)
    except OSError:
        pass
    p = select.poll()
    p.register(FD)
    r = p.poll()
    verify(r[0] == (FD, select.POLLNVAL))

    f = open(TESTFN, 'w')
    fd = f.fileno()
    p = select.poll()
    p.register(f)
    r = p.poll()
    verify(r[0][0] == fd)
    f.close()
    r = p.poll()
    verify(r[0] == (fd, select.POLLNVAL))
    os.unlink(TESTFN)

    # type error for invalid arguments
    p = select.poll()
    try:
        p.register(p)
    except TypeError:
        pass
    else:
        print "Bogus register call did not raise TypeError"
    try:
        p.unregister(p)
    except TypeError:
        pass
    else:
        print "Bogus unregister call did not raise TypeError"

    # can't unregister non-existent object
    p = select.poll()
    try:
        p.unregister(3)
    except KeyError:
        pass
    else:
        print "Bogus unregister call did not raise KeyError"

    # Test error cases
    pollster = select.poll()
    class Nope:
        pass

    class Almost:
        def fileno(self):
            return 'fileno'

    try:
        pollster.register( Nope(), 0 )
    except TypeError: pass
    else: print 'expected TypeError exception, not raised'

    try:
        pollster.register( Almost(), 0 )
    except TypeError: pass
    else: print 'expected TypeError exception, not raised'


# Another test case for poll().  This is copied from the test case for
# select(), modified to use poll() instead.

def test_poll2():
    print 'Running poll test 2'
    cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
    p = os.popen(cmd, 'r')
    pollster = select.poll()
    pollster.register( p, select.POLLIN )
    for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
        if verbose:
            print 'timeout =', tout
        fdlist = pollster.poll(tout)
        if (fdlist == []):
            continue
        fd, flags = fdlist[0]
        if flags & select.POLLHUP:
            line = p.readline()
            if line != "":
                print 'error: pipe seems to be closed, but still returns data'
            continue

        elif flags & select.POLLIN:
            line = p.readline()
            if verbose:
                print `line`
            if not line:
                if verbose:
                    print 'EOF'
                break
            continue
        else:
            print 'Unexpected return value from select.poll:', fdlist
    p.close()
    print 'Poll test 2 complete'

test_poll1()
test_poll2()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.