Source Code Cross Referenced for ThreadPoolExecutor.java in Apache-Harmony-Java-SE » java-package » java » util » concurrent

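The class comment in the listing below explains how corePoolSize, maximumPoolSize and the work queue interact. The following is a minimal sketch of that interaction and is not part of the Harmony source. It assumes a pool with one core thread, at most two threads, and a one-slot ArrayBlockingQueue; the class name PoolSizingSketch and the helper run() are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of java.util.concurrent.
class PoolSizingSketch {

    /** Returns { largestPoolSize, rejectedCount, completedTaskCount }. */
    static long[] run() throws InterruptedException {
        // corePoolSize = 1, maximumPoolSize = 2, bounded queue with one slot.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));

        // Every task blocks until released, so the pool saturates predictably.
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                // The default AbortPolicy throws once threads and queue are full.
                rejected++;
            }
        }
        int largest = pool.getLargestPoolSize();

        release.countDown();          // let the accepted tasks finish
        pool.shutdown();              // orderly shutdown: queued work still runs
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return new long[] { largest, rejected, pool.getCompletedTaskCount() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run();
        System.out.println("largest pool size = " + r[0]
                + ", rejected = " + r[1] + ", completed = " + r[2]);
    }
}
```

With these settings the first task starts the core thread, the second is queued, the third forces a second thread because the queue is full, and the fourth is handed to the default AbortPolicy, so the sketch should count exactly one rejection.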
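The "Hook methods" entry of the class comment in the listing below says that beforeExecute, afterExecute and terminated can be overridden, for example to gather statistics. The following is a small sketch of that idea under stated assumptions: CountingExecutor, its counters and the demo() helper are hypothetical names, and the failing task exists only to show the Throwable argument of afterExecute.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts task starts and failures via the hooks.
class CountingExecutor extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();
    volatile boolean terminatedCalled;

    CountingExecutor(int threads) {
        // Fixed-size pool with an unbounded queue.
        super(threads, threads, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For tasks passed to execute(), t is the exception that ended the task.
        if (t != null)
            failed.incrementAndGet();
    }

    @Override
    protected void terminated() {
        super.terminated();
        terminatedCalled = true;
    }

    /** Returns { started, failed, terminatedCalled ? 1 : 0 }. */
    static int[] demo() throws InterruptedException {
        CountingExecutor pool = new CountingExecutor(2);
        Runnable ok = new Runnable() {
            public void run() { /* nothing to do */ }
        };
        Runnable bad = new Runnable() {
            public void run() { throw new IllegalStateException("demo failure"); }
        };
        pool.execute(ok);
        pool.execute(ok);
        pool.execute(bad);   // the default uncaught-exception handling logs this to stderr
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] { started.get(pool), failed.get(pool), pool.terminatedCalled ? 1 : 0 };
    }

    // Small helpers so demo() reads the instance counters explicitly.
    private static final class started {
        static int get(CountingExecutor p) { return p.started.get(); }
    }
    private static final class failed {
        static int get(CountingExecutor p) { return p.failed.get(); }
    }
}
```

Calling super in each hook follows the pattern used by the pause/resume example in the class comment. Atomic counters are used because the hooks run on the worker threads rather than on the submitting thread.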


0001:        /*
0002:         * Written by Doug Lea with assistance from members of JCP JSR-166
0003:         * Expert Group and released to the public domain, as explained at
0004:         * http://creativecommons.org/licenses/publicdomain
0005:         */
0006:
0007:        package java.util.concurrent;
0008:
0009:        import java.util.concurrent.locks.*;
0010:        import java.util.*;
0011:
0012:        /**
0013:         * An {@link ExecutorService} that executes each submitted task using
0014:         * one of possibly several pooled threads, normally configured
0015:         * using {@link Executors} factory methods.
0016:         *
0017:         * <p>Thread pools address two different problems: they usually
0018:         * provide improved performance when executing large numbers of
0019:         * asynchronous tasks, due to reduced per-task invocation overhead,
0020:         * and they provide a means of bounding and managing the resources,
0021:         * including threads, consumed when executing a collection of tasks.
0022:         * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
0023:         * statistics, such as the number of completed tasks.
0024:         *
0025:         * <p>To be useful across a wide range of contexts, this class
0026:         * provides many adjustable parameters and extensibility
0027:         * hooks. However, programmers are urged to use the more convenient
0028:         * {@link Executors} factory methods {@link
0029:         * Executors#newCachedThreadPool} (unbounded thread pool, with
0030:         * automatic thread reclamation), {@link Executors#newFixedThreadPool}
0031:         * (fixed size thread pool) and {@link
0032:         * Executors#newSingleThreadExecutor} (single background thread), that
0033:         * preconfigure settings for the most common usage
0034:         * scenarios. Otherwise, use the following guide when manually
0035:         * configuring and tuning this class:
0036:         *
0037:         * <dl>
0038:         *
0039:         * <dt>Core and maximum pool sizes</dt>
0040:         *
0041:         * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
0042:         * pool size 
0043:         * (see {@link ThreadPoolExecutor#getPoolSize})
0044:         * according to the bounds set by corePoolSize 
0045:         * (see {@link ThreadPoolExecutor#getCorePoolSize})
0046:         * and
0047:         * maximumPoolSize
0048:         * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
0049:         * When a new task is submitted in method {@link
0050:         * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
0051:         * are running, a new thread is created to handle the request, even if
0052:         * other worker threads are idle.  If there are more than
0053:         * corePoolSize but fewer than maximumPoolSize threads running, a new
0054:         * thread will be created only if the queue is full.  By setting
0055:         * corePoolSize and maximumPoolSize the same, you create a fixed-size
0056:         * thread pool. By setting maximumPoolSize to an essentially unbounded
0057:         * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
0058:         * accommodate an arbitrary number of concurrent tasks. Most typically,
0059:         * core and maximum pool sizes are set only upon construction, but they
0060:         * may also be changed dynamically using {@link
0061:         * ThreadPoolExecutor#setCorePoolSize} and {@link
0062:         * ThreadPoolExecutor#setMaximumPoolSize}. </dd>
0063:         *
0064:         * <dt>On-demand construction</dt>
0065:         *
0066:         * <dd> By default, even core threads are initially created and
0067:         * started only when needed by new tasks, but this can be overridden
0068:         * dynamically using method {@link
0069:         * ThreadPoolExecutor#prestartCoreThread} or
0070:         * {@link ThreadPoolExecutor#prestartAllCoreThreads}.  </dd>
0071:         *
0072:         * <dt>Creating new threads</dt>
0073:         *
0074:         * <dd>New threads are created using a {@link
0075:         * java.util.concurrent.ThreadFactory}.  If not otherwise specified, a
0076:         * {@link Executors#defaultThreadFactory} is used, which creates threads that are all
0077:         * in the same {@link ThreadGroup} and with the same
0078:         * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
0079:         * a different ThreadFactory, you can alter the thread's name, thread
0080:         * group, priority, daemon status, etc.  </dd>
0081:         *
0082:         * <dt>Keep-alive times</dt>
0083:         *
0084:         * <dd>If the pool currently has more than corePoolSize threads,
0085:         * excess threads will be terminated if they have been idle for more
0086:         * than the keepAliveTime (see {@link
0087:         * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
0088:         * reducing resource consumption when the pool is not being actively
0089:         * used. If the pool becomes more active later, new threads will be
0090:         * constructed. This parameter can also be changed dynamically
0091:         * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
0092:         * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
0093:         * effectively disables idle threads from ever terminating prior
0094:         * to shut down.
0095:         * </dd>
0096:         *
0097:         * <dt>Queuing</dt>
0098:         *
0099:         * <dd>Any {@link BlockingQueue} may be used to transfer and hold
0100:         * submitted tasks.  The use of this queue interacts with pool sizing:
0101:         *
0102:         * <ul>
0103:         *
0104:         * <li> If fewer than corePoolSize threads are running, the Executor
0105:         * always prefers adding a new thread
0106:         * rather than queuing.</li>
0107:         *
0108:         * <li> If corePoolSize or more threads are running, the Executor
0109:         * always prefers queuing a request rather than adding a new
0110:         * thread.</li>
0111:         * 
0112:         * <li> If a request cannot be queued, a new thread is created unless
0113:         * this would exceed maximumPoolSize, in which case, the task will be
0114:         * rejected.</li>
0115:         *
0116:         * </ul>
0117:         *
0118:         * There are three general strategies for queuing:
0119:         * <ol>
0120:         *
0121:         * <li> <em> Direct handoffs.</em> A good default choice for a work
0122:         * queue is a {@link SynchronousQueue} that hands off tasks to threads
0123:         * without otherwise holding them. Here, an attempt to queue a task
0124:         * will fail if no threads are immediately available to run it, so a
0125:         * new thread will be constructed. This policy avoids lockups when
0126:         * handling sets of requests that might have internal dependencies.
0127:         * Direct handoffs generally require unbounded maximumPoolSizes to
0128:         * avoid rejection of newly submitted tasks. This in turn admits the
0129:         * possibility of unbounded thread growth when commands continue to
0130:         * arrive on average faster than they can be processed.  </li>
0131:         *
0132:         * <li><em> Unbounded queues.</em> Using an unbounded queue (for
0133:         * example a {@link LinkedBlockingQueue} without a predefined
0134:         * capacity) will cause new tasks to be queued in cases where all
0135:         * corePoolSize threads are busy. Thus, no more than corePoolSize
0136:         * threads will ever be created. (And the value of the maximumPoolSize
0137:         * therefore doesn't have any effect.)  This may be appropriate when
0138:         * each task is completely independent of others, so tasks cannot
0139:         * affect each other's execution; for example, in a web page server.
0140:         * While this style of queuing can be useful in smoothing out
0141:         * transient bursts of requests, it admits the possibility of
0142:         * unbounded work queue growth when commands continue to arrive on
0143:         * average faster than they can be processed.  </li>
0144:         *
0145:         * <li><em>Bounded queues.</em> A bounded queue (for example, an
0146:         * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
0147:         * used with finite maximumPoolSizes, but can be more difficult to
0148:         * tune and control.  Queue sizes and maximum pool sizes may be traded
0149:         * off for each other: Using large queues and small pools minimizes
0150:         * CPU usage, OS resources, and context-switching overhead, but can
0151:         * lead to artificially low throughput.  If tasks frequently block (for
0152:         * example if they are I/O bound), a system may be able to schedule
0153:         * time for more threads than you otherwise allow. Use of small queues
0154:         * generally requires larger pool sizes, which keeps CPUs busier but
0155:         * may encounter unacceptable scheduling overhead, which also
0156:         * decreases throughput.  </li>
0157:         *
0158:         * </ol>
0159:         *
0160:         * </dd>
0161:         *
0162:         * <dt>Rejected tasks</dt>
0163:         *
0164:         * <dd> New tasks submitted in method {@link
0165:         * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
0166:         * Executor has been shut down, and also when the Executor uses finite
0167:         * bounds for both maximum threads and work queue capacity, and is
0168:         * saturated.  In either case, the <tt>execute</tt> method invokes the
0169:         * {@link RejectedExecutionHandler#rejectedExecution} method of its
0170:         * {@link RejectedExecutionHandler}.  Four predefined handler policies
0171:         * are provided:
0172:         *
0173:         * <ol>
0174:         *
0175:         * <li> In the
0176:         * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
0177:         * runtime {@link RejectedExecutionException} upon rejection. </li>
0178:         * 
0179:         * <li> In {@link
0180:         * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
0181:         * <tt>execute</tt> itself runs the task. This provides a simple
0182:         * feedback control mechanism that will slow down the rate at which new
0183:         * tasks are submitted. </li>
0184:         *
0185:         * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
0186:         * a task that cannot be executed is simply dropped.  </li>
0187:         *
0188:         * <li>In {@link
0189:         * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
0190:         * shut down, the task at the head of the work queue is dropped, and
0191:         * then execution is retried (which can fail again, causing this to be
0192:         * repeated). </li>
0193:         *
0194:         * </ol>
0195:         *
0196:         * It is possible to define and use other kinds of {@link
0197:         * RejectedExecutionHandler} classes. Doing so requires some care
0198:         * especially when policies are designed to work only under particular
0199:         * capacity or queuing policies. </dd>
0200:         *
0201:         * <dt>Hook methods</dt>
0202:         *
0203:         * <dd>This class provides <tt>protected</tt> overridable {@link
0204:         * ThreadPoolExecutor#beforeExecute} and {@link
0205:         * ThreadPoolExecutor#afterExecute} methods that are called before and
0206:         * after execution of each task.  These can be used to manipulate the
0207:         * execution environment, for example, reinitializing ThreadLocals,
0208:         * gathering statistics, or adding log entries. Additionally, method
0209:         * {@link ThreadPoolExecutor#terminated} can be overridden to perform
0210:         * any special processing that needs to be done once the Executor has
0211:         * fully terminated.</dd>
0212:         *
0213:         * <dt>Queue maintenance</dt>
0214:         *
0215:         * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
0216:         * the work queue for purposes of monitoring and debugging.  Use of
0217:         * this method for any other purpose is strongly discouraged.  Two
0218:         * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
0219:         * ThreadPoolExecutor#purge} are available to assist in storage
0220:         * reclamation when large numbers of queued tasks become
0221:         * cancelled.</dd> </dl>
0222:         *
0223:         * <p> <b>Extension example</b>. Most extensions of this class
0224:         * override one or more of the protected hook methods. For example,
0225:         * here is a subclass that adds a simple pause/resume feature:
0226:         *
0227:         * <pre>
0228:         * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
0229:         *   private boolean isPaused;
0230:         *   private ReentrantLock pauseLock = new ReentrantLock();
0231:         *   private Condition unpaused = pauseLock.newCondition();
0232:         *
0233:         *   public PausableThreadPoolExecutor(...) { super(...); }
0234:         * 
0235:         *   protected void beforeExecute(Thread t, Runnable r) {
0236:         *     super.beforeExecute(t, r);
0237:         *     pauseLock.lock();
0238:         *     try {
0239:         *       while (isPaused) unpaused.await();
0240:         *     } catch(InterruptedException ie) {
0241:         *       t.interrupt();
0242:         *     } finally {
0243:         *       pauseLock.unlock();
0244:         *     }
0245:         *   }
0246:         * 
0247:         *   public void pause() {
0248:         *     pauseLock.lock();
0249:         *     try {
0250:         *       isPaused = true;
0251:         *     } finally {
0252:         *       pauseLock.unlock();
0253:         *     }
0254:         *   }
0255:         * 
0256:         *   public void resume() {
0257:         *     pauseLock.lock();
0258:         *     try {
0259:         *       isPaused = false;
0260:         *       unpaused.signalAll();
0261:         *     } finally {
0262:         *       pauseLock.unlock();
0263:         *     }
0264:         *   }
0265:         * }
0266:         * </pre>
0267:         * @since 1.5
0268:         * @author Doug Lea
0269:         */
0270:        public class ThreadPoolExecutor extends AbstractExecutorService {
0271:            /**
0272:             * Only used to force toArray() to produce a Runnable[].
0273:             */
0274:            private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
0275:
0276:            /**
0277:             * Permission for checking shutdown
0278:             */
0279:            private static final RuntimePermission shutdownPerm = new RuntimePermission(
0280:                    "modifyThread");
0281:
0282:            /**
0283:             * Queue used for holding tasks and handing off to worker threads.
0284:             */
0285:            private final BlockingQueue<Runnable> workQueue;
0286:
0287:            /**
0288:             * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
0289:             * workers set.
0290:             */
0291:            private final ReentrantLock mainLock = new ReentrantLock();
0292:
0293:            /**
0294:             * Wait condition to support awaitTermination
0295:             */
0296:            private final Condition termination = mainLock.newCondition();
0297:
0298:            /**
0299:             * Set containing all worker threads in pool.
0300:             */
0301:            private final HashSet<Worker> workers = new HashSet<Worker>();
0302:
0303:            /**
0304:             * Timeout in nanoseconds for idle threads waiting for work.
0305:             * Threads use this timeout only when there are more than
0306:             * corePoolSize present. Otherwise they wait forever for new work.
0307:             */
0308:            private volatile long keepAliveTime;
0309:
0310:            /**
0311:             * Core pool size, updated only while holding mainLock,
0312:             * but volatile to allow concurrent readability even
0313:             * during updates.
0314:             */
0315:            private volatile int corePoolSize;
0316:
0317:            /**
0318:             * Maximum pool size, updated only while holding mainLock
0319:             * but volatile to allow concurrent readability even
0320:             * during updates.
0321:             */
0322:            private volatile int maximumPoolSize;
0323:
0324:            /**
0325:             * Current pool size, updated only while holding mainLock
0326:             * but volatile to allow concurrent readability even
0327:             * during updates.
0328:             */
0329:            private volatile int poolSize;
0330:
0331:            /**
0332:             * Lifecycle state
0333:             */
0334:            volatile int runState;
0335:
0336:            // Special values for runState
0337:            /** Normal, not-shutdown mode */
0338:            static final int RUNNING = 0;
0339:            /** Controlled shutdown mode */
0340:            static final int SHUTDOWN = 1;
0341:            /** Immediate shutdown mode */
0342:            static final int STOP = 2;
0343:            /** Final state */
0344:            static final int TERMINATED = 3;
0345:
0346:            /**
0347:             * Handler called when saturated or shutdown in execute.
0348:             */
0349:            private volatile RejectedExecutionHandler handler;
0350:
0351:            /**
0352:             * Factory for new threads.
0353:             */
0354:            private volatile ThreadFactory threadFactory;
0355:
0356:            /**
0357:             * Tracks largest attained pool size.
0358:             */
0359:            private int largestPoolSize;
0360:
0361:            /**
0362:             * Counter for completed tasks. Updated only on termination of
0363:             * worker threads.
0364:             */
0365:            private long completedTaskCount;
0366:
0367:            /**
0368:             * The default rejected execution handler
0369:             */
0370:            private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
0371:
0372:            /**
0373:             * Invoke the rejected execution handler for the given command.
0374:             */
0375:            void reject(Runnable command) {
0376:                handler.rejectedExecution(command, this);
0377:            }
0378:
0379:            /**
0380:             * Create and return a new thread running firstTask as its first
0381:             * task. Call only while holding mainLock.
0382:             * @param firstTask the task the new thread should run first (or
0383:             * null if none)
0384:             * @return the new thread
0385:             */
0386:            private Thread addThread(Runnable firstTask) {
0387:                Worker w = new Worker(firstTask);
0388:                Thread t = threadFactory.newThread(w);
0389:                w.thread = t;
0390:                workers.add(w);
0391:                int nt = ++poolSize;
0392:                if (nt > largestPoolSize)
0393:                    largestPoolSize = nt;
0394:                return t;
0395:            }
0396:
0397:            /**
0398:             * Create and start a new thread running firstTask as its first
0399:             * task, only if fewer than corePoolSize threads are running.
0400:             * @param firstTask the task the new thread should run first (or
0401:             * null if none)
0402:             * @return true if successful.
0403:             */
0404:            private boolean addIfUnderCorePoolSize(Runnable firstTask) {
0405:                Thread t = null;
0406:                final ReentrantLock mainLock = this.mainLock;
0407:                mainLock.lock();
0408:                try {
0409:                    if (poolSize < corePoolSize)
0410:                        t = addThread(firstTask);
0411:                } finally {
0412:                    mainLock.unlock();
0413:                }
0414:                if (t == null)
0415:                    return false;
0416:                t.start();
0417:                return true;
0418:            }
0419:
0420:            /**
0421:             * Create and start a new thread only if fewer than maximumPoolSize
0422:             * threads are running.  The new thread runs as its first task the
0423:             * next task in queue, or if there is none, the given task.
0424:             * @param firstTask the task the new thread should run first (or
0425:             * null if none)
0426:             * @return null on failure, else the first task to be run by new thread.
0427:             */
0428:            private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
0429:                Thread t = null;
0430:                Runnable next = null;
0431:                final ReentrantLock mainLock = this.mainLock;
0432:                mainLock.lock();
0433:                try {
0434:                    if (poolSize < maximumPoolSize) {
0435:                        next = workQueue.poll();
0436:                        if (next == null)
0437:                            next = firstTask;
0438:                        t = addThread(next);
0439:                    }
0440:                } finally {
0441:                    mainLock.unlock();
0442:                }
0443:                if (t == null)
0444:                    return null;
0445:                t.start();
0446:                return next;
0447:            }
0448:
0449:            /**
0450:             * Get the next task for a worker thread to run.
0451:             * @return the task
0452:             * @throws InterruptedException if interrupted while waiting for task
0453:             */
0454:            Runnable getTask() throws InterruptedException {
0455:                for (;;) {
0456:                    switch (runState) {
0457:                    case RUNNING: {
0458:                        if (poolSize <= corePoolSize) // untimed wait if core
0459:                            return workQueue.take();
0460:
0461:                        long timeout = keepAliveTime;
0462:                        if (timeout <= 0) // die immediately for 0 timeout
0463:                            return null;
0464:                        Runnable r = workQueue.poll(timeout,
0465:                                TimeUnit.NANOSECONDS);
0466:                        if (r != null)
0467:                            return r;
0468:                        if (poolSize > corePoolSize) // timed out
0469:                            return null;
0470:                        // else, after timeout, pool shrank so shouldn't die, so retry
0471:                        break;
0472:                    }
0473:
0474:                    case SHUTDOWN: {
0475:                        // Help drain queue 
0476:                        Runnable r = workQueue.poll();
0477:                        if (r != null)
0478:                            return r;
0479:
0480:                        // Check if can terminate
0481:                        if (workQueue.isEmpty()) {
0482:                            interruptIdleWorkers();
0483:                            return null;
0484:                        }
0485:
0486:                        // There could still be delayed tasks in queue.
0487:                        // Wait for one, re-checking state upon interruption
0488:                        try {
0489:                            return workQueue.take();
0490:                        } catch (InterruptedException ignore) {
0491:                        }
0492:                        break;
0493:                    }
0494:
0495:                    case STOP:
0496:                        return null;
0497:                    default:
0498:                        assert false;
0499:                    }
0500:                }
0501:            }
0502:
0503:            /**
0504:             * Wake up all threads that might be waiting for tasks.
0505:             */
0506:            void interruptIdleWorkers() {
0507:                final ReentrantLock mainLock = this.mainLock;
0508:                mainLock.lock();
0509:                try {
0510:                    for (Worker w : workers)
0511:                        w.interruptIfIdle();
0512:                } finally {
0513:                    mainLock.unlock();
0514:                }
0515:            }
0516:
0517:            /**
0518:             * Perform bookkeeping for a terminated worker thread.
0519:             * @param w the worker
0520:             */
0521:            void workerDone(Worker w) {
0522:                final ReentrantLock mainLock = this.mainLock;
0523:                mainLock.lock();
0524:                try {
0525:                    completedTaskCount += w.completedTasks;
0526:                    workers.remove(w);
0527:                    if (--poolSize > 0)
0528:                        return;
0529:
0530:                    // Else, this is the last thread. Deal with potential shutdown.
0531:
0532:                    int state = runState;
0533:                    assert state != TERMINATED;
0534:
0535:                    if (state != STOP) {
0536:                        // If there are queued tasks but no threads, create
0537:                        // replacement.
0538:                        Runnable r = workQueue.poll();
0539:                        if (r != null) {
0540:                            addThread(r).start();
0541:                            return;
0542:                        }
0543:
0544:                        // If there are some (presumably delayed) tasks but
0545:                        // none pollable, create an idle replacement to wait.
0546:                        if (!workQueue.isEmpty()) {
0547:                            addThread(null).start();
0548:                            return;
0549:                        }
0550:
0551:                        // Otherwise, we can exit without replacement
0552:                        if (state == RUNNING)
0553:                            return;
0554:                    }
0555:
0556:                    // Either state is STOP, or state is SHUTDOWN and there is
0557:                    // no work to do. So we can terminate.
0558:                    termination.signalAll();
0559:                    runState = TERMINATED;
0560:                    // fall through to call terminated() outside of lock.
0561:                } finally {
0562:                    mainLock.unlock();
0563:                }
0564:
0565:                assert runState == TERMINATED;
0566:                terminated();
0567:            }
0568:
0569:            /**
0570:             *  Worker threads
0571:             */
0572:            private class Worker implements Runnable {
0573:
0574:                /**
0575:                 * The runLock is acquired and released surrounding each task
0576:                 * execution. It mainly protects against interrupts that are
0577:                 * intended to cancel the worker thread from instead
0578:                 * interrupting the task being run.
0579:                 */
0580:                private final ReentrantLock runLock = new ReentrantLock();
0581:
0582:                /**
0583:                 * Initial task to run before entering run loop
0584:                 */
0585:                private Runnable firstTask;
0586:
0587:                /**
0588:                 * Per thread completed task counter; accumulated
0589:                 * into completedTaskCount upon termination.
0590:                 */
0591:                volatile long completedTasks;
0592:
0593:                /**
0594:                 * Thread this worker is running in.  Acts as a final field,
0595:                 * but cannot be set until thread is created.
0596:                 */
0597:                Thread thread;
0598:
0599:                Worker(Runnable firstTask) {
0600:                    this.firstTask = firstTask;
0601:                }
0602:
0603:                boolean isActive() {
0604:                    return runLock.isLocked();
0605:                }
0606:
0607:                /**
0608:                 * Interrupt thread if not running a task
0609:                 */
0610:                void interruptIfIdle() {
0611:                    final ReentrantLock runLock = this.runLock;
0612:                    if (runLock.tryLock()) {
0613:                        try {
0614:                            thread.interrupt();
0615:                        } finally {
0616:                            runLock.unlock();
0617:                        }
0618:                    }
0619:                }
0620:
0621:                /**
0622:                 * Cause thread to die even if running a task.
0623:                 */
0624:                void interruptNow() {
0625:                    thread.interrupt();
0626:                }
0627:
        /**
         * Run a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                // Abort now if immediate cancel.  Otherwise, we have
                // committed to run this task.
                if (runState == STOP)
                    return;

                Thread.interrupted(); // clear interrupt status on entry
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    // Else the exception occurred within
                    // afterExecute itself in which case we don't
                    // want to call it again.
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null; // unnecessary but can help GC
                }
            } catch (InterruptedException ie) {
                // fall through
            } finally {
                workerDone(this);
            }
        }
    }

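    // Illustrative sketch, not part of the original source. runTask above brackets
    // each task with beforeExecute and afterExecute, so a subclass can observe both
    // normal and failed runs. HookSketch and RecordingPool are hypothetical names.
    // The failing task's exception still propagates out of the worker thread, so a
    // stack trace on stderr is expected when this runs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HookSketch {
    /** Single-threaded pool that records every beforeExecute/afterExecute call. */
    static final class RecordingPool extends ThreadPoolExecutor {
        final List<String> events =
                Collections.synchronizedList(new ArrayList<String>());

        RecordingPool() {
            super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            events.add("before");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable ex) {
            super.afterExecute(r, ex);
            // ex is null after a normal return, otherwise the exception the task threw.
            events.add(ex == null ? "after:ok" : "after:" + ex.getMessage());
        }
    }

    /** Runs one succeeding and one failing task and returns the recorded events. */
    static String demo() {
        RecordingPool pool = new RecordingPool();
        pool.execute(new Runnable() {
            public void run() { }
        });
        pool.execute(new Runnable() {
            public void run() { throw new IllegalStateException("boom"); }
        });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.events.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```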
    // Public methods

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given
     * initial parameters and default thread factory and handler.  It
     * may be more convenient to use one of the {@link Executors}
     * factory methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt> is null
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, Executors.defaultThreadFactory(),
                defaultHandler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>threadFactory</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, threadFactory, defaultHandler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param handler the handler to use when execution is blocked
     * because the thread bounds and queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>handler</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @param handler the handler to use when execution is blocked
     * because the thread bounds and queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>,
     * <tt>threadFactory</tt> or <tt>handler</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0
                || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null
                || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shut down or because its capacity has been reached,
     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
     * for execution
     * @throws NullPointerException if command is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            if (runState != RUNNING) {
                reject(command);
                return;
            }
            if (poolSize < corePoolSize
                    && addIfUnderCorePoolSize(command))
                return;
            if (workQueue.offer(command))
                return;
            Runnable r = addIfUnderMaximumPoolSize(command);
            if (r == command)
                return;
            if (r == null) {
                reject(command);
                return;
            }
            // else retry
        }
    }

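    // Illustrative sketch, not part of the original source. It exercises the order
    // that execute() follows above: start a core thread, then queue, then start a
    // thread up to the maximum, then reject. ExecuteOrderSketch is a hypothetical
    // name, and the rejection assumes the default handler throws
    // RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteOrderSketch {
    /** Submits four blocking tasks and reports how each submission was handled. */
    static String demo() {
        // One core thread, at most two threads, and room for one queued task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker); // below core size: starts the core thread
            log.append("threads=").append(pool.getPoolSize());
            pool.execute(blocker); // core thread busy: task is queued
            log.append(" queued=").append(pool.getQueue().size());
            pool.execute(blocker); // queue full: starts a second thread
            log.append(" threads=").append(pool.getPoolSize());
            try {
                pool.execute(blocker); // queue full and pool at maximum
                log.append(" accepted");
            } catch (RejectedExecutionException e) {
                log.append(" rejected");
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```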
    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be
     * accepted. Invocation has no additional effect if already shut
     * down.
     * @throws SecurityException if a security manager exists and
     * shutting down this ExecutorService may manipulate threads that
     * the caller is not permitted to modify because it does not hold
     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
     * or the security manager's <tt>checkAccess</tt> method denies access.
     */
    public void shutdown() {
        // Fail if caller doesn't have modifyThread permission
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            java.security.AccessController
                    .checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                // Check if caller can modify worker threads.  This
                // might not be true even if passed above check, if
                // the SecurityManager treats some threads specially.
                if (security != null) {
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state == RUNNING) // don't override shutdownNow
                    runState = SHUTDOWN;

                try {
                    for (Worker w : workers)
                        w.interruptIfIdle();
                } catch (SecurityException se) {
                    // If SecurityManager allows above checks, but
                    // then unexpectedly throws exception when
                    // interrupting threads (which it ought not do),
                    // back out as cleanly as we can. Some threads may
                    // have been killed but we remain in non-shutdown
                    // state.
                    runState = state;
                    throw se;
                }
            } else { // If no workers, trigger full termination now
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks that were
     * awaiting execution.
     *
     * <p>This implementation cancels tasks via {@link
     * Thread#interrupt}, so if any tasks mask or fail to respond to
     * interrupts, they may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     * shutting down this ExecutorService may manipulate threads that
     * the caller is not permitted to modify because it does not hold
     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
     * or the security manager's <tt>checkAccess</tt> method denies access.
     */
    public List<Runnable> shutdownNow() {
        // Almost the same code as shutdown()
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            java.security.AccessController
                    .checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                if (security != null) {
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state != TERMINATED)
                    runState = STOP;
                try {
                    for (Worker w : workers)
                        w.interruptNow();
                } catch (SecurityException se) {
                    runState = state; // back out;
                    throw se;
                }
            } else { // If no workers, trigger full termination now
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
        return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
    }

    public boolean isShutdown() {
        return runState != RUNNING;
    }

    /**
     * Returns true if this executor is in the process of terminating
     * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
     * completely terminated.  This method may be useful for
     * debugging. A return of <tt>true</tt> reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     * @return true if terminating but not yet terminated.
     */
    public boolean isTerminating() {
        // shutdown() moves the pool to SHUTDOWN and shutdownNow() to STOP;
        // both count as terminating until TERMINATED is reached.
        int state = runState;
        return state == SHUTDOWN || state == STOP;
    }

    public boolean isTerminated() {
        return runState == TERMINATED;
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runState == TERMINATED)
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

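    // Illustrative sketch, not part of the original source. It shows one common way
    // to combine shutdown(), awaitTermination() and shutdownNow(): stop accepting
    // work, wait a bounded time for queued tasks, and interrupt only on timeout.
    // OrderlyShutdownSketch and the 10-second timeout are assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class OrderlyShutdownSketch {
    /** Runs taskCount trivial tasks, shuts the pool down, and returns how many ran. */
    static int runAndShutDown(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        final AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    done.incrementAndGet();
                }
            });
        }
        pool.shutdown(); // tasks submitted so far still run; new ones are rejected
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // timed out: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndShutDown(10));
    }
}
```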
    /**
     * Invokes <tt>shutdown</tt> when this executor is no longer
     * referenced.
     */
    protected void finalize() {
        shutdown();
    }

    /**
     * Sets the thread factory used to create new threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if threadFactory is null
     * @see #getThreadFactory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Sets a new handler for unexecutable tasks.
     *
     * @param handler the new handler
     * @throws NullPointerException if handler is null
     * @see #getRejectedExecutionHandler
     */
    public void setRejectedExecutionHandler(
            RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    /**
     * Returns the current handler for unexecutable tasks.
     *
     * @return the current handler
     * @see #setRejectedExecutionHandler
     */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return handler;
    }

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use.  Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p>This method may be useful as one part of a cancellation
     * scheme.  It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using <tt>submit</tt> might be
     * converted into a form that maintains <tt>Future</tt> status.
     * However, in such cases, method {@link ThreadPoolExecutor#purge}
     * may be used to remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return true if the task was removed
     */
    public boolean remove(Runnable task) {
        return getQueue().remove(task);
    }

    /**
     * Tries to remove from the work queue all {@link Future}
     * tasks that have been cancelled. This method can be useful as a
     * storage reclamation operation that has no other impact on
     * functionality. Cancelled tasks are never executed, but may
     * accumulate in work queues until worker threads can actively
     * remove them. Invoking this method instead tries to remove them now.
     * However, this method may fail to remove tasks in
     * the presence of interference by other threads.
     */
    public void purge() {
        // Fail if we encounter interference during traversal
        try {
            Iterator<Runnable> it = getQueue().iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?>) {
                    Future<?> c = (Future<?>) r;
                    if (c.isCancelled())
                        it.remove();
                }
            }
        } catch (ConcurrentModificationException ex) {
            return;
        }
    }

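    // Illustrative sketch, not part of the original source. It contrasts remove()
    // and purge() for a task entered through submit(): the queue holds a Future
    // wrapper, so remove() with the raw Runnable finds nothing, while purge() drops
    // the wrapper once it is cancelled. PurgeSketch is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PurgeSketch {
    /** Queues one submitted task behind a busy worker, then removes it via purge(). */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        final CountDownLatch release = new CountDownLatch(1);
        try {
            // Keep the only worker busy so the next task stays in the queue.
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            Runnable task = new Runnable() {
                public void run() { }
            };
            Future<?> future = pool.submit(task);  // queued as a Future wrapper
            boolean removed = pool.remove(task);   // the raw Runnable is not queued
            future.cancel(false);
            int beforePurge = pool.getQueue().size();
            pool.purge();                          // drops the cancelled wrapper
            int afterPurge = pool.getQueue().size();
            return "removed=" + removed
                    + " before=" + beforePurge + " after=" + afterPurge;
        } finally {
            release.countDown(); // let the blocked worker finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```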
    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle. If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
     * is less than zero
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.corePoolSize - corePoolSize;
            this.corePoolSize = corePoolSize;
            if (extra < 0) {
                Runnable r;
                while (extra++ < 0 && poolSize < corePoolSize
                        && (r = workQueue.poll()) != null)
                    addThread(r).start();
            } else if (extra > 0 && poolSize > corePoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() && extra-- > 0
                        && poolSize > corePoolSize
                        && workQueue.remainingCapacity() == 0)
                    it.next().interruptIfIdle();
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the core number of threads.
     *
     * @return the core number of threads
     * @see #setCorePoolSize
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return <tt>false</tt>
     * if all core threads have already been started.
     * @return true if a thread was started
     */
    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

    /**
     * Starts all core threads, causing them to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed.
     * @return the number of threads started.
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }

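    // Illustrative sketch, not part of the original source. It shows the effect of
    // the two prestart methods above on a fresh pool with three core threads.
    // PrestartSketch is a hypothetical name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PrestartSketch {
    /** Prestarts the core threads of a new pool and reports the pool size. */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();             // no threads until work arrives
            int started = pool.prestartAllCoreThreads(); // start every core thread now
            boolean again = pool.prestartCoreThread();   // nothing left to start
            int after = pool.getPoolSize();
            return "before=" + before + " started=" + started
                    + " again=" + again + " after=" + after;
        } finally {
            pool.shutdown(); // idle core threads are interrupted and exit
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```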
    /**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if maximumPoolSize is less than
     * or equal to zero, or less than the
     * {@link #getCorePoolSize core pool size}
     * @see #getMaximumPoolSize
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.maximumPoolSize - maximumPoolSize;
            this.maximumPoolSize = maximumPoolSize;
            if (extra > 0 && poolSize > maximumPoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() && extra > 0
                        && poolSize > maximumPoolSize) {
                    it.next().interruptIfIdle();
                    --extra;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the maximum allowed number of threads.
     *
     * @return the maximum allowed number of threads
     * @see #setMaximumPoolSize
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the time limit for which threads may remain idle before
     * being terminated.  If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated.  This overrides any value set in the constructor.
     * @param time the time to wait.  A time value of zero will cause
     * excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the time argument
     * @throws IllegalArgumentException if time is less than zero
     * @see #getKeepAliveTime
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        this.keepAliveTime = unit.toNanos(time);
    }

1262:            /**
1263:             * Returns the thread keep-alive time, which is the amount of time
1264:             * which threads in excess of the core pool size may remain
1265:             * idle before being terminated.
1266:             *
1267:             * @param unit the desired time unit of the result
1268:             * @return the time limit
1269:             * @see #setKeepAliveTime
1270:             */
1271:            public long getKeepAliveTime(TimeUnit unit) {
1272:                return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1273:            }
1274:
    /* Statistics */

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        return poolSize;
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers) {
                if (w.isActive())
                    ++n;
            }
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation, but one that does not ever
     * decrease across successive calls.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isActive())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Method invoked prior to executing the given Runnable in the
     * given thread.  This method is invoked by thread <tt>t</tt> that
     * will execute task <tt>r</tt>, and may be used to re-initialize
     * ThreadLocals, or to perform logging. Note: To properly nest
     * multiple overridings, subclasses should generally invoke
     * <tt>super.beforeExecute</tt> at the end of this method.
     *
     * @param t the thread that will run task r.
     * @param r the task that will be executed.
     */
    protected void beforeExecute(Thread t, Runnable r) {
    }

    /**
     * Method invoked upon completion of execution of the given
     * Runnable.  This method is invoked by the thread that executed
     * the task. If non-null, the Throwable is the uncaught exception
     * that caused execution to terminate abruptly. Note: To properly
     * nest multiple overridings, subclasses should generally invoke
     * <tt>super.afterExecute</tt> at the beginning of this method.
     *
     * @param r the runnable that has completed.
     * @param t the exception that caused termination, or null if
     * execution completed normally.
     */
    protected void afterExecute(Runnable r, Throwable t) {
    }

    /**
     * Method invoked when the Executor has terminated.  Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * <tt>super.terminated</tt> within this method.
     */
    protected void terminated() {
    }

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the <tt>execute</tt> method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>CallerRunsPolicy</tt>.
         */
        public CallerRunsPolicy() {
        }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * <tt>RejectedExecutionException</tt>.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an <tt>AbortPolicy</tt>.
         */
        public AbortPolicy() {
        }

        /**
         * Always throws RejectedExecutionException.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardPolicy</tt>.
         */
        public DiscardPolicy() {
        }

        /**
         * Does nothing, which has the effect of discarding task r.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries <tt>execute</tt>, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardOldestPolicy</tt>.
         */
        public DiscardOldestPolicy() {
        }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}
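
The extension hooks and rejection handlers in the listing can be exercised with a small subclass. The following is a minimal sketch, not part of the original source: `CountingPool`, `runDemo`, and the counter fields are hypothetical names chosen for illustration, and it runs against the standard `java.util.concurrent.ThreadPoolExecutor`. It assumes a deliberately tiny pool (one thread, queue capacity one) so that `CallerRunsPolicy` is likely to be triggered.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; every name here is illustrative.
class CountingPool extends ThreadPoolExecutor {
    final AtomicLong hookStarted = new AtomicLong();
    final AtomicLong hookFinished = new AtomicLong();

    CountingPool() {
        // One core thread, one maximum thread, a queue of capacity 1,
        // and CallerRunsPolicy as the handler when the pool is saturated.
        super(1, 1, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(1),
              new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        hookStarted.incrementAndGet();
        super.beforeExecute(t, r);   // super call at the end, as the Javadoc suggests
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);    // super call at the beginning, as the Javadoc suggests
        hookFinished.incrementAndGet();
    }

    // Submits the given number of trivial tasks, waits for termination, and
    // returns {tasks run, beforeExecute calls, afterExecute calls, completed count}.
    static long[] runDemo(int tasks) {
        CountingPool pool = new CountingPool();
        AtomicLong ran = new AtomicLong();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> ran.incrementAndGet());
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return new long[] {
            ran.get(),
            pool.hookStarted.get(),
            pool.hookFinished.get(),
            pool.getCompletedTaskCount()
        };
    }
}
```

Every submitted task runs, because `CallerRunsPolicy` only discards work after shutdown. Tasks that the handler runs in the calling thread bypass the worker loop, so they do not pass through `beforeExecute` or `afterExecute` and are not included in `getCompletedTaskCount()`. The hook counters can therefore be lower than the number of tasks submitted.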