001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.tomcat.util.threads;
019:
020: import java.util.*;
021:
022: import org.apache.juli.logging.Log;
023: import org.apache.juli.logging.LogFactory;
024: import org.apache.tomcat.util.res.StringManager;
025:
026: /**
027: * A thread pool that is trying to copy the apache process management.
028: *
029: * Should we remove this in favor of Doug Lea's thread package?
030: *
031: * @author Gal Shachor
032: * @author Yoav Shapira <yoavs@apache.org>
033: */
034: public class ThreadPool {
035:
036: private static Log log = LogFactory.getLog(ThreadPool.class);
037:
038: private static StringManager sm = StringManager
039: .getManager("org.apache.tomcat.util.threads.res");
040:
041: private static boolean logfull = true;
042:
043: /*
044: * Default values ...
045: */
046: public static final int MAX_THREADS = 200;
047: public static final int MAX_THREADS_MIN = 10;
048: public static final int MAX_SPARE_THREADS = 50;
049: public static final int MIN_SPARE_THREADS = 4;
050: public static final int WORK_WAIT_TIMEOUT = 60 * 1000;
051:
052: /*
053: * Where the threads are held.
054: */
055: protected ControlRunnable[] pool = null;
056:
057: /*
058: * A monitor thread that monitors the pool for idel threads.
059: */
060: protected MonitorRunnable monitor;
061:
062: /*
063: * Max number of threads that you can open in the pool.
064: */
065: protected int maxThreads;
066:
067: /*
068: * Min number of idel threads that you can leave in the pool.
069: */
070: protected int minSpareThreads;
071:
072: /*
073: * Max number of idel threads that you can leave in the pool.
074: */
075: protected int maxSpareThreads;
076:
077: /*
078: * Number of threads in the pool.
079: */
080: protected int currentThreadCount;
081:
082: /*
083: * Number of busy threads in the pool.
084: */
085: protected int currentThreadsBusy;
086:
087: /*
088: * Flag that the pool should terminate all the threads and stop.
089: */
090: protected boolean stopThePool;
091:
092: /* Flag to control if the main thread is 'daemon' */
093: protected boolean isDaemon = true;
094:
095: /** The threads that are part of the pool.
096: * Key is Thread, value is the ControlRunnable
097: */
098: protected Hashtable threads = new Hashtable();
099:
100: protected Vector listeners = new Vector();
101:
102: /** Name of the threadpool
103: */
104: protected String name = "TP";
105:
106: /**
107: * Sequence.
108: */
109: protected int sequence = 1;
110:
111: /**
112: * Thread priority.
113: */
114: protected int threadPriority = Thread.NORM_PRIORITY;
115:
/**
 * Builds a pool configured with the default limits and with all
 * counters at zero. The constructor itself does not create any thread.
 */
public ThreadPool() {
    this.stopThePool = false;
    this.currentThreadsBusy = 0;
    this.currentThreadCount = 0;
    this.minSpareThreads = MIN_SPARE_THREADS;
    this.maxSpareThreads = MAX_SPARE_THREADS;
    this.maxThreads = MAX_THREADS;
}
127:
128: /** Create a ThreadPool instance.
129: *
130: * @param jmx UNUSED
131: * @return ThreadPool instance. If JMX support is requested, you need to
132: * call register() in order to set a name.
133: */
134: public static ThreadPool createThreadPool(boolean jmx) {
135: return new ThreadPool();
136: }
137:
138: public synchronized void start() {
139: stopThePool = false;
140: currentThreadCount = 0;
141: currentThreadsBusy = 0;
142:
143: adjustLimits();
144:
145: pool = new ControlRunnable[maxThreads];
146:
147: openThreads(minSpareThreads);
148: if (maxSpareThreads < maxThreads) {
149: monitor = new MonitorRunnable(this );
150: }
151: }
152:
153: public MonitorRunnable getMonitor() {
154: return monitor;
155: }
156:
157: /**
158: * Sets the thread priority for current
159: * and future threads in this pool.
160: *
161: * @param threadPriority The new priority
162: * @throws IllegalArgumentException If the specified
163: * priority is less than Thread.MIN_PRIORITY or
164: * more than Thread.MAX_PRIORITY
165: */
166: public synchronized void setThreadPriority(int threadPriority) {
167: if (log.isDebugEnabled())
168: log.debug(getClass().getName() + ": setPriority("
169: + threadPriority + "): here.");
170:
171: if (threadPriority < Thread.MIN_PRIORITY) {
172: throw new IllegalArgumentException(
173: "new priority < MIN_PRIORITY");
174: } else if (threadPriority > Thread.MAX_PRIORITY) {
175: throw new IllegalArgumentException(
176: "new priority > MAX_PRIORITY");
177: }
178:
179: // Set for future threads
180: this .threadPriority = threadPriority;
181:
182: Enumeration currentThreads = getThreads();
183: Thread t = null;
184: while (currentThreads.hasMoreElements()) {
185: t = (Thread) currentThreads.nextElement();
186: t.setPriority(threadPriority);
187: }
188: }
189:
190: /**
191: * Returns the priority level of current and
192: * future threads in this pool.
193: *
194: * @return The priority
195: */
196: public int getThreadPriority() {
197: return threadPriority;
198: }
199:
200: public void setMaxThreads(int maxThreads) {
201: this .maxThreads = maxThreads;
202: }
203:
204: public int getMaxThreads() {
205: return maxThreads;
206: }
207:
208: public void setMinSpareThreads(int minSpareThreads) {
209: this .minSpareThreads = minSpareThreads;
210: }
211:
212: public int getMinSpareThreads() {
213: return minSpareThreads;
214: }
215:
216: public void setMaxSpareThreads(int maxSpareThreads) {
217: this .maxSpareThreads = maxSpareThreads;
218: }
219:
220: public int getMaxSpareThreads() {
221: return maxSpareThreads;
222: }
223:
224: public int getCurrentThreadCount() {
225: return currentThreadCount;
226: }
227:
228: public int getCurrentThreadsBusy() {
229: return currentThreadsBusy;
230: }
231:
232: public boolean isDaemon() {
233: return isDaemon;
234: }
235:
236: public static int getDebug() {
237: return 0;
238: }
239:
240: /** The default is true - the created threads will be
241: * in daemon mode. If set to false, the control thread
242: * will not be daemon - and will keep the process alive.
243: */
244: public void setDaemon(boolean b) {
245: isDaemon = b;
246: }
247:
248: public boolean getDaemon() {
249: return isDaemon;
250: }
251:
252: public void setName(String name) {
253: this .name = name;
254: }
255:
256: public String getName() {
257: return name;
258: }
259:
260: public int getSequence() {
261: return sequence;
262: }
263:
264: public int incSequence() {
265: return sequence++;
266: }
267:
268: public void addThread(Thread t, ControlRunnable cr) {
269: threads.put(t, cr);
270: for (int i = 0; i < listeners.size(); i++) {
271: ThreadPoolListener tpl = (ThreadPoolListener) listeners
272: .elementAt(i);
273: tpl.threadStart(this , t);
274: }
275: }
276:
277: public void removeThread(Thread t) {
278: threads.remove(t);
279: for (int i = 0; i < listeners.size(); i++) {
280: ThreadPoolListener tpl = (ThreadPoolListener) listeners
281: .elementAt(i);
282: tpl.threadEnd(this , t);
283: }
284: }
285:
286: public void addThreadPoolListener(ThreadPoolListener tpl) {
287: listeners.addElement(tpl);
288: }
289:
290: public Enumeration getThreads() {
291: return threads.keys();
292: }
293:
294: public void run(Runnable r) {
295: ControlRunnable c = findControlRunnable();
296: c.runIt(r);
297: }
298:
299: //
300: // You may wonder what you see here ... basically I am trying
301: // to maintain a stack of threads. This way locality in time
302: // is kept and there is a better chance to find residues of the
303: // thread in memory next time it runs.
304: //
305:
306: /**
307: * Executes a given Runnable on a thread in the pool, block if needed.
308: */
309: public void runIt(ThreadPoolRunnable r) {
310: if (null == r) {
311: throw new NullPointerException();
312: }
313:
314: ControlRunnable c = findControlRunnable();
315: c.runIt(r);
316: }
317:
318: private ControlRunnable findControlRunnable() {
319: ControlRunnable c = null;
320:
321: if (stopThePool) {
322: throw new IllegalStateException();
323: }
324:
325: // Obtain a free thread from the pool.
326: synchronized (this ) {
327:
328: while (currentThreadsBusy == currentThreadCount) {
329: // All threads are busy
330: if (currentThreadCount < maxThreads) {
331: // Not all threads were open,
332: // Open new threads up to the max number of idel threads
333: int toOpen = currentThreadCount + minSpareThreads;
334: openThreads(toOpen);
335: } else {
336: logFull(log, currentThreadCount, maxThreads);
337: // Wait for a thread to become idel.
338: try {
339: this .wait();
340: }
341: // was just catch Throwable -- but no other
342: // exceptions can be thrown by wait, right?
343: // So we catch and ignore this one, since
344: // it'll never actually happen, since nowhere
345: // do we say pool.interrupt().
346: catch (InterruptedException e) {
347: log.error("Unexpected exception", e);
348: }
349: if (log.isDebugEnabled()) {
350: log.debug("Finished waiting: CTC="
351: + currentThreadCount + ", CTB="
352: + currentThreadsBusy);
353: }
354: // Pool was stopped. Get away of the pool.
355: if (stopThePool) {
356: break;
357: }
358: }
359: }
360: // Pool was stopped. Get away of the pool.
361: if (0 == currentThreadCount || stopThePool) {
362: throw new IllegalStateException();
363: }
364:
365: // If we are here it means that there is a free thread. Take it.
366: int pos = currentThreadCount - currentThreadsBusy - 1;
367: c = pool[pos];
368: pool[pos] = null;
369: currentThreadsBusy++;
370:
371: }
372: return c;
373: }
374:
375: private static void logFull(Log loghelper, int currentThreadCount,
376: int maxThreads) {
377: if (logfull) {
378: log.error(sm.getString("threadpool.busy", new Integer(
379: currentThreadCount), new Integer(maxThreads)));
380: logfull = false;
381: } else if (log.isDebugEnabled()) {
382: log.debug("All threads are busy " + currentThreadCount
383: + " " + maxThreads);
384: }
385: }
386:
387: /**
388: * Stop the thread pool
389: */
390: public synchronized void shutdown() {
391: if (!stopThePool) {
392: stopThePool = true;
393: if (monitor != null) {
394: monitor.terminate();
395: monitor = null;
396: }
397: for (int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
398: try {
399: pool[i].terminate();
400: } catch (Throwable t) {
401: /*
402: * Do nothing... The show must go on, we are shutting
403: * down the pool and nothing should stop that.
404: */
405: log
406: .error(
407: "Ignored exception while shutting down thread pool",
408: t);
409: }
410: }
411: currentThreadsBusy = currentThreadCount = 0;
412: pool = null;
413: notifyAll();
414: }
415: }
416:
417: /**
418: * Called by the monitor thread to harvest idle threads.
419: */
420: protected synchronized void checkSpareControllers() {
421:
422: if (stopThePool) {
423: return;
424: }
425:
426: if ((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
427: int toFree = currentThreadCount - currentThreadsBusy
428: - maxSpareThreads;
429:
430: for (int i = 0; i < toFree; i++) {
431: ControlRunnable c = pool[currentThreadCount
432: - currentThreadsBusy - 1];
433: c.terminate();
434: pool[currentThreadCount - currentThreadsBusy - 1] = null;
435: currentThreadCount--;
436: }
437:
438: }
439:
440: }
441:
442: /**
443: * Returns the thread to the pool.
444: * Called by threads as they are becoming idel.
445: */
446: protected synchronized void returnController(ControlRunnable c) {
447:
448: if (0 == currentThreadCount || stopThePool) {
449: c.terminate();
450: return;
451: }
452:
453: // atomic
454: currentThreadsBusy--;
455:
456: pool[currentThreadCount - currentThreadsBusy - 1] = c;
457: notify();
458: }
459:
460: /**
461: * Inform the pool that the specific thread finish.
462: *
463: * Called by the ControlRunnable.run() when the runnable
464: * throws an exception.
465: */
466: protected synchronized void notifyThreadEnd(ControlRunnable c) {
467: currentThreadsBusy--;
468: currentThreadCount--;
469: notify();
470: }
471:
472: /*
473: * Checks for problematic configuration and fix it.
474: * The fix provides reasonable settings for a single CPU
475: * with medium load.
476: */
477: protected void adjustLimits() {
478: if (maxThreads <= 0) {
479: maxThreads = MAX_THREADS;
480: } else if (maxThreads < MAX_THREADS_MIN) {
481: log.warn(sm.getString("threadpool.max_threads_too_low",
482: new Integer(maxThreads), new Integer(
483: MAX_THREADS_MIN)));
484: maxThreads = MAX_THREADS_MIN;
485: }
486:
487: if (maxSpareThreads >= maxThreads) {
488: maxSpareThreads = maxThreads;
489: }
490:
491: if (maxSpareThreads <= 0) {
492: if (1 == maxThreads) {
493: maxSpareThreads = 1;
494: } else {
495: maxSpareThreads = maxThreads / 2;
496: }
497: }
498:
499: if (minSpareThreads > maxSpareThreads) {
500: minSpareThreads = maxSpareThreads;
501: }
502:
503: if (minSpareThreads <= 0) {
504: if (1 == maxSpareThreads) {
505: minSpareThreads = 1;
506: } else {
507: minSpareThreads = maxSpareThreads / 2;
508: }
509: }
510: }
511:
512: /** Create missing threads.
513: *
514: * @param toOpen Total number of threads we'll have open
515: */
516: protected void openThreads(int toOpen) {
517:
518: if (toOpen > maxThreads) {
519: toOpen = maxThreads;
520: }
521:
522: for (int i = currentThreadCount; i < toOpen; i++) {
523: pool[i - currentThreadsBusy] = new ControlRunnable(this );
524: }
525:
526: currentThreadCount = toOpen;
527: }
528:
529: /** @deprecated */
530: void log(String s) {
531: log.info(s);
532: //loghelper.flush();
533: }
534:
535: /**
536: * Periodically execute an action - cleanup in this case
537: */
538: public static class MonitorRunnable implements Runnable {
539: ThreadPool p;
540: Thread t;
541: int interval = WORK_WAIT_TIMEOUT;
542: boolean shouldTerminate;
543:
544: MonitorRunnable(ThreadPool p) {
545: this .p = p;
546: this .start();
547: }
548:
549: public void start() {
550: shouldTerminate = false;
551: t = new Thread(this );
552: t.setDaemon(p.getDaemon());
553: t.setName(p.getName() + "-Monitor");
554: t.start();
555: }
556:
557: public void setInterval(int i) {
558: this .interval = i;
559: }
560:
561: public void run() {
562: while (true) {
563: try {
564:
565: // Sleep for a while.
566: synchronized (this ) {
567: this .wait(interval);
568: }
569:
570: // Check if should terminate.
571: // termination happens when the pool is shutting down.
572: if (shouldTerminate) {
573: break;
574: }
575:
576: // Harvest idle threads.
577: p.checkSpareControllers();
578:
579: } catch (Throwable t) {
580: ThreadPool.log.error("Unexpected exception", t);
581: }
582: }
583: }
584:
585: public void stop() {
586: this .terminate();
587: }
588:
589: /** Stop the monitor
590: */
591: public synchronized void terminate() {
592: shouldTerminate = true;
593: this .notify();
594: }
595: }
596:
597: /**
598: * A Thread object that executes various actions ( ThreadPoolRunnable )
599: * under control of ThreadPool
600: */
601: public static class ControlRunnable implements Runnable {
602: /**
603: * ThreadPool where this thread will be returned
604: */
605: private ThreadPool p;
606:
607: /**
608: * The thread that executes the actions
609: */
610: private ThreadWithAttributes t;
611:
612: /**
613: * The method that is executed in this thread
614: */
615:
616: private ThreadPoolRunnable toRun;
617: private Runnable toRunRunnable;
618:
619: /**
620: * Stop this thread
621: */
622: private boolean shouldTerminate;
623:
624: /**
625: * Activate the execution of the action
626: */
627: private boolean shouldRun;
628:
629: /**
630: * Per thread data - can be used only if all actions are
631: * of the same type.
632: * A better mechanism is possible ( that would allow association of
633: * thread data with action type ), but right now it's enough.
634: */
635: private boolean noThData;
636:
637: /**
638: * Start a new thread, with no method in it
639: */
640: ControlRunnable(ThreadPool p) {
641: toRun = null;
642: shouldTerminate = false;
643: shouldRun = false;
644: this .p = p;
645: t = new ThreadWithAttributes(p, this );
646: t.setDaemon(true);
647: t.setName(p.getName() + "-Processor" + p.incSequence());
648: t.setPriority(p.getThreadPriority());
649: p.addThread(t, this );
650: noThData = true;
651: t.start();
652: }
653:
654: public void run() {
655: boolean _shouldRun = false;
656: boolean _shouldTerminate = false;
657: ThreadPoolRunnable _toRun = null;
658: try {
659: while (true) {
660: try {
661: /* Wait for work. */
662: synchronized (this ) {
663: while (!shouldRun && !shouldTerminate) {
664: this .wait();
665: }
666: _shouldRun = shouldRun;
667: _shouldTerminate = shouldTerminate;
668: _toRun = toRun;
669: }
670:
671: if (_shouldTerminate) {
672: if (ThreadPool.log.isDebugEnabled())
673: ThreadPool.log.debug("Terminate");
674: break;
675: }
676:
677: /* Check if should execute a runnable. */
678: try {
679: if (noThData) {
680: if (_toRun != null) {
681: Object thData[] = _toRun
682: .getInitData();
683: t.setThreadData(p, thData);
684: if (ThreadPool.log.isDebugEnabled())
685: ThreadPool.log
686: .debug("Getting new thread data");
687: }
688: noThData = false;
689: }
690:
691: if (_shouldRun) {
692: if (_toRun != null) {
693: _toRun.runIt(t.getThreadData(p));
694: } else if (toRunRunnable != null) {
695: toRunRunnable.run();
696: } else {
697: if (ThreadPool.log.isDebugEnabled())
698: ThreadPool.log
699: .debug("No toRun ???");
700: }
701: }
702: } catch (Throwable t) {
703: ThreadPool.log.error(sm.getString(
704: "threadpool.thread_error", t, toRun
705: .toString()));
706: /*
707: * The runnable throw an exception (can be even a ThreadDeath),
708: * signalling that the thread die.
709: *
710: * The meaning is that we should release the thread from
711: * the pool.
712: */
713: _shouldTerminate = true;
714: _shouldRun = false;
715: p.notifyThreadEnd(this );
716: } finally {
717: if (_shouldRun) {
718: shouldRun = false;
719: /*
720: * Notify the pool that the thread is now idle.
721: */
722: p.returnController(this );
723: }
724: }
725:
726: /*
727: * Check if should terminate.
728: * termination happens when the pool is shutting down.
729: */
730: if (_shouldTerminate) {
731: break;
732: }
733: } catch (InterruptedException ie) { /* for the wait operation */
734: // can never happen, since we don't call interrupt
735: ThreadPool.log
736: .error("Unexpected exception", ie);
737: }
738: }
739: } finally {
740: p.removeThread(Thread.currentThread());
741: }
742: }
743:
744: /** Run a task
745: *
746: * @param toRun
747: */
748: public synchronized void runIt(Runnable toRun) {
749: this .toRunRunnable = toRun;
750: // Do not re-init, the whole idea is to run init only once per
751: // thread - the pool is supposed to run a single task, that is
752: // initialized once.
753: // noThData = true;
754: shouldRun = true;
755: this .notify();
756: }
757:
758: /** Run a task
759: *
760: * @param toRun
761: */
762: public synchronized void runIt(ThreadPoolRunnable toRun) {
763: this .toRun = toRun;
764: // Do not re-init, the whole idea is to run init only once per
765: // thread - the pool is supposed to run a single task, that is
766: // initialized once.
767: // noThData = true;
768: shouldRun = true;
769: this .notify();
770: }
771:
772: public void stop() {
773: this .terminate();
774: }
775:
776: public void kill() {
777: t.stop();
778: }
779:
780: public synchronized void terminate() {
781: shouldTerminate = true;
782: this .notify();
783: }
784: }
785:
786: /**
787: * Debug display of the stage of each thread. The return is html style,
788: * for display in the console ( it can be easily parsed too ).
789: *
790: * @return The thread status display
791: */
792: public String threadStatusString() {
793: StringBuffer sb = new StringBuffer();
794: Iterator it = threads.keySet().iterator();
795: sb.append("<ul>");
796: while (it.hasNext()) {
797: sb.append("<li>");
798: ThreadWithAttributes twa = (ThreadWithAttributes) it.next();
799: sb.append(twa.getCurrentStage(this )).append(" ");
800: sb.append(twa.getParam(this ));
801: sb.append("</li>\n");
802: }
803: sb.append("</ul>");
804: return sb.toString();
805: }
806:
807: /** Return an array with the status of each thread. The status
808: * indicates the current request processing stage ( for tomcat ) or
809: * whatever the thread is doing ( if the application using TP provide
810: * this info )
811: *
812: * @return The status of all threads
813: */
814: public String[] getThreadStatus() {
815: String status[] = new String[threads.size()];
816: Iterator it = threads.keySet().iterator();
817: for (int i = 0; (i < status.length && it.hasNext()); i++) {
818: ThreadWithAttributes twa = (ThreadWithAttributes) it.next();
819: status[i] = twa.getCurrentStage(this );
820: }
821: return status;
822: }
823:
824: /** Return an array with the current "param" ( XXX better name ? )
825: * of each thread. This is typically the last request.
826: *
827: * @return The params of all threads
828: */
829: public String[] getThreadParam() {
830: String status[] = new String[threads.size()];
831: Iterator it = threads.keySet().iterator();
832: for (int i = 0; (i < status.length && it.hasNext()); i++) {
833: ThreadWithAttributes twa = (ThreadWithAttributes) it.next();
834: Object o = twa.getParam(this );
835: status[i] = (o == null) ? null : o.toString();
836: }
837: return status;
838: }
839:
840: /** Interface to allow applications to be notified when
841: * a threads are created and stopped.
842: */
843: public static interface ThreadPoolListener {
844: public void threadStart(ThreadPool tp, Thread t);
845:
846: public void threadEnd(ThreadPool tp, Thread t);
847: }
848: }
|