001: /*
002: File: WaitFreeQueue.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 16Jun1998 dl Create public version
012: 5Aug1998 dl replaced int counters with longs
013: 17nov2001 dl Simplify given Bill Pugh's observation
014: that counted pointers are unnecessary.
015: */
016:
017: package EDU.oswego.cs.dl.util.concurrent;
018:
019: /**
020: * A wait-free linked list based queue implementation.
021: * <p>
022: *
023: * While this class conforms to the full Channel interface, only the
024: * <code>put</code> and <code>poll</code> methods are useful in most
025: * applications. Because the queue does not support blocking
026: * operations, <code>take</code> relies on spin-loops, which can be
027: * extremely wasteful. <p>
028: *
029: * This class is adapted from the algorithm described in <a
030: * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
031: * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
032: * Algorithms</a> by Maged M. Michael and Michael L. Scott. This
033: * implementation is not strictly wait-free since it relies on locking
034: * for basic atomicity and visibility requirements. Locks can impose
035: * unbounded waits, although this should not be a major practical
036: * concern here since each lock is held for the duration of only a few
037: * statements. (However, the overhead of using so many locks can make
038: * it less attractive than other Channel implementations on JVMs where
039: * locking operations are very slow.) <p>
040: *
041: * @see BoundedLinkedQueue
042: * @see LinkedQueue
043: *
044: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
045:
046: **/
047:
048: public class WaitFreeQueue implements Channel {
049:
050: /*
051: This is a straightforward adaptation of Michael & Scott
052: algorithm, with CAS's simulated via per-field locks,
053: and without version numbers for pointers since, under
054: Java Garbage Collection, you can never see the "wrong"
055: node with the same address as the one you think you have.
056: */
057:
058: /** List nodes for Queue **/
059: protected final static class Node {
060: protected final Object value;
061: protected volatile Node next;
062:
063: /** Make a new node with indicated item, and null link **/
064: protected Node(Object x) {
065: value = x;
066: }
067:
068: /** Simulate a CAS operation for 'next' field **/
069: protected synchronized boolean CASNext(Node oldNext,
070: Node newNext) {
071: if (next == oldNext) {
072: next = newNext;
073: return true;
074: } else
075: return false;
076: }
077: }
078:
079: /** Head of list is always a dummy node **/
080: protected volatile Node head = new Node(null);
081: /** Pointer to last node on list **/
082: protected volatile Node tail = head;
083:
084: /** Lock for simulating CAS for tail field **/
085: protected final Object tailLock = new Object();
086:
087: /** Simulate CAS for head field, using 'this' lock **/
088: protected synchronized boolean CASHead(Node oldHead, Node newHead) {
089: if (head == oldHead) {
090: head = newHead;
091: return true;
092: } else
093: return false;
094: }
095:
096: /** Simulate CAS for tail field **/
097: protected boolean CASTail(Node oldTail, Node newTail) {
098: synchronized (tailLock) {
099: if (tail == oldTail) {
100: tail = newTail;
101: return true;
102: } else
103: return false;
104: }
105: }
106:
107: public void put(Object x) throws InterruptedException {
108: if (x == null)
109: throw new IllegalArgumentException();
110: if (Thread.interrupted())
111: throw new InterruptedException();
112: Node n = new Node(x);
113:
114: for (;;) {
115: Node t = tail;
116: // Try to link new node to end of list.
117: if (t.CASNext(null, n)) {
118: // Must now change tail field.
119: // This CAS might fail, but if so, it will be fixed by others.
120: CASTail(t, n);
121: return;
122: }
123:
124: // If cannot link, help out a previous failed attempt to move tail
125: CASTail(t, t.next);
126: }
127: }
128:
129: public boolean offer(Object x, long msecs)
130: throws InterruptedException {
131: put(x);
132: return true;
133: }
134:
135: /** Main dequeue algorithm, called by poll, take. **/
136: protected Object extract() throws InterruptedException {
137: for (;;) {
138: Node h = head;
139: Node first = h.next;
140:
141: if (first == null)
142: return null;
143:
144: Object result = first.value;
145: if (CASHead(h, first))
146: return result;
147: }
148: }
149:
150: public Object peek() {
151: Node first = head.next;
152:
153: if (first == null)
154: return null;
155:
156: // Note: This synch unnecessary after JSR-133.
157: // It exists only to guarantee visibility of returned object,
158: // No other synch is needed, but "old" memory model requires one.
159: synchronized (this ) {
160: return first.value;
161: }
162: }
163:
164: /**
165: * Spin until poll returns a non-null value.
166: * You probably don't want to call this method.
167: * A Thread.sleep(0) is performed on each iteration
168: * as a heuristic to reduce contention. If you would
169: * rather use, for example, an exponential backoff,
170: * you could manually set this up using poll.
171: **/
172: public Object take() throws InterruptedException {
173: if (Thread.interrupted())
174: throw new InterruptedException();
175: for (;;) {
176: Object x = extract();
177: if (x != null)
178: return x;
179: else
180: Thread.sleep(0);
181: }
182: }
183:
184: /**
185: * Spin until poll returns a non-null value or time elapses.
186: * if msecs is positive, a Thread.sleep(0) is performed on each iteration
187: * as a heuristic to reduce contention.
188: **/
189: public Object poll(long msecs) throws InterruptedException {
190: if (Thread.interrupted())
191: throw new InterruptedException();
192: if (msecs <= 0)
193: return extract();
194:
195: long startTime = System.currentTimeMillis();
196: for (;;) {
197: Object x = extract();
198: if (x != null)
199: return x;
200: else if (System.currentTimeMillis() - startTime >= msecs)
201: return null;
202: else
203: Thread.sleep(0);
204: }
205:
206: }
207: }
|