Source Code Cross Referenced for Exchanger.java in  » 6.0-JDK-Core » Collections-Jar-Zip-Logging-regex » java » util » concurrent » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Home
Java Source Code / Java Documentation
1.6.0 JDK Core
2.6.0 JDK Modules
3.6.0 JDK Modules com.sun
4.6.0 JDK Modules com.sun.java
5.6.0 JDK Modules sun
6.6.0 JDK Platform
7.Ajax
8.Apache Harmony Java SE
9.Aspect oriented
10.Authentication Authorization
11.Blogger System
12.Build
13.Byte Code
14.Cache
15.Chart
16.Chat
17.Code Analyzer
18.Collaboration
19.Content Management System
20.Database Client
21.Database DBMS
22.Database JDBC Connection Pool
23.Database ORM
24.Development
25.EJB Server
26.ERP CRM Financial
27.ESB
28.Forum
29.Game
30.GIS
31.Graphic 3D
32.Graphic Library
33.Groupware
34.HTML Parser
35.IDE
36.IDE Eclipse
37.IDE Netbeans
38.Installer
39.Internationalization Localization
40.Inversion of Control
41.Issue Tracking
42.J2EE
43.J2ME
44.JBoss
45.JMS
46.JMX
47.Library
48.Mail Clients
49.Music
50.Net
51.Parser
52.PDF
53.Portal
54.Profiler
55.Project Management
56.Report
57.RSS RDF
58.Rule Engine
59.Science
60.Scripting
61.Search Engine
62.Security
63.Sevlet Container
64.Source Control
65.Swing Library
66.Template Engine
67.Test Coverage
68.Testing
69.UML
70.Web Crawler
71.Web Framework
72.Web Mail
73.Web Server
74.Web Services
75.Web Services apache cxf 2.2.6
76.Web Services AXIS2
77.Wiki Engine
78.Workflow Engines
79.XML
80.XML UI
Java Source Code / Java Documentation » 6.0 JDK Core » Collections Jar Zip Logging regex » java.util.concurrent 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001        /*
002         * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003         *
004         * This code is free software; you can redistribute it and/or modify it
005         * under the terms of the GNU General Public License version 2 only, as
006         * published by the Free Software Foundation.  Sun designates this
007         * particular file as subject to the "Classpath" exception as provided
008         * by Sun in the LICENSE file that accompanied this code.
009         *
010         * This code is distributed in the hope that it will be useful, but WITHOUT
011         * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012         * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
013         * version 2 for more details (a copy is included in the LICENSE file that
014         * accompanied this code).
015         *
016         * You should have received a copy of the GNU General Public License version
017         * 2 along with this work; if not, write to the Free Software Foundation,
018         * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019         *
020         * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021         * CA 95054 USA or visit www.sun.com if you need additional information or
022         * have any questions.
023         */
024
025        /*
026         * This file is available under and governed by the GNU General Public
027         * License version 2 only, as published by the Free Software Foundation.
028         * However, the following notice accompanied the original version of this
029         * file:
030         *
031         * Written by Doug Lea, Bill Scherer, and Michael Scott with
032         * assistance from members of JCP JSR-166 Expert Group and released to
033         * the public domain, as explained at
034         * http://creativecommons.org/licenses/publicdomain
035         */
036
037        package java.util.concurrent;
038
039        import java.util.concurrent.atomic.*;
040        import java.util.concurrent.locks.LockSupport;
041
042        /**
043         * A synchronization point at which threads can pair and swap elements
044         * within pairs.  Each thread presents some object on entry to the
045         * {@link #exchange exchange} method, matches with a partner thread,
046         * and receives its partner's object on return.  An Exchanger may be
047         * viewed as a bidirectional form of a {@link SynchronousQueue}.
048         * Exchangers may be useful in applications such as genetic algorithms
049         * and pipeline designs.
050         *
051         * <p><b>Sample Usage:</b>
052         * Here are the highlights of a class that uses an {@code Exchanger}
053         * to swap buffers between threads so that the thread filling the
054         * buffer gets a freshly emptied one when it needs it, handing off the
055         * filled one to the thread emptying the buffer.
056         * <pre>{@code
057         * class FillAndEmpty {
058         *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
059         *   DataBuffer initialEmptyBuffer = ... a made-up type
060         *   DataBuffer initialFullBuffer = ...
061         *
062         *   class FillingLoop implements Runnable {
063         *     public void run() {
064         *       DataBuffer currentBuffer = initialEmptyBuffer;
065         *       try {
066         *         while (currentBuffer != null) {
067         *           addToBuffer(currentBuffer);
068         *           if (currentBuffer.isFull())
069         *             currentBuffer = exchanger.exchange(currentBuffer);
070         *         }
071         *       } catch (InterruptedException ex) { ... handle ... }
072         *     }
073         *   }
074         *
075         *   class EmptyingLoop implements Runnable {
076         *     public void run() {
077         *       DataBuffer currentBuffer = initialFullBuffer;
078         *       try {
079         *         while (currentBuffer != null) {
080         *           takeFromBuffer(currentBuffer);
081         *           if (currentBuffer.isEmpty())
082         *             currentBuffer = exchanger.exchange(currentBuffer);
083         *         }
084         *       } catch (InterruptedException ex) { ... handle ...}
085         *     }
086         *   }
087         *
088         *   void start() {
089         *     new Thread(new FillingLoop()).start();
090         *     new Thread(new EmptyingLoop()).start();
091         *   }
092         * }
093         * }</pre>
094         *
095         * <p>Memory consistency effects: For each pair of threads that
096         * successfully exchange objects via an {@code Exchanger}, actions
097         * prior to the {@code exchange()} in each thread
098         * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
099         * those subsequent to a return from the corresponding {@code exchange()}
100         * in the other thread.
101         *
102         * @since 1.5
103         * @author Doug Lea and Bill Scherer and Michael Scott
104         * @param <V> The type of objects that may be exchanged
105         */
106        public class Exchanger<V> {
107            /*
108             * Algorithm Description:
109             *
110             * The basic idea is to maintain a "slot", which is a reference to
111             * a Node containing both an Item to offer and a "hole" waiting to
112             * get filled in.  If an incoming "occupying" thread sees that the
113             * slot is null, it CAS'es (compareAndSets) a Node there and waits
114             * for another to invoke exchange.  That second "fulfilling" thread
115             * sees that the slot is non-null, and so CASes it back to null,
116             * also exchanging items by CASing the hole, plus waking up the
117             * occupying thread if it is blocked.  In each case CAS'es may
118             * fail because a slot at first appears non-null but is null upon
119             * CAS, or vice-versa.  So threads may need to retry these
120             * actions.
121             *
122             * This simple approach works great when there are only a few
123             * threads using an Exchanger, but performance rapidly
124             * deteriorates due to CAS contention on the single slot when
125             * there are lots of threads using an exchanger.  So instead we use
126             * an "arena"; basically a kind of hash table with a dynamically
127             * varying number of slots, any one of which can be used by
128             * threads performing an exchange.  Incoming threads pick slots
129             * based on a hash of their Thread ids.  If an incoming thread
130             * fails to CAS in its chosen slot, it picks an alternative slot
131             * instead.  And similarly from there.  If a thread successfully
132             * CASes into a slot but no other thread arrives, it tries
133             * another, heading toward the zero slot, which always exists even
134             * if the table shrinks.  The particular mechanics controlling this
135             * are as follows:
136             *
137             * Waiting: Slot zero is special in that it is the only slot that
138             * exists when there is no contention.  A thread occupying slot
139             * zero will block if no thread fulfills it after a short spin.
140             * In other cases, occupying threads eventually give up and try
141             * another slot.  Waiting threads spin for a while (a period that
142             * should be a little less than a typical context-switch time)
143             * before either blocking (if slot zero) or giving up (if other
144             * slots) and restarting.  There is no reason for threads to block
145             * unless there are unlikely to be any other threads present.
146             * Occupants are mainly avoiding memory contention so sit there
147             * quietly polling for a shorter period than it would take to
148             * block and then unblock them.  Non-slot-zero waits that elapse
149             * because of lack of other threads waste around one extra
150             * context-switch time per try, which is still on average much
151             * faster than alternative approaches.
152             *
153             * Sizing: Usually, using only a few slots suffices to reduce
154             * contention.  Especially with small numbers of threads, using
155             * too many slots can lead to just as poor performance as using
156             * too few of them, and there's not much room for error.  The
157             * variable "max" maintains the number of slots actually in
158             * use.  It is increased when a thread sees too many CAS
159             * failures.  (This is analogous to resizing a regular hash table
160             * based on a target load factor, except here, growth steps are
161             * just one-by-one rather than proportional.)  Growth requires
162             * contention failures in each of three tried slots.  Requiring
163             * multiple failures for expansion copes with the fact that some
164             * failed CASes are not due to contention but instead to simple
165             * races between two threads or thread pre-emptions occurring
166             * between reading and CASing.  Also, very transient peak
167             * contention can be much higher than the average sustainable
168             * levels.  The max limit is decreased on average 50% of the times
169             * that a non-slot-zero wait elapses without being fulfilled.
170             * Threads experiencing elapsed waits move closer to zero, so
171             * eventually find existing (or future) threads even if the table
172             * has been shrunk due to inactivity.  The chosen mechanics and
173             * thresholds for growing and shrinking are intrinsically
174             * entangled with indexing and hashing inside the exchange code,
175             * and can't be nicely abstracted out.
176             *
177             * Hashing: Each thread picks its initial slot to use in accord
178             * with a simple hashcode.  The sequence is the same on each
179             * encounter by any given thread, but effectively random across
180             * threads.  Using arenas encounters the classic cost vs quality
181             * tradeoffs of all hash tables.  Here, we use a one-step FNV-1a
182             * hash code based on the current thread's Thread.getId(), along
183             * with a cheap approximation to a mod operation to select an
184             * index.  The downside of optimizing index selection in this way
185             * is that the code is hardwired to use a maximum table size of
186             * 32.  But this value more than suffices for known platforms and
187             * applications.
188             *
189             * Probing: On sensed contention of a selected slot, we probe
190             * sequentially through the table, analogously to linear probing
191             * after collision in a hash table.  (We move circularly, in
192             * reverse order, to mesh best with table growth and shrinkage
193             * rules.)  Except that to minimize the effects of false-alarms
194             * and cache thrashing, we try the first selected slot twice
195             * before moving.
196             *
197             * Padding: Even with contention management, slots are heavily
198             * contended, so use cache-padding to avoid poor memory
199             * performance.  Because of this, slots are lazily constructed
200             * only when used, to avoid wasting this space unnecessarily.
201             * While isolation of locations is not much of an issue at first
202             * in an application, as time goes on and garbage-collectors
203             * perform compaction, slots are very likely to be moved adjacent
204             * to each other, which can cause much thrashing of cache lines on
205             * MPs unless padding is employed.
206             *
207             * This is an improvement of the algorithm described in the paper
208             * "A Scalable Elimination-based Exchange Channel" by William
209             * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
210             * workshop.  Available at: http://hdl.handle.net/1802/2104
211             */
212
213            /** The number of CPUs, for sizing and spin control */
214            private static final int NCPU = Runtime.getRuntime()
215                    .availableProcessors();
216
217            /**
218             * The capacity of the arena.  Set to a value that provides more
219             * than enough space to handle contention.  On small machines
220             * most slots won't be used, but it is still not wasted because
221             * the extra space provides some machine-level address padding
222             * to minimize interference with heavily CAS'ed Slot locations.
223             * And on very large machines, performance eventually becomes
224             * bounded by memory bandwidth, not numbers of threads/CPUs.
225             * This constant cannot be changed without also modifying
226             * indexing and hashing algorithms.
227             */
228            private static final int CAPACITY = 32;
229
230            /**
231             * The value of "max" that will hold all threads without
232             * contention.  When this value is less than CAPACITY, some
233             * otherwise wasted expansion can be avoided.
234             */
235            private static final int FULL = Math.max(0, Math.min(CAPACITY,
236                    NCPU / 2) - 1);
237
238            /**
239             * The number of times to spin (doing nothing except polling a
240             * memory location) before blocking or giving up while waiting to
241             * be fulfilled.  Should be zero on uniprocessors.  On
242             * multiprocessors, this value should be large enough so that two
243             * threads exchanging items as fast as possible block only when
244             * one of them is stalled (due to GC or preemption), but not much
245             * longer, to avoid wasting CPU resources.  Seen differently, this
246             * value is a little over half the number of cycles of an average
247             * context switch time on most systems.  The value here is
248             * approximately the average of those across a range of tested
249             * systems.
250             */
251            private static final int SPINS = (NCPU == 1) ? 0 : 2000;
252
253            /**
254             * The number of times to spin before blocking in timed waits.
255             * Timed waits spin more slowly because checking the time takes
256             * time.  The best value relies mainly on the relative rate of
257             * System.nanoTime vs memory accesses.  The value is empirically
258             * derived to work well across a variety of systems.
259             */
260            private static final int TIMED_SPINS = SPINS / 20;
261
262            /**
263             * Sentinel item representing cancellation of a wait due to
264             * interruption, timeout, or elapsed spin-waits.  This value is
265             * placed in holes on cancellation, and used as a return value
266             * from waiting methods to indicate failure to set or get hole.
267             */
268            private static final Object CANCEL = new Object();
269
270            /**
271             * Value representing null arguments/returns from public
272             * methods.  This disambiguates from internal requirement that
273             * holes start out as null to mean they are not yet set.
274             */
275            private static final Object NULL_ITEM = new Object();
276
277            /**
278             * Nodes hold partially exchanged data.  This class
279             * opportunistically subclasses AtomicReference to represent the
280             * hole.  So get() returns hole, and compareAndSet CAS'es value
281             * into hole.  This class cannot be parameterized as "V" because
282             * of the use of non-V CANCEL sentinels.
283             */
284            private static final class Node extends AtomicReference<Object> {
285                /** The element offered by the Thread creating this node. */
286                public final Object item;
287
288                /** The Thread waiting to be signalled; null until waiting. */
289                public volatile Thread waiter;
290
291                /**
292                 * Creates node with given item and empty hole.
293                 * @param item the item
294                 */
295                public Node(Object item) {
296                    this .item = item;
297                }
298            }
299
300            /**
301             * A Slot is an AtomicReference with heuristic padding to lessen
302             * cache effects of this heavily CAS'ed location.  While the
303             * padding adds noticeable space, all slots are created only on
304             * demand, and there will be more than one of them only when it
305             * would improve throughput more than enough to outweigh using
306             * extra space.
307             */
308            private static final class Slot extends AtomicReference<Object> {
309                // Improve likelihood of isolation on <= 64 byte cache lines
310                long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd,
311                        qe;
312            }
313
314            /**
315             * Slot array.  Elements are lazily initialized when needed.
316             * Declared volatile to enable double-checked lazy construction.
317             */
318            private volatile Slot[] arena = new Slot[CAPACITY];
319
320            /**
321             * The maximum slot index being used.  The value sometimes
322             * increases when a thread experiences too many CAS contentions,
323             * and sometimes decreases when a spin-wait elapses.  Changes
324             * are performed only via compareAndSet, to avoid stale values
325             * when a thread happens to stall right before setting.
326             */
327            private final AtomicInteger max = new AtomicInteger();
328
329            /**
330             * Main exchange function, handling the different policy variants.
331             * Uses Object, not "V" as argument and return value to simplify
332             * handling of sentinel values.  Callers from public methods decode
333             * and cast accordingly.
334             *
335             * @param item the (non-null) item to exchange
336             * @param timed true if the wait is timed
337             * @param nanos if timed, the maximum wait time
338             * @return the other thread's item, or CANCEL if interrupted or timed out
339             */
340            private Object doExchange(Object item, boolean timed, long nanos) {
341                Node me = new Node(item); // Create in case occupying
342                int index = hashIndex(); // Index of current slot
343                int fails = 0; // Number of CAS failures
344
345                for (;;) {
346                    Object y; // Contents of current slot
347                    Slot slot = arena[index];
348                    if (slot == null) // Lazily initialize slots
349                        createSlot(index); // Continue loop to reread
350                    else if ((y = slot.get()) != null && // Try to fulfill
351                            slot.compareAndSet(y, null)) {
352                        Node you = (Node) y; // Transfer item
353                        if (you.compareAndSet(null, item)) {
354                            LockSupport.unpark(you.waiter);
355                            return you.item;
356                        } // Else cancelled; continue
357                    } else if (y == null && // Try to occupy
358                            slot.compareAndSet(null, me)) {
359                        if (index == 0) // Blocking wait for slot 0
360                            return timed ? awaitNanos(me, slot, nanos) : await(
361                                    me, slot);
362                        Object v = spinWait(me, slot); // Spin wait for non-0
363                        if (v != CANCEL)
364                            return v;
365                        me = new Node(item); // Throw away cancelled node
366                        int m = max.get();
367                        if (m > (index >>>= 1)) // Decrease index
368                            max.compareAndSet(m, m - 1); // Maybe shrink table
369                    } else if (++fails > 1) { // Allow 2 fails on 1st slot
370                        int m = max.get();
371                        if (fails > 3 && m < FULL
372                                && max.compareAndSet(m, m + 1))
373                            index = m + 1; // Grow on 3rd failed slot
374                        else if (--index < 0)
375                            index = m; // Circularly traverse
376                    }
377                }
378            }
379
380            /**
381             * Returns a hash index for the current thread.  Uses a one-step
382             * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
383             * based on the current thread's Thread.getId().  These hash codes
384             * have more uniform distribution properties with respect to small
385             * moduli (here 1-31) than do other simple hashing functions.
386             *
387             * <p>To return an index between 0 and max, we use a cheap
388             * approximation to a mod operation, that also corrects for bias
389             * due to non-power-of-2 remaindering (see {@link
390             * java.util.Random#nextInt}).  Bits of the hashcode are masked
391             * with "nbits", the ceiling power of two of table size (looked up
392             * in a table packed into three ints).  If too large, this is
393             * retried after rotating the hash by nbits bits, while forcing new
394             * top bit to 0, which guarantees eventual termination (although
395             * with a non-random-bias).  This requires an average of less than
396             * 2 tries for all table sizes, and has a maximum 2% difference
397             * from perfectly uniform slot probabilities when applied to all
398             * possible hash codes for sizes less than 32.
399             *
400             * @return a per-thread-random index, 0 <= index < max
401             */
402            private final int hashIndex() {
403                long id = Thread.currentThread().getId();
404                int hash = (((int) (id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
405
406                int m = max.get();
407                int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1))
408                        ((0x000001f8 >>> m) & 2) | // The constants hold
409                ((0xffff00f2 >>> m) & 1)); // a lookup table
410                int index;
411                while ((index = hash & ((1 << nbits) - 1)) > m)
412                    // May retry on
413                    hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
414                return index;
415            }
416
417            /**
418             * Creates a new slot at given index.  Called only when the slot
419             * appears to be null.  Relies on double-check using builtin
420             * locks, since they rarely contend.  This in turn relies on the
421             * arena array being declared volatile.
422             *
423             * @param index the index to add slot at
424             */
425            private void createSlot(int index) {
426                // Create slot outside of lock to narrow sync region
427                Slot newSlot = new Slot();
428                Slot[] a = arena;
429                synchronized (a) {
430                    if (a[index] == null)
431                        a[index] = newSlot;
432                }
433            }
434
435            /**
436             * Tries to cancel a wait for the given node waiting in the given
437             * slot, if so, helping clear the node from its slot to avoid
438             * garbage retention.
439             *
440             * @param node the waiting node
441             * @param the slot it is waiting in
442             * @return true if successfully cancelled
443             */
444            private static boolean tryCancel(Node node, Slot slot) {
445                if (!node.compareAndSet(null, CANCEL))
446                    return false;
447                if (slot.get() == node) // pre-check to minimize contention
448                    slot.compareAndSet(node, null);
449                return true;
450            }
451
452            // Three forms of waiting. Each just different enough not to merge
453            // code with others.
454
455            /**
456             * Spin-waits for hole for a non-0 slot.  Fails if spin elapses
457             * before hole filled.  Does not check interrupt, relying on check
458             * in public exchange method to abort if interrupted on entry.
459             *
460             * @param node the waiting node
461             * @return on success, the hole; on failure, CANCEL
462             */
463            private static Object spinWait(Node node, Slot slot) {
464                int spins = SPINS;
465                for (;;) {
466                    Object v = node.get();
467                    if (v != null)
468                        return v;
469                    else if (spins > 0)
470                        --spins;
471                    else
472                        tryCancel(node, slot);
473                }
474            }
475
476            /**
477             * Waits for (by spinning and/or blocking) and gets the hole
478             * filled in by another thread.  Fails if interrupted before
479             * hole filled.
480             *
481             * When a node/thread is about to block, it sets its waiter field
482             * and then rechecks state at least one more time before actually
483             * parking, thus covering race vs fulfiller noticing that waiter
484             * is non-null so should be woken.
485             *
486             * Thread interruption status is checked only surrounding calls to
487             * park.  The caller is assumed to have checked interrupt status
488             * on entry.
489             *
490             * @param node the waiting node
491             * @return on success, the hole; on failure, CANCEL
492             */
493            private static Object await(Node node, Slot slot) {
494                Thread w = Thread.currentThread();
495                int spins = SPINS;
496                for (;;) {
497                    Object v = node.get();
498                    if (v != null)
499                        return v;
500                    else if (spins > 0) // Spin-wait phase
501                        --spins;
502                    else if (node.waiter == null) // Set up to block next
503                        node.waiter = w;
504                    else if (w.isInterrupted()) // Abort on interrupt
505                        tryCancel(node, slot);
506                    else
507                        // Block
508                        LockSupport.park(node);
509                }
510            }
511
512            /**
513             * Waits for (at index 0) and gets the hole filled in by another
514             * thread.  Fails if timed out or interrupted before hole filled.
515             * Same basic logic as untimed version, but a bit messier.
516             *
517             * @param node the waiting node
518             * @param nanos the wait time
519             * @return on success, the hole; on failure, CANCEL
520             */
521            private Object awaitNanos(Node node, Slot slot, long nanos) {
522                int spins = TIMED_SPINS;
523                long lastTime = 0;
524                Thread w = null;
525                for (;;) {
526                    Object v = node.get();
527                    if (v != null)
528                        return v;
529                    long now = System.nanoTime();
530                    if (w == null)
531                        w = Thread.currentThread();
532                    else
533                        nanos -= now - lastTime;
534                    lastTime = now;
535                    if (nanos > 0) {
536                        if (spins > 0)
537                            --spins;
538                        else if (node.waiter == null)
539                            node.waiter = w;
540                        else if (w.isInterrupted())
541                            tryCancel(node, slot);
542                        else
543                            LockSupport.parkNanos(node, nanos);
544                    } else if (tryCancel(node, slot) && !w.isInterrupted())
545                        return scanOnTimeout(node);
546                }
547            }
548
549            /**
550             * Sweeps through arena checking for any waiting threads.  Called
551             * only upon return from timeout while waiting in slot 0.  When a
552             * thread gives up on a timed wait, it is possible that a
553             * previously-entered thread is still waiting in some other
554             * slot.  So we scan to check for any.  This is almost always
555             * overkill, but decreases the likelihood of timeouts when there
556             * are other threads present to far less than that in lock-based
557             * exchangers in which earlier-arriving threads may still be
558             * waiting on entry locks.
559             *
560             * @param node the waiting node
561             * @return another thread's item, or CANCEL
562             */
563            private Object scanOnTimeout(Node node) {
564                Object y;
565                for (int j = arena.length - 1; j >= 0; --j) {
566                    Slot slot = arena[j];
567                    if (slot != null) {
568                        while ((y = slot.get()) != null) {
569                            if (slot.compareAndSet(y, null)) {
570                                Node you = (Node) y;
571                                if (you.compareAndSet(null, node.item)) {
572                                    LockSupport.unpark(you.waiter);
573                                    return you.item;
574                                }
575                            }
576                        }
577                    }
578                }
579                return CANCEL;
580            }
581
582            /**
583             * Creates a new Exchanger.
584             */
585            public Exchanger() {
586            }
587
588            /**
589             * Waits for another thread to arrive at this exchange point (unless
590             * the current thread is {@linkplain Thread#interrupt interrupted}),
591             * and then transfers the given object to it, receiving its object
592             * in return.
593             *
594             * <p>If another thread is already waiting at the exchange point then
595             * it is resumed for thread scheduling purposes and receives the object
596             * passed in by the current thread.  The current thread returns immediately,
597             * receiving the object passed to the exchange by that other thread.
598             *
599             * <p>If no other thread is already waiting at the exchange then the
600             * current thread is disabled for thread scheduling purposes and lies
601             * dormant until one of two things happens:
602             * <ul>
603             * <li>Some other thread enters the exchange; or
604             * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current
605             * thread.
606             * </ul>
607             * <p>If the current thread:
608             * <ul>
609             * <li>has its interrupted status set on entry to this method; or
610             * <li>is {@linkplain Thread#interrupt interrupted} while waiting
611             * for the exchange,
612             * </ul>
613             * then {@link InterruptedException} is thrown and the current thread's
614             * interrupted status is cleared.
615             *
616             * @param x the object to exchange
617             * @return the object provided by the other thread
618             * @throws InterruptedException if the current thread was
619             *         interrupted while waiting
620             */
621            public V exchange(V x) throws InterruptedException {
622                if (!Thread.interrupted()) {
623                    Object v = doExchange(x == null ? NULL_ITEM : x, false, 0);
624                    if (v == NULL_ITEM)
625                        return null;
626                    if (v != CANCEL)
627                        return (V) v;
628                    Thread.interrupted(); // Clear interrupt status on IE throw
629                }
630                throw new InterruptedException();
631            }
632
633            /**
634             * Waits for another thread to arrive at this exchange point (unless
635             * the current thread is {@linkplain Thread#interrupt interrupted} or
636             * the specified waiting time elapses), and then transfers the given
637             * object to it, receiving its object in return.
638             *
639             * <p>If another thread is already waiting at the exchange point then
640             * it is resumed for thread scheduling purposes and receives the object
641             * passed in by the current thread.  The current thread returns immediately,
642             * receiving the object passed to the exchange by that other thread.
643             *
644             * <p>If no other thread is already waiting at the exchange then the
645             * current thread is disabled for thread scheduling purposes and lies
646             * dormant until one of three things happens:
647             * <ul>
648             * <li>Some other thread enters the exchange; or
649             * <li>Some other thread {@linkplain Thread#interrupt interrupts}
650             * the current thread; or
651             * <li>The specified waiting time elapses.
652             * </ul>
653             * <p>If the current thread:
654             * <ul>
655             * <li>has its interrupted status set on entry to this method; or
656             * <li>is {@linkplain Thread#interrupt interrupted} while waiting
657             * for the exchange,
658             * </ul>
659             * then {@link InterruptedException} is thrown and the current thread's
660             * interrupted status is cleared.
661             *
662             * <p>If the specified waiting time elapses then {@link
663             * TimeoutException} is thrown.  If the time is less than or equal
664             * to zero, the method will not wait at all.
665             *
666             * @param x the object to exchange
667             * @param timeout the maximum time to wait
668             * @param unit the time unit of the <tt>timeout</tt> argument
669             * @return the object provided by the other thread
670             * @throws InterruptedException if the current thread was
671             *         interrupted while waiting
672             * @throws TimeoutException if the specified waiting time elapses
673             *         before another thread enters the exchange
674             */
675            public V exchange(V x, long timeout, TimeUnit unit)
676                    throws InterruptedException, TimeoutException {
677                if (!Thread.interrupted()) {
678                    Object v = doExchange(x == null ? NULL_ITEM : x, true, unit
679                            .toNanos(timeout));
680                    if (v == NULL_ITEM)
681                        return null;
682                    if (v != CANCEL)
683                        return (V) v;
684                    if (!Thread.interrupted())
685                        throw new TimeoutException();
686                }
687                throw new InterruptedException();
688            }
689        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.