/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.locks.*;
010: import java.util.*;
011:
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time.  The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time.  New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a
 * fixed-sized array holds elements inserted by producers and
 * extracted by consumers.  Once created, the capacity cannot be
 * increased.  Attempts to <tt>put</tt> an element into a full queue
 * will result in the operation blocking; attempts to <tt>take</tt> an
 * element from an empty queue will similarly block.  The untimed
 * <tt>offer</tt> and <tt>poll</tt> methods never block: they return
 * <tt>false</tt> or <tt>null</tt> respectively instead.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed.  However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.  Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
        BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    private final E[] items;
    /**
     * items index for next take, poll or remove.
     * Deliberately NOT transient: the items array is serialized with its
     * elements in their physical slots, so the indices describing where
     * the live region starts and ends must travel with it.
     */
    private int takeIndex;
    /**
     * items index for next put, offer, or add.
     * Not transient for the same reason as takeIndex.
     */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increment i: returns i + 1, wrapping to 0 when the
     * end of the items array is reached.
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Insert element at current put position, advance, and signal
     * one thread waiting on notEmpty.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extract element at current take position, advance, and signal
     * one thread waiting on notFull.
     * Call only when holding lock.
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null; // drop the reference held by the array slot
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock.
     */
    void removeAt(int i) {
        final E[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    // i is now the last occupied slot: clear it and
                    // make it the new insertion point
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    @SuppressWarnings("unchecked")
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // A generic array cannot be created directly; only E values are
        // ever stored in this array by the methods of this class.
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     * <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     * @throws IllegalStateException if the collection's iterator yields
     * more elements than fit in this queue
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        // Elements are inserted directly rather than through add()/offer(),
        // so no overridable method runs on a partially constructed object.
        // The lock is taken because insert() signals a condition, which
        // requires the lock to be held.
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
                E e = it.next();
                if (e == null)
                    throw new NullPointerException();
                if (count == items.length)
                    throw new IllegalStateException("Queue full");
                insert(e);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            insert(o);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (o == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != items.length) {
                    insert(o);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    // awaitNanos returns the remaining wait time
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, returning
     * immediately if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty.
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * up to the specified wait time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait time
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the first element, in head-to-tail order, for which
     * <tt>o.equals(element)</tt> is true.
     *
     * @param o the element to remove; <tt>null</tt> is never contained
     * @return <tt>true</tt> if an element was removed, else <tt>false</tt>
     */
    public boolean remove(Object o) {
        if (o == null)
            return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null)
            throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(o);
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the capacity of this queue minus its current size
     */
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue holds at least one element
     * for which <tt>o.equals(element)</tt> is true.
     *
     * @param o the object to look for; <tt>null</tt> is never contained
     * @return <tt>true</tt> if a matching element is present
     */
    public boolean contains(Object o) {
        if (o == null)
            return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a newly allocated array holding the elements of this
     * queue in head-to-tail order.
     *
     * @return an array of length <tt>size()</tt> with the queue's elements
     */
    public Object[] toArray() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Copies the elements of this queue, in head-to-tail order, into
     * the given array if it is large enough, otherwise into a new array
     * of the same component type. If the array used is longer than the
     * queue, the slot following the last element is set to <tt>null</tt>.
     *
     * @param a the array to fill, or whose component type to reuse
     * @return the array holding the elements
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (a.length < count)
                a = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), count);

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T) items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the inherited string representation, computed while
     * holding this queue's lock.
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements from this queue, resets both indices to
     * the start of the array, and wakes every thread waiting for space.
     */
    public void clear() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = count;
            while (k-- > 0) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements currently in this queue and adds them, in
     * head-to-tail order, to the given collection. If <tt>c.add</tt>
     * throws, the elements added so far have been removed from this
     * queue and the remaining ones are still queued.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drain(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> elements from this queue and
     * adds them, in head-to-tail order, to the given collection. If
     * <tt>c.add</tt> throws, the elements added so far have been removed
     * from this queue and the remaining ones are still queued.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * values less than 1 transfer nothing
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        return drain(c, maxElements);
    }

    /**
     * Shared implementation of both drainTo variants: moves up to
     * maxElements elements from the head of this queue into c.
     */
    private int drain(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int limit = (maxElements < count) ? maxElements : count;
            int i = takeIndex;
            int n = 0;
            try {
                while (n < limit) {
                    // A slot is cleared only after its element was accepted,
                    // so a throwing add() leaves that element queued.
                    c.add(items[i]);
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                return n;
            } finally {
                // Commit the bookkeeping even when add() threw; otherwise
                // count would still include the slots already nulled out.
                if (n > 0) {
                    count -= n;
                    takeIndex = i;
                    notFull.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // constructed under the lock because Itr() reads queue state
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /** Positions the iterator at the current head, if there is one. */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Check whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        /**
         * Returns the element cached for this position and advances.
         *
         * @throws NoSuchElementException if the iteration is exhausted
         */
        public E next() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes the element at the index recorded by the most recent
         * call to next.
         *
         * @throws IllegalStateException if next has not been called, or
         * remove was already called after the last call to next
         */
        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;

                int ti = takeIndex;
                removeAt(i);
                // back up cursor (reset to front if was first element)
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            } finally {
                lock.unlock();
            }
        }
    }
}
|