001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.*;
010:
011: /**
012: * Provides default implementation of {@link ExecutorService}
013: * execution methods. This class implements the <tt>submit</tt>,
014: * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using the default
015: * {@link FutureTask} class provided in this package. For example,
016: * the implementation of <tt>submit(Runnable)</tt> creates an
017: * associated <tt>FutureTask</tt> that is executed and
018: * returned. Subclasses overriding these methods to use different
019: * {@link Future} implementations should do so consistently for each
020: * of these methods.
021: *
022: * @since 1.5
023: * @author Doug Lea
024: */
025: public abstract class AbstractExecutorService implements
026: ExecutorService {
027:
028: public Future<?> submit(Runnable task) {
029: if (task == null)
030: throw new NullPointerException();
031: FutureTask<Object> ftask = new FutureTask<Object>(task, null);
032: execute(ftask);
033: return ftask;
034: }
035:
036: public <T> Future<T> submit(Runnable task, T result) {
037: if (task == null)
038: throw new NullPointerException();
039: FutureTask<T> ftask = new FutureTask<T>(task, result);
040: execute(ftask);
041: return ftask;
042: }
043:
044: public <T> Future<T> submit(Callable<T> task) {
045: if (task == null)
046: throw new NullPointerException();
047: FutureTask<T> ftask = new FutureTask<T>(task);
048: execute(ftask);
049: return ftask;
050: }
051:
052: /**
053: * the main mechanics of invokeAny.
054: */
055: private <T> T doInvokeAny(Collection<Callable<T>> tasks,
056: boolean timed, long nanos) throws InterruptedException,
057: ExecutionException, TimeoutException {
058: if (tasks == null)
059: throw new NullPointerException();
060: int ntasks = tasks.size();
061: if (ntasks == 0)
062: throw new IllegalArgumentException();
063: List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
064: ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(
065: this );
066:
067: // For efficiency, especially in executors with limited
068: // parallelism, check to see if previously submitted tasks are
069: // done before submitting more of them. This interleaving
070: // plus the exception mechanics account for messiness of main
071: // loop.
072:
073: try {
074: // Record exceptions so that if we fail to obtain any
075: // result, we can throw the last exception we got.
076: ExecutionException ee = null;
077: long lastTime = (timed) ? System.nanoTime() : 0;
078: Iterator<Callable<T>> it = tasks.iterator();
079:
080: // Start one task for sure; the rest incrementally
081: futures.add(ecs.submit(it.next()));
082: --ntasks;
083: int active = 1;
084:
085: for (;;) {
086: Future<T> f = ecs.poll();
087: if (f == null) {
088: if (ntasks > 0) {
089: --ntasks;
090: futures.add(ecs.submit(it.next()));
091: ++active;
092: } else if (active == 0)
093: break;
094: else if (timed) {
095: f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
096: if (f == null)
097: throw new TimeoutException();
098: long now = System.nanoTime();
099: nanos -= now - lastTime;
100: lastTime = now;
101: } else
102: f = ecs.take();
103: }
104: if (f != null) {
105: --active;
106: try {
107: return f.get();
108: } catch (InterruptedException ie) {
109: throw ie;
110: } catch (ExecutionException eex) {
111: ee = eex;
112: } catch (RuntimeException rex) {
113: ee = new ExecutionException(rex);
114: }
115: }
116: }
117:
118: if (ee == null)
119: ee = new ExecutionException();
120: throw ee;
121:
122: } finally {
123: for (Future<T> f : futures)
124: f.cancel(true);
125: }
126: }
127:
128: public <T> T invokeAny(Collection<Callable<T>> tasks)
129: throws InterruptedException, ExecutionException {
130: try {
131: return doInvokeAny(tasks, false, 0);
132: } catch (TimeoutException cannotHappen) {
133: assert false;
134: return null;
135: }
136: }
137:
138: public <T> T invokeAny(Collection<Callable<T>> tasks, long timeout,
139: TimeUnit unit) throws InterruptedException,
140: ExecutionException, TimeoutException {
141: return doInvokeAny(tasks, true, unit.toNanos(timeout));
142: }
143:
144: public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
145: throws InterruptedException {
146: if (tasks == null)
147: throw new NullPointerException();
148: List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
149: boolean done = false;
150: try {
151: for (Callable<T> t : tasks) {
152: FutureTask<T> f = new FutureTask<T>(t);
153: futures.add(f);
154: execute(f);
155: }
156: for (Future<T> f : futures) {
157: if (!f.isDone()) {
158: try {
159: f.get();
160: } catch (CancellationException ignore) {
161: } catch (ExecutionException ignore) {
162: }
163: }
164: }
165: done = true;
166: return futures;
167: } finally {
168: if (!done)
169: for (Future<T> f : futures)
170: f.cancel(true);
171: }
172: }
173:
174: public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
175: long timeout, TimeUnit unit) throws InterruptedException {
176: if (tasks == null || unit == null)
177: throw new NullPointerException();
178: long nanos = unit.toNanos(timeout);
179: List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
180: boolean done = false;
181: try {
182: for (Callable<T> t : tasks)
183: futures.add(new FutureTask<T>(t));
184:
185: long lastTime = System.nanoTime();
186:
187: // Interleave time checks and calls to execute in case
188: // executor doesn't have any/much parallelism.
189: Iterator<Future<T>> it = futures.iterator();
190: while (it.hasNext()) {
191: execute((Runnable) (it.next()));
192: long now = System.nanoTime();
193: nanos -= now - lastTime;
194: lastTime = now;
195: if (nanos <= 0)
196: return futures;
197: }
198:
199: for (Future<T> f : futures) {
200: if (!f.isDone()) {
201: if (nanos <= 0)
202: return futures;
203: try {
204: f.get(nanos, TimeUnit.NANOSECONDS);
205: } catch (CancellationException ignore) {
206: } catch (ExecutionException ignore) {
207: } catch (TimeoutException toe) {
208: return futures;
209: }
210: long now = System.nanoTime();
211: nanos -= now - lastTime;
212: lastTime = now;
213: }
214: }
215: done = true;
216: return futures;
217: } finally {
218: if (!done)
219: for (Future<T> f : futures)
220: f.cancel(true);
221: }
222: }
223:
224: }
|