LoadTest.py :  » Test » pyUnitPerf » pyunitperf » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Test » pyUnitPerf 
pyUnitPerf » pyunitperf » LoadTest.py
"""
 * The <code>LoadTest</code> is a test decorator that runs
 * a test with a simulated number of concurrent users and
 * iterations.
 * <p>
 * In its simplest form, a <code>LoadTest</code> is constructed 
 * with a test to decorate and the number of concurrent users.
 * </p>
 * <p>
 * For example, to create a load test of 10 concurrent users
 * with each user running <code>ExampleTest</code> once and 
 * all users started simultaneously, use:
 * <blockquote>
 * <pre>
 * Test loadTest = new LoadTest(new TestSuite(ExampleTest.class), 10);
 * </pre>
 * </blockquote>
 * or, to load test a single test method, use: 
 * <blockquote>
 * <pre>
 * Test loadTest = new LoadTest(new ExampleTest("testSomething"), 10);
 * </pre>
 * </blockquote>
 * </p>
 * <p>
 * The load can be ramped by specifying a pluggable 
 * <code>Timer</code> instance which prescribes the delay
 * between the addition of each concurrent user.  A
 * <code>ConstantTimer</code> has a constant delay, with 
 * a zero value indicating that all users will be started 
 * simultaneously. A <code>RandomTimer</code> has a random 
 * delay with a uniformly distributed variation.
 * </p>
 * <p>
 * For example, to create a load test of 10 concurrent users
 * with each user running <code>ExampleTest.testSomething()</code> once and 
 * with a one second delay between the addition of users, use:
 * <blockquote>
 * <pre>
 * Timer timer = new ConstantTimer(1000);
 * Test loadTest = new LoadTest(new ExampleTest("testSomething"), 10, timer);
 * </pre>
 * </blockquote>
 * </p>
 * <p>
 * In order to simulate each concurrent user running a test for a 
 * specified number of iterations, a <code>LoadTest</code> can be 
 * constructed to decorate a <code>RepeatedTest</code>. 
 * Alternatively, a <code>LoadTest</code> convenience constructor 
 * specifying the number of iterations is provided which creates a 
 * <code>RepeatedTest</code>. 
 * </p>
 * <p>
 * For example, to create a load test of 10 concurrent users
 * with each user running <code>ExampleTest.testSomething()</code> for 20 iterations, 
 * and with a one second delay between the addition of users, use:
 * <blockquote>
 * <pre>
 * Timer timer = new ConstantTimer(1000);
 * Test repeatedTest = new RepeatedTest(new ExampleTest("testSomething"), 20);
 * Test loadTest = new LoadTest(repeatedTest, 10, timer);
 * </pre>
 * </blockquote>
 * or, alternatively, use: 
 * <blockquote>
 * <pre>
 * Timer timer = new ConstantTimer(1000);
 * Test loadTest = new LoadTest(new ExampleTest("testSomething"), 10, 20, timer);
 * </pre>
 * </blockquote> 
 * A <code>LoadTest</code> can be decorated as a <code>TimedTest</code>
 * to test the elapsed time of the load test.  For example, to decorate 
 * the load test constructed above as a timed test with a maximum elapsed 
 * time of 2 seconds, use:
 * <blockquote>
 * <pre>
 * Test timedTest = new TimedTest(loadTest, 2000);
 * </pre>
 * </blockquote>
 * </p>
 * <p>
 * By default, a <code>LoadTest</code> does not enforce test 
 * atomicity (as defined in transaction processing) if its decorated 
 * test spawns threads, either directly or indirectly.  In other words, 
 * if a decorated test spawns a thread and then returns control without 
 * waiting for its spawned thread to complete, then the test is assumed 
 * to be transactionally complete.  
 * </p>
 * <p>
 * If threads are integral to the successful completion of 
 * a decorated test, meaning that the decorated test should not be 
 * treated as complete until all of its threads complete, then  
 * <code>setEnforceTestAtomicity(true)</code> should be invoked to 
 * enforce test atomicity.  This effectively causes the load test to 
 * wait for the completion of all threads belonging to the same 
 * <code>ThreadGroup</code> as the thread running the decorated test.
 * </p>
 * @author <b>Mike Clark</b>
 * @author Clarkware Consulting, Inc.
 * @author Ervin Varga
 **************************************
 * Ported to Python by Grig Gheorghiu *
 **************************************
"""

