001: /*
002: File: CyclicBarrier.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jul1998 dl Create public version
012: 28Aug1998 dl minor code simplification
013: */
014:
015: package EDU.oswego.cs.dl.util.concurrent;
016:
017: /**
018: * A cyclic barrier is a reasonable choice for a barrier in contexts
019: * involving a fixed sized group of threads that
020: * must occasionally wait for each other.
021: * (A Rendezvous better handles applications in which
022: * any number of threads meet, n-at-a-time.)
023: * <p>
024: * CyclicBarriers use an all-or-none breakage model
025: * for failed synchronization attempts: If threads
026: * leave a barrier point prematurely because of timeout
027: * or interruption, others will also leave abnormally
028: * (via BrokenBarrierException), until
029: * the barrier is <code>restart</code>ed. This is usually
030: * the simplest and best strategy for sharing knowledge
031: * about failures among cooperating threads in the most
032: * common usages contexts of Barriers.
033: * This implementation has the property that interruptions
034: * among newly arriving threads can cause as-yet-unresumed
035: * threads from a previous barrier cycle to return out
036: * as broken. This transmits breakage
037: * as early as possible, but with the possible byproduct that
038: * only some threads returning out of a barrier will realize
039: * that it is newly broken. (Others will not realize this until a
040: * future cycle.) (The Rendezvous class has a more uniform, but
041: * sometimes less desirable policy.)
042: * <p>
043: * Barriers support an optional Runnable command
044: * that is run once per barrier point.
045: * <p>
046: * <b>Sample usage</b> Here is a code sketch of
047: * a barrier in a parallel decomposition design.
048: * <pre>
049: * class Solver {
050: * final int N;
051: * final float[][] data;
052: * final CyclicBarrier barrier;
053: *
054: * class Worker implements Runnable {
055: * int myRow;
056: * Worker(int row) { myRow = row; }
057: * public void run() {
058: * while (!done()) {
059: * processRow(myRow);
060: *
061: * try {
062: * barrier.barrier();
063: * }
064: * catch (InterruptedException ex) { return; }
065: * catch (BrokenBarrierException ex) { return; }
066: * }
067: * }
068: * }
069: *
070: * public Solver(float[][] matrix) {
071: * data = matrix;
072: * N = matrix.length;
073: * barrier = new CyclicBarrier(N);
074: * barrier.setBarrierCommand(new Runnable() {
075: * public void run() { mergeRows(...); }
076: * });
077: * for (int i = 0; i < N; ++i) {
078: * new Thread(new Worker(i)).start();
079: * waitUntilDone();
080: * }
081: * }
082: * </pre>
083: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
084:
085: **/
086: public class CyclicBarrier implements Barrier {
087:
088: protected final int parties_;
089: protected boolean broken_ = false;
090: protected Runnable barrierCommand_ = null;
091: protected int count_; // number of parties still waiting
092: protected int resets_ = 0; // incremented on each release
093:
094: /**
095: * Create a CyclicBarrier for the indicated number of parties,
096: * and no command to run at each barrier.
097: * @exception IllegalArgumentException if parties less than or equal to zero.
098: **/
099:
100: public CyclicBarrier(int parties) {
101: this (parties, null);
102: }
103:
104: /**
105: * Create a CyclicBarrier for the indicated number of parties.
106: * and the given command to run at each barrier point.
107: * @exception IllegalArgumentException if parties less than or equal to zero.
108: **/
109:
110: public CyclicBarrier(int parties, Runnable command) {
111: if (parties <= 0)
112: throw new IllegalArgumentException();
113: parties_ = parties;
114: count_ = parties;
115: barrierCommand_ = command;
116: }
117:
118: /**
119: * Set the command to run at the point at which all threads reach the
120: * barrier. This command is run exactly once, by the thread
121: * that trips the barrier. The command is not run if the barrier is
122: * broken.
123: * @param command the command to run. If null, no command is run.
124: * @return the previous command
125: **/
126:
127: public synchronized Runnable setBarrierCommand(Runnable command) {
128: Runnable old = barrierCommand_;
129: barrierCommand_ = command;
130: return old;
131: }
132:
133: public synchronized boolean broken() {
134: return broken_;
135: }
136:
137: /**
138: * Reset to initial state. Clears both the broken status
139: * and any record of waiting threads, and releases all
140: * currently waiting threads with indeterminate return status.
141: * This method is intended only for use in recovery actions
142: * in which it is somehow known
143: * that no thread could possibly be relying on the
144: * the synchronization properties of this barrier.
145: **/
146:
147: public synchronized void restart() {
148: broken_ = false;
149: ++resets_;
150: count_ = parties_;
151: notifyAll();
152: }
153:
154: public int parties() {
155: return parties_;
156: }
157:
158: /**
159: * Enter barrier and wait for the other parties()-1 threads.
160: * @return the arrival index: the number of other parties
161: * that were still waiting
162: * upon entry. This is a unique value from zero to parties()-1.
163: * If it is zero, then the current
164: * thread was the last party to hit barrier point
165: * and so was responsible for releasing the others.
166: * @exception BrokenBarrierException if any other thread
167: * in any previous or current barrier
168: * since either creation or the last <code>restart</code>
169: * operation left the barrier
170: * prematurely due to interruption or time-out. (If so,
171: * the <code>broken</code> status is also set.)
172: * Threads that are noticied to have been
173: * interrupted <em>after</em> being released are not considered
174: * to have broken the barrier.
175: * In all cases, the interruption
176: * status of the current thread is preserved, so can be tested
177: * by checking <code>Thread.interrupted</code>.
178: * @exception InterruptedException if this thread was interrupted
179: * during the barrier, and was the one causing breakage.
180: * If so, <code>broken</code> status is also set.
181: **/
182:
183: public int barrier() throws InterruptedException,
184: BrokenBarrierException {
185: return doBarrier(false, 0);
186: }
187:
188: /**
189: * Enter barrier and wait at most msecs for the other parties()-1 threads.
190: * @return if not timed out, the arrival index: the number of other parties
191: * that were still waiting
192: * upon entry. This is a unique value from zero to parties()-1.
193: * If it is zero, then the current
194: * thread was the last party to hit barrier point
195: * and so was responsible for releasing the others.
196: * @exception BrokenBarrierException
197: * if any other thread
198: * in any previous or current barrier
199: * since either creation or the last <code>restart</code>
200: * operation left the barrier
201: * prematurely due to interruption or time-out. (If so,
202: * the <code>broken</code> status is also set.)
203: * Threads that are noticed to have been
204: * interrupted <em>after</em> being released are not considered
205: * to have broken the barrier.
206: * In all cases, the interruption
207: * status of the current thread is preserved, so can be tested
208: * by checking <code>Thread.interrupted</code>.
209: * @exception InterruptedException if this thread was interrupted
210: * during the barrier. If so, <code>broken</code> status is also set.
211: * @exception TimeoutException if this thread timed out waiting for
212: * the barrier. If the timeout occured while already in the
213: * barrier, <code>broken</code> status is also set.
214: **/
215:
216: public int attemptBarrier(long msecs) throws InterruptedException,
217: TimeoutException, BrokenBarrierException {
218: return doBarrier(true, msecs);
219: }
220:
221: protected synchronized int doBarrier(boolean timed, long msecs)
222: throws InterruptedException, TimeoutException,
223: BrokenBarrierException {
224:
225: int index = --count_;
226:
227: if (broken_) {
228: throw new BrokenBarrierException(index);
229: } else if (Thread.interrupted()) {
230: broken_ = true;
231: notifyAll();
232: throw new InterruptedException();
233: } else if (index == 0) { // tripped
234: count_ = parties_;
235: ++resets_;
236: notifyAll();
237: try {
238: if (barrierCommand_ != null)
239: barrierCommand_.run();
240: return 0;
241: } catch (RuntimeException ex) {
242: broken_ = true;
243: return 0;
244: }
245: } else if (timed && msecs <= 0) {
246: broken_ = true;
247: notifyAll();
248: throw new TimeoutException(msecs);
249: } else { // wait until next reset
250: int r = resets_;
251: long startTime = (timed) ? System.currentTimeMillis() : 0;
252: long waitTime = msecs;
253: for (;;) {
254: try {
255: wait(waitTime);
256: } catch (InterruptedException ex) {
257: // Only claim that broken if interrupted before reset
258: if (resets_ == r) {
259: broken_ = true;
260: notifyAll();
261: throw ex;
262: } else {
263: Thread.currentThread().interrupt(); // propagate
264: }
265: }
266:
267: if (broken_)
268: throw new BrokenBarrierException(index);
269:
270: else if (r != resets_)
271: return index;
272:
273: else if (timed) {
274: waitTime = msecs
275: - (System.currentTimeMillis() - startTime);
276: if (waitTime <= 0) {
277: broken_ = true;
278: notifyAll();
279: throw new TimeoutException(msecs);
280: }
281: }
282: }
283: }
284: }
285:
286: }
|