/*
 * $RCSfile: ThreadPool.java,v $
 * $Revision: 1.2 $
 * $Date: 2005/09/26 22:08:13 $
 * $State: Exp $
 *
 * Class: ThreadPool
 *
 * Description: A pool of threads
 *
 *
 *
 * COPYRIGHT:
 *
 * This software module was originally developed by Raphaël Grosbois and
 * Diego Santa Cruz (Swiss Federal Institute of Technology-EPFL); Joel
 * Askelöf (Ericsson Radio Systems AB); and Bertrand Berthelot, David
 * Bouchard, Félix Henry, Gerard Mozelle and Patrice Onno (Canon Research
 * Centre France S.A) in the course of development of the JPEG2000
 * standard as specified by ISO/IEC 15444 (JPEG 2000 Standard). This
 * software module is an implementation of a part of the JPEG 2000
 * Standard. Swiss Federal Institute of Technology-EPFL, Ericsson Radio
 * Systems AB and Canon Research Centre France S.A (collectively JJ2000
 * Partners) agree not to assert against ISO/IEC and users of the JPEG
 * 2000 Standard (Users) any of their rights under the copyright, not
 * including other intellectual property rights, for this software module
 * with respect to the usage by ISO/IEC and Users of this software module
 * or modifications thereof for use in hardware or software products
 * claiming conformance to the JPEG 2000 Standard. Those intending to use
 * this software module in hardware or software products are advised that
 * their use may infringe existing patents. The original developers of
 * this software module, JJ2000 Partners and ISO/IEC assume no liability
 * for use of this software module or modifications thereof. No license
 * or right to this software module is granted for non JPEG 2000 Standard
 * conforming products. JJ2000 Partners have full right to use this
 * software module for his/her own purpose, assign or donate this
 * software module to any third party and to inhibit third parties from
 * using this software module for non JPEG 2000 Standard conforming
 * products. This copyright notice must be included in all copies or
 * derivative works of this software module.
 *
 * Copyright (c) 1999/2000 JJ2000 Partners.
 *
 *
 *
 */

package jj2000.j2k.util;

/**
 * This class implements a thread pool. The thread pool contains a set of
 * threads which can be given work to do.
 *
 * <P>If the Java Virtual Machine (JVM) uses native threads, then the
 * different threads will be able to execute on different processors in
 * parallel on multiprocessor machines. However, under some JVMs and
 * operating systems using native threads is not sufficient to allow the JVM
 * access to multiple processors. This is the case when native threads are
 * implemented using POSIX threads on lightweight processes
 * (i.e. PTHREAD_SCOPE_PROCESS scope scheduling), which is the case on most
 * UNIX operating systems. In order to provide access to multiple
 * processors it is necessary to set the concurrency level to the number of
 * processors or slightly higher. This can be achieved by setting the Java
 * system property with the name defined by CONCURRENCY_PROP_NAME to some
 * non-negative number. This will make use of the 'NativeServices' class and
 * supporting native libraries. See 'NativeServices' for details. See
 * 'CONCURRENCY_PROP_NAME' for the name of the property.
 *
 * <P>Initially the thread pool contains a user specified number of idle
 * threads. Idle threads can be given a target which is run. While running the
 * target the thread temporarily leaves the idle list. When the target
 * finishes, the thread joins the idle list again, waiting for a new target.
 * When a target is finished a thread can be notified on a particular object
 * that is given as a lock.
 *
 * <P>Jobs can be submitted using Runnable interfaces, using the 'runTarget()'
 * methods. When the job is submitted, an idle thread will be obtained, the
 * 'run()' method of the 'Runnable' interface will be executed and when it
 * completes the thread will be returned to the idle list. In general the
 * 'run()' method should complete in a rather short time, so that the threads
 * of the pool are not starved.
 *
 * <P>If using the non-asynchronous (blocking) calls to 'runTarget()', it is
 * important that any target's 'run()' method, or any method called from it,
 * does not use non-asynchronous calls to 'runTarget()' on the same thread
 * pool where it was started. Otherwise this could create a deadlock when
 * there are not enough idle threads.
 *
 * <P>The pool also has a global error and runtime exception condition (one
 * for 'Error' and one for 'RuntimeException'). If a target's 'run()' method
 * throws an 'Error' or 'RuntimeException' the corresponding exception
 * condition is set and the exception object saved. In any subsequent call to
 * 'checkTargetErrors()' the saved exception object is thrown. Likewise, if a
 * target's 'run()' method throws any other subclass of 'Throwable' a new
 * 'RuntimeException' is created and saved. It will be thrown on a subsequent
 * call to 'checkTargetErrors()'. If more than one exception occurs between
 * calls to 'checkTargetErrors()' only the last one is saved. Any 'Error'
 * condition takes precedence over all 'RuntimeException' conditions. The
 * threads in the pool are unaffected by any exceptions thrown by targets.
 *
 * <P>The only exception to the above is the 'ThreadDeath' exception. If a
 * target's 'run()' method throws the 'ThreadDeath' exception a warning
 * message is printed and the exception is propagated, which will terminate
 * the thread in which it occurs. This could lead to instabilities of the
 * pool. The 'ThreadDeath' exception should never be thrown by the program. It
 * is thrown by the Java(TM) Virtual Machine when Thread.stop() is
 * called. This method is deprecated and should never be called.
 *
 * <P>All the threads in the pool are "daemon" threads and will automatically
 * terminate when no non-daemon threads are running.
 *
 * @see NativeServices
 *
 * @see #CONCURRENCY_PROP_NAME
 *
 * @see Runnable
 *
 * @see Thread
 *
 * @see Error
 *
 * @see RuntimeException
 *
 * */
public class ThreadPool {

    /** The name of the property that sets the concurrency level:
     * jj2000.j2k.util.ThreadPool.concurrency */
    public final static String CONCURRENCY_PROP_NAME =
        "jj2000.j2k.util.ThreadPool.concurrency";

    /** The array of idle threads and the lock for the manipulation of the
     * idle thread list. */
    private ThreadPoolThread idle[];

    /** The number of idle threads */
    private int nidle;

    /** The name of the pool */
    private String poolName;

    /** The priority for the pool */
    private int poolPriority;

    /** The last error thrown by a target. Null if none */
    // NOTE: needs to be volatile, so that a value written by a pool thread
    // is visible to the other threads
    private volatile Error targetE;

    /** The last runtime exception thrown by a target. Null if none */
    // NOTE: needs to be volatile, so that a value written by a pool thread
    // is visible to the other threads
    private volatile RuntimeException targetRE;

    /**
     * The threads that are managed by the pool.
     * */
    class ThreadPoolThread extends Thread {
        private Runnable target;
        private Object lock;
        private boolean doNotifyAll;

        /**
         * Creates a ThreadPoolThread object with the given name, marks it
         * as a daemon thread and sets its priority to the one of the pool.
         *
         * @param idx The index of this thread in the pool. Currently
         * unused.
         *
         * @param name The name of the thread
         * */
        public ThreadPoolThread(int idx, String name) {
            super(name);
            setDaemon(true);
            setPriority(poolPriority);
        }

        /**
         * The method that is run by the thread. This method first joins the
         * idle state in the pool and then enters an infinite loop. In this
         * loop it waits until a target to run exists and runs it. Once the
         * target's run() method is done it re-joins the idle state and
         * notifies the waiting lock object, if one exists.
         *
         * <P>An interrupt on this thread has no effect other than forcing a
         * check on the target. Normally the target is checked every time the
         * thread is woken up by notify, so no interrupts should be needed.
         *
         * <P>Any exception thrown by the target's 'run()' method is caught
         * and this thread is not affected, except for 'ThreadDeath'. If a
         * 'ThreadDeath' exception is caught a warning message is printed by
         * the 'FacilityManager' and the exception is propagated up. For
         * exceptions which are subclasses of 'Error' or 'RuntimeException'
         * the corresponding error condition is set and this thread is not
         * affected. For any other exceptions a new 'RuntimeException' is
         * created and the error condition is set; this thread is not
         * affected.
         * */
        public void run() {
            // Join the idle threads list
            putInIdleList(this);
            // Permanently lock the object while running so that target can
            // not be changed until we are waiting again. While waiting for a
            // target the lock is released.
            synchronized (this) {
                while (true) {
                    // Wait until we get a target
                    while (target == null) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            // Ignored: an interrupt only forces a new check
                            // of the target
                        }
                    }
                    // Run the target and catch all possible errors
                    try {
                        target.run();
                    } catch (ThreadDeath td) {
                        // We have been instructed to abruptly terminate
                        // the thread, which should never be done. This can
                        // cause another thread, or the system, to lock.
                        FacilityManager
                            .getMsgLogger()
                            .printmsg(
                                MsgLogger.WARNING,
                                "Thread.stop() called on a ThreadPool "
                                + "thread or ThreadDeath thrown. This is "
                                + "deprecated. Lock-up might occur.");
                        throw td;
                    } catch (Error e) {
                        targetE = e;
                    } catch (RuntimeException re) {
                        targetRE = re;
                    } catch (Throwable ue) {
                        // A totally unexpected error has occurred
                        // (Thread.stop(Throwable) has been used, which
                        // should never be).
                        targetRE = new RuntimeException(
                            "Unchecked exception "
                            + "thrown by target's "
                            + "run() method in pool "
                            + poolName + ".");
                    }
                    // Join idle threads
                    putInIdleList(this);
                    // Release the target and notify lock (i.e. wakeup)
                    target = null;
                    if (lock != null) {
                        synchronized (lock) {
                            if (doNotifyAll) {
                                lock.notifyAll();
                            } else {
                                lock.notify();
                            }
                        }
                    }
                }
            }
        }

        /**
         * Assigns a target to this thread, with an optional notify lock and a
         * notify mode. If another target is currently running the method
         * will block until it terminates. After setting the new target the
         * runner thread will be woken up and execution will start.
         *
         * @param target The runnable object containing the 'run()' method to
         * run.
         *
         * @param lock An object on which notify will be called once the
         * target's run method has finished. A thread to be notified should be
         * waiting on that object. If null no thread is notified.
         *
         * @param notifyAll If true 'notifyAll()', instead of 'notify()', will
         * be called on the lock.
         * */
        synchronized void setTarget(Runnable target, Object lock,
                                    boolean notifyAll) {
            // Set the target
            this.target = target;
            this.lock = lock;
            doNotifyAll = notifyAll;
            // Wakeup the thread
            this.notify();
        }
    }

    /**
     * Creates a new thread pool of the given size, thread priority and pool
     * name.
     *
     * <P>If the Java system property of the name defined by
     * 'CONCURRENCY_PROP_NAME' is set, then an attempt will be made to load
     * the library that supports concurrency setting (see
     * 'NativeServices'). If that succeeds the concurrency level will be set
     * to the specified value. Otherwise a warning is printed.
     *
     * @param size The size of the pool (number of threads to create in the
     * pool).
     *
     * @param priority The priority to give to the threads in the pool. If
     * less than 'Thread.MIN_PRIORITY' it will be the same as the priority of
     * the calling thread.
     *
     * @param name The name of the pool. If null a default generic name is
     * chosen.
     *
     * @see NativeServices
     *
     * @see #CONCURRENCY_PROP_NAME
     * */
    public ThreadPool(int size, int priority, String name) {
        int i;
        ThreadPoolThread t;
        String prop;
        int clevel;

        // Initialize variables checking for special cases
        if (size <= 0) {
            throw new IllegalArgumentException(
                "Pool must be of positive size");
        }
        if (priority < Thread.MIN_PRIORITY) {
            poolPriority = Thread.currentThread().getPriority();
        } else {
            poolPriority = (priority < Thread.MAX_PRIORITY) ? priority
                : Thread.MAX_PRIORITY;
        }
        if (name == null) {
            poolName = "Anonymous ThreadPool";
        } else {
            poolName = name;
        }

        // If requested to set concurrency try to do it
        prop = null;
        try {
            prop = System.getProperty(CONCURRENCY_PROP_NAME);
        } catch (SecurityException se) {
            // Ignore it.
        }
        if (prop == null) {
            // No concurrency to set, do nothing
        } else {
            // Get concurrency level
            try {
                clevel = Integer.parseInt(prop);
                if (clevel < 0)
                    throw new NumberFormatException();
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                    "Invalid concurrency level " + "in property "
                    + CONCURRENCY_PROP_NAME);
            }
            // Attempt to load library
            if (NativeServices.loadLibrary()) {
                // Library load successful
                FacilityManager.getMsgLogger().printmsg(
                    MsgLogger.INFO,
                    "Changing thread concurrency " + "level from "
                    + NativeServices.getThreadConcurrency()
                    + " to " + clevel + ".");
                NativeServices.setThreadConcurrency(clevel);
            } else {
                // Could not load the library => warn
                FacilityManager
                    .getMsgLogger()
                    .printmsg(
                        MsgLogger.WARNING,
                        "Native library to set "
                        + "thread concurrency level as specified by the "
                        + CONCURRENCY_PROP_NAME
                        + " property not found. "
                        + "Thread concurrency unchanged.");
            }
        }

        // Allocate internal variables
        idle = new ThreadPoolThread[size];
        nidle = 0;

        // Create and start the threads
        for (i = 0; i < size; i++) {
            t = new ThreadPoolThread(i, poolName + "-" + i);
            t.start();
        }
    }

    /**
     * Returns the size of the pool. That is the number of threads in this
     * pool (idle + busy).
     *
     * @return The pool's size.
     *
     * */
    public int getSize() {
        return idle.length;
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * the method will block until a thread of the pool becomes idle or the
     * calling thread is interrupted.
     *
     * <P>This method is the same as <tt>runTarget(t,l,false,false)</tt>.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @return True if the target was submitted to some thread. False if the
     * calling thread was interrupted while waiting for an idle thread, in
     * which case the target was not submitted for execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l) {
        return runTarget(t, l, false, false);
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * and the asynchronous mode is not used the method will block until a
     * thread of the pool becomes idle or the calling thread is
     * interrupted. If the asynchronous mode is used then the method will not
     * block and will return false.
     *
     * <P>This method is the same as <tt>runTarget(t,l,async,false)</tt>.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @param async If true the asynchronous mode will be used.
     *
     * @return True if the target was submitted to some thread. False if no
     * idle thread could be found and the target was not submitted for
     * execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l, boolean async) {
        return runTarget(t, l, async, false);
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * and the asynchronous mode is not used the method will block until a
     * thread of the pool becomes idle or the calling thread is
     * interrupted. If the asynchronous mode is used then the method will not
     * block and will return false.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @param async If true the asynchronous mode will be used.
     *
     * @param notifyAll If true, threads waiting on the lock of the 'l' object
     * will be notified through the 'notifyAll()' instead of the normal
     * 'notify()' call. This is not normally needed.
     *
     * @return True if the target was submitted to some thread. False if no
     * idle thread could be found and the target was not submitted for
     * execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l, boolean async,
                             boolean notifyAll) {
        ThreadPoolThread runner; // The thread to run the target

        // Get a thread to run
        runner = getIdle(async);
        // If no runner return failure
        if (runner == null)
            return false;
        // Set the runner
        runner.setTarget(t, l, notifyAll);
        return true;
    }

    /**
     * Checks that no error or runtime exception has occurred in any target
     * so far. If an error or runtime exception has occurred in a target's
     * run method it is thrown by this method. A saved 'Error' is thrown in
     * preference to a saved 'RuntimeException'.
     *
     * @exception Error If an error condition has been thrown by a target
     * 'run()' method.
     *
     * @exception RuntimeException If a runtime exception has been thrown by a
     * target 'run()' method.
     * */
    public void checkTargetErrors() {
        // Check for Error
        if (targetE != null)
            throw targetE;
        // Check for RuntimeException
        if (targetRE != null)
            throw targetRE;
    }

    /**
     * Clears the current target error conditions, if any. Note that a thread
     * in the pool might have set the error conditions since the last check
     * and that those error conditions will be lost. Likewise, before
     * returning from this method another thread might set the error
     * conditions. There is no guarantee that no error conditions exist when
     * returning from this method.
     *
     * <P>In order to ensure that no error conditions exist when returning
     * from this method cooperation from the targets and the thread using this
     * pool is necessary (i.e. currently no targets running or waiting to
     * run).
     * */
    public void clearTargetErrors() {
        // Clear the error and runtime exception conditions
        targetE = null;
        targetRE = null;
    }
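
    /*
    Illustrative sketch only, not part of the original JJ2000 source. It
    restates the error-condition rules of 'checkTargetErrors()' and
    'clearTargetErrors()' in a self-contained form: only the last exception
    of each kind is kept, a saved 'Error' is thrown before a saved
    'RuntimeException', and checking does not clear the conditions. The
    class name 'TargetErrorSketch' and the helpers 'runAndRecord()',
    'check()' and 'demo()' are hypothetical. Targets are run inline on the
    calling thread rather than on pool threads, and the 'ThreadDeath' and
    generic 'Throwable' cases are left out.

```java
public class TargetErrorSketch {
    // Last Error and last RuntimeException recorded so far; null if none.
    private volatile Error targetE;
    private volatile RuntimeException targetRE;

    // Hypothetical helper: runs the target inline and records what it throws,
    // mirroring the Error and RuntimeException catch clauses of the pool thread.
    void runAndRecord(Runnable target) {
        try {
            target.run();
        } catch (Error e) {
            targetE = e;
        } catch (RuntimeException re) {
            targetRE = re;
        }
    }

    // Same order as in the pool: a saved Error is thrown first.
    void checkTargetErrors() {
        if (targetE != null) throw targetE;
        if (targetRE != null) throw targetRE;
    }

    void clearTargetErrors() {
        targetE = null;
        targetRE = null;
    }

    // Returns the message of whatever checkTargetErrors() throws, or "none".
    String check() {
        try {
            checkTargetErrors();
            return "none";
        } catch (Throwable t) {
            return t.getMessage();
        }
    }

    static String demo() {
        TargetErrorSketch s = new TargetErrorSketch();
        s.runAndRecord(() -> { throw new IllegalStateException("first"); });
        s.runAndRecord(() -> { throw new IllegalStateException("second"); });
        String lastWins = s.check();      // only the last exception is kept
        s.runAndRecord(() -> { throw new Error("fatal"); });
        String errorFirst = s.check();    // the Error takes precedence
        s.clearTargetErrors();
        String cleared = s.check();       // both conditions are gone
        return lastWins + "|" + errorFirst + "|" + cleared;
    }

    public static void main(String[] args) {
        System.out.println(demo());       // prints second|fatal|none
    }
}
```

    Because a check leaves the conditions set, a caller that wants to
    continue after handling a failure would also call
    'clearTargetErrors()', ideally while no targets are running.
    */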

    /**
     * Puts the thread 't' in the idle list. The thread 't' should be in fact
     * idle and ready to accept a new target when it joins the idle list.
     *
     * <P> An idle thread that is already in the list should never add itself
     * to the list before it is removed. For efficiency reasons there is no
     * check to see if the thread is already in the list of idle threads.
     *
     * <P> If the idle list was empty 'notify()' will be called on the 'idle'
     * array, to wake up a thread that might be waiting (within the
     * 'getIdle()' method) on an idle thread to become available.
     *
     * @param t The thread to put in the idle list.
     * */
    private void putInIdleList(ThreadPoolThread t) {
        // NOTE: if already in idle => catastrophe! (should be OK since
        // this is a private method)
        // Lock the idle array to avoid races with 'getIdle()'
        synchronized (idle) {
            idle[nidle] = t;
            nidle++;
            // If idle array was empty wakeup any waiting threads.
            if (nidle == 1)
                idle.notify();
        }
    }

    /**
     * Returns an idle thread and removes it from the list of idle
     * threads. In asynchronous mode it will immediately return an idle
     * thread, or null if none is available. In non-asynchronous mode it will
     * block until a thread of the pool becomes idle or the calling thread is
     * interrupted.
     *
     * <P>If in non-asynchronous mode and there are currently no idle threads
     * available the calling thread will wait on the 'idle' array lock, until
     * notified by 'putInIdleList()' that an idle thread might have become
     * available.
     *
     * @param async If true asynchronous mode is used.
     *
     * @return An idle thread of the pool, that has been removed from the idle
     * list, or null if none is available.
     * */
    private ThreadPoolThread getIdle(boolean async) {
        // Lock the idle array to avoid races with 'putInIdleList()'
        synchronized (idle) {
            if (async) {
                // In asynchronous mode just return null if no idle thread
                if (nidle == 0)
                    return null;
            } else {
                // In synchronous mode wait until a thread becomes idle
                while (nidle == 0) {
                    try {
                        idle.wait();
                    } catch (InterruptedException e) {
                        // If we were interrupted just return null
                        return null;
                    }
                }
            }
            // Decrease the idle count and return one of the idle threads
            nidle--;
            return idle[nidle];
        }
    }
}
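
/*
Illustrative sketch only, not part of the original JJ2000 source. The class
documentation says that a thread can be notified on a lock object once a
target finishes. One way a caller might wait for that notification is
sketched here. To stay self-contained it does not use 'ThreadPool' itself:
the static 'runTarget()' is a hypothetical stand-in that runs the target on
a fresh daemon thread and then calls 'notify()' on the lock, as a pool
thread does. 'LockWaitSketch', 'DoublingJob' and 'submitAndWait()' are
assumed names.

```java
public class LockWaitSketch {

    // Hypothetical stand-in for ThreadPool.runTarget(t, l): runs the target
    // on another thread and notifies the lock once run() has returned.
    static boolean runTarget(Runnable target, Object lock) {
        Thread worker = new Thread(() -> {
            target.run();
            if (lock != null) {
                synchronized (lock) {
                    lock.notify();
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
        return true;
    }

    // A target that publishes its result and a completion flag under the lock.
    static final class DoublingJob implements Runnable {
        private final Object lock;
        private final int input;
        private boolean done;  // guarded by lock
        private int result;    // guarded by lock

        DoublingJob(Object lock, int input) {
            this.lock = lock;
            this.input = input;
        }

        public void run() {
            int r = input * 2;
            synchronized (lock) {
                result = r;
                done = true;
            }
        }
    }

    static int submitAndWait(int input) {
        Object lock = new Object();
        DoublingJob job = new DoublingJob(lock, input);
        // Take the lock before submitting so the notification cannot be
        // delivered before this thread is waiting.
        synchronized (lock) {
            runTarget(job, lock);
            // Loop on the flag to tolerate spurious wakeups.
            while (!job.done) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting");
                }
            }
            return job.result;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait(21)); // prints 42
    }
}
```

With the real pool the same pattern would apply with the call replaced by
'pool.runTarget(job, lock)', followed by 'pool.checkTargetErrors()' after
the wait. A target that throws before setting its flag would leave this
caller waiting, so a real target should set the flag in a 'finally' block.
*/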