001 /*
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003 *
004 * This code is free software; you can redistribute it and/or modify it
005 * under the terms of the GNU General Public License version 2 only, as
006 * published by the Free Software Foundation. Sun designates this
007 * particular file as subject to the "Classpath" exception as provided
008 * by Sun in the LICENSE file that accompanied this code.
009 *
010 * This code is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013 * version 2 for more details (a copy is included in the LICENSE file that
014 * accompanied this code).
015 *
016 * You should have received a copy of the GNU General Public License version
017 * 2 along with this work; if not, write to the Free Software Foundation,
018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019 *
020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021 * CA 95054 USA or visit www.sun.com if you need additional information or
022 * have any questions.
023 */
024
025 /*
026 * This file is available under and governed by the GNU General Public
027 * License version 2 only, as published by the Free Software Foundation.
028 * However, the following notice accompanied the original version of this
029 * file:
030 *
031 * Written by Doug Lea with assistance from members of JCP JSR-166
032 * Expert Group and released to the public domain, as explained at
033 * http://creativecommons.org/licenses/publicdomain
034 */
035
036 package java.util.concurrent;
037
038 import java.util.concurrent.locks.*;
039 import java.util.*;
040
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array. This queue orders elements FIFO (first-in-first-out). The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time. The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic "bounded buffer", in which a fixed-sized array
 * holds elements inserted by producers and extracted by consumers.
 * Once created, the capacity cannot be increased. Attempts to
 * {@code put} an element into a full queue will result in the
 * operation blocking; attempts to {@code take} an element from an
 * empty queue will similarly block.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads. By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to {@code true} grants threads access in FIFO order. Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items. Slots outside the live range are kept null. */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increments i: returns i + 1, wrapping to 0 at the end
     * of the items array.
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Inserts element at current put position, advances, and signals
     * one waiting taker. Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals
     * one waiting putter. Call only when holding lock.
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null; // drop the reference so the slot does not pin the element
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock. The caller must pass the index of a
     * live slot; this method does not validate i.
     */
    void removeAt(int i) {
        final E[] items = this.items;
        if (i == takeIndex) {
            // removing front item: just advance
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide all later elements down by one, up through putIndex
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Shared implementation of both drainTo variants. Moves up to
     * maxElements elements from the head of this queue into c.
     *
     * <p>Each slot is cleared only after c.add has returned for it, and
     * the bookkeeping (count, takeIndex) is updated in a finally block,
     * so that if c.add throws part-way through, the elements already
     * handed to c are no longer counted as queued and the element whose
     * add failed remains at the head of this queue.
     *
     * @return the number of elements transferred
     */
    private int drain(Collection<? super E> c, int maxElements) {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int limit = (maxElements < count) ? maxElements : count;
            int i = takeIndex;
            int n = 0;
            try {
                while (n < limit) {
                    c.add(items[i]); // may throw; slot i is untouched in that case
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw.
                if (n > 0) {
                    count -= n;
                    takeIndex = i;
                    notFull.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // Safe: the array is private and only ever holds E instances.
        @SuppressWarnings("unchecked")
        final E[] storage = (E[]) new Object[capacity];
        this.items = storage;
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * <p>Elements are inserted directly rather than through
     * {@link #add}, so no overridable method runs during construction.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if {@code capacity} is less than
     *         {@code c.size()}, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        final ReentrantLock lock = this.lock;
        lock.lock(); // insert() signals notEmpty, which requires holding the lock
        try {
            for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
                E e = it.next();
                if (e == null)
                    throw new NullPointerException();
                // c yielded more elements than its size() reported
                if (count == items.length)
                    throw new IllegalArgumentException();
                insert(e);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and throwing an
     * {@code IllegalStateException} if this queue is full.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full. This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @return {@code true} if the element was inserted, {@code false}
     *         if the wait time elapsed while the queue was still full
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element becomes available if the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time for an element to become available.
     *
     * @return the head of this queue, or {@code null} if the wait time
     *         elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null)
            return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null)
            return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int n = count;
            Object[] result = new Object[n];
            int i = takeIndex;
            for (int k = 0; k < n; k++) {
                result[k] = items[i];
                i = inc(i);
            }
            return result;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array. If the queue fits in the specified array, it
     * is returned therein. Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs. Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int n = count;
            if (a.length < n) {
                // Safe: the new array has the same component type as a.
                @SuppressWarnings("unchecked")
                final T[] sized = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), n);
                a = sized;
            }
            // Store through an Object[] view; the runtime array-store check
            // still raises ArrayStoreException for incompatible elements.
            final Object[] dest = a;
            int i = takeIndex;
            for (int k = 0; k < n; k++) {
                dest[k] = items[i];
                i = inc(i);
            }
            if (a.length > n)
                a[n] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the inherited string representation, computed while
     * holding the queue lock.
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = count; k > 0; k--) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection. If adding to {@code c} fails part-way, the
     * elements already added are removed from this queue and the rest
     * remain queued.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        return drain(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} available elements from this
     * queue and adds them to the given collection. If adding to
     * {@code c} fails part-way, the elements already added are removed
     * from this queue and the rest remain queued.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        return drain(c, maxElements);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /**
         * Element returned by most recent call to next, used by remove()
         * to verify that slot lastRet still holds that element.
         * Cleared by remove().
         */
        private E lastItem;

        /** Created only by iterator(), which holds the queue lock. */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Checks whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                lastItem = x;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes the element last returned by next(), provided it is
         * still stored at the slot it was read from. If the queue has
         * changed so that the slot no longer holds that element, nothing
         * is removed.
         *
         * @throws IllegalStateException if next() has not been called, or
         *         remove() was already called after the last next()
         */
        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;
                E x = lastItem;
                lastItem = null;

                // Only remove if the returned element is still at index i;
                // dead slots are null, so a match also proves i is live.
                if (x != null && items[i] == x) {
                    int ti = takeIndex;
                    removeAt(i);
                    // back up cursor (reset to front if was first element)
                    nextIndex = (i == ti) ? takeIndex : i;
                    checkNext();
                }
            } finally {
                lock.unlock();
            }
        }
    }
}
|