/*
 * Runs multiple jobs in parallel
 * Copyright (C) 2004-2005 Matt Conway
 * http://simplygenius.com/
 * Copyright (C) 2005 Stephen Ostermiller
 * http://ostermiller.org/contact.pl?regarding=Java+Utilities
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * See COPYING.TXT for details.
 */

package com.Ostermiller.util;

import java.util.*;

/**
 * Runs multiple jobs in parallel, n threads at a time, and waits
 * until all threads are complete before continuing.
 * <p>
 * Typically, Parallelizer would be used to run each of the items
 * in a for loop at the same time.  For example, the following for
 * loop:
 * <pre>
 * for (int i=0; i&lt;10; i++){
 *    System.out.println("Hello World " + i);
 * }
 * System.out.println("done");
 * </pre>
 * can be converted to this:
 * <pre>
 * Parallelizer parallelizer = new Parallelizer();
 * for (int i=0; i&lt;10; i++){
 *     final int j = i;
 *     parallelizer.run(
 *         new Runnable(){
 *             public void run(){
 *                 System.out.println("Hello World " + j);
 *             }
 *         }
 *     );
 * }
 * parallelizer.join();
 * System.out.println("done");
 * </pre>
 * <p>
 * More information about this class is available from <a target="_top" href=
 * "http://ostermiller.org/utils/Parallelizer.html">ostermiller.org</a>.
 *
 * @author Matt Conway - http://simplygenius.com/
 * @author Stephen Ostermiller - http://ostermiller.org/contact.pl?regarding=Java+Utilities
 * @since ostermillerutils 1.05.00
 */
public class Parallelizer {
    /**
     * Constant that may be passed as the concurrentThreadLimit argument
     * of the constructor indicating that no limit should be placed
     * on the number of threads that are allowed to run concurrently.
     *
     * @since ostermillerutils 1.05.00
     */
    public static final int INFINITE_THREAD_LIMIT = 0;

    /**
     * The number of threads that are allowed to be run concurrently.
     * (INFINITE_THREAD_LIMIT for no limit)
     */
    private int concurrentThreadLimit = INFINITE_THREAD_LIMIT;

    /**
     * Create a new Parallelizer with no limit on the number
     * of threads that will be allowed to be run concurrently.
     *
     * @since ostermillerutils 1.05.00
     */
    public Parallelizer() {
        this(INFINITE_THREAD_LIMIT);
    }

    /**
     * Create a new Parallelizer with the specified limit on the number
     * of threads that will be allowed to be run concurrently.
     * <p>
     * When the concurrent thread limit is reached and the parallelizer
     * gets a new thread to run, the new thread will be queued until
     * a thread finishes.
     *
     * @param concurrentThreadLimit number of threads that will be allowed
     *     to run simultaneously or INFINITE_THREAD_LIMIT for no limit.
     * @throws IllegalArgumentException if concurrentThreadLimit is negative
     *     (that is, neither a positive limit nor INFINITE_THREAD_LIMIT)
     *
     * @since ostermillerutils 1.05.00
     */
    public Parallelizer(int concurrentThreadLimit) {
        if (concurrentThreadLimit < INFINITE_THREAD_LIMIT)
            throw new IllegalArgumentException(
                    "Bad concurrent thread limit: "
                            + concurrentThreadLimit);
        this.concurrentThreadLimit = concurrentThreadLimit;
    }

    /**
     * A Set of threads that are currently running.
     * This set is also used as a lock to synchronize
     * anything that touches running threads.
     */
    private HashSet<Thread> runningThreads = new HashSet<Thread>();

    /**
     * A queue of jobs that have not yet been started.
     */
    private LinkedList<Thread> toRunQueue = new LinkedList<Thread>();

    /**
     * Run the given job.  The job is either run immediately or,
     * if the maximum number of concurrent jobs is already running,
     * queued to be run when some job finishes.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param job job which is to be run in parallel with other jobs.
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(Runnable job) {
        run(null, job, null, 0);
    }

    /**
     * Run the given job.  The job is either run immediately or,
     * if the maximum number of concurrent jobs is already running,
     * queued to be run when some job finishes.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(Runnable job, String threadName) {
        run(null, job, threadName, 0);
    }

    /**
     * Run the given job.  The job is either run immediately or,
     * if the maximum number of concurrent jobs is already running,
     * queued to be run when some job finishes.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, Runnable job) {
        run(threadGroup, job, null, 0);
    }

    /**
     * Run the given job.  The job is either run immediately or,
     * if the maximum number of concurrent jobs is already running,
     * queued to be run when some job finishes.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, Runnable job,
            String threadName) {
        run(threadGroup, job, threadName, 0);
    }

    /**
     * Run the given job.  The job is either run immediately or,
     * if the maximum number of concurrent jobs is already running,
     * queued to be run when some job finishes.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @param stackSize system dependent stack size suggestion for thread creation (0 for default stack size).
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, final Runnable job,
            String threadName, long stackSize) {
        // Reject a null job in the calling thread, as documented, rather
        // than letting the worker thread fail on it later.
        if (job == null)
            throw new NullPointerException("job");
        throwFirstError();

        Runnable jobWrapper = new Runnable() {
            public void run() {
                try {
                    job.run();
                } catch (RuntimeException runtimeException) {
                    // Put exceptions in the exception queue
                    synchronized (runningThreads) {
                        exceptionList.add(runtimeException);
                    }
                } catch (Error error) {
                    // Put errors in the error queue
                    synchronized (runningThreads) {
                        errorList.add(error);
                    }
                } finally {
                    synchronized (runningThreads) {
                        // when done remove ourselves from the list
                        // of running threads.
                        runningThreads.remove(Thread.currentThread());
                        // Notify any thread waiting in join().
                        runningThreads.notifyAll();
                    }
                    // If there are jobs queued up to be run, now would
                    // be a good time to run them.
                    startAJobIfNeeded();
                }
            }
        };

        // use the given thread name, or auto generate one if it is null
        threadName = getNextThreadName(threadName);

        // Always queue the job; whether it can start right away is
        // decided by startAJobIfNeeded() below
        synchronized (runningThreads) {
            toRunQueue.add(new Thread(threadGroup, jobWrapper,
                    threadName, stackSize));
        }

        // Now that the job is in the queue of jobs to run,
        // check the queue and see if the job should be started
        startAJobIfNeeded();
    }

    /**
     * A number to assign to the next auto generated thread name
     */
    private static int threadNameCount = 0;

    /**
     * Ensure the given thread name is not null.  If it is not null, return it;
     * if it is null, then generate a name.
     * <p>
     * Synchronized because the counter is static and may be shared
     * by several Parallelizer instances on different threads.
     *
     * @param threadName existing thread name to check
     * @return the given thread name or a generated thread name if the specified name was null.
     */
    private static synchronized String getNextThreadName(String threadName) {
        if (threadName != null)
            return threadName;
        return "Parallelizer-" + (threadNameCount++);
    }

    /**
     * A queue of exceptions that running threads have thrown.
     */
    private LinkedList<RuntimeException> exceptionList = new LinkedList<RuntimeException>();

    /**
     * Remove the first exception from the exception list and throw it.
     *
     * @throws RuntimeException if a running thread has thrown an exception not yet thrown by this method.
     */
    private void throwFirstException() {
        synchronized (runningThreads) {
            if (exceptionList.size() > 0) {
                throw exceptionList.removeFirst();
            }
        }
    }

    /**
     * A queue of errors that running threads have thrown.
     */
    private LinkedList<Error> errorList = new LinkedList<Error>();

    /**
     * Remove the first error from the error list and throw it.
     *
     * @throws Error if a running thread has thrown an error not yet thrown by this method.
     */
    private void throwFirstError() throws Error {
        synchronized (runningThreads) {
            if (errorList.size() > 0) {
                throw errorList.removeFirst();
            }
        }
    }

    /**
     * Remove a thread from the toRunQueue, start it, and put it
     * in the set of running threads.
     * But do all this only if there are jobs queued up to be run
     * and we are not already running the max number of concurrent
     * jobs at once.
     */
    private void startAJobIfNeeded() {
        synchronized (runningThreads) {
            // If we are already running the max number of jobs, just return
            if (concurrentThreadLimit != INFINITE_THREAD_LIMIT) {
                if (runningThreads.size() >= concurrentThreadLimit)
                    return;
            }

            // If there are no more jobs to run, return
            if (toRunQueue.size() == 0)
                return;

            // Get a job out of the queue
            Thread thread = toRunQueue.removeFirst();

            // Put the thread in the set of running threads
            runningThreads.add(thread);
            thread.start();
        }
    }

    /**
     * Return true iff all jobs that have been requested to run
     * in this Parallelizer have completed.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @return Whether all jobs are done or not.
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public boolean done() {
        throwFirstError();
        synchronized (runningThreads) {
            return (toRunQueue.size() + runningThreads.size()) == 0;
        }
    }

    /**
     * All currently running threads will be interrupted.
     * The interrupted threads may die, causing
     * jobs that were queued but not yet started to start.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void interrupt() {
        throwFirstError();
        synchronized (runningThreads) {
            for (Thread thread : runningThreads) {
                thread.interrupt();
                throwFirstError();
            }
        }
    }

    /**
     * Dump the stack of each running thread.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void dumpStack() {
        throwFirstError();
        synchronized (runningThreads) {
            for (Thread thread : runningThreads) {
                for (StackTraceElement stackTraceElement : thread
                        .getStackTrace()) {
                    System.out.println(stackTraceElement.toString());
                }
                throwFirstError();
            }
        }
    }

    /**
     * Gets a list of all running threads.  There may be jobs that
     * are queued and have not yet been started.  These jobs are not
     * returned.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     * @return an array of all currently running threads.
     *
     * @since ostermillerutils 1.05.00
     */
    public Thread[] getRunningThreads() {
        throwFirstError();
        synchronized (runningThreads) {
            return runningThreads.toArray(new Thread[0]);
        }
    }

    /**
     * Block until all the jobs in this Parallelizer have run
     * and then return.
     * <p>
     * If this method throws an exception or an error, that
     * exception or error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the exception or error.
     *
     * @throws InterruptedException if interrupted while waiting.
     * @throws RuntimeException if any running thread throws or has thrown a runtime exception.
     * @throws Error if any of the running threads throws or has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void join() throws InterruptedException {
        // Check for completion and wait while holding the same lock, so a
        // job that finishes between the check and the wait cannot cause a
        // missed notification.  Pending exceptions are thrown even when
        // every job has already finished, as documented above.
        synchronized (runningThreads) {
            while (true) {
                throwFirstError();
                throwFirstException();
                if (toRunQueue.isEmpty() && runningThreads.isEmpty())
                    return;
                runningThreads.wait();
            }
        }
    }
}
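
For comparison, the run-then-join pattern described in the class documentation can also be sketched with the JDK's own java.util.concurrent classes. This is a minimal sketch and not part of ostermillerutils. The class name BoundedRunnerSketch and the method runAll are hypothetical. A fixed-size thread pool stands in for the concurrent thread limit, and awaitTermination stands in for join(). Unlike Parallelizer, this sketch does not rethrow exceptions or errors raised by the jobs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of ostermillerutils.
final class BoundedRunnerSketch {

    /**
     * Runs jobCount trivial jobs on at most threadLimit threads and
     * returns the number of jobs that completed.
     */
    static int runAll(int jobCount, int threadLimit) {
        final AtomicInteger completed = new AtomicInteger();
        // The pool size plays the role of the concurrent thread limit.
        ExecutorService pool = Executors.newFixedThreadPool(threadLimit);
        for (int i = 0; i < jobCount; i++) {
            final int j = i;
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("Hello World " + j);
                    completed.incrementAndGet();
                }
            });
        }
        // Accept no new jobs; jobs already queued still run.
        pool.shutdown();
        try {
            // Wait for all jobs to finish, similar in spirit to join().
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("jobs did not finish in time");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failure unchecked.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("done: " + runAll(10, 3));
    }
}
```

Calling BoundedRunnerSketch.runAll(10, 3) runs ten jobs with at most three running at once and returns only after all of them have finished.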