001: // Copyright 03/14/00 Sun Microsystems, Inc. All Rights Reserved.
002:
003: //#pragma ident "@(#)ThreadPool.java 1.3 00/03/14 Sun Microsystems"
004:
005: package com.sun.portal.util;
006:
007: // TODO:
008: // Stopping individual tasks
009: // ThreadGroups
010: // Exception propagation or callbacks
011:
012: import java.util.logging.Level;
013: import java.util.logging.Logger;
014:
/**
 * <p>
 * ThreadPool is a generic, fixed-size thread pool that recycles its worker
 * threads instead of creating a new thread every time a task has to run on a
 * different thread. All workers are created and started by the constructor;
 * each one then waits until a task is handed to it, runs that task, and goes
 * back to waiting.
 * </p>
 * <p>
 * Reusing already-started threads is particularly useful for many short-lived
 * tasks; it may not be useful for long-lived tasks.
 * </p>
 * <p>
 * Because the pool never grows, a call to {@link #run(Runnable)} blocks while
 * every worker is busy. This puts an upper bound on the number of threads the
 * pool can use.
 * </p>
 * <p>
 * Locking: the idle list, its <code>tail</code> index and the
 * <code>destroyed</code> flag are only touched while holding the monitor of
 * <code>idleThreadList</code>. Each worker's pending task is handed over under
 * that worker's own monitor.
 * </p>
 * <p>
 * In future, this class may be enhanced to support growing the size of the
 * pool at runtime to facilitate dynamic tuning.
 * </p>
 */
public final class ThreadPool {
    /**
     * Upper bound, in milliseconds, on the time {@link #destroy()} waits for
     * the idle workers it has just stopped to exit.
     */
    private static final long IDLE_SHUTDOWN_GRACE_MILLIS = 500L;

    /**
     * Numeric suffix for the next worker name. Shared by all pools, so it is
     * only accessed through the class-synchronized nextThreadNumber().
     */
    private static int nextThreadID = 1;

    /** Destination for log messages of this pool; may be null (no logging). */
    private final Logger logger;

    private final String poolName;

    /** Every worker created by this pool; entries are nulled by destroy(). */
    private final IPSThread[] allThreadList;

    /** Stack of the workers that are currently waiting for a task. */
    private final IPSThread[] idleThreadList;

    /**
     * tail points to the last available idle thread in the idleThreadList.
     * When the idleThreadList is empty, tail is set to -1. IMPORTANT: tail
     * MUST always be accessed by acquiring a lock on idleThreadList.
     */
    private int tail = -1;

    /**
     * Set once by destroy(). Guarded by the lock on idleThreadList so that no
     * worker can register as idle, and no caller can start waiting for a
     * worker, after the pool has been destroyed.
     */
    private boolean destroyed = false;

    /**
     * <p>
     * Constructs a thread pool with the given name and number of threads. The
     * pool is fixed in size at the time of construction. All the threads are
     * started, and then each goes into a wait state until a task is assigned
     * to it.
     * </p>
     *
     * <p>
     * The size of the pool should be carefully chosen. If the size is too
     * large, there may be excessive context switching and the performance may
     * suffer. If the tasks are fairly distributed between CPU and I/O usage,
     * start with a pool size of 10 per CPU; later experiment and fine-tune for
     * optimal pool size. Ensure that the pool size is configurable at
     * install/runtime.
     * </p>
     *
     * @param poolName
     *            name of the thread pool
     * @param numThreads
     *            number of threads in the thread pool; values below 1 are
     *            treated as 1.
     * @param daemon
     *            if true, all threads created will be daemon threads. If
     *            false, all threads created will be non-daemon threads.
     * @param logger
     *            logger object to send this pool's log messages to; may be
     *            null to disable logging.
     *
     * @throws IllegalArgumentException
     *             if poolName is null
     */
    public ThreadPool(String poolName, int numThreads, boolean daemon,
            Logger logger) throws IllegalArgumentException {
        if (poolName == null) {
            throw new IllegalArgumentException(
                    "Must assign a non-null pool name to ThreadPool");
        }
        this.poolName = poolName;
        this.logger = logger;

        // ensure that there is at least one thread in the pool
        int size = Math.max(1, numThreads);

        // Arrays are used instead of Lists because this is low-level,
        // performance-critical code.
        idleThreadList = new IPSThread[size];
        allThreadList = new IPSThread[size];

        // Only allThreadList is filled here. idleThreadList is filled by the
        // workers themselves once they are running and ready for a task.
        for (int i = 0; i < size; ++i) {
            allThreadList[i] = new IPSThread(getNextIPSThreadID(), daemon);
        }

        // Start the workers only after every field of the pool is assigned.
        for (int i = 0; i < size; ++i) {
            allThreadList[i].start();
        }
    }

    /** Returns the next value of the worker-name counter shared by all pools. */
    private static synchronized int nextThreadNumber() {
        return nextThreadID++;
    }

