0001 /*
0002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
0003 *
0004 * This code is free software; you can redistribute it and/or modify it
0005 * under the terms of the GNU General Public License version 2 only, as
0006 * published by the Free Software Foundation. Sun designates this
0007 * particular file as subject to the "Classpath" exception as provided
0008 * by Sun in the LICENSE file that accompanied this code.
0009 *
0010 * This code is distributed in the hope that it will be useful, but WITHOUT
0011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
0013 * version 2 for more details (a copy is included in the LICENSE file that
0014 * accompanied this code).
0015 *
0016 * You should have received a copy of the GNU General Public License version
0017 * 2 along with this work; if not, write to the Free Software Foundation,
0018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
0019 *
0020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
0021 * CA 95054 USA or visit www.sun.com if you need additional information or
0022 * have any questions.
0023 */
0024
0025 /*
0026 * This file is available under and governed by the GNU General Public
0027 * License version 2 only, as published by the Free Software Foundation.
0028 * However, the following notice accompanied the original version of this
0029 * file:
0030 *
0031 * Written by Doug Lea with assistance from members of JCP JSR-166
0032 * Expert Group and released to the public domain, as explained at
0033 * http://creativecommons.org/licenses/publicdomain
0034 */
0035
0036 package java.util.concurrent.locks;
0037
0038 import java.util.*;
0039 import java.util.concurrent.*;
0040 import java.util.concurrent.atomic.*;
0041 import sun.misc.Unsafe;
0042
0043 /**
0044 * Provides a framework for implementing blocking locks and related
0045 * synchronizers (semaphores, events, etc) that rely on
0046 * first-in-first-out (FIFO) wait queues. This class is designed to
0047 * be a useful basis for most kinds of synchronizers that rely on a
0048 * single atomic <tt>int</tt> value to represent state. Subclasses
0049 * must define the protected methods that change this state, and which
0050 * define what that state means in terms of this object being acquired
0051 * or released. Given these, the other methods in this class carry
0052 * out all queuing and blocking mechanics. Subclasses can maintain
0053 * other state fields, but only the atomically updated <tt>int</tt>
0054 * value manipulated using methods {@link #getState}, {@link
0055 * #setState} and {@link #compareAndSetState} is tracked with respect
0056 * to synchronization.
0057 *
0058 * <p>Subclasses should be defined as non-public internal helper
0059 * classes that are used to implement the synchronization properties
0060 * of their enclosing class. Class
0061 * <tt>AbstractQueuedSynchronizer</tt> does not implement any
0062 * synchronization interface. Instead it defines methods such as
0063 * {@link #acquireInterruptibly} that can be invoked as
0064 * appropriate by concrete locks and related synchronizers to
0065 * implement their public methods.
0066 *
0067 * <p>This class supports either or both a default <em>exclusive</em>
0068 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
0069 * attempted acquires by other threads cannot succeed. Shared mode
0070 * acquires by multiple threads may (but need not) succeed. This class
0071 * does not "understand" these differences except in the
0072 * mechanical sense that when a shared mode acquire succeeds, the next
0073 * waiting thread (if one exists) must also determine whether it can
0074 * acquire as well. Threads waiting in the different modes share the
0075 * same FIFO queue. Usually, implementation subclasses support only
0076 * one of these modes, but both can come into play for example in a
0077 * {@link ReadWriteLock}. Subclasses that support only exclusive or
0078 * only shared modes need not define the methods supporting the unused mode.
0079 *
0080 * <p>This class defines a nested {@link ConditionObject} class that
0081 * can be used as a {@link Condition} implementation by subclasses
0082 * supporting exclusive mode for which method {@link
0083 * #isHeldExclusively} reports whether synchronization is exclusively
0084 * held with respect to the current thread, method {@link #release}
0085 * invoked with the current {@link #getState} value fully releases
0086 * this object, and {@link #acquire}, given this saved state value,
0087 * eventually restores this object to its previous acquired state. No
0088 * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a
0089 * condition, so if this constraint cannot be met, do not use it. The
0090 * behavior of {@link ConditionObject} depends of course on the
0091 * semantics of its synchronizer implementation.
0092 *
0093 * <p>This class provides inspection, instrumentation, and monitoring
0094 * methods for the internal queue, as well as similar methods for
0095 * condition objects. These can be exported as desired into classes
0096 * using an <tt>AbstractQueuedSynchronizer</tt> for their
0097 * synchronization mechanics.
0098 *
0099 * <p>Serialization of this class stores only the underlying atomic
0100 * integer maintaining state, so deserialized objects have empty
0101 * thread queues. Typical subclasses requiring serializability will
0102 * define a <tt>readObject</tt> method that restores this to a known
0103 * initial state upon deserialization.
0104 *
0105 * <h3>Usage</h3>
0106 *
0107 * <p>To use this class as the basis of a synchronizer, redefine the
0108 * following methods, as applicable, by inspecting and/or modifying
0109 * the synchronization state using {@link #getState}, {@link
0110 * #setState} and/or {@link #compareAndSetState}:
0111 *
0112 * <ul>
0113 * <li> {@link #tryAcquire}
0114 * <li> {@link #tryRelease}
0115 * <li> {@link #tryAcquireShared}
0116 * <li> {@link #tryReleaseShared}
0117 * <li> {@link #isHeldExclusively}
0118 *</ul>
0119 *
0120 * Each of these methods by default throws {@link
0121 * UnsupportedOperationException}. Implementations of these methods
0122 * must be internally thread-safe, and should in general be short and
0123 * not block. Defining these methods is the <em>only</em> supported
0124 * means of using this class. All other methods are declared
0125 * <tt>final</tt> because they cannot be independently varied.
0126 *
0127 * <p>You may also find the inherited methods from {@link
0128 * AbstractOwnableSynchronizer} useful to keep track of the thread
0129 * owning an exclusive synchronizer. You are encouraged to use them
0130 * -- this enables monitoring and diagnostic tools to assist users in
0131 * determining which threads hold locks.
0132 *
0133 * <p>Even though this class is based on an internal FIFO queue, it
0134 * does not automatically enforce FIFO acquisition policies. The core
0135 * of exclusive synchronization takes the form:
0136 *
0137 * <pre>
0138 * Acquire:
0139 * while (!tryAcquire(arg)) {
0140 * <em>enqueue thread if it is not already queued</em>;
0141 * <em>possibly block current thread</em>;
0142 * }
0143 *
0144 * Release:
0145 * if (tryRelease(arg))
0146 * <em>unblock the first queued thread</em>;
0147 * </pre>
0148 *
0149 * (Shared mode is similar but may involve cascading signals.)
0150 *
0151 * <p><a name="barging"></a>Because checks in acquire are invoked before
0152 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
0153 * others that are blocked and queued. However, you can, if desired,
0154 * define <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to
0155 * disable barging by internally invoking one or more of the inspection
0156 * methods, thereby providing a <em>fair</em> FIFO acquisition order.
0157 * In particular, most fair synchronizers can define <tt>tryAcquire</tt>
0158 * to return <tt>false</tt> if {@link #hasQueuedPredecessors} (a method
0159 * specifically designed to be used by fair synchronizers) returns
0160 * <tt>true</tt>. Other variations are possible.
0161 *
0162 * <p>Throughput and scalability are generally highest for the
0163 * default barging (also known as <em>greedy</em>,
0164 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
0165 * While this is not guaranteed to be fair or starvation-free, earlier
0166 * queued threads are allowed to recontend before later queued
0167 * threads, and each recontention has an unbiased chance to succeed
0168 * against incoming threads. Also, while acquires do not
0169 * "spin" in the usual sense, they may perform multiple
0170 * invocations of <tt>tryAcquire</tt> interspersed with other
0171 * computations before blocking. This gives most of the benefits of
0172 * spins when exclusive synchronization is only briefly held, without
0173 * most of the liabilities when it isn't. If so desired, you can
0174 * augment this by preceding calls to acquire methods with
0175 * "fast-path" checks, possibly prechecking {@link #hasContended}
0176 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
0177 * is likely not to be contended.
0178 *
0179 * <p>This class provides an efficient and scalable basis for
0180 * synchronization in part by specializing its range of use to
0181 * synchronizers that can rely on <tt>int</tt> state, acquire, and
0182 * release parameters, and an internal FIFO wait queue. When this does
0183 * not suffice, you can build synchronizers from a lower level using
0184 * {@link java.util.concurrent.atomic atomic} classes, your own custom
0185 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
0186 * support.
0187 *
0188 * <h3>Usage Examples</h3>
0189 *
0190 * <p>Here is a non-reentrant mutual exclusion lock class that uses
0191 * the value zero to represent the unlocked state, and one to
0192 * represent the locked state. While a non-reentrant lock
0193 * does not strictly require recording of the current owner
0194 * thread, this class does so anyway to make usage easier to monitor.
0195 * It also supports conditions and exposes
0196 * one of the instrumentation methods:
0197 *
0198 * <pre>
0199 * class Mutex implements Lock, java.io.Serializable {
0200 *
0201 * // Our internal helper class
0202 * private static class Sync extends AbstractQueuedSynchronizer {
0203 * // Report whether in locked state
0204 * protected boolean isHeldExclusively() {
0205 * return getState() == 1;
0206 * }
0207 *
0208 * // Acquire the lock if state is zero
0209 * public boolean tryAcquire(int acquires) {
0210 * assert acquires == 1; // Otherwise unused
0211 * if (compareAndSetState(0, 1)) {
0212 * setExclusiveOwnerThread(Thread.currentThread());
0213 * return true;
0214 * }
0215 * return false;
0216 * }
0217 *
0218 * // Release the lock by setting state to zero
0219 * protected boolean tryRelease(int releases) {
0220 * assert releases == 1; // Otherwise unused
0221 * if (getState() == 0) throw new IllegalMonitorStateException();
0222 * setExclusiveOwnerThread(null);
0223 * setState(0);
0224 * return true;
0225 * }
0226 *
0227 * // Provide a Condition
0228 * Condition newCondition() { return new ConditionObject(); }
0229 *
0230 * // Deserialize properly
0231 * private void readObject(ObjectInputStream s)
0232 * throws IOException, ClassNotFoundException {
0233 * s.defaultReadObject();
0234 * setState(0); // reset to unlocked state
0235 * }
0236 * }
0237 *
0238 * // The sync object does all the hard work. We just forward to it.
0239 * private final Sync sync = new Sync();
0240 *
0241 * public void lock() { sync.acquire(1); }
0242 * public boolean tryLock() { return sync.tryAcquire(1); }
0243 * public void unlock() { sync.release(1); }
0244 * public Condition newCondition() { return sync.newCondition(); }
0245 * public boolean isLocked() { return sync.isHeldExclusively(); }
0246 * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
0247 * public void lockInterruptibly() throws InterruptedException {
0248 * sync.acquireInterruptibly(1);
0249 * }
0250 * public boolean tryLock(long timeout, TimeUnit unit)
0251 * throws InterruptedException {
0252 * return sync.tryAcquireNanos(1, unit.toNanos(timeout));
0253 * }
0254 * }
0255 * </pre>
0256 *
0257 * <p>Here is a latch class that is like a {@link CountDownLatch}
0258 * except that it only requires a single <tt>signal</tt> to
0259 * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
0260 * acquire and release methods.
0261 *
0262 * <pre>
0263 * class BooleanLatch {
0264 *
0265 * private static class Sync extends AbstractQueuedSynchronizer {
0266 * boolean isSignalled() { return getState() != 0; }
0267 *
0268 * protected int tryAcquireShared(int ignore) {
0269 * return isSignalled()? 1 : -1;
0270 * }
0271 *
0272 * protected boolean tryReleaseShared(int ignore) {
0273 * setState(1);
0274 * return true;
0275 * }
0276 * }
0277 *
0278 * private final Sync sync = new Sync();
0279 * public boolean isSignalled() { return sync.isSignalled(); }
0280 * public void signal() { sync.releaseShared(1); }
0281 * public void await() throws InterruptedException {
0282 * sync.acquireSharedInterruptibly(1);
0283 * }
0284 * }
0285 * </pre>
0286 *
0287 * @since 1.5
0288 * @author Doug Lea
0289 */
0290 public abstract class AbstractQueuedSynchronizer extends
0291 AbstractOwnableSynchronizer implements java.io.Serializable {
0292
0293 private static final long serialVersionUID = 7373984972572414691L;
0294
0295 /**
0296 * Creates a new <tt>AbstractQueuedSynchronizer</tt> instance
0297 * with initial synchronization state of zero.
0298 */
protected AbstractQueuedSynchronizer() {
    // No explicit setup: the int state field defaults to zero, and the
    // wait queue's head/tail stay null until enq() first needs them.
    super();
}
0301
0302 /**
0303 * Wait queue node class.
0304 *
0305 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
0306 * Hagersten) lock queue. CLH locks are normally used for
0307 * spinlocks. We instead use them for blocking synchronizers, but
0308 * use the same basic tactic of holding some of the control
0309 * information about a thread in the predecessor of its node. A
0310 * "status" field in each node keeps track of whether a thread
0311 * should block. A node is signalled when its predecessor
0312 * releases. Each node of the queue otherwise serves as a
0313 * specific-notification-style monitor holding a single waiting
0314 * thread. The status field does NOT control whether threads are
0315 * granted locks etc though. A thread may try to acquire if it is
0316 * first in the queue. But being first does not guarantee success;
0317 * it only gives the right to contend. So the currently released
0318 * contender thread may need to rewait.
0319 *
0320 * <p>To enqueue into a CLH lock, you atomically splice it in as new
0321 * tail. To dequeue, you just set the head field.
0322 * <pre>
0323 * +------+ prev +-----+ +-----+
0324 * head | | <---- | | <---- | | tail
0325 * +------+ +-----+ +-----+
0326 * </pre>
0327 *
0328 * <p>Insertion into a CLH queue requires only a single atomic
0329 * operation on "tail", so there is a simple atomic point of
0330 * demarcation from unqueued to queued. Similarly, dequeuing
0331 * involves only updating the "head". However, it takes a bit
0332 * more work for nodes to determine who their successors are,
0333 * in part to deal with possible cancellation due to timeouts
0334 * and interrupts.
0335 *
0336 * <p>The "prev" links (not used in original CLH locks), are mainly
0337 * needed to handle cancellation. If a node is cancelled, its
0338 * successor is (normally) relinked to a non-cancelled
0339 * predecessor. For explanation of similar mechanics in the case
0340 * of spin locks, see the papers by Scott and Scherer at
0341 * http://www.cs.rochester.edu/u/scott/synchronization/
0342 *
0343 * <p>We also use "next" links to implement blocking mechanics.
0344 * The thread id for each node is kept in its own node, so a
0345 * predecessor signals the next node to wake up by traversing
0346 * next link to determine which thread it is. Determination of
0347 * successor must avoid races with newly queued nodes to set
0348 * the "next" fields of their predecessors. This is solved
0349 * when necessary by checking backwards from the atomically
0350 * updated "tail" when a node's successor appears to be null.
0351 * (Or, said differently, the next-links are an optimization
0352 * so that we don't usually need a backward scan.)
0353 *
0354 * <p>Cancellation introduces some conservatism to the basic
0355 * algorithms. Since we must poll for cancellation of other
0356 * nodes, we can miss noticing whether a cancelled node is
0357 * ahead or behind us. This is dealt with by always unparking
0358 * successors upon cancellation, allowing them to stabilize on
0359 * a new predecessor, unless we can identify an uncancelled
0360 * predecessor who will carry this responsibility.
0361 *
0362 * <p>CLH queues need a dummy header node to get started. But
0363 * we don't create them on construction, because it would be wasted
0364 * effort if there is never contention. Instead, the node
0365 * is constructed and head and tail pointers are set upon first
0366 * contention.
0367 *
0368 * <p>Threads waiting on Conditions use the same nodes, but
0369 * use an additional link. Conditions only need to link nodes
0370 * in simple (non-concurrent) linked queues because they are
0371 * only accessed when exclusively held. Upon await, a node is
0372 * inserted into a condition queue. Upon signal, the node is
0373 * transferred to the main queue. A special value of status
0374 * field is used to mark which queue a node is on.
0375 *
0376 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
0377 * Scherer and Michael Scott, along with members of JSR-166
0378 * expert group, for helpful ideas, discussions, and critiques
0379 * on the design of this class.
0380 */
0381 static final class Node {
0382 /** Marker to indicate a node is waiting in shared mode */
0383 static final Node SHARED = new Node();
0384 /** Marker to indicate a node is waiting in exclusive mode */
0385 static final Node EXCLUSIVE = null;
0386
0387 /** waitStatus value to indicate thread has cancelled */
0388 static final int CANCELLED = 1;
0389 /** waitStatus value to indicate successor's thread needs unparking */
0390 static final int SIGNAL = -1;
0391 /** waitStatus value to indicate thread is waiting on condition */
0392 static final int CONDITION = -2;
0393
0394 /**
0395 * Status field, taking on only the values:
0396 * SIGNAL: The successor of this node is (or will soon be)
0397 * blocked (via park), so the current node must
0398 * unpark its successor when it releases or
0399 * cancels. To avoid races, acquire methods must
0400 * first indicate they need a signal,
0401 * then retry the atomic acquire, and then,
0402 * on failure, block.
0403 * CANCELLED: This node is cancelled due to timeout or interrupt.
0404 * Nodes never leave this state. In particular,
0405 * a thread with cancelled node never again blocks.
0406 * CONDITION: This node is currently on a condition queue.
0407 * It will not be used as a sync queue node until
0408 * transferred. (Use of this value here
0409 * has nothing to do with the other uses
0410 * of the field, but simplifies mechanics.)
0411 * 0: None of the above
0412 *
0413 * The values are arranged numerically to simplify use.
0414 * Non-negative values mean that a node doesn't need to
0415 * signal. So, most code doesn't need to check for particular
0416 * values, just for sign.
0417 *
0418 * The field is initialized to 0 for normal sync nodes, and
0419 * CONDITION for condition nodes. It is modified using CAS
0420 * (or when possible, unconditional volatile writes).
0421 */
0422 volatile int waitStatus;
0423
0424 /**
0425 * Link to predecessor node that current node/thread relies on
0426 * for checking waitStatus. Assigned during enqueing, and nulled
0427 * out (for sake of GC) only upon dequeuing. Also, upon
0428 * cancellation of a predecessor, we short-circuit while
0429 * finding a non-cancelled one, which will always exist
0430 * because the head node is never cancelled: A node becomes
0431 * head only as a result of successful acquire. A
0432 * cancelled thread never succeeds in acquiring, and a thread only
0433 * cancels itself, not any other node.
0434 */
0435 volatile Node prev;
0436
0437 /**
0438 * Link to the successor node that the current node/thread
0439 * unparks upon release. Assigned during enqueuing, adjusted
0440 * when bypassing cancelled predecessors, and nulled out (for
0441 * sake of GC) when dequeued. The enq operation does not
0442 * assign next field of a predecessor until after attachment,
0443 * so seeing a null next field does not necessarily mean that
0444 * node is at end of queue. However, if a next field appears
0445 * to be null, we can scan prev's from the tail to
0446 * double-check. The next field of cancelled nodes is set to
0447 * point to the node itself instead of null, to make life
0448 * easier for isOnSyncQueue.
0449 */
0450 volatile Node next;
0451
0452 /**
0453 * The thread that enqueued this node. Initialized on
0454 * construction and nulled out after use.
0455 */
0456 volatile Thread thread;
0457
0458 /**
0459 * Link to next node waiting on condition, or the special
0460 * value SHARED. Because condition queues are accessed only
0461 * when holding in exclusive mode, we just need a simple
0462 * linked queue to hold nodes while they are waiting on
0463 * conditions. They are then transferred to the queue to
0464 * re-acquire. And because conditions can only be exclusive,
0465 * we save a field by using special value to indicate shared
0466 * mode.
0467 */
0468 Node nextWaiter;
0469
0470 /**
0471 * Returns true if node is waiting in shared mode
0472 */
0473 final boolean isShared() {
0474 return nextWaiter == SHARED;
0475 }
0476
0477 /**
0478 * Returns previous node, or throws NullPointerException if null.
0479 * Use when predecessor cannot be null. The null check could
0480 * be elided, but is present to help the VM.
0481 *
0482 * @return the predecessor of this node
0483 */
0484 final Node predecessor() throws NullPointerException {
0485 Node p = prev;
0486 if (p == null)
0487 throw new NullPointerException();
0488 else
0489 return p;
0490 }
0491
0492 Node() { // Used to establish initial head or SHARED marker
0493 }
0494
0495 Node(Thread thread, Node mode) { // Used by addWaiter
0496 this .nextWaiter = mode;
0497 this .thread = thread;
0498 }
0499
0500 Node(Thread thread, int waitStatus) { // Used by Condition
0501 this .waitStatus = waitStatus;
0502 this .thread = thread;
0503 }
0504 }
0505
0506 /**
0507 * Head of the wait queue, lazily initialized. Except for
0508 * initialization, it is modified only via method setHead. Note:
0509 * If head exists, its waitStatus is guaranteed not to be
0510 * CANCELLED.
0511 */
0512 private transient volatile Node head; // stays null until enq() installs the first dummy node
0513
0514 /**
0515 * Tail of the wait queue, lazily initialized. Modified only via
0516 * method enq to add new wait node.
0517 */
0518 private transient volatile Node tail; // null means the queue is not initialized yet (enq checks this)
0519
0520 /**
0521 * The synchronization state.
0522 */
0523 private volatile int state; // accessed through getState, setState and compareAndSetState
0524
0525 /**
0526 * Returns the current value of synchronization state.
0527 * This operation has memory semantics of a <tt>volatile</tt> read.
0528 * @return current state value
0529 */
0530 protected final int getState() {
0531 return state;
0532 }
0533
0534 /**
0535 * Sets the value of synchronization state.
0536 * This operation has memory semantics of a <tt>volatile</tt> write.
0537 * @param newState the new state value
0538 */
0539 protected final void setState(int newState) {
0540 state = newState;
0541 }
0542
0543 /**
0544 * Atomically sets synchronization state to the given updated
0545 * value if the current state value equals the expected value.
0546 * This operation has memory semantics of a <tt>volatile</tt> read
0547 * and write.
0548 *
0549 * @param expect the expected value
0550 * @param update the new value
0551 * @return true if successful. False return indicates that the actual
0552 * value was not equal to the expected value.
0553 */
0554 protected final boolean compareAndSetState(int expect, int update) {
0555 // See below for intrinsics setup to support this
0556 return unsafe.compareAndSwapInt(this , stateOffset, expect,
0557 update);
0558 }
0559
0560 // Queuing utilities
0561
0562 /**
0563 * The number of nanoseconds for which it is faster to spin
0564 * rather than to use timed park. A rough estimate suffices
0565 * to improve responsiveness with very short timeouts.
0566 */
0567 static final long spinForTimeoutThreshold = 1000L; // doAcquireNanos only calls parkNanos when the remaining wait exceeds this
0568
0569 /**
0570 * Inserts node into queue, initializing if necessary. See picture above.
0571 * @param node the node to insert
0572 * @return node's predecessor
0573 */
0574 private Node enq(final Node node) {
0575 for (;;) {
0576 Node t = tail;
0577 if (t == null) { // Must initialize
0578 if (compareAndSetHead(new Node()))
0579 tail = head;
0580 } else {
0581 node.prev = t;
0582 if (compareAndSetTail(t, node)) {
0583 t.next = node;
0584 return t;
0585 }
0586 }
0587 }
0588 }
0589
0590 /**
0591 * Creates and enqueues node for current thread and given mode.
0592 *
0593 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
0594 * @return the new node
0595 */
0596 private Node addWaiter(Node mode) {
0597 Node node = new Node(Thread.currentThread(), mode);
0598 // Try the fast path of enq; backup to full enq on failure
0599 Node pred = tail;
0600 if (pred != null) {
0601 node.prev = pred;
0602 if (compareAndSetTail(pred, node)) {
0603 pred.next = node;
0604 return node;
0605 }
0606 }
0607 enq(node);
0608 return node;
0609 }
0610
0611 /**
0612 * Sets head of queue to be node, thus dequeuing. Called only by
0613 * acquire methods. Also nulls out unused fields for sake of GC
0614 * and to suppress unnecessary signals and traversals.
0615 *
0616 * @param node the node
0617 */
0618 private void setHead(Node node) {
0619 head = node; // the given node becomes the new queue head
0620 node.thread = null; // unused from here on; cleared for GC and to suppress needless signals
0621 node.prev = null; // drop the back link to the old head (GC, avoids needless traversals)
0622 }
0623
0624 /**
0625 * Wakes up node's successor, if one exists.
0626 *
0627 * @param node the node
0628 */
0629 private void unparkSuccessor(Node node) {
0630 /*
0631 * Try to clear status in anticipation of signalling. It is
0632 * OK if this fails or if status is changed by waiting thread.
0633 */
0634 compareAndSetWaitStatus(node, Node.SIGNAL, 0);
0635
0636 /*
0637 * Thread to unpark is held in successor, which is normally
0638 * just the next node. But if cancelled or apparently null,
0639 * traverse backwards from tail to find the actual
0640 * non-cancelled successor.
0641 */
0642 Node s = node.next;
0643 if (s == null || s.waitStatus > 0) {
0644 s = null;
0645 for (Node t = tail; t != null && t != node; t = t.prev)
0646 if (t.waitStatus <= 0)
0647 s = t;
0648 }
0649 if (s != null)
0650 LockSupport.unpark(s.thread);
0651 }
0652
0653 /**
0654 * Sets head of queue, and checks if successor may be waiting
0655 * in shared mode, if so propagating if propagate > 0.
0656 *
0658 * @param node the node that acquired and becomes the new head
0659 * @param propagate the return value from a tryAcquireShared
0660 */
0661 private void setHeadAndPropagate(Node node, int propagate) {
0662 setHead(node);
0663 if (propagate > 0 && node.waitStatus != 0) {
0664 /*
0665 * Don't bother fully figuring out successor. If it
0666 * looks null, call unparkSuccessor anyway to be safe.
0667 */
0668 Node s = node.next;
0669 if (s == null || s.isShared())
0670 unparkSuccessor(node);
0671 }
0672 }
0673
0674 // Utilities for various versions of acquire
0675
0676 /**
0677 * Cancels an ongoing attempt to acquire.
0678 *
0679 * @param node the node
0680 */
0681 private void cancelAcquire(Node node) {
0682 // Ignore if node doesn't exist
0683 if (node == null)
0684 return;
0685
0686 node.thread = null;
0687
0688 // Skip cancelled predecessors
0689 Node pred = node.prev;
0690 while (pred.waitStatus > 0)
0691 node.prev = pred = pred.prev;
0692
0693 // Getting this before setting waitStatus ensures staleness
0694 Node predNext = pred.next;
0695
0696 // Can use unconditional write instead of CAS here
0697 node.waitStatus = Node.CANCELLED;
0698
0699 // If we are the tail, remove ourselves
0700 if (node == tail && compareAndSetTail(node, pred)) {
0701 compareAndSetNext(pred, predNext, null);
0702 } else {
0703 // If "active" predecessor found...
0704 if (pred != head
0705 && (pred.waitStatus == Node.SIGNAL || compareAndSetWaitStatus(
0706 pred, 0, Node.SIGNAL))
0707 && pred.thread != null) {
0708
0709 // If successor is active, set predecessor's next link
0710 Node next = node.next;
0711 if (next != null && next.waitStatus <= 0)
0712 compareAndSetNext(pred, predNext, next);
0713 } else {
0714 unparkSuccessor(node);
0715 }
0716
0717 node.next = node; // help GC
0718 }
0719 }
0720
0721 /**
0722 * Checks and updates status for a node that failed to acquire.
0723 * Returns true if thread should block. This is the main signal
0724 * control in all acquire loops. Requires that pred == node.prev
0725 *
0726 * @param pred node's predecessor holding status
0727 * @param node the node
0728 * @return {@code true} if thread should block
0729 */
0730 private static boolean shouldParkAfterFailedAcquire(Node pred,
0731 Node node) {
0732 int s = pred.waitStatus;
0733 if (s < 0)
0734 /*
0735 * This node has already set status asking a release
0736 * to signal it, so it can safely park.
0737 */
0738 return true;
0739 if (s > 0) {
0740 /*
0741 * Predecessor was cancelled. Skip over predecessors and
0742 * indicate retry.
0743 */
0744 do {
0745 node.prev = pred = pred.prev;
0746 } while (pred.waitStatus > 0);
0747 pred.next = node;
0748 } else
0749 /*
0750 * Indicate that we need a signal, but don't park yet. Caller
0751 * will need to retry to make sure it cannot acquire before
0752 * parking.
0753 */
0754 compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
0755 return false;
0756 }
0757
0758 /**
0759 * Convenience method to interrupt current thread.
0760 */
0761 private static void selfInterrupt() {
0762 Thread.currentThread().interrupt();
0763 }
0764
0765 /**
0766 * Convenience method to park and then check if interrupted
0767 *
0768 * @return {@code true} if interrupted
0769 */
0770 private final boolean parkAndCheckInterrupt() {
0771 LockSupport.park(this );
0772 return Thread.interrupted();
0773 }
0774
0775 /*
0776 * Various flavors of acquire, varying in exclusive/shared and
0777 * control modes. Each is mostly the same, but annoyingly
0778 * different. Only a little bit of factoring is possible due to
0779 * interactions of exception mechanics (including ensuring that we
0780 * cancel if tryAcquire throws exception) and other control, at
0781 * least not without hurting performance too much.
0782 */
0783
0784 /**
0785 * Acquires in exclusive uninterruptible mode for thread already in
0786 * queue. Used by condition wait methods as well as acquire.
0787 *
0788 * @param node the node
0789 * @param arg the acquire argument
0790 * @return {@code true} if interrupted while waiting
0791 */
0792 final boolean acquireQueued(final Node node, int arg) {
0793 boolean failed = true;
0794 try {
0795 boolean interrupted = false;
0796 for (;;) {
0797 final Node p = node.predecessor();
0798 if (p == head && tryAcquire(arg)) {
0799 setHead(node);
0800 p.next = null; // help GC
0801 failed = false;
0802 return interrupted;
0803 }
0804 if (shouldParkAfterFailedAcquire(p, node)
0805 && parkAndCheckInterrupt())
0806 interrupted = true;
0807 }
0808 } finally {
0809 if (failed)
0810 cancelAcquire(node);
0811 }
0812 }
0813
0814 /**
0815 * Acquires in exclusive interruptible mode.
0816 * @param arg the acquire argument
0817 */
0818 private void doAcquireInterruptibly(int arg)
0819 throws InterruptedException {
0820 final Node node = addWaiter(Node.EXCLUSIVE);
0821 boolean failed = true;
0822 try {
0823 for (;;) {
0824 final Node p = node.predecessor();
0825 if (p == head && tryAcquire(arg)) {
0826 setHead(node);
0827 p.next = null; // help GC
0828 failed = false;
0829 return;
0830 }
0831 if (shouldParkAfterFailedAcquire(p, node)
0832 && parkAndCheckInterrupt())
0833 throw new InterruptedException();
0834 }
0835 } finally {
0836 if (failed)
0837 cancelAcquire(node);
0838 }
0839 }
0840
0841 /**
0842 * Acquires in exclusive timed mode.
0843 *
0844 * @param arg the acquire argument
0845 * @param nanosTimeout max wait time
0846 * @return {@code true} if acquired
0847 */
0848 private boolean doAcquireNanos(int arg, long nanosTimeout)
0849 throws InterruptedException {
0850 long lastTime = System.nanoTime();
0851 final Node node = addWaiter(Node.EXCLUSIVE);
0852 boolean failed = true;
0853 try {
0854 for (;;) {
0855 final Node p = node.predecessor();
0856 if (p == head && tryAcquire(arg)) {
0857 setHead(node);
0858 p.next = null; // help GC
0859 failed = false;
0860 return true;
0861 }
0862 if (nanosTimeout <= 0)
0863 return false;
0864 if (shouldParkAfterFailedAcquire(p, node)
0865 && nanosTimeout > spinForTimeoutThreshold)
0866 LockSupport.parkNanos(this , nanosTimeout);
0867 long now = System.nanoTime();
0868 nanosTimeout -= now - lastTime;
0869 lastTime = now;
0870 if (Thread.interrupted())
0871 throw new InterruptedException();
0872 }
0873 } finally {
0874 if (failed)
0875 cancelAcquire(node);
0876 }
0877 }
0878
0879 /**
0880 * Acquires in shared uninterruptible mode.
0881 * @param arg the acquire argument
0882 */
0883 private void doAcquireShared(int arg) {
0884 final Node node = addWaiter(Node.SHARED);
0885 boolean failed = true;
0886 try {
0887 boolean interrupted = false;
0888 for (;;) {
0889 final Node p = node.predecessor();
0890 if (p == head) {
0891 int r = tryAcquireShared(arg);
0892 if (r >= 0) {
0893 setHeadAndPropagate(node, r);
0894 p.next = null; // help GC
0895 if (interrupted)
0896 selfInterrupt();
0897 failed = false;
0898 return;
0899 }
0900 }
0901 if (shouldParkAfterFailedAcquire(p, node)
0902 && parkAndCheckInterrupt())
0903 interrupted = true;
0904 }
0905 } finally {
0906 if (failed)
0907 cancelAcquire(node);
0908 }
0909 }
0910
0911 /**
0912 * Acquires in shared interruptible mode.
0913 * @param arg the acquire argument
0914 */
0915 private void doAcquireSharedInterruptibly(int arg)
0916 throws InterruptedException {
0917 final Node node = addWaiter(Node.SHARED);
0918 boolean failed = true;
0919 try {
0920 for (;;) {
0921 final Node p = node.predecessor();
0922 if (p == head) {
0923 int r = tryAcquireShared(arg);
0924 if (r >= 0) {
0925 setHeadAndPropagate(node, r);
0926 p.next = null; // help GC
0927 failed = false;
0928 return;
0929 }
0930 }
0931 if (shouldParkAfterFailedAcquire(p, node)
0932 && parkAndCheckInterrupt())
0933 throw new InterruptedException();
0934 }
0935 } finally {
0936 if (failed)
0937 cancelAcquire(node);
0938 }
0939 }
0940
0941 /**
0942 * Acquires in shared timed mode.
0943 *
0944 * @param arg the acquire argument
0945 * @param nanosTimeout max wait time
0946 * @return {@code true} if acquired
0947 */
0948 private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
0949 throws InterruptedException {
0950
0951 long lastTime = System.nanoTime();
0952 final Node node = addWaiter(Node.SHARED);
0953 boolean failed = true;
0954 try {
0955 for (;;) {
0956 final Node p = node.predecessor();
0957 if (p == head) {
0958 int r = tryAcquireShared(arg);
0959 if (r >= 0) {
0960 setHeadAndPropagate(node, r);
0961 p.next = null; // help GC
0962 failed = false;
0963 return true;
0964 }
0965 }
0966 if (nanosTimeout <= 0)
0967 return false;
0968 if (shouldParkAfterFailedAcquire(p, node)
0969 && nanosTimeout > spinForTimeoutThreshold)
0970 LockSupport.parkNanos(this , nanosTimeout);
0971 long now = System.nanoTime();
0972 nanosTimeout -= now - lastTime;
0973 lastTime = now;
0974 if (Thread.interrupted())
0975 throw new InterruptedException();
0976 }
0977 } finally {
0978 if (failed)
0979 cancelAcquire(node);
0980 }
0981 }
0982
0983 // Main exported methods
0984
0985 /**
0986 * Attempts to acquire in exclusive mode. This method should query
0987 * if the state of the object permits it to be acquired in the
0988 * exclusive mode, and if so to acquire it.
0989 *
0990 * <p>This method is always invoked by the thread performing
0991 * acquire. If this method reports failure, the acquire method
0992 * may queue the thread, if it is not already queued, until it is
0993 * signalled by a release from some other thread. This can be used
0994 * to implement method {@link Lock#tryLock()}.
0995 *
0996 * <p>The default
0997 * implementation throws {@link UnsupportedOperationException}.
0998 *
0999 * @param arg the acquire argument. This value is always the one
1000 * passed to an acquire method, or is the value saved on entry
1001 * to a condition wait. The value is otherwise uninterpreted
1002 * and can represent anything you like.
1003 * @return {@code true} if successful. Upon success, this object has
1004 * been acquired.
1005 * @throws IllegalMonitorStateException if acquiring would place this
1006 * synchronizer in an illegal state. This exception must be
1007 * thrown in a consistent fashion for synchronization to work
1008 * correctly.
1009 * @throws UnsupportedOperationException if exclusive mode is not supported
1010 */
1011 protected boolean tryAcquire(int arg) {
1012 throw new UnsupportedOperationException();
1013 }
1014
1015 /**
1016 * Attempts to set the state to reflect a release in exclusive
1017 * mode.
1018 *
1019 * <p>This method is always invoked by the thread performing release.
1020 *
1021 * <p>The default implementation throws
1022 * {@link UnsupportedOperationException}.
1023 *
1024 * @param arg the release argument. This value is always the one
1025 * passed to a release method, or the current state value upon
1026 * entry to a condition wait. The value is otherwise
1027 * uninterpreted and can represent anything you like.
1028 * @return {@code true} if this object is now in a fully released
1029 * state, so that any waiting threads may attempt to acquire;
1030 * and {@code false} otherwise.
1031 * @throws IllegalMonitorStateException if releasing would place this
1032 * synchronizer in an illegal state. This exception must be
1033 * thrown in a consistent fashion for synchronization to work
1034 * correctly.
1035 * @throws UnsupportedOperationException if exclusive mode is not supported
1036 */
1037 protected boolean tryRelease(int arg) {
1038 throw new UnsupportedOperationException();
1039 }
1040
1041 /**
1042 * Attempts to acquire in shared mode. This method should query if
1043 * the state of the object permits it to be acquired in the shared
1044 * mode, and if so to acquire it.
1045 *
1046 * <p>This method is always invoked by the thread performing
1047 * acquire. If this method reports failure, the acquire method
1048 * may queue the thread, if it is not already queued, until it is
1049 * signalled by a release from some other thread.
1050 *
1051 * <p>The default implementation throws {@link
1052 * UnsupportedOperationException}.
1053 *
1054 * @param arg the acquire argument. This value is always the one
1055 * passed to an acquire method, or is the value saved on entry
1056 * to a condition wait. The value is otherwise uninterpreted
1057 * and can represent anything you like.
1058 * @return a negative value on failure; zero if acquisition in shared
1059 * mode succeeded but no subsequent shared-mode acquire can
1060 * succeed; and a positive value if acquisition in shared
1061 * mode succeeded and subsequent shared-mode acquires might
1062 * also succeed, in which case a subsequent waiting thread
1063 * must check availability. (Support for three different
1064 * return values enables this method to be used in contexts
1065 * where acquires only sometimes act exclusively.) Upon
1066 * success, this object has been acquired.
1067 * @throws IllegalMonitorStateException if acquiring would place this
1068 * synchronizer in an illegal state. This exception must be
1069 * thrown in a consistent fashion for synchronization to work
1070 * correctly.
1071 * @throws UnsupportedOperationException if shared mode is not supported
1072 */
1073 protected int tryAcquireShared(int arg) {
1074 throw new UnsupportedOperationException();
1075 }
1076
1077 /**
1078 * Attempts to set the state to reflect a release in shared mode.
1079 *
1080 * <p>This method is always invoked by the thread performing release.
1081 *
1082 * <p>The default implementation throws
1083 * {@link UnsupportedOperationException}.
1084 *
1085 * @param arg the release argument. This value is always the one
1086 * passed to a release method, or the current state value upon
1087 * entry to a condition wait. The value is otherwise
1088 * uninterpreted and can represent anything you like.
1089 * @return {@code true} if this release of shared mode may permit a
1090 * waiting acquire (shared or exclusive) to succeed; and
1091 * {@code false} otherwise
1092 * @throws IllegalMonitorStateException if releasing would place this
1093 * synchronizer in an illegal state. This exception must be
1094 * thrown in a consistent fashion for synchronization to work
1095 * correctly.
1096 * @throws UnsupportedOperationException if shared mode is not supported
1097 */
1098 protected boolean tryReleaseShared(int arg) {
1099 throw new UnsupportedOperationException();
1100 }
1101
1102 /**
1103 * Returns {@code true} if synchronization is held exclusively with
1104 * respect to the current (calling) thread. This method is invoked
1105 * upon each call to a non-waiting {@link ConditionObject} method.
1106 * (Waiting methods instead invoke {@link #release}.)
1107 *
1108 * <p>The default implementation throws {@link
1109 * UnsupportedOperationException}. This method is invoked
1110 * internally only within {@link ConditionObject} methods, so need
1111 * not be defined if conditions are not used.
1112 *
1113 * @return {@code true} if synchronization is held exclusively;
1114 * {@code false} otherwise
1115 * @throws UnsupportedOperationException if conditions are not supported
1116 */
1117 protected boolean isHeldExclusively() {
1118 throw new UnsupportedOperationException();
1119 }
1120
1121 /**
1122 * Acquires in exclusive mode, ignoring interrupts. Implemented
1123 * by invoking at least once {@link #tryAcquire},
1124 * returning on success. Otherwise the thread is queued, possibly
1125 * repeatedly blocking and unblocking, invoking {@link
1126 * #tryAcquire} until success. This method can be used
1127 * to implement method {@link Lock#lock}.
1128 *
1129 * @param arg the acquire argument. This value is conveyed to
1130 * {@link #tryAcquire} but is otherwise uninterpreted and
1131 * can represent anything you like.
1132 */
1133 public final void acquire(int arg) {
1134 if (!tryAcquire(arg)
1135 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1136 selfInterrupt();
1137 }
1138
1139 /**
1140 * Acquires in exclusive mode, aborting if interrupted.
1141 * Implemented by first checking interrupt status, then invoking
1142 * at least once {@link #tryAcquire}, returning on
1143 * success. Otherwise the thread is queued, possibly repeatedly
1144 * blocking and unblocking, invoking {@link #tryAcquire}
1145 * until success or the thread is interrupted. This method can be
1146 * used to implement method {@link Lock#lockInterruptibly}.
1147 *
1148 * @param arg the acquire argument. This value is conveyed to
1149 * {@link #tryAcquire} but is otherwise uninterpreted and
1150 * can represent anything you like.
1151 * @throws InterruptedException if the current thread is interrupted
1152 */
1153 public final void acquireInterruptibly(int arg)
1154 throws InterruptedException {
1155 if (Thread.interrupted())
1156 throw new InterruptedException();
1157 if (!tryAcquire(arg))
1158 doAcquireInterruptibly(arg);
1159 }
1160
1161 /**
1162 * Attempts to acquire in exclusive mode, aborting if interrupted,
1163 * and failing if the given timeout elapses. Implemented by first
1164 * checking interrupt status, then invoking at least once {@link
1165 * #tryAcquire}, returning on success. Otherwise, the thread is
1166 * queued, possibly repeatedly blocking and unblocking, invoking
1167 * {@link #tryAcquire} until success or the thread is interrupted
1168 * or the timeout elapses. This method can be used to implement
1169 * method {@link Lock#tryLock(long, TimeUnit)}.
1170 *
1171 * @param arg the acquire argument. This value is conveyed to
1172 * {@link #tryAcquire} but is otherwise uninterpreted and
1173 * can represent anything you like.
1174 * @param nanosTimeout the maximum number of nanoseconds to wait
1175 * @return {@code true} if acquired; {@code false} if timed out
1176 * @throws InterruptedException if the current thread is interrupted
1177 */
1178 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
1179 throws InterruptedException {
1180 if (Thread.interrupted())
1181 throw new InterruptedException();
1182 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
1183 }
1184
1185 /**
1186 * Releases in exclusive mode. Implemented by unblocking one or
1187 * more threads if {@link #tryRelease} returns true.
1188 * This method can be used to implement method {@link Lock#unlock}.
1189 *
1190 * @param arg the release argument. This value is conveyed to
1191 * {@link #tryRelease} but is otherwise uninterpreted and
1192 * can represent anything you like.
1193 * @return the value returned from {@link #tryRelease}
1194 */
1195 public final boolean release(int arg) {
1196 if (tryRelease(arg)) {
1197 Node h = head;
1198 if (h != null && h.waitStatus != 0)
1199 unparkSuccessor(h);
1200 return true;
1201 }
1202 return false;
1203 }
1204
1205 /**
1206 * Acquires in shared mode, ignoring interrupts. Implemented by
1207 * first invoking at least once {@link #tryAcquireShared},
1208 * returning on success. Otherwise the thread is queued, possibly
1209 * repeatedly blocking and unblocking, invoking {@link
1210 * #tryAcquireShared} until success.
1211 *
1212 * @param arg the acquire argument. This value is conveyed to
1213 * {@link #tryAcquireShared} but is otherwise uninterpreted
1214 * and can represent anything you like.
1215 */
1216 public final void acquireShared(int arg) {
1217 if (tryAcquireShared(arg) < 0)
1218 doAcquireShared(arg);
1219 }
1220
1221 /**
1222 * Acquires in shared mode, aborting if interrupted. Implemented
1223 * by first checking interrupt status, then invoking at least once
1224 * {@link #tryAcquireShared}, returning on success. Otherwise the
1225 * thread is queued, possibly repeatedly blocking and unblocking,
1226 * invoking {@link #tryAcquireShared} until success or the thread
1227 * is interrupted.
1228 * @param arg the acquire argument
1229 * This value is conveyed to {@link #tryAcquireShared} but is
1230 * otherwise uninterpreted and can represent anything
1231 * you like.
1232 * @throws InterruptedException if the current thread is interrupted
1233 */
1234 public final void acquireSharedInterruptibly(int arg)
1235 throws InterruptedException {
1236 if (Thread.interrupted())
1237 throw new InterruptedException();
1238 if (tryAcquireShared(arg) < 0)
1239 doAcquireSharedInterruptibly(arg);
1240 }
1241
1242 /**
1243 * Attempts to acquire in shared mode, aborting if interrupted, and
1244 * failing if the given timeout elapses. Implemented by first
1245 * checking interrupt status, then invoking at least once {@link
1246 * #tryAcquireShared}, returning on success. Otherwise, the
1247 * thread is queued, possibly repeatedly blocking and unblocking,
1248 * invoking {@link #tryAcquireShared} until success or the thread
1249 * is interrupted or the timeout elapses.
1250 *
1251 * @param arg the acquire argument. This value is conveyed to
1252 * {@link #tryAcquireShared} but is otherwise uninterpreted
1253 * and can represent anything you like.
1254 * @param nanosTimeout the maximum number of nanoseconds to wait
1255 * @return {@code true} if acquired; {@code false} if timed out
1256 * @throws InterruptedException if the current thread is interrupted
1257 */
1258 public final boolean tryAcquireSharedNanos(int arg,
1259 long nanosTimeout) throws InterruptedException {
1260 if (Thread.interrupted())
1261 throw new InterruptedException();
1262 return tryAcquireShared(arg) >= 0
1263 || doAcquireSharedNanos(arg, nanosTimeout);
1264 }
1265
1266 /**
1267 * Releases in shared mode. Implemented by unblocking one or more
1268 * threads if {@link #tryReleaseShared} returns true.
1269 *
1270 * @param arg the release argument. This value is conveyed to
1271 * {@link #tryReleaseShared} but is otherwise uninterpreted
1272 * and can represent anything you like.
1273 * @return the value returned from {@link #tryReleaseShared}
1274 */
1275 public final boolean releaseShared(int arg) {
1276 if (tryReleaseShared(arg)) {
1277 Node h = head;
1278 if (h != null && h.waitStatus != 0)
1279 unparkSuccessor(h);
1280 return true;
1281 }
1282 return false;
1283 }
1284
1285 // Queue inspection methods
1286
1287 /**
1288 * Queries whether any threads are waiting to acquire. Note that
1289 * because cancellations due to interrupts and timeouts may occur
1290 * at any time, a {@code true} return does not guarantee that any
1291 * other thread will ever acquire.
1292 *
1293 * <p>In this implementation, this operation returns in
1294 * constant time.
1295 *
1296 * @return {@code true} if there may be other threads waiting to acquire
1297 */
1298 public final boolean hasQueuedThreads() {
1299 return head != tail;
1300 }
1301
1302 /**
1303 * Queries whether any threads have ever contended to acquire this
1304 * synchronizer; that is if an acquire method has ever blocked.
1305 *
1306 * <p>In this implementation, this operation returns in
1307 * constant time.
1308 *
1309 * @return {@code true} if there has ever been contention
1310 */
1311 public final boolean hasContended() {
1312 return head != null;
1313 }
1314
1315 /**
1316 * Returns the first (longest-waiting) thread in the queue, or
1317 * {@code null} if no threads are currently queued.
1318 *
1319 * <p>In this implementation, this operation normally returns in
1320 * constant time, but may iterate upon contention if other threads are
1321 * concurrently modifying the queue.
1322 *
1323 * @return the first (longest-waiting) thread in the queue, or
1324 * {@code null} if no threads are currently queued
1325 */
1326 public final Thread getFirstQueuedThread() {
1327 // handle only fast path, else relay
1328 return (head == tail) ? null : fullGetFirstQueuedThread();
1329 }
1330
1331 /**
1332 * Version of getFirstQueuedThread called when fastpath fails
1333 */
1334 private Thread fullGetFirstQueuedThread() {
1335 /*
1336 * The first node is normally head.next. Try to get its
1337 * thread field, ensuring consistent reads: If thread
1338 * field is nulled out or s.prev is no longer head, then
1339 * some other thread(s) concurrently performed setHead in
1340 * between some of our reads. We try this twice before
1341 * resorting to traversal.
1342 */
1343 Node h, s;
1344 Thread st;
1345 if (((h = head) != null && (s = h.next) != null
1346 && s.prev == head && (st = s.thread) != null)
1347 || ((h = head) != null && (s = h.next) != null
1348 && s.prev == head && (st = s.thread) != null))
1349 return st;
1350
1351 /*
1352 * Head's next field might not have been set yet, or may have
1353 * been unset after setHead. So we must check to see if tail
1354 * is actually first node. If not, we continue on, safely
1355 * traversing from tail back to head to find first,
1356 * guaranteeing termination.
1357 */
1358
1359 Node t = tail;
1360 Thread firstThread = null;
1361 while (t != null && t != head) {
1362 Thread tt = t.thread;
1363 if (tt != null)
1364 firstThread = tt;
1365 t = t.prev;
1366 }
1367 return firstThread;
1368 }
1369
1370 /**
1371 * Returns true if the given thread is currently queued.
1372 *
1373 * <p>This implementation traverses the queue to determine
1374 * presence of the given thread.
1375 *
1376 * @param thread the thread
1377 * @return {@code true} if the given thread is on the queue
1378 * @throws NullPointerException if the thread is null
1379 */
1380 public final boolean isQueued(Thread thread) {
1381 if (thread == null)
1382 throw new NullPointerException();
1383 for (Node p = tail; p != null; p = p.prev)
1384 if (p.thread == thread)
1385 return true;
1386 return false;
1387 }
1388
1389 /**
1390 * Returns {@code true} if the apparent first queued thread, if one
1391 * exists, is waiting in exclusive mode. If this method returns
1392 * {@code true}, and the current thread is attempting to acquire in
1393 * shared mode (that is, this method is invoked from {@link
1394 * #tryAcquireShared}) then it is guaranteed that the current thread
1395 * is not the first queued thread. Used only as a heuristic in
1396 * ReentrantReadWriteLock.
1397 */
1398 final boolean apparentlyFirstQueuedIsExclusive() {
1399 Node h, s;
1400 return (h = head) != null && (s = h.next) != null
1401 && !s.isShared() && s.thread != null;
1402 }
1403
1404 /**
1405 * Queries whether any threads have been waiting to acquire longer
1406 * than the current thread.
1407 *
1408 * <p>An invocation of this method is equivalent to (but may be
1409 * more efficient than):
1410 * <pre> {@code
1411 * getFirstQueuedThread() != Thread.currentThread() &&
1412 * hasQueuedThreads()}</pre>
1413 *
1414 * <p>Note that because cancellations due to interrupts and
1415 * timeouts may occur at any time, a {@code true} return does not
1416 * guarantee that some other thread will acquire before the current
1417 * thread. Likewise, it is possible for another thread to win a
1418 * race to enqueue after this method has returned {@code false},
1419 * due to the queue being empty.
1420 *
1421 * <p>This method is designed to be used by a fair synchronizer to
1422 * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
1423 * Such a synchronizer's {@link #tryAcquire} method should return
1424 * {@code false}, and its {@link #tryAcquireShared} method should
1425 * return a negative value, if this method returns {@code true}
1426 * (unless this is a reentrant acquire). For example, the {@code
1427 * tryAcquire} method for a fair, reentrant, exclusive mode
1428 * synchronizer might look like this:
1429 *
1430 * <pre> {@code
1431 * protected boolean tryAcquire(int arg) {
1432 * if (isHeldExclusively()) {
1433 * // A reentrant acquire; increment hold count
1434 * return true;
1435 * } else if (hasQueuedPredecessors()) {
1436 * return false;
1437 * } else {
1438 * // try to acquire normally
1439 * }
1440 * }}</pre>
1441 *
1442 * @return {@code true} if there is a queued thread preceding the
1443 * current thread, and {@code false} if the current thread
1444 * is at the head of the queue or the queue is empty
1445 * @since 1.7
1446 */
1447 public final boolean hasQueuedPredecessors() {
1448 // The correctness of this depends on head being initialized
1449 // before tail and on head.next being accurate if the current
1450 // thread is first in queue.
1451 Node h, s;
1452 return (h = head) != tail
1453 && ((s = h.next) == null || s.thread != Thread
1454 .currentThread());
1455 }
1456
1457 // Instrumentation and monitoring methods
1458
1459 /**
1460 * Returns an estimate of the number of threads waiting to
1461 * acquire. The value is only an estimate because the number of
1462 * threads may change dynamically while this method traverses
1463 * internal data structures. This method is designed for use in
1464 * monitoring system state, not for synchronization
1465 * control.
1466 *
1467 * @return the estimated number of threads waiting to acquire
1468 */
1469 public final int getQueueLength() {
1470 int n = 0;
1471 for (Node p = tail; p != null; p = p.prev) {
1472 if (p.thread != null)
1473 ++n;
1474 }
1475 return n;
1476 }
1477
1478 /**
1479 * Returns a collection containing threads that may be waiting to
1480 * acquire. Because the actual set of threads may change
1481 * dynamically while constructing this result, the returned
1482 * collection is only a best-effort estimate. The elements of the
1483 * returned collection are in no particular order. This method is
1484 * designed to facilitate construction of subclasses that provide
1485 * more extensive monitoring facilities.
1486 *
1487 * @return the collection of threads
1488 */
1489 public final Collection<Thread> getQueuedThreads() {
1490 ArrayList<Thread> list = new ArrayList<Thread>();
1491 for (Node p = tail; p != null; p = p.prev) {
1492 Thread t = p.thread;
1493 if (t != null)
1494 list.add(t);
1495 }
1496 return list;
1497 }
1498
1499 /**
1500 * Returns a collection containing threads that may be waiting to
1501 * acquire in exclusive mode. This has the same properties
1502 * as {@link #getQueuedThreads} except that it only returns
1503 * those threads waiting due to an exclusive acquire.
1504 *
1505 * @return the collection of threads
1506 */
1507 public final Collection<Thread> getExclusiveQueuedThreads() {
1508 ArrayList<Thread> list = new ArrayList<Thread>();
1509 for (Node p = tail; p != null; p = p.prev) {
1510 if (!p.isShared()) {
1511 Thread t = p.thread;
1512 if (t != null)
1513 list.add(t);
1514 }
1515 }
1516 return list;
1517 }
1518
1519 /**
1520 * Returns a collection containing threads that may be waiting to
1521 * acquire in shared mode. This has the same properties
1522 * as {@link #getQueuedThreads} except that it only returns
1523 * those threads waiting due to a shared acquire.
1524 *
1525 * @return the collection of threads
1526 */
1527 public final Collection<Thread> getSharedQueuedThreads() {
1528 ArrayList<Thread> list = new ArrayList<Thread>();
1529 for (Node p = tail; p != null; p = p.prev) {
1530 if (p.isShared()) {
1531 Thread t = p.thread;
1532 if (t != null)
1533 list.add(t);
1534 }
1535 }
1536 return list;
1537 }
1538
1539 /**
1540 * Returns a string identifying this synchronizer, as well as its state.
1541 * The state, in brackets, includes the String {@code "State ="}
1542 * followed by the current value of {@link #getState}, and either
1543 * {@code "nonempty"} or {@code "empty"} depending on whether the
1544 * queue is empty.
1545 *
1546 * @return a string identifying this synchronizer, as well as its state
1547 */
1548 public String toString() {
1549 int s = getState();
1550 String q = hasQueuedThreads() ? "non" : "";
1551 return super .toString() + "[State = " + s + ", " + q
1552 + "empty queue]";
1553 }
1554
1555 // Internal support methods for Conditions
1556
1557 /**
1558 * Returns true if a node, always one that was initially placed on
1559 * a condition queue, is now waiting to reacquire on sync queue.
1560 * @param node the node
1561 * @return true if is reacquiring
1562 */
1563 final boolean isOnSyncQueue(Node node) {
1564 if (node.waitStatus == Node.CONDITION || node.prev == null)
1565 return false;
1566 if (node.next != null) // If has successor, it must be on queue
1567 return true;
1568 /*
1569 * node.prev can be non-null, but not yet on queue because
1570 * the CAS to place it on queue can fail. So we have to
1571 * traverse from tail to make sure it actually made it. It
1572 * will always be near the tail in calls to this method, and
1573 * unless the CAS failed (which is unlikely), it will be
1574 * there, so we hardly ever traverse much.
1575 */
1576 return findNodeFromTail(node);
1577 }
1578
1579 /**
1580 * Returns true if node is on sync queue by searching backwards from tail.
1581 * Called only when needed by isOnSyncQueue.
1582 * @return true if present
1583 */
1584 private boolean findNodeFromTail(Node node) {
1585 Node t = tail;
1586 for (;;) {
1587 if (t == node)
1588 return true;
1589 if (t == null)
1590 return false;
1591 t = t.prev;
1592 }
1593 }
1594
1595 /**
1596 * Transfers a node from a condition queue onto sync queue.
1597 * Returns true if successful.
1598 * @param node the node
1599 * @return true if successfully transferred (else the node was
1600 * cancelled before signal).
1601 */
1602 final boolean transferForSignal(Node node) {
1603 /*
1604 * If cannot change waitStatus, the node has been cancelled.
1605 */
1606 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
1607 return false;
1608
1609 /*
1610 * Splice onto queue and try to set waitStatus of predecessor to
1611 * indicate that thread is (probably) waiting. If cancelled or
1612 * attempt to set waitStatus fails, wake up to resync (in which
1613 * case the waitStatus can be transiently and harmlessly wrong).
1614 */
1615 Node p = enq(node);
1616 int c = p.waitStatus;
1617 if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
1618 LockSupport.unpark(node.thread);
1619 return true;
1620 }
1621
1622 /**
1623 * Transfers node, if necessary, to sync queue after a cancelled
1624 * wait. Returns true if thread was cancelled before being
1625 * signalled.
1626 * @param current the waiting thread
1627 * @param node its node
1628 * @return true if cancelled before the node was signalled
1629 */
1630 final boolean transferAfterCancelledWait(Node node) {
1631 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
1632 enq(node);
1633 return true;
1634 }
1635 /*
1636 * If we lost out to a signal(), then we can't proceed
1637 * until it finishes its enq(). Cancelling during an
1638 * incomplete transfer is both rare and transient, so just
1639 * spin.
1640 */
1641 while (!isOnSyncQueue(node))
1642 Thread.yield();
1643 return false;
1644 }
1645
1646 /**
1647 * Invokes release with current state value; returns saved state.
1648 * Cancels node and throws exception on failure.
1649 * @param node the condition node for this wait
1650 * @return previous sync state
1651 */
1652 final int fullyRelease(Node node) {
1653 boolean failed = true;
1654 try {
1655 int savedState = getState();
1656 if (release(savedState)) {
1657 failed = false;
1658 return savedState;
1659 } else {
1660 throw new IllegalMonitorStateException();
1661 }
1662 } finally {
1663 if (failed)
1664 node.waitStatus = Node.CANCELLED;
1665 }
1666 }
1667
1668 // Instrumentation methods for conditions
1669
1670 /**
1671 * Queries whether the given ConditionObject
1672 * uses this synchronizer as its lock.
1673 *
1674 * @param condition the condition
1675 * @return <tt>true</tt> if owned
1676 * @throws NullPointerException if the condition is null
1677 */
1678 public final boolean owns(ConditionObject condition) {
1679 if (condition == null)
1680 throw new NullPointerException();
1681 return condition.isOwnedBy(this );
1682 }
1683
1684 /**
1685 * Queries whether any threads are waiting on the given condition
1686 * associated with this synchronizer. Note that because timeouts
1687 * and interrupts may occur at any time, a <tt>true</tt> return
1688 * does not guarantee that a future <tt>signal</tt> will awaken
1689 * any threads. This method is designed primarily for use in
1690 * monitoring of the system state.
1691 *
1692 * @param condition the condition
1693 * @return <tt>true</tt> if there are any waiting threads
1694 * @throws IllegalMonitorStateException if exclusive synchronization
1695 * is not held
1696 * @throws IllegalArgumentException if the given condition is
1697 * not associated with this synchronizer
1698 * @throws NullPointerException if the condition is null
1699 */
1700 public final boolean hasWaiters(ConditionObject condition) {
1701 if (!owns(condition))
1702 throw new IllegalArgumentException("Not owner");
1703 return condition.hasWaiters();
1704 }
1705
1706 /**
1707 * Returns an estimate of the number of threads waiting on the
1708 * given condition associated with this synchronizer. Note that
1709 * because timeouts and interrupts may occur at any time, the
1710 * estimate serves only as an upper bound on the actual number of
1711 * waiters. This method is designed for use in monitoring of the
1712 * system state, not for synchronization control.
1713 *
1714 * @param condition the condition
1715 * @return the estimated number of waiting threads
1716 * @throws IllegalMonitorStateException if exclusive synchronization
1717 * is not held
1718 * @throws IllegalArgumentException if the given condition is
1719 * not associated with this synchronizer
1720 * @throws NullPointerException if the condition is null
1721 */
1722 public final int getWaitQueueLength(ConditionObject condition) {
1723 if (!owns(condition))
1724 throw new IllegalArgumentException("Not owner");
1725 return condition.getWaitQueueLength();
1726 }
1727
1728 /**
1729 * Returns a collection containing those threads that may be
1730 * waiting on the given condition associated with this
1731 * synchronizer. Because the actual set of threads may change
1732 * dynamically while constructing this result, the returned
1733 * collection is only a best-effort estimate. The elements of the
1734 * returned collection are in no particular order.
1735 *
1736 * @param condition the condition
1737 * @return the collection of threads
1738 * @throws IllegalMonitorStateException if exclusive synchronization
1739 * is not held
1740 * @throws IllegalArgumentException if the given condition is
1741 * not associated with this synchronizer
1742 * @throws NullPointerException if the condition is null
1743 */
1744 public final Collection<Thread> getWaitingThreads(
1745 ConditionObject condition) {
1746 if (!owns(condition))
1747 throw new IllegalArgumentException("Not owner");
1748 return condition.getWaitingThreads();
1749 }
1750
1751 /**
1752 * Condition implementation for a {@link
1753 * AbstractQueuedSynchronizer} serving as the basis of a {@link
1754 * Lock} implementation.
1755 *
1756 * <p>Method documentation for this class describes mechanics,
1757 * not behavioral specifications from the point of view of Lock
1758 * and Condition users. Exported versions of this class will in
1759 * general need to be accompanied by documentation describing
1760 * condition semantics that rely on those of the associated
1761 * <tt>AbstractQueuedSynchronizer</tt>.
1762 *
1763 * <p>This class is Serializable, but all fields are transient,
1764 * so deserialized conditions have no waiters.
1765 */
1766 public class ConditionObject implements Condition,
1767 java.io.Serializable {
1768 private static final long serialVersionUID = 1173984872572414699L;
1769 /** First node of condition queue. */
1770 private transient Node firstWaiter;
1771 /** Last node of condition queue. */
1772 private transient Node lastWaiter;
1773
1774 /**
1775 * Creates a new <tt>ConditionObject</tt> instance.
1776 */
1777 public ConditionObject() {
1778 }
1779
1780 // Internal methods
1781
1782 /**
1783 * Adds a new waiter to wait queue.
1784 * @return its new wait node
1785 */
1786 private Node addConditionWaiter() {
1787 Node t = lastWaiter;
1788 // If lastWaiter is cancelled, clean out.
1789 if (t != null && t.waitStatus != Node.CONDITION) {
1790 unlinkCancelledWaiters();
1791 t = lastWaiter;
1792 }
1793 Node node = new Node(Thread.currentThread(), Node.CONDITION);
1794 if (t == null)
1795 firstWaiter = node;
1796 else
1797 t.nextWaiter = node;
1798 lastWaiter = node;
1799 return node;
1800 }
1801
1802 /**
1803 * Removes and transfers nodes until hit non-cancelled one or
1804 * null. Split out from signal in part to encourage compilers
1805 * to inline the case of no waiters.
1806 * @param first (non-null) the first node on condition queue
1807 */
1808 private void doSignal(Node first) {
1809 do {
1810 if ((firstWaiter = first.nextWaiter) == null)
1811 lastWaiter = null;
1812 first.nextWaiter = null;
1813 } while (!transferForSignal(first)
1814 && (first = firstWaiter) != null);
1815 }
1816
1817 /**
1818 * Removes and transfers all nodes.
1819 * @param first (non-null) the first node on condition queue
1820 */
1821 private void doSignalAll(Node first) {
1822 lastWaiter = firstWaiter = null;
1823 do {
1824 Node next = first.nextWaiter;
1825 first.nextWaiter = null;
1826 transferForSignal(first);
1827 first = next;
1828 } while (first != null);
1829 }
1830
1831 /**
1832 * Unlinks cancelled waiter nodes from condition queue.
1833 * Called only while holding lock. This is called when
1834 * cancellation occurred during condition wait, and upon
1835 * insertion of a new waiter when lastWaiter is seen to have
1836 * been cancelled. This method is needed to avoid garbage
1837 * retention in the absence of signals. So even though it may
1838 * require a full traversal, it comes into play only when
1839 * timeouts or cancellations occur in the absence of
1840 * signals. It traverses all nodes rather than stopping at a
1841 * particular target to unlink all pointers to garbage nodes
1842 * without requiring many re-traversals during cancellation
1843 * storms.
1844 */
1845 private void unlinkCancelledWaiters() {
1846 Node t = firstWaiter;
1847 Node trail = null;
1848 while (t != null) {
1849 Node next = t.nextWaiter;
1850 if (t.waitStatus != Node.CONDITION) {
1851 t.nextWaiter = null;
1852 if (trail == null)
1853 firstWaiter = next;
1854 else
1855 trail.nextWaiter = next;
1856 if (next == null)
1857 lastWaiter = trail;
1858 } else
1859 trail = t;
1860 t = next;
1861 }
1862 }
1863
1864 // public methods
1865
1866 /**
1867 * Moves the longest-waiting thread, if one exists, from the
1868 * wait queue for this condition to the wait queue for the
1869 * owning lock.
1870 *
1871 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1872 * returns {@code false}
1873 */
1874 public final void signal() {
1875 if (!isHeldExclusively())
1876 throw new IllegalMonitorStateException();
1877 Node first = firstWaiter;
1878 if (first != null)
1879 doSignal(first);
1880 }
1881
1882 /**
1883 * Moves all threads from the wait queue for this condition to
1884 * the wait queue for the owning lock.
1885 *
1886 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1887 * returns {@code false}
1888 */
1889 public final void signalAll() {
1890 if (!isHeldExclusively())
1891 throw new IllegalMonitorStateException();
1892 Node first = firstWaiter;
1893 if (first != null)
1894 doSignalAll(first);
1895 }
1896
1897 /**
1898 * Implements uninterruptible condition wait.
1899 * <ol>
1900 * <li> Save lock state returned by {@link #getState}.
1901 * <li> Invoke {@link #release} with
1902 * saved state as argument, throwing
1903 * IllegalMonitorStateException if it fails.
1904 * <li> Block until signalled.
1905 * <li> Reacquire by invoking specialized version of
1906 * {@link #acquire} with saved state as argument.
1907 * </ol>
1908 */
1909 public final void awaitUninterruptibly() {
1910 Node node = addConditionWaiter();
1911 int savedState = fullyRelease(node);
1912 boolean interrupted = false;
1913 while (!isOnSyncQueue(node)) {
1914 LockSupport.park(this );
1915 if (Thread.interrupted())
1916 interrupted = true;
1917 }
1918 if (acquireQueued(node, savedState) || interrupted)
1919 selfInterrupt();
1920 }
1921
1922 /*
1923 * For interruptible waits, we need to track whether to throw
1924 * InterruptedException, if interrupted while blocked on
1925 * condition, versus reinterrupt current thread, if
1926 * interrupted while blocked waiting to re-acquire.
1927 */
1928
1929 /** Mode meaning to reinterrupt on exit from wait */
1930 private static final int REINTERRUPT = 1;
1931 /** Mode meaning to throw InterruptedException on exit from wait */
1932 private static final int THROW_IE = -1;
1933
1934 /**
1935 * Checks for interrupt, returning THROW_IE if interrupted
1936 * before signalled, REINTERRUPT if after signalled, or
1937 * 0 if not interrupted.
1938 */
1939 private int checkInterruptWhileWaiting(Node node) {
1940 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE
1941 : REINTERRUPT)
1942 : 0;
1943 }
1944
1945 /**
1946 * Throws InterruptedException, reinterrupts current thread, or
1947 * does nothing, depending on mode.
1948 */
1949 private void reportInterruptAfterWait(int interruptMode)
1950 throws InterruptedException {
1951 if (interruptMode == THROW_IE)
1952 throw new InterruptedException();
1953 else if (interruptMode == REINTERRUPT)
1954 selfInterrupt();
1955 }
1956
1957 /**
1958 * Implements interruptible condition wait.
1959 * <ol>
1960 * <li> If current thread is interrupted, throw InterruptedException.
1961 * <li> Save lock state returned by {@link #getState}.
1962 * <li> Invoke {@link #release} with
1963 * saved state as argument, throwing
1964 * IllegalMonitorStateException if it fails.
1965 * <li> Block until signalled or interrupted.
1966 * <li> Reacquire by invoking specialized version of
1967 * {@link #acquire} with saved state as argument.
1968 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1969 * </ol>
1970 */
1971 public final void await() throws InterruptedException {
1972 if (Thread.interrupted())
1973 throw new InterruptedException();
1974 Node node = addConditionWaiter();
1975 int savedState = fullyRelease(node);
1976 int interruptMode = 0;
1977 while (!isOnSyncQueue(node)) {
1978 LockSupport.park(this );
1979 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1980 break;
1981 }
1982 if (acquireQueued(node, savedState)
1983 && interruptMode != THROW_IE)
1984 interruptMode = REINTERRUPT;
1985 if (node.nextWaiter != null) // clean up if cancelled
1986 unlinkCancelledWaiters();
1987 if (interruptMode != 0)
1988 reportInterruptAfterWait(interruptMode);
1989 }
1990
1991 /**
1992 * Implements timed condition wait.
1993 * <ol>
1994 * <li> If current thread is interrupted, throw InterruptedException.
1995 * <li> Save lock state returned by {@link #getState}.
1996 * <li> Invoke {@link #release} with
1997 * saved state as argument, throwing
1998 * IllegalMonitorStateException if it fails.
1999 * <li> Block until signalled, interrupted, or timed out.
2000 * <li> Reacquire by invoking specialized version of
2001 * {@link #acquire} with saved state as argument.
2002 * <li> If interrupted while blocked in step 4, throw InterruptedException.
2003 * </ol>
2004 */
2005 public final long awaitNanos(long nanosTimeout)
2006 throws InterruptedException {
2007 if (Thread.interrupted())
2008 throw new InterruptedException();
2009 Node node = addConditionWaiter();
2010 int savedState = fullyRelease(node);
2011 long lastTime = System.nanoTime();
2012 int interruptMode = 0;
2013 while (!isOnSyncQueue(node)) {
2014 if (nanosTimeout <= 0L) {
2015 transferAfterCancelledWait(node);
2016 break;
2017 }
2018 LockSupport.parkNanos(this , nanosTimeout);
2019 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2020 break;
2021
2022 long now = System.nanoTime();
2023 nanosTimeout -= now - lastTime;
2024 lastTime = now;
2025 }
2026 if (acquireQueued(node, savedState)
2027 && interruptMode != THROW_IE)
2028 interruptMode = REINTERRUPT;
2029 if (node.nextWaiter != null)
2030 unlinkCancelledWaiters();
2031 if (interruptMode != 0)
2032 reportInterruptAfterWait(interruptMode);
2033 return nanosTimeout - (System.nanoTime() - lastTime);
2034 }
2035
2036 /**
2037 * Implements absolute timed condition wait.
2038 * <ol>
2039 * <li> If current thread is interrupted, throw InterruptedException.
2040 * <li> Save lock state returned by {@link #getState}.
2041 * <li> Invoke {@link #release} with
2042 * saved state as argument, throwing
2043 * IllegalMonitorStateException if it fails.
2044 * <li> Block until signalled, interrupted, or timed out.
2045 * <li> Reacquire by invoking specialized version of
2046 * {@link #acquire} with saved state as argument.
2047 * <li> If interrupted while blocked in step 4, throw InterruptedException.
2048 * <li> If timed out while blocked in step 4, return false, else true.
2049 * </ol>
2050 */
2051 public final boolean awaitUntil(Date deadline)
2052 throws InterruptedException {
2053 if (deadline == null)
2054 throw new NullPointerException();
2055 long abstime = deadline.getTime();
2056 if (Thread.interrupted())
2057 throw new InterruptedException();
2058 Node node = addConditionWaiter();
2059 int savedState = fullyRelease(node);
2060 boolean timedout = false;
2061 int interruptMode = 0;
2062 while (!isOnSyncQueue(node)) {
2063 if (System.currentTimeMillis() > abstime) {
2064 timedout = transferAfterCancelledWait(node);
2065 break;
2066 }
2067 LockSupport.parkUntil(this , abstime);
2068 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2069 break;
2070 }
2071 if (acquireQueued(node, savedState)
2072 && interruptMode != THROW_IE)
2073 interruptMode = REINTERRUPT;
2074 if (node.nextWaiter != null)
2075 unlinkCancelledWaiters();
2076 if (interruptMode != 0)
2077 reportInterruptAfterWait(interruptMode);
2078 return !timedout;
2079 }
2080
2081 /**
2082 * Implements timed condition wait.
2083 * <ol>
2084 * <li> If current thread is interrupted, throw InterruptedException.
2085 * <li> Save lock state returned by {@link #getState}.
2086 * <li> Invoke {@link #release} with
2087 * saved state as argument, throwing
2088 * IllegalMonitorStateException if it fails.
2089 * <li> Block until signalled, interrupted, or timed out.
2090 * <li> Reacquire by invoking specialized version of
2091 * {@link #acquire} with saved state as argument.
2092 * <li> If interrupted while blocked in step 4, throw InterruptedException.
2093 * <li> If timed out while blocked in step 4, return false, else true.
2094 * </ol>
2095 */
2096 public final boolean await(long time, TimeUnit unit)
2097 throws InterruptedException {
2098 if (unit == null)
2099 throw new NullPointerException();
2100 long nanosTimeout = unit.toNanos(time);
2101 if (Thread.interrupted())
2102 throw new InterruptedException();
2103 Node node = addConditionWaiter();
2104 int savedState = fullyRelease(node);
2105 long lastTime = System.nanoTime();
2106 boolean timedout = false;
2107 int interruptMode = 0;
2108 while (!isOnSyncQueue(node)) {
2109 if (nanosTimeout <= 0L) {
2110 timedout = transferAfterCancelledWait(node);
2111 break;
2112 }
2113 if (nanosTimeout >= spinForTimeoutThreshold)
2114 LockSupport.parkNanos(this , nanosTimeout);
2115 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2116 break;
2117 long now = System.nanoTime();
2118 nanosTimeout -= now - lastTime;
2119 lastTime = now;
2120 }
2121 if (acquireQueued(node, savedState)
2122 && interruptMode != THROW_IE)
2123 interruptMode = REINTERRUPT;
2124 if (node.nextWaiter != null)
2125 unlinkCancelledWaiters();
2126 if (interruptMode != 0)
2127 reportInterruptAfterWait(interruptMode);
2128 return !timedout;
2129 }
2130
2131 // support for instrumentation
2132
2133 /**
2134 * Returns true if this condition was created by the given
2135 * synchronization object.
2136 *
2137 * @return {@code true} if owned
2138 */
2139 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
2140 return sync == AbstractQueuedSynchronizer.this ;
2141 }
2142
2143 /**
2144 * Queries whether any threads are waiting on this condition.
2145 * Implements {@link AbstractQueuedSynchronizer#hasWaiters}.
2146 *
2147 * @return {@code true} if there are any waiting threads
2148 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2149 * returns {@code false}
2150 */
2151 protected final boolean hasWaiters() {
2152 if (!isHeldExclusively())
2153 throw new IllegalMonitorStateException();
2154 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2155 if (w.waitStatus == Node.CONDITION)
2156 return true;
2157 }
2158 return false;
2159 }
2160
2161 /**
2162 * Returns an estimate of the number of threads waiting on
2163 * this condition.
2164 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}.
2165 *
2166 * @return the estimated number of waiting threads
2167 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2168 * returns {@code false}
2169 */
2170 protected final int getWaitQueueLength() {
2171 if (!isHeldExclusively())
2172 throw new IllegalMonitorStateException();
2173 int n = 0;
2174 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2175 if (w.waitStatus == Node.CONDITION)
2176 ++n;
2177 }
2178 return n;
2179 }
2180
2181 /**
2182 * Returns a collection containing those threads that may be
2183 * waiting on this Condition.
2184 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}.
2185 *
2186 * @return the collection of threads
2187 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2188 * returns {@code false}
2189 */
2190 protected final Collection<Thread> getWaitingThreads() {
2191 if (!isHeldExclusively())
2192 throw new IllegalMonitorStateException();
2193 ArrayList<Thread> list = new ArrayList<Thread>();
2194 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2195 if (w.waitStatus == Node.CONDITION) {
2196 Thread t = w.thread;
2197 if (t != null)
2198 list.add(t);
2199 }
2200 }
2201 return list;
2202 }
2203 }
2204
2205 /**
2206 * Setup to support compareAndSet. We need to natively implement
2207 * this here: For the sake of permitting future enhancements, we
2208 * cannot explicitly subclass AtomicInteger, which would be
2209 * efficient and useful otherwise. So, as the lesser of evils, we
2210 * natively implement using hotspot intrinsics API. And while we
2211 * are at it, we do the same for other CASable fields (which could
2212 * otherwise be done with atomic field updaters).
2213 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    // Resolve the raw offset of every CAS-able field once, at class
    // initialization time.
    try {
        final Class<?> syncClass = AbstractQueuedSynchronizer.class;
        final Class<?> nodeClass = Node.class;

        // Fields of the synchronizer itself.
        stateOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("state"));
        headOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("head"));
        tailOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("tail"));

        // Fields of queue nodes.
        waitStatusOffset =
            unsafe.objectFieldOffset(nodeClass.getDeclaredField("waitStatus"));
        nextOffset =
            unsafe.objectFieldOffset(nodeClass.getDeclaredField("next"));
    } catch (Exception ex) {
        // Any lookup failure is rethrown as an Error carrying the cause.
        throw new Error(ex);
    }
}
2241
2242 /**
2243 * CAS head field. Used only by enq.
2244 */
2245 private final boolean compareAndSetHead(Node update) {
2246 return unsafe.compareAndSwapObject(this , headOffset, null,
2247 update);
2248 }
2249
2250 /**
2251 * CAS tail field. Used only by enq.
2252 */
2253 private final boolean compareAndSetTail(Node expect, Node update) {
2254 return unsafe.compareAndSwapObject(this , tailOffset, expect,
2255 update);
2256 }
2257
2258 /**
2259 * CAS waitStatus field of a node.
2260 */
2261 private final static boolean compareAndSetWaitStatus(Node node,
2262 int expect, int update) {
2263 return unsafe.compareAndSwapInt(node, waitStatusOffset, expect,
2264 update);
2265 }
2266
2267 /**
2268 * CAS next field of a node.
2269 */
2270 private final static boolean compareAndSetNext(Node node,
2271 Node expect, Node update) {
2272 return unsafe.compareAndSwapObject(node, nextOffset, expect,
2273 update);
2274 }
2275 }
|