001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.atomic.*;
010: import java.util.*;
011:
012: /**
013: * A {@link ThreadPoolExecutor} that can additionally schedule
014: * commands to run after a given delay, or to execute
015: * periodically. This class is preferable to {@link java.util.Timer}
016: * when multiple worker threads are needed, or when the additional
017: * flexibility or capabilities of {@link ThreadPoolExecutor} (which
018: * this class extends) are required.
019: *
020: * <p> Delayed tasks execute no sooner than they are enabled, but
021: * without any real-time guarantees about when, after they are
022: * enabled, they will commence. Tasks scheduled for exactly the same
023: * execution time are enabled in first-in-first-out (FIFO) order of
024: * submission.
025: *
026: * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
027: * of the inherited tuning methods are not useful for it. In
028: * particular, because it acts as a fixed-sized pool using
029: * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
030: * to <tt>maximumPoolSize</tt> have no useful effect.
031: *
032: * @since 1.5
033: * @author Doug Lea
034: */
035: public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
036: implements ScheduledExecutorService {
037:
038: /**
039: * False if should cancel/suppress periodic tasks on shutdown.
040: */
041: private volatile boolean continueExistingPeriodicTasksAfterShutdown;
042:
043: /**
044: * False if should cancel non-periodic tasks on shutdown.
045: */
046: private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
047:
048: /**
049: * Sequence number to break scheduling ties, and in turn to
050: * guarantee FIFO order among tied entries.
051: */
052: private static final AtomicLong sequencer = new AtomicLong(0);
053:
054: /** Base of nanosecond timings, to avoid wrapping */
055: private static final long NANO_ORIGIN = System.nanoTime();
056:
057: /**
058: * Returns nanosecond time offset by origin
059: */
060: final long now() {
061: return System.nanoTime() - NANO_ORIGIN;
062: }
063:
064: private class ScheduledFutureTask<V> extends FutureTask<V>
065: implements ScheduledFuture<V> {
066:
067: /** Sequence number to break ties FIFO */
068: private final long sequenceNumber;
069: /** The time the task is enabled to execute in nanoTime units */
070: private long time;
071: /**
072: * Period in nanoseconds for repeating tasks. A positive
073: * value indicates fixed-rate execution. A negative value
074: * indicates fixed-delay execution. A value of 0 indicates a
075: * non-repeating task.
076: */
077: private final long period;
078:
079: /**
080: * Creates a one-shot action with given nanoTime-based trigger time
081: */
082: ScheduledFutureTask(Runnable r, V result, long ns) {
083: super (r, result);
084: this .time = ns;
085: this .period = 0;
086: this .sequenceNumber = sequencer.getAndIncrement();
087: }
088:
089: /**
090: * Creates a periodic action with given nano time and period
091: */
092: ScheduledFutureTask(Runnable r, V result, long ns, long period) {
093: super (r, result);
094: this .time = ns;
095: this .period = period;
096: this .sequenceNumber = sequencer.getAndIncrement();
097: }
098:
099: /**
100: * Creates a one-shot action with given nanoTime-based trigger
101: */
102: ScheduledFutureTask(Callable<V> callable, long ns) {
103: super (callable);
104: this .time = ns;
105: this .period = 0;
106: this .sequenceNumber = sequencer.getAndIncrement();
107: }
108:
109: public long getDelay(TimeUnit unit) {
110: long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
111: return d;
112: }
113:
114: public int compareTo(Object other) {
115: if (other == this ) // compare zero ONLY if same object
116: return 0;
117: ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
118: long diff = time - x.time;
119: if (diff < 0)
120: return -1;
121: else if (diff > 0)
122: return 1;
123: else if (sequenceNumber < x.sequenceNumber)
124: return -1;
125: else
126: return 1;
127: }
128:
129: /**
130: * Returns true if this is a periodic (not a one-shot) action.
131: * @return true if periodic
132: */
133: boolean isPeriodic() {
134: return period != 0;
135: }
136:
137: /**
138: * Run a periodic task
139: */
140: private void runPeriodic() {
141: boolean ok = ScheduledFutureTask.super .runAndReset();
142: boolean down = isShutdown();
143: // Reschedule if not cancelled and not shutdown or policy allows
144: if (ok
145: && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isTerminating()))) {
146: long p = period;
147: if (p > 0)
148: time += p;
149: else
150: time = now() - p;
151: ScheduledThreadPoolExecutor.super .getQueue().add(this );
152: }
153: // This might have been the final executed delayed
154: // task. Wake up threads to check.
155: else if (down)
156: interruptIdleWorkers();
157: }
158:
159: /**
160: * Overrides FutureTask version so as to reset/requeue if periodic.
161: */
162: public void run() {
163: if (isPeriodic())
164: runPeriodic();
165: else
166: ScheduledFutureTask.super .run();
167: }
168: }
169:
170: /**
171: * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
172: */
173: private void delayedExecute(Runnable command) {
174: if (isShutdown()) {
175: reject(command);
176: return;
177: }
178: // Prestart a thread if necessary. We cannot prestart it
179: // running the task because the task (probably) shouldn't be
180: // run yet, so thread will just idle until delay elapses.
181: if (getPoolSize() < getCorePoolSize())
182: prestartCoreThread();
183:
184: super .getQueue().add(command);
185: }
186:
187: /**
188: * Cancel and clear the queue of all tasks that should not be run
189: * due to shutdown policy.
190: */
191: private void cancelUnwantedTasks() {
192: boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
193: boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
194: if (!keepDelayed && !keepPeriodic)
195: super .getQueue().clear();
196: else if (keepDelayed || keepPeriodic) {
197: Object[] entries = super .getQueue().toArray();
198: for (int i = 0; i < entries.length; ++i) {
199: Object e = entries[i];
200: if (e instanceof ScheduledFutureTask) {
201: ScheduledFutureTask<?> t = (ScheduledFutureTask<?>) e;
202: if (t.isPeriodic() ? !keepPeriodic : !keepDelayed)
203: t.cancel(false);
204: }
205: }
206: entries = null;
207: purge();
208: }
209: }
210:
211: public boolean remove(Runnable task) {
212: if (!(task instanceof ScheduledFutureTask))
213: return false;
214: return getQueue().remove(task);
215: }
216:
217: /**
218: * Creates a new ScheduledThreadPoolExecutor with the given core
219: * pool size.
220: *
221: * @param corePoolSize the number of threads to keep in the pool,
222: * even if they are idle.
223: * @throws IllegalArgumentException if corePoolSize less than or
224: * equal to zero
225: */
226: public ScheduledThreadPoolExecutor(int corePoolSize) {
227: super (corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
228: new DelayedWorkQueue());
229: }
230:
231: /**
232: * Creates a new ScheduledThreadPoolExecutor with the given
233: * initial parameters.
234: *
235: * @param corePoolSize the number of threads to keep in the pool,
236: * even if they are idle.
237: * @param threadFactory the factory to use when the executor
238: * creates a new thread.
239: * @throws NullPointerException if threadFactory is null
240: */
241: public ScheduledThreadPoolExecutor(int corePoolSize,
242: ThreadFactory threadFactory) {
243: super (corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
244: new DelayedWorkQueue(), threadFactory);
245: }
246:
247: /**
248: * Creates a new ScheduledThreadPoolExecutor with the given
249: * initial parameters.
250: *
251: * @param corePoolSize the number of threads to keep in the pool,
252: * even if they are idle.
253: * @param handler the handler to use when execution is blocked
254: * because the thread bounds and queue capacities are reached.
255: * @throws NullPointerException if handler is null
256: */
257: public ScheduledThreadPoolExecutor(int corePoolSize,
258: RejectedExecutionHandler handler) {
259: super (corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
260: new DelayedWorkQueue(), handler);
261: }
262:
263: /**
264: * Creates a new ScheduledThreadPoolExecutor with the given
265: * initial parameters.
266: *
267: * @param corePoolSize the number of threads to keep in the pool,
268: * even if they are idle.
269: * @param threadFactory the factory to use when the executor
270: * creates a new thread.
271: * @param handler the handler to use when execution is blocked
272: * because the thread bounds and queue capacities are reached.
273: * @throws NullPointerException if threadFactory or handler is null
274: */
275: public ScheduledThreadPoolExecutor(int corePoolSize,
276: ThreadFactory threadFactory,
277: RejectedExecutionHandler handler) {
278: super (corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
279: new DelayedWorkQueue(), threadFactory, handler);
280: }
281:
282: public ScheduledFuture<?> schedule(Runnable command, long delay,
283: TimeUnit unit) {
284: if (command == null || unit == null)
285: throw new NullPointerException();
286: long triggerTime = now() + unit.toNanos(delay);
287: ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean>(
288: command, null, triggerTime);
289: delayedExecute(t);
290: return t;
291: }
292:
293: public <V> ScheduledFuture<V> schedule(Callable<V> callable,
294: long delay, TimeUnit unit) {
295: if (callable == null || unit == null)
296: throw new NullPointerException();
297: if (delay < 0)
298: delay = 0;
299: long triggerTime = now() + unit.toNanos(delay);
300: ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable,
301: triggerTime);
302: delayedExecute(t);
303: return t;
304: }
305:
306: public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
307: long initialDelay, long period, TimeUnit unit) {
308: if (command == null || unit == null)
309: throw new NullPointerException();
310: if (period <= 0)
311: throw new IllegalArgumentException();
312: if (initialDelay < 0)
313: initialDelay = 0;
314: long triggerTime = now() + unit.toNanos(initialDelay);
315: ScheduledFutureTask<?> t = new ScheduledFutureTask<Object>(
316: command, null, triggerTime, unit.toNanos(period));
317: delayedExecute(t);
318: return t;
319: }
320:
321: public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
322: long initialDelay, long delay, TimeUnit unit) {
323: if (command == null || unit == null)
324: throw new NullPointerException();
325: if (delay <= 0)
326: throw new IllegalArgumentException();
327: if (initialDelay < 0)
328: initialDelay = 0;
329: long triggerTime = now() + unit.toNanos(initialDelay);
330: ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean>(
331: command, null, triggerTime, unit.toNanos(-delay));
332: delayedExecute(t);
333: return t;
334: }
335:
336: /**
337: * Execute command with zero required delay. This has effect
338: * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
339: * that inspections of the queue and of the list returned by
340: * <tt>shutdownNow</tt> will access the zero-delayed
341: * {@link ScheduledFuture}, not the <tt>command</tt> itself.
342: *
343: * @param command the task to execute
344: * @throws RejectedExecutionException at discretion of
345: * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
346: * for execution because the executor has been shut down.
347: * @throws NullPointerException if command is null
348: */
349: public void execute(Runnable command) {
350: if (command == null)
351: throw new NullPointerException();
352: schedule(command, 0, TimeUnit.NANOSECONDS);
353: }
354:
355: // Override AbstractExecutorService methods
356:
357: public Future<?> submit(Runnable task) {
358: return schedule(task, 0, TimeUnit.NANOSECONDS);
359: }
360:
361: public <T> Future<T> submit(Runnable task, T result) {
362: return schedule(Executors.callable(task, result), 0,
363: TimeUnit.NANOSECONDS);
364: }
365:
366: public <T> Future<T> submit(Callable<T> task) {
367: return schedule(task, 0, TimeUnit.NANOSECONDS);
368: }
369:
370: /**
371: * Set policy on whether to continue executing existing periodic
372: * tasks even when this executor has been <tt>shutdown</tt>. In
373: * this case, these tasks will only terminate upon
374: * <tt>shutdownNow</tt>, or after setting the policy to
375: * <tt>false</tt> when already shutdown. This value is by default
376: * false.
377: * @param value if true, continue after shutdown, else don't.
378: * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
379: */
380: public void setContinueExistingPeriodicTasksAfterShutdownPolicy(
381: boolean value) {
382: continueExistingPeriodicTasksAfterShutdown = value;
383: if (!value && isShutdown())
384: cancelUnwantedTasks();
385: }
386:
387: /**
388: * Get the policy on whether to continue executing existing
389: * periodic tasks even when this executor has been
390: * <tt>shutdown</tt>. In this case, these tasks will only
391: * terminate upon <tt>shutdownNow</tt> or after setting the policy
392: * to <tt>false</tt> when already shutdown. This value is by
393: * default false.
394: * @return true if will continue after shutdown.
395: * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
396: */
397: public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
398: return continueExistingPeriodicTasksAfterShutdown;
399: }
400:
401: /**
402: * Set policy on whether to execute existing delayed
403: * tasks even when this executor has been <tt>shutdown</tt>. In
404: * this case, these tasks will only terminate upon
405: * <tt>shutdownNow</tt>, or after setting the policy to
406: * <tt>false</tt> when already shutdown. This value is by default
407: * true.
408: * @param value if true, execute after shutdown, else don't.
409: * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
410: */
411: public void setExecuteExistingDelayedTasksAfterShutdownPolicy(
412: boolean value) {
413: executeExistingDelayedTasksAfterShutdown = value;
414: if (!value && isShutdown())
415: cancelUnwantedTasks();
416: }
417:
418: /**
419: * Get policy on whether to execute existing delayed
420: * tasks even when this executor has been <tt>shutdown</tt>. In
421: * this case, these tasks will only terminate upon
422: * <tt>shutdownNow</tt>, or after setting the policy to
423: * <tt>false</tt> when already shutdown. This value is by default
424: * true.
425: * @return true if will execute after shutdown.
426: * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
427: */
428: public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
429: return executeExistingDelayedTasksAfterShutdown;
430: }
431:
432: /**
433: * Initiates an orderly shutdown in which previously submitted
434: * tasks are executed, but no new tasks will be accepted. If the
435: * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
436: * been set <tt>false</tt>, existing delayed tasks whose delays
437: * have not yet elapsed are cancelled. And unless the
438: * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
439: * been set <tt>true</tt>, future executions of existing periodic
440: * tasks will be cancelled.
441: */
442: public void shutdown() {
443: cancelUnwantedTasks();
444: super .shutdown();
445: }
446:
447: /**
448: * Attempts to stop all actively executing tasks, halts the
449: * processing of waiting tasks, and returns a list of the tasks that were
450: * awaiting execution.
451: *
452: * <p>There are no guarantees beyond best-effort attempts to stop
453: * processing actively executing tasks. This implementation
454: * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or
455: * fail to respond to interrupts, they may never terminate.
456: *
457: * @return list of tasks that never commenced execution. Each
458: * element of this list is a {@link ScheduledFuture},
459: * including those tasks submitted using <tt>execute</tt>, which
460: * are for scheduling purposes used as the basis of a zero-delay
461: * <tt>ScheduledFuture</tt>.
462: */
463: public List<Runnable> shutdownNow() {
464: return super .shutdownNow();
465: }
466:
467: /**
468: * Returns the task queue used by this executor. Each element of
469: * this queue is a {@link ScheduledFuture}, including those
470: * tasks submitted using <tt>execute</tt> which are for scheduling
471: * purposes used as the basis of a zero-delay
472: * <tt>ScheduledFuture</tt>. Iteration over this queue is
473: * <em>not</em> guaranteed to traverse tasks in the order in
474: * which they will execute.
475: *
476: * @return the task queue
477: */
478: public BlockingQueue<Runnable> getQueue() {
479: return super .getQueue();
480: }
481:
482: /**
483: * An annoying wrapper class to convince generics compiler to
484: * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
485: */
486: private static class DelayedWorkQueue extends
487: AbstractCollection<Runnable> implements
488: BlockingQueue<Runnable> {
489:
490: private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
491:
492: public Runnable poll() {
493: return dq.poll();
494: }
495:
496: public Runnable peek() {
497: return dq.peek();
498: }
499:
500: public Runnable take() throws InterruptedException {
501: return dq.take();
502: }
503:
504: public Runnable poll(long timeout, TimeUnit unit)
505: throws InterruptedException {
506: return dq.poll(timeout, unit);
507: }
508:
509: public boolean add(Runnable x) {
510: return dq.add((ScheduledFutureTask) x);
511: }
512:
513: public boolean offer(Runnable x) {
514: return dq.offer((ScheduledFutureTask) x);
515: }
516:
517: public void put(Runnable x) {
518: dq.put((ScheduledFutureTask) x);
519: }
520:
521: public boolean offer(Runnable x, long timeout, TimeUnit unit) {
522: return dq.offer((ScheduledFutureTask) x, timeout, unit);
523: }
524:
525: public Runnable remove() {
526: return dq.remove();
527: }
528:
529: public Runnable element() {
530: return dq.element();
531: }
532:
533: public void clear() {
534: dq.clear();
535: }
536:
537: public int drainTo(Collection<? super Runnable> c) {
538: return dq.drainTo(c);
539: }
540:
541: public int drainTo(Collection<? super Runnable> c,
542: int maxElements) {
543: return dq.drainTo(c, maxElements);
544: }
545:
546: public int remainingCapacity() {
547: return dq.remainingCapacity();
548: }
549:
550: public boolean remove(Object x) {
551: return dq.remove(x);
552: }
553:
554: public boolean contains(Object x) {
555: return dq.contains(x);
556: }
557:
558: public int size() {
559: return dq.size();
560: }
561:
562: public boolean isEmpty() {
563: return dq.isEmpty();
564: }
565:
566: public Object[] toArray() {
567: return dq.toArray();
568: }
569:
570: public <T> T[] toArray(T[] array) {
571: return dq.toArray(array);
572: }
573:
574: public Iterator<Runnable> iterator() {
575: return new Iterator<Runnable>() {
576: private Iterator<ScheduledFutureTask> it = dq
577: .iterator();
578:
579: public boolean hasNext() {
580: return it.hasNext();
581: }
582:
583: public Runnable next() {
584: return it.next();
585: }
586:
587: public void remove() {
588: it.remove();
589: }
590: };
591: }
592: }
593: }
|