001 /*
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003 *
004 * This code is free software; you can redistribute it and/or modify it
005 * under the terms of the GNU General Public License version 2 only, as
006 * published by the Free Software Foundation. Sun designates this
007 * particular file as subject to the "Classpath" exception as provided
008 * by Sun in the LICENSE file that accompanied this code.
009 *
010 * This code is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013 * version 2 for more details (a copy is included in the LICENSE file that
014 * accompanied this code).
015 *
016 * You should have received a copy of the GNU General Public License version
017 * 2 along with this work; if not, write to the Free Software Foundation,
018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019 *
020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021 * CA 95054 USA or visit www.sun.com if you need additional information or
022 * have any questions.
023 */
024
025 /*
026 * This file is available under and governed by the GNU General Public
027 * License version 2 only, as published by the Free Software Foundation.
028 * However, the following notice accompanied the original version of this
029 * file:
030 *
031 * Written by Doug Lea with assistance from members of JCP JSR-166
032 * Expert Group and released to the public domain, as explained at
033 * http://creativecommons.org/licenses/publicdomain
034 */
035
036 package java.util.concurrent;
037
038 import java.util.*;
039
/**
 * Provides default implementations of {@link ExecutorService}
 * execution methods. This class implements the {@code submit},
 * {@code invokeAny} and {@code invokeAll} methods using a
 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
 * to the {@link FutureTask} class provided in this package. For example,
 * the implementation of {@code submit(Runnable)} creates an
 * associated {@code RunnableFuture} that is executed and
 * returned. Subclasses may override the {@code newTaskFor} methods
 * to return {@code RunnableFuture} implementations other than
 * {@code FutureTask}.
 *
 * <p><b>Extension example</b>. Here is a sketch of a class
 * that customizes {@link ThreadPoolExecutor} to use
 * a {@code CustomTask} class instead of the default {@code FutureTask}:
 * <pre> {@code
 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
 *
 *   static class CustomTask<V> implements RunnableFuture<V> {...}
 *
 *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
 *       return new CustomTask<V>(c);
 *   }
 *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
 *       return new CustomTask<V>(r, v);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @return a {@code RunnableFuture} which when run will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a {@code RunnableFuture} which when run will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * Wraps the task via {@link #newTaskFor(Runnable, Object)} with a
     * {@code null} result, passes it to {@code execute}, and returns it.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task via {@link #newTaskFor(Runnable, Object)} with the
     * given result, passes it to {@code execute}, and returns it.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task via {@link #newTaskFor(Callable)}, passes it to
     * {@code execute}, and returns it.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of {@code invokeAny}: submits the tasks one at a
     * time, returning the result of the first one that completes without
     * throwing. Every submitted task is cancelled before this method exits,
     * whether it exits normally or exceptionally.
     *
     * @param tasks the candidate tasks
     * @param timed whether {@code nanos} bounds the total waiting time
     * @param nanos the waiting budget in nanoseconds; ignored unless
     *        {@code timed} is true
     * @return the result of the first successfully completed task
     * @throws NullPointerException     if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws InterruptedException     if interrupted while waiting
     * @throws ExecutionException       if no task completes successfully;
     *         this is the failure recorded for the last task that failed
     * @throws TimeoutException         if {@code timed} and the budget
     *         elapses before any task completes
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        int ntasks = tasks.size();
        if (ntasks == 0) {
            throw new IllegalArgumentException();
        }
        if (timed && nanos < 0) {
            // A negative budget means "do not wait", exactly like zero.
            // Clamping keeps the subtraction of elapsed time below from
            // underflowing into a huge positive budget.
            nanos = 0;
        }
        List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: start another task.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    } else if (active == 0) {
                        // Everything was submitted and everything failed.
                        break;
                    } else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null) {
                            throw new TimeoutException();
                        }
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    } else {
                        f = ecs.take();
                    }
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            // Defensive: the loop only breaks after at least one task has
            // failed, which always records an exception in ee.
            if (ee == null) {
                ee = new ExecutionException((Throwable) null);
            }
            throw ee;

        } finally {
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }

    /**
     * Executes the given tasks, returning the result of one that has
     * completed without throwing an exception, if any do. The remaining
     * tasks are cancelled on return.
     *
     * @throws NullPointerException     if {@code tasks} or any task obtained
     *         from it is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws InterruptedException     if interrupted while waiting
     * @throws ExecutionException       if no task completes successfully
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The untimed path never waits with a timeout.
            assert false;
            return null;
        }
    }

    /**
     * Executes the given tasks, returning the result of one that has
     * completed without throwing an exception, if any do before the given
     * timeout elapses. The remaining tasks are cancelled on return.
     *
     * @throws NullPointerException     if {@code tasks}, {@code unit} or any
     *         task obtained from {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws InterruptedException     if interrupted while waiting
     * @throws ExecutionException       if no task completes successfully
     * @throws TimeoutException         if the timeout elapses before any
     *         task completes
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * Executes the given tasks and waits for all of them to complete,
     * returning their futures in the iteration order of {@code tasks}.
     * If this method exits before all tasks are known to be complete
     * (for example because of interruption or a rejected task), every
     * future created so far is cancelled.
     *
     * @throws NullPointerException if {@code tasks} is null (or if
     *         {@code newTaskFor} rejects a null element)
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                        // Completion of any kind is all we wait for; the
                        // caller inspects the outcome through the future.
                    } catch (ExecutionException ignore) {
                        // Same as above.
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done) {
                for (Future<T> f : futures) {
                    f.cancel(true);
                }
            }
        }
    }

    /**
     * Executes the given tasks and waits until all of them complete or the
     * timeout elapses, whichever happens first, returning their futures in
     * the iteration order of {@code tasks}. On timeout (or any other early
     * exit) every future is cancelled, which has no effect on those already
     * completed. No task is handed to {@code execute} once the timeout has
     * elapsed; in particular a non-positive timeout starts no task at all.
     *
     * @throws NullPointerException if {@code tasks} or {@code unit} is null
     *         (or if {@code newTaskFor} rejects a null element)
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout,
            TimeUnit unit) throws InterruptedException {
        if (tasks == null || unit == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                futures.add(newTaskFor(t));
            }

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism. The check comes
            // first so that an already-expired timeout starts nothing, and
            // so that elapsed time is only ever subtracted from a positive
            // budget (no underflow for very negative timeouts).
            for (Future<T> f : futures) {
                if (nanos <= 0) {
                    return futures;
                }
                // Every element was produced by newTaskFor, so it is a
                // RunnableFuture and the cast is safe.
                execute((Runnable) f);
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0) {
                        return futures;
                    }
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                        // Completion of any kind is all we wait for.
                    } catch (ExecutionException ignore) {
                        // Same as above.
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done) {
                for (Future<T> f : futures) {
                    f.cancel(true);
                }
            }
        }
    }

}
|