001: /*
002: File: SynchronousChannel.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 17Jul1998 dl Disabled direct semaphore permit check
013: 31Jul1998 dl Replaced main algorithm with one with
014: better scaling and fairness properties.
015: 25aug1998 dl added peek
016: 24Nov2001 dl Replaced main algorithm with faster one.
017: */
018:
019: package EDU.oswego.cs.dl.util.concurrent;
020:
021: /**
022: * A rendezvous channel, similar to those used in CSP and Ada. Each
023: * put must wait for a take, and vice versa. Synchronous channels
024: * are well suited for handoff designs, in which an object running in
025: * one thread must synch up with an object running in another thread
026: * in order to hand it some information, event, or task.
027: * <p> If you only need threads to synch up without
028: * exchanging information, consider using a Barrier. If you need
029: * bidirectional exchanges, consider using a Rendezvous. <p>
030: *
031: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
032: * @see CyclicBarrier
033: * @see Rendezvous
034: **/
035:
036: public class SynchronousChannel implements BoundedChannel {
037:
038: /*
039: This implementation divides actions into two cases for puts:
040:
041: * An arriving putter that does not already have a waiting taker
042: creates a node holding item, and then waits for a taker to take it.
043: * An arriving putter that does already have a waiting taker fills
044: the slot node created by the taker, and notifies it to continue.
045:
046: And symmetrically, two for takes:
047:
048: * An arriving taker that does not already have a waiting putter
049: creates an empty slot node, and then waits for a putter to fill it.
050: * An arriving taker that does already have a waiting putter takes
051: item from the node created by the putter, and notifies it to continue.
052:
053: This requires keeping two simple queues: waitingPuts and waitingTakes.
054:
055: When a put or take waiting for the actions of its counterpart
056: aborts due to interruption or timeout, it marks the node
057: it created as "CANCELLED", which causes its counterpart to retry
058: the entire put or take sequence.
059: */
060:
061: /**
062: * Special marker used in queue nodes to indicate that
063: * the thread waiting for a change in the node has timed out
064: * or been interrupted.
065: **/
066: protected static final Object CANCELLED = new Object();
067:
068: /**
069: * Simple FIFO queue class to hold waiting puts/takes.
070: **/
071: protected static class Queue {
072: protected LinkedNode head;
073: protected LinkedNode last;
074:
075: protected void enq(LinkedNode p) {
076: if (last == null)
077: last = head = p;
078: else
079: last = last.next = p;
080: }
081:
082: protected LinkedNode deq() {
083: LinkedNode p = head;
084: if (p != null && (head = p.next) == null)
085: last = null;
086: return p;
087: }
088: }
089:
090: protected final Queue waitingPuts = new Queue();
091: protected final Queue waitingTakes = new Queue();
092:
093: /**
094: * @return zero --
095: * Synchronous channels have no internal capacity.
096: **/
097: public int capacity() {
098: return 0;
099: }
100:
101: /**
102: * @return null --
103: * Synchronous channels do not hold contents unless actively taken
104: **/
105: public Object peek() {
106: return null;
107: }
108:
109: public void put(Object x) throws InterruptedException {
110: if (x == null)
111: throw new IllegalArgumentException();
112:
113: // This code is conceptually straightforward, but messy
114: // because we need to intertwine handling of put-arrives first
115: // vs take-arrives first cases.
116:
117: // Outer loop is to handle retry due to cancelled waiting taker
118: for (;;) {
119:
120: // Get out now if we are interrupted
121: if (Thread.interrupted())
122: throw new InterruptedException();
123:
124: // Exactly one of item or slot will be nonnull at end of
125: // synchronized block, depending on whether a put or a take
126: // arrived first.
127: LinkedNode slot;
128: LinkedNode item = null;
129:
130: synchronized (this ) {
131: // Try to match up with a waiting taker; fill and signal it below
132: slot = waitingTakes.deq();
133:
134: // If no takers yet, create a node and wait below
135: if (slot == null)
136: waitingPuts.enq(item = new LinkedNode(x));
137: }
138:
139: if (slot != null) { // There is a waiting taker.
140: // Fill in the slot created by the taker and signal taker to
141: // continue.
142: synchronized (slot) {
143: if (slot.value != CANCELLED) {
144: slot.value = x;
145: slot.notify();
146: return;
147: }
148: // else the taker has cancelled, so retry outer loop
149: }
150: }
151:
152: else {
153: // Wait for a taker to arrive and take the item.
154: synchronized (item) {
155: try {
156: while (item.value != null)
157: item.wait();
158: return;
159: } catch (InterruptedException ie) {
160: // If item was taken, return normally but set interrupt status
161: if (item.value == null) {
162: Thread.currentThread().interrupt();
163: return;
164: } else {
165: item.value = CANCELLED;
166: throw ie;
167: }
168: }
169: }
170: }
171: }
172: }
173:
174: public Object take() throws InterruptedException {
175: // Entirely symmetric to put()
176:
177: for (;;) {
178: if (Thread.interrupted())
179: throw new InterruptedException();
180:
181: LinkedNode item;
182: LinkedNode slot = null;
183:
184: synchronized (this ) {
185: item = waitingPuts.deq();
186: if (item == null)
187: waitingTakes.enq(slot = new LinkedNode());
188: }
189:
190: if (item != null) {
191: synchronized (item) {
192: Object x = item.value;
193: if (x != CANCELLED) {
194: item.value = null;
195: item.next = null;
196: item.notify();
197: return x;
198: }
199: }
200: }
201:
202: else {
203: synchronized (slot) {
204: try {
205: for (;;) {
206: Object x = slot.value;
207: if (x != null) {
208: slot.value = null;
209: slot.next = null;
210: return x;
211: } else
212: slot.wait();
213: }
214: } catch (InterruptedException ie) {
215: Object x = slot.value;
216: if (x != null) {
217: slot.value = null;
218: slot.next = null;
219: Thread.currentThread().interrupt();
220: return x;
221: } else {
222: slot.value = CANCELLED;
223: throw ie;
224: }
225: }
226: }
227: }
228: }
229: }
230:
231: /*
232: Offer and poll are just like put and take, except even messier.
233: */
234:
235: public boolean offer(Object x, long msecs)
236: throws InterruptedException {
237: if (x == null)
238: throw new IllegalArgumentException();
239: long waitTime = msecs;
240: long startTime = 0; // lazily initialize below if needed
241:
242: for (;;) {
243: if (Thread.interrupted())
244: throw new InterruptedException();
245:
246: LinkedNode slot;
247: LinkedNode item = null;
248:
249: synchronized (this ) {
250: slot = waitingTakes.deq();
251: if (slot == null) {
252: if (waitTime <= 0)
253: return false;
254: else
255: waitingPuts.enq(item = new LinkedNode(x));
256: }
257: }
258:
259: if (slot != null) {
260: synchronized (slot) {
261: if (slot.value != CANCELLED) {
262: slot.value = x;
263: slot.notify();
264: return true;
265: }
266: }
267: }
268:
269: long now = System.currentTimeMillis();
270: if (startTime == 0)
271: startTime = now;
272: else
273: waitTime = msecs - (now - startTime);
274:
275: if (item != null) {
276: synchronized (item) {
277: try {
278: for (;;) {
279: if (item.value == null)
280: return true;
281: if (waitTime <= 0) {
282: item.value = CANCELLED;
283: return false;
284: }
285: item.wait(waitTime);
286: waitTime = msecs
287: - (System.currentTimeMillis() - startTime);
288: }
289: } catch (InterruptedException ie) {
290: if (item.value == null) {
291: Thread.currentThread().interrupt();
292: return true;
293: } else {
294: item.value = CANCELLED;
295: throw ie;
296: }
297: }
298: }
299: }
300: }
301: }
302:
303: public Object poll(long msecs) throws InterruptedException {
304: long waitTime = msecs;
305: long startTime = 0;
306:
307: for (;;) {
308: if (Thread.interrupted())
309: throw new InterruptedException();
310:
311: LinkedNode item;
312: LinkedNode slot = null;
313:
314: synchronized (this ) {
315: item = waitingPuts.deq();
316: if (item == null) {
317: if (waitTime <= 0)
318: return null;
319: else
320: waitingTakes.enq(slot = new LinkedNode());
321: }
322: }
323:
324: if (item != null) {
325: synchronized (item) {
326: Object x = item.value;
327: if (x != CANCELLED) {
328: item.value = null;
329: item.next = null;
330: item.notify();
331: return x;
332: }
333: }
334: }
335:
336: long now = System.currentTimeMillis();
337: if (startTime == 0)
338: startTime = now;
339: else
340: waitTime = msecs - (now - startTime);
341:
342: if (slot != null) {
343: synchronized (slot) {
344: try {
345: for (;;) {
346: Object x = slot.value;
347: if (x != null) {
348: slot.value = null;
349: slot.next = null;
350: return x;
351: }
352: if (waitTime <= 0) {
353: slot.value = CANCELLED;
354: return null;
355: }
356: slot.wait(waitTime);
357: waitTime = msecs
358: - (System.currentTimeMillis() - startTime);
359: }
360: } catch (InterruptedException ie) {
361: Object x = slot.value;
362: if (x != null) {
363: slot.value = null;
364: slot.next = null;
365: Thread.currentThread().interrupt();
366: return x;
367: } else {
368: slot.value = CANCELLED;
369: throw ie;
370: }
371: }
372: }
373: }
374: }
375: }
376:
377: }
|