001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.concurrent.locks.*;
010: import java.util.*;
011:
012: /**
013: * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
014: * elements, in which an element can only be taken when its delay has expired.
015: * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
016: * expired furthest in the past - if no delay has expired there is no head and
017: * <tt>poll</tt> will return <tt>null</tt>.
018: * This queue does not permit <tt>null</tt> elements.
019: * <p>This class implements all of the <em>optional</em> methods
020: * of the {@link Collection} and {@link Iterator} interfaces.
021: *
022: * <p>This class is a member of the
023: * <a href="{@docRoot}/../guide/collections/index.html">
024: * Java Collections Framework</a>.
025: *
026: * @since 1.5
027: * @author Doug Lea
028: * @param <E> the type of elements held in this collection
029: */
030:
031: public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
032: implements BlockingQueue<E> {
033:
034: private transient final ReentrantLock lock = new ReentrantLock();
035: private transient final Condition available = lock.newCondition();
036: private final PriorityQueue<E> q = new PriorityQueue<E>();
037:
038: /**
039: * Creates a new <tt>DelayQueue</tt> that is initially empty.
040: */
041: public DelayQueue() {
042: }
043:
044: /**
045: * Creates a <tt>DelayQueue</tt> initially containing the elements of the
046: * given collection of {@link Delayed} instances.
047: *
048: * @param c the collection
049: * @throws NullPointerException if <tt>c</tt> or any element within it
050: * is <tt>null</tt>
051: *
052: */
053: public DelayQueue(Collection<? extends E> c) {
054: this .addAll(c);
055: }
056:
057: /**
058: * Inserts the specified element into this delay queue.
059: *
060: * @param o the element to add
061: * @return <tt>true</tt>
062: * @throws NullPointerException if the specified element is <tt>null</tt>.
063: */
064: public boolean offer(E o) {
065: final ReentrantLock lock = this .lock;
066: lock.lock();
067: try {
068: E first = q.peek();
069: q.offer(o);
070: if (first == null || o.compareTo(first) < 0)
071: available.signalAll();
072: return true;
073: } finally {
074: lock.unlock();
075: }
076: }
077:
078: /**
079: * Adds the specified element to this delay queue. As the queue is
080: * unbounded this method will never block.
081: * @param o the element to add
082: * @throws NullPointerException if the specified element is <tt>null</tt>.
083: */
084: public void put(E o) {
085: offer(o);
086: }
087:
088: /**
089: * Inserts the specified element into this delay queue. As the queue is
090: * unbounded this method will never block.
091: * @param o the element to add
092: * @param timeout This parameter is ignored as the method never blocks
093: * @param unit This parameter is ignored as the method never blocks
094: * @return <tt>true</tt>
095: * @throws NullPointerException if the specified element is <tt>null</tt>.
096: */
097: public boolean offer(E o, long timeout, TimeUnit unit) {
098: return offer(o);
099: }
100:
101: /**
102: * Adds the specified element to this queue.
103: * @param o the element to add
104: * @return <tt>true</tt> (as per the general contract of
105: * <tt>Collection.add</tt>).
106: *
107: * @throws NullPointerException if the specified element is <tt>null</tt>.
108: */
109: public boolean add(E o) {
110: return offer(o);
111: }
112:
113: public E take() throws InterruptedException {
114: final ReentrantLock lock = this .lock;
115: lock.lockInterruptibly();
116: try {
117: for (;;) {
118: E first = q.peek();
119: if (first == null) {
120: available.await();
121: } else {
122: long delay = first.getDelay(TimeUnit.NANOSECONDS);
123: if (delay > 0) {
124: long tl = available.awaitNanos(delay);
125: } else {
126: E x = q.poll();
127: assert x != null;
128: if (q.size() != 0)
129: available.signalAll(); // wake up other takers
130: return x;
131:
132: }
133: }
134: }
135: } finally {
136: lock.unlock();
137: }
138: }
139:
140: public E poll(long time, TimeUnit unit) throws InterruptedException {
141: final ReentrantLock lock = this .lock;
142: lock.lockInterruptibly();
143: long nanos = unit.toNanos(time);
144: try {
145: for (;;) {
146: E first = q.peek();
147: if (first == null) {
148: if (nanos <= 0)
149: return null;
150: else
151: nanos = available.awaitNanos(nanos);
152: } else {
153: long delay = first.getDelay(TimeUnit.NANOSECONDS);
154: if (delay > 0) {
155: if (delay > nanos)
156: delay = nanos;
157: long timeLeft = available.awaitNanos(delay);
158: nanos -= delay - timeLeft;
159: } else {
160: E x = q.poll();
161: assert x != null;
162: if (q.size() != 0)
163: available.signalAll();
164: return x;
165: }
166: }
167: }
168: } finally {
169: lock.unlock();
170: }
171: }
172:
173: public E poll() {
174: final ReentrantLock lock = this .lock;
175: lock.lock();
176: try {
177: E first = q.peek();
178: if (first == null
179: || first.getDelay(TimeUnit.NANOSECONDS) > 0)
180: return null;
181: else {
182: E x = q.poll();
183: assert x != null;
184: if (q.size() != 0)
185: available.signalAll();
186: return x;
187: }
188: } finally {
189: lock.unlock();
190: }
191: }
192:
193: public E peek() {
194: final ReentrantLock lock = this .lock;
195: lock.lock();
196: try {
197: return q.peek();
198: } finally {
199: lock.unlock();
200: }
201: }
202:
203: public int size() {
204: final ReentrantLock lock = this .lock;
205: lock.lock();
206: try {
207: return q.size();
208: } finally {
209: lock.unlock();
210: }
211: }
212:
213: public int drainTo(Collection<? super E> c) {
214: if (c == null)
215: throw new NullPointerException();
216: if (c == this )
217: throw new IllegalArgumentException();
218: final ReentrantLock lock = this .lock;
219: lock.lock();
220: try {
221: int n = 0;
222: for (;;) {
223: E first = q.peek();
224: if (first == null
225: || first.getDelay(TimeUnit.NANOSECONDS) > 0)
226: break;
227: c.add(q.poll());
228: ++n;
229: }
230: if (n > 0)
231: available.signalAll();
232: return n;
233: } finally {
234: lock.unlock();
235: }
236: }
237:
238: public int drainTo(Collection<? super E> c, int maxElements) {
239: if (c == null)
240: throw new NullPointerException();
241: if (c == this )
242: throw new IllegalArgumentException();
243: if (maxElements <= 0)
244: return 0;
245: final ReentrantLock lock = this .lock;
246: lock.lock();
247: try {
248: int n = 0;
249: while (n < maxElements) {
250: E first = q.peek();
251: if (first == null
252: || first.getDelay(TimeUnit.NANOSECONDS) > 0)
253: break;
254: c.add(q.poll());
255: ++n;
256: }
257: if (n > 0)
258: available.signalAll();
259: return n;
260: } finally {
261: lock.unlock();
262: }
263: }
264:
265: /**
266: * Atomically removes all of the elements from this delay queue.
267: * The queue will be empty after this call returns.
268: */
269: public void clear() {
270: final ReentrantLock lock = this .lock;
271: lock.lock();
272: try {
273: q.clear();
274: } finally {
275: lock.unlock();
276: }
277: }
278:
279: /**
280: * Always returns <tt>Integer.MAX_VALUE</tt> because
281: * a <tt>DelayQueue</tt> is not capacity constrained.
282: * @return <tt>Integer.MAX_VALUE</tt>
283: */
284: public int remainingCapacity() {
285: return Integer.MAX_VALUE;
286: }
287:
288: public Object[] toArray() {
289: final ReentrantLock lock = this .lock;
290: lock.lock();
291: try {
292: return q.toArray();
293: } finally {
294: lock.unlock();
295: }
296: }
297:
298: public <T> T[] toArray(T[] array) {
299: final ReentrantLock lock = this .lock;
300: lock.lock();
301: try {
302: return q.toArray(array);
303: } finally {
304: lock.unlock();
305: }
306: }
307:
308: public boolean remove(Object o) {
309: final ReentrantLock lock = this .lock;
310: lock.lock();
311: try {
312: return q.remove(o);
313: } finally {
314: lock.unlock();
315: }
316: }
317:
318: /**
319: * Returns an iterator over the elements in this queue. The iterator
320: * does not return the elements in any particular order. The
321: * returned iterator is a thread-safe "fast-fail" iterator that will
322: * throw {@link java.util.ConcurrentModificationException}
323: * upon detected interference.
324: *
325: * @return an iterator over the elements in this queue.
326: */
327: public Iterator<E> iterator() {
328: final ReentrantLock lock = this .lock;
329: lock.lock();
330: try {
331: return new Itr(q.iterator());
332: } finally {
333: lock.unlock();
334: }
335: }
336:
337: private class Itr<E> implements Iterator<E> {
338: private final Iterator<E> iter;
339:
340: Itr(Iterator<E> i) {
341: iter = i;
342: }
343:
344: public boolean hasNext() {
345: return iter.hasNext();
346: }
347:
348: public E next() {
349: final ReentrantLock lock = DelayQueue.this .lock;
350: lock.lock();
351: try {
352: return iter.next();
353: } finally {
354: lock.unlock();
355: }
356: }
357:
358: public void remove() {
359: final ReentrantLock lock = DelayQueue.this.lock;
360: lock.lock();
361: try {
362: iter.remove();
363: } finally {
364: lock.unlock();
365: }
366: }
367: }
368:
369: }
|