001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.locks.*;
010:
011: /**
012: * A synchronization point at which two threads can exchange objects.
013: * Each thread presents some object on entry to the {@link #exchange
014: * exchange} method, and receives the object presented by the other
015: * thread on return.
016: *
017: * <p><b>Sample Usage:</b>
018: * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
019: * swap buffers between threads so that the thread filling the
020: * buffer gets a freshly
021: * emptied one when it needs it, handing off the filled one to
022: * the thread emptying the buffer.
023: * <pre>
024: * class FillAndEmpty {
025: * Exchanger<DataBuffer> exchanger = new Exchanger();
026: * DataBuffer initialEmptyBuffer = ... a made-up type
027: * DataBuffer initialFullBuffer = ...
028: *
029: * class FillingLoop implements Runnable {
030: * public void run() {
031: * DataBuffer currentBuffer = initialEmptyBuffer;
032: * try {
033: * while (currentBuffer != null) {
034: * addToBuffer(currentBuffer);
035: * if (currentBuffer.full())
036: * currentBuffer = exchanger.exchange(currentBuffer);
037: * }
038: * } catch (InterruptedException ex) { ... handle ... }
039: * }
040: * }
041: *
042: * class EmptyingLoop implements Runnable {
043: * public void run() {
044: * DataBuffer currentBuffer = initialFullBuffer;
045: * try {
046: * while (currentBuffer != null) {
047: * takeFromBuffer(currentBuffer);
048: * if (currentBuffer.empty())
049: * currentBuffer = exchanger.exchange(currentBuffer);
050: * }
051: * } catch (InterruptedException ex) { ... handle ...}
052: * }
053: * }
054: *
055: * void start() {
056: * new Thread(new FillingLoop()).start();
057: * new Thread(new EmptyingLoop()).start();
058: * }
059: * }
060: * </pre>
061: *
062: * @since 1.5
063: * @author Doug Lea
064: * @param <V> The type of objects that may be exchanged
065: */
066: public class Exchanger<V> {
067: private final ReentrantLock lock = new ReentrantLock();
068: private final Condition taken = lock.newCondition();
069:
070: /** Holder for the item being exchanged */
071: private V item;
072:
073: /**
074: * Arrival count transitions from 0 to 1 to 2 then back to 0
075: * during an exchange.
076: */
077: private int arrivalCount;
078:
079: /**
080: * Main exchange function, handling the different policy variants.
081: */
082: private V doExchange(V x, boolean timed, long nanos)
083: throws InterruptedException, TimeoutException {
084: lock.lock();
085: try {
086: V other;
087:
088: // If arrival count already at two, we must wait for
089: // a previous pair to finish and reset the count;
090: while (arrivalCount == 2) {
091: if (!timed)
092: taken.await();
093: else if (nanos > 0)
094: nanos = taken.awaitNanos(nanos);
095: else
096: throw new TimeoutException();
097: }
098:
099: int count = ++arrivalCount;
100:
101: // If item is already waiting, replace it and signal other thread
102: if (count == 2) {
103: other = item;
104: item = x;
105: taken.signal();
106: return other;
107: }
108:
109: // Otherwise, set item and wait for another thread to
110: // replace it and signal us.
111:
112: item = x;
113: InterruptedException interrupted = null;
114: try {
115: while (arrivalCount != 2) {
116: if (!timed)
117: taken.await();
118: else if (nanos > 0)
119: nanos = taken.awaitNanos(nanos);
120: else
121: break; // timed out
122: }
123: } catch (InterruptedException ie) {
124: interrupted = ie;
125: }
126:
127: // Get and reset item and count after the wait.
128: // (We need to do this even if wait was aborted.)
129: other = item;
130: item = null;
131: count = arrivalCount;
132: arrivalCount = 0;
133: taken.signal();
134:
135: // If the other thread replaced item, then we must
136: // continue even if cancelled.
137: if (count == 2) {
138: if (interrupted != null)
139: Thread.currentThread().interrupt();
140: return other;
141: }
142:
143: // If no one is waiting for us, we can back out
144: if (interrupted != null)
145: throw interrupted;
146: else
147: // must be timeout
148: throw new TimeoutException();
149: } finally {
150: lock.unlock();
151: }
152: }
153:
154: /**
155: * Create a new Exchanger.
156: **/
157: public Exchanger() {
158: }
159:
160: /**
161: * Waits for another thread to arrive at this exchange point (unless
162: * it is {@link Thread#interrupt interrupted}),
163: * and then transfers the given object to it, receiving its object
164: * in return.
165: * <p>If another thread is already waiting at the exchange point then
166: * it is resumed for thread scheduling purposes and receives the object
167: * passed in by the current thread. The current thread returns immediately,
168: * receiving the object passed to the exchange by that other thread.
169: * <p>If no other thread is already waiting at the exchange then the
170: * current thread is disabled for thread scheduling purposes and lies
171: * dormant until one of two things happens:
172: * <ul>
173: * <li>Some other thread enters the exchange; or
174: * <li>Some other thread {@link Thread#interrupt interrupts} the current
175: * thread.
176: * </ul>
177: * <p>If the current thread:
178: * <ul>
179: * <li>has its interrupted status set on entry to this method; or
180: * <li>is {@link Thread#interrupt interrupted} while waiting
181: * for the exchange,
182: * </ul>
183: * then {@link InterruptedException} is thrown and the current thread's
184: * interrupted status is cleared.
185: *
186: * @param x the object to exchange
187: * @return the object provided by the other thread.
188: * @throws InterruptedException if current thread was interrupted
189: * while waiting
190: **/
191: public V exchange(V x) throws InterruptedException {
192: try {
193: return doExchange(x, false, 0);
194: } catch (TimeoutException cannotHappen) {
195: throw new Error(cannotHappen);
196: }
197: }
198:
199: /**
200: * Waits for another thread to arrive at this exchange point (unless
201: * it is {@link Thread#interrupt interrupted}, or the specified waiting
202: * time elapses),
203: * and then transfers the given object to it, receiving its object
204: * in return.
205: *
206: * <p>If another thread is already waiting at the exchange point then
207: * it is resumed for thread scheduling purposes and receives the object
208: * passed in by the current thread. The current thread returns immediately,
209: * receiving the object passed to the exchange by that other thread.
210: *
211: * <p>If no other thread is already waiting at the exchange then the
212: * current thread is disabled for thread scheduling purposes and lies
213: * dormant until one of three things happens:
214: * <ul>
215: * <li>Some other thread enters the exchange; or
216: * <li>Some other thread {@link Thread#interrupt interrupts} the current
217: * thread; or
218: * <li>The specified waiting time elapses.
219: * </ul>
220: * <p>If the current thread:
221: * <ul>
222: * <li>has its interrupted status set on entry to this method; or
223: * <li>is {@link Thread#interrupt interrupted} while waiting
224: * for the exchange,
225: * </ul>
226: * then {@link InterruptedException} is thrown and the current thread's
227: * interrupted status is cleared.
228: *
229: * <p>If the specified waiting time elapses then {@link TimeoutException}
230: * is thrown.
231: * If the time is
232: * less than or equal to zero, the method will not wait at all.
233: *
234: * @param x the object to exchange
235: * @param timeout the maximum time to wait
236: * @param unit the time unit of the <tt>timeout</tt> argument.
237: * @return the object provided by the other thread.
238: * @throws InterruptedException if current thread was interrupted
239: * while waiting
240: * @throws TimeoutException if the specified waiting time elapses before
241: * another thread enters the exchange.
242: **/
243: public V exchange(V x, long timeout, TimeUnit unit)
244: throws InterruptedException, TimeoutException {
245: return doExchange(x, true, unit.toNanos(timeout));
246: }
247:
248: }
|