001: /*
002: File: PooledExecutor.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
Date       Who  What
19Jun1998  dl   Create public version
29aug1998  dl   rely on ThreadFactoryUser,
                remove ThreadGroup-based methods,
                adjusted locking policies
 3mar1999  dl   Worker threads sense decreases in pool size
31mar1999  dl   Allow supplied channel in constructor;
                add methods createThreads, drain
15may1999  dl   Allow infinite keepalives
21oct1999  dl   add minimumPoolSize methods
 7sep2000  dl   BlockedExecutionHandler now an interface,
                new DiscardOldestWhenBlocked policy
12oct2000  dl   add shutdownAfterProcessingCurrentlyQueuedTasks
13nov2000  dl   null out task ref after run
08apr2001  dl   declare inner class ctor protected
12nov2001  dl   Better shutdown support;
                blocked exec handlers can throw IE;
                simplify locking scheme
25jan2001  dl   {get,set}BlockedExecutionHandler now public
17may2002  dl   null out task var in worker run to enable GC.
030: */
031:
032: package EDU.oswego.cs.dl.util.concurrent;
033:
034: import java.util.*;
035:
036: /**
037: * A tunable, extensible thread pool class. The main supported public
038: * method is <code>execute(Runnable command)</code>, which can be
039: * called instead of directly creating threads to execute commands.
040: *
041: * <p>
042: * Thread pools can be useful for several, usually intertwined
043: * reasons:
044: *
045: * <ul>
046: *
047: * <li> To bound resource use. A limit can be placed on the maximum
048: * number of simultaneously executing threads.
049: *
050: * <li> To manage concurrency levels. A targeted number of threads
051: * can be allowed to execute simultaneously.
052: *
053: * <li> To manage a set of threads performing related tasks.
054: *
055: * <li> To minimize overhead, by reusing previously constructed
056: * Thread objects rather than creating new ones. (Note however
057: * that pools are hardly ever cure-alls for performance problems
058: * associated with thread construction, especially on JVMs that
059: * themselves internally pool or recycle threads.)
060: *
061: * </ul>
062: *
063: * These goals introduce a number of policy parameters that are
064: * encapsulated in this class. All of these parameters have defaults
065: * and are tunable, either via get/set methods, or, in cases where
066: * decisions should hold across lifetimes, via methods that can be
067: * easily overridden in subclasses. The main, most commonly set
068: * parameters can be established in constructors. Policy choices
069: * across these dimensions can and do interact. Be careful, and
070: * please read this documentation completely before using! See also
071: * the usage examples below.
072: *
073: * <dl>
074: * <dt> Queueing
075: *
* <dd> By default, this pool uses queueless synchronous channels to
* hand off work to threads. This is a safe, conservative policy
078: * that avoids lockups when handling sets of requests that might
079: * have internal dependencies. (In these cases, queuing one task
080: * could lock up another that would be able to continue if the
081: * queued task were to run.) If you are sure that this cannot
082: * happen, then you can instead supply a queue of some sort (for
083: * example, a BoundedBuffer or LinkedQueue) in the constructor.
084: * This will cause new commands to be queued in cases where all
085: * MaximumPoolSize threads are busy. Queues are sometimes
* appropriate when each task is completely independent of others,
* so tasks cannot affect each other's execution; for example, in an
* HTTP server. <p>
089: *
090: * When given a choice, this pool always prefers adding a new thread
* rather than queueing if fewer than getMinimumPoolSize threads are
* currently running, but otherwise always
093: * prefers queuing a request rather than adding a new thread. Thus,
094: * if you use an unbounded buffer, you will never have more than
095: * getMinimumPoolSize threads running. (Since the default
096: * minimumPoolSize is one, you will probably want to explicitly
097: * setMinimumPoolSize.) <p>
098: *
099: * While queuing can be useful in smoothing out transient bursts of
100: * requests, especially in socket-based services, it is not very
101: * well behaved when commands continue to arrive on average faster
102: * than they can be processed. Using bounds for both the queue and
* the pool size, along with the run-when-blocked policy, is often a
104: * reasonable response to such possibilities. <p>
105: *
106: * Queue sizes and maximum pool sizes can often be traded off for
107: * each other. Using large queues and small pools minimizes CPU
108: * usage, OS resources, and context-switching overhead, but can lead
* to artificially low throughput. Especially if tasks frequently
110: * block (for example if they are I/O bound), a JVM and underlying
111: * OS may be able to schedule time for more threads than you
112: * otherwise allow. Use of small queues or queueless handoffs
113: * generally requires larger pool sizes, which keeps CPUs busier but
114: * may encounter unacceptable scheduling overhead, which also
115: * decreases throughput. <p>
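*
* As a concrete sketch of such a bounded setup (the sizes here are
* illustrative assumptions only, not recommendations), one might
* combine a bounded queue, a bounded pool, and the default
* run-when-blocked policy:
* <pre>
*   PooledExecutor pool = new PooledExecutor(new BoundedBuffer(20), 10);
*   pool.setMinimumPoolSize(2);
*   pool.runWhenBlocked(); // overflow tasks run in the caller's thread
* </pre>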
116: *
117: * <dt> Maximum Pool size
118: *
119: * <dd> The maximum number of threads to use, when needed. The pool
120: * does not by default preallocate threads. Instead, a thread is
121: * created, if necessary and if there are fewer than the maximum,
122: * only when an <code>execute</code> request arrives. The default
123: * value is (for all practical purposes) infinite --
* <code>Integer.MAX_VALUE</code>, so it should be set in the
125: * constructor or the set method unless you are just using the pool
126: * to minimize construction overhead. Because task handoffs to idle
127: * worker threads require synchronization that in turn relies on JVM
128: * scheduling policies to ensure progress, it is possible that a new
129: * thread will be created even though an existing worker thread has
130: * just become idle but has not progressed to the point at which it
131: * can accept a new task. This phenomenon tends to occur on some
132: * JVMs when bursts of short tasks are executed. <p>
133: *
134: * <dt> Minimum Pool size
135: *
136: * <dd> The minimum number of threads to use, when needed (default
137: * 1). When a new request is received, and fewer than the minimum
138: * number of threads are running, a new thread is always created to
139: * handle the request even if other worker threads are idly waiting
140: * for work. Otherwise, a new thread is created only if there are
141: * fewer than the maximum and the request cannot immediately be
142: * queued. <p>
143: *
144: * <dt> Preallocation
145: *
146: * <dd> You can override lazy thread construction policies via
147: * method createThreads, which establishes a given number of warm
148: * threads. Be aware that these preallocated threads will time out
149: * and die (and later be replaced with others if needed) if not used
150: * within the keep-alive time window. If you use preallocation, you
151: * probably want to increase the keepalive time. The difference
152: * between setMinimumPoolSize and createThreads is that
153: * createThreads immediately establishes threads, while setting the
154: * minimum pool size waits until requests arrive. <p>
155: *
156: * <dt> Keep-alive time
157: *
158: * <dd> If the pool maintained references to a fixed set of threads
159: * in the pool, then it would impede garbage collection of otherwise
160: * idle threads. This would defeat the resource-management aspects
161: * of pools. One solution would be to use weak references. However,
162: * this would impose costly and difficult synchronization issues.
163: * Instead, threads are simply allowed to terminate and thus be
164: * GCable if they have been idle for the given keep-alive time. The
165: * value of this parameter represents a trade-off between GCability
166: * and construction time. In most current Java VMs, thread
167: * construction and cleanup overhead is on the order of
168: * milliseconds. The default keep-alive value is one minute, which
169: * means that the time needed to construct and then GC a thread is
170: * expended at most once per minute.
171: * <p>
172: *
173: * To establish worker threads permanently, use a <em>negative</em>
174: * argument to setKeepAliveTime. <p>
175: *
176: * <dt> Blocked execution policy
177: *
178: * <dd> If the maximum pool size or queue size is bounded, then it
179: * is possible for incoming <code>execute</code> requests to
* block. There are five supported policies for handling this
181: * problem, and mechanics (based on the Strategy Object pattern) to
182: * allow others in subclasses: <p>
183: *
184: * <dl>
185: * <dt> Run (the default)
186: * <dd> The thread making the <code>execute</code> request
187: * runs the task itself. This policy helps guard against lockup.
188: * <dt> Wait
189: * <dd> Wait until a thread becomes available.
190: * <dt> Abort
* <dd> Throw a RuntimeException.
192: * <dt> Discard
193: * <dd> Throw away the current request and return.
194: * <dt> DiscardOldest
195: * <dd> Throw away the oldest request and return.
196: * </dl>
197: *
198: * Other plausible policies include raising the maximum pool size
199: * after checking with some other objects that this is OK. <p>
200: *
* (These cases can never occur if the maximum pool size is unbounded
* or the queue is unbounded. In these cases you instead face
* potential resource exhaustion.) The execute method does not
204: * throw any checked exceptions in any of these cases since any
205: * errors associated with them must normally be dealt with via
206: * handlers or callbacks. (Although in some cases, these might be
207: * associated with throwing unchecked exceptions.) You may wish to
208: * add special implementations even if you choose one of the listed
209: * policies. For example, the supplied Discard policy does not
210: * inform the caller of the drop. You could add your own version
211: * that does so. Since choice of policies is normally a system-wide
212: * decision, selecting a policy affects all calls to
213: * <code>execute</code>. If for some reason you would instead like
214: * to make per-call decisions, you could add variant versions of the
215: * <code>execute</code> method (for example,
216: * <code>executeIfWouldNotBlock</code>) in subclasses. <p>
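*
* As mentioned above, the supplied Discard policy does not inform
* the caller of the drop. A sketch of a discarding handler that also
* reports the drop (the class name is hypothetical, not part of this
* package) might look like:
* <pre>
*   class ReportingDiscardHandler
*       implements PooledExecutor.BlockedExecutionHandler {
*     public boolean blockedAction(Runnable command) {
*       System.err.println("Discarding task: " + command);
*       return true; // handled; execute() returns without running it
*     }
*   }
*   pool.setBlockedExecutionHandler(new ReportingDiscardHandler());
* </pre>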
217: *
218: * <dt> Thread construction parameters
219: *
220: * <dd> A settable ThreadFactory establishes each new thread. By
221: * default, it merely generates a new instance of class Thread, but
222: * can be changed to use a Thread subclass, to set priorities,
223: * ThreadLocals, etc. <p>
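*
* For example, a sketch of a factory that lowers priority and marks
* pool threads as daemons (using the setThreadFactory method
* inherited from ThreadFactoryUser) might be:
* <pre>
*   pool.setThreadFactory(new ThreadFactory() {
*     public Thread newThread(Runnable command) {
*       Thread t = new Thread(command);
*       t.setPriority(Thread.MIN_PRIORITY); // run pool work at low priority
*       t.setDaemon(true);                  // do not keep the JVM alive
*       return t;
*     }
*   });
* </pre>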
224: *
225: * <dt> Interruption policy
226: *
227: * <dd> Worker threads check for interruption after processing each
228: * command, and terminate upon interruption. Fresh threads will
229: * replace them if needed. Thus, new tasks will not start out in an
230: * interrupted state due to an uncleared interruption in a previous
231: * task. Also, unprocessed commands are never dropped upon
232: * interruption. It would conceptually suffice simply to clear
233: * interruption between tasks, but implementation characteristics of
234: * interruption-based methods are uncertain enough to warrant this
235: * conservative strategy. It is a good idea to be equally
236: * conservative in your code for the tasks running within pools.
237: * <p>
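*
* For example, a long-running task that should respond promptly to
* interruptAll or shutdownNow might poll its own interrupt status
* (a sketch only; the unit of work is left abstract):
* <pre>
*   Runnable task = new Runnable() {
*     public void run() {
*       while (!Thread.currentThread().isInterrupted()) {
*         // ... perform one bounded unit of work ...
*       }
*     }
*   };
* </pre>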
238: *
239: * <dt> Shutdown policy
240: *
241: * <dd> The interruptAll method interrupts, but does not disable the
242: * pool. Two different shutdown methods are supported for use when
243: * you do want to (permanently) stop processing tasks. Method
244: * shutdownAfterProcessingCurrentlyQueuedTasks waits until all
* current tasks are finished. The shutdownNow method interrupts
246: * current threads and leaves other queued requests unprocessed.
247: * <p>
248: *
249: * <dt> Handling requests after shutdown
250: *
* <dd> When the pool is shut down, new incoming requests are handled
* by the blockedExecutionHandler. By default, the handler is set to
* discard new requests, but a different one can be supplied as an optional
254: * argument to method
255: * shutdownAfterProcessingCurrentlyQueuedTasks. <p> Also, if you are
256: * using some form of queuing, you may wish to call method drain()
257: * to remove (and return) unprocessed commands from the queue after
258: * shutting down the pool and its clients. If you need to be sure
259: * these commands are processed, you can then run() each of the
260: * commands in the list returned by drain().
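*
* For example, a full shutdown sequence along those lines might be
* (a sketch only; InterruptedException handling is omitted):
* <pre>
*   pool.shutdownNow();
*   pool.awaitTerminationAfterShutdown();
*   List pending = pool.drain();
*   for (Iterator it = pending.iterator(); it.hasNext();)
*     ((Runnable)(it.next())).run();
* </pre>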
261: *
262: * </dl>
263: * <p>
264: *
265: * <b>Usage examples.</b>
266: * <p>
267: *
268: * Probably the most common use of pools is in statics or singletons
269: * accessible from a number of classes in a package; for example:
270: *
271: * <pre>
272: * class MyPool {
273: * // initialize to use a maximum of 8 threads.
274: * static PooledExecutor pool = new PooledExecutor(8);
275: * }
276: * </pre>
277: * Here are some sample variants in initialization:
278: * <ol>
279: * <li> Using a bounded buffer of 10 tasks, at least 4 threads (started only
280: * when needed due to incoming requests), but allowing
281: * up to 100 threads if the buffer gets full.
282: * <pre>
283: * pool = new PooledExecutor(new BoundedBuffer(10), 100);
284: * pool.setMinimumPoolSize(4);
285: * </pre>
286: * <li> Same as (1), except pre-start 9 threads, allowing them to
287: * die if they are not used for five minutes.
288: * <pre>
289: * pool = new PooledExecutor(new BoundedBuffer(10), 100);
290: * pool.setMinimumPoolSize(4);
291: * pool.setKeepAliveTime(1000 * 60 * 5);
292: * pool.createThreads(9);
293: * </pre>
294: * <li> Same as (2) except clients block if both the buffer is full and
295: * all 100 threads are busy:
296: * <pre>
297: * pool = new PooledExecutor(new BoundedBuffer(10), 100);
298: * pool.setMinimumPoolSize(4);
299: * pool.setKeepAliveTime(1000 * 60 * 5);
300: * pool.waitWhenBlocked();
301: * pool.createThreads(9);
302: * </pre>
303: * <li> An unbounded queue serviced by exactly 5 threads:
304: * <pre>
305: * pool = new PooledExecutor(new LinkedQueue());
306: * pool.setKeepAliveTime(-1); // live forever
307: * pool.createThreads(5);
308: * </pre>
309: * </ol>
310: *
311: * <p>
312: * <b>Usage notes.</b>
313: * <p>
314: *
315: * Pools do not mesh well with using thread-specific storage via
316: * java.lang.ThreadLocal. ThreadLocal relies on the identity of a
317: * thread executing a particular task. Pools use the same thread to
318: * perform different tasks. <p>
319: *
320: * If you need a policy not handled by the parameters in this class
321: * consider writing a subclass. <p>
322: *
323: * Version note: Previous versions of this class relied on
324: * ThreadGroups for aggregate control. This has been removed, and the
325: * method interruptAll added, to avoid differences in behavior across
326: * JVMs.
327: *
328: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
329: **/
330:
331: public class PooledExecutor extends ThreadFactoryUser implements
332: Executor {
333:
334: /**
335: * The maximum pool size; used if not otherwise specified. Default
336: * value is essentially infinite (Integer.MAX_VALUE)
337: **/
338: public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE;
339:
340: /**
341: * The minimum pool size; used if not otherwise specified. Default
342: * value is 1.
343: **/
344: public static final int DEFAULT_MINIMUMPOOLSIZE = 1;
345:
346: /**
347: * The maximum time to keep worker threads alive waiting for new
348: * tasks; used if not otherwise specified. Default value is one
349: * minute (60000 milliseconds).
350: **/
351: public static final long DEFAULT_KEEPALIVETIME = 60 * 1000;
352:
353: /** The maximum number of threads allowed in pool. **/
354: protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE;
355:
/** The minimum number of threads to maintain in pool. **/
357: protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE;
358:
359: /** Current pool size. **/
360: protected int poolSize_ = 0;
361:
/** The maximum time for an idle thread to wait for a new task. **/
363: protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME;
364:
365: /**
366: * Shutdown flag - latches true when a shutdown method is called
367: * in order to disable queuing/handoffs of new tasks.
368: **/
369: protected boolean shutdown_ = false;
370:
371: /**
372: * The channel used to hand off the command to a thread in the pool.
373: **/
374: protected final Channel handOff_;
375:
376: /**
377: * The set of active threads, declared as a map from workers to
378: * their threads. This is needed by the interruptAll method. It
379: * may also be useful in subclasses that need to perform other
380: * thread management chores.
381: **/
382: protected final Map threads_;
383:
384: /** The current handler for unserviceable requests. **/
385: protected BlockedExecutionHandler blockedExecutionHandler_;
386:
387: /**
388: * Create a new pool with all default settings
389: **/
390:
391: public PooledExecutor() {
this(new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE);
393: }
394:
395: /**
396: * Create a new pool with all default settings except
397: * for maximum pool size.
398: **/
399:
400: public PooledExecutor(int maxPoolSize) {
this(new SynchronousChannel(), maxPoolSize);
402: }
403:
404: /**
405: * Create a new pool that uses the supplied Channel for queuing, and
406: * with all default parameter settings.
407: **/
408:
409: public PooledExecutor(Channel channel) {
this(channel, DEFAULT_MAXIMUMPOOLSIZE);
411: }
412:
413: /**
414: * Create a new pool that uses the supplied Channel for queuing, and
415: * with all default parameter settings except for maximum pool size.
416: **/
417:
418: public PooledExecutor(Channel channel, int maxPoolSize) {
419: maximumPoolSize_ = maxPoolSize;
420: handOff_ = channel;
421: runWhenBlocked();
422: threads_ = new HashMap();
423: }
424:
425: /**
* Return the maximum number of threads to simultaneously execute.
* New unqueued requests will be handled according to the current
428: * blocking policy once this limit is exceeded.
429: **/
430: public synchronized int getMaximumPoolSize() {
431: return maximumPoolSize_;
432: }
433:
434: /**
435: * Set the maximum number of threads to use. Decreasing the pool
436: * size will not immediately kill existing threads, but they may
437: * later die when idle.
* @exception IllegalArgumentException if less than or equal to zero.
* (It is not considered an error to set the maximum to be less than
* the minimum. However, in this case there are no guarantees about
* behavior.)
443: **/
444: public synchronized void setMaximumPoolSize(int newMaximum) {
445: if (newMaximum <= 0)
446: throw new IllegalArgumentException();
447: maximumPoolSize_ = newMaximum;
448: }
449:
450: /**
451: * Return the minimum number of threads to simultaneously execute.
* (Default value is 1). If fewer than the minimum number are
453: * running upon reception of a new request, a new thread is started
454: * to handle this request.
455: **/
456: public synchronized int getMinimumPoolSize() {
457: return minimumPoolSize_;
458: }
459:
460: /**
461: * Set the minimum number of threads to use.
462: * @exception IllegalArgumentException if less than zero. (It is not
463: * considered an error to set the minimum to be greater than the
464: * maximum. However, in this case there are no guarantees about
465: * behavior.)
466: **/
467: public synchronized void setMinimumPoolSize(int newMinimum) {
468: if (newMinimum < 0)
469: throw new IllegalArgumentException();
470: minimumPoolSize_ = newMinimum;
471: }
472:
473: /**
* Return the current number of active threads in the pool. This
* number is just a snapshot, and may change immediately upon
* returning.
477: **/
478: public synchronized int getPoolSize() {
479: return poolSize_;
480: }
481:
482: /**
483: * Return the number of milliseconds to keep threads alive waiting
484: * for new commands. A negative value means to wait forever. A zero
485: * value means not to wait at all.
486: **/
487: public synchronized long getKeepAliveTime() {
488: return keepAliveTime_;
489: }
490:
491: /**
492: * Set the number of milliseconds to keep threads alive waiting for
493: * new commands. A negative value means to wait forever. A zero
494: * value means not to wait at all.
495: **/
496: public synchronized void setKeepAliveTime(long msecs) {
497: keepAliveTime_ = msecs;
498: }
499:
500: /** Get the handler for blocked execution **/
501: public synchronized BlockedExecutionHandler getBlockedExecutionHandler() {
502: return blockedExecutionHandler_;
503: }
504:
505: /** Set the handler for blocked execution **/
506: public synchronized void setBlockedExecutionHandler(
507: BlockedExecutionHandler h) {
508: blockedExecutionHandler_ = h;
509: }
510:
511: /**
512: * Create and start a thread to handle a new command. Call only
513: * when holding lock.
514: **/
515: protected void addThread(Runnable command) {
516: Worker worker = new Worker(command);
517: Thread thread = getThreadFactory().newThread(worker);
518: threads_.put(worker, thread);
519: ++poolSize_;
520: thread.start();
521: }
522:
523: /**
524: * Create and start up to numberOfThreads threads in the pool.
525: * Return the number created. This may be less than the number
* requested if creating more would exceed the maximum pool size bound.
527: **/
528: public int createThreads(int numberOfThreads) {
529: int ncreated = 0;
530: for (int i = 0; i < numberOfThreads; ++i) {
synchronized (this) {
532: if (poolSize_ < maximumPoolSize_) {
533: addThread(null);
534: ++ncreated;
535: } else
536: break;
537: }
538: }
539: return ncreated;
540: }
541:
542: /**
543: * Interrupt all threads in the pool, causing them all to
544: * terminate. Assuming that executed tasks do not disable (clear)
545: * interruptions, each thread will terminate after processing its
546: * current task. Threads will terminate sooner if the executed tasks
547: * themselves respond to interrupts.
548: **/
549: public synchronized void interruptAll() {
550: for (Iterator it = threads_.values().iterator(); it.hasNext();) {
551: Thread t = (Thread) (it.next());
552: t.interrupt();
553: }
554: }
555:
556: /**
557: * Interrupt all threads and disable construction of new
558: * threads. Any tasks entered after this point will be discarded. A
559: * shut down pool cannot be restarted.
560: */
561: public void shutdownNow() {
562: shutdownNow(new DiscardWhenBlocked());
563: }
564:
565: /**
566: * Interrupt all threads and disable construction of new
567: * threads. Any tasks entered after this point will be handled by
568: * the given BlockedExecutionHandler. A shut down pool cannot be
569: * restarted.
570: */
571: public synchronized void shutdownNow(BlockedExecutionHandler handler) {
572: setBlockedExecutionHandler(handler);
573: shutdown_ = true; // don't allow new tasks
574: minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
575: interruptAll(); // interrupt all existing threads
576: }
577:
578: /**
579: * Terminate threads after processing all elements currently in
580: * queue. Any tasks entered after this point will be discarded. A
581: * shut down pool cannot be restarted.
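*
* For example, a graceful shutdown that waits for queued work to
* complete might be (a sketch only; InterruptedException handling
* omitted):
* <pre>
*   pool.shutdownAfterProcessingCurrentlyQueuedTasks();
*   pool.awaitTerminationAfterShutdown();
* </pre>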
582: **/
583: public void shutdownAfterProcessingCurrentlyQueuedTasks() {
584: shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
585: }
586:
587: /**
588: * Terminate threads after processing all elements currently in
589: * queue. Any tasks entered after this point will be handled by the
590: * given BlockedExecutionHandler. A shut down pool cannot be
591: * restarted.
592: **/
593: public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(
594: BlockedExecutionHandler handler) {
595: setBlockedExecutionHandler(handler);
596: shutdown_ = true;
597: if (poolSize_ == 0) // disable new thread construction when idle
598: minimumPoolSize_ = maximumPoolSize_ = 0;
599: }
600:
601: /**
* Return true if a shutdown method has succeeded in terminating all
603: * threads.
604: */
605: public synchronized boolean isTerminatedAfterShutdown() {
606: return shutdown_ && poolSize_ == 0;
607: }
608:
609: /**
610: * Wait for a shutdown pool to fully terminate, or until the timeout
611: * has expired. This method may only be called <em>after</em>
612: * invoking shutdownNow or
613: * shutdownAfterProcessingCurrentlyQueuedTasks.
614: *
615: * @param maxWaitTime the maximum time in milliseconds to wait
616: * @return true if the pool has terminated within the max wait period
617: * @exception IllegalStateException if shutdown has not been requested
618: * @exception InterruptedException if the current thread has been interrupted in the course of waiting
619: */
620: public synchronized boolean awaitTerminationAfterShutdown(
621: long maxWaitTime) throws InterruptedException {
622: if (!shutdown_)
623: throw new IllegalStateException();
624: if (poolSize_ == 0)
625: return true;
626: long waitTime = maxWaitTime;
627: if (waitTime <= 0)
628: return false;
629: long start = System.currentTimeMillis();
630: for (;;) {
631: wait(waitTime);
632: if (poolSize_ == 0)
633: return true;
634: waitTime = maxWaitTime
635: - (System.currentTimeMillis() - start);
636: if (waitTime <= 0)
637: return false;
638: }
639: }
640:
641: /**
642: * Wait for a shutdown pool to fully terminate. This method may
643: * only be called <em>after</em> invoking shutdownNow or
644: * shutdownAfterProcessingCurrentlyQueuedTasks.
645: *
646: * @exception IllegalStateException if shutdown has not been requested
647: * @exception InterruptedException if the current thread has been interrupted in the course of waiting
648: */
649: public synchronized void awaitTerminationAfterShutdown()
650: throws InterruptedException {
651: if (!shutdown_)
652: throw new IllegalStateException();
653: while (poolSize_ > 0)
654: wait();
655: }
656:
657: /**
658: * Remove all unprocessed tasks from pool queue, and return them in
* a java.util.List. This method should be used only when there are
660: * not any active clients of the pool. Otherwise you face the
661: * possibility that the method will loop pulling out tasks as
662: * clients are putting them in. This method can be useful after
663: * shutting down a pool (via shutdownNow) to determine whether there
664: * are any pending tasks that were not processed. You can then, for
* example, execute all unprocessed commands via code along the lines
666: * of:
667: *
668: * <pre>
669: * List tasks = pool.drain();
670: * for (Iterator it = tasks.iterator(); it.hasNext();)
671: * ( (Runnable)(it.next()) ).run();
672: * </pre>
673: **/
674: public List drain() {
675: boolean wasInterrupted = false;
676: Vector tasks = new Vector();
677: for (;;) {
678: try {
679: Object x = handOff_.poll(0);
680: if (x == null)
681: break;
682: else
683: tasks.addElement(x);
684: } catch (InterruptedException ex) {
685: wasInterrupted = true; // postpone re-interrupt until drained
686: }
687: }
688: if (wasInterrupted)
689: Thread.currentThread().interrupt();
690: return tasks;
691: }
692:
693: /**
694: * Cleanup method called upon termination of worker thread.
695: **/
696: protected synchronized void workerDone(Worker w) {
697: threads_.remove(w);
698: if (--poolSize_ == 0 && shutdown_) {
699: maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
700: notifyAll(); // notify awaitTerminationAfterShutdown
701: }
702: }
703:
704: /**
705: * Get a task from the handoff queue, or null if shutting down.
706: **/
707: protected Runnable getTask() throws InterruptedException {
708: long waitTime;
synchronized (this) {
710: if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
711: return null;
712: waitTime = (shutdown_) ? 0 : keepAliveTime_;
713: }
714: if (waitTime >= 0)
715: return (Runnable) (handOff_.poll(waitTime));
716: else
717: return (Runnable) (handOff_.take());
718: }
719:
720: /**
721: * Class defining the basic run loop for pooled threads.
722: **/
723: protected class Worker implements Runnable {
724: protected Runnable firstTask_;
725:
726: protected Worker(Runnable firstTask) {
727: firstTask_ = firstTask;
728: }
729:
730: public void run() {
731: try {
732: Runnable task = firstTask_;
733: firstTask_ = null; // enable GC
734:
735: if (task != null) {
736: task.run();
737: task = null;
738: }
739:
740: while ((task = getTask()) != null) {
741: task.run();
742: task = null;
743: }
744: } catch (InterruptedException ex) {
745: } // fall through
746: finally {
workerDone(this);
748: }
749: }
750: }
751:
752: /**
753: * Class for actions to take when execute() blocks. Uses Strategy
754: * pattern to represent different actions. You can add more in
755: * subclasses, and/or create subclasses of these. If so, you will
756: * also want to add or modify the corresponding methods that set the
* current blockedExecutionHandler_.
758: **/
759: public interface BlockedExecutionHandler {
760: /**
* Return true if successfully handled, so that execute should
* terminate; else return false if the execute loop should be retried.
763: **/
764: boolean blockedAction(Runnable command)
765: throws InterruptedException;
766: }
767:
768: /** Class defining Run action. **/
769: protected class RunWhenBlocked implements BlockedExecutionHandler {
770: public boolean blockedAction(Runnable command) {
771: command.run();
772: return true;
773: }
774: }
775:
776: /**
777: * Set the policy for blocked execution to be that the current
778: * thread executes the command if there are no available threads in
779: * the pool.
780: **/
781: public void runWhenBlocked() {
782: setBlockedExecutionHandler(new RunWhenBlocked());
783: }
784:
785: /** Class defining Wait action. **/
786: protected class WaitWhenBlocked implements BlockedExecutionHandler {
787: public boolean blockedAction(Runnable command)
788: throws InterruptedException {
789: handOff_.put(command);
790: return true;
791: }
792: }
793:
794: /**
795: * Set the policy for blocked execution to be to wait until a thread
796: * is available.
797: **/
798: public void waitWhenBlocked() {
799: setBlockedExecutionHandler(new WaitWhenBlocked());
800: }
801:
802: /** Class defining Discard action. **/
803: protected class DiscardWhenBlocked implements
804: BlockedExecutionHandler {
805: public boolean blockedAction(Runnable command) {
806: return true;
807: }
808: }
809:
810: /**
811: * Set the policy for blocked execution to be to return without
812: * executing the request.
813: **/
814: public void discardWhenBlocked() {
815: setBlockedExecutionHandler(new DiscardWhenBlocked());
816: }
817:
818: /** Class defining Abort action. **/
819: protected class AbortWhenBlocked implements BlockedExecutionHandler {
820: public boolean blockedAction(Runnable command) {
821: throw new RuntimeException("Pool is blocked");
822: }
823: }
824:
825: /**
826: * Set the policy for blocked execution to be to
827: * throw a RuntimeException.
828: **/
829: public void abortWhenBlocked() {
830: setBlockedExecutionHandler(new AbortWhenBlocked());
831: }
832:
833: /**
834: * Class defining DiscardOldest action. Under this policy, at most
835: * one old unhandled task is discarded. If the new task can then be
836: * handed off, it is. Otherwise, the new task is run in the current
837: * thread (i.e., RunWhenBlocked is used as a backup policy.)
838: **/
839: protected class DiscardOldestWhenBlocked implements
840: BlockedExecutionHandler {
841: public boolean blockedAction(Runnable command)
842: throws InterruptedException {
843: handOff_.poll(0);
844: if (!handOff_.offer(command, 0))
845: command.run();
846: return true;
847: }
848: }
849:
850: /**
851: * Set the policy for blocked execution to be to discard the oldest
* unhandled request.
853: **/
854: public void discardOldestWhenBlocked() {
855: setBlockedExecutionHandler(new DiscardOldestWhenBlocked());
856: }
857:
858: /**
859: * Arrange for the given command to be executed by a thread in this
860: * pool. The method normally returns when the command has been
861: * handed off for (possibly later) execution.
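*
* For example (a sketch only; handleRequest is a hypothetical task
* body):
* <pre>
*   try {
*     pool.execute(new Runnable() {
*       public void run() { handleRequest(); }
*     });
*   } catch (InterruptedException ex) {
*     Thread.currentThread().interrupt(); // preserve interrupt status
*   }
* </pre>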
862: **/
863: public void execute(Runnable command) throws InterruptedException {
864: for (;;) {
synchronized (this) {
866: if (!shutdown_) {
867: int size = poolSize_;
868:
869: // Ensure minimum number of threads
870: if (size < minimumPoolSize_) {
871: addThread(command);
872: return;
873: }
874:
875: // Try to give to existing thread
876: if (handOff_.offer(command, 0)) {
877: return;
878: }
879:
880: // If cannot handoff and still under maximum, create new thread
881: if (size < maximumPoolSize_) {
882: addThread(command);
883: return;
884: }
885: }
886: }
887:
888: // Cannot hand off and cannot create -- ask for help
889: if (getBlockedExecutionHandler().blockedAction(command)) {
890: return;
891: }
892: }
893: }
894: }
|