001: /*
002: * Runs multiple jobs in parallel
003: * Copyright (C) 2004-2005 Matt Conway
004: * http://simplygenius.com/
005: * Copyright (C) 2005 Stephen Ostermiller
006: * http://ostermiller.org/contact.pl?regarding=Java+Utilities
007: *
008: * This program is free software; you can redistribute it and/or modify
009: * it under the terms of the GNU General Public License as published by
010: * the Free Software Foundation; either version 2 of the License, or
011: * (at your option) any later version.
012: *
013: * This program is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016: * GNU General Public License for more details.
017: *
018: * See COPYING.TXT for details.
019: */
020:
021: package com.Ostermiller.util;
022:
023: import java.util.*;
024:
/**
 * Runs multiple jobs in parallel, n threads at a time, and waits
 * until all threads are complete before continuing.
 * <p>
 * Typically, Parallelizer would be used to run each of the items
 * in a for loop at the same time. For example the following for
 * loop:
 * <pre>
 * for (int i=0; i&lt;10; i++){
 *     System.out.println("Hello World " + i);
 * }
 * System.out.println("done");
 * </pre>
 * would be changed to this:
 * <pre>
 * Parallelizer parallelizer = new Parallelizer();
 * for (int i=0; i&lt;10; i++){
 *     final int j = i;
 *     parallelizer.run(
 *         new Runnable(){
 *             public void run(){
 *                 System.out.println("Hello World " + j);
 *             }
 *         }
 *     );
 * }
 * parallelizer.join();
 * System.out.println("done");
 * </pre>
 * <p>
 * All bookkeeping (the set of running threads, the queue of jobs not yet
 * started, and the lists of collected exceptions and errors) is guarded by
 * a single monitor, the running-thread set itself.
 * <p>
 * More information about this class is available from <a target="_top" href=
 * "http://ostermiller.org/utils/Parallelizer.html">ostermiller.org</a>.
 *
 * @author Matt Conway - http://simplygenius.com/
 * @author Stephen Ostermiller - http://ostermiller.org/contact.pl?regarding=Java+Utilities
 * @since ostermillerutils 1.05.00
 */
public class Parallelizer {
    /**
     * Constant that may be passed as the concurrentThreadLimit argument
     * of the constructor indicating that no limit should be placed
     * on the number of threads that are allowed to run concurrently.
     *
     * @since ostermillerutils 1.05.00
     */
    public static final int INFINITE_THREAD_LIMIT = 0;

    /**
     * A number to assign to the next auto generated thread name.
     * Shared by all Parallelizer instances; only touched from the
     * synchronized {@link #getNextThreadName(String)}.
     */
    private static int threadNameCount = 0;

    /**
     * The number of threads that are allowed to be run concurrently.
     * (INFINITE_THREAD_LIMIT for no limit)
     */
    private final int concurrentThreadLimit;

    /**
     * A Set of threads that are currently running.
     * This set is also used as a lock to synchronize
     * anything that touches running threads, the queue of
     * pending jobs, or the exception and error lists.
     * It is final so that the lock object can never change.
     */
    private final Set<Thread> runningThreads = new HashSet<Thread>();

    /**
     * A queue of jobs (already wrapped in not-yet-started threads)
     * that have not yet been started.
     */
    private final LinkedList<Thread> toRunQueue = new LinkedList<Thread>();

    /**
     * A queue of runtime exceptions that jobs have thrown and that
     * have not yet been re-thrown to a caller.
     */
    private final LinkedList<RuntimeException> exceptionList = new LinkedList<RuntimeException>();

    /**
     * A queue of errors that jobs have thrown and that
     * have not yet been re-thrown to a caller.
     */
    private final LinkedList<Error> errorList = new LinkedList<Error>();

    /**
     * Create a new Parallelizer with no limit on the number
     * of threads that will be allowed to be run concurrently.
     *
     * @since ostermillerutils 1.05.00
     */
    public Parallelizer() {
        this(INFINITE_THREAD_LIMIT);
    }

