001: /*
002: File: Rendezvous.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 30Jul1998 dl Minor code simplifications
013: */
014:
015: package EDU.oswego.cs.dl.util.concurrent;
016:
017: /**
018: * A rendezvous is a barrier that:
019: * <ul>
020: * <li> Unlike a CyclicBarrier, is not restricted to use
021: * with fixed-sized groups of threads.
022: * Any number of threads can attempt to enter a rendezvous,
023: * but only the predetermined number of parties enter
024: * and later become released from the rendezvous at any given time.
025: * <li> Enables each participating thread to exchange information
026: * with others at the rendezvous point. Each entering thread
027: * presents some object on entry to the rendezvous, and
028: * returns some object on release. The object returned is
029: * the result of a RendezvousFunction that is run once per
030: * rendezvous, (it is run by the last-entering thread). By
031: * default, the function applied is a rotation, so each
032: * thread returns the object given by the next (modulo parties)
033: * entering thread. This default function facilitates simple
034: * application of a common use of rendezvous, as exchangers.
035: * </ul>
036: * <p>
037: * Rendezvous use an all-or-none breakage model
038: * for failed synchronization attempts: If threads
039: * leave a rendezvous point prematurely because of timeout
040: * or interruption, others will also leave abnormally
041: * (via BrokenBarrierException), until
042: * the rendezvous is <code>restart</code>ed. This is usually
043: * the simplest and best strategy for sharing knowledge
044: * about failures among cooperating threads in the most
045: * common usage contexts of Rendezvous.
046: * <p>
047: * While any positive number (including 1) of parties can
048: * be handled, the most common case is to have two parties.
049: * <p>
050: * <b>Sample Usage</b><p>
051: * Here are the highlights of a class that uses a Rendezvous to
052: * swap buffers between threads so that the thread filling the
053: * buffer gets a freshly
054: * emptied one when it needs it, handing off the filled one to
055: * the thread emptying the buffer.
056: * <pre>
057: * class FillAndEmpty {
058: * Rendezvous exchanger = new Rendezvous(2);
059: * Buffer initialEmptyBuffer = ... a made-up type
060: * Buffer initialFullBuffer = ...
061: *
062: * class FillingLoop implements Runnable {
063: * public void run() {
064: * Buffer currentBuffer = initialEmptyBuffer;
065: * try {
066: * while (currentBuffer != null) {
067: * addToBuffer(currentBuffer);
068: * if (currentBuffer.full())
069: * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
070: * }
071: * }
072: * catch (BrokenBarrierException ex) {
073: * return;
074: * }
075: * catch (InterruptedException ex) {
076: * Thread.currentThread().interrupt();
077: * }
078: * }
079: * }
080: *
081: * class EmptyingLoop implements Runnable {
082: * public void run() {
083: * Buffer currentBuffer = initialFullBuffer;
084: * try {
085: * while (currentBuffer != null) {
086: * takeFromBuffer(currentBuffer);
087: * if (currentBuffer.empty())
088: * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
089: * }
090: * }
091: * catch (BrokenBarrierException ex) {
092: * return;
093: * }
094: * catch (InterruptedException ex) {
095: * Thread.currentThread().interrupt();
096: * }
097: * }
098: * }
099: *
100: * void start() {
101: * new Thread(new FillingLoop()).start();
102: * new Thread(new EmptyingLoop()).start();
103: * }
104: * }
105: * </pre>
106: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
107:
108: **/
109:
110: public class Rendezvous implements Barrier {
111:
112: /**
113: * Interface for functions run at rendezvous points
114: **/
115: public interface RendezvousFunction {
116: /**
117: * Perform some function on the objects presented at
118: * a rendezvous. The objects array holds all presented
119: * items; one per thread. Its length is the number of parties.
120: * The array is ordered by arrival into the rendezvous.
121: * So, the last element (at objects[objects.length-1])
122: * is guaranteed to have been presented by the thread performing
123: * this function. No identifying information is
124: * otherwise kept about which thread presented which item.
125: * If you need to
126: * trace origins, you will need to use an item type for rendezvous
127: * that includes identifying information. After return of this
128: * function, other threads are released, and each returns with
129: * the item with the same index as the one it presented.
130: **/
131: public void rendezvousFunction(Object[] objects);
132: }
133:
134: /**
135: * The default rendezvous function. Rotates the array
136: * so that each thread returns an item presented by some
137: * other thread (or itself, if parties is 1).
138: **/
139: public static class Rotator implements RendezvousFunction {
140: /** Rotate the array **/
141: public void rendezvousFunction(Object[] objects) {
142: int lastIdx = objects.length - 1;
143: Object first = objects[0];
144: for (int i = 0; i < lastIdx; ++i)
145: objects[i] = objects[i + 1];
146: objects[lastIdx] = first;
147: }
148: }
149:
150: protected final int parties_;
151:
152: protected boolean broken_ = false;
153:
154: /**
155: * Number of threads that have entered rendezvous
156: **/
157: protected int entries_ = 0;
158:
159: /**
160: * Number of threads that are permitted to depart rendezvous
161: **/
162: protected long departures_ = 0;
163:
164: /**
165: * Incoming threads pile up on entry until last set done.
166: **/
167: protected final Semaphore entryGate_;
168:
169: /**
170: * Temporary holder for items in exchange
171: **/
172: protected final Object[] slots_;
173:
174: /**
175: * The function to run at rendezvous point
176: **/
177:
178: protected RendezvousFunction rendezvousFunction_;
179:
180: /**
181: * Create a Barrier for the indicated number of parties,
182: * and the default Rotator function to run at each barrier point.
183: * @exception IllegalArgumentException if parties less than or equal to zero.
184: **/
185:
186: public Rendezvous(int parties) {
187: this (parties, new Rotator());
188: }
189:
190: /**
191: * Create a Barrier for the indicated number of parties.
192: * and the given function to run at each barrier point.
193: * @exception IllegalArgumentException if parties less than or equal to zero.
194: **/
195:
196: public Rendezvous(int parties, RendezvousFunction function) {
197: if (parties <= 0)
198: throw new IllegalArgumentException();
199: parties_ = parties;
200: rendezvousFunction_ = function;
201: entryGate_ = new WaiterPreferenceSemaphore(parties);
202: slots_ = new Object[parties];
203: }
204:
205: /**
206: * Set the function to call at the point at which all threads reach the
207: * rendezvous. This function is run exactly once, by the thread
208: * that trips the barrier. The function is not run if the barrier is
209: * broken.
210: * @param function the function to run. If null, no function is run.
211: * @return the previous function
212: **/
213:
214: public synchronized RendezvousFunction setRendezvousFunction(
215: RendezvousFunction function) {
216: RendezvousFunction old = rendezvousFunction_;
217: rendezvousFunction_ = function;
218: return old;
219: }
220:
221: public int parties() {
222: return parties_;
223: }
224:
225: public synchronized boolean broken() {
226: return broken_;
227: }
228:
229: /**
230: * Reset to initial state. Clears both the broken status
231: * and any record of waiting threads, and releases all
232: * currently waiting threads with indeterminate return status.
233: * This method is intended only for use in recovery actions
234: * in which it is somehow known
235: * that no thread could possibly be relying on the
236: * the synchronization properties of this barrier.
237: **/
238:
239: public void restart() {
240: // This is not very good, but probably the best that can be done
241: for (;;) {
242: synchronized (this ) {
243: if (entries_ != 0) {
244: notifyAll();
245: } else {
246: broken_ = false;
247: return;
248: }
249: }
250: Thread.yield();
251: }
252: }
253:
254: /**
255: * Enter a rendezvous; returning after all other parties arrive.
256: * @param x the item to present at rendezvous point.
257: * By default, this item is exchanged with another.
258: * @return an item x given by some thread, and/or processed
259: * by the rendezvousFunction.
260: * @exception BrokenBarrierException
261: * if any other thread
262: * in any previous or current barrier
263: * since either creation or the last <code>restart</code>
264: * operation left the barrier
265: * prematurely due to interruption or time-out. (If so,
266: * the <code>broken</code> status is also set.)
267: * Also returns as
268: * broken if the RendezvousFunction encountered a run-time exception.
269: * Threads that are noticed to have been
270: * interrupted <em>after</em> being released are not considered
271: * to have broken the barrier.
272: * In all cases, the interruption
273: * status of the current thread is preserved, so can be tested
274: * by checking <code>Thread.interrupted</code>.
275: * @exception InterruptedException if this thread was interrupted
276: * during the exchange. If so, <code>broken</code> status is also set.
277: **/
278:
279: public Object rendezvous(Object x) throws InterruptedException,
280: BrokenBarrierException {
281: return doRendezvous(x, false, 0);
282: }
283:
284: /**
285: * Wait msecs to complete a rendezvous.
286: * @param x the item to present at rendezvous point.
287: * By default, this item is exchanged with another.
288: * @param msecs The maximum time to wait.
289: * @return an item x given by some thread, and/or processed
290: * by the rendezvousFunction.
291: * @exception BrokenBarrierException
292: * if any other thread
293: * in any previous or current barrier
294: * since either creation or the last <code>restart</code>
295: * operation left the barrier
296: * prematurely due to interruption or time-out. (If so,
297: * the <code>broken</code> status is also set.)
298: * Also returns as
299: * broken if the RendezvousFunction encountered a run-time exception.
300: * Threads that are noticed to have been
301: * interrupted <em>after</em> being released are not considered
302: * to have broken the barrier.
303: * In all cases, the interruption
304: * status of the current thread is preserved, so can be tested
305: * by checking <code>Thread.interrupted</code>.
306: * @exception InterruptedException if this thread was interrupted
307: * during the exchange. If so, <code>broken</code> status is also set.
308: * @exception TimeoutException if this thread timed out waiting for
309: * the exchange. If the timeout occured while already in the
310: * exchange, <code>broken</code> status is also set.
311: **/
312:
313: public Object attemptRendezvous(Object x, long msecs)
314: throws InterruptedException, TimeoutException,
315: BrokenBarrierException {
316: return doRendezvous(x, true, msecs);
317: }
318:
319: protected Object doRendezvous(Object x, boolean timed, long msecs)
320: throws InterruptedException, TimeoutException,
321: BrokenBarrierException {
322:
323: // rely on semaphore to throw interrupt on entry
324:
325: long startTime;
326:
327: if (timed) {
328: startTime = System.currentTimeMillis();
329: if (!entryGate_.attempt(msecs)) {
330: throw new TimeoutException(msecs);
331: }
332: } else {
333: startTime = 0;
334: entryGate_.acquire();
335: }
336:
337: synchronized (this ) {
338:
339: Object y = null;
340:
341: int index = entries_++;
342: slots_[index] = x;
343:
344: try {
345: // last one in runs function and releases
346: if (entries_ == parties_) {
347:
348: departures_ = entries_;
349: notifyAll();
350:
351: try {
352: if (!broken_ && rendezvousFunction_ != null)
353: rendezvousFunction_
354: .rendezvousFunction(slots_);
355: } catch (RuntimeException ex) {
356: broken_ = true;
357: }
358:
359: }
360:
361: else {
362:
363: while (!broken_ && departures_ < 1) {
364: long timeLeft = 0;
365: if (timed) {
366: timeLeft = msecs
367: - (System.currentTimeMillis() - startTime);
368: if (timeLeft <= 0) {
369: broken_ = true;
370: departures_ = entries_;
371: notifyAll();
372: throw new TimeoutException(msecs);
373: }
374: }
375:
376: try {
377: wait(timeLeft);
378: } catch (InterruptedException ex) {
379: if (broken_ || departures_ > 0) { // interrupted after release
380: Thread.currentThread().interrupt();
381: break;
382: } else {
383: broken_ = true;
384: departures_ = entries_;
385: notifyAll();
386: throw ex;
387: }
388: }
389: }
390: }
391:
392: }
393:
394: finally {
395:
396: y = slots_[index];
397:
398: // Last one out cleans up and allows next set of threads in
399: if (--departures_ <= 0) {
400: for (int i = 0; i < slots_.length; ++i)
401: slots_[i] = null;
402: entryGate_.release(entries_);
403: entries_ = 0;
404: }
405: }
406:
407: // continue if no IE/TO throw
408: if (broken_)
409: throw new BrokenBarrierException(index);
410: else
411: return y;
412: }
413: }
414:
415: }
|