    /** Gets the unique name for the next internal thread of this pool. */
    private String getNextIPSThreadID() {
        return poolName + ".Thread#" + Integer.toString(nextThreadNumber());
    }

    /**
     * Runs the user-defined task on an idle pool thread, blocking the caller
     * until one is available. To enable better debugging or profiling
     * capabilities, the <code>task</code> <code>Runnable</code> should
     * implement <code>toString()</code> to intuitively identify the task.
     * A task handed over while {@link #destroy()} is in progress may be
     * dropped without being run.
     *
     * @param task
     *            the user-defined Runnable to be scheduled for execution on
     *            this thread pool
     * @throws InterruptedException
     *             when the thread invoking <code>run</code> is interrupted
     *             while waiting for an idle thread.
     * @throws IllegalArgumentException
     *             if task is null
     * @throws IllegalStateException
     *             if the pool has been destroyed
     */
    public final void run(Runnable task) throws InterruptedException {
        if (task == null) {
            // A null task would take a worker off the idle list without ever
            // giving it anything to run, so the worker would never come back.
            throw new IllegalArgumentException("task must not be null");
        }

        IPSThread ipsThread;
        synchronized (idleThreadList) {
            while (!destroyed && tail == -1) {
                if (logger != null) {
                    logger.warning(Thread.currentThread().getName()
                            + " waiting for an idle thread in "
                            + toString());
                }
                idleThreadList.wait();
            }
            if (destroyed) {
                // No worker will ever become idle again; fail instead of
                // waiting forever.
                throw new IllegalStateException(poolName
                        + " has been destroyed");
            }

            // Now there is at least one idle thread available
            ipsThread = idleThreadList[tail];
            idleThreadList[tail] = null;
            tail--;

            // The worker is off the idle list, so no other caller can pick
            // it. The lock is released before the hand-over so that other
            // tasks can be scheduled concurrently.
        }

        ipsThread.process(task);
    }

    /**
     * Stops all the idle threads in the pool. Note that these stopped threads
     * are no longer available for future tasks. Also note that none of the
     * active threads in the pool are stopped.
     */
    public final void stopIdleThreads() {
        drainIdleThreads();
    }

    /**
     * Removes every worker from the idle list and asks it to stop.
     *
     * @return the workers that were removed and stopped (never null)
     */
    private IPSThread[] drainIdleThreads() {
        synchronized (idleThreadList) {
            IPSThread[] drained = new IPSThread[tail + 1];
            while (tail >= 0) {
                IPSThread idleThread = idleThreadList[tail];
                idleThreadList[tail] = null;
                drained[tail] = idleThread;
                tail--;
                idleThread.stop();
            }
            return drained;
        }
    }

