test_win32trace.py :  » Windows » pyExcelerator » pywin32-214 » win32 » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Windows » pyExcelerator 
pyExcelerator » pywin32 214 » win32 » test » test_win32trace.py
import unittest
import win32trace
import threading
import time
import os
import sys

if __name__=='__main__':
    this_file = sys.argv[0]
else:
    this_file = __file__

def CheckNoOtherReaders():
    win32trace.write("Hi")
    time.sleep(0.05)
    if win32trace.read() != "Hi":
        # Reset everything so following tests still fail with this error!
        win32trace.TermRead()
        win32trace.TermWrite()
        raise RuntimeError("An existing win32trace reader appears to be " \
                            "running - please stop this process and try again")

class TestInitOps(unittest.TestCase):
    def setUp(self):
        # clear old data
        win32trace.InitRead()
        win32trace.read()
        win32trace.TermRead()

    def tearDown(self):
        try:
            win32trace.TermRead()
        except win32trace.error:
            pass
        try:
            win32trace.TermWrite()
        except win32trace.error:
            pass
        
    def testInitTermRead(self):
        self.assertRaises(win32trace.error, win32trace.read)
        win32trace.InitRead()
        result = win32trace.read()
        self.assertEquals(result, '')
        win32trace.TermRead()
        self.assertRaises(win32trace.error, win32trace.read)

        win32trace.InitRead()
        self.assertRaises(win32trace.error, win32trace.InitRead)
        win32trace.InitWrite()
        self.assertRaises(win32trace.error, win32trace.InitWrite)
        win32trace.TermWrite()
        win32trace.TermRead()

    def testInitTermWrite(self):
        self.assertRaises(win32trace.error, win32trace.write, 'Hei')
        win32trace.InitWrite()
        win32trace.write('Johan Galtung')
        win32trace.TermWrite()
        self.assertRaises(win32trace.error, win32trace.write, 'Hei')

    def testTermSematics(self):
        win32trace.InitWrite()
        win32trace.write('Ta da')

        # if we both Write and Read are terminated at the same time,
        # we lose the data as the win32 object is closed.  Note that
        # if another writer is running, we do *not* lose the data - so
        # test for either the correct data or an empty string
        win32trace.TermWrite()
        win32trace.InitRead()
        self.failUnless(win32trace.read() in ['Ta da', ''])
        win32trace.TermRead()

        # we keep the data because we init read before terminating write
        win32trace.InitWrite()
        win32trace.write('Ta da')
        win32trace.InitRead()
        win32trace.TermWrite()
        self.assertEquals('Ta da', win32trace.read())
        win32trace.TermRead()


class BasicSetupTearDown(unittest.TestCase):
    def setUp(self):
        win32trace.InitRead()
        # If any other writers are running (even if not actively writing),
        # terminating the module will *not* close the handle, meaning old data
        # will remain. This can cause other tests to fail.
        win32trace.read()
        win32trace.InitWrite()

    def tearDown(self):
        win32trace.TermWrite()
        win32trace.TermRead()
    

class TestModuleOps(BasicSetupTearDown):        
    def testRoundTrip(self):
        win32trace.write('Syver Enstad')
        syverEnstad = win32trace.read()
        self.assertEquals('Syver Enstad', syverEnstad)

    def testRoundTripUnicode(self):
        win32trace.write(u'\xa9opyright Syver Enstad')
        syverEnstad = win32trace.read()
        # str objects are always returned in py2k (latin-1 encoding was used
        # on unicode objects)
        self.assertEquals('\xa9opyright Syver Enstad', syverEnstad)

    def testBlockingRead(self):
        win32trace.write('Syver Enstad')
        self.assertEquals('Syver Enstad', win32trace.blockingread())

    def testBlockingReadUnicode(self):
        win32trace.write(u'\xa9opyright Syver Enstad')
        # str objects are always returned in py2k (latin-1 encoding was used
        # on unicode objects)
        self.assertEquals('\xa9opyright Syver Enstad', win32trace.blockingread())

    def testFlush(self):
        win32trace.flush()


class TestTraceObjectOps(BasicSetupTearDown):
    def testInit(self):
        win32trace.TermRead()
        win32trace.TermWrite()
        traceObject = win32trace.GetTracer()
        self.assertRaises(win32trace.error, traceObject.read)
        self.assertRaises(win32trace.error, traceObject.write, '')
        win32trace.InitRead()
        win32trace.InitWrite()
        self.assertEquals('', traceObject.read())
        traceObject.write('Syver')

    def testFlush(self):
        traceObject = win32trace.GetTracer()
        traceObject.flush()

    def testIsatty(self):
        tracer = win32trace.GetTracer()
        assert tracer.isatty() == False
        

    def testRoundTrip(self):
        traceObject = win32trace.GetTracer()
        traceObject.write('Syver Enstad')
        self.assertEquals('Syver Enstad', traceObject.read())

class WriterThread(threading.Thread):
    def run(self):
        self.writeCount = 0
        for each in range(self.BucketCount):
            win32trace.write(str(each))
        self.writeCount = self.BucketCount

    def verifyWritten(self):
        return self.writeCount == self.BucketCount

