Source Code Cross Referenced for CyclicBarrier.java in  » 6.0-JDK-Core » Collections-Jar-Zip-Logging-regex » java » util » concurrent » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Home
Java Source Code / Java Documentation
1.6.0 JDK Core
2.6.0 JDK Modules
3.6.0 JDK Modules com.sun
4.6.0 JDK Modules com.sun.java
5.6.0 JDK Modules sun
6.6.0 JDK Platform
7.Ajax
8.Apache Harmony Java SE
9.Aspect oriented
10.Authentication Authorization
11.Blogger System
12.Build
13.Byte Code
14.Cache
15.Chart
16.Chat
17.Code Analyzer
18.Collaboration
19.Content Management System
20.Database Client
21.Database DBMS
22.Database JDBC Connection Pool
23.Database ORM
24.Development
25.EJB Server
26.ERP CRM Financial
27.ESB
28.Forum
29.Game
30.GIS
31.Graphic 3D
32.Graphic Library
33.Groupware
34.HTML Parser
35.IDE
36.IDE Eclipse
37.IDE Netbeans
38.Installer
39.Internationalization Localization
40.Inversion of Control
41.Issue Tracking
42.J2EE
43.J2ME
44.JBoss
45.JMS
46.JMX
47.Library
48.Mail Clients
49.Music
50.Net
51.Parser
52.PDF
53.Portal
54.Profiler
55.Project Management
56.Report
57.RSS RDF
58.Rule Engine
59.Science
60.Scripting
61.Search Engine
62.Security
63.Sevlet Container
64.Source Control
65.Swing Library
66.Template Engine
67.Test Coverage
68.Testing
69.UML
70.Web Crawler
71.Web Framework
72.Web Mail
73.Web Server
74.Web Services
75.Web Services apache cxf 2.2.6
76.Web Services AXIS2
77.Wiki Engine
78.Workflow Engines
79.XML
80.XML UI
Java Source Code / Java Documentation » 6.0 JDK Core » Collections Jar Zip Logging regex » java.util.concurrent 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001        /*
002         * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003         *
004         * This code is free software; you can redistribute it and/or modify it
005         * under the terms of the GNU General Public License version 2 only, as
006         * published by the Free Software Foundation.  Sun designates this
007         * particular file as subject to the "Classpath" exception as provided
008         * by Sun in the LICENSE file that accompanied this code.
009         *
010         * This code is distributed in the hope that it will be useful, but WITHOUT
011         * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012         * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
013         * version 2 for more details (a copy is included in the LICENSE file that
014         * accompanied this code).
015         *
016         * You should have received a copy of the GNU General Public License version
017         * 2 along with this work; if not, write to the Free Software Foundation,
018         * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019         *
020         * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021         * CA 95054 USA or visit www.sun.com if you need additional information or
022         * have any questions.
023         */
024
025        /*
026         * This file is available under and governed by the GNU General Public
027         * License version 2 only, as published by the Free Software Foundation.
028         * However, the following notice accompanied the original version of this
029         * file:
030         *
031         * Written by Doug Lea with assistance from members of JCP JSR-166
032         * Expert Group and released to the public domain, as explained at
033         * http://creativecommons.org/licenses/publicdomain
034         */
035
036        package java.util.concurrent;
037
038        import java.util.concurrent.locks.*;
039
040        /**
041         * A synchronization aid that allows a set of threads to all wait for
042         * each other to reach a common barrier point.  CyclicBarriers are
043         * useful in programs involving a fixed sized party of threads that
044         * must occasionally wait for each other. The barrier is called
045         * <em>cyclic</em> because it can be re-used after the waiting threads
046         * are released.
047         *
048         * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
049         * that is run once per barrier point, after the last thread in the party
050         * arrives, but before any threads are released.
051         * This <em>barrier action</em> is useful
052         * for updating shared-state before any of the parties continue.
053         *
054         * <p><b>Sample usage:</b> Here is an example of
055         *  using a barrier in a parallel decomposition design:
056         * <pre>
057         * class Solver {
058         *   final int N;
059         *   final float[][] data;
060         *   final CyclicBarrier barrier;
061         *
062         *   class Worker implements Runnable {
063         *     int myRow;
064         *     Worker(int row) { myRow = row; }
065         *     public void run() {
066         *       while (!done()) {
067         *         processRow(myRow);
068         *
069         *         try {
070         *           barrier.await();
071         *         } catch (InterruptedException ex) {
072         *           return;
073         *         } catch (BrokenBarrierException ex) {
074         *           return;
075         *         }
076         *       }
077         *     }
078         *   }
079         *
080         *   public Solver(float[][] matrix) {
081         *     data = matrix;
082         *     N = matrix.length;
083         *     barrier = new CyclicBarrier(N,
084         *                                 new Runnable() {
085         *                                   public void run() {
086         *                                     mergeRows(...);
087         *                                   }
088         *                                 });
089         *     for (int i = 0; i < N; ++i)
090         *       new Thread(new Worker(i)).start();
091         *
092         *     waitUntilDone();
093         *   }
094         * }
095         * </pre>
096         * Here, each worker thread processes a row of the matrix then waits at the
097         * barrier until all rows have been processed. When all rows are processed
098         * the supplied {@link Runnable} barrier action is executed and merges the
099         * rows. If the merger
100         * determines that a solution has been found then <tt>done()</tt> will return
101         * <tt>true</tt> and each worker will terminate.
102         *
103         * <p>If the barrier action does not rely on the parties being suspended when
104         * it is executed, then any of the threads in the party could execute that
105         * action when it is released. To facilitate this, each invocation of
106         * {@link #await} returns the arrival index of that thread at the barrier.
107         * You can then choose which thread should execute the barrier action, for
108         * example:
109         * <pre>  if (barrier.await() == 0) {
110         *     // log the completion of this iteration
111         *   }</pre>
112         *
113         * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
114         * for failed synchronization attempts: If a thread leaves a barrier
115         * point prematurely because of interruption, failure, or timeout, all
116         * other threads waiting at that barrier point will also leave
117         * abnormally via {@link BrokenBarrierException} (or
118         * {@link InterruptedException} if they too were interrupted at about
119         * the same time).
120         *
121         * <p>Memory consistency effects: Actions in a thread prior to calling
122         * {@code await()}
123         * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
124         * actions that are part of the barrier action, which in turn
125         * <i>happen-before</i> actions following a successful return from the
126         * corresponding {@code await()} in other threads.
127         *
128         * @since 1.5
129         * @see CountDownLatch
130         *
131         * @author Doug Lea
132         */
133        public class CyclicBarrier {
134            /**
135             * Each use of the barrier is represented as a generation instance.
136             * The generation changes whenever the barrier is tripped, or
137             * is reset. There can be many generations associated with threads
138             * using the barrier - due to the non-deterministic way the lock
139             * may be allocated to waiting threads - but only one of these
140             * can be active at a time (the one to which <tt>count</tt> applies)
141             * and all the rest are either broken or tripped.
142             * There need not be an active generation if there has been a break
143             * but no subsequent reset.
144             */
145            private static class Generation {
146                boolean broken = false;
147            }
148
149            /** The lock for guarding barrier entry */
150            private final ReentrantLock lock = new ReentrantLock();
151            /** Condition to wait on until tripped */
152            private final Condition trip = lock.newCondition();
153            /** The number of parties */
154            private final int parties;
155            /* The command to run when tripped */
156            private final Runnable barrierCommand;
157            /** The current generation */
158            private Generation generation = new Generation();
159
160            /**
161             * Number of parties still waiting. Counts down from parties to 0
162             * on each generation.  It is reset to parties on each new
163             * generation or when broken.
164             */
165            private int count;
166
167            /**
168             * Updates state on barrier trip and wakes up everyone.
169             * Called only while holding lock.
170             */
171            private void nextGeneration() {
172                // signal completion of last generation
173                trip.signalAll();
174                // set up next generation
175                count = parties;
176                generation = new Generation();
177            }
178
179            /**
180             * Sets current barrier generation as broken and wakes up everyone.
181             * Called only while holding lock.
182             */
183            private void breakBarrier() {
184                generation.broken = true;
185                count = parties;
186                trip.signalAll();
187            }
188
189            /**
190             * Main barrier code, covering the various policies.
191             */
192            private int dowait(boolean timed, long nanos)
193                    throws InterruptedException, BrokenBarrierException,
194                    TimeoutException {
195                final ReentrantLock lock = this .lock;
196                lock.lock();
197                try {
198                    final Generation g = generation;
199
200                    if (g.broken)
201                        throw new BrokenBarrierException();
202
203                    if (Thread.interrupted()) {
204                        breakBarrier();
205                        throw new InterruptedException();
206                    }
207
208                    int index = --count;
209                    if (index == 0) { // tripped
210                        boolean ranAction = false;
211                        try {
212                            final Runnable command = barrierCommand;
213                            if (command != null)
214                                command.run();
215                            ranAction = true;
216                            nextGeneration();
217                            return 0;
218                        } finally {
219                            if (!ranAction)
220                                breakBarrier();
221                        }
222                    }
223
224                    // loop until tripped, broken, interrupted, or timed out
225                    for (;;) {
226                        try {
227                            if (!timed)
228                                trip.await();
229                            else if (nanos > 0L)
230                                nanos = trip.awaitNanos(nanos);
231                        } catch (InterruptedException ie) {
232                            if (g == generation && !g.broken) {
233                                breakBarrier();
234                                throw ie;
235                            } else {
236                                // We're about to finish waiting even if we had not
237                                // been interrupted, so this interrupt is deemed to
238                                // "belong" to subsequent execution.
239                                Thread.currentThread().interrupt();
240                            }
241                        }
242
243                        if (g.broken)
244                            throw new BrokenBarrierException();
245
246                        if (g != generation)
247                            return index;
248
249                        if (timed && nanos <= 0L) {
250                            breakBarrier();
251                            throw new TimeoutException();
252                        }
253                    }
254                } finally {
255                    lock.unlock();
256                }
257            }
258
259            /**
260             * Creates a new <tt>CyclicBarrier</tt> that will trip when the
261             * given number of parties (threads) are waiting upon it, and which
262             * will execute the given barrier action when the barrier is tripped,
263             * performed by the last thread entering the barrier.
264             *
265             * @param parties the number of threads that must invoke {@link #await}
266             *        before the barrier is tripped
267             * @param barrierAction the command to execute when the barrier is
268             *        tripped, or {@code null} if there is no action
269             * @throws IllegalArgumentException if {@code parties} is less than 1
270             */
271            public CyclicBarrier(int parties, Runnable barrierAction) {
272                if (parties <= 0)
273                    throw new IllegalArgumentException();
274                this .parties = parties;
275                this .count = parties;
276                this .barrierCommand = barrierAction;
277            }
278
279            /**
280             * Creates a new <tt>CyclicBarrier</tt> that will trip when the
281             * given number of parties (threads) are waiting upon it, and
282             * does not perform a predefined action when the barrier is tripped.
283             *
284             * @param parties the number of threads that must invoke {@link #await}
285             *        before the barrier is tripped
286             * @throws IllegalArgumentException if {@code parties} is less than 1
287             */
288            public CyclicBarrier(int parties) {
289                this (parties, null);
290            }
291
292            /**
293             * Returns the number of parties required to trip this barrier.
294             *
295             * @return the number of parties required to trip this barrier
296             */
297            public int getParties() {
298                return parties;
299            }
300
301            /**
302             * Waits until all {@linkplain #getParties parties} have invoked
303             * <tt>await</tt> on this barrier.
304             *
305             * <p>If the current thread is not the last to arrive then it is
306             * disabled for thread scheduling purposes and lies dormant until
307             * one of the following things happens:
308             * <ul>
309             * <li>The last thread arrives; or
310             * <li>Some other thread {@linkplain Thread#interrupt interrupts}
311             * the current thread; or
312             * <li>Some other thread {@linkplain Thread#interrupt interrupts}
313             * one of the other waiting threads; or
314             * <li>Some other thread times out while waiting for barrier; or
315             * <li>Some other thread invokes {@link #reset} on this barrier.
316             * </ul>
317             *
318             * <p>If the current thread:
319             * <ul>
320             * <li>has its interrupted status set on entry to this method; or
321             * <li>is {@linkplain Thread#interrupt interrupted} while waiting
322             * </ul>
323             * then {@link InterruptedException} is thrown and the current thread's
324             * interrupted status is cleared.
325             *
326             * <p>If the barrier is {@link #reset} while any thread is waiting,
327             * or if the barrier {@linkplain #isBroken is broken} when
328             * <tt>await</tt> is invoked, or while any thread is waiting, then
329             * {@link BrokenBarrierException} is thrown.
330             *
331             * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
332             * then all other waiting threads will throw
333             * {@link BrokenBarrierException} and the barrier is placed in the broken
334             * state.
335             *
336             * <p>If the current thread is the last thread to arrive, and a
337             * non-null barrier action was supplied in the constructor, then the
338             * current thread runs the action before allowing the other threads to
339             * continue.
340             * If an exception occurs during the barrier action then that exception
341             * will be propagated in the current thread and the barrier is placed in
342             * the broken state.
343             *
344             * @return the arrival index of the current thread, where index
345             *         <tt>{@link #getParties()} - 1</tt> indicates the first
346             *         to arrive and zero indicates the last to arrive
347             * @throws InterruptedException if the current thread was interrupted
348             *         while waiting
349             * @throws BrokenBarrierException if <em>another</em> thread was
350             *         interrupted or timed out while the current thread was
351             *         waiting, or the barrier was reset, or the barrier was
352             *         broken when {@code await} was called, or the barrier
353             *         action (if present) failed due an exception.
354             */
355            public int await() throws InterruptedException,
356                    BrokenBarrierException {
357                try {
358                    return dowait(false, 0L);
359                } catch (TimeoutException toe) {
360                    throw new Error(toe); // cannot happen;
361                }
362            }
363
364            /**
365             * Waits until all {@linkplain #getParties parties} have invoked
366             * <tt>await</tt> on this barrier, or the specified waiting time elapses.
367             *
368             * <p>If the current thread is not the last to arrive then it is
369             * disabled for thread scheduling purposes and lies dormant until
370             * one of the following things happens:
371             * <ul>
372             * <li>The last thread arrives; or
373             * <li>The specified timeout elapses; or
374             * <li>Some other thread {@linkplain Thread#interrupt interrupts}
375             * the current thread; or
376             * <li>Some other thread {@linkplain Thread#interrupt interrupts}
377             * one of the other waiting threads; or
378             * <li>Some other thread times out while waiting for barrier; or
379             * <li>Some other thread invokes {@link #reset} on this barrier.
380             * </ul>
381             *
382             * <p>If the current thread:
383             * <ul>
384             * <li>has its interrupted status set on entry to this method; or
385             * <li>is {@linkplain Thread#interrupt interrupted} while waiting
386             * </ul>
387             * then {@link InterruptedException} is thrown and the current thread's
388             * interrupted status is cleared.
389             *
390             * <p>If the specified waiting time elapses then {@link TimeoutException}
391             * is thrown. If the time is less than or equal to zero, the
392             * method will not wait at all.
393             *
394             * <p>If the barrier is {@link #reset} while any thread is waiting,
395             * or if the barrier {@linkplain #isBroken is broken} when
396             * <tt>await</tt> is invoked, or while any thread is waiting, then
397             * {@link BrokenBarrierException} is thrown.
398             *
399             * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
400             * waiting, then all other waiting threads will throw {@link
401             * BrokenBarrierException} and the barrier is placed in the broken
402             * state.
403             *
404             * <p>If the current thread is the last thread to arrive, and a
405             * non-null barrier action was supplied in the constructor, then the
406             * current thread runs the action before allowing the other threads to
407             * continue.
408             * If an exception occurs during the barrier action then that exception
409             * will be propagated in the current thread and the barrier is placed in
410             * the broken state.
411             *
412             * @param timeout the time to wait for the barrier
413             * @param unit the time unit of the timeout parameter
414             * @return the arrival index of the current thread, where index
415             *         <tt>{@link #getParties()} - 1</tt> indicates the first
416             *         to arrive and zero indicates the last to arrive
417             * @throws InterruptedException if the current thread was interrupted
418             *         while waiting
419             * @throws TimeoutException if the specified timeout elapses
420             * @throws BrokenBarrierException if <em>another</em> thread was
421             *         interrupted or timed out while the current thread was
422             *         waiting, or the barrier was reset, or the barrier was broken
423             *         when {@code await} was called, or the barrier action (if
424             *         present) failed due an exception
425             */
426            public int await(long timeout, TimeUnit unit)
427                    throws InterruptedException, BrokenBarrierException,
428                    TimeoutException {
429                return dowait(true, unit.toNanos(timeout));
430            }
431
432            /**
433             * Queries if this barrier is in a broken state.
434             *
435             * @return {@code true} if one or more parties broke out of this
436             *         barrier due to interruption or timeout since
437             *         construction or the last reset, or a barrier action
438             *         failed due to an exception; {@code false} otherwise.
439             */
440            public boolean isBroken() {
441                final ReentrantLock lock = this .lock;
442                lock.lock();
443                try {
444                    return generation.broken;
445                } finally {
446                    lock.unlock();
447                }
448            }
449
450            /**
451             * Resets the barrier to its initial state.  If any parties are
452             * currently waiting at the barrier, they will return with a
453             * {@link BrokenBarrierException}. Note that resets <em>after</em>
454             * a breakage has occurred for other reasons can be complicated to
455             * carry out; threads need to re-synchronize in some other way,
456             * and choose one to perform the reset.  It may be preferable to
457             * instead create a new barrier for subsequent use.
458             */
459            public void reset() {
460                final ReentrantLock lock = this .lock;
461                lock.lock();
462                try {
463                    breakBarrier(); // break the current generation
464                    nextGeneration(); // start a new generation
465                } finally {
466                    lock.unlock();
467                }
468            }
469
470            /**
471             * Returns the number of parties currently waiting at the barrier.
472             * This method is primarily useful for debugging and assertions.
473             *
474             * @return the number of parties currently blocked in {@link #await}
475             */
476            public int getNumberWaiting() {
477                final ReentrantLock lock = this.lock;
478                lock.lock();
479                try {
480                    return parties - count;
481                } finally {
482                    lock.unlock();
483                }
484            }
485        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.