import random, threading, time
import Queue as queue
# producer/consumer queue, with (potentially) multiple consumers
class PCQueue:
    """Producer/consumer queue drained by a pool of consumer threads.

    Subclasses must implement ``consume(obj)``; each consumer thread calls it
    for every object it obtains from ``next()``.  The pool starts with
    ``minthreads`` consumers and, when ``maxthreads > minthreads``, grows on
    demand from ``put()`` and shrinks again while consumers sit idle.

    Attributes that are not found on the PCQueue itself are looked up on the
    underlying queue object (``self.q``), so e.g. ``qsize()`` and ``empty()``
    are available directly.
    """

    def __init__(self, minthreads, maxthreads=0, qsize=0, name=None, q=None):
        """Create the queue and start ``minthreads`` consumer threads.

        minthreads -- number of consumers started immediately and kept alive
                      until ``stop()`` is called.
        maxthreads -- upper bound on consumers; 0 means "same as minthreads".
        qsize      -- ``maxsize`` for the internally created queue
                      (0 = unbounded).  Ignored when ``q`` is supplied.
        name       -- prefix for consumer thread names; defaults to the
                      class name.
        q          -- an existing queue object to use instead of creating
                      one.  When ``maxthreads > minthreads`` it must expose
                      ``mutex`` and ``_qsize()`` like ``queue.Queue`` does.

        Raises ValueError if ``minthreads`` exceeds ``maxthreads``.
        """
        if not name:
            name = self.__class__.__name__
        if maxthreads == 0:
            maxthreads = minthreads
        if minthreads > maxthreads:
            raise ValueError('minthreads may not be greater than max')
        # Compare against None: a caller-supplied queue may be falsy while
        # it is empty (anything defining __len__) and must not be replaced.
        if q is None:
            q = queue.Queue(qsize)
        self.q = q
        self.running = True
        self.name = name
        self.minthreads = minthreads
        self.maxthreads = maxthreads
        self.threadcount = 0
        # Choose the put() implementation before any consumer starts, so
        # the instance is fully set up by the time threads can observe it.
        if maxthreads > minthreads:
            self.put = self._put
        else:
            # Fixed-size pool: no growth bookkeeping needed on put().
            self.put = lambda obj: self.q.put(obj)
        for _ in range(minthreads):
            self.add_consumer()

    def run_consumer(self):
        """Body of a consumer thread: fetch objects and consume them.

        Loops until ``stop()`` clears ``self.running`` or, for a surplus
        consumer (pool larger than ``minthreads``), until it randomly
        retires after finding the queue empty.  ``threadcount`` is always
        decremented on the way out, including when ``consume()`` raises,
        so ``join()`` cannot wait forever on a consumer that has died.
        """
        try:
            while self.running:
                try:
                    obj = self.next()
                except queue.Empty:
                    # NOTE(review): this check is made without holding the
                    # queue mutex, so several surplus consumers may decide
                    # to retire during the same idle period.
                    if self.threadcount > self.minthreads:
                        # 1-in-30 chance per idle wake-up: slows down
                        # thread destruction with minimal overhead.
                        if not random.randrange(30):
                            break
                    continue
                self.consume(obj)
        finally:
            with self.q.mutex:
                self.threadcount -= 1

    def next(self):
        """
        Return an object to call consume on. Should raise queue.Empty
        if nothing is available to process. Should not block indefinitely.
        """
        return self.q.get(timeout=1)

    def _put(self, obj):
        """Enqueue ``obj`` and grow the consumer pool when needed.

        Only installed as ``put`` when ``maxthreads > minthreads``.  A new
        consumer is started when the pool is below ``maxthreads`` and either
        a backlog exists (more than one queued item) or no consumer is
        alive at all (possible with ``minthreads == 0``), in which case even
        a single item would otherwise never be processed.
        """
        self.q.put(obj)
        # _qsize() is the unlocked size accessor; it must be called with
        # the queue mutex held, which also guards threadcount here.
        with self.q.mutex:
            if self.threadcount < self.maxthreads and (
                    self.threadcount == 0 or self.q._qsize() > 1):
                self.add_consumer()

    def consume(self, obj):
        """Process one object taken from the queue; subclasses override."""
        raise NotImplementedError()

    def add_consumer(self):
        """Start one more consumer thread and account for it.

        The counter is bumped before the thread starts so the thread name
        carries its ordinal; it is rolled back if the thread cannot be
        started, keeping ``join()`` from waiting on a thread that never ran.
        """
        self.threadcount += 1
        th = threading.Thread(target=self.run_consumer,
                              name='%s consumer %d'
                              % (self.name, self.threadcount))
        try:
            th.start()
        except Exception:
            self.threadcount -= 1
            raise

    def stop(self):
        """Ask every consumer to exit after its current iteration."""
        self.running = False

    def join(self):
        """Wait for the queue to drain, stop the consumers, wait for exit.

        Polls every 0.1 seconds; blocks until the queue reports size 0 and
        then until ``threadcount`` has dropped to 0.
        """
        while self.q.qsize():
            time.sleep(0.1)
        self.stop()
        while self.threadcount:
            time.sleep(0.1)

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__, self.threadcount,
                               self.q)

    def __getattr__(self, name):
        """Forward attributes missing on this object to the wrapped queue."""
        # __getattr__ runs only after normal lookup fails.  If 'q' itself is
        # missing (instance not initialised yet), reading self.q below would
        # re-enter this method without end, so fail cleanly instead.
        if name == 'q':
            raise AttributeError(name)
        return getattr(self.q, name)
|