001 /*
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003 *
004 * This code is free software; you can redistribute it and/or modify it
005 * under the terms of the GNU General Public License version 2 only, as
006 * published by the Free Software Foundation. Sun designates this
007 * particular file as subject to the "Classpath" exception as provided
008 * by Sun in the LICENSE file that accompanied this code.
009 *
010 * This code is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013 * version 2 for more details (a copy is included in the LICENSE file that
014 * accompanied this code).
015 *
016 * You should have received a copy of the GNU General Public License version
017 * 2 along with this work; if not, write to the Free Software Foundation,
018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019 *
020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021 * CA 95054 USA or visit www.sun.com if you need additional information or
022 * have any questions.
023 */
024
025 /*
026 * This file is available under and governed by the GNU General Public
027 * License version 2 only, as published by the Free Software Foundation.
028 * However, the following notice accompanied the original version of this
029 * file:
030 *
031 * Written by Doug Lea with assistance from members of JCP JSR-166
032 * Expert Group and released to the public domain, as explained at
033 * http://creativecommons.org/licenses/publicdomain
034 */
035
036 package java.util.concurrent;
037
038 import java.util.concurrent.locks.*;
039
040 /**
041 * A synchronization aid that allows a set of threads to all wait for
042 * each other to reach a common barrier point. CyclicBarriers are
043 * useful in programs involving a fixed sized party of threads that
044 * must occasionally wait for each other. The barrier is called
045 * <em>cyclic</em> because it can be re-used after the waiting threads
046 * are released.
047 *
048 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
049 * that is run once per barrier point, after the last thread in the party
050 * arrives, but before any threads are released.
051 * This <em>barrier action</em> is useful
052 * for updating shared-state before any of the parties continue.
053 *
054 * <p><b>Sample usage:</b> Here is an example of
055 * using a barrier in a parallel decomposition design:
056 * <pre>
057 * class Solver {
058 * final int N;
059 * final float[][] data;
060 * final CyclicBarrier barrier;
061 *
062 * class Worker implements Runnable {
063 * int myRow;
064 * Worker(int row) { myRow = row; }
065 * public void run() {
066 * while (!done()) {
067 * processRow(myRow);
068 *
069 * try {
070 * barrier.await();
071 * } catch (InterruptedException ex) {
072 * return;
073 * } catch (BrokenBarrierException ex) {
074 * return;
075 * }
076 * }
077 * }
078 * }
079 *
080 * public Solver(float[][] matrix) {
081 * data = matrix;
082 * N = matrix.length;
083 * barrier = new CyclicBarrier(N,
084 * new Runnable() {
085 * public void run() {
086 * mergeRows(...);
087 * }
088 * });
089 * for (int i = 0; i < N; ++i)
090 * new Thread(new Worker(i)).start();
091 *
092 * waitUntilDone();
093 * }
094 * }
095 * </pre>
096 * Here, each worker thread processes a row of the matrix then waits at the
097 * barrier until all rows have been processed. When all rows are processed
098 * the supplied {@link Runnable} barrier action is executed and merges the
099 * rows. If the merger
100 * determines that a solution has been found then <tt>done()</tt> will return
101 * <tt>true</tt> and each worker will terminate.
102 *
103 * <p>If the barrier action does not rely on the parties being suspended when
104 * it is executed, then any of the threads in the party could execute that
105 * action when it is released. To facilitate this, each invocation of
106 * {@link #await} returns the arrival index of that thread at the barrier.
107 * You can then choose which thread should execute the barrier action, for
108 * example:
109 * <pre> if (barrier.await() == 0) {
110 * // log the completion of this iteration
111 * }</pre>
112 *
113 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
114 * for failed synchronization attempts: If a thread leaves a barrier
115 * point prematurely because of interruption, failure, or timeout, all
116 * other threads waiting at that barrier point will also leave
117 * abnormally via {@link BrokenBarrierException} (or
118 * {@link InterruptedException} if they too were interrupted at about
119 * the same time).
120 *
121 * <p>Memory consistency effects: Actions in a thread prior to calling
122 * {@code await()}
123 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
124 * actions that are part of the barrier action, which in turn
125 * <i>happen-before</i> actions following a successful return from the
126 * corresponding {@code await()} in other threads.
127 *
128 * @since 1.5
129 * @see CountDownLatch
130 *
131 * @author Doug Lea
132 */
133 public class CyclicBarrier {
134 /**
135 * Each use of the barrier is represented as a generation instance.
136 * The generation changes whenever the barrier is tripped, or
137 * is reset. There can be many generations associated with threads
138 * using the barrier - due to the non-deterministic way the lock
139 * may be allocated to waiting threads - but only one of these
140 * can be active at a time (the one to which <tt>count</tt> applies)
141 * and all the rest are either broken or tripped.
142 * There need not be an active generation if there has been a break
143 * but no subsequent reset.
144 */
145 private static class Generation {
146 boolean broken = false;
147 }
148
149 /** The lock for guarding barrier entry */
150 private final ReentrantLock lock = new ReentrantLock();
151 /** Condition to wait on until tripped */
152 private final Condition trip = lock.newCondition();
153 /** The number of parties */
154 private final int parties;
155 /* The command to run when tripped */
156 private final Runnable barrierCommand;
157 /** The current generation */
158 private Generation generation = new Generation();
159
160 /**
161 * Number of parties still waiting. Counts down from parties to 0
162 * on each generation. It is reset to parties on each new
163 * generation or when broken.
164 */
165 private int count;
166
167 /**
168 * Updates state on barrier trip and wakes up everyone.
169 * Called only while holding lock.
170 */
171 private void nextGeneration() {
172 // signal completion of last generation
173 trip.signalAll();
174 // set up next generation
175 count = parties;
176 generation = new Generation();
177 }
178
179 /**
180 * Sets current barrier generation as broken and wakes up everyone.
181 * Called only while holding lock.
182 */
183 private void breakBarrier() {
184 generation.broken = true;
185 count = parties;
186 trip.signalAll();
187 }
188
189 /**
190 * Main barrier code, covering the various policies.
191 */
192 private int dowait(boolean timed, long nanos)
193 throws InterruptedException, BrokenBarrierException,
194 TimeoutException {
195 final ReentrantLock lock = this .lock;
196 lock.lock();
197 try {
198 final Generation g = generation;
199
200 if (g.broken)
201 throw new BrokenBarrierException();
202
203 if (Thread.interrupted()) {
204 breakBarrier();
205 throw new InterruptedException();
206 }
207
208 int index = --count;
209 if (index == 0) { // tripped
210 boolean ranAction = false;
211 try {
212 final Runnable command = barrierCommand;
213 if (command != null)
214 command.run();
215 ranAction = true;
216 nextGeneration();
217 return 0;
218 } finally {
219 if (!ranAction)
220 breakBarrier();
221 }
222 }
223
224 // loop until tripped, broken, interrupted, or timed out
225 for (;;) {
226 try {
227 if (!timed)
228 trip.await();
229 else if (nanos > 0L)
230 nanos = trip.awaitNanos(nanos);
231 } catch (InterruptedException ie) {
232 if (g == generation && !g.broken) {
233 breakBarrier();
234 throw ie;
235 } else {
236 // We're about to finish waiting even if we had not
237 // been interrupted, so this interrupt is deemed to
238 // "belong" to subsequent execution.
239 Thread.currentThread().interrupt();
240 }
241 }
242
243 if (g.broken)
244 throw new BrokenBarrierException();
245
246 if (g != generation)
247 return index;
248
249 if (timed && nanos <= 0L) {
250 breakBarrier();
251 throw new TimeoutException();
252 }
253 }
254 } finally {
255 lock.unlock();
256 }
257 }
258
259 /**
260 * Creates a new <tt>CyclicBarrier</tt> that will trip when the
261 * given number of parties (threads) are waiting upon it, and which
262 * will execute the given barrier action when the barrier is tripped,
263 * performed by the last thread entering the barrier.
264 *
265 * @param parties the number of threads that must invoke {@link #await}
266 * before the barrier is tripped
267 * @param barrierAction the command to execute when the barrier is
268 * tripped, or {@code null} if there is no action
269 * @throws IllegalArgumentException if {@code parties} is less than 1
270 */
271 public CyclicBarrier(int parties, Runnable barrierAction) {
272 if (parties <= 0)
273 throw new IllegalArgumentException();
274 this .parties = parties;
275 this .count = parties;
276 this .barrierCommand = barrierAction;
277 }
278
279 /**
280 * Creates a new <tt>CyclicBarrier</tt> that will trip when the
281 * given number of parties (threads) are waiting upon it, and
282 * does not perform a predefined action when the barrier is tripped.
283 *
284 * @param parties the number of threads that must invoke {@link #await}
285 * before the barrier is tripped
286 * @throws IllegalArgumentException if {@code parties} is less than 1
287 */
288 public CyclicBarrier(int parties) {
289 this (parties, null);
290 }
291
292 /**
293 * Returns the number of parties required to trip this barrier.
294 *
295 * @return the number of parties required to trip this barrier
296 */
297 public int getParties() {
298 return parties;
299 }
300
301 /**
302 * Waits until all {@linkplain #getParties parties} have invoked
303 * <tt>await</tt> on this barrier.
304 *
305 * <p>If the current thread is not the last to arrive then it is
306 * disabled for thread scheduling purposes and lies dormant until
307 * one of the following things happens:
308 * <ul>
309 * <li>The last thread arrives; or
310 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
311 * the current thread; or
312 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
313 * one of the other waiting threads; or
314 * <li>Some other thread times out while waiting for barrier; or
315 * <li>Some other thread invokes {@link #reset} on this barrier.
316 * </ul>
317 *
318 * <p>If the current thread:
319 * <ul>
320 * <li>has its interrupted status set on entry to this method; or
321 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
322 * </ul>
323 * then {@link InterruptedException} is thrown and the current thread's
324 * interrupted status is cleared.
325 *
326 * <p>If the barrier is {@link #reset} while any thread is waiting,
327 * or if the barrier {@linkplain #isBroken is broken} when
328 * <tt>await</tt> is invoked, or while any thread is waiting, then
329 * {@link BrokenBarrierException} is thrown.
330 *
331 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
332 * then all other waiting threads will throw
333 * {@link BrokenBarrierException} and the barrier is placed in the broken
334 * state.
335 *
336 * <p>If the current thread is the last thread to arrive, and a
337 * non-null barrier action was supplied in the constructor, then the
338 * current thread runs the action before allowing the other threads to
339 * continue.
340 * If an exception occurs during the barrier action then that exception
341 * will be propagated in the current thread and the barrier is placed in
342 * the broken state.
343 *
344 * @return the arrival index of the current thread, where index
345 * <tt>{@link #getParties()} - 1</tt> indicates the first
346 * to arrive and zero indicates the last to arrive
347 * @throws InterruptedException if the current thread was interrupted
348 * while waiting
349 * @throws BrokenBarrierException if <em>another</em> thread was
350 * interrupted or timed out while the current thread was
351 * waiting, or the barrier was reset, or the barrier was
352 * broken when {@code await} was called, or the barrier
353 * action (if present) failed due an exception.
354 */
355 public int await() throws InterruptedException,
356 BrokenBarrierException {
357 try {
358 return dowait(false, 0L);
359 } catch (TimeoutException toe) {
360 throw new Error(toe); // cannot happen;
361 }
362 }
363
364 /**
365 * Waits until all {@linkplain #getParties parties} have invoked
366 * <tt>await</tt> on this barrier, or the specified waiting time elapses.
367 *
368 * <p>If the current thread is not the last to arrive then it is
369 * disabled for thread scheduling purposes and lies dormant until
370 * one of the following things happens:
371 * <ul>
372 * <li>The last thread arrives; or
373 * <li>The specified timeout elapses; or
374 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
375 * the current thread; or
376 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
377 * one of the other waiting threads; or
378 * <li>Some other thread times out while waiting for barrier; or
379 * <li>Some other thread invokes {@link #reset} on this barrier.
380 * </ul>
381 *
382 * <p>If the current thread:
383 * <ul>
384 * <li>has its interrupted status set on entry to this method; or
385 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
386 * </ul>
387 * then {@link InterruptedException} is thrown and the current thread's
388 * interrupted status is cleared.
389 *
390 * <p>If the specified waiting time elapses then {@link TimeoutException}
391 * is thrown. If the time is less than or equal to zero, the
392 * method will not wait at all.
393 *
394 * <p>If the barrier is {@link #reset} while any thread is waiting,
395 * or if the barrier {@linkplain #isBroken is broken} when
396 * <tt>await</tt> is invoked, or while any thread is waiting, then
397 * {@link BrokenBarrierException} is thrown.
398 *
399 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
400 * waiting, then all other waiting threads will throw {@link
401 * BrokenBarrierException} and the barrier is placed in the broken
402 * state.
403 *
404 * <p>If the current thread is the last thread to arrive, and a
405 * non-null barrier action was supplied in the constructor, then the
406 * current thread runs the action before allowing the other threads to
407 * continue.
408 * If an exception occurs during the barrier action then that exception
409 * will be propagated in the current thread and the barrier is placed in
410 * the broken state.
411 *
412 * @param timeout the time to wait for the barrier
413 * @param unit the time unit of the timeout parameter
414 * @return the arrival index of the current thread, where index
415 * <tt>{@link #getParties()} - 1</tt> indicates the first
416 * to arrive and zero indicates the last to arrive
417 * @throws InterruptedException if the current thread was interrupted
418 * while waiting
419 * @throws TimeoutException if the specified timeout elapses
420 * @throws BrokenBarrierException if <em>another</em> thread was
421 * interrupted or timed out while the current thread was
422 * waiting, or the barrier was reset, or the barrier was broken
423 * when {@code await} was called, or the barrier action (if
424 * present) failed due an exception
425 */
426 public int await(long timeout, TimeUnit unit)
427 throws InterruptedException, BrokenBarrierException,
428 TimeoutException {
429 return dowait(true, unit.toNanos(timeout));
430 }
431
432 /**
433 * Queries if this barrier is in a broken state.
434 *
435 * @return {@code true} if one or more parties broke out of this
436 * barrier due to interruption or timeout since
437 * construction or the last reset, or a barrier action
438 * failed due to an exception; {@code false} otherwise.
439 */
440 public boolean isBroken() {
441 final ReentrantLock lock = this .lock;
442 lock.lock();
443 try {
444 return generation.broken;
445 } finally {
446 lock.unlock();
447 }
448 }
449
450 /**
451 * Resets the barrier to its initial state. If any parties are
452 * currently waiting at the barrier, they will return with a
453 * {@link BrokenBarrierException}. Note that resets <em>after</em>
454 * a breakage has occurred for other reasons can be complicated to
455 * carry out; threads need to re-synchronize in some other way,
456 * and choose one to perform the reset. It may be preferable to
457 * instead create a new barrier for subsequent use.
458 */
459 public void reset() {
460 final ReentrantLock lock = this .lock;
461 lock.lock();
462 try {
463 breakBarrier(); // break the current generation
464 nextGeneration(); // start a new generation
465 } finally {
466 lock.unlock();
467 }
468 }
469
470 /**
471 * Returns the number of parties currently waiting at the barrier.
472 * This method is primarily useful for debugging and assertions.
473 *
474 * @return the number of parties currently blocked in {@link #await}
475 */
476 public int getNumberWaiting() {
477 final ReentrantLock lock = this.lock;
478 lock.lock();
479 try {
480 return parties - count;
481 } finally {
482 lock.unlock();
483 }
484 }
485 }
|