Source Code Cross Referenced for CyclicBarrier.java in Apache-Harmony-Java-SE » java-package » java » util » concurrent



/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.concurrent.locks.*;

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of
 * using a barrier in a parallel decomposition design:
 * <pre>
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *
 *     waitUntilDone();
 *   }
 * }
 * </pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then <tt>done()</tt> will return
 * <tt>true</tt> and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre>  if (barrier.await() == 0) {
 *     // log the completion of this iteration
 *   }</pre>
 *
 * <p>The <tt>CyclicBarrier</tt> uses a fast-fail all-or-none breakage
 * model for failed synchronization attempts: If a thread leaves a
 * barrier point prematurely because of interruption, failure, or
 * timeout, all other threads, even those that have not yet resumed
 * from a previous {@link #await}, will also leave abnormally via
 * {@link BrokenBarrierException} (or <tt>InterruptedException</tt> if
 * they too were interrupted at about the same time).
 *
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /** The command to run when tripped */
    private final Runnable barrierCommand;

    /**
     * The generation number. Incremented upon barrier trip.
     * Retracted upon reset.
     */
    private long generation;

    /**
     * Breakage indicator.
     */
    private boolean broken;

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each cycle.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     */
    private void nextGeneration() {
        count = parties;
        ++generation;
        trip.signalAll();
    }

    /**
     * Sets barrier as broken and wakes up everyone.
     */
    private void breakBarrier() {
        broken = true;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int index = --count;
            long g = generation;

            if (broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            if (index == 0) { // tripped
                // Signalled waiters cannot proceed until this thread
                // releases the lock in the outer finally block, so the
                // barrier action still runs before any thread is released.
                nextGeneration();
                boolean ranAction = false;
                try {
                    Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    return 0;
                } finally {
                    // The barrier action threw: break the barrier.
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // Loop until tripped, broken, interrupted, or timed out.
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    breakBarrier();
                    throw ie;
                }

                if (broken || g > generation) // true if a reset occurred while waiting
                    throw new BrokenBarrierException();

                if (g < generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }

        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     * before the barrier is tripped.
     * @param barrierAction the command to execute when the barrier is
     * tripped, or <tt>null</tt> if there is no action.
     *
     * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0)
            throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action upon each barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     * before the barrier is tripped.
     *
     * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     * @return the number of parties required to trip this barrier.
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
     * on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>Some other thread {@link Thread#interrupt interrupts} one of the
     * other waiting threads; or
     * <li>Some other thread times out while waiting for the barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
     * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
     * or while any thread is waiting,
     * then {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
     * zero indicates the last to arrive.
     *
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     * interrupted while the current thread was waiting, or the barrier was
     * reset, or the barrier was broken when <tt>await</tt> was called,
     * or the barrier action (if present) failed due to an exception.
     */
    public int await() throws InterruptedException,
            BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
     * on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>Some other thread {@link Thread#interrupt interrupts} one of the
     * other waiting threads; or
     * <li>Some other thread times out while waiting for the barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
     * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
     * or while any thread is waiting,
     * then {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
     * zero indicates the last to arrive.
     *
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified timeout elapses.
     * @throws BrokenBarrierException if <em>another</em> thread was
     * interrupted while the current thread was waiting, or the barrier was
     * reset, or the barrier was broken when <tt>await</tt> was called,
     * or the barrier action (if present) failed due to an exception.
     */
    public int await(long timeout, TimeUnit unit)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     * @return <tt>true</tt> if one or more parties broke out of this
     * barrier due to interruption or timeout since construction or
     * the last reset, or a barrier action failed due to an exception;
     * and <tt>false</tt> otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            /*
             * Retract generation number enough to cover threads
             * currently waiting on current and still resuming from
             * previous generation, plus similarly accommodating spans
             * after the reset.
             */
            generation -= 4;
            broken = false;
            trip.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

}
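
The class documentation above describes three behaviours that are easier to follow with a small client program: the barrier action runs once per trip, exactly one `await()` call per cycle returns arrival index 0, and a timeout leaves the barrier broken until `reset()` is called. The following is a minimal sketch written against the standard `java.util.concurrent.CyclicBarrier` API. The class `BarrierDemo` and its methods `runRounds` and `timeoutThenReset` are hypothetical names chosen for illustration and are not part of the listing.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; none of these names appear in CyclicBarrier.java.
class BarrierDemo {

    /**
     * Runs `parties` threads through `rounds` barrier cycles.
     * Returns {barrier action runs, await() calls that returned 0,
     * 1 if the barrier ended up broken else 0}.
     */
    static int[] runRounds(final int parties, final int rounds) {
        final AtomicInteger actionRuns = new AtomicInteger();
        final AtomicInteger lastArrivals = new AtomicInteger();
        final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            public void run() {
                // Runs once per trip, in the last thread to arrive.
                actionRuns.incrementAndGet();
            }
        });

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int r = 0; r < rounds; r++) {
                            // Arrival index 0 marks the last thread to arrive in this cycle.
                            if (barrier.await() == 0) {
                                lastArrivals.incrementAndGet();
                            }
                        }
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException ex) {
                        // Another party left the barrier abnormally; stop.
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return new int[] { actionRuns.get(), lastArrivals.get(), barrier.isBroken() ? 1 : 0 };
    }

    /**
     * A lone thread times out on a two-party barrier.
     * Returns {timed out, isBroken() after the timeout, isBroken() after reset()}.
     */
    static boolean[] timeoutThenReset() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        boolean timedOut = false;
        try {
            barrier.await(50, TimeUnit.MILLISECONDS); // nobody else will arrive
        } catch (TimeoutException ex) {
            timedOut = true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException ex) {
            // Not expected here: no other thread uses this barrier.
        }
        boolean brokenAfterTimeout = barrier.isBroken();
        barrier.reset();
        return new boolean[] { timedOut, brokenAfterTimeout, barrier.isBroken() };
    }

    public static void main(String[] args) {
        int[] r = runRounds(3, 2);
        System.out.println("action runs=" + r[0] + ", index-0 arrivals=" + r[1]
                + ", broken=" + (r[2] == 1));
        boolean[] t = timeoutThenReset();
        System.out.println("timed out=" + t[0] + ", broken after timeout=" + t[1]
                + ", broken after reset=" + t[2]);
    }
}
```

With three parties and two rounds, the counters for the barrier action and for index-0 arrivals should both come out at 2, one per cycle. The sketch uses atomic counters rather than shared plain fields so that the result does not depend on which thread happens to arrive last.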