class TestMultipleThreadsWriting(unittest.TestCase):
    # FullBucket is the thread count
    FullBucket = 50
    BucketCount = 9 # buckets must be a single digit number (ie. less than 10)
    def setUp(self):
        WriterThread.BucketCount = self.BucketCount        
        win32trace.InitRead()
        win32trace.read() # clear any old data.
        win32trace.InitWrite()
        CheckNoOtherReaders()
        self.threads = [WriterThread() for each in range(self.FullBucket)]
        self.buckets = range(self.BucketCount)
        for each in self.buckets:
            self.buckets[each] = 0

    def tearDown(self):
        win32trace.TermRead()
        win32trace.TermWrite()

    def areBucketsFull(self):
        bucketsAreFull = True
        for each in self.buckets:
            assert each <= self.FullBucket, each
            if each != self.FullBucket:
                bucketsAreFull = False
                break
        return bucketsAreFull
        

    def read(self):
        while 1:
            readString = win32trace.blockingread()
            for ch in readString:
                integer = int(ch)
                count = self.buckets[integer]
                assert count != -1
                self.buckets[integer] = count + 1
                if self.buckets[integer] == self.FullBucket:
                    if self.areBucketsFull():
                        return
                        
    def testThreads(self):
        for each in self.threads:
            each.start()
        self.read()
        for each in self.threads:
            each.join()
        for each in self.threads:
            assert each.verifyWritten()
        assert self.areBucketsFull()

class TestHugeChunks(unittest.TestCase):
    # BiggestChunk is the size where we stop stressing the writer
    BiggestChunk = 2**16 # 256k should do it.
    def setUp(self):
        win32trace.InitRead()
        win32trace.read() # clear any old data
        win32trace.InitWrite()
    def testHugeChunks(self):
        data = "*" * 1023 + "\n"
        while len(data) <= self.BiggestChunk:
            win32trace.write(data)
            data = data + data
        # If we made it here, we passed.

    def tearDown(self):
        win32trace.TermRead()
        win32trace.TermWrite()

import win32event
import win32process

class TraceWriteProcess:
    def __init__(self, threadCount):
        self.exitCode = -1
        self.threadCount = threadCount
        
    def start(self):
        procHandle, threadHandle, procId, threadId  = win32process.CreateProcess(
            None, # appName
            'python.exe "%s" /run_test_process %s %s' % (this_file,
                                                         self.BucketCount,
                                                         self.threadCount),
            None, # process security
            None, # thread security
            0, # inherit handles
            win32process.NORMAL_PRIORITY_CLASS,
            None, # new environment
            None, # Current directory
            win32process.STARTUPINFO(), # startup info
            )
        self.processHandle = procHandle
        
    def join(self):
        win32event.WaitForSingleObject(self.processHandle,
                                       win32event.INFINITE)
        self.exitCode = win32process.GetExitCodeProcess(self.processHandle)

    def verifyWritten(self):
        return self.exitCode == 0


class TestOutofProcess(unittest.TestCase):
    BucketCount = 9
    FullBucket = 50
    def setUp(self):
        win32trace.InitRead()
        TraceWriteProcess.BucketCount = self.BucketCount
        self.setUpWriters()
        self.buckets = range(self.BucketCount)
        for each in self.buckets:
            self.buckets[each] = 0

    def tearDown(self):
        win32trace.TermRead()


    def setUpWriters(self):
        self.processes = []
        # 5 processes, quot threads in each process
        quot, remainder = divmod(self.FullBucket, 5)
        for each in range(5):
            self.processes.append(TraceWriteProcess(quot))
        if remainder:
            self.processes.append(TraceWriteProcess(remainder))
            
    def areBucketsFull(self):
        bucketsAreFull = True
        for each in self.buckets:
            assert each <= self.FullBucket, each
            if each != self.FullBucket:
                bucketsAreFull = False
                break
        return bucketsAreFull
        
    def read(self):
        while 1:
            readString = win32trace.blockingread()
            for ch in readString:
                integer = int(ch)
                count = self.buckets[integer]
                assert count != -1
                self.buckets[integer] = count + 1
                if self.buckets[integer] == self.FullBucket:
                    if self.areBucketsFull():
                        return
                        
    def testProcesses(self):
        for each in self.processes:
            each.start()
        self.read()
        for each in self.processes:
            each.join()
        for each in self.processes:
            assert each.verifyWritten()
        assert self.areBucketsFull()    

def _RunAsTestProcess():
    # Run as an external process by the main tests.
    WriterThread.BucketCount = int(sys.argv[2])
    threadCount = int(sys.argv[3])
    threads = [WriterThread() for each in range(threadCount)]
    win32trace.InitWrite()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    for t in threads:
        if not t.verifyWritten():
            sys.exit(-1)
    
if __name__ == '__main__':
    if sys.argv[1:2]==["/run_test_process"]:
        _RunAsTestProcess()
        sys.exit(0)
    # If some other win32traceutil reader is running, these tests fail
    # badly (as the other reader sometimes sees the output!)
    win32trace.InitRead()
    win32trace.InitWrite()
    CheckNoOtherReaders()
    # reset state so test env is back to normal
    win32trace.TermRead()
    win32trace.TermWrite()
    unittest.main()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.