001 /*
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003 *
004 * This code is free software; you can redistribute it and/or modify it
005 * under the terms of the GNU General Public License version 2 only, as
006 * published by the Free Software Foundation. Sun designates this
007 * particular file as subject to the "Classpath" exception as provided
008 * by Sun in the LICENSE file that accompanied this code.
009 *
010 * This code is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013 * version 2 for more details (a copy is included in the LICENSE file that
014 * accompanied this code).
015 *
016 * You should have received a copy of the GNU General Public License version
017 * 2 along with this work; if not, write to the Free Software Foundation,
018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019 *
020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021 * CA 95054 USA or visit www.sun.com if you need additional information or
022 * have any questions.
023 */
024
025 /*
026 * This file is available under and governed by the GNU General Public
027 * License version 2 only, as published by the Free Software Foundation.
028 * However, the following notice accompanied the original version of this
029 * file:
030 *
031 * Written by Doug Lea with assistance from members of JCP JSR-166
032 * Expert Group and released to the public domain, as explained at
033 * http://creativecommons.org/licenses/publicdomain
034 */
035
036 package java.util.concurrent;
037
038 import java.util.concurrent.locks.*;
039
040 /**
041 * A cancellable asynchronous computation. This class provides a base
042 * implementation of {@link Future}, with methods to start and cancel
043 * a computation, query to see if the computation is complete, and
044 * retrieve the result of the computation. The result can only be
045 * retrieved when the computation has completed; the <tt>get</tt>
046 * method will block if the computation has not yet completed. Once
047 * the computation has completed, the computation cannot be restarted
048 * or cancelled.
049 *
050 * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
051 * {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
052 * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
053 * submitted to an {@link Executor} for execution.
054 *
055 * <p>In addition to serving as a standalone class, this class provides
056 * <tt>protected</tt> functionality that may be useful when creating
057 * customized task classes.
058 *
059 * @since 1.5
060 * @author Doug Lea
061 * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
062 */
063 public class FutureTask<V> implements RunnableFuture<V> {
064 /** Synchronization control for FutureTask */
065 private final Sync sync;
066
067 /**
068 * Creates a <tt>FutureTask</tt> that will, upon running, execute the
069 * given <tt>Callable</tt>.
070 *
071 * @param callable the callable task
072 * @throws NullPointerException if callable is null
073 */
074 public FutureTask(Callable<V> callable) {
075 if (callable == null)
076 throw new NullPointerException();
077 sync = new Sync(callable);
078 }
079
080 /**
081 * Creates a <tt>FutureTask</tt> that will, upon running, execute the
082 * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
083 * given result on successful completion.
084 *
085 * @param runnable the runnable task
086 * @param result the result to return on successful completion. If
087 * you don't need a particular result, consider using
088 * constructions of the form:
089 * <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt>
090 * @throws NullPointerException if runnable is null
091 */
092 public FutureTask(Runnable runnable, V result) {
093 sync = new Sync(Executors.callable(runnable, result));
094 }
095
096 public boolean isCancelled() {
097 return sync.innerIsCancelled();
098 }
099
100 public boolean isDone() {
101 return sync.innerIsDone();
102 }
103
104 public boolean cancel(boolean mayInterruptIfRunning) {
105 return sync.innerCancel(mayInterruptIfRunning);
106 }
107
108 /**
109 * @throws CancellationException {@inheritDoc}
110 */
111 public V get() throws InterruptedException, ExecutionException {
112 return sync.innerGet();
113 }
114
115 /**
116 * @throws CancellationException {@inheritDoc}
117 */
118 public V get(long timeout, TimeUnit unit)
119 throws InterruptedException, ExecutionException,
120 TimeoutException {
121 return sync.innerGet(unit.toNanos(timeout));
122 }
123
124 /**
125 * Protected method invoked when this task transitions to state
126 * <tt>isDone</tt> (whether normally or via cancellation). The
127 * default implementation does nothing. Subclasses may override
128 * this method to invoke completion callbacks or perform
129 * bookkeeping. Note that you can query status inside the
130 * implementation of this method to determine whether this task
131 * has been cancelled.
132 */
133 protected void done() {
134 }
135
136 /**
137 * Sets the result of this Future to the given value unless
138 * this future has already been set or has been cancelled.
139 * This method is invoked internally by the <tt>run</tt> method
140 * upon successful completion of the computation.
141 * @param v the value
142 */
143 protected void set(V v) {
144 sync.innerSet(v);
145 }
146
147 /**
148 * Causes this future to report an <tt>ExecutionException</tt>
149 * with the given throwable as its cause, unless this Future has
150 * already been set or has been cancelled.
151 * This method is invoked internally by the <tt>run</tt> method
152 * upon failure of the computation.
153 * @param t the cause of failure
154 */
155 protected void setException(Throwable t) {
156 sync.innerSetException(t);
157 }
158
159 // The following (duplicated) doc comment can be removed once
160 //
161 // 6270645: Javadoc comments should be inherited from most derived
162 // superinterface or superclass
163 // is fixed.
164 /**
165 * Sets this Future to the result of its computation
166 * unless it has been cancelled.
167 */
168 public void run() {
169 sync.innerRun();
170 }
171
172 /**
173 * Executes the computation without setting its result, and then
174 * resets this Future to initial state, failing to do so if the
175 * computation encounters an exception or is cancelled. This is
176 * designed for use with tasks that intrinsically execute more
177 * than once.
178 * @return true if successfully run and reset
179 */
180 protected boolean runAndReset() {
181 return sync.innerRunAndReset();
182 }
183
184 /**
185 * Synchronization control for FutureTask. Note that this must be
186 * a non-static inner class in order to invoke the protected
187 * <tt>done</tt> method. For clarity, all inner class support
188 * methods are same as outer, prefixed with "inner".
189 *
190 * Uses AQS sync state to represent run status
191 */
192 private final class Sync extends AbstractQueuedSynchronizer {
193 private static final long serialVersionUID = -7828117401763700385L;
194
195 /** State value representing that task is ready to run */
196 private static final int READY = 0;
197 /** State value representing that task is running */
198 private static final int RUNNING = 1;
199 /** State value representing that task ran */
200 private static final int RAN = 2;
201 /** State value representing that task was cancelled */
202 private static final int CANCELLED = 4;
203
204 /** The underlying callable */
205 private final Callable<V> callable;
206 /** The result to return from get() */
207 private V result;
208 /** The exception to throw from get() */
209 private Throwable exception;
210
211 /**
212 * The thread running task. When nulled after set/cancel, this
213 * indicates that the results are accessible. Must be
214 * volatile, to ensure visibility upon completion.
215 */
216 private volatile Thread runner;
217
218 Sync(Callable<V> callable) {
219 this .callable = callable;
220 }
221
222 private boolean ranOrCancelled(int state) {
223 return (state & (RAN | CANCELLED)) != 0;
224 }
225
226 /**
227 * Implements AQS base acquire to succeed if ran or cancelled
228 */
229 protected int tryAcquireShared(int ignore) {
230 return innerIsDone() ? 1 : -1;
231 }
232
233 /**
234 * Implements AQS base release to always signal after setting
235 * final done status by nulling runner thread.
236 */
237 protected boolean tryReleaseShared(int ignore) {
238 runner = null;
239 return true;
240 }
241
242 boolean innerIsCancelled() {
243 return getState() == CANCELLED;
244 }
245
246 boolean innerIsDone() {
247 return ranOrCancelled(getState()) && runner == null;
248 }
249
250 V innerGet() throws InterruptedException, ExecutionException {
251 acquireSharedInterruptibly(0);
252 if (getState() == CANCELLED)
253 throw new CancellationException();
254 if (exception != null)
255 throw new ExecutionException(exception);
256 return result;
257 }
258
259 V innerGet(long nanosTimeout) throws InterruptedException,
260 ExecutionException, TimeoutException {
261 if (!tryAcquireSharedNanos(0, nanosTimeout))
262 throw new TimeoutException();
263 if (getState() == CANCELLED)
264 throw new CancellationException();
265 if (exception != null)
266 throw new ExecutionException(exception);
267 return result;
268 }
269
270 void innerSet(V v) {
271 for (;;) {
272 int s = getState();
273 if (s == RAN)
274 return;
275 if (s == CANCELLED) {
276 // aggressively release to set runner to null,
277 // in case we are racing with a cancel request
278 // that will try to interrupt runner
279 releaseShared(0);
280 return;
281 }
282 if (compareAndSetState(s, RAN)) {
283 result = v;
284 releaseShared(0);
285 done();
286 return;
287 }
288 }
289 }
290
291 void innerSetException(Throwable t) {
292 for (;;) {
293 int s = getState();
294 if (s == RAN)
295 return;
296 if (s == CANCELLED) {
297 // aggressively release to set runner to null,
298 // in case we are racing with a cancel request
299 // that will try to interrupt runner
300 releaseShared(0);
301 return;
302 }
303 if (compareAndSetState(s, RAN)) {
304 exception = t;
305 releaseShared(0);
306 done();
307 return;
308 }
309 }
310 }
311
312 boolean innerCancel(boolean mayInterruptIfRunning) {
313 for (;;) {
314 int s = getState();
315 if (ranOrCancelled(s))
316 return false;
317 if (compareAndSetState(s, CANCELLED))
318 break;
319 }
320 if (mayInterruptIfRunning) {
321 Thread r = runner;
322 if (r != null)
323 r.interrupt();
324 }
325 releaseShared(0);
326 done();
327 return true;
328 }
329
330 void innerRun() {
331 if (!compareAndSetState(READY, RUNNING))
332 return;
333
334 runner = Thread.currentThread();
335 if (getState() == RUNNING) { // recheck after setting thread
336 V result;
337 try {
338 result = callable.call();
339 } catch (Throwable ex) {
340 setException(ex);
341 return;
342 }
343 set(result);
344 } else {
345 releaseShared(0); // cancel
346 }
347 }
348
349 boolean innerRunAndReset() {
350 if (!compareAndSetState(READY, RUNNING))
351 return false;
352 try {
353 runner = Thread.currentThread();
354 if (getState() == RUNNING)
355 callable.call(); // don't set result
356 runner = null;
357 return compareAndSetState(RUNNING, READY);
358 } catch (Throwable ex) {
359 setException(ex);
360 return false;
361 }
362 }
363 }
364 }
|