import gc
import socket
import threading
import time
import unittest
import sys
sys.path.insert(1, "..")
import SOAPpy
#SOAPpy.Config.debug=1
# global to shut down server
quit = 0
def echoDateTime(dt):
return dt
def echo(s):
"""repeats a string twice"""
return s + s
def kill():
"""tell the server to quit"""
global quit
quit = 1
def server1():
"""start a SOAP server on localhost:8000"""
print "Starting SOAP Server...",
server = SOAPpy.Server.SOAPServer(addr=('127.0.0.1', 8000))
server.registerFunction(echoDateTime)
server.registerFunction(echo)
server.registerFunction(kill)
print "Done."
global quit
while not quit:
server.handle_request()
quit = 0
print "Server shut down."
class ClientTestCase(unittest.TestCase):
server = None
startup_timeout = 5 # seconds
def setUp(self):
'''This is run once before each unit test.'''
serverthread = threading.Thread(target=server1, name="SOAPServer")
serverthread.start()
start = time.time()
connected = False
server = None
while not connected and time.time() - start < self.startup_timeout:
print "Trying to connect to the SOAP server...",
try:
server = SOAPpy.Client.SOAPProxy('127.0.0.1:8000')
server.echo('Hello World')
except socket.error, e:
print "Failure:", e
time.sleep(0.5)
else:
connected = True
self.server = server
print "Success."
if not connected: raise 'Server failed to start.'
def tearDown(self):
'''This is run once after each unit test.'''
print "Trying to shut down SOAP server..."
if self.server is not None:
self.server.kill()
time.sleep(5)
return 1
def testEcho(self):
'''Test echo function.'''
server = SOAPpy.Client.SOAPProxy('127.0.0.1:8000')
s = 'Hello World'
self.assertEquals(server.echo(s), s+s)
def testNamedEcho(self):
'''Test echo function.'''
server = SOAPpy.Client.SOAPProxy('127.0.0.1:8000')
s = 'Hello World'
self.assertEquals(server.echo(s=s), s+s)
def testEchoDateTime(self):
'''Test passing DateTime objects.'''
server = SOAPpy.Client.SOAPProxy('127.0.0.1:8000')
dt = SOAPpy.Types.dateTimeType(data=time.time())
dt_return = server.echoDateTime(dt)
self.assertEquals(dt_return, dt)
# def testNoLeak(self):
# '''Test for memory leak.'''
# gc.set_debug(gc.DEBUG_SAVEALL)
# for i in range(400):
# server = SOAPpy.Client.SOAPProxy('127.0.0.1:8000')
# s = 'Hello World'
# server.echo(s)
# gc.collect()
# self.assertEquals(len(gc.garbage), 0)
if __name__ == '__main__':
unittest.main()
|