001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.locks.*;
010: import java.util.*;
011:
012: /**
013: * An unbounded {@linkplain BlockingQueue blocking queue} that uses
014: * the same ordering rules as class {@link PriorityQueue} and supplies
015: * blocking retrieval operations. While this queue is logically
016: * unbounded, attempted additions may fail due to resource exhaustion
017: * (causing <tt>OutOfMemoryError</tt>). This class does not permit
018: * <tt>null</tt> elements. A priority queue relying on natural
019: * ordering also does not permit insertion of non-comparable objects
020: * (doing so results in <tt>ClassCastException</tt>).
021: *
022: * <p>This class implements all of the <em>optional</em> methods
023: * of the {@link Collection} and {@link Iterator} interfaces.
024: * <p>The Iterator provided in method {@link #iterator()} is
025: * <em>not</em> guaranteed to traverse the elements of the
026: * PriorityBlockingQueue in any particular order. If you need ordered
027: * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
028: *
029: * <p>This class is a member of the
030: * <a href="{@docRoot}/../guide/collections/index.html">
031: * Java Collections Framework</a>.
032: *
033: * @since 1.5
034: * @author Doug Lea
035: * @param <E> the type of elements held in this collection
036: */
037: public class PriorityBlockingQueue<E> extends AbstractQueue<E>
038: implements BlockingQueue<E>, java.io.Serializable {
039: private static final long serialVersionUID = 5595510919245408276L;
040:
041: private final PriorityQueue<E> q;
042: private final ReentrantLock lock = new ReentrantLock(true);
043: private final Condition notEmpty = lock.newCondition();
044:
045: /**
046: * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
047: * capacity
048: * (11) that orders its elements according to their natural
049: * ordering (using <tt>Comparable</tt>).
050: */
051: public PriorityBlockingQueue() {
052: q = new PriorityQueue<E>();
053: }
054:
055: /**
056: * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
057: * capacity
058: * that orders its elements according to their natural ordering
059: * (using <tt>Comparable</tt>).
060: *
061: * @param initialCapacity the initial capacity for this priority queue.
062: * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
063: * than 1
064: */
065: public PriorityBlockingQueue(int initialCapacity) {
066: q = new PriorityQueue<E>(initialCapacity, null);
067: }
068:
069: /**
070: * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
071: * capacity
072: * that orders its elements according to the specified comparator.
073: *
074: * @param initialCapacity the initial capacity for this priority queue.
075: * @param comparator the comparator used to order this priority queue.
076: * If <tt>null</tt> then the order depends on the elements' natural
077: * ordering.
078: * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
079: * than 1
080: */
081: public PriorityBlockingQueue(int initialCapacity,
082: Comparator<? super E> comparator) {
083: q = new PriorityQueue<E>(initialCapacity, comparator);
084: }
085:
086: /**
087: * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
088: * in the specified collection. The priority queue has an initial
089: * capacity of 110% of the size of the specified collection. If
090: * the specified collection is a {@link SortedSet} or a {@link
091: * PriorityQueue}, this priority queue will be sorted according to
092: * the same comparator, or according to its elements' natural
093: * order if the collection is sorted according to its elements'
094: * natural order. Otherwise, this priority queue is ordered
095: * according to its elements' natural order.
096: *
097: * @param c the collection whose elements are to be placed
098: * into this priority queue.
099: * @throws ClassCastException if elements of the specified collection
100: * cannot be compared to one another according to the priority
101: * queue's ordering.
102: * @throws NullPointerException if <tt>c</tt> or any element within it
103: * is <tt>null</tt>
104: */
105: public PriorityBlockingQueue(Collection<? extends E> c) {
106: q = new PriorityQueue<E>(c);
107: }
108:
109: // these first few override just to update doc comments
110:
111: /**
112: * Adds the specified element to this queue.
113: * @param o the element to add
114: * @return <tt>true</tt> (as per the general contract of
115: * <tt>Collection.add</tt>).
116: *
117: * @throws NullPointerException if the specified element is <tt>null</tt>.
118: * @throws ClassCastException if the specified element cannot be compared
119: * with elements currently in the priority queue according
120: * to the priority queue's ordering.
121: */
122: public boolean add(E o) {
123: return super .add(o);
124: }
125:
126: /**
127: * Returns the comparator used to order this collection, or <tt>null</tt>
128: * if this collection is sorted according to its elements natural ordering
129: * (using <tt>Comparable</tt>).
130: *
131: * @return the comparator used to order this collection, or <tt>null</tt>
132: * if this collection is sorted according to its elements natural ordering.
133: */
134: public Comparator comparator() {
135: return q.comparator();
136: }
137:
138: /**
139: * Inserts the specified element into this priority queue.
140: *
141: * @param o the element to add
142: * @return <tt>true</tt>
143: * @throws ClassCastException if the specified element cannot be compared
144: * with elements currently in the priority queue according
145: * to the priority queue's ordering.
146: * @throws NullPointerException if the specified element is <tt>null</tt>.
147: */
148: public boolean offer(E o) {
149: if (o == null)
150: throw new NullPointerException();
151: final ReentrantLock lock = this .lock;
152: lock.lock();
153: try {
154: boolean ok = q.offer(o);
155: assert ok;
156: notEmpty.signal();
157: return true;
158: } finally {
159: lock.unlock();
160: }
161: }
162:
163: /**
164: * Adds the specified element to this priority queue. As the queue is
165: * unbounded this method will never block.
166: * @param o the element to add
167: * @throws ClassCastException if the element cannot be compared
168: * with elements currently in the priority queue according
169: * to the priority queue's ordering.
170: * @throws NullPointerException if the specified element is <tt>null</tt>.
171: */
172: public void put(E o) {
173: offer(o); // never need to block
174: }
175:
176: /**
177: * Inserts the specified element into this priority queue. As the queue is
178: * unbounded this method will never block.
179: * @param o the element to add
180: * @param timeout This parameter is ignored as the method never blocks
181: * @param unit This parameter is ignored as the method never blocks
182: * @return <tt>true</tt>
183: * @throws ClassCastException if the element cannot be compared
184: * with elements currently in the priority queue according
185: * to the priority queue's ordering.
186: * @throws NullPointerException if the specified element is <tt>null</tt>.
187: */
188: public boolean offer(E o, long timeout, TimeUnit unit) {
189: return offer(o); // never need to block
190: }
191:
192: public E take() throws InterruptedException {
193: final ReentrantLock lock = this .lock;
194: lock.lockInterruptibly();
195: try {
196: try {
197: while (q.size() == 0)
198: notEmpty.await();
199: } catch (InterruptedException ie) {
200: notEmpty.signal(); // propagate to non-interrupted thread
201: throw ie;
202: }
203: E x = q.poll();
204: assert x != null;
205: return x;
206: } finally {
207: lock.unlock();
208: }
209: }
210:
211: public E poll() {
212: final ReentrantLock lock = this .lock;
213: lock.lock();
214: try {
215: return q.poll();
216: } finally {
217: lock.unlock();
218: }
219: }
220:
221: public E poll(long timeout, TimeUnit unit)
222: throws InterruptedException {
223: long nanos = unit.toNanos(timeout);
224: final ReentrantLock lock = this .lock;
225: lock.lockInterruptibly();
226: try {
227: for (;;) {
228: E x = q.poll();
229: if (x != null)
230: return x;
231: if (nanos <= 0)
232: return null;
233: try {
234: nanos = notEmpty.awaitNanos(nanos);
235: } catch (InterruptedException ie) {
236: notEmpty.signal(); // propagate to non-interrupted thread
237: throw ie;
238: }
239: }
240: } finally {
241: lock.unlock();
242: }
243: }
244:
245: public E peek() {
246: final ReentrantLock lock = this .lock;
247: lock.lock();
248: try {
249: return q.peek();
250: } finally {
251: lock.unlock();
252: }
253: }
254:
255: public int size() {
256: final ReentrantLock lock = this .lock;
257: lock.lock();
258: try {
259: return q.size();
260: } finally {
261: lock.unlock();
262: }
263: }
264:
265: /**
266: * Always returns <tt>Integer.MAX_VALUE</tt> because
267: * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
268: * @return <tt>Integer.MAX_VALUE</tt>
269: */
270: public int remainingCapacity() {
271: return Integer.MAX_VALUE;
272: }
273:
274: public boolean remove(Object o) {
275: final ReentrantLock lock = this .lock;
276: lock.lock();
277: try {
278: return q.remove(o);
279: } finally {
280: lock.unlock();
281: }
282: }
283:
284: public boolean contains(Object o) {
285: final ReentrantLock lock = this .lock;
286: lock.lock();
287: try {
288: return q.contains(o);
289: } finally {
290: lock.unlock();
291: }
292: }
293:
294: public Object[] toArray() {
295: final ReentrantLock lock = this .lock;
296: lock.lock();
297: try {
298: return q.toArray();
299: } finally {
300: lock.unlock();
301: }
302: }
303:
304: public String toString() {
305: final ReentrantLock lock = this .lock;
306: lock.lock();
307: try {
308: return q.toString();
309: } finally {
310: lock.unlock();
311: }
312: }
313:
314: public int drainTo(Collection<? super E> c) {
315: if (c == null)
316: throw new NullPointerException();
317: if (c == this )
318: throw new IllegalArgumentException();
319: final ReentrantLock lock = this .lock;
320: lock.lock();
321: try {
322: int n = 0;
323: E e;
324: while ((e = q.poll()) != null) {
325: c.add(e);
326: ++n;
327: }
328: return n;
329: } finally {
330: lock.unlock();
331: }
332: }
333:
334: public int drainTo(Collection<? super E> c, int maxElements) {
335: if (c == null)
336: throw new NullPointerException();
337: if (c == this )
338: throw new IllegalArgumentException();
339: if (maxElements <= 0)
340: return 0;
341: final ReentrantLock lock = this .lock;
342: lock.lock();
343: try {
344: int n = 0;
345: E e;
346: while (n < maxElements && (e = q.poll()) != null) {
347: c.add(e);
348: ++n;
349: }
350: return n;
351: } finally {
352: lock.unlock();
353: }
354: }
355:
356: /**
357: * Atomically removes all of the elements from this delay queue.
358: * The queue will be empty after this call returns.
359: */
360: public void clear() {
361: final ReentrantLock lock = this .lock;
362: lock.lock();
363: try {
364: q.clear();
365: } finally {
366: lock.unlock();
367: }
368: }
369:
370: public <T> T[] toArray(T[] a) {
371: final ReentrantLock lock = this .lock;
372: lock.lock();
373: try {
374: return q.toArray(a);
375: } finally {
376: lock.unlock();
377: }
378: }
379:
380: /**
381: * Returns an iterator over the elements in this queue. The
382: * iterator does not return the elements in any particular order.
383: * The returned iterator is a thread-safe "fast-fail" iterator
384: * that will throw {@link
385: * java.util.ConcurrentModificationException} upon detected
386: * interference.
387: *
388: * @return an iterator over the elements in this queue.
389: */
390: public Iterator<E> iterator() {
391: final ReentrantLock lock = this .lock;
392: lock.lock();
393: try {
394: return new Itr(q.iterator());
395: } finally {
396: lock.unlock();
397: }
398: }
399:
400: private class Itr<E> implements Iterator<E> {
401: private final Iterator<E> iter;
402:
403: Itr(Iterator<E> i) {
404: iter = i;
405: }
406:
407: public boolean hasNext() {
408: /*
409: * No sync -- we rely on underlying hasNext to be
410: * stateless, in which case we can return true by mistake
411: * only when next() will subsequently throw
412: * ConcurrentModificationException.
413: */
414: return iter.hasNext();
415: }
416:
417: public E next() {
418: ReentrantLock lock = PriorityBlockingQueue.this .lock;
419: lock.lock();
420: try {
421: return iter.next();
422: } finally {
423: lock.unlock();
424: }
425: }
426:
427: public void remove() {
428: ReentrantLock lock = PriorityBlockingQueue.this .lock;
429: lock.lock();
430: try {
431: iter.remove();
432: } finally {
433: lock.unlock();
434: }
435: }
436: }
437:
438: /**
439: * Save the state to a stream (that is, serialize it). This
440: * merely wraps default serialization within lock. The
441: * serialization strategy for items is left to underlying
442: * Queue. Note that locking is not needed on deserialization, so
443: * readObject is not defined, just relying on default.
444: */
445: private void writeObject(java.io.ObjectOutputStream s)
446: throws java.io.IOException {
447: lock.lock();
448: try {
449: s.defaultWriteObject();
450: } finally {
451: lock.unlock();
452: }
453: }
454:
455: }
|