001 /*
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003 *
004 * This code is free software; you can redistribute it and/or modify it
005 * under the terms of the GNU General Public License version 2 only, as
006 * published by the Free Software Foundation. Sun designates this
007 * particular file as subject to the "Classpath" exception as provided
008 * by Sun in the LICENSE file that accompanied this code.
009 *
010 * This code is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013 * version 2 for more details (a copy is included in the LICENSE file that
014 * accompanied this code).
015 *
016 * You should have received a copy of the GNU General Public License version
017 * 2 along with this work; if not, write to the Free Software Foundation,
018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019 *
020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021 * CA 95054 USA or visit www.sun.com if you need additional information or
022 * have any questions.
023 */
024
025 /*
026 * This file is available under and governed by the GNU General Public
027 * License version 2 only, as published by the Free Software Foundation.
028 * However, the following notice accompanied the original version of this
029 * file:
030 *
031 * Written by Doug Lea with assistance from members of JCP JSR-166
032 * Expert Group and released to the public domain, as explained at
033 * http://creativecommons.org/licenses/publicdomain
034 */
035
036 package java.util.concurrent;
037
038 import java.util.*;
039 import java.util.concurrent.atomic.*;
040
/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
 * many threads will share access to a common collection.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>This implementation employs an efficient <em>non-blocking</em>
 * (lock-free) algorithm based on one described in <a
 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 * Operations retry compare-and-set steps in a loop, so an individual
 * call may have to retry while other threads make progress.
 *
 * <p>Beware that, unlike in most collections, the <tt>size</tt> method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code ConcurrentLinkedQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code ConcurrentLinkedQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
     * This is a straight adaptation of Michael & Scott algorithm.
     * For explanation, read the paper.  The only (minor) algorithmic
     * difference is that this version supports lazy deletion of
     * internal nodes (method remove(Object)) -- remove CAS'es item
     * fields to null. The normal queue operations unlink but then
     * pass over nodes with null item fields. Similarly, iteration
     * methods ignore those with nulls.
     *
     * Because remove(Object) claims an element by CAS'ing its item
     * field to null, poll() must claim elements the same way: the
     * thread whose CAS succeeds is the only one allowed to report the
     * element as removed.
     *
     * Also note that like most non-blocking algorithms in this
     * package, this implementation relies on the fact that in garbage
     * collected systems, there is no possibility of ABA problems due
     * to recycled nodes, so there is no need to use "counted
     * pointers" or related techniques seen in versions used in
     * non-GC'ed settings.
     */

    /**
     * Singly linked list node. Both fields are volatile and are updated
     * atomically through the field updaters declared below. A node whose
     * item is null is either the dummy header or a deleted element.
     */
    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        // Raw type arguments are unavoidable here: the updaters are created
        // from class literals, which are always raw.
        private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
        private static final AtomicReferenceFieldUpdater<Node, Object> itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item");

        Node(E x) {
            item = x;
        }

        Node(E x, Node<E> n) {
            item = x;
            next = n;
        }

        /** Returns the current item, or null for a dummy/deleted node. */
        E getItem() {
            return item;
        }

        /** Atomically replaces the item with val if it is currently cmp. */
        boolean casItem(E cmp, E val) {
            return itemUpdater.compareAndSet(this, cmp, val);
        }

        /** Unconditionally sets the item. */
        void setItem(E val) {
            itemUpdater.set(this, val);
        }

        /** Returns the successor node, or null if there is none. */
        Node<E> getNext() {
            return next;
        }

        /** Atomically replaces the successor with val if it is currently cmp. */
        boolean casNext(Node<E> cmp, Node<E> val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        /** Unconditionally sets the successor. */
        void setNext(Node<E> val) {
            nextUpdater.set(this, val);
        }
    }

    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> tailUpdater =
        AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class, Node.class, "tail");
    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> headUpdater =
        AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class, Node.class, "head");

    /** Atomically swings tail from cmp to val; returns whether it succeeded. */
    private boolean casTail(Node<E> cmp, Node<E> val) {
        return tailUpdater.compareAndSet(this, cmp, val);
    }

    /** Atomically swings head from cmp to val; returns whether it succeeded. */
    private boolean casHead(Node<E> cmp, Node<E> val) {
        return headUpdater.compareAndSet(this, cmp, val);
    }

    /**
     * Pointer to header node, initialized to a dummy node.  The first
     * actual node is at head.getNext().
     */
    private transient volatile Node<E> head = new Node<E>(null, null);

    /** Pointer to last node on list **/
    private transient volatile Node<E> tail = head;

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
     */
    public ConcurrentLinkedQueue() {
    }

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt>
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        for (E e : c) {
            add(e);
        }
    }

    // Have to override just to update the javadoc

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        Node<E> n = new Node<E>(e, null);
        for (;;) {
            Node<E> t = tail;
            Node<E> s = t.getNext();
            if (t == tail) {
                if (s == null) {
                    // t is the last node: try to link the new node after it.
                    if (t.casNext(s, n)) {
                        // Best effort; if this fails another thread
                        // has already advanced tail for us.
                        casTail(t, n);
                        return true;
                    }
                } else {
                    // tail is lagging: help advance it, then retry.
                    casTail(t, s);
                }
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    @Override
    public E poll() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null) {
                        return null;
                    }
                    // tail is lagging: help advance it, then retry.
                    casTail(t, first);
                } else if (casHead(h, first)) {
                    E item = first.getItem();
                    // Claim the element with a CAS instead of a plain
                    // write: a concurrent remove(Object) may null the
                    // item between our read and our write, and only one
                    // of the two calls may report this element removed.
                    if (item != null && first.casItem(item, null)) {
                        return item;
                    }
                    // else skip over deleted item, continue loop
                }
            }
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    @Override
    public E peek() { // same as poll except don't remove item
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null) {
                        return null;
                    }
                    casTail(t, first);
                } else {
                    E item = first.getItem();
                    if (item != null) {
                        return item;
                    }
                    // remove deleted node and continue
                    casHead(h, first);
                }
            }
        }
    }

    /**
     * Returns the first actual (non-header) node on list.  This is yet
     * another variant of poll/peek; here returning out the first
     * node, not element (so we cannot collapse with peek() without
     * introducing race.)
     *
     * @return the first node holding a non-null item, or null if none
     */
    Node<E> first() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null) {
                        return null;
                    }
                    casTail(t, first);
                } else {
                    if (first.getItem() != null) {
                        return first;
                    }
                    // remove deleted node and continue
                    casHead(h, first);
                }
            }
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains no elements.
     *
     * @return <tt>true</tt> if this queue contains no elements
     */
    @Override
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE) {
                    break;
                }
            }
        }
        return count;
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null && o.equals(item)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * <p>The element is deleted lazily: its node's item is CAS'ed to null
     * and the node itself is passed over by later traversals.
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            // Only the thread whose CAS succeeds has removed the element.
            if (item != null && o.equals(item) && p.casItem(item, null)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null) {
                al.add(item);
            }
        }
        return al.toArray();
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    @SuppressWarnings("unchecked") // element-to-T cast is checked by the array store itself
    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = p.getNext()) {
            E item = p.getItem();
            if (item != null) {
                a[k++] = (T) item;
            }
        }
        if (p == null) {
            // Reached the end of the list, so everything fit.
            if (k < a.length) {
                a[k] = null;
            }
            return a;
        }

        // If won't fit, use ArrayList version
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = q.getNext()) {
            E item = q.getItem();
            if (item != null) {
                al.add(item);
            }
        }
        return al.toArray(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> p = (nextNode == null) ? first() : nextNode.getNext();
            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.getItem();
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                }
                // skip over nulls
                p = p.getNext();
            }
        }

        @Override
        public boolean hasNext() {
            return nextNode != null;
        }

        @Override
        public E next() {
            if (nextNode == null) {
                throw new NoSuchElementException();
            }
            return advance();
        }

        @Override
        public void remove() {
            Node<E> l = lastRet;
            if (l == null) {
                throw new IllegalStateException();
            }
            // rely on a future traversal to relink.
            l.setItem(null);
            lastRet = null;
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData All of the elements (each an <tt>E</tt>) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        // Write out any hidden stuff
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            Object item = p.getItem();
            if (item != null) {
                s.writeObject(item);
            }
        }

        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in any hidden stuff
        s.defaultReadObject();
        // head and tail are transient, so rebuild the empty list first.
        head = new Node<E>(null, null);
        tail = head;
        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked") // stream holds what writeObject wrote
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            offer(item);
        }
    }

}
|