import threading
PRINT_TRACEBACKS = 0
class ThreadedTaskHandler:
'''Rather than creating a new thread for each task, reuses existing
threads for speed.
'''
def __init__(self, target_threads=1, limit_threads=0):
self.queue = []
self.cond = threading.Condition()
self.running_threads = 0
self.idle_threads = 0
self.target_threads = target_threads
self.limit_threads = limit_threads
def addTask(self, task, args=(), kw=None):
'''
task is a callable object which will be executed in another
thread.
'''
if 0: # Set to 1 to get equivalent but slower processing.
if kw is None: kw = {}
t = threading.Thread(target=task, args=args, kwargs=kw)
t.setDaemon(1)
t.start()
return
cond = self.cond
cond.acquire()
try:
self.queue.append((task, args, kw))
if self.idle_threads < 1:
if self.limit_threads < 1 or (self.running_threads
< self.limit_threads):
t = threading.Thread(target=self.clientThread)
t.setDaemon(1)
self.running_threads = self.running_threads + 1
t.start()
else:
self.idle_threads = self.idle_threads - 1
cond.notify()
finally:
cond.release()
def clientThread(self):
'''
Performs tasks.
'''
exit_loop = 0
while not exit_loop:
task = args = kw = None
cond = self.cond
cond.acquire()
try:
queue = self.queue
if len(queue) < 1:
if self.running_threads > self.target_threads:
exit_loop = 1
self.running_threads = self.running_threads - 1
else:
self.idle_threads = self.idle_threads + 1
cond.wait()
if len(queue) > 0:
task, args, kw = queue[0]
del queue[0]
finally:
cond.release()
if task is not None:
#print 'performing task: %s(%s, %s)'%(task, args, kw)
try:
if kw is not None:
task(*args, **kw)
else:
task(*args)
except SystemExit:
exit_loop = 1
self.running_threads = self.running_threads - 1
except:
if PRINT_TRACEBACKS:
# The task ought to do its own error handling,
# but sometimes it doesn't.
import traceback
traceback.print_exc()
if __name__ == '__main__':
from time import time,sleep
evt = threading.Event()
tth = ThreadedTaskHandler()
start = 0
end = 0
class SpeedTest:
def __init__(self, count):
self.count = count
def __call__(self):
global end
self.count = self.count - 1
if self.count <= 0:
end = time()
evt.set()
else:
tth.addTask(self)
count = 1000
t = SpeedTest(count)
start = time()
tth.addTask(t)
evt.wait()
print 'Performed %d tasks in %d ms' % (count,
int((end - start) * 1000))
|