001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
016:
/**
 * Factory and utility methods for {@link Executor}, {@link
 * ExecutorService}, {@link ScheduledExecutorService}, {@link
 * ThreadFactory}, and {@link Callable} classes defined in this
 * package. This class supports the following kinds of methods:
 *
 * <ul>
 *   <li> Methods that create and return an {@link ExecutorService}
 *        set up with commonly useful configuration settings.
 *   <li> Methods that create and return a {@link ScheduledExecutorService}
 *        set up with commonly useful configuration settings.
 *   <li> Methods that create and return a "wrapped" ExecutorService, that
 *        disables reconfiguration by making implementation-specific methods
 *        inaccessible.
 *   <li> Methods that create and return a {@link ThreadFactory}
 *        that sets newly created threads to a known state.
 *   <li> Methods that create and return a {@link Callable}
 *        out of other closure-like forms, so they can be used
 *        in execution methods requiring {@code Callable}.
 * </ul>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class Executors {

    /**
     * Creates a thread pool that reuses a fixed set of threads
     * operating off a shared unbounded queue. If any thread
     * terminates due to a failure during execution prior to shutdown,
     * a new one will take its place if needed to execute subsequent
     * tasks.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates a thread pool that reuses a fixed set of threads
     * operating off a shared unbounded queue, using the provided
     * ThreadFactory to create new threads when needed.
     *
     * @param nThreads the number of threads in the pool
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     */
    public static ExecutorService newFixedThreadPool(int nThreads,
                                                     ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.) Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        // Wrapped so callers cannot cast back to ThreadPoolExecutor and resize it.
        return new DelegatedExecutorService(
            new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue, and uses the provided ThreadFactory to
     * create a new thread when needed. Unlike the otherwise
     * equivalent {@code newFixedThreadPool(1, threadFactory)} the
     * returned executor is guaranteed not to be reconfigurable to use
     * additional threads.
     *
     * @param threadFactory the factory to use when creating new
     * threads
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor(
            ThreadFactory threadFactory) {
        // Wrapped so callers cannot cast back to ThreadPoolExecutor and resize it.
        return new DelegatedExecutorService(
            new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>(),
                                   threadFactory));
    }

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available. These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     *
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool(
            ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.) Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newScheduledThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService(
            new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically. (Note
     * however that if this single thread terminates due to a failure
     * during execution prior to shutdown, a new one will take its
     * place if needed to execute subsequent tasks.) Tasks are
     * guaranteed to execute sequentially, and no more than one task
     * will be active at any given time. Unlike the otherwise
     * equivalent {@code newScheduledThreadPool(1, threadFactory)}
     * the returned executor is guaranteed not to be reconfigurable to
     * use additional threads.
     *
     * @param threadFactory the factory to use when creating new
     * threads
     * @return a newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(
            ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService(
            new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     *
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle.
     * @return a newly created scheduled thread pool
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     *
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @return a newly created scheduled thread pool
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

    /**
     * Returns an object that delegates all defined {@link
     * ExecutorService} methods to the given executor, but not any
     * other methods that might otherwise be accessible using
     * casts. This provides a way to safely "freeze" configuration and
     * disallow tuning of a given concrete implementation.
     *
     * @param executor the underlying implementation
     * @return an {@code ExecutorService} instance
     * @throws NullPointerException if executor null
     */
    public static ExecutorService unconfigurableExecutorService(
            ExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedExecutorService(executor);
    }

    /**
     * Returns an object that delegates all defined {@link
     * ScheduledExecutorService} methods to the given executor, but
     * not any other methods that might otherwise be accessible using
     * casts. This provides a way to safely "freeze" configuration and
     * disallow tuning of a given concrete implementation.
     *
     * @param executor the underlying implementation
     * @return a {@code ScheduledExecutorService} instance
     * @throws NullPointerException if executor null
     */
    public static ScheduledExecutorService unconfigurableScheduledExecutorService(
            ScheduledExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedScheduledExecutorService(executor);
    }

    /**
     * Returns a default thread factory used to create new threads.
     * This factory creates all new threads used by an Executor in the
     * same {@link ThreadGroup}. If there is a {@link
     * java.lang.SecurityManager}, it uses the group of {@link
     * System#getSecurityManager}, else the group of the thread
     * invoking this {@code defaultThreadFactory} method. Each new
     * thread is created as a non-daemon thread with priority
     * {@code Thread.NORM_PRIORITY}. New threads have names
     * accessible via {@link Thread#getName} of
     * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
     * number of this factory, and <em>M</em> is the sequence number
     * of the thread created by this factory.
     *
     * @return a thread factory
     */
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    /**
     * Returns a thread factory used to create new threads that
     * have the same permissions as the current thread.
     * This factory creates threads with the same settings as {@link
     * Executors#defaultThreadFactory}, additionally setting the
     * AccessControlContext and contextClassLoader of new threads to
     * be the same as the thread invoking this
     * {@code privilegedThreadFactory} method. A new
     * {@code privilegedThreadFactory} can be created within an
     * {@link AccessController#doPrivileged} action setting the
     * current thread's access control context to create threads with
     * the selected permission settings holding within that action.
     *
     * <p> Note that while tasks running within such threads will have
     * the same access control and class loader settings as the
     * current thread, they need not have the same {@link
     * java.lang.ThreadLocal} or {@link
     * java.lang.InheritableThreadLocal} values. If necessary,
     * particular values of thread locals can be set or reset before
     * any task runs in {@link ThreadPoolExecutor} subclasses using
     * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
     * necessary to initialize worker threads to have the same
     * InheritableThreadLocal settings as some other designated
     * thread, you can create a custom ThreadFactory in which that
     * thread waits for and services requests to create others that
     * will inherit its values.
     *
     * @return a thread factory
     * @throws AccessControlException if a security manager is
     * installed and the current access control context does not have
     * permission to set the context class loader.
     */
    public static ThreadFactory privilegedThreadFactory() {
        return new PrivilegedThreadFactory();
    }

    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns the given result. This
     * can be useful when applying methods requiring a
     * {@code Callable} to an otherwise resultless action.
     *
     * @param task the task to run
     * @param result the result to return
     * @return a callable object
     * @throws NullPointerException if task null
     */
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns {@code null}.
     *
     * @param task the task to run
     * @return a callable object
     * @throws NullPointerException if task null
     */
    public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }

    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given privileged action and returns its result.
     *
     * @param action the privileged action to run
     * @return a callable object
     * @throws NullPointerException if action null
     */
    public static Callable<Object> callable(PrivilegedAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new PrivilegedActionAdapter(action);
    }

    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given privileged exception action and returns
     * its result.
     *
     * @param action the privileged exception action to run
     * @return a callable object
     * @throws NullPointerException if action null
     */
    public static Callable<Object> callable(
            PrivilegedExceptionAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new PrivilegedExceptionActionAdapter(action);
    }

    /**
     * Returns a {@link Callable} object that will, when
     * called, execute the given {@code callable} under the current
     * access control context. This method should normally be
     * invoked within an {@link AccessController#doPrivileged} action
     * to create callables that will, if possible, execute under the
     * selected permission settings holding within that action; or if
     * not possible, throw an associated {@link
     * AccessControlException}.
     *
     * @param callable the underlying task
     * @return a callable object
     * @throws NullPointerException if callable null
     */
    public static <T> Callable<T> privilegedCallable(
            Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallable<T>(callable);
    }

    /**
     * Returns a {@link Callable} object that will, when
     * called, execute the given {@code callable} under the current
     * access control context, with the current context class loader
     * as the context class loader. This method should normally be
     * invoked within an {@link AccessController#doPrivileged} action
     * to create callables that will, if possible, execute under the
     * selected permission settings holding within that action; or if
     * not possible, throw an associated {@link
     * AccessControlException}.
     *
     * @param callable the underlying task
     * @return a callable object
     * @throws NullPointerException if callable null
     * @throws AccessControlException if a security manager is
     * installed and the current access control context does not have
     * permission to both set and get context class loader.
     */
    public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(
            Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
    }

    // Non-public classes supporting the public methods

    /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;

        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }

        public T call() {
            task.run();
            return result;
        }
    }

    /**
     * A callable that runs given privileged action and returns its result
     */
    static final class PrivilegedActionAdapter implements Callable<Object> {
        private final PrivilegedAction<?> action;

        PrivilegedActionAdapter(PrivilegedAction<?> action) {
            this.action = action;
        }

        public Object call() {
            return action.run();
        }
    }

    /**
     * A callable that runs given privileged exception action and returns
     * its result
     */
    static final class PrivilegedExceptionActionAdapter
            implements Callable<Object> {
        private final PrivilegedExceptionAction<?> action;

        PrivilegedExceptionActionAdapter(PrivilegedExceptionAction<?> action) {
            this.action = action;
        }

        public Object call() throws Exception {
            return action.run();
        }
    }

    /**
     * A callable that runs under established access control settings.
     * Holds no per-invocation state, so the outcome of one
     * {@code call()} never leaks into another.
     */
    static final class PrivilegedCallable<T> implements Callable<T> {
        private final AccessControlContext acc;
        private final Callable<T> task;

        PrivilegedCallable(Callable<T> task) {
            this.task = task;
            // Captured at construction: the context of the creating thread.
            this.acc = AccessController.getContext();
        }

        public T call() throws Exception {
            try {
                return AccessController.doPrivileged(
                    new PrivilegedExceptionAction<T>() {
                        public T run() throws Exception {
                            return task.call();
                        }
                    }, acc);
            } catch (PrivilegedActionException e) {
                // doPrivileged wraps checked exceptions; hand the caller
                // the exception the task actually threw.
                throw e.getException();
            }
        }
    }

    /**
     * A callable that runs under established access control settings and
     * current ClassLoader. Holds no per-invocation state, and always puts
     * back the calling thread's previous context class loader (even when
     * that loader was {@code null}).
     */
    static final class PrivilegedCallableUsingCurrentClassLoader<T>
            implements Callable<T> {
        private final ClassLoader ccl;
        private final AccessControlContext acc;
        private final Callable<T> task;

        PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
            this.task = task;
            this.ccl = Thread.currentThread().getContextClassLoader();
            this.acc = AccessController.getContext();
            // Fail fast, but only when permissions are actually being
            // enforced; without a security manager there is nothing to check.
            if (System.getSecurityManager() != null) {
                acc.checkPermission(new RuntimePermission("getClassLoader"));
                acc.checkPermission(
                    new RuntimePermission("setContextClassLoader"));
            }
        }

        public T call() throws Exception {
            try {
                return AccessController.doPrivileged(
                    new PrivilegedExceptionAction<T>() {
                        public T run() throws Exception {
                            Thread t = Thread.currentThread();
                            ClassLoader previous = t.getContextClassLoader();
                            if (ccl == previous) {
                                // Already running with the captured loader.
                                return task.call();
                            }
                            t.setContextClassLoader(ccl);
                            try {
                                return task.call();
                            } finally {
                                // Restore unconditionally, null included.
                                t.setContextClassLoader(previous);
                            }
                        }
                    }, acc);
            } catch (PrivilegedActionException e) {
                // doPrivileged wraps checked exceptions; hand the caller
                // the exception the task actually threw.
                throw e.getException();
            }
        }
    }

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        /** Sequence number handed to the next factory instance. */
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        /** Sequence number handed to the next thread of this factory. */
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)
                ? s.getThreadGroup()
                : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // Normalize settings inherited from the creating thread.
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    /**
     * Thread factory capturing access control and class loader
     */
    static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final ClassLoader ccl;
        private final AccessControlContext acc;

        PrivilegedThreadFactory() {
            super();
            this.ccl = Thread.currentThread().getContextClassLoader();
            this.acc = AccessController.getContext();
            // Fail fast, but only when permissions are actually being
            // enforced; without a security manager there is nothing to check.
            if (System.getSecurityManager() != null) {
                acc.checkPermission(
                    new RuntimePermission("setContextClassLoader"));
            }
        }

        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
                    AccessController.doPrivileged(
                        new PrivilegedAction<Void>() {
                            public Void run() {
                                Thread.currentThread()
                                    .setContextClassLoader(ccl);
                                r.run();
                                return null;
                            }
                        }, acc);
                }
            });
        }
    }

    /**
     * A wrapper class that exposes only the ExecutorService methods
     * of an implementation.
     */
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;

        DelegatedExecutorService(ExecutorService executor) {
            e = executor;
        }

        public void execute(Runnable command) {
            e.execute(command);
        }

        public void shutdown() {
            e.shutdown();
        }

        public List<Runnable> shutdownNow() {
            return e.shutdownNow();
        }

        public boolean isShutdown() {
            return e.isShutdown();
        }

        public boolean isTerminated() {
            return e.isTerminated();
        }

        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }

        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }

        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }

        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }

        public <T> List<Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            return e.invokeAll(tasks);
        }

        public <T> List<Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }

        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }

        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException,
                       TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }

    /**
     * A wrapper class that exposes only the ExecutorService and
     * ScheduledExecutorService methods of a ScheduledExecutorService
     * implementation.
     */
    static class DelegatedScheduledExecutorService
            extends DelegatedExecutorService
            implements ScheduledExecutorService {
        // Same object the superclass delegates to, kept with its
        // scheduling-capable type.
        private final ScheduledExecutorService e;

        DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
        }

        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit) {
            return e.schedule(command, delay, unit);
        }

        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit) {
            return e.schedule(callable, delay, unit);
        }

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            return e.scheduleAtFixedRate(command, initialDelay, period, unit);
        }

        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    }

    /** Cannot instantiate. */
    private Executors() {
    }
}
|