001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.locks.*;
010:
/**
 * A cancellable asynchronous computation. This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation. The result can only be
 * retrieved when the computation has completed; the <tt>get</tt>
 * method will block if the computation has not yet completed. Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled.
 *
 * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
 * {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
 * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
 * submitted to an {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * <tt>protected</tt> functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
 */
public class FutureTask<V> implements Future<V>, Runnable {
    /** Synchronization control for FutureTask */
    private final Sync sync;

    /**
     * Creates a <tt>FutureTask</tt> that will upon running, execute the
     * given <tt>Callable</tt>.
     *
     * @param callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    /**
     * Creates a <tt>FutureTask</tt> that will upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        // The Runnable is adapted to a Callable by Executors.callable.
        sync = new Sync(Executors.callable(runnable, result));
    }

    /**
     * Returns <tt>true</tt> if this task is in the cancelled state.
     *
     * @return <tt>true</tt> if this task was cancelled
     */
    public boolean isCancelled() {
        return sync.innerIsCancelled();
    }

    /**
     * Returns <tt>true</tt> if this task has run or been cancelled and
     * no thread is recorded as still running it.
     *
     * @return <tt>true</tt> if this task is done
     */
    public boolean isDone() {
        return sync.innerIsDone();
    }

    /**
     * Attempts to cancel this task. The attempt fails if the task has
     * already run or has already been cancelled.
     *
     * @param mayInterruptIfRunning <tt>true</tt> if the thread currently
     * recorded as running this task, if any, should be interrupted
     * @return <tt>true</tt> if this call moved the task to the cancelled
     * state; <tt>false</tt> otherwise
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        return sync.innerCancel(mayInterruptIfRunning);
    }

    /**
     * Waits if necessary for this task to become done, then returns its
     * result.
     *
     * @return the computed result
     * @throws CancellationException if this task was cancelled
     * @throws ExecutionException if an exception was recorded for this
     * task; the recorded exception is the cause
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    /**
     * Waits if necessary for at most the given time for this task to
     * become done, then returns its result.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if this task was cancelled
     * @throws ExecutionException if an exception was recorded for this
     * task; the recorded exception is the cause
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException,
               TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }

    /**
     * Protected method invoked when this task transitions to state
     * <tt>isDone</tt> (whether normally or via cancellation). The
     * default implementation does nothing. Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() {
    }

    /**
     * Sets the result of this Future to the given value unless
     * this future has already been set or has been cancelled.
     * @param v the value
     */
    protected void set(V v) {
        sync.innerSet(v);
    }

    /**
     * Causes this future to report an <tt>ExecutionException</tt>
     * with the given throwable as its cause, unless this Future has
     * already been set or has been cancelled.
     * @param t the cause of failure.
     */
    protected void setException(Throwable t) {
        sync.innerSetException(t);
    }

    /**
     * Sets this Future to the result of computation unless
     * it has been cancelled.
     */
    public void run() {
        sync.innerRun();
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this Future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled. This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
        return sync.innerRunAndReset();
    }

    /**
     * Synchronization control for FutureTask. Note that this must be
     * a non-static inner class in order to invoke the protected
     * <tt>done</tt> method. For clarity, all inner class support
     * methods are same as outer, prefixed with "inner".
     *
     * Uses AQS sync state to represent run status. The initial
     * (not yet started) state is 0.
     */
    private final class Sync extends AbstractQueuedSynchronizer {
        /** State value representing that task is running */
        private static final int RUNNING   = 1;
        /** State value representing that task ran */
        private static final int RAN       = 2;
        /** State value representing that task was cancelled */
        private static final int CANCELLED = 4;

        /** The underlying callable */
        private final Callable<V> callable;
        /** The result to return from get() */
        private V result;
        /** The exception to throw from get() */
        private Throwable exception;

        /**
         * The thread running task. When nulled after set/cancel, this
         * indicates that the results are accessible.  Must be
         * volatile, to serve as write barrier on completion.
         */
        private volatile Thread runner;

        Sync(Callable<V> callable) {
            this.callable = callable;
        }

        /** Returns true if the given state value is RAN or CANCELLED. */
        private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }

        /**
         * Implements AQS base acquire to succeed if ran or cancelled
         */
        @Override
        protected int tryAcquireShared(int ignore) {
            return innerIsDone() ? 1 : -1;
        }

        /**
         * Implements AQS base release to always signal after setting
         * final done status by nulling runner thread.
         */
        @Override
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }

        boolean innerIsCancelled() {
            return getState() == CANCELLED;
        }

        boolean innerIsDone() {
            // Done only once the final state is set AND the runner has
            // been cleared by a release.
            return ranOrCancelled(getState()) && runner == null;
        }

        /**
         * Blocks (interruptibly) until done, then reports the outcome:
         * cancellation, recorded exception, or result.
         */
        V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

        /**
         * Timed version of {@link #innerGet()}; throws
         * <tt>TimeoutException</tt> if the task is not done within
         * <tt>nanosTimeout</tt> nanoseconds.
         */
        V innerGet(long nanosTimeout) throws InterruptedException,
                ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

        /**
         * Records <tt>v</tt> as the result unless the task has already
         * run or been cancelled. Retries the state transition so that a
         * concurrent 0 -> RUNNING change does not silently drop the
         * result.
         */
        void innerSet(V v) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // Release anyway so that runner is nulled even when
                    // this thread published itself as runner after the
                    // cancelling thread had already released; otherwise
                    // innerIsDone() could stay false indefinitely.
                    releaseShared(0);
                    return;
                }
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }

        /**
         * Records <tt>t</tt> as the failure cause unless the task has
         * already run or been cancelled. Uses the same retry and
         * release-on-cancel logic as {@link #innerSet}.
         */
        void innerSetException(Throwable t) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // See innerSet: make sure runner ends up null.
                    releaseShared(0);
                    return;
                }
                if (compareAndSetState(s, RAN)) {
                    exception = t;
                    result = null;
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }

        /**
         * Moves the task to CANCELLED unless it has already run or been
         * cancelled, optionally interrupting the recorded runner thread.
         * Retries the transition so that a concurrent 0 -> RUNNING
         * change does not make the cancel fail spuriously.
         */
        boolean innerCancel(boolean mayInterruptIfRunning) {
            for (;;) {
                int s = getState();
                if (ranOrCancelled(s))
                    return false;
                if (compareAndSetState(s, CANCELLED))
                    break;
            }
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
            releaseShared(0);
            done();
            return true;
        }

        /**
         * Runs the callable once and records its result or exception.
         * Does nothing if the task is not in the initial state.
         */
        void innerRun() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                // Recheck after publishing runner: a cancel may have
                // slipped in between the CAS above and the assignment.
                if (getState() == RUNNING)
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancelled; clear runner again
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }

        /**
         * Runs the callable without recording a result, then attempts
         * to return to the initial state.
         *
         * @return true if the callable completed normally and the state
         * was reset; false if the task was not in the initial state, was
         * cancelled, or threw
         */
        boolean innerRunAndReset() {
            if (!compareAndSetState(0, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                // Skip the computation if cancelled before runner was
                // published (same window as in innerRun).
                if (getState() == RUNNING)
                    callable.call(); // don't set result
                runner = null;
                return compareAndSetState(RUNNING, 0);
            } catch (Throwable ex) {
                innerSetException(ex);
                return false;
            }
        }
    }
}
|