AppServerTest.py :  » Web-Frameworks » Webware » Webware-1.0.2 » WebKit » Tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Webware 
Webware » Webware 1.0.2 » WebKit » Tests » AppServerTest.py
import unittest
import os
import time
from re import compile
from threading import Thread
from Queue import Queue,Empty
from urllib import urlopen

try: # for Python < 2.3
  True, False
except NameError:
  True, False = 1, 0


class AppServerTest(unittest.TestCase):

  def setUp(self):
    """Setup fixture and test starting."""
    workdir = self.workDir()
    dirname = os.path.dirname
    webwaredir = dirname(dirname(dirname(workdir)))
    launch = os.path.join(workdir, 'Launch.py')
    cmd = "python %s --webware-dir=%s --work-dir=%s" \
      " ThreadedAppServer" % (launch, webwaredir, workdir)
    self._output = os.popen(cmd)
    # Setting the appserver output to non-blocking mode
    # could be done as follows on Unix systems:
    # fcntl.fcntl(self._output.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
    # But, since this does not work on Windows systems,
    # we will pull the output in a separate thread:
    def pullStream(stream, queue):
      while stream:
        line = stream.readline()
        if not line:
          break
        queue.put(line)
    self._queue = Queue()
    self._thread = Thread(target=pullStream,
      args=(self._output, self._queue))
    self._thread.start()
    self.assertAppServerSays('^WebKit AppServer ')
    self.assertAppServerSays(' Webware for Python.$')
    self.assertAppServerSays(' by Chuck Esterbrook.')
    self.assertAppServerSays('^WebKit and Webware are open source.$')
    self.assertAppServerSays('^EnableHTTP\s*=\s*(True|1)$')
    self.assertAppServerSays('^HTTPPort\s*=\s*8080$')
    self.assertAppServerSays('^Host\s*=\s*(localhost|127.0.0.1)$')
    self.assertAppServerSays('^Ready (.*).$')
    # We will also test the built-in HTTP server with this:
    try:
      data = urlopen('http://localhost:8080').read()
    except IOError:
      data = '<h2>Could not read page.</h2>'
    assert data.find('<h1>Welcome to Webware!</h1>') > 0
    assert data.find('<h2>Test passed.</h2>') > 0

  def assertAppServerSays(self, pattern, wait=5):
    """Check that the appserver output contains the specified pattern.

    If the appserver does not output the pattern within the given number
    of seconds, an assertion is raised.

    """
    if not self.waitForAppServer(pattern, wait):
      assert False, "Expected appserver to say '%s',\n" \
        "but after waiting %d seconds it said:\n%s" \
        % (pattern, wait, self._actualAppServerOutput)

  def waitForAppServer(self, pattern, wait=5):
    """Check that the appserver output contains the specified pattern.

    Returns True or False depending on whether the pattern was seen.

    """
    start = time.time()
    comp = reCompile(pattern)
    lines = []
    found = False
    while 1:
      try:
        line = self._queue.get_nowait()
      except Empty:
        line = None
      if line is None:
        now = time.time()
        if now - start > wait:
          break
        time.sleep(0.2)
      else:
        if len(lines) > 9:
          del lines[0]
        lines.append(line)
        if comp.search(line):
          found = True
          break
    self._actualAppServerOutput = ''.join(lines)
    return found

  def tearDown(self):
    """Teardown fixture and test stopping."""
    try:
      data = urlopen('http://localhost:8080/stop').read()
    except IOError:
      data = '<h2>Could not read page.</h2>'
    assert data.find('<h2>The AppServer has been stopped.</h2>') > 0
    self.assertAppServerSays('^AppServer has been shutdown.$')
    self._output = None
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.