001: // ========================================================================
002: // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
003: // ------------------------------------------------------------------------
004: // Licensed under the Apache License, Version 2.0 (the "License");
005: // you may not use this file except in compliance with the License.
006: // You may obtain a copy of the License at
007: // http://www.apache.org/licenses/LICENSE-2.0
008: // Unless required by applicable law or agreed to in writing, software
009: // distributed under the License is distributed on an "AS IS" BASIS,
010: // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011: // See the License for the specific language governing permissions and
012: // limitations under the License.
013: // ========================================================================
014:
015: package org.mortbay.thread;
016:
017: import java.io.Serializable;
018: import java.util.ArrayList;
019: import java.util.HashSet;
020: import java.util.Iterator;
021: import java.util.List;
022: import java.util.Set;
023:
024: import org.mortbay.component.AbstractLifeCycle;
025: import org.mortbay.log.Log;
026:
027: /* ------------------------------------------------------------ */
028: /** A pool of threads.
029: * <p>
030: * Avoids the expense of thread creation by pooling threads after
031: * their run methods exit for reuse.
032: * <p>
033: * If the maximum pool size is reached, jobs wait for a free thread.
034: * By default there is no maximum pool size. Idle threads timeout
035: * and terminate until the minimum number of threads are running.
036: * <p>
037: * @author Greg Wilkins <gregw@mortbay.com>
038: * @author Juancarlo Anez <juancarlo@modelistica.com>
039: */
040: public class BoundedThreadPool extends AbstractLifeCycle implements
041: Serializable, ThreadPool {
042: private static int __id;
043: private transient List _blocked;
044:
045: private boolean _daemon;
046:
047: private transient int _id;
048:
049: private final String _lock = "LOCK";
050: private final String _joinLock = "JOIN";
051: private int _maxIdleTimeMs = 60000;
052: private int _maxThreads = 255;
053: private int _minThreads = 1;
054: private int _lowThreads = 25;
055: private String _name;
056: int _priority = Thread.NORM_PRIORITY;
057: private Set _threads;
058: private List _idle;
059: private boolean _warned = false;
060: private long _lastShrink;
061:
062: /* ------------------------------------------------------------------- */
063: /* Construct
064: */
065: public BoundedThreadPool() {
066: _name = "btpool" + __id++;
067: }
068:
069: /* ------------------------------------------------------------ */
070: /** Get the number of idle threads in the pool.
071: * @see #getThreads
072: * @return Number of threads
073: */
074: public int getIdleThreads() {
075: return _idle == null ? 0 : _idle.size();
076: }
077:
078: /* ------------------------------------------------------------ */
079: /**
080: * @return low resource threads threshhold
081: */
082: public int getLowThreads() {
083: return _lowThreads;
084: }
085:
086: /* ------------------------------------------------------------ */
087: /**
088: * @param lowThreads low resource threads threshhold
089: */
090: public void setLowThreads(int lowThreads) {
091: _lowThreads = lowThreads;
092: }
093:
094: /* ------------------------------------------------------------ */
095: public boolean isLowOnThreads() {
096: return _maxThreads - getThreads() + getIdleThreads() < _lowThreads;
097: }
098:
099: /* ------------------------------------------------------------ */
100: /** Get the maximum thread idle time.
101: * Delegated to the named or anonymous Pool.
102: * @see #setMaxIdleTimeMs
103: * @return Max idle time in ms.
104: */
105: public int getMaxIdleTimeMs() {
106: return _maxIdleTimeMs;
107: }
108:
109: /* ------------------------------------------------------------ */
110: /** Set the maximum number of threads.
111: * Delegated to the named or anonymous Pool.
112: * @see #setMaxThreads
113: * @return maximum number of threads.
114: */
115: public int getMaxThreads() {
116: return _maxThreads;
117: }
118:
119: /* ------------------------------------------------------------ */
120: /** Get the minimum number of threads.
121: * Delegated to the named or anonymous Pool.
122: * @see #setMinThreads
123: * @return minimum number of threads.
124: */
125: public int getMinThreads() {
126: return _minThreads;
127: }
128:
129: /* ------------------------------------------------------------ */
130: /**
131: * @return The name of the BoundedThreadPool.
132: */
133: public String getName() {
134: return _name;
135: }
136:
137: /* ------------------------------------------------------------ */
138: /** Get the number of threads in the pool.
139: * @see #getIdleThreads
140: * @return Number of threads
141: */
142: public int getThreads() {
143: return _threads.size();
144: }
145:
146: /* ------------------------------------------------------------ */
147: /** Get the priority of the pool threads.
148: * @return the priority of the pool threads.
149: */
150: public int getThreadsPriority() {
151: return _priority;
152: }
153:
154: /* ------------------------------------------------------------ */
155: /**
156: * Delegated to the named or anonymous Pool.
157: */
158: public boolean isDaemon() {
159: return _daemon;
160: }
161:
162: /* ------------------------------------------------------------ */
163: public void join() throws InterruptedException {
164: synchronized (_joinLock) {
165: while (isRunning())
166: _joinLock.wait(getMaxIdleTimeMs());
167: }
168:
169: // TODO remove this semi busy loop!
170: while (isStopping())
171: Thread.sleep(10);
172: }
173:
174: /* ------------------------------------------------------------ */
175: protected void newThread() {
176: synchronized (_lock) {
177: Thread thread = new PoolThread();
178: _threads.add(thread);
179: _idle.add(thread);
180: thread.setName(_name + "-" + _id++);
181: thread.start();
182: }
183: }
184:
185: /* ------------------------------------------------------------ */
186: /** Run job.
187: * @return true if the job was given to a thread, false if no thread was
188: * available.
189: */
190: public boolean dispatch(Runnable job) {
191: boolean queued = false;
192: synchronized (_lock) {
193: if (!isRunning())
194: return false;
195:
196: int blockMs = _maxIdleTimeMs;
197:
198: // Wait for an idle thread!
199: while (_idle.size() == 0) {
200: // Are we at max size?
201: if (_threads.size() < _maxThreads) {
202: // No
203: newThread();
204: break;
205: } else if (!_warned) {
206: _warned = true;
207: Log.debug("Out of threads for {}", this );
208: }
209:
210: // pool is full
211: if (blockMs < 0)
212: return false;
213:
214: // Block waiting
215: try {
216: _blocked.add(Thread.currentThread());
217: _lock.wait(blockMs);
218: blockMs = -1;
219: } catch (InterruptedException ie) {
220: } finally {
221: _blocked.remove(Thread.currentThread());
222: }
223: }
224:
225: PoolThread thread = (PoolThread) _idle
226: .remove(_idle.size() - 1);
227: thread.dispatch(job);
228: queued = true;
229: }
230:
231: if (_idle.size() == 0)
232: Thread.yield();
233: return queued;
234: }
235:
236: /* ------------------------------------------------------------ */
237: /**
238: * Delegated to the named or anonymous Pool.
239: */
240: public void setDaemon(boolean daemon) {
241: _daemon = daemon;
242: }
243:
244: /* ------------------------------------------------------------ */
245: /** Set the maximum thread idle time.
246: * Threads that are idle for longer than this period may be
247: * stopped.
248: * Delegated to the named or anonymous Pool.
249: * @see #getMaxIdleTimeMs
250: * @param maxIdleTimeMs Max idle time in ms.
251: */
252: public void setMaxIdleTimeMs(int maxIdleTimeMs) {
253: _maxIdleTimeMs = maxIdleTimeMs;
254: }
255:
256: /* ------------------------------------------------------------ */
257: /** Set the maximum number of threads.
258: * Delegated to the named or anonymous Pool.
259: * @see #getMaxThreads
260: * @param maxThreads maximum number of threads.
261: */
262: public void setMaxThreads(int maxThreads) {
263: if (isStarted() && maxThreads < _minThreads)
264: throw new IllegalArgumentException("!minThreads<maxThreads");
265: _maxThreads = maxThreads;
266: }
267:
268: /* ------------------------------------------------------------ */
269: /** Set the minimum number of threads.
270: * Delegated to the named or anonymous Pool.
271: * @see #getMinThreads
272: * @param minThreads minimum number of threads
273: */
274: public void setMinThreads(int minThreads) {
275: if (isStarted()
276: && (minThreads <= 0 || minThreads > _maxThreads))
277: throw new IllegalArgumentException(
278: "!0<=minThreads<maxThreads");
279: _minThreads = minThreads;
280: synchronized (_lock) {
281: while (isStarted() && _threads.size() < _minThreads) {
282: newThread();
283: }
284: }
285: }
286:
287: /* ------------------------------------------------------------ */
288: /**
289: * @param name Name of the BoundedThreadPool to use when naming Threads.
290: */
291: public void setName(String name) {
292: _name = name;
293: }
294:
295: /* ------------------------------------------------------------ */
296: /** Set the priority of the pool threads.
297: * @param priority the new thread priority.
298: */
299: public void setThreadsPriority(int priority) {
300: _priority = priority;
301: }
302:
303: /* ------------------------------------------------------------ */
304: /* Start the BoundedThreadPool.
305: * Construct the minimum number of threads.
306: */
307: protected void doStart() throws Exception {
308: if (_maxThreads < _minThreads || _minThreads <= 0)
309: throw new IllegalArgumentException(
310: "!0<minThreads<maxThreads");
311:
312: _threads = new HashSet();
313: _idle = new ArrayList();
314: _blocked = new ArrayList();
315:
316: for (int i = 0; i < _minThreads; i++) {
317: newThread();
318: }
319: }
320:
321: /* ------------------------------------------------------------ */
322: /** Stop the BoundedThreadPool.
323: * New jobs are no longer accepted,idle threads are interrupted
324: * and stopJob is called on active threads.
325: * The method then waits
326: * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
327: * stop, at which time killJob is called.
328: */
329: protected void doStop() throws Exception {
330: super .doStop();
331:
332: for (int i = 0; i < 100; i++) {
333: synchronized (_lock) {
334: Iterator iter = _threads.iterator();
335: while (iter.hasNext())
336: ((Thread) iter.next()).interrupt();
337: }
338:
339: Thread.yield();
340: if (_threads.size() == 0)
341: break;
342:
343: try {
344: Thread.sleep(i * 100);
345: } catch (InterruptedException e) {
346: }
347: }
348:
349: // TODO perhaps force stops
350: if (_threads.size() > 0)
351: Log.warn(_threads.size() + " threads could not be stopped");
352:
353: synchronized (_joinLock) {
354: _joinLock.notifyAll();
355: }
356: }
357:
358: /* ------------------------------------------------------------ */
359: /** Stop a Job.
360: * This method is called by the Pool if a job needs to be stopped.
361: * The default implementation does nothing and should be extended by a
362: * derived thread pool class if special action is required.
363: * @param thread The thread allocated to the job, or null if no thread allocated.
364: * @param job The job object passed to run.
365: */
366: protected void stopJob(Thread thread, Object job) {
367: thread.interrupt();
368: }
369:
370: /* ------------------------------------------------------------ */
371: /** Pool Thread class.
372: * The PoolThread allows the threads job to be
373: * retrieved and active status to be indicated.
374: */
375: public class PoolThread extends Thread {
376: Runnable _job = null;
377:
378: PoolThread() {
379: setDaemon(_daemon);
380: setPriority(_priority);
381: }
382:
383: void dispatch(Runnable job) {
384: synchronized (this ) {
385: if (_job != null || job == null)
386: throw new IllegalStateException();
387: _job = job;
388: this .notify();
389: }
390: }
391:
392: /* ------------------------------------------------------------ */
393: /** BoundedThreadPool run.
394: * Loop getting jobs and handling them until idle or stopped.
395: */
396: public void run() {
397:
398: Runnable job = null;
399: try {
400: while (isRunning()) {
401: try {
402: synchronized (_lock) {
403: if (job != null) {
404: _idle.add(this );
405: if (_idle.size() >= _minThreads)
406: _warned = false;
407: if (_blocked.size() > 0)
408: ((Thread) _blocked.get(0))
409: .interrupt();
410: }
411: }
412:
413: synchronized (this ) {
414: job = null;
415: if (_job == null)
416: this .wait(getMaxIdleTimeMs());
417: job = _job;
418: _job = null;
419: }
420:
421: if (isRunning() && job != null)
422: job.run();
423:
424: synchronized (_lock) {
425: if (job == null) {
426: long now = System.currentTimeMillis();
427: if (_threads.size() > _maxThreads
428: || // we have too many threads OR
429: _idle.size() - _blocked.size() > 0
430: && // are there idle threads?
431: _threads.size() > _minThreads && // are there more than min threads?
432: (now - _lastShrink) > getMaxIdleTimeMs()) // have we shrunk recently?
433: {
434: _lastShrink = now;
435: _idle.remove(this );
436: return;
437: }
438: }
439: }
440: } catch (InterruptedException e) {
441: Log.ignore(e);
442: return;
443: }
444: }
445: } finally {
446: synchronized (_lock) {
447: _idle.remove(this );
448: _threads.remove(this );
449: }
450:
451: synchronized (this ) {
452: job = null;
453: job = _job;
454: }
455:
456: // catch all!
457: if (job != null && isRunning())
458: BoundedThreadPool.this.dispatch(job);
459: }
460: }
461: }
462:
463: }
|