001 /*
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003 *
004 * This code is free software; you can redistribute it and/or modify it
005 * under the terms of the GNU General Public License version 2 only, as
006 * published by the Free Software Foundation. Sun designates this
007 * particular file as subject to the "Classpath" exception as provided
008 * by Sun in the LICENSE file that accompanied this code.
009 *
010 * This code is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013 * version 2 for more details (a copy is included in the LICENSE file that
014 * accompanied this code).
015 *
016 * You should have received a copy of the GNU General Public License version
017 * 2 along with this work; if not, write to the Free Software Foundation,
018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019 *
020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021 * CA 95054 USA or visit www.sun.com if you need additional information or
022 * have any questions.
023 */
024
025 /*
026 * This file is available under and governed by the GNU General Public
027 * License version 2 only, as published by the Free Software Foundation.
028 * However, the following notice accompanied the original version of this
029 * file:
030 *
031 * Written by Doug Lea with assistance from members of JCP JSR-166
032 * Expert Group and released to the public domain, as explained at
033 * http://creativecommons.org/licenses/publicdomain
034 */
035
036 package java.util.concurrent;
037
038 import java.util.concurrent.locks.*;
039 import java.util.*;
040
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>).  This class does not permit
 * <tt>null</tt> elements.  A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * <tt>ClassCastException</tt>).
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the PriorityBlockingQueue in any particular order.  If you need
 * ordered traversal, consider using
 * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
 * can be used to <em>remove</em> some or all elements in priority
 * order and place them in another collection.
 *
 * <p>Operations on this class make no guarantees about the ordering
 * of elements with equal priority.  If you need to enforce an
 * ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values.  For
 * example, here is a class that applies first-in-first-out
 * tie-breaking to comparable elements.  To use it, you would insert a
 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
 *
 * <pre> {@code
 * class FIFOEntry<E extends Comparable<? super E>>
 *     implements Comparable<FIFOEntry<E>> {
 *   final static AtomicLong seq = new AtomicLong();
 *   final long seqNum;
 *   final E entry;
 *   public FIFOEntry(E entry) {
 *     seqNum = seq.getAndIncrement();
 *     this.entry = entry;
 *   }
 *   public E getEntry() { return entry; }
 *   public int compareTo(FIFOEntry<E> other) {
 *     int res = entry.compareTo(other.entry);
 *     if (res == 0 && other.entry != this.entry)
 *       res = (seqNum < other.seqNum ? -1 : 1);
 *     return res;
 *   }
 * }}</pre>
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /**
     * Holds the elements and defines their ordering.  Every method of
     * this class that reads or modifies the elements does so while
     * holding {@link #lock}.
     */
    private final PriorityQueue<E> q;

    /** Guards element access to {@link #q}; created with the fair policy. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Signalled by {@code offer} after an element has been inserted. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@link PriorityQueue}, this
     * priority queue will be ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(e);
            assert ok;
            // Wake one thread blocked in take() or timed poll(), if any.
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this priority queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting for as long
     * as the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                // Loop rather than test once: the queue may be empty
                // again by the time this thread is resumed.
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit determines how to interpret the <tt>timeout</tt>
     *        parameter
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the time still left to wait,
                    // which becomes the budget for the next pass.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the comparator used to order the elements in this queue,
     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
     * natural ordering} of its elements.
     *
     * @return the comparator used to order the elements in this queue,
     *         or <tt>null</tt> if this queue uses the natural
     *         ordering of its elements
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string representation produced by the underlying
     * priority queue, computed while holding the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        // The bounded form performs the same argument checks in the
        // same order, so the unbounded drain is just the largest bound.
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                // Add before removing: if c.add throws, the element is
                // still in this queue instead of being silently lost.
                c.add(e);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned <tt>Iterator</tt> is a "weakly consistent"
     * iterator that will never throw {@link
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return;
        int lastRet;          // index of last element, or -1 if no such

        /**
         * Creates an iterator over the given snapshot array.
         *
         * @param array the elements to traverse
         */
        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        /** Returns <tt>true</tt> while snapshot elements remain. */
        public boolean hasNext() {
            return cursor < array.length;
        }

        /**
         * Returns the next snapshot element.
         *
         * @throws NoSuchElementException if the snapshot is exhausted
         */
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            // The only caller, iterator(), passes this queue's toArray().
            @SuppressWarnings("unchecked")
            E x = (E) array[cursor++];
            return x;
        }

        /**
         * Removes from the underlying queue the element most recently
         * returned by {@link #next}, if that same object is still
         * present; otherwise leaves the queue unchanged.
         *
         * @throws IllegalStateException if <tt>next</tt> has not been
         *         called since construction or since the last call to
         *         this method
         */
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            lock.lock();
            try {
                for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                    if (it.next() == x) {
                        it.remove();
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue.  Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if default serialization fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}
|