001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.locks.*;
010: import java.util.*;
011:
012: /**
013: * A {@linkplain BlockingQueue blocking queue} in which each
014: * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
015: * synchronous queue does not have any internal capacity, not even a
016: * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
017: * because an element is only present when you try to take it; you
018: * cannot add an element (using any method) unless another thread is
019: * trying to remove it; you cannot iterate as there is nothing to
020: * iterate. The <em>head</em> of the queue is the element that the
021: * first queued thread is trying to add to the queue; if there are no
022: * queued threads then no element is being added and the head is
023: * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
024: * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
025: * as an empty collection. This queue does not permit <tt>null</tt>
026: * elements.
027: *
028: * <p>Synchronous queues are similar to rendezvous channels used in
029: * CSP and Ada. They are well suited for handoff designs, in which an
030: * object running in one thread must sync up with an object running
031: * in another thread in order to hand it some information, event, or
032: * task.
033: *
034: * <p> This class supports an optional fairness policy for ordering
035: * waiting producer and consumer threads. By default, this ordering
036: * is not guaranteed. However, a queue constructed with fairness set
037: * to <tt>true</tt> grants threads access in FIFO order. Fairness
038: * generally decreases throughput but reduces variability and avoids
039: * starvation.
040: *
041: * <p>This class implements all of the <em>optional</em> methods
042: * of the {@link Collection} and {@link Iterator} interfaces.
043: *
044: * <p>This class is a member of the
045: * <a href="{@docRoot}/../guide/collections/index.html">
046: * Java Collections Framework</a>.
047: *
048: * @since 1.5
049: * @author Doug Lea
050: * @param <E> the type of elements held in this collection
051: */
052: public class SynchronousQueue<E> extends AbstractQueue<E> implements
053: BlockingQueue<E>, java.io.Serializable {
054: private static final long serialVersionUID = -3223113410248163686L;
055:
056: /*
057: This implementation divides actions into two cases for puts:
058:
059: * An arriving producer that does not already have a waiting consumer
060: creates a node holding item, and then waits for a consumer to take it.
061: * An arriving producer that does already have a waiting consumer fills
062: the slot node created by the consumer, and notifies it to continue.
063:
064: And symmetrically, two for takes:
065:
066: * An arriving consumer that does not already have a waiting producer
067: creates an empty slot node, and then waits for a producer to fill it.
068: * An arriving consumer that does already have a waiting producer takes
069: item from the node created by the producer, and notifies it to continue.
070:
071: When a put or take waiting for the actions of its counterpart
072: aborts due to interruption or timeout, it marks the node
073: it created as "CANCELLED", which causes its counterpart to retry
074: the entire put or take sequence.
075:
076: This requires keeping two simple queues, waitingProducers and
077: waitingConsumers. Each of these can be FIFO (preserves fairness)
078: or LIFO (improves throughput).
079: */
080:
081: /** Lock protecting both wait queues */
082: private final ReentrantLock qlock;
083: /** Queue holding waiting puts */
084: private final WaitQueue waitingProducers;
085: /** Queue holding waiting takes */
086: private final WaitQueue waitingConsumers;
087:
088: /**
089: * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
090: */
091: public SynchronousQueue() {
092: this (false);
093: }
094:
095: /**
096: * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
097: * @param fair if true, threads contend in FIFO order for access;
098: * otherwise the order is unspecified.
099: */
100: public SynchronousQueue(boolean fair) {
101: if (fair) {
102: qlock = new ReentrantLock(true);
103: waitingProducers = new FifoWaitQueue();
104: waitingConsumers = new FifoWaitQueue();
105: } else {
106: qlock = new ReentrantLock();
107: waitingProducers = new LifoWaitQueue();
108: waitingConsumers = new LifoWaitQueue();
109: }
110: }
111:
112: /**
113: * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
114: * These queues have all transient fields, but are serializable
115: * in order to recover fairness settings when deserialized.
116: */
117: static abstract class WaitQueue implements java.io.Serializable {
118: /** Create, add, and return node for x */
119: abstract Node enq(Object x);
120:
121: /** Remove and return node, or null if empty */
122: abstract Node deq();
123: }
124:
125: /**
126: * FIFO queue to hold waiting puts/takes.
127: */
128: static final class FifoWaitQueue extends WaitQueue implements
129: java.io.Serializable {
130: private static final long serialVersionUID = -3623113410248163686L;
131: private transient Node head;
132: private transient Node last;
133:
134: Node enq(Object x) {
135: Node p = new Node(x);
136: if (last == null)
137: last = head = p;
138: else
139: last = last.next = p;
140: return p;
141: }
142:
143: Node deq() {
144: Node p = head;
145: if (p != null) {
146: if ((head = p.next) == null)
147: last = null;
148: p.next = null;
149: }
150: return p;
151: }
152: }
153:
154: /**
155: * LIFO queue to hold waiting puts/takes.
156: */
157: static final class LifoWaitQueue extends WaitQueue implements
158: java.io.Serializable {
159: private static final long serialVersionUID = -3633113410248163686L;
160: private transient Node head;
161:
162: Node enq(Object x) {
163: return head = new Node(x, head);
164: }
165:
166: Node deq() {
167: Node p = head;
168: if (p != null) {
169: head = p.next;
170: p.next = null;
171: }
172: return p;
173: }
174: }
175:
176: /**
177: * Nodes each maintain an item and handle waits and signals for
178: * getting and setting it. The class extends
179: * AbstractQueuedSynchronizer to manage blocking, using AQS state
180: * 0 for waiting, 1 for ack, -1 for cancelled.
181: */
182: static final class Node extends AbstractQueuedSynchronizer {
183: /** Synchronization state value representing that node acked */
184: private static final int ACK = 1;
185: /** Synchronization state value representing that node cancelled */
186: private static final int CANCEL = -1;
187:
188: /** The item being transferred */
189: Object item;
190: /** Next node in wait queue */
191: Node next;
192:
193: /** Creates a node with initial item */
194: Node(Object x) {
195: item = x;
196: }
197:
198: /** Creates a node with initial item and next */
199: Node(Object x, Node n) {
200: item = x;
201: next = n;
202: }
203:
204: /**
205: * Implements AQS base acquire to succeed if not in WAITING state
206: */
207: protected boolean tryAcquire(int ignore) {
208: return getState() != 0;
209: }
210:
211: /**
212: * Implements AQS base release to signal if state changed
213: */
214: protected boolean tryRelease(int newState) {
215: return compareAndSetState(0, newState);
216: }
217:
218: /**
219: * Takes item and nulls out field (for sake of GC)
220: */
221: private Object extract() {
222: Object x = item;
223: item = null;
224: return x;
225: }
226:
227: /**
228: * Tries to cancel on interrupt; if so rethrowing,
229: * else setting interrupt state
230: */
231: private void checkCancellationOnInterrupt(
232: InterruptedException ie) throws InterruptedException {
233: if (release(CANCEL))
234: throw ie;
235: Thread.currentThread().interrupt();
236: }
237:
238: /**
239: * Fills in the slot created by the consumer and signal consumer to
240: * continue.
241: */
242: boolean setItem(Object x) {
243: item = x; // can place in slot even if cancelled
244: return release(ACK);
245: }
246:
247: /**
248: * Removes item from slot created by producer and signal producer
249: * to continue.
250: */
251: Object getItem() {
252: return (release(ACK)) ? extract() : null;
253: }
254:
255: /**
256: * Waits for a consumer to take item placed by producer.
257: */
258: void waitForTake() throws InterruptedException {
259: try {
260: acquireInterruptibly(0);
261: } catch (InterruptedException ie) {
262: checkCancellationOnInterrupt(ie);
263: }
264: }
265:
266: /**
267: * Waits for a producer to put item placed by consumer.
268: */
269: Object waitForPut() throws InterruptedException {
270: try {
271: acquireInterruptibly(0);
272: } catch (InterruptedException ie) {
273: checkCancellationOnInterrupt(ie);
274: }
275: return extract();
276: }
277:
278: /**
279: * Waits for a consumer to take item placed by producer or time out.
280: */
281: boolean waitForTake(long nanos) throws InterruptedException {
282: try {
283: if (!tryAcquireNanos(0, nanos) && release(CANCEL))
284: return false;
285: } catch (InterruptedException ie) {
286: checkCancellationOnInterrupt(ie);
287: }
288: return true;
289: }
290:
291: /**
292: * Waits for a producer to put item placed by consumer, or time out.
293: */
294: Object waitForPut(long nanos) throws InterruptedException {
295: try {
296: if (!tryAcquireNanos(0, nanos) && release(CANCEL))
297: return null;
298: } catch (InterruptedException ie) {
299: checkCancellationOnInterrupt(ie);
300: }
301: return extract();
302: }
303: }
304:
305: /**
306: * Adds the specified element to this queue, waiting if necessary for
307: * another thread to receive it.
308: * @param o the element to add
309: * @throws InterruptedException if interrupted while waiting.
310: * @throws NullPointerException if the specified element is <tt>null</tt>.
311: */
312: public void put(E o) throws InterruptedException {
313: if (o == null)
314: throw new NullPointerException();
315: final ReentrantLock qlock = this .qlock;
316:
317: for (;;) {
318: Node node;
319: boolean mustWait;
320: if (Thread.interrupted())
321: throw new InterruptedException();
322: qlock.lock();
323: try {
324: node = waitingConsumers.deq();
325: if ((mustWait = (node == null)))
326: node = waitingProducers.enq(o);
327: } finally {
328: qlock.unlock();
329: }
330:
331: if (mustWait) {
332: node.waitForTake();
333: return;
334: }
335:
336: else if (node.setItem(o))
337: return;
338:
339: // else consumer cancelled, so retry
340: }
341: }
342:
343: /**
344: * Inserts the specified element into this queue, waiting if necessary
345: * up to the specified wait time for another thread to receive it.
346: * @param o the element to add
347: * @param timeout how long to wait before giving up, in units of
348: * <tt>unit</tt>
349: * @param unit a <tt>TimeUnit</tt> determining how to interpret the
350: * <tt>timeout</tt> parameter
351: * @return <tt>true</tt> if successful, or <tt>false</tt> if
352: * the specified waiting time elapses before a consumer appears.
353: * @throws InterruptedException if interrupted while waiting.
354: * @throws NullPointerException if the specified element is <tt>null</tt>.
355: */
356: public boolean offer(E o, long timeout, TimeUnit unit)
357: throws InterruptedException {
358: if (o == null)
359: throw new NullPointerException();
360: long nanos = unit.toNanos(timeout);
361: final ReentrantLock qlock = this .qlock;
362: for (;;) {
363: Node node;
364: boolean mustWait;
365: if (Thread.interrupted())
366: throw new InterruptedException();
367: qlock.lock();
368: try {
369: node = waitingConsumers.deq();
370: if ((mustWait = (node == null)))
371: node = waitingProducers.enq(o);
372: } finally {
373: qlock.unlock();
374: }
375:
376: if (mustWait)
377: return node.waitForTake(nanos);
378:
379: else if (node.setItem(o))
380: return true;
381:
382: // else consumer cancelled, so retry
383: }
384: }
385:
386: /**
387: * Retrieves and removes the head of this queue, waiting if necessary
388: * for another thread to insert it.
389: * @throws InterruptedException if interrupted while waiting.
390: * @return the head of this queue
391: */
392: public E take() throws InterruptedException {
393: final ReentrantLock qlock = this .qlock;
394: for (;;) {
395: Node node;
396: boolean mustWait;
397:
398: if (Thread.interrupted())
399: throw new InterruptedException();
400: qlock.lock();
401: try {
402: node = waitingProducers.deq();
403: if ((mustWait = (node == null)))
404: node = waitingConsumers.enq(null);
405: } finally {
406: qlock.unlock();
407: }
408:
409: if (mustWait) {
410: Object x = node.waitForPut();
411: return (E) x;
412: } else {
413: Object x = node.getItem();
414: if (x != null)
415: return (E) x;
416: // else cancelled, so retry
417: }
418: }
419: }
420:
421: /**
422: * Retrieves and removes the head of this queue, waiting
423: * if necessary up to the specified wait time, for another thread
424: * to insert it.
425: * @param timeout how long to wait before giving up, in units of
426: * <tt>unit</tt>
427: * @param unit a <tt>TimeUnit</tt> determining how to interpret the
428: * <tt>timeout</tt> parameter
429: * @return the head of this queue, or <tt>null</tt> if the
430: * specified waiting time elapses before an element is present.
431: * @throws InterruptedException if interrupted while waiting.
432: */
433: public E poll(long timeout, TimeUnit unit)
434: throws InterruptedException {
435: long nanos = unit.toNanos(timeout);
436: final ReentrantLock qlock = this .qlock;
437:
438: for (;;) {
439: Node node;
440: boolean mustWait;
441:
442: if (Thread.interrupted())
443: throw new InterruptedException();
444: qlock.lock();
445: try {
446: node = waitingProducers.deq();
447: if ((mustWait = (node == null)))
448: node = waitingConsumers.enq(null);
449: } finally {
450: qlock.unlock();
451: }
452:
453: if (mustWait) {
454: Object x = node.waitForPut(nanos);
455: return (E) x;
456: } else {
457: Object x = node.getItem();
458: if (x != null)
459: return (E) x;
460: // else cancelled, so retry
461: }
462: }
463: }
464:
465: // Untimed nonblocking versions
466:
467: /**
468: * Inserts the specified element into this queue, if another thread is
469: * waiting to receive it.
470: *
471: * @param o the element to add.
472: * @return <tt>true</tt> if it was possible to add the element to
473: * this queue, else <tt>false</tt>
474: * @throws NullPointerException if the specified element is <tt>null</tt>
475: */
476: public boolean offer(E o) {
477: if (o == null)
478: throw new NullPointerException();
479: final ReentrantLock qlock = this .qlock;
480:
481: for (;;) {
482: Node node;
483: qlock.lock();
484: try {
485: node = waitingConsumers.deq();
486: } finally {
487: qlock.unlock();
488: }
489: if (node == null)
490: return false;
491:
492: else if (node.setItem(o))
493: return true;
494: // else retry
495: }
496: }
497:
498: /**
499: * Retrieves and removes the head of this queue, if another thread
500: * is currently making an element available.
501: *
502: * @return the head of this queue, or <tt>null</tt> if no
503: * element is available.
504: */
505: public E poll() {
506: final ReentrantLock qlock = this .qlock;
507: for (;;) {
508: Node node;
509: qlock.lock();
510: try {
511: node = waitingProducers.deq();
512: } finally {
513: qlock.unlock();
514: }
515: if (node == null)
516: return null;
517:
518: else {
519: Object x = node.getItem();
520: if (x != null)
521: return (E) x;
522: // else retry
523: }
524: }
525: }
526:
527: /**
528: * Always returns <tt>true</tt>.
529: * A <tt>SynchronousQueue</tt> has no internal capacity.
530: * @return <tt>true</tt>
531: */
532: public boolean isEmpty() {
533: return true;
534: }
535:
536: /**
537: * Always returns zero.
538: * A <tt>SynchronousQueue</tt> has no internal capacity.
539: * @return zero.
540: */
541: public int size() {
542: return 0;
543: }
544:
545: /**
546: * Always returns zero.
547: * A <tt>SynchronousQueue</tt> has no internal capacity.
548: * @return zero.
549: */
550: public int remainingCapacity() {
551: return 0;
552: }
553:
554: /**
555: * Does nothing.
556: * A <tt>SynchronousQueue</tt> has no internal capacity.
557: */
558: public void clear() {
559: }
560:
561: /**
562: * Always returns <tt>false</tt>.
563: * A <tt>SynchronousQueue</tt> has no internal capacity.
564: * @param o the element
565: * @return <tt>false</tt>
566: */
567: public boolean contains(Object o) {
568: return false;
569: }
570:
571: /**
572: * Always returns <tt>false</tt>.
573: * A <tt>SynchronousQueue</tt> has no internal capacity.
574: *
575: * @param o the element to remove
576: * @return <tt>false</tt>
577: */
578: public boolean remove(Object o) {
579: return false;
580: }
581:
582: /**
583: * Returns <tt>false</tt> unless given collection is empty.
584: * A <tt>SynchronousQueue</tt> has no internal capacity.
585: * @param c the collection
586: * @return <tt>false</tt> unless given collection is empty
587: */
588: public boolean containsAll(Collection<?> c) {
589: return c.isEmpty();
590: }
591:
592: /**
593: * Always returns <tt>false</tt>.
594: * A <tt>SynchronousQueue</tt> has no internal capacity.
595: * @param c the collection
596: * @return <tt>false</tt>
597: */
598: public boolean removeAll(Collection<?> c) {
599: return false;
600: }
601:
602: /**
603: * Always returns <tt>false</tt>.
604: * A <tt>SynchronousQueue</tt> has no internal capacity.
605: * @param c the collection
606: * @return <tt>false</tt>
607: */
608: public boolean retainAll(Collection<?> c) {
609: return false;
610: }
611:
612: /**
613: * Always returns <tt>null</tt>.
614: * A <tt>SynchronousQueue</tt> does not return elements
615: * unless actively waited on.
616: * @return <tt>null</tt>
617: */
618: public E peek() {
619: return null;
620: }
621:
622: static class EmptyIterator<E> implements Iterator<E> {
623: public boolean hasNext() {
624: return false;
625: }
626:
627: public E next() {
628: throw new NoSuchElementException();
629: }
630:
631: public void remove() {
632: throw new IllegalStateException();
633: }
634: }
635:
636: /**
637: * Returns an empty iterator in which <tt>hasNext</tt> always returns
638: * <tt>false</tt>.
639: *
640: * @return an empty iterator
641: */
642: public Iterator<E> iterator() {
643: return new EmptyIterator<E>();
644: }
645:
646: /**
647: * Returns a zero-length array.
648: * @return a zero-length array
649: */
650: public Object[] toArray() {
651: return new Object[0];
652: }
653:
654: /**
655: * Sets the zeroeth element of the specified array to <tt>null</tt>
656: * (if the array has non-zero length) and returns it.
657: * @param a the array
658: * @return the specified array
659: */
660: public <T> T[] toArray(T[] a) {
661: if (a.length > 0)
662: a[0] = null;
663: return a;
664: }
665:
666: public int drainTo(Collection<? super E> c) {
667: if (c == null)
668: throw new NullPointerException();
669: if (c == this )
670: throw new IllegalArgumentException();
671: int n = 0;
672: E e;
673: while ((e = poll()) != null) {
674: c.add(e);
675: ++n;
676: }
677: return n;
678: }
679:
680: public int drainTo(Collection<? super E> c, int maxElements) {
681: if (c == null)
682: throw new NullPointerException();
683: if (c == this )
684: throw new IllegalArgumentException();
685: int n = 0;
686: E e;
687: while (n < maxElements && (e = poll()) != null) {
688: c.add(e);
689: ++n;
690: }
691: return n;
692: }
693: }
|