import time
from threading import Thread
from unittest import TestResult,TestCase
from Test import Test
from RepeatedTest import RepeatedTest
from ConstantTimer import ConstantTimer
from CustomExceptions import IllegalArgumentException
from ThreadBarrier import ThreadBarrier
from ThreadedTestGroup import ThreadedTestGroup
from ThreadedTest import ThreadedTest

try:
  bool = True
except:
  True = 1
  False = 0
  
class LoadTest(Test):

  def __init__(self, test, users, iterations=0, timer=None):
    """
     * Constructs a <code>LoadTest</code> to decorate 
     * the specified test using the specified number 
     * of concurrent users starting simultaneously and
     * the number of iterations per user. If a Timer is
     * indicated, then a delay is introduced
     *
     * @param test Test to decorate.
     * @param users Number of concurrent users.
     * @param iterations Number of iterations per user.
     * @param timer Delay timer.
    """
    if iterations:
      test = RepeatedTest(test, iterations)
    if timer is None:
      timer = ConstantTimer(0)
      
    if users < 1:
      raise IllegalArgumentException("Number of users must be > 0")
    if timer is None:
      raise IllegalArgumentException("Delay timer is null")
    if test is None:
      raise IllegalArgumentException("Decorated test is null")

    self.users = users
    self.timer = timer
    self.setEnforceTestAtomicity(False)
    self.barrier = ThreadBarrier(users)
    self.group = ThreadedTestGroup(self, "LoadTest:ThreadedTestGroup")
    self.test = ThreadedTest(test, self.group, self.barrier)
  
  def setEnforceTestAtomicity(self, isAtomic):
    """
     * Indicates whether test atomicity should be enforced.
     * <p>
      * If threads are integral to the successful completion of 
      * a decorated test, meaning that the decorated test should not be 
      * treated as complete until all of its threads complete, then  
      * <code>setEnforceTestAtomicity(true)</code> should be invoked to 
      * enforce test atomicity.  This effectively causes the load test to 
      * wait for the completion of all threads belonging to the same 
      * <code>ThreadGroup</code> as the thread running the decorated test.
      * 
     * @param isAtomic <code>true</code> to enforce test atomicity;
     *                 <code>false</code> otherwise.
     """
    self.enforceTestAtomicity = isAtomic
  
  def countTestCases(self):
    """
     * Returns the number of tests in this load test.
     *
     * @return Number of tests.
    """
    return self.users * self.test.countTestCases()

  def run(self, result):
    """
     * Runs the test.
     *
     * @param result Test result.
    """
    self.group.setTestResult(result)
    for i in range(self.users):
      #if result.shouldStop():
      #  self.barrier.cancelThreads(self.users - i)
      #  break
      self.test.run(result)
      self.sleep(self.getDelay())
    
    self.waitForTestCompletion()
    self.cleanup()

  def __call__(self, result):
    self.run(result)
  
  def waitForTestCompletion(self):
    """
    // TODO: May require a strategy pattern
    //       if other algorithms emerge.
    """
    if self.enforceTestAtomicity:
      self.waitForAllThreadsToComplete()
    else:
      self.waitForThreadedTestThreadsToComplete()

  def waitForThreadedTestThreadsToComplete(self):
    while not self.barrier.isReached():
      self.sleep(50)
  
  def waitForAllThreadsToComplete(self):
    while self.group.activeCount() > 0:
      self.sleep(50)
  
  def sleep(self, ms):
    try:
      time.sleep(ms*0.001)
    except:
      pass
  
  def cleanup(self):
    try:
      self.group.destroy()
    except:
      pass
  
  def __str__(self):
    if self.enforceTestAtomicity:
      return "LoadTest (ATOMIC): " + str(self.test)
    else:
      return "LoadTest (NON-ATOMIC): " + str(self.test)

  def getDelay(self):
    return self.timer.getDelay()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.