001 /*
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003 *
004 * This code is free software; you can redistribute it and/or modify it
005 * under the terms of the GNU General Public License version 2 only, as
006 * published by the Free Software Foundation. Sun designates this
007 * particular file as subject to the "Classpath" exception as provided
008 * by Sun in the LICENSE file that accompanied this code.
009 *
010 * This code is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013 * version 2 for more details (a copy is included in the LICENSE file that
014 * accompanied this code).
015 *
016 * You should have received a copy of the GNU General Public License version
017 * 2 along with this work; if not, write to the Free Software Foundation,
018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019 *
020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021 * CA 95054 USA or visit www.sun.com if you need additional information or
022 * have any questions.
023 */
024
025 /*
026 * This file is available under and governed by the GNU General Public
027 * License version 2 only, as published by the Free Software Foundation.
028 * However, the following notice accompanied the original version of this
029 * file:
030 *
031 * Written by Doug Lea with assistance from members of JCP JSR-166
032 * Expert Group and released to the public domain, as explained at
033 * http://creativecommons.org/licenses/publicdomain
034 */
035
036 package java.util.concurrent;
037
038 import java.util.concurrent.locks.*;
039 import java.util.*;
040
041 /**
042 * An unbounded {@linkplain BlockingQueue blocking queue} of
043 * <tt>Delayed</tt> elements, in which an element can only be taken
044 * when its delay has expired. The <em>head</em> of the queue is that
045 * <tt>Delayed</tt> element whose delay expired furthest in the
046 * past. If no delay has expired there is no head and <tt>poll</tt>
047 * will return <tt>null</tt>. Expiration occurs when an element's
048 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
049 * than or equal to zero. Even though unexpired elements cannot be
050 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
051 * treated as normal elements. For example, the <tt>size</tt> method
052 * returns the count of both expired and unexpired elements.
053 * This queue does not permit null elements.
054 *
055 * <p>This class and its iterator implement all of the
056 * <em>optional</em> methods of the {@link Collection} and {@link
057 * Iterator} interfaces.
058 *
059 * <p>This class is a member of the
060 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
061 * Java Collections Framework</a>.
062 *
063 * @since 1.5
064 * @author Doug Lea
065 * @param <E> the type of elements held in this collection
066 */
067
068 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
069 implements BlockingQueue<E> {
070
071 private transient final ReentrantLock lock = new ReentrantLock();
072 private transient final Condition available = lock.newCondition();
073 private final PriorityQueue<E> q = new PriorityQueue<E>();
074
075 /**
076 * Creates a new <tt>DelayQueue</tt> that is initially empty.
077 */
078 public DelayQueue() {
079 }
080
081 /**
082 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
083 * given collection of {@link Delayed} instances.
084 *
085 * @param c the collection of elements to initially contain
086 * @throws NullPointerException if the specified collection or any
087 * of its elements are null
088 */
089 public DelayQueue(Collection<? extends E> c) {
090 this .addAll(c);
091 }
092
093 /**
094 * Inserts the specified element into this delay queue.
095 *
096 * @param e the element to add
097 * @return <tt>true</tt> (as specified by {@link Collection#add})
098 * @throws NullPointerException if the specified element is null
099 */
100 public boolean add(E e) {
101 return offer(e);
102 }
103
104 /**
105 * Inserts the specified element into this delay queue.
106 *
107 * @param e the element to add
108 * @return <tt>true</tt>
109 * @throws NullPointerException if the specified element is null
110 */
111 public boolean offer(E e) {
112 final ReentrantLock lock = this .lock;
113 lock.lock();
114 try {
115 E first = q.peek();
116 q.offer(e);
117 if (first == null || e.compareTo(first) < 0)
118 available.signalAll();
119 return true;
120 } finally {
121 lock.unlock();
122 }
123 }
124
125 /**
126 * Inserts the specified element into this delay queue. As the queue is
127 * unbounded this method will never block.
128 *
129 * @param e the element to add
130 * @throws NullPointerException {@inheritDoc}
131 */
132 public void put(E e) {
133 offer(e);
134 }
135
136 /**
137 * Inserts the specified element into this delay queue. As the queue is
138 * unbounded this method will never block.
139 *
140 * @param e the element to add
141 * @param timeout This parameter is ignored as the method never blocks
142 * @param unit This parameter is ignored as the method never blocks
143 * @return <tt>true</tt>
144 * @throws NullPointerException {@inheritDoc}
145 */
146 public boolean offer(E e, long timeout, TimeUnit unit) {
147 return offer(e);
148 }
149
150 /**
151 * Retrieves and removes the head of this queue, or returns <tt>null</tt>
152 * if this queue has no elements with an expired delay.
153 *
154 * @return the head of this queue, or <tt>null</tt> if this
155 * queue has no elements with an expired delay
156 */
157 public E poll() {
158 final ReentrantLock lock = this .lock;
159 lock.lock();
160 try {
161 E first = q.peek();
162 if (first == null
163 || first.getDelay(TimeUnit.NANOSECONDS) > 0)
164 return null;
165 else {
166 E x = q.poll();
167 assert x != null;
168 if (q.size() != 0)
169 available.signalAll();
170 return x;
171 }
172 } finally {
173 lock.unlock();
174 }
175 }
176
177 /**
178 * Retrieves and removes the head of this queue, waiting if necessary
179 * until an element with an expired delay is available on this queue.
180 *
181 * @return the head of this queue
182 * @throws InterruptedException {@inheritDoc}
183 */
184 public E take() throws InterruptedException {
185 final ReentrantLock lock = this .lock;
186 lock.lockInterruptibly();
187 try {
188 for (;;) {
189 E first = q.peek();
190 if (first == null) {
191 available.await();
192 } else {
193 long delay = first.getDelay(TimeUnit.NANOSECONDS);
194 if (delay > 0) {
195 long tl = available.awaitNanos(delay);
196 } else {
197 E x = q.poll();
198 assert x != null;
199 if (q.size() != 0)
200 available.signalAll(); // wake up other takers
201 return x;
202
203 }
204 }
205 }
206 } finally {
207 lock.unlock();
208 }
209 }
210
211 /**
212 * Retrieves and removes the head of this queue, waiting if necessary
213 * until an element with an expired delay is available on this queue,
214 * or the specified wait time expires.
215 *
216 * @return the head of this queue, or <tt>null</tt> if the
217 * specified waiting time elapses before an element with
218 * an expired delay becomes available
219 * @throws InterruptedException {@inheritDoc}
220 */
221 public E poll(long timeout, TimeUnit unit)
222 throws InterruptedException {
223 long nanos = unit.toNanos(timeout);
224 final ReentrantLock lock = this .lock;
225 lock.lockInterruptibly();
226 try {
227 for (;;) {
228 E first = q.peek();
229 if (first == null) {
230 if (nanos <= 0)
231 return null;
232 else
233 nanos = available.awaitNanos(nanos);
234 } else {
235 long delay = first.getDelay(TimeUnit.NANOSECONDS);
236 if (delay > 0) {
237 if (nanos <= 0)
238 return null;
239 if (delay > nanos)
240 delay = nanos;
241 long timeLeft = available.awaitNanos(delay);
242 nanos -= delay - timeLeft;
243 } else {
244 E x = q.poll();
245 assert x != null;
246 if (q.size() != 0)
247 available.signalAll();
248 return x;
249 }
250 }
251 }
252 } finally {
253 lock.unlock();
254 }
255 }
256
257 /**
258 * Retrieves, but does not remove, the head of this queue, or
259 * returns <tt>null</tt> if this queue is empty. Unlike
260 * <tt>poll</tt>, if no expired elements are available in the queue,
261 * this method returns the element that will expire next,
262 * if one exists.
263 *
264 * @return the head of this queue, or <tt>null</tt> if this
265 * queue is empty.
266 */
267 public E peek() {
268 final ReentrantLock lock = this .lock;
269 lock.lock();
270 try {
271 return q.peek();
272 } finally {
273 lock.unlock();
274 }
275 }
276
277 public int size() {
278 final ReentrantLock lock = this .lock;
279 lock.lock();
280 try {
281 return q.size();
282 } finally {
283 lock.unlock();
284 }
285 }
286
287 /**
288 * @throws UnsupportedOperationException {@inheritDoc}
289 * @throws ClassCastException {@inheritDoc}
290 * @throws NullPointerException {@inheritDoc}
291 * @throws IllegalArgumentException {@inheritDoc}
292 */
293 public int drainTo(Collection<? super E> c) {
294 if (c == null)
295 throw new NullPointerException();
296 if (c == this )
297 throw new IllegalArgumentException();
298 final ReentrantLock lock = this .lock;
299 lock.lock();
300 try {
301 int n = 0;
302 for (;;) {
303 E first = q.peek();
304 if (first == null
305 || first.getDelay(TimeUnit.NANOSECONDS) > 0)
306 break;
307 c.add(q.poll());
308 ++n;
309 }
310 if (n > 0)
311 available.signalAll();
312 return n;
313 } finally {
314 lock.unlock();
315 }
316 }
317
318 /**
319 * @throws UnsupportedOperationException {@inheritDoc}
320 * @throws ClassCastException {@inheritDoc}
321 * @throws NullPointerException {@inheritDoc}
322 * @throws IllegalArgumentException {@inheritDoc}
323 */
324 public int drainTo(Collection<? super E> c, int maxElements) {
325 if (c == null)
326 throw new NullPointerException();
327 if (c == this )
328 throw new IllegalArgumentException();
329 if (maxElements <= 0)
330 return 0;
331 final ReentrantLock lock = this .lock;
332 lock.lock();
333 try {
334 int n = 0;
335 while (n < maxElements) {
336 E first = q.peek();
337 if (first == null
338 || first.getDelay(TimeUnit.NANOSECONDS) > 0)
339 break;
340 c.add(q.poll());
341 ++n;
342 }
343 if (n > 0)
344 available.signalAll();
345 return n;
346 } finally {
347 lock.unlock();
348 }
349 }
350
351 /**
352 * Atomically removes all of the elements from this delay queue.
353 * The queue will be empty after this call returns.
354 * Elements with an unexpired delay are not waited for; they are
355 * simply discarded from the queue.
356 */
357 public void clear() {
358 final ReentrantLock lock = this .lock;
359 lock.lock();
360 try {
361 q.clear();
362 } finally {
363 lock.unlock();
364 }
365 }
366
367 /**
368 * Always returns <tt>Integer.MAX_VALUE</tt> because
369 * a <tt>DelayQueue</tt> is not capacity constrained.
370 *
371 * @return <tt>Integer.MAX_VALUE</tt>
372 */
373 public int remainingCapacity() {
374 return Integer.MAX_VALUE;
375 }
376
377 /**
378 * Returns an array containing all of the elements in this queue.
379 * The returned array elements are in no particular order.
380 *
381 * <p>The returned array will be "safe" in that no references to it are
382 * maintained by this queue. (In other words, this method must allocate
383 * a new array). The caller is thus free to modify the returned array.
384 *
385 * <p>This method acts as bridge between array-based and collection-based
386 * APIs.
387 *
388 * @return an array containing all of the elements in this queue
389 */
390 public Object[] toArray() {
391 final ReentrantLock lock = this .lock;
392 lock.lock();
393 try {
394 return q.toArray();
395 } finally {
396 lock.unlock();
397 }
398 }
399
400 /**
401 * Returns an array containing all of the elements in this queue; the
402 * runtime type of the returned array is that of the specified array.
403 * The returned array elements are in no particular order.
404 * If the queue fits in the specified array, it is returned therein.
405 * Otherwise, a new array is allocated with the runtime type of the
406 * specified array and the size of this queue.
407 *
408 * <p>If this queue fits in the specified array with room to spare
409 * (i.e., the array has more elements than this queue), the element in
410 * the array immediately following the end of the queue is set to
411 * <tt>null</tt>.
412 *
413 * <p>Like the {@link #toArray()} method, this method acts as bridge between
414 * array-based and collection-based APIs. Further, this method allows
415 * precise control over the runtime type of the output array, and may,
416 * under certain circumstances, be used to save allocation costs.
417 *
418 * <p>The following code can be used to dump a delay queue into a newly
419 * allocated array of <tt>Delayed</tt>:
420 *
421 * <pre>
422 * Delayed[] a = q.toArray(new Delayed[0]);</pre>
423 *
424 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
425 * <tt>toArray()</tt>.
426 *
427 * @param a the array into which the elements of the queue are to
428 * be stored, if it is big enough; otherwise, a new array of the
429 * same runtime type is allocated for this purpose
430 * @return an array containing all of the elements in this queue
431 * @throws ArrayStoreException if the runtime type of the specified array
432 * is not a supertype of the runtime type of every element in
433 * this queue
434 * @throws NullPointerException if the specified array is null
435 */
436 public <T> T[] toArray(T[] a) {
437 final ReentrantLock lock = this .lock;
438 lock.lock();
439 try {
440 return q.toArray(a);
441 } finally {
442 lock.unlock();
443 }
444 }
445
446 /**
447 * Removes a single instance of the specified element from this
448 * queue, if it is present, whether or not it has expired.
449 */
450 public boolean remove(Object o) {
451 final ReentrantLock lock = this .lock;
452 lock.lock();
453 try {
454 return q.remove(o);
455 } finally {
456 lock.unlock();
457 }
458 }
459
460 /**
461 * Returns an iterator over all the elements (both expired and
462 * unexpired) in this queue. The iterator does not return the
463 * elements in any particular order. The returned
464 * <tt>Iterator</tt> is a "weakly consistent" iterator that will
465 * never throw {@link ConcurrentModificationException}, and
466 * guarantees to traverse elements as they existed upon
467 * construction of the iterator, and may (but is not guaranteed
468 * to) reflect any modifications subsequent to construction.
469 *
470 * @return an iterator over the elements in this queue
471 */
472 public Iterator<E> iterator() {
473 return new Itr(toArray());
474 }
475
476 /**
477 * Snapshot iterator that works off copy of underlying q array.
478 */
479 private class Itr implements Iterator<E> {
480 final Object[] array; // Array of all elements
481 int cursor; // index of next element to return;
482 int lastRet; // index of last element, or -1 if no such
483
484 Itr(Object[] array) {
485 lastRet = -1;
486 this .array = array;
487 }
488
489 public boolean hasNext() {
490 return cursor < array.length;
491 }
492
493 public E next() {
494 if (cursor >= array.length)
495 throw new NoSuchElementException();
496 lastRet = cursor;
497 return (E) array[cursor++];
498 }
499
500 public void remove() {
501 if (lastRet < 0)
502 throw new IllegalStateException();
503 Object x = array[lastRet];
504 lastRet = -1;
505 // Traverse underlying queue to find == element,
506 // not just a .equals element.
507 lock.lock();
508 try {
509 for (Iterator it = q.iterator(); it.hasNext();) {
510 if (it.next() == x) {
511 it.remove();
512 return;
513 }
514 }
515 } finally {
516 lock.unlock();
517 }
518 }
519 }
520
521 }
|