001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.atomic.*;
010: import java.util.concurrent.locks.*;
011: import java.util.*;
012:
013: /**
014: * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
015: * linked nodes.
016: * This queue orders elements FIFO (first-in-first-out).
017: * The <em>head</em> of the queue is that element that has been on the
018: * queue the longest time.
019: * The <em>tail</em> of the queue is that element that has been on the
020: * queue the shortest time. New elements
021: * are inserted at the tail of the queue, and the queue retrieval
022: * operations obtain elements at the head of the queue.
023: * Linked queues typically have higher throughput than array-based queues but
024: * less predictable performance in most concurrent applications.
025: *
026: * <p> The optional capacity bound constructor argument serves as a
027: * way to prevent excessive queue expansion. The capacity, if unspecified,
028: * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
029: * dynamically created upon each insertion unless this would bring the
030: * queue above capacity.
031: *
032: * <p>This class implements all of the <em>optional</em> methods
033: * of the {@link Collection} and {@link Iterator} interfaces.
034: *
035: * <p>This class is a member of the
036: * <a href="{@docRoot}/../guide/collections/index.html">
037: * Java Collections Framework</a>.
038: *
039: * @since 1.5
040: * @author Doug Lea
041: * @param <E> the type of elements held in this collection
042: *
043: **/
044: public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements
045: BlockingQueue<E>, java.io.Serializable {
046: private static final long serialVersionUID = -6903933977591709194L;
047:
048: /*
049: * A variant of the "two lock queue" algorithm. The putLock gates
050: * entry to put (and offer), and has an associated condition for
051: * waiting puts. Similarly for the takeLock. The "count" field
052: * that they both rely on is maintained as an atomic to avoid
053: * needing to get both locks in most cases. Also, to minimize need
054: * for puts to get takeLock and vice-versa, cascading notifies are
055: * used. When a put notices that it has enabled at least one take,
056: * it signals taker. That taker in turn signals others if more
057: * items have been entered since the signal. And symmetrically for
058: * takes signalling puts. Operations such as remove(Object) and
059: * iterators acquire both locks.
060: */
061:
062: /**
063: * Linked list node class
064: */
065: static class Node<E> {
066: /** The item, volatile to ensure barrier separating write and read */
067: volatile E item;
068: Node<E> next;
069:
070: Node(E x) {
071: item = x;
072: }
073: }
074:
075: /** The capacity bound, or Integer.MAX_VALUE if none */
076: private final int capacity;
077:
078: /** Current number of elements */
079: private final AtomicInteger count = new AtomicInteger(0);
080:
081: /** Head of linked list */
082: private transient Node<E> head;
083:
084: /** Tail of linked list */
085: private transient Node<E> last;
086:
087: /** Lock held by take, poll, etc */
088: private final ReentrantLock takeLock = new ReentrantLock();
089:
090: /** Wait queue for waiting takes */
091: private final Condition notEmpty = takeLock.newCondition();
092:
093: /** Lock held by put, offer, etc */
094: private final ReentrantLock putLock = new ReentrantLock();
095:
096: /** Wait queue for waiting puts */
097: private final Condition notFull = putLock.newCondition();
098:
099: /**
100: * Signal a waiting take. Called only from put/offer (which do not
101: * otherwise ordinarily lock takeLock.)
102: */
103: private void signalNotEmpty() {
104: final ReentrantLock takeLock = this .takeLock;
105: takeLock.lock();
106: try {
107: notEmpty.signal();
108: } finally {
109: takeLock.unlock();
110: }
111: }
112:
113: /**
114: * Signal a waiting put. Called only from take/poll.
115: */
116: private void signalNotFull() {
117: final ReentrantLock putLock = this .putLock;
118: putLock.lock();
119: try {
120: notFull.signal();
121: } finally {
122: putLock.unlock();
123: }
124: }
125:
126: /**
127: * Create a node and link it at end of queue
128: * @param x the item
129: */
130: private void insert(E x) {
131: last = last.next = new Node<E>(x);
132: }
133:
134: /**
135: * Remove a node from head of queue,
136: * @return the node
137: */
138: private E extract() {
139: Node<E> first = head.next;
140: head = first;
141: E x = first.item;
142: first.item = null;
143: return x;
144: }
145:
146: /**
147: * Lock to prevent both puts and takes.
148: */
149: private void fullyLock() {
150: putLock.lock();
151: takeLock.lock();
152: }
153:
154: /**
155: * Unlock to allow both puts and takes.
156: */
157: private void fullyUnlock() {
158: takeLock.unlock();
159: putLock.unlock();
160: }
161:
162: /**
163: * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
164: * {@link Integer#MAX_VALUE}.
165: */
166: public LinkedBlockingQueue() {
167: this (Integer.MAX_VALUE);
168: }
169:
170: /**
171: * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
172: *
173: * @param capacity the capacity of this queue.
174: * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
175: * than zero.
176: */
177: public LinkedBlockingQueue(int capacity) {
178: if (capacity <= 0)
179: throw new IllegalArgumentException();
180: this .capacity = capacity;
181: last = head = new Node<E>(null);
182: }
183:
184: /**
185: * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
186: * {@link Integer#MAX_VALUE}, initially containing the elements of the
187: * given collection,
188: * added in traversal order of the collection's iterator.
189: * @param c the collection of elements to initially contain
190: * @throws NullPointerException if <tt>c</tt> or any element within it
191: * is <tt>null</tt>
192: */
193: public LinkedBlockingQueue(Collection<? extends E> c) {
194: this (Integer.MAX_VALUE);
195: for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
196: add(it.next());
197: }
198:
199: // this doc comment is overridden to remove the reference to collections
200: // greater in size than Integer.MAX_VALUE
201: /**
202: * Returns the number of elements in this queue.
203: *
204: * @return the number of elements in this queue.
205: */
206: public int size() {
207: return count.get();
208: }
209:
210: // this doc comment is a modified copy of the inherited doc comment,
211: // without the reference to unlimited queues.
212: /**
213: * Returns the number of elements that this queue can ideally (in
214: * the absence of memory or resource constraints) accept without
215: * blocking. This is always equal to the initial capacity of this queue
216: * less the current <tt>size</tt> of this queue.
217: * <p>Note that you <em>cannot</em> always tell if
218: * an attempt to <tt>add</tt> an element will succeed by
219: * inspecting <tt>remainingCapacity</tt> because it may be the
220: * case that a waiting consumer is ready to <tt>take</tt> an
221: * element out of an otherwise full queue.
222: */
223: public int remainingCapacity() {
224: return capacity - count.get();
225: }
226:
227: /**
228: * Adds the specified element to the tail of this queue, waiting if
229: * necessary for space to become available.
230: * @param o the element to add
231: * @throws InterruptedException if interrupted while waiting.
232: * @throws NullPointerException if the specified element is <tt>null</tt>.
233: */
234: public void put(E o) throws InterruptedException {
235: if (o == null)
236: throw new NullPointerException();
237: // Note: convention in all put/take/etc is to preset
238: // local var holding count negative to indicate failure unless set.
239: int c = -1;
240: final ReentrantLock putLock = this .putLock;
241: final AtomicInteger count = this .count;
242: putLock.lockInterruptibly();
243: try {
244: /*
245: * Note that count is used in wait guard even though it is
246: * not protected by lock. This works because count can
247: * only decrease at this point (all other puts are shut
248: * out by lock), and we (or some other waiting put) are
249: * signalled if it ever changes from
250: * capacity. Similarly for all other uses of count in
251: * other wait guards.
252: */
253: try {
254: while (count.get() == capacity)
255: notFull.await();
256: } catch (InterruptedException ie) {
257: notFull.signal(); // propagate to a non-interrupted thread
258: throw ie;
259: }
260: insert(o);
261: c = count.getAndIncrement();
262: if (c + 1 < capacity)
263: notFull.signal();
264: } finally {
265: putLock.unlock();
266: }
267: if (c == 0)
268: signalNotEmpty();
269: }
270:
271: /**
272: * Inserts the specified element at the tail of this queue, waiting if
273: * necessary up to the specified wait time for space to become available.
274: * @param o the element to add
275: * @param timeout how long to wait before giving up, in units of
276: * <tt>unit</tt>
277: * @param unit a <tt>TimeUnit</tt> determining how to interpret the
278: * <tt>timeout</tt> parameter
279: * @return <tt>true</tt> if successful, or <tt>false</tt> if
280: * the specified waiting time elapses before space is available.
281: * @throws InterruptedException if interrupted while waiting.
282: * @throws NullPointerException if the specified element is <tt>null</tt>.
283: */
284: public boolean offer(E o, long timeout, TimeUnit unit)
285: throws InterruptedException {
286:
287: if (o == null)
288: throw new NullPointerException();
289: long nanos = unit.toNanos(timeout);
290: int c = -1;
291: final ReentrantLock putLock = this .putLock;
292: final AtomicInteger count = this .count;
293: putLock.lockInterruptibly();
294: try {
295: for (;;) {
296: if (count.get() < capacity) {
297: insert(o);
298: c = count.getAndIncrement();
299: if (c + 1 < capacity)
300: notFull.signal();
301: break;
302: }
303: if (nanos <= 0)
304: return false;
305: try {
306: nanos = notFull.awaitNanos(nanos);
307: } catch (InterruptedException ie) {
308: notFull.signal(); // propagate to a non-interrupted thread
309: throw ie;
310: }
311: }
312: } finally {
313: putLock.unlock();
314: }
315: if (c == 0)
316: signalNotEmpty();
317: return true;
318: }
319:
320: /**
321: * Inserts the specified element at the tail of this queue if possible,
322: * returning immediately if this queue is full.
323: *
324: * @param o the element to add.
325: * @return <tt>true</tt> if it was possible to add the element to
326: * this queue, else <tt>false</tt>
327: * @throws NullPointerException if the specified element is <tt>null</tt>
328: */
329: public boolean offer(E o) {
330: if (o == null)
331: throw new NullPointerException();
332: final AtomicInteger count = this .count;
333: if (count.get() == capacity)
334: return false;
335: int c = -1;
336: final ReentrantLock putLock = this .putLock;
337: putLock.lock();
338: try {
339: if (count.get() < capacity) {
340: insert(o);
341: c = count.getAndIncrement();
342: if (c + 1 < capacity)
343: notFull.signal();
344: }
345: } finally {
346: putLock.unlock();
347: }
348: if (c == 0)
349: signalNotEmpty();
350: return c >= 0;
351: }
352:
353: public E take() throws InterruptedException {
354: E x;
355: int c = -1;
356: final AtomicInteger count = this .count;
357: final ReentrantLock takeLock = this .takeLock;
358: takeLock.lockInterruptibly();
359: try {
360: try {
361: while (count.get() == 0)
362: notEmpty.await();
363: } catch (InterruptedException ie) {
364: notEmpty.signal(); // propagate to a non-interrupted thread
365: throw ie;
366: }
367:
368: x = extract();
369: c = count.getAndDecrement();
370: if (c > 1)
371: notEmpty.signal();
372: } finally {
373: takeLock.unlock();
374: }
375: if (c == capacity)
376: signalNotFull();
377: return x;
378: }
379:
380: public E poll(long timeout, TimeUnit unit)
381: throws InterruptedException {
382: E x = null;
383: int c = -1;
384: long nanos = unit.toNanos(timeout);
385: final AtomicInteger count = this .count;
386: final ReentrantLock takeLock = this .takeLock;
387: takeLock.lockInterruptibly();
388: try {
389: for (;;) {
390: if (count.get() > 0) {
391: x = extract();
392: c = count.getAndDecrement();
393: if (c > 1)
394: notEmpty.signal();
395: break;
396: }
397: if (nanos <= 0)
398: return null;
399: try {
400: nanos = notEmpty.awaitNanos(nanos);
401: } catch (InterruptedException ie) {
402: notEmpty.signal(); // propagate to a non-interrupted thread
403: throw ie;
404: }
405: }
406: } finally {
407: takeLock.unlock();
408: }
409: if (c == capacity)
410: signalNotFull();
411: return x;
412: }
413:
414: public E poll() {
415: final AtomicInteger count = this .count;
416: if (count.get() == 0)
417: return null;
418: E x = null;
419: int c = -1;
420: final ReentrantLock takeLock = this .takeLock;
421: takeLock.lock();
422: try {
423: if (count.get() > 0) {
424: x = extract();
425: c = count.getAndDecrement();
426: if (c > 1)
427: notEmpty.signal();
428: }
429: } finally {
430: takeLock.unlock();
431: }
432: if (c == capacity)
433: signalNotFull();
434: return x;
435: }
436:
437: public E peek() {
438: if (count.get() == 0)
439: return null;
440: final ReentrantLock takeLock = this .takeLock;
441: takeLock.lock();
442: try {
443: Node<E> first = head.next;
444: if (first == null)
445: return null;
446: else
447: return first.item;
448: } finally {
449: takeLock.unlock();
450: }
451: }
452:
453: public boolean remove(Object o) {
454: if (o == null)
455: return false;
456: boolean removed = false;
457: fullyLock();
458: try {
459: Node<E> trail = head;
460: Node<E> p = head.next;
461: while (p != null) {
462: if (o.equals(p.item)) {
463: removed = true;
464: break;
465: }
466: trail = p;
467: p = p.next;
468: }
469: if (removed) {
470: p.item = null;
471: trail.next = p.next;
472: if (count.getAndDecrement() == capacity)
473: notFull.signalAll();
474: }
475: } finally {
476: fullyUnlock();
477: }
478: return removed;
479: }
480:
481: public Object[] toArray() {
482: fullyLock();
483: try {
484: int size = count.get();
485: Object[] a = new Object[size];
486: int k = 0;
487: for (Node<E> p = head.next; p != null; p = p.next)
488: a[k++] = p.item;
489: return a;
490: } finally {
491: fullyUnlock();
492: }
493: }
494:
495: public <T> T[] toArray(T[] a) {
496: fullyLock();
497: try {
498: int size = count.get();
499: if (a.length < size)
500: a = (T[]) java.lang.reflect.Array.newInstance(a
501: .getClass().getComponentType(), size);
502:
503: int k = 0;
504: for (Node p = head.next; p != null; p = p.next)
505: a[k++] = (T) p.item;
506: return a;
507: } finally {
508: fullyUnlock();
509: }
510: }
511:
512: public String toString() {
513: fullyLock();
514: try {
515: return super .toString();
516: } finally {
517: fullyUnlock();
518: }
519: }
520:
521: public void clear() {
522: fullyLock();
523: try {
524: head.next = null;
525: if (count.getAndSet(0) == capacity)
526: notFull.signalAll();
527: } finally {
528: fullyUnlock();
529: }
530: }
531:
532: public int drainTo(Collection<? super E> c) {
533: if (c == null)
534: throw new NullPointerException();
535: if (c == this )
536: throw new IllegalArgumentException();
537: Node first;
538: fullyLock();
539: try {
540: first = head.next;
541: head.next = null;
542: if (count.getAndSet(0) == capacity)
543: notFull.signalAll();
544: } finally {
545: fullyUnlock();
546: }
547: // Transfer the elements outside of locks
548: int n = 0;
549: for (Node<E> p = first; p != null; p = p.next) {
550: c.add(p.item);
551: p.item = null;
552: ++n;
553: }
554: return n;
555: }
556:
557: public int drainTo(Collection<? super E> c, int maxElements) {
558: if (c == null)
559: throw new NullPointerException();
560: if (c == this )
561: throw new IllegalArgumentException();
562: if (maxElements <= 0)
563: return 0;
564: fullyLock();
565: try {
566: int n = 0;
567: Node<E> p = head.next;
568: while (p != null && n < maxElements) {
569: c.add(p.item);
570: p.item = null;
571: p = p.next;
572: ++n;
573: }
574: if (n != 0) {
575: head.next = p;
576: if (count.getAndAdd(-n) == capacity)
577: notFull.signalAll();
578: }
579: return n;
580: } finally {
581: fullyUnlock();
582: }
583: }
584:
585: /**
586: * Returns an iterator over the elements in this queue in proper sequence.
587: * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
588: * will never throw {@link java.util.ConcurrentModificationException},
589: * and guarantees to traverse elements as they existed upon
590: * construction of the iterator, and may (but is not guaranteed to)
591: * reflect any modifications subsequent to construction.
592: *
593: * @return an iterator over the elements in this queue in proper sequence.
594: */
595: public Iterator<E> iterator() {
596: return new Itr();
597: }
598:
599: private class Itr implements Iterator<E> {
600: /*
601: * Basic weak-consistent iterator. At all times hold the next
602: * item to hand out so that if hasNext() reports true, we will
603: * still have it to return even if lost race with a take etc.
604: */
605: private Node<E> current;
606: private Node<E> lastRet;
607: private E currentElement;
608:
609: Itr() {
610: final ReentrantLock putLock = LinkedBlockingQueue.this .putLock;
611: final ReentrantLock takeLock = LinkedBlockingQueue.this .takeLock;
612: putLock.lock();
613: takeLock.lock();
614: try {
615: current = head.next;
616: if (current != null)
617: currentElement = current.item;
618: } finally {
619: takeLock.unlock();
620: putLock.unlock();
621: }
622: }
623:
624: public boolean hasNext() {
625: return current != null;
626: }
627:
628: public E next() {
629: final ReentrantLock putLock = LinkedBlockingQueue.this .putLock;
630: final ReentrantLock takeLock = LinkedBlockingQueue.this .takeLock;
631: putLock.lock();
632: takeLock.lock();
633: try {
634: if (current == null)
635: throw new NoSuchElementException();
636: E x = currentElement;
637: lastRet = current;
638: current = current.next;
639: if (current != null)
640: currentElement = current.item;
641: return x;
642: } finally {
643: takeLock.unlock();
644: putLock.unlock();
645: }
646: }
647:
648: public void remove() {
649: if (lastRet == null)
650: throw new IllegalStateException();
651: final ReentrantLock putLock = LinkedBlockingQueue.this .putLock;
652: final ReentrantLock takeLock = LinkedBlockingQueue.this .takeLock;
653: putLock.lock();
654: takeLock.lock();
655: try {
656: Node<E> node = lastRet;
657: lastRet = null;
658: Node<E> trail = head;
659: Node<E> p = head.next;
660: while (p != null && p != node) {
661: trail = p;
662: p = p.next;
663: }
664: if (p == node) {
665: p.item = null;
666: trail.next = p.next;
667: int c = count.getAndDecrement();
668: if (c == capacity)
669: notFull.signalAll();
670: }
671: } finally {
672: takeLock.unlock();
673: putLock.unlock();
674: }
675: }
676: }
677:
678: /**
679: * Save the state to a stream (that is, serialize it).
680: *
681: * @serialData The capacity is emitted (int), followed by all of
682: * its elements (each an <tt>Object</tt>) in the proper order,
683: * followed by a null
684: * @param s the stream
685: */
686: private void writeObject(java.io.ObjectOutputStream s)
687: throws java.io.IOException {
688:
689: fullyLock();
690: try {
691: // Write out any hidden stuff, plus capacity
692: s.defaultWriteObject();
693:
694: // Write out all elements in the proper order.
695: for (Node<E> p = head.next; p != null; p = p.next)
696: s.writeObject(p.item);
697:
698: // Use trailing null as sentinel
699: s.writeObject(null);
700: } finally {
701: fullyUnlock();
702: }
703: }
704:
705: /**
706: * Reconstitute this queue instance from a stream (that is,
707: * deserialize it).
708: * @param s the stream
709: */
710: private void readObject(java.io.ObjectInputStream s)
711: throws java.io.IOException, ClassNotFoundException {
712: // Read in capacity, and any hidden stuff
713: s.defaultReadObject();
714:
715: count.set(0);
716: last = head = new Node<E>(null);
717:
718: // Read in all elements and place in queue
719: for (;;) {
720: E item = (E) s.readObject();
721: if (item == null)
722: break;
723: add(item);
724: }
725: }
726: }
|