001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
/**
 * A {@link CompletionService} that uses a supplied {@link Executor}
 * to execute tasks. This class arranges that submitted tasks are,
 * upon completion, placed on a queue accessible using {@code take}.
 * The class is lightweight enough to be suitable for transient use
 * when processing groups of tasks.
 *
 * <p>
 *
 * <b>Usage Examples.</b>
 *
 * Suppose you have a set of solvers for a certain problem, each
 * returning a value of some type {@code Result}, and would like to
 * run them concurrently, processing the results of each of them that
 * return a non-null value, in some method {@code use(Result r)}. You
 * could write this as:
 *
 * <pre> {@code
 * void solve(Executor e, Collection<Callable<Result>> solvers)
 *         throws InterruptedException, ExecutionException {
 *     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
 *     for (Callable<Result> s : solvers)
 *         ecs.submit(s);
 *     int n = solvers.size();
 *     for (int i = 0; i < n; ++i) {
 *         Result r = ecs.take().get();
 *         if (r != null)
 *             use(r);
 *     }
 * }}</pre>
 *
 * Suppose instead that you would like to use the first non-null result
 * of the set of tasks, ignoring any that encounter exceptions,
 * and cancelling all other tasks when the first one is ready:
 *
 * <pre> {@code
 * void solve(Executor e, Collection<Callable<Result>> solvers)
 *         throws InterruptedException {
 *     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
 *     int n = solvers.size();
 *     List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
 *     Result result = null;
 *     try {
 *         for (Callable<Result> s : solvers)
 *             futures.add(ecs.submit(s));
 *         for (int i = 0; i < n; ++i) {
 *             try {
 *                 Result r = ecs.take().get();
 *                 if (r != null) {
 *                     result = r;
 *                     break;
 *                 }
 *             } catch (ExecutionException ignore) {}
 *         }
 *     }
 *     finally {
 *         for (Future<Result> f : futures)
 *             f.cancel(true);
 *     }
 *
 *     if (result != null)
 *         use(result);
 * }}</pre>
 *
 * @param <V> the type of result produced by the submitted tasks
 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
    /** Executor that every submitted task is handed to. */
    private final Executor executor;

    /** Receives the future of each task once that task is done. */
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension that enqueues itself upon completion.
     *
     * <p>Declared static and given the queue explicitly so that a task
     * does not carry a hidden reference to the enclosing service.
     */
    private static final class QueueingFuture<V> extends FutureTask<V> {
        /** Queue this future adds itself to from {@link #done()}. */
        private final BlockingQueue<Future<V>> completionQueue;

        QueueingFuture(Callable<V> c, BlockingQueue<Future<V>> completionQueue) {
            super(c);
            this.completionQueue = completionQueue;
        }

        QueueingFuture(Runnable t, V r, BlockingQueue<Future<V>> completionQueue) {
            super(t, r);
            this.completionQueue = completionQueue;
        }

        /**
         * Publishes this future on the completion queue. {@code FutureTask}
         * invokes this hook when the task becomes done, which includes
         * cancellation, so cancelled tasks are enqueued as well.
         */
        @Override
        protected void done() {
            // add() rather than put(): the completing thread never blocks here.
            // NOTE(review): with a capacity-bounded queue add() throws
            // IllegalStateException when the queue is full.
            completionQueue.add(this);
        }
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        this.executor = executor;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue,
     *        normally one dedicated for use by this service
     * @throws NullPointerException if executor or completionQueue is {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        if (completionQueue == null) {
            throw new NullPointerException("completionQueue");
        }
        this.executor = executor;
        this.completionQueue = completionQueue;
    }

    /**
     * Wraps the given value-returning task in a future, hands it to the
     * executor, and returns that future. Once the task is done the same
     * future becomes available through {@code take} or {@code poll}.
     * Anything thrown by the executor's {@code execute} propagates to
     * the caller.
     *
     * @param task the task to run
     * @return the future representing the task
     * @throws NullPointerException if task is {@code null}
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        QueueingFuture<V> f = new QueueingFuture<V>(task, completionQueue);
        executor.execute(f);
        return f;
    }

    /**
     * Wraps the given runnable in a future whose {@code get} yields
     * {@code result} on successful completion, hands it to the executor,
     * and returns that future. Once the task is done the same future
     * becomes available through {@code take} or {@code poll}.
     * Anything thrown by the executor's {@code execute} propagates to
     * the caller.
     *
     * @param task the task to run
     * @param result the value the future reports on successful completion
     * @return the future representing the task
     * @throws NullPointerException if task is {@code null}
     */
    public Future<V> submit(Runnable task, V result) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        QueueingFuture<V> f = new QueueingFuture<V>(task, result, completionQueue);
        executor.execute(f);
        return f;
    }

    /**
     * Retrieves and removes the next future from the completion queue,
     * waiting if none is present yet.
     *
     * @return the future of a completed task
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    /**
     * Retrieves and removes the next future from the completion queue
     * without waiting.
     *
     * @return the future of a completed task, or {@code null} if none
     *         is present
     */
    public Future<V> poll() {
        return completionQueue.poll();
    }

    /**
     * Retrieves and removes the next future from the completion queue,
     * waiting up to the given time if none is present yet.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the future of a completed task, or {@code null} if the
     *         wait time elapses before one is present
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
|