0001 /*
0002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
0003 *
0004 * This code is free software; you can redistribute it and/or modify it
0005 * under the terms of the GNU General Public License version 2 only, as
0006 * published by the Free Software Foundation. Sun designates this
0007 * particular file as subject to the "Classpath" exception as provided
0008 * by Sun in the LICENSE file that accompanied this code.
0009 *
0010 * This code is distributed in the hope that it will be useful, but WITHOUT
0011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
0013 * version 2 for more details (a copy is included in the LICENSE file that
0014 * accompanied this code).
0015 *
0016 * You should have received a copy of the GNU General Public License version
0017 * 2 along with this work; if not, write to the Free Software Foundation,
0018 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
0019 *
0020 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
0021 * CA 95054 USA or visit www.sun.com if you need additional information or
0022 * have any questions.
0023 */
0024
0025 /*
0026 * This file is available under and governed by the GNU General Public
0027 * License version 2 only, as published by the Free Software Foundation.
0028 * However, the following notice accompanied the original version of this
0029 * file:
0030 *
0031 * Written by Doug Lea with assistance from members of JCP JSR-166
0032 * Expert Group and released to the public domain, as explained at
0033 * http://creativecommons.org/licenses/publicdomain
0034 */
0035
0036 package java.util.concurrent.locks;
0037
0038 import java.util.*;
0039 import java.util.concurrent.*;
0040 import java.util.concurrent.atomic.*;
0041 import sun.misc.Unsafe;
0042
0043 /**
0044 * A version of {@link AbstractQueuedSynchronizer} in
0045 * which synchronization state is maintained as a <tt>long</tt>.
0046 * This class has exactly the same structure, properties, and methods
0047 * as <tt>AbstractQueuedSynchronizer</tt> with the exception
0048 * that all state-related parameters and results are defined
0049 * as <tt>long</tt> rather than <tt>int</tt>. This class
0050 * may be useful when creating synchronizers such as
0051 * multilevel locks and barriers that require
0052 * 64 bits of state.
0053 *
0054 * <p>See {@link AbstractQueuedSynchronizer} for usage
0055 * notes and examples.
0056 *
0057 * @since 1.6
0058 * @author Doug Lea
0059 */
0060 public abstract class AbstractQueuedLongSynchronizer extends
0061 AbstractOwnableSynchronizer implements java.io.Serializable {
0062
0063 private static final long serialVersionUID = 7373984972572414692L;
0064
0065 /*
0066 To keep sources in sync, the remainder of this source file is
0067 exactly cloned from AbstractQueuedSynchronizer, replacing class
0068 name and changing ints related with sync state to longs. Please
0069 keep it that way.
0070 */
0071
/**
 * Constructs a new <tt>AbstractQueuedLongSynchronizer</tt>. The
 * constructor does no work of its own, so the synchronization state
 * keeps its default value of zero.
 */
protected AbstractQueuedLongSynchronizer() {
}
0078
0079 /**
0080 * Wait queue node class.
0081 *
0082 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
0083 * Hagersten) lock queue. CLH locks are normally used for
0084 * spinlocks. We instead use them for blocking synchronizers, but
0085 * use the same basic tactic of holding some of the control
0086 * information about a thread in the predecessor of its node. A
0087 * "status" field in each node keeps track of whether a thread
0088 * should block. A node is signalled when its predecessor
0089 * releases. Each node of the queue otherwise serves as a
0090 * specific-notification-style monitor holding a single waiting
0091 * thread. The status field does NOT control whether threads are
0092 * granted locks etc though. A thread may try to acquire if it is
0093 * first in the queue. But being first does not guarantee success;
0094 * it only gives the right to contend. So the currently released
0095 * contender thread may need to rewait.
0096 *
0097 * <p>To enqueue into a CLH lock, you atomically splice it in as new
0098 * tail. To dequeue, you just set the head field.
0099 * <pre>
0100 * +------+ prev +-----+ +-----+
0101 * head | | <---- | | <---- | | tail
0102 * +------+ +-----+ +-----+
0103 * </pre>
0104 *
0105 * <p>Insertion into a CLH queue requires only a single atomic
0106 * operation on "tail", so there is a simple atomic point of
0107 * demarcation from unqueued to queued. Similarly, dequeuing
0108 * involves only updating the "head". However, it takes a bit
0109 * more work for nodes to determine who their successors are,
0110 * in part to deal with possible cancellation due to timeouts
0111 * and interrupts.
0112 *
0113 * <p>The "prev" links (not used in original CLH locks), are mainly
0114 * needed to handle cancellation. If a node is cancelled, its
0115 * successor is (normally) relinked to a non-cancelled
0116 * predecessor. For explanation of similar mechanics in the case
0117 * of spin locks, see the papers by Scott and Scherer at
0118 * http://www.cs.rochester.edu/u/scott/synchronization/
0119 *
0120 * <p>We also use "next" links to implement blocking mechanics.
0121 * The thread id for each node is kept in its own node, so a
0122 * predecessor signals the next node to wake up by traversing
0123 * next link to determine which thread it is. Determination of
0124 * successor must avoid races with newly queued nodes to set
0125 * the "next" fields of their predecessors. This is solved
0126 * when necessary by checking backwards from the atomically
0127 * updated "tail" when a node's successor appears to be null.
0128 * (Or, said differently, the next-links are an optimization
0129 * so that we don't usually need a backward scan.)
0130 *
0131 * <p>Cancellation introduces some conservatism to the basic
0132 * algorithms. Since we must poll for cancellation of other
0133 * nodes, we can miss noticing whether a cancelled node is
0134 * ahead or behind us. This is dealt with by always unparking
0135 * successors upon cancellation, allowing them to stabilize on
0136 * a new predecessor, unless we can identify an uncancelled
0137 * predecessor who will carry this responsibility.
0138 *
0139 * <p>CLH queues need a dummy header node to get started. But
0140 * we don't create them on construction, because it would be wasted
0141 * effort if there is never contention. Instead, the node
0142 * is constructed and head and tail pointers are set upon first
0143 * contention.
0144 *
0145 * <p>Threads waiting on Conditions use the same nodes, but
0146 * use an additional link. Conditions only need to link nodes
0147 * in simple (non-concurrent) linked queues because they are
0148 * only accessed when exclusively held. Upon await, a node is
0149 * inserted into a condition queue. Upon signal, the node is
0150 * transferred to the main queue. A special value of status
0151 * field is used to mark which queue a node is on.
0152 *
0153 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
0154 * Scherer and Michael Scott, along with members of JSR-166
0155 * expert group, for helpful ideas, discussions, and critiques
0156 * on the design of this class.
0157 */
0158 static final class Node {
0159 /** Marker to indicate a node is waiting in shared mode */
0160 static final Node SHARED = new Node();
0161 /** Marker to indicate a node is waiting in exclusive mode */
0162 static final Node EXCLUSIVE = null;
0163
0164 /** waitStatus value to indicate thread has cancelled */
0165 static final int CANCELLED = 1;
0166 /** waitStatus value to indicate successor's thread needs unparking */
0167 static final int SIGNAL = -1;
0168 /** waitStatus value to indicate thread is waiting on condition */
0169 static final int CONDITION = -2;
0170
0171 /**
0172 * Status field, taking on only the values:
0173 * SIGNAL: The successor of this node is (or will soon be)
0174 * blocked (via park), so the current node must
0175 * unpark its successor when it releases or
0176 * cancels. To avoid races, acquire methods must
0177 * first indicate they need a signal,
0178 * then retry the atomic acquire, and then,
0179 * on failure, block.
0180 * CANCELLED: This node is cancelled due to timeout or interrupt.
0181 * Nodes never leave this state. In particular,
0182 * a thread with cancelled node never again blocks.
0183 * CONDITION: This node is currently on a condition queue.
0184 * It will not be used as a sync queue node until
0185 * transferred. (Use of this value here
0186 * has nothing to do with the other uses
0187 * of the field, but simplifies mechanics.)
0188 * 0: None of the above
0189 *
0190 * The values are arranged numerically to simplify use.
0191 * Non-negative values mean that a node doesn't need to
0192 * signal. So, most code doesn't need to check for particular
0193 * values, just for sign.
0194 *
0195 * The field is initialized to 0 for normal sync nodes, and
0196 * CONDITION for condition nodes. It is modified using CAS
0197 * (or when possible, unconditional volatile writes).
0198 */
0199 volatile int waitStatus;
0200
0201 /**
0202 * Link to predecessor node that current node/thread relies on
0203 * for checking waitStatus. Assigned during enqueing, and nulled
0204 * out (for sake of GC) only upon dequeuing. Also, upon
0205 * cancellation of a predecessor, we short-circuit while
0206 * finding a non-cancelled one, which will always exist
0207 * because the head node is never cancelled: A node becomes
0208 * head only as a result of successful acquire. A
0209 * cancelled thread never succeeds in acquiring, and a thread only
0210 * cancels itself, not any other node.
0211 */
0212 volatile Node prev;
0213
0214 /**
0215 * Link to the successor node that the current node/thread
0216 * unparks upon release. Assigned during enqueuing, adjusted
0217 * when bypassing cancelled predecessors, and nulled out (for
0218 * sake of GC) when dequeued. The enq operation does not
0219 * assign next field of a predecessor until after attachment,
0220 * so seeing a null next field does not necessarily mean that
0221 * node is at end of queue. However, if a next field appears
0222 * to be null, we can scan prev's from the tail to
0223 * double-check. The next field of cancelled nodes is set to
0224 * point to the node itself instead of null, to make life
0225 * easier for isOnSyncQueue.
0226 */
0227 volatile Node next;
0228
0229 /**
0230 * The thread that enqueued this node. Initialized on
0231 * construction and nulled out after use.
0232 */
0233 volatile Thread thread;
0234
0235 /**
0236 * Link to next node waiting on condition, or the special
0237 * value SHARED. Because condition queues are accessed only
0238 * when holding in exclusive mode, we just need a simple
0239 * linked queue to hold nodes while they are waiting on
0240 * conditions. They are then transferred to the queue to
0241 * re-acquire. And because conditions can only be exclusive,
0242 * we save a field by using special value to indicate shared
0243 * mode.
0244 */
0245 Node nextWaiter;
0246
0247 /**
0248 * Returns true if node is waiting in shared mode
0249 */
0250 final boolean isShared() {
0251 return nextWaiter == SHARED;
0252 }
0253
0254 /**
0255 * Returns previous node, or throws NullPointerException if null.
0256 * Use when predecessor cannot be null. The null check could
0257 * be elided, but is present to help the VM.
0258 *
0259 * @return the predecessor of this node
0260 */
0261 final Node predecessor() throws NullPointerException {
0262 Node p = prev;
0263 if (p == null)
0264 throw new NullPointerException();
0265 else
0266 return p;
0267 }
0268
0269 Node() { // Used to establish initial head or SHARED marker
0270 }
0271
0272 Node(Thread thread, Node mode) { // Used by addWaiter
0273 this .nextWaiter = mode;
0274 this .thread = thread;
0275 }
0276
0277 Node(Thread thread, int waitStatus) { // Used by Condition
0278 this .waitStatus = waitStatus;
0279 this .thread = thread;
0280 }
0281 }
0282
0283 /**
0284 * Head of the wait queue, lazily initialized. Except for
0285 * initialization, it is modified only via method setHead. Note:
0286 * If head exists, its waitStatus is guaranteed not to be
0287 * CANCELLED.
0288 */
0289 private transient volatile Node head;
0290
0291 /**
0292 * Tail of the wait queue, lazily initialized. Modified only via
0293 * method enq to add new wait node.
0294 */
0295 private transient volatile Node tail;
0296
0297 /**
0298 * The synchronization state.
0299 */
0300 private volatile long state;
0301
0302 /**
0303 * Returns the current value of synchronization state.
0304 * This operation has memory semantics of a <tt>volatile</tt> read.
0305 * @return current state value
0306 */
0307 protected final long getState() {
0308 return state;
0309 }
0310
0311 /**
0312 * Sets the value of synchronization state.
0313 * This operation has memory semantics of a <tt>volatile</tt> write.
0314 * @param newState the new state value
0315 */
0316 protected final void setState(long newState) {
0317 state = newState;
0318 }
0319
0320 /**
0321 * Atomically sets synchronization state to the given updated
0322 * value if the current state value equals the expected value.
0323 * This operation has memory semantics of a <tt>volatile</tt> read
0324 * and write.
0325 *
0326 * @param expect the expected value
0327 * @param update the new value
0328 * @return true if successful. False return indicates that the actual
0329 * value was not equal to the expected value.
0330 */
0331 protected final boolean compareAndSetState(long expect, long update) {
0332 // See below for intrinsics setup to support this
0333 return unsafe.compareAndSwapLong(this , stateOffset, expect,
0334 update);
0335 }
0336
0337 // Queuing utilities
0338
0339 /**
0340 * The number of nanoseconds for which it is faster to spin
0341 * rather than to use timed park. A rough estimate suffices
0342 * to improve responsiveness with very short timeouts.
0343 */
0344 static final long spinForTimeoutThreshold = 1000L;
0345
0346 /**
0347 * Inserts node into queue, initializing if necessary. See picture above.
0348 * @param node the node to insert
0349 * @return node's predecessor
0350 */
0351 private Node enq(final Node node) {
0352 for (;;) {
0353 Node t = tail;
0354 if (t == null) { // Must initialize
0355 if (compareAndSetHead(new Node()))
0356 tail = head;
0357 } else {
0358 node.prev = t;
0359 if (compareAndSetTail(t, node)) {
0360 t.next = node;
0361 return t;
0362 }
0363 }
0364 }
0365 }
0366
0367 /**
0368 * Creates and enqueues node for current thread and given mode.
0369 *
0370 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
0371 * @return the new node
0372 */
0373 private Node addWaiter(Node mode) {
0374 Node node = new Node(Thread.currentThread(), mode);
0375 // Try the fast path of enq; backup to full enq on failure
0376 Node pred = tail;
0377 if (pred != null) {
0378 node.prev = pred;
0379 if (compareAndSetTail(pred, node)) {
0380 pred.next = node;
0381 return node;
0382 }
0383 }
0384 enq(node);
0385 return node;
0386 }
0387
0388 /**
0389 * Sets head of queue to be node, thus dequeuing. Called only by
0390 * acquire methods. Also nulls out unused fields for sake of GC
0391 * and to suppress unnecessary signals and traversals.
0392 *
0393 * @param node the node
0394 */
0395 private void setHead(Node node) {
0396 head = node;
0397 node.thread = null;
0398 node.prev = null;
0399 }
0400
0401 /**
0402 * Wakes up node's successor, if one exists.
0403 *
0404 * @param node the node
0405 */
0406 private void unparkSuccessor(Node node) {
0407 /*
0408 * Try to clear status in anticipation of signalling. It is
0409 * OK if this fails or if status is changed by waiting thread.
0410 */
0411 compareAndSetWaitStatus(node, Node.SIGNAL, 0);
0412
0413 /*
0414 * Thread to unpark is held in successor, which is normally
0415 * just the next node. But if cancelled or apparently null,
0416 * traverse backwards from tail to find the actual
0417 * non-cancelled successor.
0418 */
0419 Node s = node.next;
0420 if (s == null || s.waitStatus > 0) {
0421 s = null;
0422 for (Node t = tail; t != null && t != node; t = t.prev)
0423 if (t.waitStatus <= 0)
0424 s = t;
0425 }
0426 if (s != null)
0427 LockSupport.unpark(s.thread);
0428 }
0429
0430 /**
0431 * Sets head of queue, and checks if successor may be waiting
0432 * in shared mode, if so propagating if propagate > 0.
0433 *
0434 * @param pred the node holding waitStatus for node
0435 * @param node the node
0436 * @param propagate the return value from a tryAcquireShared
0437 */
0438 private void setHeadAndPropagate(Node node, long propagate) {
0439 setHead(node);
0440 if (propagate > 0 && node.waitStatus != 0) {
0441 /*
0442 * Don't bother fully figuring out successor. If it
0443 * looks null, call unparkSuccessor anyway to be safe.
0444 */
0445 Node s = node.next;
0446 if (s == null || s.isShared())
0447 unparkSuccessor(node);
0448 }
0449 }
0450
0451 // Utilities for various versions of acquire
0452
0453 /**
0454 * Cancels an ongoing attempt to acquire.
0455 *
0456 * @param node the node
0457 */
0458 private void cancelAcquire(Node node) {
0459 // Ignore if node doesn't exist
0460 if (node == null)
0461 return;
0462
0463 node.thread = null;
0464
0465 // Skip cancelled predecessors
0466 Node pred = node.prev;
0467 while (pred.waitStatus > 0)
0468 node.prev = pred = pred.prev;
0469
0470 // Getting this before setting waitStatus ensures staleness
0471 Node predNext = pred.next;
0472
0473 // Can use unconditional write instead of CAS here
0474 node.waitStatus = Node.CANCELLED;
0475
0476 // If we are the tail, remove ourselves
0477 if (node == tail && compareAndSetTail(node, pred)) {
0478 compareAndSetNext(pred, predNext, null);
0479 } else {
0480 // If "active" predecessor found...
0481 if (pred != head
0482 && (pred.waitStatus == Node.SIGNAL || compareAndSetWaitStatus(
0483 pred, 0, Node.SIGNAL))
0484 && pred.thread != null) {
0485
0486 // If successor is active, set predecessor's next link
0487 Node next = node.next;
0488 if (next != null && next.waitStatus <= 0)
0489 compareAndSetNext(pred, predNext, next);
0490 } else {
0491 unparkSuccessor(node);
0492 }
0493
0494 node.next = node; // help GC
0495 }
0496 }
0497
0498 /**
0499 * Checks and updates status for a node that failed to acquire.
0500 * Returns true if thread should block. This is the main signal
0501 * control in all acquire loops. Requires that pred == node.prev
0502 *
0503 * @param pred node's predecessor holding status
0504 * @param node the node
0505 * @return {@code true} if thread should block
0506 */
0507 private static boolean shouldParkAfterFailedAcquire(Node pred,
0508 Node node) {
0509 int s = pred.waitStatus;
0510 if (s < 0)
0511 /*
0512 * This node has already set status asking a release
0513 * to signal it, so it can safely park.
0514 */
0515 return true;
0516 if (s > 0) {
0517 /*
0518 * Predecessor was cancelled. Skip over predecessors and
0519 * indicate retry.
0520 */
0521 do {
0522 node.prev = pred = pred.prev;
0523 } while (pred.waitStatus > 0);
0524 pred.next = node;
0525 } else
0526 /*
0527 * Indicate that we need a signal, but don't park yet. Caller
0528 * will need to retry to make sure it cannot acquire before
0529 * parking.
0530 */
0531 compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
0532 return false;
0533 }
0534
0535 /**
0536 * Convenience method to interrupt current thread.
0537 */
0538 private static void selfInterrupt() {
0539 Thread.currentThread().interrupt();
0540 }
0541
0542 /**
0543 * Convenience method to park and then check if interrupted
0544 *
0545 * @return {@code true} if interrupted
0546 */
0547 private final boolean parkAndCheckInterrupt() {
0548 LockSupport.park(this );
0549 return Thread.interrupted();
0550 }
0551
0552 /*
0553 * Various flavors of acquire, varying in exclusive/shared and
0554 * control modes. Each is mostly the same, but annoyingly
0555 * different. Only a little bit of factoring is possible due to
0556 * interactions of exception mechanics (including ensuring that we
0557 * cancel if tryAcquire throws exception) and other control, at
0558 * least not without hurting performance too much.
0559 */
0560
0561 /**
0562 * Acquires in exclusive uninterruptible mode for thread already in
0563 * queue. Used by condition wait methods as well as acquire.
0564 *
0565 * @param node the node
0566 * @param arg the acquire argument
0567 * @return {@code true} if interrupted while waiting
0568 */
0569 final boolean acquireQueued(final Node node, long arg) {
0570 boolean failed = true;
0571 try {
0572 boolean interrupted = false;
0573 for (;;) {
0574 final Node p = node.predecessor();
0575 if (p == head && tryAcquire(arg)) {
0576 setHead(node);
0577 p.next = null; // help GC
0578 failed = false;
0579 return interrupted;
0580 }
0581 if (shouldParkAfterFailedAcquire(p, node)
0582 && parkAndCheckInterrupt())
0583 interrupted = true;
0584 }
0585 } finally {
0586 if (failed)
0587 cancelAcquire(node);
0588 }
0589 }
0590
0591 /**
0592 * Acquires in exclusive interruptible mode.
0593 * @param arg the acquire argument
0594 */
0595 private void doAcquireInterruptibly(long arg)
0596 throws InterruptedException {
0597 final Node node = addWaiter(Node.EXCLUSIVE);
0598 boolean failed = true;
0599 try {
0600 for (;;) {
0601 final Node p = node.predecessor();
0602 if (p == head && tryAcquire(arg)) {
0603 setHead(node);
0604 p.next = null; // help GC
0605 failed = false;
0606 return;
0607 }
0608 if (shouldParkAfterFailedAcquire(p, node)
0609 && parkAndCheckInterrupt())
0610 throw new InterruptedException();
0611 }
0612 } finally {
0613 if (failed)
0614 cancelAcquire(node);
0615 }
0616 }
0617
0618 /**
0619 * Acquires in exclusive timed mode.
0620 *
0621 * @param arg the acquire argument
0622 * @param nanosTimeout max wait time
0623 * @return {@code true} if acquired
0624 */
0625 private boolean doAcquireNanos(long arg, long nanosTimeout)
0626 throws InterruptedException {
0627 long lastTime = System.nanoTime();
0628 final Node node = addWaiter(Node.EXCLUSIVE);
0629 boolean failed = true;
0630 try {
0631 for (;;) {
0632 final Node p = node.predecessor();
0633 if (p == head && tryAcquire(arg)) {
0634 setHead(node);
0635 p.next = null; // help GC
0636 failed = false;
0637 return true;
0638 }
0639 if (nanosTimeout <= 0)
0640 return false;
0641 if (shouldParkAfterFailedAcquire(p, node)
0642 && nanosTimeout > spinForTimeoutThreshold)
0643 LockSupport.parkNanos(this , nanosTimeout);
0644 long now = System.nanoTime();
0645 nanosTimeout -= now - lastTime;
0646 lastTime = now;
0647 if (Thread.interrupted())
0648 throw new InterruptedException();
0649 }
0650 } finally {
0651 if (failed)
0652 cancelAcquire(node);
0653 }
0654 }
0655
0656 /**
0657 * Acquires in shared uninterruptible mode.
0658 * @param arg the acquire argument
0659 */
0660 private void doAcquireShared(long arg) {
0661 final Node node = addWaiter(Node.SHARED);
0662 boolean failed = true;
0663 try {
0664 boolean interrupted = false;
0665 for (;;) {
0666 final Node p = node.predecessor();
0667 if (p == head) {
0668 long r = tryAcquireShared(arg);
0669 if (r >= 0) {
0670 setHeadAndPropagate(node, r);
0671 p.next = null; // help GC
0672 if (interrupted)
0673 selfInterrupt();
0674 failed = false;
0675 return;
0676 }
0677 }
0678 if (shouldParkAfterFailedAcquire(p, node)
0679 && parkAndCheckInterrupt())
0680 interrupted = true;
0681 }
0682 } finally {
0683 if (failed)
0684 cancelAcquire(node);
0685 }
0686 }
0687
0688 /**
0689 * Acquires in shared interruptible mode.
0690 * @param arg the acquire argument
0691 */
0692 private void doAcquireSharedInterruptibly(long arg)
0693 throws InterruptedException {
0694 final Node node = addWaiter(Node.SHARED);
0695 boolean failed = true;
0696 try {
0697 for (;;) {
0698 final Node p = node.predecessor();
0699 if (p == head) {
0700 long r = tryAcquireShared(arg);
0701 if (r >= 0) {
0702 setHeadAndPropagate(node, r);
0703 p.next = null; // help GC
0704 failed = false;
0705 return;
0706 }
0707 }
0708 if (shouldParkAfterFailedAcquire(p, node)
0709 && parkAndCheckInterrupt())
0710 throw new InterruptedException();
0711 }
0712 } finally {
0713 if (failed)
0714 cancelAcquire(node);
0715 }
0716 }
0717
0718 /**
0719 * Acquires in shared timed mode.
0720 *
0721 * @param arg the acquire argument
0722 * @param nanosTimeout max wait time
0723 * @return {@code true} if acquired
0724 */
0725 private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
0726 throws InterruptedException {
0727
0728 long lastTime = System.nanoTime();
0729 final Node node = addWaiter(Node.SHARED);
0730 boolean failed = true;
0731 try {
0732 for (;;) {
0733 final Node p = node.predecessor();
0734 if (p == head) {
0735 long r = tryAcquireShared(arg);
0736 if (r >= 0) {
0737 setHeadAndPropagate(node, r);
0738 p.next = null; // help GC
0739 failed = false;
0740 return true;
0741 }
0742 }
0743 if (nanosTimeout <= 0)
0744 return false;
0745 if (shouldParkAfterFailedAcquire(p, node)
0746 && nanosTimeout > spinForTimeoutThreshold)
0747 LockSupport.parkNanos(this , nanosTimeout);
0748 long now = System.nanoTime();
0749 nanosTimeout -= now - lastTime;
0750 lastTime = now;
0751 if (Thread.interrupted())
0752 throw new InterruptedException();
0753 }
0754 } finally {
0755 if (failed)
0756 cancelAcquire(node);
0757 }
0758 }
0759
0760 // Main exported methods
0761
0762 /**
0763 * Attempts to acquire in exclusive mode. This method should query
0764 * if the state of the object permits it to be acquired in the
0765 * exclusive mode, and if so to acquire it.
0766 *
0767 * <p>This method is always invoked by the thread performing
0768 * acquire. If this method reports failure, the acquire method
0769 * may queue the thread, if it is not already queued, until it is
0770 * signalled by a release from some other thread. This can be used
0771 * to implement method {@link Lock#tryLock()}.
0772 *
0773 * <p>The default
0774 * implementation throws {@link UnsupportedOperationException}.
0775 *
0776 * @param arg the acquire argument. This value is always the one
0777 * passed to an acquire method, or is the value saved on entry
0778 * to a condition wait. The value is otherwise uninterpreted
0779 * and can represent anything you like.
0780 * @return {@code true} if successful. Upon success, this object has
0781 * been acquired.
0782 * @throws IllegalMonitorStateException if acquiring would place this
0783 * synchronizer in an illegal state. This exception must be
0784 * thrown in a consistent fashion for synchronization to work
0785 * correctly.
0786 * @throws UnsupportedOperationException if exclusive mode is not supported
0787 */
0788 protected boolean tryAcquire(long arg) {
0789 throw new UnsupportedOperationException();
0790 }
0791
0792 /**
0793 * Attempts to set the state to reflect a release in exclusive
0794 * mode.
0795 *
0796 * <p>This method is always invoked by the thread performing release.
0797 *
0798 * <p>The default implementation throws
0799 * {@link UnsupportedOperationException}.
0800 *
0801 * @param arg the release argument. This value is always the one
0802 * passed to a release method, or the current state value upon
0803 * entry to a condition wait. The value is otherwise
0804 * uninterpreted and can represent anything you like.
0805 * @return {@code true} if this object is now in a fully released
0806 * state, so that any waiting threads may attempt to acquire;
0807 * and {@code false} otherwise.
0808 * @throws IllegalMonitorStateException if releasing would place this
0809 * synchronizer in an illegal state. This exception must be
0810 * thrown in a consistent fashion for synchronization to work
0811 * correctly.
0812 * @throws UnsupportedOperationException if exclusive mode is not supported
0813 */
0814 protected boolean tryRelease(long arg) {
0815 throw new UnsupportedOperationException();
0816 }
0817
0818 /**
0819 * Attempts to acquire in shared mode. This method should query if
0820 * the state of the object permits it to be acquired in the shared
0821 * mode, and if so to acquire it.
0822 *
0823 * <p>This method is always invoked by the thread performing
0824 * acquire. If this method reports failure, the acquire method
0825 * may queue the thread, if it is not already queued, until it is
0826 * signalled by a release from some other thread.
0827 *
0828 * <p>The default implementation throws {@link
0829 * UnsupportedOperationException}.
0830 *
0831 * @param arg the acquire argument. This value is always the one
0832 * passed to an acquire method, or is the value saved on entry
0833 * to a condition wait. The value is otherwise uninterpreted
0834 * and can represent anything you like.
0835 * @return a negative value on failure; zero if acquisition in shared
0836 * mode succeeded but no subsequent shared-mode acquire can
0837 * succeed; and a positive value if acquisition in shared
0838 * mode succeeded and subsequent shared-mode acquires might
0839 * also succeed, in which case a subsequent waiting thread
0840 * must check availability. (Support for three different
0841 * return values enables this method to be used in contexts
0842 * where acquires only sometimes act exclusively.) Upon
0843 * success, this object has been acquired.
0844 * @throws IllegalMonitorStateException if acquiring would place this
0845 * synchronizer in an illegal state. This exception must be
0846 * thrown in a consistent fashion for synchronization to work
0847 * correctly.
0848 * @throws UnsupportedOperationException if shared mode is not supported
0849 */
0850 protected long tryAcquireShared(long arg) {
0851 throw new UnsupportedOperationException();
0852 }
0853
0854 /**
0855 * Attempts to set the state to reflect a release in shared mode.
0856 *
0857 * <p>This method is always invoked by the thread performing release.
0858 *
0859 * <p>The default implementation throws
0860 * {@link UnsupportedOperationException}.
0861 *
0862 * @param arg the release argument. This value is always the one
0863 * passed to a release method, or the current state value upon
0864 * entry to a condition wait. The value is otherwise
0865 * uninterpreted and can represent anything you like.
0866 * @return {@code true} if this release of shared mode may permit a
0867 * waiting acquire (shared or exclusive) to succeed; and
0868 * {@code false} otherwise
0869 * @throws IllegalMonitorStateException if releasing would place this
0870 * synchronizer in an illegal state. This exception must be
0871 * thrown in a consistent fashion for synchronization to work
0872 * correctly.
0873 * @throws UnsupportedOperationException if shared mode is not supported
0874 */
0875 protected boolean tryReleaseShared(long arg) {
0876 throw new UnsupportedOperationException();
0877 }
0878
0879 /**
0880 * Returns {@code true} if synchronization is held exclusively with
0881 * respect to the current (calling) thread. This method is invoked
0882 * upon each call to a non-waiting {@link ConditionObject} method.
0883 * (Waiting methods instead invoke {@link #release}.)
0884 *
0885 * <p>The default implementation throws {@link
0886 * UnsupportedOperationException}. This method is invoked
0887 * internally only within {@link ConditionObject} methods, so need
0888 * not be defined if conditions are not used.
0889 *
0890 * @return {@code true} if synchronization is held exclusively;
0891 * {@code false} otherwise
0892 * @throws UnsupportedOperationException if conditions are not supported
0893 */
0894 protected boolean isHeldExclusively() {
0895 throw new UnsupportedOperationException();
0896 }
0897
0898 /**
0899 * Acquires in exclusive mode, ignoring interrupts. Implemented
0900 * by invoking at least once {@link #tryAcquire},
0901 * returning on success. Otherwise the thread is queued, possibly
0902 * repeatedly blocking and unblocking, invoking {@link
0903 * #tryAcquire} until success. This method can be used
0904 * to implement method {@link Lock#lock}.
0905 *
0906 * @param arg the acquire argument. This value is conveyed to
0907 * {@link #tryAcquire} but is otherwise uninterpreted and
0908 * can represent anything you like.
0909 */
0910 public final void acquire(long arg) {
0911 if (!tryAcquire(arg)
0912 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
0913 selfInterrupt();
0914 }
0915
0916 /**
0917 * Acquires in exclusive mode, aborting if interrupted.
0918 * Implemented by first checking interrupt status, then invoking
0919 * at least once {@link #tryAcquire}, returning on
0920 * success. Otherwise the thread is queued, possibly repeatedly
0921 * blocking and unblocking, invoking {@link #tryAcquire}
0922 * until success or the thread is interrupted. This method can be
0923 * used to implement method {@link Lock#lockInterruptibly}.
0924 *
0925 * @param arg the acquire argument. This value is conveyed to
0926 * {@link #tryAcquire} but is otherwise uninterpreted and
0927 * can represent anything you like.
0928 * @throws InterruptedException if the current thread is interrupted
0929 */
0930 public final void acquireInterruptibly(long arg)
0931 throws InterruptedException {
0932 if (Thread.interrupted())
0933 throw new InterruptedException();
0934 if (!tryAcquire(arg))
0935 doAcquireInterruptibly(arg);
0936 }
0937
0938 /**
0939 * Attempts to acquire in exclusive mode, aborting if interrupted,
0940 * and failing if the given timeout elapses. Implemented by first
0941 * checking interrupt status, then invoking at least once {@link
0942 * #tryAcquire}, returning on success. Otherwise, the thread is
0943 * queued, possibly repeatedly blocking and unblocking, invoking
0944 * {@link #tryAcquire} until success or the thread is interrupted
0945 * or the timeout elapses. This method can be used to implement
0946 * method {@link Lock#tryLock(long, TimeUnit)}.
0947 *
0948 * @param arg the acquire argument. This value is conveyed to
0949 * {@link #tryAcquire} but is otherwise uninterpreted and
0950 * can represent anything you like.
0951 * @param nanosTimeout the maximum number of nanoseconds to wait
0952 * @return {@code true} if acquired; {@code false} if timed out
0953 * @throws InterruptedException if the current thread is interrupted
0954 */
0955 public final boolean tryAcquireNanos(long arg, long nanosTimeout)
0956 throws InterruptedException {
0957 if (Thread.interrupted())
0958 throw new InterruptedException();
0959 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
0960 }
0961
0962 /**
0963 * Releases in exclusive mode. Implemented by unblocking one or
0964 * more threads if {@link #tryRelease} returns true.
0965 * This method can be used to implement method {@link Lock#unlock}.
0966 *
0967 * @param arg the release argument. This value is conveyed to
0968 * {@link #tryRelease} but is otherwise uninterpreted and
0969 * can represent anything you like.
0970 * @return the value returned from {@link #tryRelease}
0971 */
0972 public final boolean release(long arg) {
0973 if (tryRelease(arg)) {
0974 Node h = head;
0975 if (h != null && h.waitStatus != 0)
0976 unparkSuccessor(h);
0977 return true;
0978 }
0979 return false;
0980 }
0981
0982 /**
0983 * Acquires in shared mode, ignoring interrupts. Implemented by
0984 * first invoking at least once {@link #tryAcquireShared},
0985 * returning on success. Otherwise the thread is queued, possibly
0986 * repeatedly blocking and unblocking, invoking {@link
0987 * #tryAcquireShared} until success.
0988 *
0989 * @param arg the acquire argument. This value is conveyed to
0990 * {@link #tryAcquireShared} but is otherwise uninterpreted
0991 * and can represent anything you like.
0992 */
0993 public final void acquireShared(long arg) {
0994 if (tryAcquireShared(arg) < 0)
0995 doAcquireShared(arg);
0996 }
0997
0998 /**
0999 * Acquires in shared mode, aborting if interrupted. Implemented
1000 * by first checking interrupt status, then invoking at least once
1001 * {@link #tryAcquireShared}, returning on success. Otherwise the
1002 * thread is queued, possibly repeatedly blocking and unblocking,
1003 * invoking {@link #tryAcquireShared} until success or the thread
1004 * is interrupted.
1005 * @param arg the acquire argument
1006 * This value is conveyed to {@link #tryAcquireShared} but is
1007 * otherwise uninterpreted and can represent anything
1008 * you like.
1009 * @throws InterruptedException if the current thread is interrupted
1010 */
1011 public final void acquireSharedInterruptibly(long arg)
1012 throws InterruptedException {
1013 if (Thread.interrupted())
1014 throw new InterruptedException();
1015 if (tryAcquireShared(arg) < 0)
1016 doAcquireSharedInterruptibly(arg);
1017 }
1018
1019 /**
1020 * Attempts to acquire in shared mode, aborting if interrupted, and
1021 * failing if the given timeout elapses. Implemented by first
1022 * checking interrupt status, then invoking at least once {@link
1023 * #tryAcquireShared}, returning on success. Otherwise, the
1024 * thread is queued, possibly repeatedly blocking and unblocking,
1025 * invoking {@link #tryAcquireShared} until success or the thread
1026 * is interrupted or the timeout elapses.
1027 *
1028 * @param arg the acquire argument. This value is conveyed to
1029 * {@link #tryAcquireShared} but is otherwise uninterpreted
1030 * and can represent anything you like.
1031 * @param nanosTimeout the maximum number of nanoseconds to wait
1032 * @return {@code true} if acquired; {@code false} if timed out
1033 * @throws InterruptedException if the current thread is interrupted
1034 */
1035 public final boolean tryAcquireSharedNanos(long arg,
1036 long nanosTimeout) throws InterruptedException {
1037 if (Thread.interrupted())
1038 throw new InterruptedException();
1039 return tryAcquireShared(arg) >= 0
1040 || doAcquireSharedNanos(arg, nanosTimeout);
1041 }
1042
1043 /**
1044 * Releases in shared mode. Implemented by unblocking one or more
1045 * threads if {@link #tryReleaseShared} returns true.
1046 *
1047 * @param arg the release argument. This value is conveyed to
1048 * {@link #tryReleaseShared} but is otherwise uninterpreted
1049 * and can represent anything you like.
1050 * @return the value returned from {@link #tryReleaseShared}
1051 */
1052 public final boolean releaseShared(long arg) {
1053 if (tryReleaseShared(arg)) {
1054 Node h = head;
1055 if (h != null && h.waitStatus != 0)
1056 unparkSuccessor(h);
1057 return true;
1058 }
1059 return false;
1060 }
1061
1062 // Queue inspection methods
1063
1064 /**
1065 * Queries whether any threads are waiting to acquire. Note that
1066 * because cancellations due to interrupts and timeouts may occur
1067 * at any time, a {@code true} return does not guarantee that any
1068 * other thread will ever acquire.
1069 *
1070 * <p>In this implementation, this operation returns in
1071 * constant time.
1072 *
1073 * @return {@code true} if there may be other threads waiting to acquire
1074 */
1075 public final boolean hasQueuedThreads() {
1076 return head != tail;
1077 }
1078
1079 /**
1080 * Queries whether any threads have ever contended to acquire this
1081 * synchronizer; that is if an acquire method has ever blocked.
1082 *
1083 * <p>In this implementation, this operation returns in
1084 * constant time.
1085 *
1086 * @return {@code true} if there has ever been contention
1087 */
1088 public final boolean hasContended() {
1089 return head != null;
1090 }
1091
1092 /**
1093 * Returns the first (longest-waiting) thread in the queue, or
1094 * {@code null} if no threads are currently queued.
1095 *
1096 * <p>In this implementation, this operation normally returns in
1097 * constant time, but may iterate upon contention if other threads are
1098 * concurrently modifying the queue.
1099 *
1100 * @return the first (longest-waiting) thread in the queue, or
1101 * {@code null} if no threads are currently queued
1102 */
1103 public final Thread getFirstQueuedThread() {
1104 // handle only fast path, else relay
1105 return (head == tail) ? null : fullGetFirstQueuedThread();
1106 }
1107
1108 /**
1109 * Version of getFirstQueuedThread called when fastpath fails
1110 */
1111 private Thread fullGetFirstQueuedThread() {
1112 /*
1113 * The first node is normally head.next. Try to get its
1114 * thread field, ensuring consistent reads: If thread
1115 * field is nulled out or s.prev is no longer head, then
1116 * some other thread(s) concurrently performed setHead in
1117 * between some of our reads. We try this twice before
1118 * resorting to traversal.
1119 */
1120 Node h, s;
1121 Thread st;
1122 if (((h = head) != null && (s = h.next) != null
1123 && s.prev == head && (st = s.thread) != null)
1124 || ((h = head) != null && (s = h.next) != null
1125 && s.prev == head && (st = s.thread) != null))
1126 return st;
1127
1128 /*
1129 * Head's next field might not have been set yet, or may have
1130 * been unset after setHead. So we must check to see if tail
1131 * is actually first node. If not, we continue on, safely
1132 * traversing from tail back to head to find first,
1133 * guaranteeing termination.
1134 */
1135
1136 Node t = tail;
1137 Thread firstThread = null;
1138 while (t != null && t != head) {
1139 Thread tt = t.thread;
1140 if (tt != null)
1141 firstThread = tt;
1142 t = t.prev;
1143 }
1144 return firstThread;
1145 }
1146
1147 /**
1148 * Returns true if the given thread is currently queued.
1149 *
1150 * <p>This implementation traverses the queue to determine
1151 * presence of the given thread.
1152 *
1153 * @param thread the thread
1154 * @return {@code true} if the given thread is on the queue
1155 * @throws NullPointerException if the thread is null
1156 */
1157 public final boolean isQueued(Thread thread) {
1158 if (thread == null)
1159 throw new NullPointerException();
1160 for (Node p = tail; p != null; p = p.prev)
1161 if (p.thread == thread)
1162 return true;
1163 return false;
1164 }
1165
1166 /**
1167 * Returns {@code true} if the apparent first queued thread, if one
1168 * exists, is waiting in exclusive mode. If this method returns
1169 * {@code true}, and the current thread is attempting to acquire in
1170 * shared mode (that is, this method is invoked from {@link
1171 * #tryAcquireShared}) then it is guaranteed that the current thread
1172 * is not the first queued thread. Used only as a heuristic in
1173 * ReentrantReadWriteLock.
1174 */
1175 final boolean apparentlyFirstQueuedIsExclusive() {
1176 Node h, s;
1177 return (h = head) != null && (s = h.next) != null
1178 && !s.isShared() && s.thread != null;
1179 }
1180
1181 /**
1182 * Queries whether any threads have been waiting to acquire longer
1183 * than the current thread.
1184 *
1185 * <p>An invocation of this method is equivalent to (but may be
1186 * more efficient than):
1187 * <pre> {@code
1188 * getFirstQueuedThread() != Thread.currentThread() &&
1189 * hasQueuedThreads()}</pre>
1190 *
1191 * <p>Note that because cancellations due to interrupts and
1192 * timeouts may occur at any time, a {@code true} return does not
1193 * guarantee that some other thread will acquire before the current
1194 * thread. Likewise, it is possible for another thread to win a
1195 * race to enqueue after this method has returned {@code false},
1196 * due to the queue being empty.
1197 *
1198 * <p>This method is designed to be used by a fair synchronizer to
1199 * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
1200 * Such a synchronizer's {@link #tryAcquire} method should return
1201 * {@code false}, and its {@link #tryAcquireShared} method should
1202 * return a negative value, if this method returns {@code true}
1203 * (unless this is a reentrant acquire). For example, the {@code
1204 * tryAcquire} method for a fair, reentrant, exclusive mode
1205 * synchronizer might look like this:
1206 *
1207 * <pre> {@code
1208 * protected boolean tryAcquire(int arg) {
1209 * if (isHeldExclusively()) {
1210 * // A reentrant acquire; increment hold count
1211 * return true;
1212 * } else if (hasQueuedPredecessors()) {
1213 * return false;
1214 * } else {
1215 * // try to acquire normally
1216 * }
1217 * }}</pre>
1218 *
1219 * @return {@code true} if there is a queued thread preceding the
1220 * current thread, and {@code false} if the current thread
1221 * is at the head of the queue or the queue is empty
1222 * @since 1.7
1223 */
1224 public final boolean hasQueuedPredecessors() {
1225 // The correctness of this depends on head being initialized
1226 // before tail and on head.next being accurate if the current
1227 // thread is first in queue.
1228 Node h, s;
1229 return (h = head) != tail
1230 && ((s = h.next) == null || s.thread != Thread
1231 .currentThread());
1232 }
1233
1234 // Instrumentation and monitoring methods
1235
1236 /**
1237 * Returns an estimate of the number of threads waiting to
1238 * acquire. The value is only an estimate because the number of
1239 * threads may change dynamically while this method traverses
1240 * internal data structures. This method is designed for use in
1241 * monitoring system state, not for synchronization
1242 * control.
1243 *
1244 * @return the estimated number of threads waiting to acquire
1245 */
1246 public final int getQueueLength() {
1247 int n = 0;
1248 for (Node p = tail; p != null; p = p.prev) {
1249 if (p.thread != null)
1250 ++n;
1251 }
1252 return n;
1253 }
1254
1255 /**
1256 * Returns a collection containing threads that may be waiting to
1257 * acquire. Because the actual set of threads may change
1258 * dynamically while constructing this result, the returned
1259 * collection is only a best-effort estimate. The elements of the
1260 * returned collection are in no particular order. This method is
1261 * designed to facilitate construction of subclasses that provide
1262 * more extensive monitoring facilities.
1263 *
1264 * @return the collection of threads
1265 */
1266 public final Collection<Thread> getQueuedThreads() {
1267 ArrayList<Thread> list = new ArrayList<Thread>();
1268 for (Node p = tail; p != null; p = p.prev) {
1269 Thread t = p.thread;
1270 if (t != null)
1271 list.add(t);
1272 }
1273 return list;
1274 }
1275
1276 /**
1277 * Returns a collection containing threads that may be waiting to
1278 * acquire in exclusive mode. This has the same properties
1279 * as {@link #getQueuedThreads} except that it only returns
1280 * those threads waiting due to an exclusive acquire.
1281 *
1282 * @return the collection of threads
1283 */
1284 public final Collection<Thread> getExclusiveQueuedThreads() {
1285 ArrayList<Thread> list = new ArrayList<Thread>();
1286 for (Node p = tail; p != null; p = p.prev) {
1287 if (!p.isShared()) {
1288 Thread t = p.thread;
1289 if (t != null)
1290 list.add(t);
1291 }
1292 }
1293 return list;
1294 }
1295
1296 /**
1297 * Returns a collection containing threads that may be waiting to
1298 * acquire in shared mode. This has the same properties
1299 * as {@link #getQueuedThreads} except that it only returns
1300 * those threads waiting due to a shared acquire.
1301 *
1302 * @return the collection of threads
1303 */
1304 public final Collection<Thread> getSharedQueuedThreads() {
1305 ArrayList<Thread> list = new ArrayList<Thread>();
1306 for (Node p = tail; p != null; p = p.prev) {
1307 if (p.isShared()) {
1308 Thread t = p.thread;
1309 if (t != null)
1310 list.add(t);
1311 }
1312 }
1313 return list;
1314 }
1315
1316 /**
1317 * Returns a string identifying this synchronizer, as well as its state.
1318 * The state, in brackets, includes the String {@code "State ="}
1319 * followed by the current value of {@link #getState}, and either
1320 * {@code "nonempty"} or {@code "empty"} depending on whether the
1321 * queue is empty.
1322 *
1323 * @return a string identifying this synchronizer, as well as its state
1324 */
1325 public String toString() {
1326 long s = getState();
1327 String q = hasQueuedThreads() ? "non" : "";
1328 return super .toString() + "[State = " + s + ", " + q
1329 + "empty queue]";
1330 }
1331
1332 // Internal support methods for Conditions
1333
1334 /**
1335 * Returns true if a node, always one that was initially placed on
1336 * a condition queue, is now waiting to reacquire on sync queue.
1337 * @param node the node
1338 * @return true if is reacquiring
1339 */
1340 final boolean isOnSyncQueue(Node node) {
1341 if (node.waitStatus == Node.CONDITION || node.prev == null)
1342 return false;
1343 if (node.next != null) // If has successor, it must be on queue
1344 return true;
1345 /*
1346 * node.prev can be non-null, but not yet on queue because
1347 * the CAS to place it on queue can fail. So we have to
1348 * traverse from tail to make sure it actually made it. It
1349 * will always be near the tail in calls to this method, and
1350 * unless the CAS failed (which is unlikely), it will be
1351 * there, so we hardly ever traverse much.
1352 */
1353 return findNodeFromTail(node);
1354 }
1355
1356 /**
1357 * Returns true if node is on sync queue by searching backwards from tail.
1358 * Called only when needed by isOnSyncQueue.
1359 * @return true if present
1360 */
1361 private boolean findNodeFromTail(Node node) {
1362 Node t = tail;
1363 for (;;) {
1364 if (t == node)
1365 return true;
1366 if (t == null)
1367 return false;
1368 t = t.prev;
1369 }
1370 }
1371
1372 /**
1373 * Transfers a node from a condition queue onto sync queue.
1374 * Returns true if successful.
1375 * @param node the node
1376 * @return true if successfully transferred (else the node was
1377 * cancelled before signal).
1378 */
1379 final boolean transferForSignal(Node node) {
1380 /*
1381 * If cannot change waitStatus, the node has been cancelled.
1382 */
1383 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
1384 return false;
1385
1386 /*
1387 * Splice onto queue and try to set waitStatus of predecessor to
1388 * indicate that thread is (probably) waiting. If cancelled or
1389 * attempt to set waitStatus fails, wake up to resync (in which
1390 * case the waitStatus can be transiently and harmlessly wrong).
1391 */
1392 Node p = enq(node);
1393 int c = p.waitStatus;
1394 if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
1395 LockSupport.unpark(node.thread);
1396 return true;
1397 }
1398
1399 /**
1400 * Transfers node, if necessary, to sync queue after a cancelled
1401 * wait. Returns true if thread was cancelled before being
1402 * signalled.
1403 * @param current the waiting thread
1404 * @param node its node
1405 * @return true if cancelled before the node was signalled
1406 */
1407 final boolean transferAfterCancelledWait(Node node) {
1408 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
1409 enq(node);
1410 return true;
1411 }
1412 /*
1413 * If we lost out to a signal(), then we can't proceed
1414 * until it finishes its enq(). Cancelling during an
1415 * incomplete transfer is both rare and transient, so just
1416 * spin.
1417 */
1418 while (!isOnSyncQueue(node))
1419 Thread.yield();
1420 return false;
1421 }
1422
1423 /**
1424 * Invokes release with current state value; returns saved state.
1425 * Cancels node and throws exception on failure.
1426 * @param node the condition node for this wait
1427 * @return previous sync state
1428 */
1429 final long fullyRelease(Node node) {
1430 boolean failed = true;
1431 try {
1432 long savedState = getState();
1433 if (release(savedState)) {
1434 failed = false;
1435 return savedState;
1436 } else {
1437 throw new IllegalMonitorStateException();
1438 }
1439 } finally {
1440 if (failed)
1441 node.waitStatus = Node.CANCELLED;
1442 }
1443 }
1444
1445 // Instrumentation methods for conditions
1446
1447 /**
1448 * Queries whether the given ConditionObject
1449 * uses this synchronizer as its lock.
1450 *
1451 * @param condition the condition
1452 * @return <tt>true</tt> if owned
1453 * @throws NullPointerException if the condition is null
1454 */
1455 public final boolean owns(ConditionObject condition) {
1456 if (condition == null)
1457 throw new NullPointerException();
1458 return condition.isOwnedBy(this );
1459 }
1460
1461 /**
1462 * Queries whether any threads are waiting on the given condition
1463 * associated with this synchronizer. Note that because timeouts
1464 * and interrupts may occur at any time, a <tt>true</tt> return
1465 * does not guarantee that a future <tt>signal</tt> will awaken
1466 * any threads. This method is designed primarily for use in
1467 * monitoring of the system state.
1468 *
1469 * @param condition the condition
1470 * @return <tt>true</tt> if there are any waiting threads
1471 * @throws IllegalMonitorStateException if exclusive synchronization
1472 * is not held
1473 * @throws IllegalArgumentException if the given condition is
1474 * not associated with this synchronizer
1475 * @throws NullPointerException if the condition is null
1476 */
1477 public final boolean hasWaiters(ConditionObject condition) {
1478 if (!owns(condition))
1479 throw new IllegalArgumentException("Not owner");
1480 return condition.hasWaiters();
1481 }
1482
1483 /**
1484 * Returns an estimate of the number of threads waiting on the
1485 * given condition associated with this synchronizer. Note that
1486 * because timeouts and interrupts may occur at any time, the
1487 * estimate serves only as an upper bound on the actual number of
1488 * waiters. This method is designed for use in monitoring of the
1489 * system state, not for synchronization control.
1490 *
1491 * @param condition the condition
1492 * @return the estimated number of waiting threads
1493 * @throws IllegalMonitorStateException if exclusive synchronization
1494 * is not held
1495 * @throws IllegalArgumentException if the given condition is
1496 * not associated with this synchronizer
1497 * @throws NullPointerException if the condition is null
1498 */
1499 public final int getWaitQueueLength(ConditionObject condition) {
1500 if (!owns(condition))
1501 throw new IllegalArgumentException("Not owner");
1502 return condition.getWaitQueueLength();
1503 }
1504
1505 /**
1506 * Returns a collection containing those threads that may be
1507 * waiting on the given condition associated with this
1508 * synchronizer. Because the actual set of threads may change
1509 * dynamically while constructing this result, the returned
1510 * collection is only a best-effort estimate. The elements of the
1511 * returned collection are in no particular order.
1512 *
1513 * @param condition the condition
1514 * @return the collection of threads
1515 * @throws IllegalMonitorStateException if exclusive synchronization
1516 * is not held
1517 * @throws IllegalArgumentException if the given condition is
1518 * not associated with this synchronizer
1519 * @throws NullPointerException if the condition is null
1520 */
1521 public final Collection<Thread> getWaitingThreads(
1522 ConditionObject condition) {
1523 if (!owns(condition))
1524 throw new IllegalArgumentException("Not owner");
1525 return condition.getWaitingThreads();
1526 }
1527
1528 /**
1529 * Condition implementation for a {@link
1530 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link
1531 * Lock} implementation.
1532 *
1533 * <p>Method documentation for this class describes mechanics,
1534 * not behavioral specifications from the point of view of Lock
1535 * and Condition users. Exported versions of this class will in
1536 * general need to be accompanied by documentation describing
1537 * condition semantics that rely on those of the associated
1538 * <tt>AbstractQueuedLongSynchronizer</tt>.
1539 *
1540 * <p>This class is Serializable, but all fields are transient,
1541 * so deserialized conditions have no waiters.
1542 *
1543 * @since 1.6
1544 */
1545 public class ConditionObject implements Condition,
1546 java.io.Serializable {
1547 private static final long serialVersionUID = 1173984872572414699L;
1548 /** First node of condition queue. */
1549 private transient Node firstWaiter;
1550 /** Last node of condition queue. */
1551 private transient Node lastWaiter;
1552
1553 /**
1554 * Creates a new <tt>ConditionObject</tt> instance.
1555 */
public ConditionObject() {
    // Nothing to set up: firstWaiter and lastWaiter have no
    // initializers, so the condition queue starts out empty.
    super();
}
1558
1559 // Internal methods
1560
1561 /**
1562 * Adds a new waiter to wait queue.
1563 * @return its new wait node
1564 */
1565 private Node addConditionWaiter() {
1566 Node t = lastWaiter;
1567 // If lastWaiter is cancelled, clean out.
1568 if (t != null && t.waitStatus != Node.CONDITION) {
1569 unlinkCancelledWaiters();
1570 t = lastWaiter;
1571 }
1572 Node node = new Node(Thread.currentThread(), Node.CONDITION);
1573 if (t == null)
1574 firstWaiter = node;
1575 else
1576 t.nextWaiter = node;
1577 lastWaiter = node;
1578 return node;
1579 }
1580
1581 /**
1582 * Removes and transfers nodes until hit non-cancelled one or
1583 * null. Split out from signal in part to encourage compilers
1584 * to inline the case of no waiters.
1585 * @param first (non-null) the first node on condition queue
1586 */
1587 private void doSignal(Node first) {
1588 do {
1589 if ((firstWaiter = first.nextWaiter) == null)
1590 lastWaiter = null;
1591 first.nextWaiter = null;
1592 } while (!transferForSignal(first)
1593 && (first = firstWaiter) != null);
1594 }
1595
1596 /**
1597 * Removes and transfers all nodes.
1598 * @param first (non-null) the first node on condition queue
1599 */
1600 private void doSignalAll(Node first) {
1601 lastWaiter = firstWaiter = null;
1602 do {
1603 Node next = first.nextWaiter;
1604 first.nextWaiter = null;
1605 transferForSignal(first);
1606 first = next;
1607 } while (first != null);
1608 }
1609
1610 /**
1611 * Unlinks cancelled waiter nodes from condition queue.
1612 * Called only while holding lock. This is called when
1613 * cancellation occurred during condition wait, and upon
1614 * insertion of a new waiter when lastWaiter is seen to have
1615 * been cancelled. This method is needed to avoid garbage
1616 * retention in the absence of signals. So even though it may
1617 * require a full traversal, it comes into play only when
1618 * timeouts or cancellations occur in the absence of
1619 * signals. It traverses all nodes rather than stopping at a
1620 * particular target to unlink all pointers to garbage nodes
1621 * without requiring many re-traversals during cancellation
1622 * storms.
1623 */
1624 private void unlinkCancelledWaiters() {
1625 Node t = firstWaiter;
1626 Node trail = null;
1627 while (t != null) {
1628 Node next = t.nextWaiter;
1629 if (t.waitStatus != Node.CONDITION) {
1630 t.nextWaiter = null;
1631 if (trail == null)
1632 firstWaiter = next;
1633 else
1634 trail.nextWaiter = next;
1635 if (next == null)
1636 lastWaiter = trail;
1637 } else
1638 trail = t;
1639 t = next;
1640 }
1641 }
1642
1643 // public methods
1644
1645 /**
1646 * Moves the longest-waiting thread, if one exists, from the
1647 * wait queue for this condition to the wait queue for the
1648 * owning lock.
1649 *
1650 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1651 * returns {@code false}
1652 */
1653 public final void signal() {
1654 if (!isHeldExclusively())
1655 throw new IllegalMonitorStateException();
1656 Node first = firstWaiter;
1657 if (first != null)
1658 doSignal(first);
1659 }
1660
1661 /**
1662 * Moves all threads from the wait queue for this condition to
1663 * the wait queue for the owning lock.
1664 *
1665 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1666 * returns {@code false}
1667 */
1668 public final void signalAll() {
1669 if (!isHeldExclusively())
1670 throw new IllegalMonitorStateException();
1671 Node first = firstWaiter;
1672 if (first != null)
1673 doSignalAll(first);
1674 }
1675
1676 /**
1677 * Implements uninterruptible condition wait.
1678 * <ol>
1679 * <li> Save lock state returned by {@link #getState}.
1680 * <li> Invoke {@link #release} with
1681 * saved state as argument, throwing
1682 * IllegalMonitorStateException if it fails.
1683 * <li> Block until signalled.
1684 * <li> Reacquire by invoking specialized version of
1685 * {@link #acquire} with saved state as argument.
1686 * </ol>
1687 */
1688 public final void awaitUninterruptibly() {
1689 Node node = addConditionWaiter();
1690 long savedState = fullyRelease(node);
1691 boolean interrupted = false;
1692 while (!isOnSyncQueue(node)) {
1693 LockSupport.park(this );
1694 if (Thread.interrupted())
1695 interrupted = true;
1696 }
1697 if (acquireQueued(node, savedState) || interrupted)
1698 selfInterrupt();
1699 }
1700
1701 /*
1702 * For interruptible waits, we need to track whether to throw
1703 * InterruptedException, if interrupted while blocked on
1704 * condition, versus reinterrupt current thread, if
1705 * interrupted while blocked waiting to re-acquire.
1706 */
1707
1708 /** Mode meaning to reinterrupt on exit from wait */
1709 private static final int REINTERRUPT = 1;
1710 /** Mode meaning to throw InterruptedException on exit from wait */
1711 private static final int THROW_IE = -1;
1712
1713 /**
1714 * Checks for interrupt, returning THROW_IE if interrupted
1715 * before signalled, REINTERRUPT if after signalled, or
1716 * 0 if not interrupted.
1717 */
1718 private int checkInterruptWhileWaiting(Node node) {
1719 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE
1720 : REINTERRUPT)
1721 : 0;
1722 }
1723
1724 /**
1725 * Throws InterruptedException, reinterrupts current thread, or
1726 * does nothing, depending on mode.
1727 */
1728 private void reportInterruptAfterWait(int interruptMode)
1729 throws InterruptedException {
1730 if (interruptMode == THROW_IE)
1731 throw new InterruptedException();
1732 else if (interruptMode == REINTERRUPT)
1733 selfInterrupt();
1734 }
1735
1736 /**
1737 * Implements interruptible condition wait.
1738 * <ol>
1739 * <li> If current thread is interrupted, throw InterruptedException.
1740 * <li> Save lock state returned by {@link #getState}.
1741 * <li> Invoke {@link #release} with
1742 * saved state as argument, throwing
1743 * IllegalMonitorStateException if it fails.
1744 * <li> Block until signalled or interrupted.
1745 * <li> Reacquire by invoking specialized version of
1746 * {@link #acquire} with saved state as argument.
1747 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1748 * </ol>
1749 */
1750 public final void await() throws InterruptedException {
1751 if (Thread.interrupted())
1752 throw new InterruptedException();
1753 Node node = addConditionWaiter();
1754 long savedState = fullyRelease(node);
1755 int interruptMode = 0;
1756 while (!isOnSyncQueue(node)) {
1757 LockSupport.park(this );
1758 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1759 break;
1760 }
1761 if (acquireQueued(node, savedState)
1762 && interruptMode != THROW_IE)
1763 interruptMode = REINTERRUPT;
1764 if (node.nextWaiter != null) // clean up if cancelled
1765 unlinkCancelledWaiters();
1766 if (interruptMode != 0)
1767 reportInterruptAfterWait(interruptMode);
1768 }
1769
1770 /**
1771 * Implements timed condition wait.
1772 * <ol>
1773 * <li> If current thread is interrupted, throw InterruptedException.
1774 * <li> Save lock state returned by {@link #getState}.
1775 * <li> Invoke {@link #release} with
1776 * saved state as argument, throwing
1777 * IllegalMonitorStateException if it fails.
1778 * <li> Block until signalled, interrupted, or timed out.
1779 * <li> Reacquire by invoking specialized version of
1780 * {@link #acquire} with saved state as argument.
1781 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1782 * </ol>
1783 */
1784 public final long awaitNanos(long nanosTimeout)
1785 throws InterruptedException {
1786 if (Thread.interrupted())
1787 throw new InterruptedException();
1788 Node node = addConditionWaiter();
1789 long savedState = fullyRelease(node);
1790 long lastTime = System.nanoTime();
1791 int interruptMode = 0;
1792 while (!isOnSyncQueue(node)) {
1793 if (nanosTimeout <= 0L) {
1794 transferAfterCancelledWait(node);
1795 break;
1796 }
1797 LockSupport.parkNanos(this , nanosTimeout);
1798 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1799 break;
1800
1801 long now = System.nanoTime();
1802 nanosTimeout -= now - lastTime;
1803 lastTime = now;
1804 }
1805 if (acquireQueued(node, savedState)
1806 && interruptMode != THROW_IE)
1807 interruptMode = REINTERRUPT;
1808 if (node.nextWaiter != null)
1809 unlinkCancelledWaiters();
1810 if (interruptMode != 0)
1811 reportInterruptAfterWait(interruptMode);
1812 return nanosTimeout - (System.nanoTime() - lastTime);
1813 }
1814
1815 /**
1816 * Implements absolute timed condition wait.
1817 * <ol>
1818 * <li> If current thread is interrupted, throw InterruptedException.
1819 * <li> Save lock state returned by {@link #getState}.
1820 * <li> Invoke {@link #release} with
1821 * saved state as argument, throwing
1822 * IllegalMonitorStateException if it fails.
1823 * <li> Block until signalled, interrupted, or timed out.
1824 * <li> Reacquire by invoking specialized version of
1825 * {@link #acquire} with saved state as argument.
1826 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1827 * <li> If timed out while blocked in step 4, return false, else true.
1828 * </ol>
1829 */
1830 public final boolean awaitUntil(Date deadline)
1831 throws InterruptedException {
1832 if (deadline == null)
1833 throw new NullPointerException();
1834 long abstime = deadline.getTime();
1835 if (Thread.interrupted())
1836 throw new InterruptedException();
1837 Node node = addConditionWaiter();
1838 long savedState = fullyRelease(node);
1839 boolean timedout = false;
1840 int interruptMode = 0;
1841 while (!isOnSyncQueue(node)) {
1842 if (System.currentTimeMillis() > abstime) {
1843 timedout = transferAfterCancelledWait(node);
1844 break;
1845 }
1846 LockSupport.parkUntil(this , abstime);
1847 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1848 break;
1849 }
1850 if (acquireQueued(node, savedState)
1851 && interruptMode != THROW_IE)
1852 interruptMode = REINTERRUPT;
1853 if (node.nextWaiter != null)
1854 unlinkCancelledWaiters();
1855 if (interruptMode != 0)
1856 reportInterruptAfterWait(interruptMode);
1857 return !timedout;
1858 }
1859
1860 /**
1861 * Implements timed condition wait.
1862 * <ol>
1863 * <li> If current thread is interrupted, throw InterruptedException.
1864 * <li> Save lock state returned by {@link #getState}.
1865 * <li> Invoke {@link #release} with
1866 * saved state as argument, throwing
1867 * IllegalMonitorStateException if it fails.
1868 * <li> Block until signalled, interrupted, or timed out.
1869 * <li> Reacquire by invoking specialized version of
1870 * {@link #acquire} with saved state as argument.
1871 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1872 * <li> If timed out while blocked in step 4, return false, else true.
1873 * </ol>
1874 */
1875 public final boolean await(long time, TimeUnit unit)
1876 throws InterruptedException {
1877 if (unit == null)
1878 throw new NullPointerException();
1879 long nanosTimeout = unit.toNanos(time);
1880 if (Thread.interrupted())
1881 throw new InterruptedException();
1882 Node node = addConditionWaiter();
1883 long savedState = fullyRelease(node);
1884 long lastTime = System.nanoTime();
1885 boolean timedout = false;
1886 int interruptMode = 0;
1887 while (!isOnSyncQueue(node)) {
1888 if (nanosTimeout <= 0L) {
1889 timedout = transferAfterCancelledWait(node);
1890 break;
1891 }
1892 if (nanosTimeout >= spinForTimeoutThreshold)
1893 LockSupport.parkNanos(this , nanosTimeout);
1894 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1895 break;
1896 long now = System.nanoTime();
1897 nanosTimeout -= now - lastTime;
1898 lastTime = now;
1899 }
1900 if (acquireQueued(node, savedState)
1901 && interruptMode != THROW_IE)
1902 interruptMode = REINTERRUPT;
1903 if (node.nextWaiter != null)
1904 unlinkCancelledWaiters();
1905 if (interruptMode != 0)
1906 reportInterruptAfterWait(interruptMode);
1907 return !timedout;
1908 }
1909
1910 // support for instrumentation
1911
1912 /**
1913 * Returns true if this condition was created by the given
1914 * synchronization object.
1915 *
1916 * @return {@code true} if owned
1917 */
1918 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1919 return sync == AbstractQueuedLongSynchronizer.this ;
1920 }
1921
1922 /**
1923 * Queries whether any threads are waiting on this condition.
1924 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters}.
1925 *
1926 * @return {@code true} if there are any waiting threads
1927 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1928 * returns {@code false}
1929 */
1930 protected final boolean hasWaiters() {
1931 if (!isHeldExclusively())
1932 throw new IllegalMonitorStateException();
1933 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1934 if (w.waitStatus == Node.CONDITION)
1935 return true;
1936 }
1937 return false;
1938 }
1939
1940 /**
1941 * Returns an estimate of the number of threads waiting on
1942 * this condition.
1943 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength}.
1944 *
1945 * @return the estimated number of waiting threads
1946 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1947 * returns {@code false}
1948 */
1949 protected final int getWaitQueueLength() {
1950 if (!isHeldExclusively())
1951 throw new IllegalMonitorStateException();
1952 int n = 0;
1953 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1954 if (w.waitStatus == Node.CONDITION)
1955 ++n;
1956 }
1957 return n;
1958 }
1959
1960 /**
1961 * Returns a collection containing those threads that may be
1962 * waiting on this Condition.
1963 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads}.
1964 *
1965 * @return the collection of threads
1966 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1967 * returns {@code false}
1968 */
1969 protected final Collection<Thread> getWaitingThreads() {
1970 if (!isHeldExclusively())
1971 throw new IllegalMonitorStateException();
1972 ArrayList<Thread> list = new ArrayList<Thread>();
1973 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1974 if (w.waitStatus == Node.CONDITION) {
1975 Thread t = w.thread;
1976 if (t != null)
1977 list.add(t);
1978 }
1979 }
1980 return list;
1981 }
1982 }
1983
/**
 * Setup to support compareAndSet. We need to natively implement
 * this here: For the sake of permitting future enhancements, we
 * cannot explicitly subclass AtomicLong, which would be
 * efficient and useful otherwise. So, as the lesser of evils, we
 * natively implement using hotspot intrinsics API. And while we
 * are at it, we do the same for other CASable fields (which could
 * otherwise be done with atomic field updaters).
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

// Resolve the field offsets used by the CAS helpers. Any failure to
// look up a field is rethrown as an Error wrapping the cause.
static {
    try {
        Class<?> syncClass = AbstractQueuedLongSynchronizer.class;
        Class<?> nodeClass = Node.class;

        stateOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("state"));
        headOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("head"));
        tailOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("tail"));
        waitStatusOffset =
            unsafe.objectFieldOffset(nodeClass.getDeclaredField("waitStatus"));
        nextOffset =
            unsafe.objectFieldOffset(nodeClass.getDeclaredField("next"));
    } catch (Exception ex) {
        throw new Error(ex);
    }
}
2020
2021 /**
2022 * CAS head field. Used only by enq.
2023 */
2024 private final boolean compareAndSetHead(Node update) {
2025 return unsafe.compareAndSwapObject(this , headOffset, null,
2026 update);
2027 }
2028
2029 /**
2030 * CAS tail field. Used only by enq.
2031 */
2032 private final boolean compareAndSetTail(Node expect, Node update) {
2033 return unsafe.compareAndSwapObject(this , tailOffset, expect,
2034 update);
2035 }
2036
2037 /**
2038 * CAS waitStatus field of a node.
2039 */
2040 private final static boolean compareAndSetWaitStatus(Node node,
2041 int expect, int update) {
2042 return unsafe.compareAndSwapInt(node, waitStatusOffset, expect,
2043 update);
2044 }
2045
2046 /**
2047 * CAS next field of a node.
2048 */
2049 private final static boolean compareAndSetNext(Node node,
2050 Node expect, Node update) {
2051 return unsafe.compareAndSwapObject(node, nextOffset, expect,
2052 update);
2053 }
2054 }
|