/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Sun designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Sun in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate.  The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
 * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
 * does not permit <tt>null</tt> elements.
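 *
 * <p>As an illustration only (the class name <tt>EmptyViewDemo</tt> is
 * hypothetical and not part of this package), a queue with no waiting
 * threads should behave like an empty collection:
 *
 * <pre>
 * import java.util.concurrent.SynchronousQueue;
 *
 * class EmptyViewDemo {
 *   public static void main(String[] args) {
 *     SynchronousQueue&lt;String&gt; q = new SynchronousQueue&lt;String&gt;();
 *     System.out.println(q.offer("x"));    // false: no consumer is waiting
 *     System.out.println(q.poll());        // null: no producer is waiting
 *     System.out.println(q.peek());        // null: there is never a head to peek at
 *     System.out.println(q.size());        // 0
 *     System.out.println(q.contains("x")); // false
 *   }
 * }
 * </pre>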
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
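 *
 * <p>A minimal handoff sketch, assuming one producer and one consumer
 * (the class name <tt>HandoffDemo</tt> is hypothetical): <tt>put</tt>
 * blocks until another thread calls <tt>take</tt>, and vice versa.
 *
 * <pre>
 * import java.util.concurrent.SynchronousQueue;
 *
 * class HandoffDemo {
 *   public static void main(String[] args) throws InterruptedException {
 *     final SynchronousQueue&lt;String&gt; q = new SynchronousQueue&lt;String&gt;();
 *     Thread producer = new Thread(new Runnable() {
 *       public void run() {
 *         try {
 *           q.put("task");  // blocks until a consumer takes the item
 *         } catch (InterruptedException ex) {
 *           Thread.currentThread().interrupt();  // preserve interrupt status
 *         }
 *       }
 *     });
 *     producer.start();
 *     String item = q.take();  // blocks until the producer hands over an item
 *     producer.join();
 *     System.out.println(item);
 *   }
 * }
 * </pre>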
 *
 * <p> This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.
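 *
 * <p>For illustration (the class name <tt>FairnessDemo</tt> is
 * hypothetical), the policy is chosen at construction time:
 *
 * <pre>
 * import java.util.concurrent.SynchronousQueue;
 *
 * class FairnessDemo {
 *   public static void main(String[] args) {
 *     // Default: no ordering guarantee among waiting threads.
 *     SynchronousQueue&lt;Runnable&gt; unfair = new SynchronousQueue&lt;Runnable&gt;();
 *     // Fair: waiting producers and consumers are served in FIFO order.
 *     SynchronousQueue&lt;Runnable&gt; fair = new SynchronousQueue&lt;Runnable&gt;(true);
 *   }
 * }
 * </pre>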
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott.  18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node.  The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.
     *
     * Both the queue and stack extend abstract class Transferer
     * defining the single method transfer that does a put or a
     * take. These are unified into a single method because in dual
     * data structures, the put and take operations are symmetrical,
     * so nearly all code can be combined. The resulting transfer
     * methods are on the long side, but are easier to follow than
     * they would be if broken up into nearly-duplicated parts.
     *
     * The queue and stack data structures share many conceptual
     * similarities but very few concrete details. For simplicity,
     * they are kept distinct so that they can later evolve
     * separately.
     *
     * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.
     *
     * Blocking is mainly accomplished using LockSupport park/unpark,
     * except that nodes that appear to be the next ones to become
     * fulfilled first spin a bit (on multiprocessors only). On very
     * busy synchronous queues, spinning can dramatically improve
     * throughput. And on less busy ones, the amount of spinning is
     * small enough not to be noticeable.
     *
     * Cleaning is done in different ways in queues vs stacks.  For
     * queues, we can almost always remove a node immediately in O(1)
     * time (modulo retries for consistency checks) when it is
     * cancelled. But if it may be pinned as the current tail, it must
     * wait until some subsequent cancellation. For stacks, we need a
     * potentially O(n) traversal to be sure that we can remove the
     * node, but this can run concurrently with other threads
     * accessing the stack.
     *
     * While garbage collection takes care of most node reclamation
     * issues that otherwise complicate nonblocking algorithms, care
     * is taken to "forget" references to data, other nodes, and
     * threads that might be held on to long-term by blocked
     * threads. In cases where setting to null would otherwise
     * conflict with main algorithms, this is done by changing a
     * node's link to now point to the node itself. This doesn't arise
     * much for Stack nodes (because blocked threads do not hang on to
     * old head pointers), but references in Queue nodes must be
     * aggressively forgotten to avoid reachability of everything any
     * node has ever referred to since arrival.
     */

    /**
     * Shared internal API for dual stacks and queues.
     */
    static abstract class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /** Dual stack */
    static final class TransferStack extends Transferer {
        /*
         * This extends Scherer-Scott dual stack algorithm, differing,
         * among other ways, by using "covering" nodes rather than
         * bit-marked pointers: Fulfilling operations push on marker
         * nodes (with FULFILLING bit set in mode) to reserve a spot
         * to match a waiting node.
         */

        /* Modes for SNodes, ORed together in node fields */
        /** Node represents an unfulfilled consumer */
        static final int REQUEST = 0;
        /** Node represents an unfulfilled producer */
        static final int DATA = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
        static final int FULFILLING = 2;

        /** Return true if m has fulfilling bit set */
        static boolean isFulfilling(int m) {
            return (m & FULFILLING) != 0;
        }

        /** Node class for TransferStacks. */
        static final class SNode {
            volatile SNode next; // next node in stack
            volatile SNode match; // the node matched to this
            volatile Thread waiter; // to control park/unpark
            Object item; // data; or null for REQUESTs
            int mode;

            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            SNode(Object item) {
                this.item = item;
            }

            static final AtomicReferenceFieldUpdater<SNode, SNode> nextUpdater =
                    AtomicReferenceFieldUpdater.newUpdater(SNode.class, SNode.class, "next");

            boolean casNext(SNode cmp, SNode val) {
                return (cmp == next && nextUpdater.compareAndSet(this, cmp, val));
            }

            static final AtomicReferenceFieldUpdater<SNode, SNode> matchUpdater =
                    AtomicReferenceFieldUpdater.newUpdater(SNode.class, SNode.class, "match");

            /**
             * Tries to match node s to this node, if so, waking up thread.
             * Fulfillers call tryMatch to identify their waiters.
             * Waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null
                        && matchUpdater.compareAndSet(this, null, s)) {
                    Thread w = waiter;
                    if (w != null) { // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             */
            void tryCancel() {
                matchUpdater.compareAndSet(this, null, this);
            }

            boolean isCancelled() {
                return match == this;
            }
        }

        /** The head (top) of the stack */
        volatile SNode head;

        static final AtomicReferenceFieldUpdater<TransferStack, SNode> headUpdater =
                AtomicReferenceFieldUpdater.newUpdater(TransferStack.class, SNode.class, "head");

        boolean casHead(SNode h, SNode nh) {
            return h == head && headUpdater.compareAndSet(this, h, nh);
        }

        /**
         * Creates or resets fields of a node. Called only from transfer
         * where the node to push on stack is lazily created and
         * reused when possible to help reduce intervals between reads
         * and CASes of head and to avoid surges of garbage when CASes
         * to push nodes fail due to contention.
         */
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null)
                s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) { // empty or same-mode
                    if (timed && nanos <= 0) { // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next); // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) { // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next); // help s's fulfiller
                        return mode == REQUEST ? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled()) // already cancelled
                        casHead(h, h.next); // pop and retry
                    else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next; // m is s's match
                            if (m == null) { // all waiters are gone
                                casHead(s, null); // pop fulfill node
                                s = null; // use new node next time
                                break; // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn); // pop both s and m
                                return (mode == REQUEST) ? m.item : s.item;
                            } else // lost match
                                s.casNext(m, mn); // help unlink
                        }
                    }
                } else { // help a fulfiller
                    SNode m = h.next; // m is h's match
                    if (m == null) // waiter is gone
                        casHead(h, null); // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h)) // help match
                            casHead(h, mn); // pop both h and m
                        else // lost match
                            h.casNext(m, mn); // help unlink
                    }
                }
            }
        }

        /**
         * Spins/blocks until node s is matched by a fulfill operation.
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time.  This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts. (So, on timeout, one last check for match is
             * done before giving up.) Except that calls from untimed
             * SynchronousQueue.{poll/offer} don't check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
            long lastTime = (timed) ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
            int spins = (shouldSpin(s)
                    ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins - 1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Returns true if node s is at head or there is an active
         * fulfiller.
         */
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }

        /**
         * Unlinks s from the stack.
         */
        void clean(SNode s) {
            s.item = null; // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }
    }

    /** Dual Queue */
    static final class TransferQueue extends Transferer {
0512                /*
0513                 * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
         */

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next; // next node in queue
            volatile Object item; // CAS'ed to or from null
            volatile Thread waiter; // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            static final AtomicReferenceFieldUpdater<QNode, QNode> nextUpdater =
                    AtomicReferenceFieldUpdater.newUpdater(QNode.class, QNode.class, "next");

            boolean casNext(QNode cmp, QNode val) {
                return (next == cmp && nextUpdater.compareAndSet(this, cmp, val));
            }

            static final AtomicReferenceFieldUpdater<QNode, Object> itemUpdater =
                    AtomicReferenceFieldUpdater.newUpdater(QNode.class, Object.class, "item");

            boolean casItem(Object cmp, Object val) {
                return (item == cmp && itemUpdater.compareAndSet(this, cmp, val));
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            void tryCancel(Object cmp) {
                itemUpdater.compareAndSet(this, cmp, this);
            }

            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }
        }

        /** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled.
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        static final AtomicReferenceFieldUpdater<TransferQueue, QNode> headUpdater =
                AtomicReferenceFieldUpdater.newUpdater(TransferQueue.class, QNode.class, "head");

        /**
         * Tries to CAS nh as new head; if successful, unlinks
         * old head's next node to avoid garbage retention.
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head && headUpdater.compareAndSet(this, h, nh))
                h.next = h; // forget old next
        }

        static final AtomicReferenceFieldUpdater<TransferQueue, QNode> tailUpdater =
                AtomicReferenceFieldUpdater.newUpdater(TransferQueue.class, QNode.class, "tail");

        /**
         * Tries to CAS nt as new tail.
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                tailUpdater.compareAndSet(this, t, nt);
        }

        static final AtomicReferenceFieldUpdater<TransferQueue, QNode> cleanMeUpdater =
                AtomicReferenceFieldUpdater.newUpdater(TransferQueue.class, QNode.class, "cleanMe");

        /**
         * Tries to CAS cleanMe slot.
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return (cleanMe == cmp && cleanMeUpdater.compareAndSet(this, cmp, val));
        }

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null) // saw uninitialized value
                    continue; // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail) // inconsistent read
                        continue;
                    if (tn != null) { // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0) // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s)) // failed to link in
                        continue;

                    advanceTail(t, s); // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) { // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) { // not already unlinked
                        advanceHead(t, s); // unlink if head
                        if (x != null) // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? x : e;

                } else { // complementary-mode
                    QNode m = h.next; // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue; // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) || // m already fulfilled
                            x == m || // m cancelled
                            !m.casItem(x, e)) { // lost CAS
                        advanceHead(h, m); // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m); // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? x : e;
                }
            }
        }

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            long lastTime = (timed) ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s)
                    ? (timed ? maxTimedSpins : maxUntimedSpins)
                    : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Gets rid of cancelled node s with original predecessor pred.
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next; // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail; // Ensure consistent read for tail
                if (t == h)
                    return;
                QNode tn = t.next;
                if (t != tail)
                    continue;
                if (tn != null) {
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) { // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) { // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null || // d is gone or
                            d == dp || // d is off list or
                            !d.isCancelled() || // d not cancelled or
                            (d != t && // d not tail and
                                    (dn = d.next) != null && //   has successor
                                    dn != d && //   that is on list
                                    dp.casNext(d, dn))) // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return; // s is already saved node
                } else if (casCleanMe(null, pred))
                    return; // Postpone cleaning s
            }
        }
    }

    /**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization. Because
     * it is accessed at most once per public method, there is no
     * noticeable performance penalty for using volatile instead of
     * final here.
     */
    private transient volatile Transferer transferer;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = (fair) ? new TransferQueue() : new TransferStack();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null)
            throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (o == null)
            throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E) e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E) e;
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E) transferer.transfer(null, true, 0);
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return Collections.emptyIterator();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ((e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    static class WaitQueue implements java.io.Serializable {
    }

    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }

    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }

    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        } else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    private void readObject(final java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue();
        else
            transferer = new TransferStack();
    }

}
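
The public methods above amount to a pure handoff: put and take succeed only as a matched pair, the untimed offer and poll fail immediately when no partner is waiting, and the inspection methods always report an empty collection. The following is a minimal usage sketch of that behaviour, not part of the JDK source. The class name SynchronousQueueDemo and its helper methods are hypothetical and exist only for illustration, and the sketch assumes Java 8 or later for the lambda and diamond syntax.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; not part of the JDK source listed above. */
public class SynchronousQueueDemo {

    /** Hands one value from a producer thread to the calling thread. */
    static String handOff(String value, boolean fair) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(fair);
        Thread producer = new Thread(() -> {
            try {
                queue.put(value); // blocks until a consumer receives the value
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = queue.take(); // blocks until the producer arrives
            producer.join();
            return received;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during handoff", ex);
        }
    }

    /** Untimed offer with no consumer waiting: nothing can receive the element. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    /** Timed poll with no producer: gives up once the timeout elapses. */
    static String timedPollWithoutProducer(long millis) {
        try {
            return new SynchronousQueue<String>().poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during poll", ex);
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        // The queue has no capacity, so inspection never sees an element.
        System.out.println(q.isEmpty() + " " + q.size() + " " + q.peek()); // prints "true 0 null"
        System.out.println(offerWithoutConsumer());       // prints "false"
        System.out.println(timedPollWithoutProducer(10)); // prints "null"
        System.out.println(handOff("hello", false));      // prints "hello" (TransferStack policy)
        System.out.println(handOff("hello", true));       // prints "hello" (TransferQueue policy)
    }
}
```

With a single producer and a single consumer the fairness flag does not change the result. It only selects between the TransferStack and TransferQueue transferers, which differ in the order in which multiple waiting threads are matched.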