    /**
     * Destroys the thread pool. This stops all the threads, active and idle,
     * in the pool. Idle threads are stopped first and given up to
     * {@value #IDLE_SHUTDOWN_GRACE_MILLIS} ms in total to exit; active threads
     * are then flagged and interrupted, and this method may return before
     * they have died. Callers blocked in {@link #run(Runnable)} are woken and
     * receive an IllegalStateException. Calling this method more than once is
     * harmless.
     */
    public final void destroy() {
        IPSThread[] stoppedIdle;
        synchronized (idleThreadList) {
            destroyed = true;
            // stop the idle threads first to be more productive
            stoppedIdle = drainIdleThreads();
            // release callers of run() that are waiting for an idle thread
            idleThreadList.notifyAll();
        }

        // give the idle threads a chance to die
        boolean interrupted = !awaitExit(stoppedIdle);

        // now go through allThreadList and stop everything that is left
        synchronized (allThreadList) {
            for (int i = 0; i < allThreadList.length; i++) {
                IPSThread ipsThread = allThreadList[i];
                allThreadList[i] = null;
                // entries are already null when destroy() is called again
                if (ipsThread != null && ipsThread.isAlive()) {
                    ipsThread.stop();
                }
            }
        }

        if (interrupted) {
            // The caller is not necessarily shutting down itself; hand the
            // interrupt back instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits for the given workers to exit, for at most
     * IDLE_SHUTDOWN_GRACE_MILLIS in total.
     *
     * @return false if the calling thread was interrupted while waiting,
     *         true otherwise
     */
    private boolean awaitExit(IPSThread[] workers) {
        long deadline = System.currentTimeMillis()
                + IDLE_SHUTDOWN_GRACE_MILLIS;
        try {
            for (int i = 0; i < workers.length; i++) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    // join(0) would wait forever, so stop here
                    break;
                }
                workers[i].join(remaining);
            }
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }

    /**
     * Returns the string representation of this thread pool that includes the
     * name, size and the number of currently idle threads
     */
    public String toString() {
        synchronized (idleThreadList) {
            // tail is -1 when the idle list is empty, so tail + 1 is the count
            return poolName + "[" + allThreadList.length
                    + " Total threads, " + (tail + 1) + " Idle threads]";
        }
    }

    /** Returns the name of this thread pool */
    public final String getName() {
        return poolName;
    }

    /** Logs msg at INFO level under key PSSR_CSPU097, if a logger is set. */
    private void logMessage(String msg) {
        if (logger == null) {
            return;
        }
        Object[] params = { msg };
        logger.log(Level.INFO, "PSSR_CSPU097", params);
    }

    /** An inner class that actually executes the user-defined tasks. */
    private final class IPSThread {
        /**
         * The task handed over by process(). This internal thread handles
         * only one task at a time; null means no task is pending.
         */
        private volatile Runnable task;

        private final Thread thread;

        /** Set by stop() before the thread is interrupted. */
        private volatile boolean stopped = false;

        /** Creates the worker thread without starting it; see start(). */
        IPSThread(String name, boolean daemon) {
            Runnable r = new Runnable() {
                public void run() {
                    try {
                        runTask();
                    } catch (Throwable t) {
                        // In case any exception slips through
                        if (logger != null) {
                            Object[] params = {
                                    IPSThread.this.getName(), t };
                            logger.log(Level.SEVERE, "PSSR_CSPU098",
                                    params);
                        }
                    }
                }
            };
            thread = new Thread(r, name);
            thread.setDaemon(daemon);
        }

        /** Starts the underlying thread. */
        void start() {
            thread.start();
        }

        /**
         * Accepts the user-defined task for execution. Note that by design
         * this method is called only after this worker has been taken off
         * the idle list.
         */
        final synchronized void process(Runnable task)
                throws InterruptedException {
            this.task = task;

            // Only the worker itself ever waits on this object, so notify()
            // is sufficient.
            notify();
        }

        /**
         * Adds this worker to the pool's idle list and wakes callers of
         * ThreadPool.run() if the list was empty.
         *
         * @return false, without registering, if the pool was destroyed or
         *         this worker was asked to stop
         */
        private boolean registerAsIdle() {
            synchronized (idleThreadList) {
                if (destroyed || stopped) {
                    return false;
                }
                idleThreadList[++tail] = this;

                // If idleThreadList was empty, notify the waiting threads
                if (tail == 0) {
                    idleThreadList.notifyAll();
                }
                return true;
            }
        }

        /**
         * Waits until a task has been handed over or this worker has been
         * asked to stop.
         *
         * @return the task to run, or null if this worker was stopped
         */
        private synchronized Runnable awaitTask() {
            while (task == null && !stopped) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // stop() sets the stopped flag before interrupting, so
                    // the loop condition ends the wait for a stop request.
                    // Any other interrupt is ignored on purpose: leaving
                    // here would register this worker on the idle list a
                    // second time.
                }
            }
            return stopped ? null : task;
        }

        /**
         * Main loop of the worker. The worker adds itself to the
         * idleThreadList of the parent pool and waits to be assigned a task.
         * When a task is assigned, it executes it. While executing, this
         * worker is not on the idleThreadList.
         */
        private void runTask() {
            while (!stopped) {
                try {
                    if (!registerAsIdle()) {
                        break;
                    }

                    // Now wait until the parent pool assigns a task
                    Runnable assigned = awaitTask();
                    if (assigned == null) {
                        // stopped while idle: do not run anything
                        break;
                    }

                    try {
                        assigned.run();
                    } catch (Exception e) {
                        if (logger != null) {
                            Object[] params = { thread.getName(), assigned,
                                    e };
                            logger.log(Level.SEVERE, "PSSR_CSPU099",
                                    params);
                        }
                    } finally {
                        // Clear the interrupted flag (in case the task left
                        // it set) so that it does not leak into the next
                        // task.
                        Thread.interrupted();
                    }
                } catch (Throwable t) {
                    // Fatal exception occurred. But we don't want to stop
                    // this thread as that might only deplete the thread
                    // pool.
                    if (logger != null) {
                        Object[] params = { thread.getName(), t };
                        logger.log(Level.SEVERE, "PSSR_CSPU100", params);
                    }
                } finally {
                    // set task to null so that the task doesn't get
                    // executed repeatedly.
                    task = null;
                }
            }
            if (logger != null) {
                // Include the thread name so the record says which worker
                // stopped.
                Object[] params = { thread.getName() + " stopped." };
                logger.log(Level.SEVERE, "PSSR_CSPU101", params);
            }
        }

        /**
         * Stops this thread. This method may return before this thread
         * actually dies.
         */
        private void stop() {
            logMessage(thread.getName() + " received stop() request.");
            // The flag must be set before the interrupt so that the worker
            // sees it when the interrupt wakes it up.
            stopped = true;
            thread.interrupt();
        }

        /** Gets the name of this thread */
        final String getName() {
            return thread.getName();
        }

        /** Checks if this thread is alive or dead */
        final boolean isAlive() {
            return thread.isAlive();
        }

        /** Waits at most millis milliseconds for this thread to die. */
        final void join(long millis) throws InterruptedException {
            thread.join(millis);
        }
    }
}
|