0001: /*
0002: * Written by Doug Lea with assistance from members of JCP JSR-166
0003: * Expert Group and released to the public domain, as explained at
0004: * http://creativecommons.org/licenses/publicdomain
0005: */
0006:
0007: package java.util.concurrent;
0008:
0009: import java.util.concurrent.locks.*;
0010: import java.util.*;
0011:
0012: /**
0013: * An {@link ExecutorService} that executes each submitted task using
0014: * one of possibly several pooled threads, normally configured
0015: * using {@link Executors} factory methods.
0016: *
0017: * <p>Thread pools address two different problems: they usually
0018: * provide improved performance when executing large numbers of
0019: * asynchronous tasks, due to reduced per-task invocation overhead,
0020: * and they provide a means of bounding and managing the resources,
0021: * including threads, consumed when executing a collection of tasks.
0022: * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
0023: * statistics, such as the number of completed tasks.
0024: *
0025: * <p>To be useful across a wide range of contexts, this class
0026: * provides many adjustable parameters and extensibility
0027: * hooks. However, programmers are urged to use the more convenient
0028: * {@link Executors} factory methods {@link
0029: * Executors#newCachedThreadPool} (unbounded thread pool, with
0030: * automatic thread reclamation), {@link Executors#newFixedThreadPool}
0031: * (fixed size thread pool) and {@link
0032: * Executors#newSingleThreadExecutor} (single background thread), that
0033: * preconfigure settings for the most common usage
0034: * scenarios. Otherwise, use the following guide when manually
0035: * configuring and tuning this class:
0036: *
0037: * <dl>
0038: *
0039: * <dt>Core and maximum pool sizes</dt>
0040: *
0041: * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
0042: * pool size
0043: * (see {@link ThreadPoolExecutor#getPoolSize})
0044: * according to the bounds set by corePoolSize
0045: * (see {@link ThreadPoolExecutor#getCorePoolSize})
0046: * and
0047: * maximumPoolSize
0048: * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
0049: * When a new task is submitted in method {@link
0050: * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
0051: * are running, a new thread is created to handle the request, even if
0052: * other worker threads are idle. If there are more than
0053: * corePoolSize but less than maximumPoolSize threads running, a new
0054: * thread will be created only if the queue is full. By setting
0055: * corePoolSize and maximumPoolSize the same, you create a fixed-size
0056: * thread pool. By setting maximumPoolSize to an essentially unbounded
0057: * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
0058: * accommodate an arbitrary number of concurrent tasks. Most typically,
0059: * core and maximum pool sizes are set only upon construction, but they
0060: * may also be changed dynamically using {@link
0061: * ThreadPoolExecutor#setCorePoolSize} and {@link
0062: * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
0063: *
0064: * <dt> On-demand construction
0065: *
0066: * <dd> By default, even core threads are initially created and
0067: * started only when needed by new tasks, but this can be overridden
0068: * dynamically using method {@link
0069: * ThreadPoolExecutor#prestartCoreThread} or
0070: * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
0071: *
0072: * <dt>Creating new threads</dt>
0073: *
0074: * <dd>New threads are created using a {@link
0075: * java.util.concurrent.ThreadFactory}. If not otherwise specified, a
0076: * {@link Executors#defaultThreadFactory} is used, that creates threads to all
0077: * be in the same {@link ThreadGroup} and with the same
0078: * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
0079: * a different ThreadFactory, you can alter the thread's name, thread
0080: * group, priority, daemon status, etc. </dd>
0081: *
0082: * <dt>Keep-alive times</dt>
0083: *
0084: * <dd>If the pool currently has more than corePoolSize threads,
0085: * excess threads will be terminated if they have been idle for more
0086: * than the keepAliveTime (see {@link
0087: * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
0088: * reducing resource consumption when the pool is not being actively
0089: * used. If the pool becomes more active later, new threads will be
0090: * constructed. This parameter can also be changed dynamically
0091: * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
0092: * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
0093: * effectively disables idle threads from ever terminating prior
0094: * to shut down.
0095: * </dd>
0096: *
0097: * <dt>Queuing</dt>
0098: *
0099: * <dd>Any {@link BlockingQueue} may be used to transfer and hold
0100: * submitted tasks. The use of this queue interacts with pool sizing:
0101: *
0102: * <ul>
0103: *
0104: * <li> If fewer than corePoolSize threads are running, the Executor
0105: * always prefers adding a new thread
0106: * rather than queuing.</li>
0107: *
0108: * <li> If corePoolSize or more threads are running, the Executor
0109: * always prefers queuing a request rather than adding a new
0110: * thread.</li>
0111: *
0112: * <li> If a request cannot be queued, a new thread is created unless
0113: * this would exceed maximumPoolSize, in which case, the task will be
0114: * rejected.</li>
0115: *
0116: * </ul>
0117: *
0118: * There are three general strategies for queuing:
0119: * <ol>
0120: *
0121: * <li> <em> Direct handoffs.</em> A good default choice for a work
0122: * queue is a {@link SynchronousQueue} that hands off tasks to threads
0123: * without otherwise holding them. Here, an attempt to queue a task
0124: * will fail if no threads are immediately available to run it, so a
0125: * new thread will be constructed. This policy avoids lockups when
0126: * handling sets of requests that might have internal dependencies.
0127: * Direct handoffs generally require unbounded maximumPoolSizes to
0128: * avoid rejection of new submitted tasks. This in turn admits the
0129: * possibility of unbounded thread growth when commands continue to
0130: * arrive on average faster than they can be processed. </li>
0131: *
0132: * <li><em> Unbounded queues.</em> Using an unbounded queue (for
0133: * example a {@link LinkedBlockingQueue} without a predefined
0134: * capacity) will cause new tasks to be queued in cases where all
0135: * corePoolSize threads are busy. Thus, no more than corePoolSize
0136: * threads will ever be created. (And the value of the maximumPoolSize
0137: * therefore doesn't have any effect.) This may be appropriate when
0138: * each task is completely independent of others, so tasks cannot
0139: * affect each others execution; for example, in a web page server.
0140: * While this style of queuing can be useful in smoothing out
0141: * transient bursts of requests, it admits the possibility of
0142: * unbounded work queue growth when commands continue to arrive on
0143: * average faster than they can be processed. </li>
0144: *
0145: * <li><em>Bounded queues.</em> A bounded queue (for example, an
0146: * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
0147: * used with finite maximumPoolSizes, but can be more difficult to
0148: * tune and control. Queue sizes and maximum pool sizes may be traded
0149: * off for each other: Using large queues and small pools minimizes
0150: * CPU usage, OS resources, and context-switching overhead, but can
0151: * lead to artificially low throughput. If tasks frequently block (for
0152: * example if they are I/O bound), a system may be able to schedule
0153: * time for more threads than you otherwise allow. Use of small queues
0154: * generally requires larger pool sizes, which keeps CPUs busier but
0155: * may encounter unacceptable scheduling overhead, which also
0156: * decreases throughput. </li>
0157: *
0158: * </ol>
0159: *
0160: * </dd>
0161: *
0162: * <dt>Rejected tasks</dt>
0163: *
0164: * <dd> New tasks submitted in method {@link
0165: * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
0166: * Executor has been shut down, and also when the Executor uses finite
0167: * bounds for both maximum threads and work queue capacity, and is
0168: * saturated. In either case, the <tt>execute</tt> method invokes the
0169: * {@link RejectedExecutionHandler#rejectedExecution} method of its
0170: * {@link RejectedExecutionHandler}. Four predefined handler policies
0171: * are provided:
0172: *
0173: * <ol>
0174: *
0175: * <li> In the
0176: * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
0177: * runtime {@link RejectedExecutionException} upon rejection. </li>
0178: *
0179: * <li> In {@link
0180: * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
0181: * <tt>execute</tt> itself runs the task. This provides a simple
0182: * feedback control mechanism that will slow down the rate that new
0183: * tasks are submitted. </li>
0184: *
0185: * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
0186: * a task that cannot be executed is simply dropped. </li>
0187: *
0188: * <li>In {@link
0189: * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
0190: * shut down, the task at the head of the work queue is dropped, and
0191: * then execution is retried (which can fail again, causing this to be
0192: * repeated.) </li>
0193: *
0194: * </ol>
0195: *
0196: * It is possible to define and use other kinds of {@link
0197: * RejectedExecutionHandler} classes. Doing so requires some care
0198: * especially when policies are designed to work only under particular
0199: * capacity or queuing policies. </dd>
0200: *
0201: * <dt>Hook methods</dt>
0202: *
0203: * <dd>This class provides <tt>protected</tt> overridable {@link
0204: * ThreadPoolExecutor#beforeExecute} and {@link
0205: * ThreadPoolExecutor#afterExecute} methods that are called before and
0206: * after execution of each task. These can be used to manipulate the
0207: * execution environment, for example, reinitializing ThreadLocals,
0208: * gathering statistics, or adding log entries. Additionally, method
0209: * {@link ThreadPoolExecutor#terminated} can be overridden to perform
0210: * any special processing that needs to be done once the Executor has
0211: * fully terminated.</dd>
0212: *
0213: * <dt>Queue maintenance</dt>
0214: *
0215: * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
0216: * the work queue for purposes of monitoring and debugging. Use of
0217: * this method for any other purpose is strongly discouraged. Two
0218: * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
0219: * ThreadPoolExecutor#purge} are available to assist in storage
0220: * reclamation when large numbers of queued tasks become
0221: * cancelled.</dd> </dl>
0222: *
0223: * <p> <b>Extension example</b>. Most extensions of this class
0224: * override one or more of the protected hook methods. For example,
0225: * here is a subclass that adds a simple pause/resume feature:
0226: *
0227: * <pre>
0228: * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
0229: * private boolean isPaused;
0230: * private ReentrantLock pauseLock = new ReentrantLock();
0231: * private Condition unpaused = pauseLock.newCondition();
0232: *
0233: * public PausableThreadPoolExecutor(...) { super(...); }
0234: *
0235: * protected void beforeExecute(Thread t, Runnable r) {
0236: * super.beforeExecute(t, r);
0237: * pauseLock.lock();
0238: * try {
0239: * while (isPaused) unpaused.await();
0240: * } catch(InterruptedException ie) {
0241: * t.interrupt();
0242: * } finally {
0243: * pauseLock.unlock();
0244: * }
0245: * }
0246: *
0247: * public void pause() {
0248: * pauseLock.lock();
0249: * try {
0250: * isPaused = true;
0251: * } finally {
0252: * pauseLock.unlock();
0253: * }
0254: * }
0255: *
0256: * public void resume() {
0257: * pauseLock.lock();
0258: * try {
0259: * isPaused = false;
0260: * unpaused.signalAll();
0261: * } finally {
0262: * pauseLock.unlock();
0263: * }
0264: * }
0265: * }
0266: * </pre>
0267: * @since 1.5
0268: * @author Doug Lea
0269: */
0270: public class ThreadPoolExecutor extends AbstractExecutorService {
0271: /**
0272: * Only used to force toArray() to produce a Runnable[].
0273: */
0274: private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
0275:
0276: /**
0277: * Permission for checking shutdown
0278: */
0279: private static final RuntimePermission shutdownPerm = new RuntimePermission(
0280: "modifyThread");
0281:
0282: /**
0283: * Queue used for holding tasks and handing off to worker threads.
0284: */
0285: private final BlockingQueue<Runnable> workQueue;
0286:
0287: /**
0288: * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
0289: * workers set.
0290: */
0291: private final ReentrantLock mainLock = new ReentrantLock();
0292:
0293: /**
0294: * Wait condition to support awaitTermination
0295: */
0296: private final Condition termination = mainLock.newCondition();
0297:
0298: /**
0299: * Set containing all worker threads in pool.
0300: */
0301: private final HashSet<Worker> workers = new HashSet<Worker>();
0302:
0303: /**
0304: * Timeout in nanoseconds for idle threads waiting for work.
0305: * Threads use this timeout only when there are more than
0306: * corePoolSize present. Otherwise they wait forever for new work.
0307: */
0308: private volatile long keepAliveTime;
0309:
0310: /**
0311: * Core pool size, updated only while holding mainLock,
0312: * but volatile to allow concurrent readability even
0313: * during updates.
0314: */
0315: private volatile int corePoolSize;
0316:
0317: /**
0318: * Maximum pool size, updated only while holding mainLock
0319: * but volatile to allow concurrent readability even
0320: * during updates.
0321: */
0322: private volatile int maximumPoolSize;
0323:
0324: /**
0325: * Current pool size, updated only while holding mainLock
0326: * but volatile to allow concurrent readability even
0327: * during updates.
0328: */
0329: private volatile int poolSize;
0330:
0331: /**
0332: * Lifecycle state
0333: */
0334: volatile int runState;
0335:
0336: // Special values for runState
0337: /** Normal, not-shutdown mode */
0338: static final int RUNNING = 0;
0339: /** Controlled shutdown mode */
0340: static final int SHUTDOWN = 1;
0341: /** Immediate shutdown mode */
0342: static final int STOP = 2;
0343: /** Final state */
0344: static final int TERMINATED = 3;
0345:
0346: /**
0347: * Handler called when saturated or shutdown in execute.
0348: */
0349: private volatile RejectedExecutionHandler handler;
0350:
0351: /**
0352: * Factory for new threads.
0353: */
0354: private volatile ThreadFactory threadFactory;
0355:
0356: /**
0357: * Tracks largest attained pool size.
0358: */
0359: private int largestPoolSize;
0360:
0361: /**
0362: * Counter for completed tasks. Updated only on termination of
0363: * worker threads.
0364: */
0365: private long completedTaskCount;
0366:
0367: /**
0368: * The default rejected execution handler
0369: */
0370: private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
0371:
0372: /**
0373: * Invoke the rejected execution handler for the given command.
0374: */
0375: void reject(Runnable command) {
0376: handler.rejectedExecution(command, this );
0377: }
0378:
0379: /**
0380: * Create and return a new thread running firstTask as its first
0381: * task. Call only while holding mainLock
0382: * @param firstTask the task the new thread should run first (or
0383: * null if none)
0384: * @return the new thread
0385: */
0386: private Thread addThread(Runnable firstTask) {
0387: Worker w = new Worker(firstTask);
0388: Thread t = threadFactory.newThread(w);
0389: w.thread = t;
0390: workers.add(w);
0391: int nt = ++poolSize;
0392: if (nt > largestPoolSize)
0393: largestPoolSize = nt;
0394: return t;
0395: }
0396:
0397: /**
0398: * Create and start a new thread running firstTask as its first
0399: * task, only if fewer than corePoolSize threads are running.
0400: * @param firstTask the task the new thread should run first (or
0401: * null if none)
0402: * @return true if successful.
0403: */
0404: private boolean addIfUnderCorePoolSize(Runnable firstTask) {
0405: Thread t = null;
0406: final ReentrantLock mainLock = this .mainLock;
0407: mainLock.lock();
0408: try {
0409: if (poolSize < corePoolSize)
0410: t = addThread(firstTask);
0411: } finally {
0412: mainLock.unlock();
0413: }
0414: if (t == null)
0415: return false;
0416: t.start();
0417: return true;
0418: }
0419:
0420: /**
0421: * Create and start a new thread only if fewer than maximumPoolSize
0422: * threads are running. The new thread runs as its first task the
0423: * next task in queue, or if there is none, the given task.
0424: * @param firstTask the task the new thread should run first (or
0425: * null if none)
0426: * @return null on failure, else the first task to be run by new thread.
0427: */
0428: private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
0429: Thread t = null;
0430: Runnable next = null;
0431: final ReentrantLock mainLock = this .mainLock;
0432: mainLock.lock();
0433: try {
0434: if (poolSize < maximumPoolSize) {
0435: next = workQueue.poll();
0436: if (next == null)
0437: next = firstTask;
0438: t = addThread(next);
0439: }
0440: } finally {
0441: mainLock.unlock();
0442: }
0443: if (t == null)
0444: return null;
0445: t.start();
0446: return next;
0447: }
0448:
0449: /**
0450: * Get the next task for a worker thread to run.
0451: * @return the task
0452: * @throws InterruptedException if interrupted while waiting for task
0453: */
0454: Runnable getTask() throws InterruptedException {
0455: for (;;) {
0456: switch (runState) {
0457: case RUNNING: {
0458: if (poolSize <= corePoolSize) // untimed wait if core
0459: return workQueue.take();
0460:
0461: long timeout = keepAliveTime;
0462: if (timeout <= 0) // die immediately for 0 timeout
0463: return null;
0464: Runnable r = workQueue.poll(timeout,
0465: TimeUnit.NANOSECONDS);
0466: if (r != null)
0467: return r;
0468: if (poolSize > corePoolSize) // timed out
0469: return null;
0470: // else, after timeout, pool shrank so shouldn't die, so retry
0471: break;
0472: }
0473:
0474: case SHUTDOWN: {
0475: // Help drain queue
0476: Runnable r = workQueue.poll();
0477: if (r != null)
0478: return r;
0479:
0480: // Check if can terminate
0481: if (workQueue.isEmpty()) {
0482: interruptIdleWorkers();
0483: return null;
0484: }
0485:
0486: // There could still be delayed tasks in queue.
0487: // Wait for one, re-checking state upon interruption
0488: try {
0489: return workQueue.take();
0490: } catch (InterruptedException ignore) {
0491: }
0492: break;
0493: }
0494:
0495: case STOP:
0496: return null;
0497: default:
0498: assert false;
0499: }
0500: }
0501: }
0502:
0503: /**
0504: * Wake up all threads that might be waiting for tasks.
0505: */
0506: void interruptIdleWorkers() {
0507: final ReentrantLock mainLock = this .mainLock;
0508: mainLock.lock();
0509: try {
0510: for (Worker w : workers)
0511: w.interruptIfIdle();
0512: } finally {
0513: mainLock.unlock();
0514: }
0515: }
0516:
0517: /**
0518: * Perform bookkeeping for a terminated worker thread.
0519: * @param w the worker
0520: */
0521: void workerDone(Worker w) {
0522: final ReentrantLock mainLock = this .mainLock;
0523: mainLock.lock();
0524: try {
0525: completedTaskCount += w.completedTasks;
0526: workers.remove(w);
0527: if (--poolSize > 0)
0528: return;
0529:
0530: // Else, this is the last thread. Deal with potential shutdown.
0531:
0532: int state = runState;
0533: assert state != TERMINATED;
0534:
0535: if (state != STOP) {
0536: // If there are queued tasks but no threads, create
0537: // replacement.
0538: Runnable r = workQueue.poll();
0539: if (r != null) {
0540: addThread(r).start();
0541: return;
0542: }
0543:
0544: // If there are some (presumably delayed) tasks but
0545: // none pollable, create an idle replacement to wait.
0546: if (!workQueue.isEmpty()) {
0547: addThread(null).start();
0548: return;
0549: }
0550:
0551: // Otherwise, we can exit without replacement
0552: if (state == RUNNING)
0553: return;
0554: }
0555:
0556: // Either state is STOP, or state is SHUTDOWN and there is
0557: // no work to do. So we can terminate.
0558: termination.signalAll();
0559: runState = TERMINATED;
0560: // fall through to call terminate() outside of lock.
0561: } finally {
0562: mainLock.unlock();
0563: }
0564:
0565: assert runState == TERMINATED;
0566: terminated();
0567: }
0568:
0569: /**
0570: * Worker threads
0571: */
0572: private class Worker implements Runnable {
0573:
0574: /**
0575: * The runLock is acquired and released surrounding each task
0576: * execution. It mainly protects against interrupts that are
0577: * intended to cancel the worker thread from instead
0578: * interrupting the task being run.
0579: */
0580: private final ReentrantLock runLock = new ReentrantLock();
0581:
0582: /**
0583: * Initial task to run before entering run loop
0584: */
0585: private Runnable firstTask;
0586:
0587: /**
0588: * Per thread completed task counter; accumulated
0589: * into completedTaskCount upon termination.
0590: */
0591: volatile long completedTasks;
0592:
0593: /**
0594: * Thread this worker is running in. Acts as a final field,
0595: * but cannot be set until thread is created.
0596: */
0597: Thread thread;
0598:
0599: Worker(Runnable firstTask) {
0600: this .firstTask = firstTask;
0601: }
0602:
0603: boolean isActive() {
0604: return runLock.isLocked();
0605: }
0606:
0607: /**
0608: * Interrupt thread if not running a task
0609: */
0610: void interruptIfIdle() {
0611: final ReentrantLock runLock = this .runLock;
0612: if (runLock.tryLock()) {
0613: try {
0614: thread.interrupt();
0615: } finally {
0616: runLock.unlock();
0617: }
0618: }
0619: }
0620:
0621: /**
0622: * Cause thread to die even if running a task.
0623: */
0624: void interruptNow() {
0625: thread.interrupt();
0626: }
0627:
0628: /**
0629: * Run a single task between before/after methods.
0630: */
0631: private void runTask(Runnable task) {
0632: final ReentrantLock runLock = this .runLock;
0633: runLock.lock();
0634: try {
0635: // Abort now if immediate cancel. Otherwise, we have
0636: // committed to run this task.
0637: if (runState == STOP)
0638: return;
0639:
0640: Thread.interrupted(); // clear interrupt status on entry
0641: boolean ran = false;
0642: beforeExecute(thread, task);
0643: try {
0644: task.run();
0645: ran = true;
0646: afterExecute(task, null);
0647: ++completedTasks;
0648: } catch (RuntimeException ex) {
0649: if (!ran)
0650: afterExecute(task, ex);
0651: // Else the exception occurred within
0652: // afterExecute itself in which case we don't
0653: // want to call it again.
0654: throw ex;
0655: }
0656: } finally {
0657: runLock.unlock();
0658: }
0659: }
0660:
0661: /**
0662: * Main run loop
0663: */
0664: public void run() {
0665: try {
0666: Runnable task = firstTask;
0667: firstTask = null;
0668: while (task != null || (task = getTask()) != null) {
0669: runTask(task);
0670: task = null; // unnecessary but can help GC
0671: }
0672: } catch (InterruptedException ie) {
0673: // fall through
0674: } finally {
0675: workerDone(this );
0676: }
0677: }
0678: }
0679:
0680: // Public methods
0681:
0682: /**
0683: * Creates a new <tt>ThreadPoolExecutor</tt> with the given
0684: * initial parameters and default thread factory and handler. It
0685: * may be more convenient to use one of the {@link Executors}
0686: * factory methods instead of this general purpose constructor.
0687: *
0688: * @param corePoolSize the number of threads to keep in the
0689: * pool, even if they are idle.
0690: * @param maximumPoolSize the maximum number of threads to allow in the
0691: * pool.
0692: * @param keepAliveTime when the number of threads is greater than
0693: * the core, this is the maximum time that excess idle threads
0694: * will wait for new tasks before terminating.
0695: * @param unit the time unit for the keepAliveTime
0696: * argument.
0697: * @param workQueue the queue to use for holding tasks before they
0698: * are executed. This queue will hold only the <tt>Runnable</tt>
0699: * tasks submitted by the <tt>execute</tt> method.
0700: * @throws IllegalArgumentException if corePoolSize, or
0701: * keepAliveTime less than zero, or if maximumPoolSize less than or
0702: * equal to zero, or if corePoolSize greater than maximumPoolSize.
0703: * @throws NullPointerException if <tt>workQueue</tt> is null
0704: */
0705: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
0706: long keepAliveTime, TimeUnit unit,
0707: BlockingQueue<Runnable> workQueue) {
0708: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
0709: workQueue, Executors.defaultThreadFactory(),
0710: defaultHandler);
0711: }
0712:
0713: /**
0714: * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
0715: * parameters.
0716: *
0717: * @param corePoolSize the number of threads to keep in the
0718: * pool, even if they are idle.
0719: * @param maximumPoolSize the maximum number of threads to allow in the
0720: * pool.
0721: * @param keepAliveTime when the number of threads is greater than
0722: * the core, this is the maximum time that excess idle threads
0723: * will wait for new tasks before terminating.
0724: * @param unit the time unit for the keepAliveTime
0725: * argument.
0726: * @param workQueue the queue to use for holding tasks before they
0727: * are executed. This queue will hold only the <tt>Runnable</tt>
0728: * tasks submitted by the <tt>execute</tt> method.
0729: * @param threadFactory the factory to use when the executor
0730: * creates a new thread.
0731: * @throws IllegalArgumentException if corePoolSize, or
0732: * keepAliveTime less than zero, or if maximumPoolSize less than or
0733: * equal to zero, or if corePoolSize greater than maximumPoolSize.
0734: * @throws NullPointerException if <tt>workQueue</tt>
0735: * or <tt>threadFactory</tt> are null.
0736: */
0737: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
0738: long keepAliveTime, TimeUnit unit,
0739: BlockingQueue<Runnable> workQueue,
0740: ThreadFactory threadFactory) {
0741: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
0742: workQueue, threadFactory, defaultHandler);
0743: }
0744:
0745: /**
0746: * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
0747: * parameters.
0748: *
0749: * @param corePoolSize the number of threads to keep in the
0750: * pool, even if they are idle.
0751: * @param maximumPoolSize the maximum number of threads to allow in the
0752: * pool.
0753: * @param keepAliveTime when the number of threads is greater than
0754: * the core, this is the maximum time that excess idle threads
0755: * will wait for new tasks before terminating.
0756: * @param unit the time unit for the keepAliveTime
0757: * argument.
0758: * @param workQueue the queue to use for holding tasks before they
0759: * are executed. This queue will hold only the <tt>Runnable</tt>
0760: * tasks submitted by the <tt>execute</tt> method.
0761: * @param handler the handler to use when execution is blocked
0762: * because the thread bounds and queue capacities are reached.
0763: * @throws IllegalArgumentException if corePoolSize, or
0764: * keepAliveTime less than zero, or if maximumPoolSize less than or
0765: * equal to zero, or if corePoolSize greater than maximumPoolSize.
0766: * @throws NullPointerException if <tt>workQueue</tt>
0767: * or <tt>handler</tt> are null.
0768: */
0769: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
0770: long keepAliveTime, TimeUnit unit,
0771: BlockingQueue<Runnable> workQueue,
0772: RejectedExecutionHandler handler) {
0773: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
0774: workQueue, Executors.defaultThreadFactory(), handler);
0775: }
0776:
0777: /**
0778: * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
0779: * parameters.
0780: *
0781: * @param corePoolSize the number of threads to keep in the
0782: * pool, even if they are idle.
0783: * @param maximumPoolSize the maximum number of threads to allow in the
0784: * pool.
0785: * @param keepAliveTime when the number of threads is greater than
0786: * the core, this is the maximum time that excess idle threads
0787: * will wait for new tasks before terminating.
0788: * @param unit the time unit for the keepAliveTime
0789: * argument.
0790: * @param workQueue the queue to use for holding tasks before they
0791: * are executed. This queue will hold only the <tt>Runnable</tt>
0792: * tasks submitted by the <tt>execute</tt> method.
0793: * @param threadFactory the factory to use when the executor
0794: * creates a new thread.
0795: * @param handler the handler to use when execution is blocked
0796: * because the thread bounds and queue capacities are reached.
0797: * @throws IllegalArgumentException if corePoolSize, or
0798: * keepAliveTime less than zero, or if maximumPoolSize less than or
0799: * equal to zero, or if corePoolSize greater than maximumPoolSize.
0800: * @throws NullPointerException if <tt>workQueue</tt>
0801: * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
0802: */
0803: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
0804: long keepAliveTime, TimeUnit unit,
0805: BlockingQueue<Runnable> workQueue,
0806: ThreadFactory threadFactory,
0807: RejectedExecutionHandler handler) {
0808: if (corePoolSize < 0 || maximumPoolSize <= 0
0809: || maximumPoolSize < corePoolSize || keepAliveTime < 0)
0810: throw new IllegalArgumentException();
0811: if (workQueue == null || threadFactory == null
0812: || handler == null)
0813: throw new NullPointerException();
0814: this .corePoolSize = corePoolSize;
0815: this .maximumPoolSize = maximumPoolSize;
0816: this .workQueue = workQueue;
0817: this .keepAliveTime = unit.toNanos(keepAliveTime);
0818: this .threadFactory = threadFactory;
0819: this .handler = handler;
0820: }
0821:
0822: /**
0823: * Executes the given task sometime in the future. The task
0824: * may execute in a new thread or in an existing pooled thread.
0825: *
0826: * If the task cannot be submitted for execution, either because this
0827: * executor has been shutdown or because its capacity has been reached,
0828: * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
0829: *
0830: * @param command the task to execute
0831: * @throws RejectedExecutionException at discretion of
0832: * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
0833: * for execution
0834: * @throws NullPointerException if command is null
0835: */
0836: public void execute(Runnable command) {
0837: if (command == null)
0838: throw new NullPointerException();
0839: for (;;) {
0840: if (runState != RUNNING) {
0841: reject(command);
0842: return;
0843: }
0844: if (poolSize < corePoolSize
0845: && addIfUnderCorePoolSize(command))
0846: return;
0847: if (workQueue.offer(command))
0848: return;
0849: Runnable r = addIfUnderMaximumPoolSize(command);
0850: if (r == command)
0851: return;
0852: if (r == null) {
0853: reject(command);
0854: return;
0855: }
0856: // else retry
0857: }
0858: }
0859:
0860: /**
0861: * Initiates an orderly shutdown in which previously submitted
0862: * tasks are executed, but no new tasks will be
0863: * accepted. Invocation has no additional effect if already shut
0864: * down.
0865: * @throws SecurityException if a security manager exists and
0866: * shutting down this ExecutorService may manipulate threads that
0867: * the caller is not permitted to modify because it does not hold
0868: * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
0869: * or the security manager's <tt>checkAccess</tt> method denies access.
0870: */
0871: public void shutdown() {
0872: // Fail if caller doesn't have modifyThread permission
0873: SecurityManager security = System.getSecurityManager();
0874: if (security != null)
0875: java.security.AccessController
0876: .checkPermission(shutdownPerm);
0877:
0878: boolean fullyTerminated = false;
0879: final ReentrantLock mainLock = this .mainLock;
0880: mainLock.lock();
0881: try {
0882: if (workers.size() > 0) {
0883: // Check if caller can modify worker threads. This
0884: // might not be true even if passed above check, if
0885: // the SecurityManager treats some threads specially.
0886: if (security != null) {
0887: for (Worker w : workers)
0888: security.checkAccess(w.thread);
0889: }
0890:
0891: int state = runState;
0892: if (state == RUNNING) // don't override shutdownNow
0893: runState = SHUTDOWN;
0894:
0895: try {
0896: for (Worker w : workers)
0897: w.interruptIfIdle();
0898: } catch (SecurityException se) {
0899: // If SecurityManager allows above checks, but
0900: // then unexpectedly throws exception when
0901: // interrupting threads (which it ought not do),
0902: // back out as cleanly as we can. Some threads may
0903: // have been killed but we remain in non-shutdown
0904: // state.
0905: runState = state;
0906: throw se;
0907: }
0908: } else { // If no workers, trigger full termination now
0909: fullyTerminated = true;
0910: runState = TERMINATED;
0911: termination.signalAll();
0912: }
0913: } finally {
0914: mainLock.unlock();
0915: }
0916: if (fullyTerminated)
0917: terminated();
0918: }
0919:
0920: /**
0921: * Attempts to stop all actively executing tasks, halts the
0922: * processing of waiting tasks, and returns a list of the tasks that were
0923: * awaiting execution.
0924: *
0925: * <p>This implementation cancels tasks via {@link
0926: * Thread#interrupt}, so if any tasks mask or fail to respond to
0927: * interrupts, they may never terminate.
0928: *
0929: * @return list of tasks that never commenced execution
0930: * @throws SecurityException if a security manager exists and
0931: * shutting down this ExecutorService may manipulate threads that
0932: * the caller is not permitted to modify because it does not hold
0933: * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
0934: * or the security manager's <tt>checkAccess</tt> method denies access.
0935: */
0936: public List<Runnable> shutdownNow() {
0937: // Almost the same code as shutdown()
0938: SecurityManager security = System.getSecurityManager();
0939: if (security != null)
0940: java.security.AccessController
0941: .checkPermission(shutdownPerm);
0942:
0943: boolean fullyTerminated = false;
0944: final ReentrantLock mainLock = this .mainLock;
0945: mainLock.lock();
0946: try {
0947: if (workers.size() > 0) {
0948: if (security != null) {
0949: for (Worker w : workers)
0950: security.checkAccess(w.thread);
0951: }
0952:
0953: int state = runState;
0954: if (state != TERMINATED)
0955: runState = STOP;
0956: try {
0957: for (Worker w : workers)
0958: w.interruptNow();
0959: } catch (SecurityException se) {
0960: runState = state; // back out;
0961: throw se;
0962: }
0963: } else { // If no workers, trigger full termination now
0964: fullyTerminated = true;
0965: runState = TERMINATED;
0966: termination.signalAll();
0967: }
0968: } finally {
0969: mainLock.unlock();
0970: }
0971: if (fullyTerminated)
0972: terminated();
0973: return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
0974: }
0975:
0976: public boolean isShutdown() {
0977: return runState != RUNNING;
0978: }
0979:
0980: /**
0981: * Returns true if this executor is in the process of terminating
0982: * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
0983: * completely terminated. This method may be useful for
0984: * debugging. A return of <tt>true</tt> reported a sufficient
0985: * period after shutdown may indicate that submitted tasks have
0986: * ignored or suppressed interruption, causing this executor not
0987: * to properly terminate.
0988: * @return true if terminating but not yet terminated.
0989: */
0990: public boolean isTerminating() {
0991: return runState == STOP;
0992: }
0993:
0994: public boolean isTerminated() {
0995: return runState == TERMINATED;
0996: }
0997:
0998: public boolean awaitTermination(long timeout, TimeUnit unit)
0999: throws InterruptedException {
1000: long nanos = unit.toNanos(timeout);
1001: final ReentrantLock mainLock = this .mainLock;
1002: mainLock.lock();
1003: try {
1004: for (;;) {
1005: if (runState == TERMINATED)
1006: return true;
1007: if (nanos <= 0)
1008: return false;
1009: nanos = termination.awaitNanos(nanos);
1010: }
1011: } finally {
1012: mainLock.unlock();
1013: }
1014: }
1015:
1016: /**
1017: * Invokes <tt>shutdown</tt> when this executor is no longer
1018: * referenced.
1019: */
1020: protected void finalize() {
1021: shutdown();
1022: }
1023:
1024: /**
1025: * Sets the thread factory used to create new threads.
1026: *
1027: * @param threadFactory the new thread factory
1028: * @throws NullPointerException if threadFactory is null
1029: * @see #getThreadFactory
1030: */
1031: public void setThreadFactory(ThreadFactory threadFactory) {
1032: if (threadFactory == null)
1033: throw new NullPointerException();
1034: this .threadFactory = threadFactory;
1035: }
1036:
1037: /**
1038: * Returns the thread factory used to create new threads.
1039: *
1040: * @return the current thread factory
1041: * @see #setThreadFactory
1042: */
1043: public ThreadFactory getThreadFactory() {
1044: return threadFactory;
1045: }
1046:
1047: /**
1048: * Sets a new handler for unexecutable tasks.
1049: *
1050: * @param handler the new handler
1051: * @throws NullPointerException if handler is null
1052: * @see #getRejectedExecutionHandler
1053: */
1054: public void setRejectedExecutionHandler(
1055: RejectedExecutionHandler handler) {
1056: if (handler == null)
1057: throw new NullPointerException();
1058: this .handler = handler;
1059: }
1060:
1061: /**
1062: * Returns the current handler for unexecutable tasks.
1063: *
1064: * @return the current handler
1065: * @see #setRejectedExecutionHandler
1066: */
1067: public RejectedExecutionHandler getRejectedExecutionHandler() {
1068: return handler;
1069: }
1070:
1071: /**
1072: * Returns the task queue used by this executor. Access to the
1073: * task queue is intended primarily for debugging and monitoring.
1074: * This queue may be in active use. Retrieving the task queue
1075: * does not prevent queued tasks from executing.
1076: *
1077: * @return the task queue
1078: */
1079: public BlockingQueue<Runnable> getQueue() {
1080: return workQueue;
1081: }
1082:
1083: /**
1084: * Removes this task from the executor's internal queue if it is
1085: * present, thus causing it not to be run if it has not already
1086: * started.
1087: *
1088: * <p> This method may be useful as one part of a cancellation
1089: * scheme. It may fail to remove tasks that have been converted
1090: * into other forms before being placed on the internal queue. For
1091: * example, a task entered using <tt>submit</tt> might be
1092: * converted into a form that maintains <tt>Future</tt> status.
1093: * However, in such cases, method {@link ThreadPoolExecutor#purge}
1094: * may be used to remove those Futures that have been cancelled.
1095: *
1096: *
1097: * @param task the task to remove
1098: * @return true if the task was removed
1099: */
1100: public boolean remove(Runnable task) {
1101: return getQueue().remove(task);
1102: }
1103:
1104: /**
1105: * Tries to remove from the work queue all {@link Future}
1106: * tasks that have been cancelled. This method can be useful as a
1107: * storage reclamation operation, that has no other impact on
1108: * functionality. Cancelled tasks are never executed, but may
1109: * accumulate in work queues until worker threads can actively
1110: * remove them. Invoking this method instead tries to remove them now.
1111: * However, this method may fail to remove tasks in
1112: * the presence of interference by other threads.
1113: */
1114: public void purge() {
1115: // Fail if we encounter interference during traversal
1116: try {
1117: Iterator<Runnable> it = getQueue().iterator();
1118: while (it.hasNext()) {
1119: Runnable r = it.next();
1120: if (r instanceof Future<?>) {
1121: Future<?> c = (Future<?>) r;
1122: if (c.isCancelled())
1123: it.remove();
1124: }
1125: }
1126: } catch (ConcurrentModificationException ex) {
1127: return;
1128: }
1129: }
1130:
1131: /**
1132: * Sets the core number of threads. This overrides any value set
1133: * in the constructor. If the new value is smaller than the
1134: * current value, excess existing threads will be terminated when
1135: * they next become idle. If larger, new threads will, if needed,
1136: * be started to execute any queued tasks.
1137: *
1138: * @param corePoolSize the new core size
1139: * @throws IllegalArgumentException if <tt>corePoolSize</tt>
1140: * less than zero
1141: * @see #getCorePoolSize
1142: */
1143: public void setCorePoolSize(int corePoolSize) {
1144: if (corePoolSize < 0)
1145: throw new IllegalArgumentException();
1146: final ReentrantLock mainLock = this .mainLock;
1147: mainLock.lock();
1148: try {
1149: int extra = this .corePoolSize - corePoolSize;
1150: this .corePoolSize = corePoolSize;
1151: if (extra < 0) {
1152: Runnable r;
1153: while (extra++ < 0 && poolSize < corePoolSize
1154: && (r = workQueue.poll()) != null)
1155: addThread(r).start();
1156: } else if (extra > 0 && poolSize > corePoolSize) {
1157: Iterator<Worker> it = workers.iterator();
1158: while (it.hasNext() && extra-- > 0
1159: && poolSize > corePoolSize
1160: && workQueue.remainingCapacity() == 0)
1161: it.next().interruptIfIdle();
1162: }
1163: } finally {
1164: mainLock.unlock();
1165: }
1166: }
1167:
1168: /**
1169: * Returns the core number of threads.
1170: *
1171: * @return the core number of threads
1172: * @see #setCorePoolSize
1173: */
1174: public int getCorePoolSize() {
1175: return corePoolSize;
1176: }
1177:
1178: /**
1179: * Starts a core thread, causing it to idly wait for work. This
1180: * overrides the default policy of starting core threads only when
1181: * new tasks are executed. This method will return <tt>false</tt>
1182: * if all core threads have already been started.
1183: * @return true if a thread was started
1184: */
1185: public boolean prestartCoreThread() {
1186: return addIfUnderCorePoolSize(null);
1187: }
1188:
1189: /**
1190: * Starts all core threads, causing them to idly wait for work. This
1191: * overrides the default policy of starting core threads only when
1192: * new tasks are executed.
1193: * @return the number of threads started.
1194: */
1195: public int prestartAllCoreThreads() {
1196: int n = 0;
1197: while (addIfUnderCorePoolSize(null))
1198: ++n;
1199: return n;
1200: }
1201:
1202: /**
1203: * Sets the maximum allowed number of threads. This overrides any
1204: * value set in the constructor. If the new value is smaller than
1205: * the current value, excess existing threads will be
1206: * terminated when they next become idle.
1207: *
1208: * @param maximumPoolSize the new maximum
1209: * @throws IllegalArgumentException if maximumPoolSize less than zero or
1210: * the {@link #getCorePoolSize core pool size}
1211: * @see #getMaximumPoolSize
1212: */
1213: public void setMaximumPoolSize(int maximumPoolSize) {
1214: if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1215: throw new IllegalArgumentException();
1216: final ReentrantLock mainLock = this .mainLock;
1217: mainLock.lock();
1218: try {
1219: int extra = this .maximumPoolSize - maximumPoolSize;
1220: this .maximumPoolSize = maximumPoolSize;
1221: if (extra > 0 && poolSize > maximumPoolSize) {
1222: Iterator<Worker> it = workers.iterator();
1223: while (it.hasNext() && extra > 0
1224: && poolSize > maximumPoolSize) {
1225: it.next().interruptIfIdle();
1226: --extra;
1227: }
1228: }
1229: } finally {
1230: mainLock.unlock();
1231: }
1232: }
1233:
1234: /**
1235: * Returns the maximum allowed number of threads.
1236: *
1237: * @return the maximum allowed number of threads
1238: * @see #setMaximumPoolSize
1239: */
1240: public int getMaximumPoolSize() {
1241: return maximumPoolSize;
1242: }
1243:
1244: /**
1245: * Sets the time limit for which threads may remain idle before
1246: * being terminated. If there are more than the core number of
1247: * threads currently in the pool, after waiting this amount of
1248: * time without processing a task, excess threads will be
1249: * terminated. This overrides any value set in the constructor.
1250: * @param time the time to wait. A time value of zero will cause
1251: * excess threads to terminate immediately after executing tasks.
1252: * @param unit the time unit of the time argument
1253: * @throws IllegalArgumentException if time less than zero
1254: * @see #getKeepAliveTime
1255: */
1256: public void setKeepAliveTime(long time, TimeUnit unit) {
1257: if (time < 0)
1258: throw new IllegalArgumentException();
1259: this .keepAliveTime = unit.toNanos(time);
1260: }
1261:
1262: /**
1263: * Returns the thread keep-alive time, which is the amount of time
1264: * which threads in excess of the core pool size may remain
1265: * idle before being terminated.
1266: *
1267: * @param unit the desired time unit of the result
1268: * @return the time limit
1269: * @see #setKeepAliveTime
1270: */
1271: public long getKeepAliveTime(TimeUnit unit) {
1272: return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1273: }
1274:
1275: /* Statistics */
1276:
1277: /**
1278: * Returns the current number of threads in the pool.
1279: *
1280: * @return the number of threads
1281: */
1282: public int getPoolSize() {
1283: return poolSize;
1284: }
1285:
1286: /**
1287: * Returns the approximate number of threads that are actively
1288: * executing tasks.
1289: *
1290: * @return the number of threads
1291: */
1292: public int getActiveCount() {
1293: final ReentrantLock mainLock = this .mainLock;
1294: mainLock.lock();
1295: try {
1296: int n = 0;
1297: for (Worker w : workers) {
1298: if (w.isActive())
1299: ++n;
1300: }
1301: return n;
1302: } finally {
1303: mainLock.unlock();
1304: }
1305: }
1306:
1307: /**
1308: * Returns the largest number of threads that have ever
1309: * simultaneously been in the pool.
1310: *
1311: * @return the number of threads
1312: */
1313: public int getLargestPoolSize() {
1314: final ReentrantLock mainLock = this .mainLock;
1315: mainLock.lock();
1316: try {
1317: return largestPoolSize;
1318: } finally {
1319: mainLock.unlock();
1320: }
1321: }
1322:
1323: /**
1324: * Returns the approximate total number of tasks that have been
1325: * scheduled for execution. Because the states of tasks and
1326: * threads may change dynamically during computation, the returned
1327: * value is only an approximation, but one that does not ever
1328: * decrease across successive calls.
1329: *
1330: * @return the number of tasks
1331: */
1332: public long getTaskCount() {
1333: final ReentrantLock mainLock = this .mainLock;
1334: mainLock.lock();
1335: try {
1336: long n = completedTaskCount;
1337: for (Worker w : workers) {
1338: n += w.completedTasks;
1339: if (w.isActive())
1340: ++n;
1341: }
1342: return n + workQueue.size();
1343: } finally {
1344: mainLock.unlock();
1345: }
1346: }
1347:
1348: /**
1349: * Returns the approximate total number of tasks that have
1350: * completed execution. Because the states of tasks and threads
1351: * may change dynamically during computation, the returned value
1352: * is only an approximation, but one that does not ever decrease
1353: * across successive calls.
1354: *
1355: * @return the number of tasks
1356: */
1357: public long getCompletedTaskCount() {
1358: final ReentrantLock mainLock = this .mainLock;
1359: mainLock.lock();
1360: try {
1361: long n = completedTaskCount;
1362: for (Worker w : workers)
1363: n += w.completedTasks;
1364: return n;
1365: } finally {
1366: mainLock.unlock();
1367: }
1368: }
1369:
1370: /**
1371: * Method invoked prior to executing the given Runnable in the
1372: * given thread. This method is invoked by thread <tt>t</tt> that
1373: * will execute task <tt>r</tt>, and may be used to re-initialize
1374: * ThreadLocals, or to perform logging. Note: To properly nest
1375: * multiple overridings, subclasses should generally invoke
1376: * <tt>super.beforeExecute</tt> at the end of this method.
1377: *
1378: * @param t the thread that will run task r.
1379: * @param r the task that will be executed.
1380: */
1381: protected void beforeExecute(Thread t, Runnable r) {
1382: }
1383:
1384: /**
1385: * Method invoked upon completion of execution of the given
1386: * Runnable. This method is invoked by the thread that executed
1387: * the task. If non-null, the Throwable is the uncaught exception
1388: * that caused execution to terminate abruptly. Note: To properly
1389: * nest multiple overridings, subclasses should generally invoke
1390: * <tt>super.afterExecute</tt> at the beginning of this method.
1391: *
1392: * @param r the runnable that has completed.
1393: * @param t the exception that caused termination, or null if
1394: * execution completed normally.
1395: */
1396: protected void afterExecute(Runnable r, Throwable t) {
1397: }
1398:
1399: /**
1400: * Method invoked when the Executor has terminated. Default
1401: * implementation does nothing. Note: To properly nest multiple
1402: * overridings, subclasses should generally invoke
1403: * <tt>super.terminated</tt> within this method.
1404: */
1405: protected void terminated() {
1406: }
1407:
1408: /**
1409: * A handler for rejected tasks that runs the rejected task
1410: * directly in the calling thread of the <tt>execute</tt> method,
1411: * unless the executor has been shut down, in which case the task
1412: * is discarded.
1413: */
1414: public static class CallerRunsPolicy implements
1415: RejectedExecutionHandler {
1416: /**
1417: * Creates a <tt>CallerRunsPolicy</tt>.
1418: */
1419: public CallerRunsPolicy() {
1420: }
1421:
1422: /**
1423: * Executes task r in the caller's thread, unless the executor
1424: * has been shut down, in which case the task is discarded.
1425: * @param r the runnable task requested to be executed
1426: * @param e the executor attempting to execute this task
1427: */
1428: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1429: if (!e.isShutdown()) {
1430: r.run();
1431: }
1432: }
1433: }
1434:
1435: /**
1436: * A handler for rejected tasks that throws a
1437: * <tt>RejectedExecutionException</tt>.
1438: */
1439: public static class AbortPolicy implements RejectedExecutionHandler {
1440: /**
1441: * Creates an <tt>AbortPolicy</tt>.
1442: */
1443: public AbortPolicy() {
1444: }
1445:
1446: /**
1447: * Always throws RejectedExecutionException.
1448: * @param r the runnable task requested to be executed
1449: * @param e the executor attempting to execute this task
1450: * @throws RejectedExecutionException always.
1451: */
1452: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1453: throw new RejectedExecutionException();
1454: }
1455: }
1456:
1457: /**
1458: * A handler for rejected tasks that silently discards the
1459: * rejected task.
1460: */
1461: public static class DiscardPolicy implements
1462: RejectedExecutionHandler {
1463: /**
1464: * Creates a <tt>DiscardPolicy</tt>.
1465: */
1466: public DiscardPolicy() {
1467: }
1468:
1469: /**
1470: * Does nothing, which has the effect of discarding task r.
1471: * @param r the runnable task requested to be executed
1472: * @param e the executor attempting to execute this task
1473: */
1474: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1475: }
1476: }
1477:
1478: /**
1479: * A handler for rejected tasks that discards the oldest unhandled
1480: * request and then retries <tt>execute</tt>, unless the executor
1481: * is shut down, in which case the task is discarded.
1482: */
1483: public static class DiscardOldestPolicy implements
1484: RejectedExecutionHandler {
1485: /**
1486: * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
1487: */
1488: public DiscardOldestPolicy() {
1489: }
1490:
1491: /**
1492: * Obtains and ignores the next task that the executor
1493: * would otherwise execute, if one is immediately available,
1494: * and then retries execution of task r, unless the executor
1495: * is shut down, in which case task r is instead discarded.
1496: * @param r the runnable task requested to be executed
1497: * @param e the executor attempting to execute this task
1498: */
1499: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1500: if (!e.isShutdown()) {
1501: e.getQueue().poll();
1502: e.execute(r);
1503: }
1504: }
1505: }
1506: }
|