001: /*
002: * Copyright 1999-2004 The Apache Software Foundation
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.tomcat.util.threads;
018:
019: import java.util.*;
020: import org.apache.commons.logging.Log;
021: import org.apache.commons.logging.LogFactory;
022: import org.apache.tomcat.util.res.StringManager;
023:
024: /**
025: * A thread pool that is trying to copy the apache process management.
026: *
027: * Should we remove this in favor of Doug Lea's thread package?
028: *
029: * @author Gal Shachor
030: * @author Yoav Shapira <yoavs@apache.org>
031: */
032: public class ThreadPool {
033:
034: private static Log log = LogFactory.getLog(ThreadPool.class);
035:
036: private static StringManager sm = StringManager
037: .getManager("org.apache.tomcat.util.threads.res");
038:
039: private static boolean logfull = true;
040:
041: /*
042: * Default values ...
043: */
044: public static final int MAX_THREADS = 200;
045: public static final int MAX_THREADS_MIN = 10;
046: public static final int MAX_SPARE_THREADS = 50;
047: public static final int MIN_SPARE_THREADS = 4;
048: public static final int WORK_WAIT_TIMEOUT = 60 * 1000;
049:
050: /*
051: * Where the threads are held.
052: */
053: protected ControlRunnable[] pool = null;
054:
055: /*
056: * A monitor thread that monitors the pool for idel threads.
057: */
058: protected MonitorRunnable monitor;
059:
060: /*
061: * Max number of threads that you can open in the pool.
062: */
063: protected int maxThreads;
064:
065: /*
066: * Min number of idel threads that you can leave in the pool.
067: */
068: protected int minSpareThreads;
069:
070: /*
071: * Max number of idel threads that you can leave in the pool.
072: */
073: protected int maxSpareThreads;
074:
075: /*
076: * Number of threads in the pool.
077: */
078: protected int currentThreadCount;
079:
080: /*
081: * Number of busy threads in the pool.
082: */
083: protected int currentThreadsBusy;
084:
085: /*
086: * Flag that the pool should terminate all the threads and stop.
087: */
088: protected boolean stopThePool;
089:
090: /* Flag to control if the main thread is 'daemon' */
091: protected boolean isDaemon = true;
092:
093: /** The threads that are part of the pool.
094: * Key is Thread, value is the ControlRunnable
095: */
096: protected Hashtable threads = new Hashtable();
097:
098: protected Vector listeners = new Vector();
099:
100: /** Name of the threadpool
101: */
102: protected String name = "TP";
103:
104: /**
105: * Sequence.
106: */
107: protected int sequence = 1;
108:
109: /**
110: * Thread priority.
111: */
112: protected int threadPriority = Thread.NORM_PRIORITY;
113:
114: /**
115: * Constructor.
116: */
117: public ThreadPool() {
118: maxThreads = MAX_THREADS;
119: maxSpareThreads = MAX_SPARE_THREADS;
120: minSpareThreads = MIN_SPARE_THREADS;
121: currentThreadCount = 0;
122: currentThreadsBusy = 0;
123: stopThePool = false;
124: }
125:
126: /** Create a ThreadPool instance.
127: *
128: * @param jmx UNUSED
129: * @return ThreadPool instance. If JMX support is requested, you need to
130: * call register() in order to set a name.
131: */
132: public static ThreadPool createThreadPool(boolean jmx) {
133: return new ThreadPool();
134: }
135:
136: public synchronized void start() {
137: stopThePool = false;
138: currentThreadCount = 0;
139: currentThreadsBusy = 0;
140:
141: adjustLimits();
142:
143: pool = new ControlRunnable[maxThreads];
144:
145: openThreads(minSpareThreads);
146: if (maxSpareThreads < maxThreads) {
147: monitor = new MonitorRunnable(this );
148: }
149: }
150:
151: public MonitorRunnable getMonitor() {
152: return monitor;
153: }
154:
155: /**
156: * Sets the thread priority for current
157: * and future threads in this pool.
158: *
159: * @param threadPriority The new priority
160: * @throws IllegalArgumentException If the specified
161: * priority is less than Thread.MIN_PRIORITY or
162: * more than Thread.MAX_PRIORITY
163: */
164: public synchronized void setThreadPriority(int threadPriority) {
165: if (log.isDebugEnabled())
166: log.debug(getClass().getName() + ": setPriority("
167: + threadPriority + "): here.");
168:
169: if (threadPriority < Thread.MIN_PRIORITY) {
170: throw new IllegalArgumentException(
171: "new priority < MIN_PRIORITY");
172: } else if (threadPriority > Thread.MAX_PRIORITY) {
173: throw new IllegalArgumentException(
174: "new priority > MIN_PRIORITY");
175: }
176:
177: // Set for future threads
178: this .threadPriority = threadPriority;
179:
180: Enumeration currentThreads = getThreads();
181: Thread t = null;
182: while (currentThreads.hasMoreElements()) {
183: t = (Thread) currentThreads.nextElement();
184: t.setPriority(threadPriority);
185: }
186: }
187:
188: /**
189: * Returns the priority level of current and
190: * future threads in this pool.
191: *
192: * @return The priority
193: */
194: public int getThreadPriority() {
195: return threadPriority;
196: }
197:
198: public void setMaxThreads(int maxThreads) {
199: this .maxThreads = maxThreads;
200: }
201:
202: public int getMaxThreads() {
203: return maxThreads;
204: }
205:
206: public void setMinSpareThreads(int minSpareThreads) {
207: this .minSpareThreads = minSpareThreads;
208: }
209:
210: public int getMinSpareThreads() {
211: return minSpareThreads;
212: }
213:
214: public void setMaxSpareThreads(int maxSpareThreads) {
215: this .maxSpareThreads = maxSpareThreads;
216: }
217:
218: public int getMaxSpareThreads() {
219: return maxSpareThreads;
220: }
221:
222: public int getCurrentThreadCount() {
223: return currentThreadCount;
224: }
225:
226: public int getCurrentThreadsBusy() {
227: return currentThreadsBusy;
228: }
229:
230: public boolean isDaemon() {
231: return isDaemon;
232: }
233:
234: public static int getDebug() {
235: return 0;
236: }
237:
238: /** The default is true - the created threads will be
239: * in daemon mode. If set to false, the control thread
240: * will not be daemon - and will keep the process alive.
241: */
242: public void setDaemon(boolean b) {
243: isDaemon = b;
244: }
245:
246: public boolean getDaemon() {
247: return isDaemon;
248: }
249:
250: public void setName(String name) {
251: this .name = name;
252: }
253:
254: public String getName() {
255: return name;
256: }
257:
258: public int getSequence() {
259: return sequence++;
260: }
261:
262: public void addThread(Thread t, ControlRunnable cr) {
263: threads.put(t, cr);
264: for (int i = 0; i < listeners.size(); i++) {
265: ThreadPoolListener tpl = (ThreadPoolListener) listeners
266: .elementAt(i);
267: tpl.threadStart(this , t);
268: }
269: }
270:
271: public void removeThread(Thread t) {
272: threads.remove(t);
273: for (int i = 0; i < listeners.size(); i++) {
274: ThreadPoolListener tpl = (ThreadPoolListener) listeners
275: .elementAt(i);
276: tpl.threadEnd(this , t);
277: }
278: }
279:
280: public void addThreadPoolListener(ThreadPoolListener tpl) {
281: listeners.addElement(tpl);
282: }
283:
284: public Enumeration getThreads() {
285: return threads.keys();
286: }
287:
288: public void run(Runnable r) {
289: ControlRunnable c = findControlRunnable();
290: c.runIt(r);
291: }
292:
293: //
294: // You may wonder what you see here ... basically I am trying
295: // to maintain a stack of threads. This way locality in time
296: // is kept and there is a better chance to find residues of the
297: // thread in memory next time it runs.
298: //
299:
300: /**
301: * Executes a given Runnable on a thread in the pool, block if needed.
302: */
303: public void runIt(ThreadPoolRunnable r) {
304: if (null == r) {
305: throw new NullPointerException();
306: }
307:
308: ControlRunnable c = findControlRunnable();
309: c.runIt(r);
310: }
311:
312: private ControlRunnable findControlRunnable() {
313: ControlRunnable c = null;
314:
315: if (stopThePool) {
316: throw new IllegalStateException();
317: }
318:
319: // Obtain a free thread from the pool.
320: synchronized (this ) {
321:
322: while (currentThreadsBusy == currentThreadCount) {
323: // All threads are busy
324: if (currentThreadCount < maxThreads) {
325: // Not all threads were open,
326: // Open new threads up to the max number of idel threads
327: int toOpen = currentThreadCount + minSpareThreads;
328: openThreads(toOpen);
329: } else {
330: logFull(log, currentThreadCount, maxThreads);
331: // Wait for a thread to become idel.
332: try {
333: this .wait();
334: }
335: // was just catch Throwable -- but no other
336: // exceptions can be thrown by wait, right?
337: // So we catch and ignore this one, since
338: // it'll never actually happen, since nowhere
339: // do we say pool.interrupt().
340: catch (InterruptedException e) {
341: log.error("Unexpected exception", e);
342: }
343: if (log.isDebugEnabled()) {
344: log.debug("Finished waiting: CTC="
345: + currentThreadCount + ", CTB="
346: + currentThreadsBusy);
347: }
348: // Pool was stopped. Get away of the pool.
349: if (stopThePool) {
350: break;
351: }
352: }
353: }
354: // Pool was stopped. Get away of the pool.
355: if (0 == currentThreadCount || stopThePool) {
356: throw new IllegalStateException();
357: }
358:
359: // If we are here it means that there is a free thread. Take it.
360: int pos = currentThreadCount - currentThreadsBusy - 1;
361: c = pool[pos];
362: pool[pos] = null;
363: currentThreadsBusy++;
364:
365: }
366: return c;
367: }
368:
369: private static void logFull(Log loghelper, int currentThreadCount,
370: int maxThreads) {
371: if (logfull) {
372: log.error(sm.getString("threadpool.busy", new Integer(
373: currentThreadCount), new Integer(maxThreads)));
374: logfull = false;
375: } else if (log.isDebugEnabled()) {
376: log.debug("All threads are busy " + currentThreadCount
377: + " " + maxThreads);
378: }
379: }
380:
381: /**
382: * Stop the thread pool
383: */
384: public synchronized void shutdown() {
385: if (!stopThePool) {
386: stopThePool = true;
387: if (monitor != null) {
388: monitor.terminate();
389: monitor = null;
390: }
391: for (int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
392: try {
393: pool[i].terminate();
394: } catch (Throwable t) {
395: /*
396: * Do nothing... The show must go on, we are shutting
397: * down the pool and nothing should stop that.
398: */
399: log
400: .error(
401: "Ignored exception while shutting down thread pool",
402: t);
403: }
404: }
405: currentThreadsBusy = currentThreadCount = 0;
406: pool = null;
407: notifyAll();
408: }
409: }
410:
411: /**
412: * Called by the monitor thread to harvest idle threads.
413: */
414: protected synchronized void checkSpareControllers() {
415:
416: if (stopThePool) {
417: return;
418: }
419:
420: if ((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
421: int toFree = currentThreadCount - currentThreadsBusy
422: - maxSpareThreads;
423:
424: for (int i = 0; i < toFree; i++) {
425: ControlRunnable c = pool[currentThreadCount
426: - currentThreadsBusy - 1];
427: c.terminate();
428: pool[currentThreadCount - currentThreadsBusy - 1] = null;
429: currentThreadCount--;
430: }
431:
432: }
433:
434: }
435:
436: /**
437: * Returns the thread to the pool.
438: * Called by threads as they are becoming idel.
439: */
440: protected synchronized void returnController(ControlRunnable c) {
441:
442: if (0 == currentThreadCount || stopThePool) {
443: c.terminate();
444: return;
445: }
446:
447: // atomic
448: currentThreadsBusy--;
449:
450: pool[currentThreadCount - currentThreadsBusy - 1] = c;
451: notify();
452: }
453:
454: /**
455: * Inform the pool that the specific thread finish.
456: *
457: * Called by the ControlRunnable.run() when the runnable
458: * throws an exception.
459: */
460: protected synchronized void notifyThreadEnd(ControlRunnable c) {
461: currentThreadsBusy--;
462: currentThreadCount--;
463: notify();
464: }
465:
466: /*
467: * Checks for problematic configuration and fix it.
468: * The fix provides reasonable settings for a single CPU
469: * with medium load.
470: */
471: protected void adjustLimits() {
472: if (maxThreads <= 0) {
473: maxThreads = MAX_THREADS;
474: } else if (maxThreads < MAX_THREADS_MIN) {
475: log.warn(sm.getString("threadpool.max_threads_too_low",
476: new Integer(maxThreads), new Integer(
477: MAX_THREADS_MIN)));
478: maxThreads = MAX_THREADS_MIN;
479: }
480:
481: if (maxSpareThreads >= maxThreads) {
482: maxSpareThreads = maxThreads;
483: }
484:
485: if (maxSpareThreads <= 0) {
486: if (1 == maxThreads) {
487: maxSpareThreads = 1;
488: } else {
489: maxSpareThreads = maxThreads / 2;
490: }
491: }
492:
493: if (minSpareThreads > maxSpareThreads) {
494: minSpareThreads = maxSpareThreads;
495: }
496:
497: if (minSpareThreads <= 0) {
498: if (1 == maxSpareThreads) {
499: minSpareThreads = 1;
500: } else {
501: minSpareThreads = maxSpareThreads / 2;
502: }
503: }
504: }
505:
506: /** Create missing threads.
507: *
508: * @param toOpen Total number of threads we'll have open
509: */
510: protected void openThreads(int toOpen) {
511:
512: if (toOpen > maxThreads) {
513: toOpen = maxThreads;
514: }
515:
516: for (int i = currentThreadCount; i < toOpen; i++) {
517: pool[i - currentThreadsBusy] = new ControlRunnable(this );
518: }
519:
520: currentThreadCount = toOpen;
521: }
522:
523: /** @deprecated */
524: void log(String s) {
525: log.info(s);
526: //loghelper.flush();
527: }
528:
529: /**
530: * Periodically execute an action - cleanup in this case
531: */
532: public static class MonitorRunnable implements Runnable {
533: ThreadPool p;
534: Thread t;
535: int interval = WORK_WAIT_TIMEOUT;
536: boolean shouldTerminate;
537:
538: MonitorRunnable(ThreadPool p) {
539: this .p = p;
540: this .start();
541: }
542:
543: public void start() {
544: shouldTerminate = false;
545: t = new Thread(this );
546: t.setDaemon(p.getDaemon());
547: t.setName(p.getName() + "-Monitor");
548: t.start();
549: }
550:
551: public void setInterval(int i) {
552: this .interval = i;
553: }
554:
555: public void run() {
556: while (true) {
557: try {
558:
559: // Sleep for a while.
560: synchronized (this ) {
561: this .wait(WORK_WAIT_TIMEOUT);
562: }
563:
564: // Check if should terminate.
565: // termination happens when the pool is shutting down.
566: if (shouldTerminate) {
567: break;
568: }
569:
570: // Harvest idle threads.
571: p.checkSpareControllers();
572:
573: } catch (Throwable t) {
574: ThreadPool.log.error("Unexpected exception", t);
575: }
576: }
577: }
578:
579: public void stop() {
580: this .terminate();
581: }
582:
583: /** Stop the monitor
584: */
585: public synchronized void terminate() {
586: shouldTerminate = true;
587: this .notify();
588: }
589: }
590:
591: /**
592: * A Thread object that executes various actions ( ThreadPoolRunnable )
593: * under control of ThreadPool
594: */
595: public static class ControlRunnable implements Runnable {
596: /**
597: * ThreadPool where this thread will be returned
598: */
599: private ThreadPool p;
600:
601: /**
602: * The thread that executes the actions
603: */
604: private ThreadWithAttributes t;
605:
606: /**
607: * The method that is executed in this thread
608: */
609:
610: private ThreadPoolRunnable toRun;
611: private Runnable toRunRunnable;
612:
613: /**
614: * Stop this thread
615: */
616: private boolean shouldTerminate;
617:
618: /**
619: * Activate the execution of the action
620: */
621: private boolean shouldRun;
622:
623: /**
624: * Per thread data - can be used only if all actions are
625: * of the same type.
626: * A better mechanism is possible ( that would allow association of
627: * thread data with action type ), but right now it's enough.
628: */
629: private boolean noThData;
630:
631: /**
632: * Start a new thread, with no method in it
633: */
634: ControlRunnable(ThreadPool p) {
635: toRun = null;
636: shouldTerminate = false;
637: shouldRun = false;
638: this .p = p;
639: t = new ThreadWithAttributes(p, this );
640: t.setDaemon(true);
641: t.setName(p.getName() + "-Processor" + p.getSequence());
642: p.addThread(t, this );
643: noThData = true;
644: t.start();
645: }
646:
647: public void run() {
648: boolean _shouldRun = false;
649: boolean _shouldTerminate = false;
650: ThreadPoolRunnable _toRun = null;
651: try {
652: while (true) {
653: try {
654: /* Wait for work. */
655: synchronized (this ) {
656: while (!shouldRun && !shouldTerminate) {
657: this .wait();
658: }
659: _shouldRun = shouldRun;
660: _shouldTerminate = shouldTerminate;
661: _toRun = toRun;
662: }
663:
664: if (_shouldTerminate) {
665: if (ThreadPool.log.isDebugEnabled())
666: ThreadPool.log.debug("Terminate");
667: break;
668: }
669:
670: /* Check if should execute a runnable. */
671: try {
672: if (noThData) {
673: if (_toRun != null) {
674: Object thData[] = _toRun
675: .getInitData();
676: t.setThreadData(p, thData);
677: if (ThreadPool.log.isDebugEnabled())
678: ThreadPool.log
679: .debug("Getting new thread data");
680: }
681: noThData = false;
682: }
683:
684: if (_shouldRun) {
685: if (_toRun != null) {
686: _toRun.runIt(t.getThreadData(p));
687: } else if (toRunRunnable != null) {
688: toRunRunnable.run();
689: } else {
690: if (ThreadPool.log.isDebugEnabled())
691: ThreadPool.log
692: .debug("No toRun ???");
693: }
694: }
695: } catch (Throwable t) {
696: ThreadPool.log.error(sm.getString(
697: "threadpool.thread_error", t, toRun
698: .toString()));
699: /*
700: * The runnable throw an exception (can be even a ThreadDeath),
701: * signalling that the thread die.
702: *
703: * The meaning is that we should release the thread from
704: * the pool.
705: */
706: shouldTerminate = true;
707: shouldRun = false;
708: p.notifyThreadEnd(this );
709: } finally {
710: if (_shouldRun) {
711: shouldRun = false;
712: /*
713: * Notify the pool that the thread is now idle.
714: */
715: p.returnController(this );
716: }
717: }
718:
719: /*
720: * Check if should terminate.
721: * termination happens when the pool is shutting down.
722: */
723: if (_shouldTerminate) {
724: break;
725: }
726: } catch (InterruptedException ie) { /* for the wait operation */
727: // can never happen, since we don't call interrupt
728: ThreadPool.log
729: .error("Unexpected exception", ie);
730: }
731: }
732: } finally {
733: p.removeThread(Thread.currentThread());
734: }
735: }
736:
737: /** Run a task
738: *
739: * @param toRun
740: */
741: public synchronized void runIt(Runnable toRun) {
742: this .toRunRunnable = toRun;
743: // Do not re-init, the whole idea is to run init only once per
744: // thread - the pool is supposed to run a single task, that is
745: // initialized once.
746: // noThData = true;
747: shouldRun = true;
748: this .notify();
749: }
750:
751: /** Run a task
752: *
753: * @param toRun
754: */
755: public synchronized void runIt(ThreadPoolRunnable toRun) {
756: this .toRun = toRun;
757: // Do not re-init, the whole idea is to run init only once per
758: // thread - the pool is supposed to run a single task, that is
759: // initialized once.
760: // noThData = true;
761: shouldRun = true;
762: this .notify();
763: }
764:
765: public void stop() {
766: this .terminate();
767: }
768:
769: public void kill() {
770: t.stop();
771: }
772:
773: public synchronized void terminate() {
774: shouldTerminate = true;
775: this .notify();
776: }
777: }
778:
779: /** Debug display of the stage of each thread. The return is html style,
780: * for display in the console ( it can be easily parsed too )
781: *
782: * @return
783: */
784: public String threadStatusString() {
785: StringBuffer sb = new StringBuffer();
786: Iterator it = threads.keySet().iterator();
787: sb.append("<ul>");
788: while (it.hasNext()) {
789: sb.append("<li>");
790: ThreadWithAttributes twa = (ThreadWithAttributes) it.next();
791: sb.append(twa.getCurrentStage(this )).append(" ");
792: sb.append(twa.getParam(this ));
793: sb.append("</li>\n");
794: }
795: sb.append("</ul>");
796: return sb.toString();
797: }
798:
799: /** Return an array with the status of each thread. The status
800: * indicates the current request processing stage ( for tomcat ) or
801: * whatever the thread is doing ( if the application using TP provide
802: * this info )
803: *
804: * @return
805: */
806: public String[] getThreadStatus() {
807: String status[] = new String[threads.size()];
808: Iterator it = threads.keySet().iterator();
809: for (int i = 0; (i < status.length && it.hasNext()); i++) {
810: ThreadWithAttributes twa = (ThreadWithAttributes) it.next();
811: status[i] = twa.getCurrentStage(this );
812: }
813: return status;
814: }
815:
816: /** Return an array with the current "param" ( XXX better name ? )
817: * of each thread. This is typically the last request.
818: *
819: * @return
820: */
821: public String[] getThreadParam() {
822: String status[] = new String[threads.size()];
823: Iterator it = threads.keySet().iterator();
824: for (int i = 0; (i < status.length && it.hasNext()); i++) {
825: ThreadWithAttributes twa = (ThreadWithAttributes) it.next();
826: Object o = twa.getParam(this );
827: status[i] = (o == null) ? null : o.toString();
828: }
829: return status;
830: }
831:
832: /** Interface to allow applications to be notified when
833: * a threads are created and stopped.
834: */
835: public static interface ThreadPoolListener {
836: public void threadStart(ThreadPool tp, Thread t);
837:
838: public void threadEnd(ThreadPool tp, Thread t);
839: }
840: }
|