    /**
     * Create a new Parallelizer with the specified limit on the number
     * of threads that will be allowed to be run concurrently.
     * <p>
     * When the concurrent thread limit is reached and the parallelizer
     * gets a new thread to run, the new thread will be queued until
     * a thread finishes.
     *
     * @param concurrentThreadLimit number of threads that will be allowed
     *    to run simultaneously or INFINITE_THREAD_LIMIT for no limit.
     * @throws IllegalArgumentException if concurrentThreadLimit is negative
     *    (that is, neither a whole number nor INFINITE_THREAD_LIMIT).
     *
     * @since ostermillerutils 1.05.00
     */
    public Parallelizer(int concurrentThreadLimit) {
        if (concurrentThreadLimit < INFINITE_THREAD_LIMIT) {
            throw new IllegalArgumentException(
                "Bad concurrent thread limit: "
                + concurrentThreadLimit);
        }
        this.concurrentThreadLimit = concurrentThreadLimit;
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param job job which is to be run in parallel with other jobs.
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(Runnable job) {
        run(null, job, null, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(Runnable job, String threadName) {
        run(null, job, threadName, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, Runnable job) {
        run(threadGroup, job, null, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, Runnable job,
            String threadName) {
        run(threadGroup, job, threadName, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * Runtime exceptions and errors thrown by the job are collected and
     * re-thrown later by the methods of this class; they do not propagate
     * out of the worker thread.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @param stackSize system dependent stack size suggestion for thread creation (0 for default stack size).
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, final Runnable job,
            String threadName, long stackSize) {
        // Reject a null job up front, as documented, instead of letting the
        // worker thread trip over it and report it as a job failure.
        if (job == null) {
            throw new NullPointerException("job");
        }
        throwFirstError();

        Runnable jobWrapper = new Runnable() {
            public void run() {
                try {
                    job.run();
                } catch (RuntimeException runtimeException) {
                    // Put exceptions in the exception queue
                    synchronized (runningThreads) {
                        exceptionList.add(runtimeException);
                    }
                } catch (Error error) {
                    // Put errors in the error queue
                    synchronized (runningThreads) {
                        errorList.add(error);
                    }
                } finally {
                    synchronized (runningThreads) {
                        // when done remove ourselves from the list
                        // of running threads.
                        runningThreads.remove(Thread.currentThread());
                        // Notify anybody blocked in join().
                        runningThreads.notifyAll();
                    }
                    // If there are jobs queued up to be run, now would
                    // be a good time to run them.
                    startAJobIfNeeded();
                }
            }
        };

        // ensure the thread name is not null, and auto generate a name if it is
        String name = getNextThreadName(threadName);

        // Always queue the job; whether it starts right away is decided below.
        synchronized (runningThreads) {
            toRunQueue.add(new Thread(threadGroup, jobWrapper, name, stackSize));
        }

        // Now that the job is in the queue of jobs to run,
        // check the queue and see if the job should be started
        startAJobIfNeeded();
    }

    /**
     * Ensure the given thread name is not null. If not null, return it,
     * if it is null, then generate a name.
     * <p>
     * Synchronized (on the class) because the counter is static and this
     * method can be reached from several threads at once.
     *
     * @param threadName existing thread name to check
     * @return the given thread name or a generated thread name if the specified name was null.
     */
    private static synchronized String getNextThreadName(String threadName) {
        if (threadName != null) {
            return threadName;
        }
        return "Parallelizer-" + (threadNameCount++);
    }

    /**
     * Remove the first exception from the exception list and throw it.
     *
     * @throws RuntimeException if a running thread has thrown an exception not yet thrown by this method.
     */
    private void throwFirstException() {
        synchronized (runningThreads) {
            if (!exceptionList.isEmpty()) {
                throw exceptionList.removeFirst();
            }
        }
    }

    /**
     * Remove the first error from the error list and throw it.
     *
     * @throws Error if a running thread has thrown an error not yet thrown by this method.
     */
    private void throwFirstError() throws Error {
        synchronized (runningThreads) {
            if (!errorList.isEmpty()) {
                throw errorList.removeFirst();
            }
        }
    }

    /**
     * Remove a job from the toRunQueue, start its thread,
     * and put the thread in the set of running threads.
     * But do all this only if there are jobs queued up to be run
     * and we are not already running the max number of concurrent
     * jobs at once. At most one job is started per call.
     */
    private void startAJobIfNeeded() {
        synchronized (runningThreads) {
            // If we are already running the max number of jobs, just return
            if (concurrentThreadLimit != INFINITE_THREAD_LIMIT
                    && runningThreads.size() >= concurrentThreadLimit) {
                return;
            }

            // If there are no more jobs to run, return
            if (toRunQueue.isEmpty()) {
                return;
            }

            // Get a job out of the queue
            Thread thread = toRunQueue.removeFirst();

            // Register the thread as running before starting it. The new
            // thread cannot deregister itself until we release the lock.
            runningThreads.add(thread);
            boolean started = false;
            try {
                thread.start();
                started = true;
            } finally {
                if (!started) {
                    // start() threw: the thread will never run and so will
                    // never remove itself. Undo the registration so that the
                    // slot is freed and done()/join() can still complete.
                    runningThreads.remove(thread);
                    runningThreads.notifyAll();
                }
            }
        }
    }

    /**
     * Return true iff all jobs that have been requested to run
     * in this Parallelizer have completed.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @return Whether all jobs are done or not.
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public boolean done() {
        throwFirstError();
        synchronized (runningThreads) {
            return (toRunQueue.size() + runningThreads.size()) == 0;
        }
    }

    /**
     * All currently running threads will be interrupted.
     * The interrupted threads may die, causing
     * jobs that were queued but not yet started, to start.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void interrupt() {
        throwFirstError();
        synchronized (runningThreads) {
            for (Thread thread : runningThreads) {
                thread.interrupt();
                throwFirstError();
            }
        }
    }

    /**
     * Dump the stack of each running thread to standard output.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void dumpStack() {
        throwFirstError();
        synchronized (runningThreads) {
            for (Thread thread : runningThreads) {
                for (StackTraceElement stackTraceElement : thread.getStackTrace()) {
                    System.out.println(stackTraceElement.toString());
                }
                throwFirstError();
            }
        }
    }

    /**
     * Gets a list of all running threads. There may be jobs that
     * are queued and have not yet been started. These jobs are not
     * returned.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     * @return an array of all currently running threads.
     *
     * @since ostermillerutils 1.05.00
     */
    public Thread[] getRunningThreads() {
        throwFirstError();
        synchronized (runningThreads) {
            return runningThreads.toArray(new Thread[0]);
        }
    }

    /**
     * Block until all the jobs in this Parallelizer have run
     * and then return.
     * <p>
     * Any error or runtime exception collected from a job is re-thrown
     * from here, including one thrown by a job that had already finished
     * before this method was called.
     * <p>
     * If this method throws an exception or an error, that
     * exception or error may be handled and this method
     * may be called again as it will not re-throw the same
     * instance of the exception or error.
     *
     * @throws InterruptedException if interrupted while waiting.
     * @throws RuntimeException any running thread throws or has thrown a runtime exception.
     * @throws Error if any of the running threads throws or has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void join() throws InterruptedException {
        // The completion check and the wait() happen under one continuous
        // hold of the monitor. Workers notify while holding the same monitor,
        // so a job finishing between the check and the wait cannot be missed.
        synchronized (runningThreads) {
            while (true) {
                // Errors take precedence over runtime exceptions.
                throwFirstError();
                throwFirstException();
                if (toRunQueue.isEmpty() && runningThreads.isEmpty()) {
                    return;
                }
                runningThreads.wait();
            }
        }
    }
}
|