0001: /*
0002: * Written by Doug Lea with assistance from members of JCP JSR-166
0003: * Expert Group and released to the public domain, as explained at
0004: * http://creativecommons.org/licenses/publicdomain
0005: */
0006:
0007: package java.util.concurrent.locks;
0008:
0009: import java.util.*;
0010: import java.util.concurrent.*;
0011: import java.util.concurrent.atomic.*;
0012: import sun.misc.Unsafe;
0013:
0014: /**
0015: * Provides a framework for implementing blocking locks and related
0016: * synchronizers (semaphores, events, etc) that rely on
0017: * first-in-first-out (FIFO) wait queues. This class is designed to
0018: * be a useful basis for most kinds of synchronizers that rely on a
0019: * single atomic <tt>int</tt> value to represent state. Subclasses
0020: * must define the protected methods that change this state, and which
0021: * define what that state means in terms of this object being acquired
0022: * or released. Given these, the other methods in this class carry
0023: * out all queuing and blocking mechanics. Subclasses can maintain
0024: * other state fields, but only the atomically updated <tt>int</tt>
0025: * value manipulated using methods {@link #getState}, {@link
0026: * #setState} and {@link #compareAndSetState} is tracked with respect
0027: * to synchronization.
0028: *
0029: * <p>Subclasses should be defined as non-public internal helper
0030: * classes that are used to implement the synchronization properties
0031: * of their enclosing class. Class
0032: * <tt>AbstractQueuedSynchronizer</tt> does not implement any
0033: * synchronization interface. Instead it defines methods such as
0034: * {@link #acquireInterruptibly} that can be invoked as
0035: * appropriate by concrete locks and related synchronizers to
0036: * implement their public methods.
0037: *
0038: * <p>This class supports either or both a default <em>exclusive</em>
0039: * mode and a <em>shared</em> mode. When acquired in exclusive mode,
0040: * attempted acquires by other threads cannot succeed. Shared mode
0041: * acquires by multiple threads may (but need not) succeed. This class
0042: * does not "understand" these differences except in the
0043: * mechanical sense that when a shared mode acquire succeeds, the next
0044: * waiting thread (if one exists) must also determine whether it can
0045: * acquire as well. Threads waiting in the different modes share the
0046: * same FIFO queue. Usually, implementation subclasses support only
0047: * one of these modes, but both can come into play for example in a
0048: * {@link ReadWriteLock}. Subclasses that support only exclusive or
0049: * only shared modes need not define the methods supporting the unused mode.
0050: *
0051: * <p>This class defines a nested {@link ConditionObject} class that
0052: * can be used as a {@link Condition} implementation by subclasses
0053: * supporting exclusive mode for which method {@link
0054: * #isHeldExclusively} reports whether synchronization is exclusively
0055: * held with respect to the current thread, method {@link #release}
0056: * invoked with the current {@link #getState} value fully releases
0057: * this object, and {@link #acquire}, given this saved state value,
0058: * eventually restores this object to its previous acquired state. No
0059: * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a
0060: * condition, so if this constraint cannot be met, do not use it. The
0061: * behavior of {@link ConditionObject} depends of course on the
0062: * semantics of its synchronizer implementation.
0063: *
0064: * <p> This class provides inspection, instrumentation, and monitoring
0065: * methods for the internal queue, as well as similar methods for
0066: * condition objects. These can be exported as desired into classes
0067: * using an <tt>AbstractQueuedSynchronizer</tt> for their
0068: * synchronization mechanics.
0069: *
0070: * <p> Serialization of this class stores only the underlying atomic
0071: * integer maintaining state, so deserialized objects have empty
0072: * thread queues. Typical subclasses requiring serializability will
0073: * define a <tt>readObject</tt> method that restores this to a known
0074: * initial state upon deserialization.
0075: *
0076: * <h3>Usage</h3>
0077: *
0078: * <p> To use this class as the basis of a synchronizer, redefine the
0079: * following methods, as applicable, by inspecting and/or modifying
0080: * the synchronization state using {@link #getState}, {@link
0081: * #setState} and/or {@link #compareAndSetState}:
0082: *
0083: * <ul>
0084: * <li> {@link #tryAcquire}
0085: * <li> {@link #tryRelease}
0086: * <li> {@link #tryAcquireShared}
0087: * <li> {@link #tryReleaseShared}
0088: * <li> {@link #isHeldExclusively}
0089: *</ul>
0090: *
0091: * Each of these methods by default throws {@link
0092: * UnsupportedOperationException}. Implementations of these methods
0093: * must be internally thread-safe, and should in general be short and
0094: * not block. Defining these methods is the <em>only</em> supported
0095: * means of using this class. All other methods are declared
0096: * <tt>final</tt> because they cannot be independently varied.
0097: *
0098: * <p> Even though this class is based on an internal FIFO queue, it
0099: * does not automatically enforce FIFO acquisition policies. The core
0100: * of exclusive synchronization takes the form:
0101: *
0102: * <pre>
0103: * Acquire:
0104: * while (!tryAcquire(arg)) {
0105: * <em>enqueue thread if it is not already queued</em>;
0106: * <em>possibly block current thread</em>;
0107: * }
0108: *
0109: * Release:
0110: * if (tryRelease(arg))
0111: * <em>unblock the first queued thread</em>;
0112: * </pre>
0113: *
0114: * (Shared mode is similar but may involve cascading signals.)
0115: *
0116: * <p> Because checks in acquire are invoked before enqueuing, a newly
0117: * acquiring thread may <em>barge</em> ahead of others that are
0118: * blocked and queued. However, you can, if desired, define
0119: * <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to disable
0120: * barging by internally invoking one or more of the inspection
0121: * methods. In particular, a strict FIFO lock can define
0122: * <tt>tryAcquire</tt> to immediately return <tt>false</tt> if {@link
0123: * #getFirstQueuedThread} does not return the current thread. A
0124: * normally preferable non-strict fair version can immediately return
0125: * <tt>false</tt> only if {@link #hasQueuedThreads} returns
0126: * <tt>true</tt> and <tt>getFirstQueuedThread</tt> is not the current
0127: * thread; or equivalently, that <tt>getFirstQueuedThread</tt> is both
0128: * non-null and not the current thread. Further variations are
0129: * possible.
0130: *
0131: * <p> Throughput and scalability are generally highest for the
0132: * default barging (also known as <em>greedy</em>,
0133: * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
0134: * While this is not guaranteed to be fair or starvation-free, earlier
0135: * queued threads are allowed to recontend before later queued
0136: * threads, and each recontention has an unbiased chance to succeed
0137: * against incoming threads. Also, while acquires do not
0138: * "spin" in the usual sense, they may perform multiple
0139: * invocations of <tt>tryAcquire</tt> interspersed with other
0140: * computations before blocking. This gives most of the benefits of
0141: * spins when exclusive synchronization is only briefly held, without
0142: * most of the liabilities when it isn't. If so desired, you can
0143: * augment this by preceding calls to acquire methods with
0144: * "fast-path" checks, possibly prechecking {@link #hasContended}
0145: * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
0146: * is likely not to be contended.
0147: *
0148: * <p> This class provides an efficient and scalable basis for
0149: * synchronization in part by specializing its range of use to
0150: * synchronizers that can rely on <tt>int</tt> state, acquire, and
0151: * release parameters, and an internal FIFO wait queue. When this does
0152: * not suffice, you can build synchronizers from a lower level using
0153: * {@link java.util.concurrent.atomic atomic} classes, your own custom
0154: * {@link java.util.Queue} classes, and {@link LockSupport} blocking
0155: * support.
0156: *
0157: * <h3>Usage Examples</h3>
0158: *
0159: * <p>Here is a non-reentrant mutual exclusion lock class that uses
0160: * the value zero to represent the unlocked state, and one to
0161: * represent the locked state. It also supports conditions and exposes
0162: * one of the instrumentation methods:
0163: *
0164: * <pre>
0165: * class Mutex implements Lock, java.io.Serializable {
0166: *
0167: * // Our internal helper class
0168: * private static class Sync extends AbstractQueuedSynchronizer {
0169: * // Report whether in locked state
0170: * protected boolean isHeldExclusively() {
0171: * return getState() == 1;
0172: * }
0173: *
0174: * // Acquire the lock if state is zero
0175: * public boolean tryAcquire(int acquires) {
0176: * assert acquires == 1; // Otherwise unused
0177: * return compareAndSetState(0, 1);
0178: * }
0179: *
0180: * // Release the lock by setting state to zero
0181: * protected boolean tryRelease(int releases) {
0182: * assert releases == 1; // Otherwise unused
0183: * if (getState() == 0) throw new IllegalMonitorStateException();
0184: * setState(0);
0185: * return true;
0186: * }
0187: *
0188: * // Provide a Condition
0189: * Condition newCondition() { return new ConditionObject(); }
0190: *
0191: * // Deserialize properly
0192: * private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
0193: * s.defaultReadObject();
0194: * setState(0); // reset to unlocked state
0195: * }
0196: * }
0197: *
0198: * // The sync object does all the hard work. We just forward to it.
0199: * private final Sync sync = new Sync();
0200: *
0201: * public void lock() { sync.acquire(1); }
0202: * public boolean tryLock() { return sync.tryAcquire(1); }
0203: * public void unlock() { sync.release(1); }
0204: * public Condition newCondition() { return sync.newCondition(); }
0205: * public boolean isLocked() { return sync.isHeldExclusively(); }
0206: * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
0207: * public void lockInterruptibly() throws InterruptedException {
0208: * sync.acquireInterruptibly(1);
0209: * }
0210: * public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
0211: * return sync.tryAcquireNanos(1, unit.toNanos(timeout));
0212: * }
0213: * }
0214: * </pre>
0215: *
0216: * <p> Here is a latch class that is like a {@link CountDownLatch}
0217: * except that it only requires a single <tt>signal</tt> to
0218: * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
0219: * acquire and release methods.
0220: *
0221: * <pre>
0222: * class BooleanLatch {
0223: *
0224: * private static class Sync extends AbstractQueuedSynchronizer {
0225: * boolean isSignalled() { return getState() != 0; }
0226: *
0227: * protected int tryAcquireShared(int ignore) {
0228: * return isSignalled()? 1 : -1;
0229: * }
0230: *
0231: * protected boolean tryReleaseShared(int ignore) {
0232: * setState(1);
0233: * return true;
0234: * }
0235: * }
0236: *
0237: * private final Sync sync = new Sync();
0238: * public boolean isSignalled() { return sync.isSignalled(); }
0239: * public void signal() { sync.releaseShared(1); }
0240: * public void await() throws InterruptedException {
0241: * sync.acquireSharedInterruptibly(1);
0242: * }
0243: * }
0244: *
0245: * </pre>
0246: *
0247: * @since 1.5
0248: * @author Doug Lea
0249: */
0250: public abstract class AbstractQueuedSynchronizer implements
0251: java.io.Serializable {
0252: private static final long serialVersionUID = 7373984972572414691L;
0253:
/**
 * Constructs a synchronizer whose synchronization state starts at
 * zero (the default value of the <tt>int</tt> state field) and whose
 * wait queue has not yet been created.
 */
protected AbstractQueuedSynchronizer() {
    super();
}
0260:
0261: /**
0262: * Wait queue node class.
0263: *
0264: * <p> The wait queue is a variant of a "CLH" (Craig, Landin, and
0265: * Hagersten) lock queue. CLH locks are normally used for
0266: * spinlocks. We instead use them for blocking synchronizers, but
0267: * use the same basic tactic of holding some of the control
0268: * information about a thread in the predecessor of its node. A
0269: * "status" field in each node keeps track of whether a thread
0270: * should block. A node is signalled when its predecessor
0271: * releases. Each node of the queue otherwise serves as a
0272: * specific-notification-style monitor holding a single waiting
0273: * thread. The status field does NOT control whether threads are
0274: * granted locks etc though. A thread may try to acquire if it is
0275: * first in the queue. But being first does not guarantee success;
0276: * it only gives the right to contend. So the currently released
0277: * contender thread may need to rewait.
0278: *
0279: * <p>To enqueue into a CLH lock, you atomically splice it in as new
0280: * tail. To dequeue, you just set the head field.
0281: * <pre>
0282: * +------+ prev +-----+ +-----+
0283: * head | | <---- | | <---- | | tail
0284: * +------+ +-----+ +-----+
0285: * </pre>
0286: *
0287: * <p>Insertion into a CLH queue requires only a single atomic
0288: * operation on "tail", so there is a simple atomic point of
0289: * demarcation from unqueued to queued. Similarly, dequeuing
0290: * involves only updating the "head". However, it takes a bit
0291: * more work for nodes to determine who their successors are,
0292: * in part to deal with possible cancellation due to timeouts
0293: * and interrupts.
0294: *
0295: * <p>The "prev" links (not used in original CLH locks), are mainly
0296: * needed to handle cancellation. If a node is cancelled, its
0297: * successor is (normally) relinked to a non-cancelled
0298: * predecessor. For explanation of similar mechanics in the case
0299: * of spin locks, see the papers by Scott and Scherer at
0300: * http://www.cs.rochester.edu/u/scott/synchronization/
0301: *
0302: * <p>We also use "next" links to implement blocking mechanics.
0303: * The thread id for each node is kept in its own node, so a
0304: * predecessor signals the next node to wake up by traversing
0305: * next link to determine which thread it is. Determination of
0306: * successor must avoid races with newly queued nodes to set
0307: * the "next" fields of their predecessors. This is solved
0308: * when necessary by checking backwards from the atomically
0309: * updated "tail" when a node's successor appears to be null.
0310: * (Or, said differently, the next-links are an optimization
0311: * so that we don't usually need a backward scan.)
0312: *
0313: * <p>Cancellation introduces some conservatism to the basic
0314: * algorithms. Since we must poll for cancellation of other
0315: * nodes, we can miss noticing whether a cancelled node is
0316: * ahead or behind us. This is dealt with by always unparking
0317: * successors upon cancellation, allowing them to stabilize on
0318: * a new predecessor.
0319: *
0320: * <p>CLH queues need a dummy header node to get started. But
0321: * we don't create them on construction, because it would be wasted
0322: * effort if there is never contention. Instead, the node
0323: * is constructed and head and tail pointers are set upon first
0324: * contention.
0325: *
0326: * <p>Threads waiting on Conditions use the same nodes, but
0327: * use an additional link. Conditions only need to link nodes
0328: * in simple (non-concurrent) linked queues because they are
0329: * only accessed when exclusively held. Upon await, a node is
0330: * inserted into a condition queue. Upon signal, the node is
0331: * transferred to the main queue. A special value of status
0332: * field is used to mark which queue a node is on.
0333: *
0334: * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
0335: * Scherer and Michael Scott, along with members of JSR-166
0336: * expert group, for helpful ideas, discussions, and critiques
0337: * on the design of this class.
0338: */
0339: static final class Node {
0340: /** waitStatus value to indicate thread has cancelled */
0341: static final int CANCELLED = 1;
0342: /** waitStatus value to indicate thread needs unparking */
0343: static final int SIGNAL = -1;
0344: /** waitStatus value to indicate thread is waiting on condition */
0345: static final int CONDITION = -2;
0346: /** Marker to indicate a node is waiting in shared mode */
0347: static final Node SHARED = new Node();
0348: /** Marker to indicate a node is waiting in exclusive mode */
0349: static final Node EXCLUSIVE = null;
0350:
0351: /**
0352: * Status field, taking on only the values:
0353: * SIGNAL: The successor of this node is (or will soon be)
0354: * blocked (via park), so the current node must
0355: * unpark its successor when it releases or
0356: * cancels. To avoid races, acquire methods must
0357: * first indicate they need a signal,
0358: * then retry the atomic acquire, and then,
0359: * on failure, block.
0360: * CANCELLED: Node is cancelled due to timeout or interrupt
0361: * Nodes never leave this state. In particular,
0362: * a thread with cancelled node never again blocks.
0363: * CONDITION: Node is currently on a condition queue
0364: * It will not be used as a sync queue node until
0365: * transferred. (Use of this value here
0366: * has nothing to do with the other uses
0367: * of the field, but simplifies mechanics.)
0368: * 0: None of the above
0369: *
0370: * The values are arranged numerically to simplify use.
0371: * Non-negative values mean that a node doesn't need to
0372: * signal. So, most code doesn't need to check for particular
0373: * values, just for sign.
0374: *
0375: * The field is initialized to 0 for normal sync nodes, and
0376: * CONDITION for condition nodes. It is modified only using
0377: * CAS.
0378: */
0379: volatile int waitStatus;
0380:
0381: /**
0382: * Link to predecessor node that current node/thread relies on
0383: * for checking waitStatus. Assigned during enqueing, and nulled
0384: * out (for sake of GC) only upon dequeuing. Also, upon
0385: * cancellation of a predecessor, we short-circuit while
0386: * finding a non-cancelled one, which will always exist
0387: * because the head node is never cancelled: A node becomes
0388: * head only as a result of successful acquire. A
0389: * cancelled thread never succeeds in acquiring, and a thread only
0390: * cancels itself, not any other node.
0391: */
0392: volatile Node prev;
0393:
0394: /**
0395: * Link to the successor node that the current node/thread
0396: * unparks upon release. Assigned once during enqueuing, and
0397: * nulled out (for sake of GC) when no longer needed. Upon
0398: * cancellation, we cannot adjust this field, but can notice
0399: * status and bypass the node if cancelled. The enq operation
0400: * does not assign next field of a predecessor until after
0401: * attachment, so seeing a null next field does not
0402: * necessarily mean that node is at end of queue. However, if
0403: * a next field appears to be null, we can scan prev's from
0404: * the tail to double-check.
0405: */
0406: volatile Node next;
0407:
0408: /**
0409: * The thread that enqueued this node. Initialized on
0410: * construction and nulled out after use.
0411: */
0412: volatile Thread thread;
0413:
0414: /**
0415: * Link to next node waiting on condition, or the special
0416: * value SHARED. Because condition queues are accessed only
0417: * when holding in exclusive mode, we just need a simple
0418: * linked queue to hold nodes while they are waiting on
0419: * conditions. They are then transferred to the queue to
0420: * re-acquire. And because conditions can only be exclusive,
0421: * we save a field by using special value to indicate shared
0422: * mode.
0423: */
0424: Node nextWaiter;
0425:
0426: /**
0427: * Returns true if node is waiting in shared mode
0428: */
0429: final boolean isShared() {
0430: return nextWaiter == SHARED;
0431: }
0432:
0433: /**
0434: * Returns previous node, or throws NullPointerException if
0435: * null. Use when predecessor cannot be null.
0436: * @return the predecessor of this node
0437: */
0438: final Node predecessor() throws NullPointerException {
0439: Node p = prev;
0440: if (p == null)
0441: throw new NullPointerException();
0442: else
0443: return p;
0444: }
0445:
0446: Node() { // Used to establish initial head or SHARED marker
0447: }
0448:
0449: Node(Thread thread, Node mode) { // Used by addWaiter
0450: this .nextWaiter = mode;
0451: this .thread = thread;
0452: }
0453:
0454: Node(Thread thread, int waitStatus) { // Used by Condition
0455: this .waitStatus = waitStatus;
0456: this .thread = thread;
0457: }
0458: }
0459:
0460: /**
0461: * Head of the wait queue, lazily initialized (enq installs a dummy
0462: * header the first time a node is inserted). Except for that
0463: * initialization, it is modified only via method setHead. Note: If head
0464: * exists, its waitStatus is guaranteed not to be CANCELLED. Transient.
0465: */
0466: private transient volatile Node head;
0467:
0468: /**
0469: * Tail of the wait queue, lazily initialized. Modified only to append a
0470: * new wait node: by method enq and by the fast path of addWaiter. Transient.
0471: */
0472: private transient volatile Node tail;
0473:
0474: /**
0475: * The synchronization state, accessed via getState, setState and compareAndSetState.
0476: */
0477: private volatile int state;
0478:
0479: /**
0480: * Returns the current value of synchronization state.
0481: * This operation has memory semantics of a <tt>volatile</tt> read.
0482: * @return current state value
0483: */
0484: protected final int getState() {
0485: return state;
0486: }
0487:
0488: /**
0489: * Sets the value of synchronization state.
0490: * This operation has memory semantics of a <tt>volatile</tt> write.
0491: * @param newState the new state value
0492: */
0493: protected final void setState(int newState) {
0494: state = newState;
0495: }
0496:
0497: /**
0498: * Atomically sets synchronization state to the given updated
0499: * value if the current state value equals the expected value.
0500: * This operation has memory semantics of a <tt>volatile</tt> read
0501: * and write.
0502: * @param expect the expected value
0503: * @param update the new value
0504: * @return true if successful. False return indicates that
0505: * the actual value was not equal to the expected value.
0506: */
0507: protected final boolean compareAndSetState(int expect, int update) {
0508: // See below for intrinsics setup to support this
0509: return unsafe.compareAndSwapInt(this , stateOffset, expect,
0510: update);
0511: }
0512:
0513: // Queuing utilities
0514:
0515: /**
0516: * Insert node into queue, initializing if necessary. See picture above.
0517: * @param node the node to insert
0518: * @return node's predecessor
0519: */
0520: private Node enq(final Node node) {
0521: for (;;) {
0522: Node t = tail;
0523: if (t == null) { // Must initialize
0524: Node h = new Node(); // Dummy header
0525: h.next = node;
0526: node.prev = h;
0527: if (compareAndSetHead(h)) {
0528: tail = node;
0529: return h;
0530: }
0531: } else {
0532: node.prev = t;
0533: if (compareAndSetTail(t, node)) {
0534: t.next = node;
0535: return t;
0536: }
0537: }
0538: }
0539: }
0540:
0541: /**
0542: * Create and enq node for given thread and mode
0543: * @param current the thread
0544: * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
0545: * @return the new node
0546: */
0547: private Node addWaiter(Node mode) {
0548: Node node = new Node(Thread.currentThread(), mode);
0549: // Try the fast path of enq; backup to full enq on failure
0550: Node pred = tail;
0551: if (pred != null) {
0552: node.prev = pred;
0553: if (compareAndSetTail(pred, node)) {
0554: pred.next = node;
0555: return node;
0556: }
0557: }
0558: enq(node);
0559: return node;
0560: }
0561:
0562: /**
0563: * Set head of queue to be node, thus dequeuing. Called only by
0564: * acquire methods. Also nulls out unused fields for sake of GC
0565: * and to suppress unnecessary signals and traversals.
0566: * @param node the node
0567: */
0568: private void setHead(Node node) {
0569: head = node;
0570: node.thread = null;
0571: node.prev = null;
0572: }
0573:
0574: /**
0575: * Wake up node's successor, if one exists.
0576: * @param node the node
0577: */
0578: private void unparkSuccessor(Node node) {
0579: /*
0580: * Try to clear status in anticipation of signalling. It is
0581: * OK if this fails or if status is changed by waiting thread.
0582: */
0583: compareAndSetWaitStatus(node, Node.SIGNAL, 0);
0584:
0585: /*
0586: * Thread to unpark is held in successor, which is normally
0587: * just the next node. But if cancelled or apparently null,
0588: * traverse backwards from tail to find the actual
0589: * non-cancelled successor.
0590: */
0591: Thread thread;
0592: Node s = node.next;
0593: if (s != null && s.waitStatus <= 0)
0594: thread = s.thread;
0595: else {
0596: thread = null;
0597: for (s = tail; s != null && s != node; s = s.prev)
0598: if (s.waitStatus <= 0)
0599: thread = s.thread;
0600: }
0601: LockSupport.unpark(thread);
0602: }
0603:
0604: /**
0605: * Set head of queue, and check if successor may be waiting
0606: * in shared mode, if so propagating if propagate > 0.
0607: * @param pred the node holding waitStatus for node
0608: * @param node the node
0609: * @param propagate the return value from a tryAcquireShared
0610: */
0611: private void setHeadAndPropagate(Node node, int propagate) {
0612: setHead(node);
0613: if (propagate > 0 && node.waitStatus != 0) {
0614: /*
0615: * Don't bother fully figuring out successor. If it
0616: * looks null, call unparkSuccessor anyway to be safe.
0617: */
0618: Node s = node.next;
0619: if (s == null || s.isShared())
0620: unparkSuccessor(node);
0621: }
0622: }
0623:
0624: // Utilities for various versions of acquire
0625:
0626: /**
0627: * Cancel an ongoing attempt to acquire.
0628: * @param node the node
0629: */
0630: private void cancelAcquire(Node node) {
0631: if (node != null) { // Ignore if node doesn't exist
0632: node.thread = null;
0633: // Can use unconditional write instead of CAS here
0634: node.waitStatus = Node.CANCELLED;
0635: unparkSuccessor(node);
0636: }
0637: }
0638:
0639: /**
0640: * Checks and updates status for a node that failed to acquire.
0641: * Returns true if thread should block. This is the main signal
0642: * control in all acquire loops. Requires that pred == node.prev
0643: * @param pred node's predecessor holding status
0644: * @param node the node
0645: * @return true if thread should block
0646: */
0647: private static boolean shouldParkAfterFailedAcquire(Node pred,
0648: Node node) {
0649: int s = pred.waitStatus;
0650: if (s < 0)
0651: /*
0652: * This node has already set status asking a release
0653: * to signal it, so it can safely park
0654: */
0655: return true;
0656: if (s > 0)
0657: /*
0658: * Predecessor was cancelled. Move up to its predecessor
0659: * and indicate retry.
0660: */
0661: node.prev = pred.prev;
0662: else
0663: /*
0664: * Indicate that we need a signal, but don't park yet. Caller
0665: * will need to retry to make sure it cannot acquire before
0666: * parking.
0667: */
0668: compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
0669: return false;
0670: }
0671:
0672: /**
0673: * Convenience method to interrupt current thread.
0674: */
0675: private static void selfInterrupt() {
0676: Thread.currentThread().interrupt();
0677: }
0678:
0679: /**
0680: * Convenience method to park and then check if interrupted
0681: * @return true if interrupted
0682: */
0683: private static boolean parkAndCheckInterrupt() {
0684: LockSupport.park();
0685: return Thread.interrupted();
0686: }
0687:
0688: /*
0689: * Various flavors of acquire, varying in exclusive/shared and
0690: * control modes. Each is mostly the same, but annoyingly
0691: * different. Only a little bit of factoring is possible due to
0692: * interactions of exception mechanics (including ensuring that we
0693: * cancel if tryAcquire throws exception) and other control, at
0694: * least not without hurting performance too much.
0695: */
0696:
0697: /**
0698: * Acquire in exclusive uninterruptible mode for thread already in
0699: * queue. Used by condition wait methods as well as acquire.
0700: * @param node the node
0701: * @param arg the acquire argument
0702: * @return true if interrupted while waiting
0703: */
0704: final boolean acquireQueued(final Node node, int arg) {
0705: try {
0706: boolean interrupted = false;
0707: for (;;) {
0708: final Node p = node.predecessor();
0709: if (p == head && tryAcquire(arg)) {
0710: setHead(node);
0711: p.next = null; // help GC
0712: return interrupted;
0713: }
0714: if (shouldParkAfterFailedAcquire(p, node)
0715: && parkAndCheckInterrupt())
0716: interrupted = true;
0717: }
0718: } catch (RuntimeException ex) {
0719: cancelAcquire(node);
0720: throw ex;
0721: }
0722: }
0723:
0724: /**
0725: * Acquire in exclusive interruptible mode
0726: * @param arg the acquire argument
0727: */
0728: private void doAcquireInterruptibly(int arg)
0729: throws InterruptedException {
0730: final Node node = addWaiter(Node.EXCLUSIVE);
0731: try {
0732: for (;;) {
0733: final Node p = node.predecessor();
0734: if (p == head && tryAcquire(arg)) {
0735: setHead(node);
0736: p.next = null; // help GC
0737: return;
0738: }
0739: if (shouldParkAfterFailedAcquire(p, node)
0740: && parkAndCheckInterrupt())
0741: break;
0742: }
0743: } catch (RuntimeException ex) {
0744: cancelAcquire(node);
0745: throw ex;
0746: }
0747: // Arrive here only if interrupted
0748: cancelAcquire(node);
0749: throw new InterruptedException();
0750: }
0751:
0752: /**
0753: * Acquire in exclusive timed mode
0754: * @param arg the acquire argument
0755: * @param nanosTimeout max wait time
0756: * @return true if acquired
0757: */
0758: private boolean doAcquireNanos(int arg, long nanosTimeout)
0759: throws InterruptedException {
0760: long lastTime = System.nanoTime();
0761: final Node node = addWaiter(Node.EXCLUSIVE);
0762: try {
0763: for (;;) {
0764: final Node p = node.predecessor();
0765: if (p == head && tryAcquire(arg)) {
0766: setHead(node);
0767: p.next = null; // help GC
0768: return true;
0769: }
0770: if (nanosTimeout <= 0) {
0771: cancelAcquire(node);
0772: return false;
0773: }
0774: if (shouldParkAfterFailedAcquire(p, node)) {
0775: LockSupport.parkNanos(nanosTimeout);
0776: if (Thread.interrupted())
0777: break;
0778: long now = System.nanoTime();
0779: nanosTimeout -= now - lastTime;
0780: lastTime = now;
0781: }
0782: }
0783: } catch (RuntimeException ex) {
0784: cancelAcquire(node);
0785: throw ex;
0786: }
0787: // Arrive here only if interrupted
0788: cancelAcquire(node);
0789: throw new InterruptedException();
0790: }
0791:
0792: /**
0793: * Acquire in shared uninterruptible mode
0794: * @param arg the acquire argument
0795: */
0796: private void doAcquireShared(int arg) {
0797: final Node node = addWaiter(Node.SHARED);
0798: try {
0799: boolean interrupted = false;
0800: for (;;) {
0801: final Node p = node.predecessor();
0802: if (p == head) {
0803: int r = tryAcquireShared(arg);
0804: if (r >= 0) {
0805: setHeadAndPropagate(node, r);
0806: p.next = null; // help GC
0807: if (interrupted)
0808: selfInterrupt();
0809: return;
0810: }
0811: }
0812: if (shouldParkAfterFailedAcquire(p, node)
0813: && parkAndCheckInterrupt())
0814: interrupted = true;
0815: }
0816: } catch (RuntimeException ex) {
0817: cancelAcquire(node);
0818: throw ex;
0819: }
0820: }
0821:
0822: /**
0823: * Acquire in shared interruptible mode
0824: * @param arg the acquire argument
0825: */
0826: private void doAcquireSharedInterruptibly(int arg)
0827: throws InterruptedException {
0828: final Node node = addWaiter(Node.SHARED);
0829: try {
0830: for (;;) {
0831: final Node p = node.predecessor();
0832: if (p == head) {
0833: int r = tryAcquireShared(arg);
0834: if (r >= 0) {
0835: setHeadAndPropagate(node, r);
0836: p.next = null; // help GC
0837: return;
0838: }
0839: }
0840: if (shouldParkAfterFailedAcquire(p, node)
0841: && parkAndCheckInterrupt())
0842: break;
0843: }
0844: } catch (RuntimeException ex) {
0845: cancelAcquire(node);
0846: throw ex;
0847: }
0848: // Arrive here only if interrupted
0849: cancelAcquire(node);
0850: throw new InterruptedException();
0851: }
0852:
0853: /**
0854: * Acquire in shared timed mode
0855: * @param arg the acquire argument
0856: * @param nanosTimeout max wait time
0857: * @return true if acquired
0858: */
0859: private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
0860: throws InterruptedException {
0861:
0862: long lastTime = System.nanoTime();
0863: final Node node = addWaiter(Node.SHARED);
0864: try {
0865: for (;;) {
0866: final Node p = node.predecessor();
0867: if (p == head) {
0868: int r = tryAcquireShared(arg);
0869: if (r >= 0) {
0870: setHeadAndPropagate(node, r);
0871: p.next = null; // help GC
0872: return true;
0873: }
0874: }
0875: if (nanosTimeout <= 0) {
0876: cancelAcquire(node);
0877: return false;
0878: }
0879: if (shouldParkAfterFailedAcquire(p, node)) {
0880: LockSupport.parkNanos(nanosTimeout);
0881: if (Thread.interrupted())
0882: break;
0883: long now = System.nanoTime();
0884: nanosTimeout -= now - lastTime;
0885: lastTime = now;
0886: }
0887: }
0888: } catch (RuntimeException ex) {
0889: cancelAcquire(node);
0890: throw ex;
0891: }
0892: // Arrive here only if interrupted
0893: cancelAcquire(node);
0894: throw new InterruptedException();
0895: }
0896:
0897: // Main exported methods
0898:
0899: /**
0900: * Attempts to acquire in exclusive mode. This method should query
0901: * if the state of the object permits it to be acquired in the
0902: * exclusive mode, and if so to acquire it.
0903: *
0904: * <p>This method is always invoked by the thread performing
0905: * acquire. If this method reports failure, the acquire method
0906: * may queue the thread, if it is not already queued, until it is
0907: * signalled by a release from some other thread. This can be used
0908: * to implement method {@link Lock#tryLock()}.
0909: *
0910: * <p>The default
0911: * implementation throws {@link UnsupportedOperationException}
0912: *
0913: * @param arg the acquire argument. This value
0914: * is always the one passed to an acquire method,
0915: * or is the value saved on entry to a condition wait.
0916: * The value is otherwise uninterpreted and can represent anything
0917: * you like.
0918: * @return true if successful. Upon success, this object has been
0919: * acquired.
0920: * @throws IllegalMonitorStateException if acquiring would place
0921: * this synchronizer in an illegal state. This exception must be
0922: * thrown in a consistent fashion for synchronization to work
0923: * correctly.
0924: * @throws UnsupportedOperationException if exclusive mode is not supported
0925: */
0926: protected boolean tryAcquire(int arg) {
0927: throw new UnsupportedOperationException();
0928: }
0929:
0930: /**
0931: * Attempts to set the state to reflect a release in exclusive
0932: * mode. <p>This method is always invoked by the thread
0933: * performing release.
0934: *
0935: * <p>The default implementation throws
0936: * {@link UnsupportedOperationException}
0937: * @param arg the release argument. This value
0938: * is always the one passed to a release method,
0939: * or the current state value upon entry to a condition wait.
0940: * The value is otherwise uninterpreted and can represent anything
0941: * you like.
0942: * @return <tt>true</tt> if this object is now in a fully released state,
0943: * so that any waiting threads may attempt to acquire; and <tt>false</tt>
0944: * otherwise.
0945: * @throws IllegalMonitorStateException if releasing would place
0946: * this synchronizer in an illegal state. This exception must be
0947: * thrown in a consistent fashion for synchronization to work
0948: * correctly.
0949: * @throws UnsupportedOperationException if exclusive mode is not supported
0950: */
0951: protected boolean tryRelease(int arg) {
0952: throw new UnsupportedOperationException();
0953: }
0954:
0955: /**
0956: * Attempts to acquire in shared mode. This method should query if
0957: * the state of the object permits it to be acquired in the shared
0958: * mode, and if so to acquire it.
0959: *
0960: * <p>This method is always invoked by the thread performing
0961: * acquire. If this method reports failure, the acquire method
0962: * may queue the thread, if it is not already queued, until it is
0963: * signalled by a release from some other thread.
0964: *
0965: * <p>The default implementation throws {@link
0966: * UnsupportedOperationException}
0967: *
0968: * @param arg the acquire argument. This value
0969: * is always the one passed to an acquire method,
0970: * or is the value saved on entry to a condition wait.
0971: * The value is otherwise uninterpreted and can represent anything
0972: * you like.
0973: * @return a negative value on failure, zero on exclusive success,
0974: * and a positive value if non-exclusively successful, in which
0975: * case a subsequent waiting thread must check
0976: * availability. (Support for three different return values
0977: * enables this method to be used in contexts where acquires only
0978: * sometimes act exclusively.) Upon success, this object has been
0979: * acquired.
0980: * @throws IllegalMonitorStateException if acquiring would place
0981: * this synchronizer in an illegal state. This exception must be
0982: * thrown in a consistent fashion for synchronization to work
0983: * correctly.
0984: * @throws UnsupportedOperationException if shared mode is not supported
0985: */
0986: protected int tryAcquireShared(int arg) {
0987: throw new UnsupportedOperationException();
0988: }
0989:
0990: /**
0991: * Attempts to set the state to reflect a release in shared mode.
0992: * <p>This method is always invoked by the thread performing release.
0993: * <p> The default implementation throws
0994: * {@link UnsupportedOperationException}
0995: * @param arg the release argument. This value
0996: * is always the one passed to a release method,
0997: * or the current state value upon entry to a condition wait.
0998: * The value is otherwise uninterpreted and can represent anything
0999: * you like.
1000: * @return <tt>true</tt> if this object is now in a fully released state,
1001: * so that any waiting threads may attempt to acquire; and <tt>false</tt>
1002: * otherwise.
1003: * @throws IllegalMonitorStateException if releasing would place
1004: * this synchronizer in an illegal state. This exception must be
1005: * thrown in a consistent fashion for synchronization to work
1006: * correctly.
1007: * @throws UnsupportedOperationException if shared mode is not supported
1008: */
1009: protected boolean tryReleaseShared(int arg) {
1010: throw new UnsupportedOperationException();
1011: }
1012:
1013: /**
1014: * Returns true if synchronization is held exclusively with respect
1015: * to the current (calling) thread. This method is invoked
1016: * upon each call to a non-waiting {@link ConditionObject} method.
1017: * (Waiting methods instead invoke {@link #release}.)
1018: * <p>The default implementation throws {@link
1019: * UnsupportedOperationException}. This method is invoked
1020: * internally only within {@link ConditionObject} methods, so need
1021: * not be defined if conditions are not used.
1022: *
1023: * @return true if synchronization is held exclusively;
1024: * else false
1025: * @throws UnsupportedOperationException if conditions are not supported
1026: */
1027: protected boolean isHeldExclusively() {
1028: throw new UnsupportedOperationException();
1029: }
1030:
1031: /**
1032: * Acquires in exclusive mode, ignoring interrupts. Implemented
1033: * by invoking at least once {@link #tryAcquire},
1034: * returning on success. Otherwise the thread is queued, possibly
1035: * repeatedly blocking and unblocking, invoking {@link
1036: * #tryAcquire} until success. This method can be used
1037: * to implement method {@link Lock#lock}
1038: * @param arg the acquire argument.
1039: * This value is conveyed to {@link #tryAcquire} but is
1040: * otherwise uninterpreted and can represent anything
1041: * you like.
1042: */
1043: public final void acquire(int arg) {
1044: if (!tryAcquire(arg)
1045: && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1046: selfInterrupt();
1047: }
1048:
1049: /**
1050: * Acquires in exclusive mode, aborting if interrupted.
1051: * Implemented by first checking interrupt status, then invoking
1052: * at least once {@link #tryAcquire}, returning on
1053: * success. Otherwise the thread is queued, possibly repeatedly
1054: * blocking and unblocking, invoking {@link #tryAcquire}
1055: * until success or the thread is interrupted. This method can be
1056: * used to implement method {@link Lock#lockInterruptibly}
1057: * @param arg the acquire argument.
1058: * This value is conveyed to {@link #tryAcquire} but is
1059: * otherwise uninterpreted and can represent anything
1060: * you like.
1061: * @throws InterruptedException if the current thread is interrupted
1062: */
1063: public final void acquireInterruptibly(int arg)
1064: throws InterruptedException {
1065: if (Thread.interrupted())
1066: throw new InterruptedException();
1067: if (!tryAcquire(arg))
1068: doAcquireInterruptibly(arg);
1069: }
1070:
1071: /**
1072: * Attempts to acquire in exclusive mode, aborting if interrupted,
1073: * and failing if the given timeout elapses. Implemented by first
1074: * checking interrupt status, then invoking at least once {@link
1075: * #tryAcquire}, returning on success. Otherwise, the thread is
1076: * queued, possibly repeatedly blocking and unblocking, invoking
1077: * {@link #tryAcquire} until success or the thread is interrupted
1078: * or the timeout elapses. This method can be used to implement
1079: * method {@link Lock#tryLock(long, TimeUnit)}.
1080: * @param arg the acquire argument.
1081: * This value is conveyed to {@link #tryAcquire} but is
1082: * otherwise uninterpreted and can represent anything
1083: * you like.
1084: * @param nanosTimeout the maximum number of nanoseconds to wait
1085: * @return true if acquired; false if timed out
1086: * @throws InterruptedException if the current thread is interrupted
1087: */
1088: public final boolean tryAcquireNanos(int arg, long nanosTimeout)
1089: throws InterruptedException {
1090: if (Thread.interrupted())
1091: throw new InterruptedException();
1092: return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
1093: }
1094:
1095: /**
1096: * Releases in exclusive mode. Implemented by unblocking one or
1097: * more threads if {@link #tryRelease} returns true.
1098: * This method can be used to implement method {@link Lock#unlock}
1099: * @param arg the release argument.
1100: * This value is conveyed to {@link #tryRelease} but is
1101: * otherwise uninterpreted and can represent anything
1102: * you like.
1103: * @return the value returned from {@link #tryRelease}
1104: */
1105: public final boolean release(int arg) {
1106: if (tryRelease(arg)) {
1107: Node h = head;
1108: if (h != null && h.waitStatus != 0)
1109: unparkSuccessor(h);
1110: return true;
1111: }
1112: return false;
1113: }
1114:
1115: /**
1116: * Acquires in shared mode, ignoring interrupts. Implemented by
1117: * first invoking at least once {@link #tryAcquireShared},
1118: * returning on success. Otherwise the thread is queued, possibly
1119: * repeatedly blocking and unblocking, invoking {@link
1120: * #tryAcquireShared} until success.
1121: * @param arg the acquire argument.
1122: * This value is conveyed to {@link #tryAcquireShared} but is
1123: * otherwise uninterpreted and can represent anything
1124: * you like.
1125: */
1126: public final void acquireShared(int arg) {
1127: if (tryAcquireShared(arg) < 0)
1128: doAcquireShared(arg);
1129: }
1130:
1131: /**
1132: * Acquires in shared mode, aborting if interrupted. Implemented
1133: * by first checking interrupt status, then invoking at least once
1134: * {@link #tryAcquireShared}, returning on success. Otherwise the
1135: * thread is queued, possibly repeatedly blocking and unblocking,
1136: * invoking {@link #tryAcquireShared} until success or the thread
1137: * is interrupted.
1138: * @param arg the acquire argument.
1139: * This value is conveyed to {@link #tryAcquireShared} but is
1140: * otherwise uninterpreted and can represent anything
1141: * you like.
1142: * @throws InterruptedException if the current thread is interrupted
1143: */
1144: public final void acquireSharedInterruptibly(int arg)
1145: throws InterruptedException {
1146: if (Thread.interrupted())
1147: throw new InterruptedException();
1148: if (tryAcquireShared(arg) < 0)
1149: doAcquireSharedInterruptibly(arg);
1150: }
1151:
1152: /**
1153: * Attempts to acquire in shared mode, aborting if interrupted, and
1154: * failing if the given timeout elapses. Implemented by first
1155: * checking interrupt status, then invoking at least once {@link
1156: * #tryAcquireShared}, returning on success. Otherwise, the
1157: * thread is queued, possibly repeatedly blocking and unblocking,
1158: * invoking {@link #tryAcquireShared} until success or the thread
1159: * is interrupted or the timeout elapses.
1160: * @param arg the acquire argument.
1161: * This value is conveyed to {@link #tryAcquireShared} but is
1162: * otherwise uninterpreted and can represent anything
1163: * you like.
1164: * @param nanosTimeout the maximum number of nanoseconds to wait
1165: * @return true if acquired; false if timed out
1166: * @throws InterruptedException if the current thread is interrupted
1167: */
1168: public final boolean tryAcquireSharedNanos(int arg,
1169: long nanosTimeout) throws InterruptedException {
1170: if (Thread.interrupted())
1171: throw new InterruptedException();
1172: return tryAcquireShared(arg) >= 0
1173: || doAcquireSharedNanos(arg, nanosTimeout);
1174: }
1175:
1176: /**
1177: * Releases in shared mode. Implemented by unblocking one or more
1178: * threads if {@link #tryReleaseShared} returns true.
1179: * @param arg the release argument.
1180: * This value is conveyed to {@link #tryReleaseShared} but is
1181: * otherwise uninterpreted and can represent anything
1182: * you like.
1183: * @return the value returned from {@link #tryReleaseShared}
1184: */
1185: public final boolean releaseShared(int arg) {
1186: if (tryReleaseShared(arg)) {
1187: Node h = head;
1188: if (h != null && h.waitStatus != 0)
1189: unparkSuccessor(h);
1190: return true;
1191: }
1192: return false;
1193: }
1194:
1195: // Queue inspection methods
1196:
1197: /**
1198: * Queries whether any threads are waiting to acquire. Note that
1199: * because cancellations due to interrupts and timeouts may occur
1200: * at any time, a <tt>true</tt> return does not guarantee that any
1201: * other thread will ever acquire.
1202: *
1203: * <p> In this implementation, this operation returns in
1204: * constant time.
1205: *
1206: * @return true if there may be other threads waiting to acquire
1207: * the lock.
1208: */
1209: public final boolean hasQueuedThreads() {
1210: return head != tail;
1211: }
1212:
1213: /**
1214: * Queries whether any threads have ever contended to acquire this
1215: * synchronizer; that is if an acquire method has ever blocked.
1216: *
1217: * <p> In this implementation, this operation returns in
1218: * constant time.
1219: *
1220: * @return true if there has ever been contention
1221: */
1222: public final boolean hasContended() {
1223: return head != null;
1224: }
1225:
1226: /**
1227: * Returns the first (longest-waiting) thread in the queue, or
1228: * <tt>null</tt> if no threads are currently queued.
1229: *
1230: * <p> In this implementation, this operation normally returns in
1231: * constant time, but may iterate upon contention if other threads are
1232: * concurrently modifying the queue.
1233: *
1234: * @return the first (longest-waiting) thread in the queue, or
1235: * <tt>null</tt> if no threads are currently queued.
1236: */
1237: public final Thread getFirstQueuedThread() {
1238: // handle only fast path, else relay
1239: return (head == tail) ? null : fullGetFirstQueuedThread();
1240: }
1241:
1242: /**
1243: * Version of getFirstQueuedThread called when fastpath fails
1244: */
1245: private Thread fullGetFirstQueuedThread() {
1246: /*
1247: * This loops only if the queue changes while we read sets of
1248: * fields.
1249: */
1250: for (;;) {
1251: Node h = head;
1252: if (h == null) // No queue
1253: return null;
1254:
1255: /*
1256: * The first node is normally h.next. Try to get its
1257: * thread field, ensuring consistent reads: If thread
1258: * field is nulled out or s.prev is no longer head, then
1259: * some other thread(s) concurrently performed setHead in
1260: * between some of our reads, so we must reread.
1261: */
1262: Node s = h.next;
1263: if (s != null) {
1264: Thread st = s.thread;
1265: Node sp = s.prev;
1266: if (st != null && sp == head)
1267: return st;
1268: }
1269:
1270: /*
1271: * Head's next field might not have been set yet, or may
1272: * have been unset after setHead. So we must check to see
1273: * if tail is actually first node, in almost the same way
1274: * as above.
1275: */
1276: Node t = tail;
1277: if (t == h) // Empty queue
1278: return null;
1279:
1280: if (t != null) {
1281: Thread tt = t.thread;
1282: Node tp = t.prev;
1283: if (tt != null && tp == head)
1284: return tt;
1285: }
1286: }
1287: }
1288:
1289: /**
1290: * Returns true if the given thread is currently queued.
1291: *
1292: * <p> This implementation traverses the queue to determine
1293: * presence of the given thread.
1294: *
1295: * @param thread the thread
1296: * @return true if the given thread is on the queue
1297: * @throws NullPointerException if thread null
1298: */
1299: public final boolean isQueued(Thread thread) {
1300: if (thread == null)
1301: throw new NullPointerException();
1302: for (Node p = tail; p != null; p = p.prev)
1303: if (p.thread == thread)
1304: return true;
1305: return false;
1306: }
1307:
1308: // Instrumentation and monitoring methods
1309:
1310: /**
1311: * Returns an estimate of the number of threads waiting to
1312: * acquire. The value is only an estimate because the number of
1313: * threads may change dynamically while this method traverses
1314: * internal data structures. This method is designed for use in
1315: * monitoring system state, not for synchronization
1316: * control.
1317: *
1318: * @return the estimated number of threads waiting for this lock
1319: */
1320: public final int getQueueLength() {
1321: int n = 0;
1322: for (Node p = tail; p != null; p = p.prev) {
1323: if (p.thread != null)
1324: ++n;
1325: }
1326: return n;
1327: }
1328:
1329: /**
1330: * Returns a collection containing threads that may be waiting to
1331: * acquire. Because the actual set of threads may change
1332: * dynamically while constructing this result, the returned
1333: * collection is only a best-effort estimate. The elements of the
1334: * returned collection are in no particular order. This method is
1335: * designed to facilitate construction of subclasses that provide
1336: * more extensive monitoring facilities.
1337: * @return the collection of threads
1338: */
1339: public final Collection<Thread> getQueuedThreads() {
1340: ArrayList<Thread> list = new ArrayList<Thread>();
1341: for (Node p = tail; p != null; p = p.prev) {
1342: Thread t = p.thread;
1343: if (t != null)
1344: list.add(t);
1345: }
1346: return list;
1347: }
1348:
1349: /**
1350: * Returns a collection containing threads that may be waiting to
1351: * acquire in exclusive mode. This has the same properties
1352: * as {@link #getQueuedThreads} except that it only returns
1353: * those threads waiting due to an exclusive acquire.
1354: * @return the collection of threads
1355: */
1356: public final Collection<Thread> getExclusiveQueuedThreads() {
1357: ArrayList<Thread> list = new ArrayList<Thread>();
1358: for (Node p = tail; p != null; p = p.prev) {
1359: if (!p.isShared()) {
1360: Thread t = p.thread;
1361: if (t != null)
1362: list.add(t);
1363: }
1364: }
1365: return list;
1366: }
1367:
1368: /**
1369: * Returns a collection containing threads that may be waiting to
1370: * acquire in shared mode. This has the same properties
1371: * as {@link #getQueuedThreads} except that it only returns
1372: * those threads waiting due to a shared acquire.
1373: * @return the collection of threads
1374: */
1375: public final Collection<Thread> getSharedQueuedThreads() {
1376: ArrayList<Thread> list = new ArrayList<Thread>();
1377: for (Node p = tail; p != null; p = p.prev) {
1378: if (p.isShared()) {
1379: Thread t = p.thread;
1380: if (t != null)
1381: list.add(t);
1382: }
1383: }
1384: return list;
1385: }
1386:
1387: /**
1388: * Returns a string identifying this synchronizer, as well as its
1389: * state. The state, in brackets, includes the String "State
1390: * =" followed by the current value of {@link #getState}, and
1391: * either "nonempty" or "empty" depending on
1392: * whether the queue is empty.
1393: *
1394: * @return a string identifying this synchronizer, as well as its state.
1395: */
1396: public String toString() {
1397: int s = getState();
1398: String q = hasQueuedThreads() ? "non" : "";
1399: return super .toString() + "[State = " + s + ", " + q
1400: + "empty queue]";
1401: }
1402:
1403: // Internal support methods for Conditions
1404:
1405: /**
1406: * Returns true if a node, always one that was initially placed on
1407: * a condition queue, is now waiting to reacquire on sync queue.
1408: * @param node the node
1409: * @return true if is reacquiring
1410: */
1411: final boolean isOnSyncQueue(Node node) {
1412: if (node.waitStatus == Node.CONDITION || node.prev == null)
1413: return false;
1414: if (node.next != null) // If has successor, it must be on queue
1415: return true;
1416: /*
1417: * node.prev can be non-null, but not yet on queue because
1418: * the CAS to place it on queue can fail. So we have to
1419: * traverse from tail to make sure it actually made it. It
1420: * will always be near the tail in calls to this method, and
1421: * unless the CAS failed (which is unlikely), it will be
1422: * there, so we hardly ever traverse much.
1423: */
1424: return findNodeFromTail(node);
1425: }
1426:
1427: /**
1428: * Returns true if node is on sync queue by searching backwards from tail.
1429: * Called only when needed by isOnSyncQueue.
1430: * @return true if present
1431: */
1432: private boolean findNodeFromTail(Node node) {
1433: Node t = tail;
1434: for (;;) {
1435: if (t == node)
1436: return true;
1437: if (t == null)
1438: return false;
1439: t = t.prev;
1440: }
1441: }
1442:
1443: /**
1444: * Transfers a node from a condition queue onto sync queue.
1445: * Returns true if successful.
1446: * @param node the node
1447: * @return true if successfully transferred (else the node was
1448: * cancelled before signal).
1449: */
1450: final boolean transferForSignal(Node node) {
1451: /*
1452: * If cannot change waitStatus, the node has been cancelled.
1453: */
1454: if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
1455: return false;
1456:
1457: /*
1458: * Splice onto queue and try to set waitStatus of predecessor to
1459: * indicate that thread is (probably) waiting. If cancelled or
1460: * attempt to set waitStatus fails, wake up to resync (in which
1461: * case the waitStatus can be transiently and harmlessly wrong).
1462: */
1463: Node p = enq(node);
1464: int c = p.waitStatus;
1465: if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
1466: LockSupport.unpark(node.thread);
1467: return true;
1468: }
1469:
1470: /**
1471: * Transfers node, if necessary, to sync queue after a cancelled
1472: * wait. Returns true if thread was cancelled before being
1473: * signalled.
1474: *
1475: * @param node its node
1476: * @return true if cancelled before the node was signalled.
1477: */
1478: final boolean transferAfterCancelledWait(Node node) {
1479: if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
1480: enq(node);
1481: return true;
1482: }
1483: /*
1484: * If we lost out to a signal(), then we can't proceed
1485: * until it finishes its enq(). Cancelling during an
1486: * incomplete transfer is both rare and transient, so just
1487: * spin.
1488: */
1489: while (!isOnSyncQueue(node))
1490: Thread.yield();
1491: return false;
1492: }
1493:
1494: /**
1495: * Invoke release with current state value; return saved state.
1496: * Cancel node and throw exception on failure.
1497: * @param node the condition node for this wait
1498: * @return previous sync state
1499: */
1500: final int fullyRelease(Node node) {
1501: try {
1502: int savedState = getState();
1503: if (release(savedState))
1504: return savedState;
1505: } catch (RuntimeException ex) {
1506: node.waitStatus = Node.CANCELLED;
1507: throw ex;
1508: }
1509: // reach here if release fails
1510: node.waitStatus = Node.CANCELLED;
1511: throw new IllegalMonitorStateException();
1512: }
1513:
1514: // Instrumentation methods for conditions
1515:
1516: /**
1517: * Queries whether the given ConditionObject
1518: * uses this synchronizer as its lock.
1519: * @param condition the condition
1520: * @return <tt>true</tt> if owned
1521: * @throws NullPointerException if condition null
1522: */
1523: public final boolean owns(ConditionObject condition) {
1524: if (condition == null)
1525: throw new NullPointerException();
1526: return condition.isOwnedBy(this );
1527: }
1528:
1529: /**
1530: * Queries whether any threads are waiting on the given condition
1531: * associated with this synchronizer. Note that because timeouts
1532: * and interrupts may occur at any time, a <tt>true</tt> return
1533: * does not guarantee that a future <tt>signal</tt> will awaken
1534: * any threads. This method is designed primarily for use in
1535: * monitoring of the system state.
1536: * @param condition the condition
1537: * @return <tt>true</tt> if there are any waiting threads.
1538: * @throws IllegalMonitorStateException if exclusive synchronization
1539: * is not held
1540: * @throws IllegalArgumentException if the given condition is
1541: * not associated with this synchronizer
1542: * @throws NullPointerException if condition null
1543: */
1544: public final boolean hasWaiters(ConditionObject condition) {
1545: if (!owns(condition))
1546: throw new IllegalArgumentException("Not owner");
1547: return condition.hasWaiters();
1548: }
1549:
1550: /**
1551: * Returns an estimate of the number of threads waiting on the
1552: * given condition associated with this synchronizer. Note that
1553: * because timeouts and interrupts may occur at any time, the
1554: * estimate serves only as an upper bound on the actual number of
1555: * waiters. This method is designed for use in monitoring of the
1556: * system state, not for synchronization control.
1557: * @param condition the condition
1558: * @return the estimated number of waiting threads.
1559: * @throws IllegalMonitorStateException if exclusive synchronization
1560: * is not held
1561: * @throws IllegalArgumentException if the given condition is
1562: * not associated with this synchronizer
1563: * @throws NullPointerException if condition null
1564: */
1565: public final int getWaitQueueLength(ConditionObject condition) {
1566: if (!owns(condition))
1567: throw new IllegalArgumentException("Not owner");
1568: return condition.getWaitQueueLength();
1569: }
1570:
1571: /**
1572: * Returns a collection containing those threads that may be
1573: * waiting on the given condition associated with this
1574: * synchronizer. Because the actual set of threads may change
1575: * dynamically while constructing this result, the returned
1576: * collection is only a best-effort estimate. The elements of the
1577: * returned collection are in no particular order.
1578: * @param condition the condition
1579: * @return the collection of threads
1580: * @throws IllegalMonitorStateException if exclusive synchronization
1581: * is not held
1582: * @throws IllegalArgumentException if the given condition is
1583: * not associated with this synchronizer
1584: * @throws NullPointerException if condition null
1585: */
1586: public final Collection<Thread> getWaitingThreads(
1587: ConditionObject condition) {
1588: if (!owns(condition))
1589: throw new IllegalArgumentException("Not owner");
1590: return condition.getWaitingThreads();
1591: }
1592:
1593: /**
1594: * Condition implementation for a {@link
1595: * AbstractQueuedSynchronizer} serving as the basis of a {@link
1596: * Lock} implementation.
1597: *
1598: * <p> Method documentation for this class describes mechanics,
1599: * not behavioral specifications from the point of view of Lock
1600: * and Condition users. Exported versions of this class will in
1601: * general need to be accompanied by documentation describing
1602: * condition semantics that rely on those of the associated
1603: * <tt>AbstractQueuedSynchronizer</tt>.
1604: *
1605: * <p> This class is Serializable, but all fields are transient,
1606: * so deserialized conditions have no waiters.
1607: */
1608: public class ConditionObject implements Condition,
1609: java.io.Serializable {
1610: private static final long serialVersionUID = 1173984872572414699L;
1611: /** First node of condition queue. */
1612: private transient Node firstWaiter;
1613: /** Last node of condition queue. */
1614: private transient Node lastWaiter;
1615:
1616: /**
1617: * Creates a new <tt>ConditionObject</tt> instance.
1618: */
1619: public ConditionObject() {
1620: }
1621:
1622: // Internal methods
1623:
1624: /**
1625: * Add a new waiter to wait queue
1626: * @return its new wait node
1627: */
1628: private Node addConditionWaiter() {
1629: Node node = new Node(Thread.currentThread(), Node.CONDITION);
1630: Node t = lastWaiter;
1631: if (t == null)
1632: firstWaiter = node;
1633: else
1634: t.nextWaiter = node;
1635: lastWaiter = node;
1636: return node;
1637: }
1638:
1639: /**
1640: * Remove and transfer nodes until hit non-cancelled one or
1641: * null. Split out from signal in part to encourage compilers
1642: * to inline the case of no waiters.
1643: * @param first (non-null) the first node on condition queue
1644: */
1645: private void doSignal(Node first) {
1646: do {
1647: if ((firstWaiter = first.nextWaiter) == null)
1648: lastWaiter = null;
1649: first.nextWaiter = null;
1650: } while (!transferForSignal(first)
1651: && (first = firstWaiter) != null);
1652: }
1653:
1654: /**
1655: * Remove and transfer all nodes.
1656: * @param first (non-null) the first node on condition queue
1657: */
1658: private void doSignalAll(Node first) {
1659: lastWaiter = firstWaiter = null;
1660: do {
1661: Node next = first.nextWaiter;
1662: first.nextWaiter = null;
1663: transferForSignal(first);
1664: first = next;
1665: } while (first != null);
1666: }
1667:
1668: // public methods
1669:
1670: /**
1671: * Moves the longest-waiting thread, if one exists, from the
1672: * wait queue for this condition to the wait queue for the
1673: * owning lock.
1674: * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1675: * returns false
1676: */
1677: public final void signal() {
1678: if (!isHeldExclusively())
1679: throw new IllegalMonitorStateException();
1680: Node first = firstWaiter;
1681: if (first != null)
1682: doSignal(first);
1683: }
1684:
1685: /**
1686: * Moves all threads from the wait queue for this condition to
1687: * the wait queue for the owning lock.
1688: * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1689: * returns false
1690: */
1691: public final void signalAll() {
1692: if (!isHeldExclusively())
1693: throw new IllegalMonitorStateException();
1694: Node first = firstWaiter;
1695: if (first != null)
1696: doSignalAll(first);
1697: }
1698:
1699: /**
1700: * Implements uninterruptible condition wait.
1701: * <ol>
1702: * <li> Save lock state returned by {@link #getState}
1703: * <li> Invoke {@link #release} with
1704: * saved state as argument, throwing
1705: * IllegalMonitorStateException if it fails.
1706: * <li> Block until signalled
1707: * <li> Reacquire by invoking specialized version of
1708: * {@link #acquire} with saved state as argument.
1709: * </ol>
1710: */
1711: public final void awaitUninterruptibly() {
1712: Node node = addConditionWaiter();
1713: int savedState = fullyRelease(node);
1714: boolean interrupted = false;
1715: while (!isOnSyncQueue(node)) {
1716: LockSupport.park();
1717: if (Thread.interrupted())
1718: interrupted = true;
1719: }
1720: if (acquireQueued(node, savedState) || interrupted)
1721: selfInterrupt();
1722: }
1723:
1724: /*
1725: * For interruptible waits, we need to track whether to throw
1726: * InterruptedException, if interrupted while blocked on
1727: * condition, versus reinterrupt current thread, if
1728: * interrupted while blocked waiting to re-acquire.
1729: */
1730:
1731: /** Mode meaning to reinterrupt on exit from wait */
1732: private static final int REINTERRUPT = 1;
1733: /** Mode meaning to throw InterruptedException on exit from wait */
1734: private static final int THROW_IE = -1;
1735:
1736: /**
1737: * Check for interrupt, returning THROW_IE if interrupted
1738: * before signalled, REINTERRUPT if after signalled, or
1739: * 0 if not interrupted.
1740: */
1741: private int checkInterruptWhileWaiting(Node node) {
1742: return (Thread.interrupted()) ? ((transferAfterCancelledWait(node)) ? THROW_IE
1743: : REINTERRUPT)
1744: : 0;
1745: }
1746:
1747: /**
1748: * Throw InterruptedException, reinterrupt current thread, or
1749: * do nothing, depending on mode.
1750: */
1751: private void reportInterruptAfterWait(int interruptMode)
1752: throws InterruptedException {
1753: if (interruptMode == THROW_IE)
1754: throw new InterruptedException();
1755: else if (interruptMode == REINTERRUPT)
1756: Thread.currentThread().interrupt();
1757: }
1758:
1759: /**
1760: * Implements interruptible condition wait.
1761: * <ol>
1762: * <li> If current thread is interrupted, throw InterruptedException
1763: * <li> Save lock state returned by {@link #getState}
1764: * <li> Invoke {@link #release} with
1765: * saved state as argument, throwing
1766: * IllegalMonitorStateException if it fails.
1767: * <li> Block until signalled or interrupted
1768: * <li> Reacquire by invoking specialized version of
1769: * {@link #acquire} with saved state as argument.
1770: * <li> If interrupted while blocked in step 4, throw exception
1771: * </ol>
1772: */
1773: public final void await() throws InterruptedException {
1774: if (Thread.interrupted())
1775: throw new InterruptedException();
1776: Node node = addConditionWaiter();
1777: int savedState = fullyRelease(node);
1778: int interruptMode = 0;
1779: while (!isOnSyncQueue(node)) {
1780: LockSupport.park();
1781: if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1782: break;
1783: }
1784: if (acquireQueued(node, savedState)
1785: && interruptMode != THROW_IE)
1786: interruptMode = REINTERRUPT;
1787: if (interruptMode != 0)
1788: reportInterruptAfterWait(interruptMode);
1789: }
1790:
1791: /**
1792: * Implements timed condition wait.
1793: * <ol>
1794: * <li> If current thread is interrupted, throw InterruptedException
1795: * <li> Save lock state returned by {@link #getState}
1796: * <li> Invoke {@link #release} with
1797: * saved state as argument, throwing
1798: * IllegalMonitorStateException if it fails.
1799: * <li> Block until signalled, interrupted, or timed out
1800: * <li> Reacquire by invoking specialized version of
1801: * {@link #acquire} with saved state as argument.
1802: * <li> If interrupted while blocked in step 4, throw InterruptedException
1803: * </ol>
1804: */
1805: public final long awaitNanos(long nanosTimeout)
1806: throws InterruptedException {
1807: if (Thread.interrupted())
1808: throw new InterruptedException();
1809: Node node = addConditionWaiter();
1810: int savedState = fullyRelease(node);
1811: long lastTime = System.nanoTime();
1812: int interruptMode = 0;
1813: while (!isOnSyncQueue(node)) {
1814: if (nanosTimeout <= 0L) {
1815: transferAfterCancelledWait(node);
1816: break;
1817: }
1818: LockSupport.parkNanos(nanosTimeout);
1819: if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1820: break;
1821: long now = System.nanoTime();
1822: nanosTimeout -= now - lastTime;
1823: lastTime = now;
1824: }
1825: if (acquireQueued(node, savedState)
1826: && interruptMode != THROW_IE)
1827: interruptMode = REINTERRUPT;
1828: if (interruptMode != 0)
1829: reportInterruptAfterWait(interruptMode);
1830: return nanosTimeout - (System.nanoTime() - lastTime);
1831: }
1832:
1833: /**
1834: * Implements absolute timed condition wait.
1835: * <ol>
1836: * <li> If current thread is interrupted, throw InterruptedException
1837: * <li> Save lock state returned by {@link #getState}
1838: * <li> Invoke {@link #release} with
1839: * saved state as argument, throwing
1840: * IllegalMonitorStateException if it fails.
1841: * <li> Block until signalled, interrupted, or timed out
1842: * <li> Reacquire by invoking specialized version of
1843: * {@link #acquire} with saved state as argument.
1844: * <li> If interrupted while blocked in step 4, throw InterruptedException
1845: * <li> If timed out while blocked in step 4, return false, else true
1846: * </ol>
1847: */
1848: public final boolean awaitUntil(Date deadline)
1849: throws InterruptedException {
1850: if (deadline == null)
1851: throw new NullPointerException();
1852: long abstime = deadline.getTime();
1853: if (Thread.interrupted())
1854: throw new InterruptedException();
1855: Node node = addConditionWaiter();
1856: int savedState = fullyRelease(node);
1857: boolean timedout = false;
1858: int interruptMode = 0;
1859: while (!isOnSyncQueue(node)) {
1860: if (System.currentTimeMillis() > abstime) {
1861: timedout = transferAfterCancelledWait(node);
1862: break;
1863: }
1864: LockSupport.parkUntil(abstime);
1865: if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1866: break;
1867: }
1868: if (acquireQueued(node, savedState)
1869: && interruptMode != THROW_IE)
1870: interruptMode = REINTERRUPT;
1871: if (interruptMode != 0)
1872: reportInterruptAfterWait(interruptMode);
1873: return !timedout;
1874: }
1875:
1876: /**
1877: * Implements timed condition wait.
1878: * <ol>
1879: * <li> If current thread is interrupted, throw InterruptedException
1880: * <li> Save lock state returned by {@link #getState}
1881: * <li> Invoke {@link #release} with
1882: * saved state as argument, throwing
1883: * IllegalMonitorStateException if it fails.
1884: * <li> Block until signalled, interrupted, or timed out
1885: * <li> Reacquire by invoking specialized version of
1886: * {@link #acquire} with saved state as argument.
1887: * <li> If interrupted while blocked in step 4, throw InterruptedException
1888: * <li> If timed out while blocked in step 4, return false, else true
1889: * </ol>
1890: */
1891: public final boolean await(long time, TimeUnit unit)
1892: throws InterruptedException {
1893: if (unit == null)
1894: throw new NullPointerException();
1895: long nanosTimeout = unit.toNanos(time);
1896: if (Thread.interrupted())
1897: throw new InterruptedException();
1898: Node node = addConditionWaiter();
1899: int savedState = fullyRelease(node);
1900: long lastTime = System.nanoTime();
1901: boolean timedout = false;
1902: int interruptMode = 0;
1903: while (!isOnSyncQueue(node)) {
1904: if (nanosTimeout <= 0L) {
1905: timedout = transferAfterCancelledWait(node);
1906: break;
1907: }
1908: LockSupport.parkNanos(nanosTimeout);
1909: if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1910: break;
1911: long now = System.nanoTime();
1912: nanosTimeout -= now - lastTime;
1913: lastTime = now;
1914: }
1915: if (acquireQueued(node, savedState)
1916: && interruptMode != THROW_IE)
1917: interruptMode = REINTERRUPT;
1918: if (interruptMode != 0)
1919: reportInterruptAfterWait(interruptMode);
1920: return !timedout;
1921: }
1922:
1923: // support for instrumentation
1924:
1925: /**
1926: * Returns true if this condition was created by the given
1927: * synchronization object
1928: * @return true if owned
1929: */
1930: final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
1931: return sync == AbstractQueuedSynchronizer.this ;
1932: }
1933:
1934: /**
1935: * Queries whether any threads are waiting on this condition.
1936: * Implements {@link AbstractQueuedSynchronizer#hasWaiters}
1937: * @return <tt>true</tt> if there are any waiting threads.
1938: * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1939: * returns false
1940: */
1941: protected final boolean hasWaiters() {
1942: if (!isHeldExclusively())
1943: throw new IllegalMonitorStateException();
1944: for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1945: if (w.waitStatus == Node.CONDITION)
1946: return true;
1947: }
1948: return false;
1949: }
1950:
1951: /**
1952: * Returns an estimate of the number of threads waiting on
1953: * this condition.
1954: * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}
1955: * @return the estimated number of waiting threads.
1956: * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1957: * returns false
1958: */
1959: protected final int getWaitQueueLength() {
1960: if (!isHeldExclusively())
1961: throw new IllegalMonitorStateException();
1962: int n = 0;
1963: for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1964: if (w.waitStatus == Node.CONDITION)
1965: ++n;
1966: }
1967: return n;
1968: }
1969:
1970: /**
1971: * Returns a collection containing those threads that may be
1972: * waiting on this Condition.
1973: * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}
1974: * @return the collection of threads
1975: * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1976: * returns false
1977: */
1978: protected final Collection<Thread> getWaitingThreads() {
1979: if (!isHeldExclusively())
1980: throw new IllegalMonitorStateException();
1981: ArrayList<Thread> list = new ArrayList<Thread>();
1982: for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1983: if (w.waitStatus == Node.CONDITION) {
1984: Thread t = w.thread;
1985: if (t != null)
1986: list.add(t);
1987: }
1988: }
1989: return list;
1990: }
1991: }
1992:
/**
 * Setup to support compareAndSet. This has to be implemented
 * natively here: to keep future enhancements possible, this class
 * cannot explicitly subclass AtomicInteger, efficient and useful
 * as that would otherwise be. As the lesser of evils, the hotspot
 * intrinsics API is used directly. The same is done for the other
 * CASable fields (which could otherwise be handled with atomic
 * field updaters).
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;

static {
    // Resolve each CAS target's field offset once, at class initialization.
    try {
        Class<?> syncClass = AbstractQueuedSynchronizer.class;
        stateOffset = unsafe.objectFieldOffset(
            syncClass.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset(
            syncClass.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset(
            syncClass.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset(
            Node.class.getDeclaredField("waitStatus"));
    } catch (Exception ex) {
        throw new Error(ex);
    }
}
2026:
2027: /**
2028: * CAS head field. Used only by enq
2029: */
2030: private final boolean compareAndSetHead(Node update) {
2031: return unsafe.compareAndSwapObject(this , headOffset, null,
2032: update);
2033: }
2034:
2035: /**
2036: * CAS tail field. Used only by enq
2037: */
2038: private final boolean compareAndSetTail(Node expect, Node update) {
2039: return unsafe.compareAndSwapObject(this , tailOffset, expect,
2040: update);
2041: }
2042:
2043: /**
2044: * CAS waitStatus field of a node.
2045: */
2046: private final static boolean compareAndSetWaitStatus(Node node,
2047: int expect, int update) {
2048: return unsafe.compareAndSwapInt(node, waitStatusOffset, expect,
2049: update);
2050: }
2051:
2052: }
|