001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.*;
010: import java.util.concurrent.atomic.*;
011:
012: /**
013: * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
014: * This queue orders elements FIFO (first-in-first-out).
015: * The <em>head</em> of the queue is that element that has been on the
016: * queue the longest time.
017: * The <em>tail</em> of the queue is that element that has been on the
018: * queue the shortest time. New elements
019: * are inserted at the tail of the queue, and the queue retrieval
020: * operations obtain elements at the head of the queue.
021: * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
022: * many threads will share access to a common collection.
023: * This queue does not permit <tt>null</tt> elements.
024: *
* <p>This implementation employs an efficient non-blocking (lock-free)
026: * algorithm based on one described in <a
027: * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
028: * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
029: * Algorithms</a> by Maged M. Michael and Michael L. Scott.
030: *
031: * <p>Beware that, unlike in most collections, the <tt>size</tt> method
032: * is <em>NOT</em> a constant-time operation. Because of the
033: * asynchronous nature of these queues, determining the current number
034: * of elements requires a traversal of the elements.
035: *
036: * <p>This class implements all of the <em>optional</em> methods
037: * of the {@link Collection} and {@link Iterator} interfaces.
038: *
039: * <p>This class is a member of the
040: * <a href="{@docRoot}/../guide/collections/index.html">
041: * Java Collections Framework</a>.
042: *
043: * @since 1.5
044: * @author Doug Lea
045: * @param <E> the type of elements held in this collection
046: *
047: */
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
     * This is a straight adaptation of Michael & Scott algorithm.
     * For explanation, read the paper.  The only (minor) algorithmic
     * difference is that this version supports lazy deletion of
     * internal nodes (method remove(Object)) -- remove CAS'es item
     * fields to null. The normal queue operations unlink but then
     * pass over nodes with null item fields. Similarly, iteration
     * methods ignore those with nulls.
     *
     * Because remove(Object) can null out an item at any time, poll()
     * must also claim its item with a CAS; otherwise the same element
     * could be reported as removed by both operations.
     */

    /**
     * Singly-linked list node. A null <tt>item</tt> marks either the
     * dummy header or a node whose element has been (lazily) removed.
     * Both fields are volatile and are updated through field updaters
     * so that compare-and-set is available on them.
     */
    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        // Class literals cannot carry type arguments, hence the raw types.
        private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
        private static final AtomicReferenceFieldUpdater<Node, Object> itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item");

        /** Creates a node holding <tt>x</tt> with no successor. */
        Node(E x) {
            item = x;
        }

        /** Creates a node holding <tt>x</tt> whose successor is <tt>n</tt>. */
        Node(E x, Node<E> n) {
            item = x;
            next = n;
        }

        /** Returns the current item, or null if absent or removed. */
        E getItem() {
            return item;
        }

        /**
         * Atomically sets the item to <tt>val</tt> if it is currently
         * <tt>cmp</tt>.
         * @return true if the update took place
         */
        boolean casItem(E cmp, E val) {
            return itemUpdater.compareAndSet(this, cmp, val);
        }

        /** Unconditionally sets the item. */
        void setItem(E val) {
            itemUpdater.set(this, val);
        }

        /** Returns the successor node, or null if this is the last node. */
        Node<E> getNext() {
            return next;
        }

        /**
         * Atomically sets the successor to <tt>val</tt> if it is currently
         * <tt>cmp</tt>.
         * @return true if the update took place
         */
        boolean casNext(Node<E> cmp, Node<E> val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        /** Unconditionally sets the successor. */
        void setNext(Node<E> val) {
            nextUpdater.set(this, val);
        }

    }

    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> tailUpdater =
        AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class, Node.class, "tail");
    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> headUpdater =
        AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class, Node.class, "head");

    /** Atomically swings <tt>tail</tt> from <tt>cmp</tt> to <tt>val</tt>. */
    private boolean casTail(Node<E> cmp, Node<E> val) {
        return tailUpdater.compareAndSet(this, cmp, val);
    }

    /** Atomically swings <tt>head</tt> from <tt>cmp</tt> to <tt>val</tt>. */
    private boolean casHead(Node<E> cmp, Node<E> val) {
        return headUpdater.compareAndSet(this, cmp, val);
    }

    /**
     * Pointer to header node, initialized to a dummy node.  The first
     * actual node is at head.getNext().
     */
    private transient volatile Node<E> head = new Node<E>(null, null);

    /** Pointer to last node on list **/
    private transient volatile Node<E> tail = head;

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
     */
    public ConcurrentLinkedQueue() {
    }

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt>
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // Have to override just to update the javadoc

    /**
     * Adds the specified element to the tail of this queue.
     * @param o the element to add.
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Inserts the specified element to the tail of this queue.
     *
     * @param o the element to add.
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Queue.offer</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null)
            throw new NullPointerException();
        Node<E> n = new Node<E>(o, null);
        for (;;) {
            Node<E> t = tail;
            Node<E> s = t.getNext();
            // Re-read tail so that t and s form a consistent pair.
            if (t == tail) {
                if (s == null) {
                    // t really is the last node: try to link n after it.
                    if (t.casNext(s, n)) {
                        // Best effort; if this fails another thread
                        // has already advanced tail for us.
                        casTail(t, n);
                        return true;
                    }
                } else {
                    // tail is lagging; help advance it, then retry.
                    casTail(t, s);
                }
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty.
     */
    public E poll() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first); // tail is lagging; help out
                } else if (casHead(h, first)) {
                    E item = first.getItem();
                    // Claim the item with a CAS rather than a plain write:
                    // a concurrent remove(Object) may be CAS'ing the same
                    // item to null, and only one of the two may win.
                    if (item != null && first.casItem(item, null))
                        return item;
                    // else skip over deleted item, continue loop,
                }
            }
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty.
     */
    public E peek() { // same as poll except don't remove item
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    E item = first.getItem();
                    if (item != null)
                        return item;
                    else
                        // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }

    /**
     * Returns the first actual (non-header) node on list.  This is yet
     * another variant of poll/peek; here returning out the first
     * node, not element (so we cannot collapse with peek() without
     * introducing race.)
     */
    Node<E> first() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    if (first.getItem() != null)
                        return first;
                    else
                        // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains no elements.
     *
     * @return <tt>true</tt> if no node with a non-null item was found.
     */
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
            }
        }
        return count;
    }

    /**
     * Returns <tt>true</tt> if this queue contains an element
     * <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if such an element was seen during the
     * traversal; <tt>false</tt> otherwise, or if <tt>o</tt> is null.
     */
    public boolean contains(Object o) {
        if (o == null)
            return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

    /**
     * Removes a single element <tt>e</tt> such that <tt>o.equals(e)</tt>,
     * if one is present. The node is not unlinked here; its item is
     * CAS'ed to null and later traversals skip over it.
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this call removed an element;
     * <tt>false</tt> otherwise, or if <tt>o</tt> is null.
     */
    public boolean remove(Object o) {
        if (o == null)
            return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            // A failed CAS means someone else took this element first;
            // keep scanning for another match.
            if (item != null && o.equals(item) && p.casItem(item, null))
                return true;
        }
        return false;
    }

    /**
     * Collects, in queue order, the non-null items seen in one
     * traversal of the list. ArrayList deals with the resizing.
     */
    private ArrayList<E> snapshot() {
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null)
                al.add(item);
        }
        return al;
    }

    /**
     * Returns an array containing the elements of this queue, in
     * queue order.
     *
     * @return a newly allocated array of the elements seen during a
     * single traversal.
     */
    public Object[] toArray() {
        return snapshot().toArray();
    }

    /**
     * Returns an array containing the elements of this queue, in
     * queue order; the runtime type of the returned array is that of
     * the specified array. If the elements fit in <tt>a</tt> they are
     * stored there (followed by a <tt>null</tt> if there is room to
     * spare); otherwise a new array of the same runtime type is
     * allocated.
     *
     * <p>The elements are gathered in a single traversal before any
     * array is written, so <tt>a</tt> is never left partially filled
     * when a different array is returned.
     *
     * @param a the array into which the elements are to be stored,
     * if it is big enough
     * @return an array containing the elements of this queue
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     * @throws ArrayStoreException if an element cannot be stored in an
     * array of the runtime type of <tt>a</tt>
     */
    public <T> T[] toArray(T[] a) {
        return snapshot().toArray(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         **/
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            // First call starts at the front of the queue; later calls
            // continue after the node handed out last time.
            Node<E> p = (nextNode == null) ? first() : nextNode.getNext();
            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.getItem();
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else
                    // skip over nulls
                    p = p.getNext();
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            if (nextNode == null)
                throw new NoSuchElementException();
            return advance();
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null)
                throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.setItem(null);
            lastRet = null;
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData All of the elements (each an <tt>E</tt>) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        // Write out any hidden stuff
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            Object item = p.getItem();
            if (item != null)
                s.writeObject(item);
        }

        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in any hidden stuff
        s.defaultReadObject();
        // head and tail are transient, so rebuild the empty list here
        // before re-adding the elements.
        head = new Node<E>(null, null);
        tail = head;
        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E) s.readObject();
            if (item == null)
                break;
            else
                offer(item);
        }
    }

}
|