001: /*
002: * Copyright 2004-2005 OpenSymphony
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License"); you may not
005: * use this file except in compliance with the License. You may obtain a copy
006: * of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
012: * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
013: * License for the specific language governing permissions and limitations
014: * under the License.
015: *
016: */
017:
018: /*
019: * Previously Copyright (c) 2001-2004 James House
020: */
021: package org.quartz.simpl;
022:
023: import org.apache.commons.logging.Log;
024: import org.apache.commons.logging.LogFactory;
025: import org.quartz.SchedulerConfigException;
026: import org.quartz.spi.ThreadPool;
027:
028: import java.util.Iterator;
029: import java.util.LinkedList;
030: import java.util.List;
031:
032: /**
033: * <p>
034: * This is class is a simple implementation of a thread pool, based on the
035: * <code>{@link org.quartz.spi.ThreadPool}</code> interface.
036: * </p>
037: *
038: * <p>
039: * <CODE>Runnable</CODE> objects are sent to the pool with the <code>{@link #runInThread(Runnable)}</code>
040: * method, which blocks until a <code>Thread</code> becomes available.
041: * </p>
042: *
043: * <p>
044: * The pool has a fixed number of <code>Thread</code>s, and does not grow or
045: * shrink based on demand.
046: * </p>
047: *
048: * @author James House
049: * @author Juergen Donnerstag
050: */
051: public class SimpleThreadPool implements ThreadPool {
052:
053: /*
054: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
055: *
056: * Data members.
057: *
058: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
059: */
060:
061: private int count = -1;
062:
063: private int prio = Thread.NORM_PRIORITY;
064:
065: private boolean isShutdown = false;
066: private boolean handoffPending = false;
067:
068: private boolean inheritLoader = false;
069:
070: private boolean inheritGroup = true;
071:
072: private boolean makeThreadsDaemons = false;
073:
074: private ThreadGroup threadGroup;
075:
076: private final Object nextRunnableLock = new Object();
077:
078: private List workers;
079: private LinkedList availWorkers = new LinkedList();
080: private LinkedList busyWorkers = new LinkedList();
081:
082: private String threadNamePrefix = "SimpleThreadPoolWorker";
083:
084: private final Log log = LogFactory.getLog(getClass());
085:
086: /*
087: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
088: *
089: * Constructors.
090: *
091: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
092: */
093:
094: /**
095: * <p>
096: * Create a new (unconfigured) <code>SimpleThreadPool</code>.
097: * </p>
098: *
099: * @see #setThreadCount(int)
100: * @see #setThreadPriority(int)
101: */
102: public SimpleThreadPool() {
103: }
104:
105: /**
106: * <p>
107: * Create a new <code>SimpleThreadPool</code> with the specified number
108: * of <code>Thread</code> s that have the given priority.
109: * </p>
110: *
111: * @param threadCount
112: * the number of worker <code>Threads</code> in the pool, must
113: * be > 0.
114: * @param threadPriority
115: * the thread priority for the worker threads.
116: *
117: * @see java.lang.Thread
118: */
119: public SimpleThreadPool(int threadCount, int threadPriority) {
120: setThreadCount(threadCount);
121: setThreadPriority(threadPriority);
122: }
123:
124: /*
125: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
126: *
127: * Interface.
128: *
129: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
130: */
131:
132: public Log getLog() {
133: return log;
134: }
135:
136: public int getPoolSize() {
137: return getThreadCount();
138: }
139:
140: /**
141: * <p>
142: * Set the number of worker threads in the pool - has no effect after
143: * <code>initialize()</code> has been called.
144: * </p>
145: */
146: public void setThreadCount(int count) {
147: this .count = count;
148: }
149:
150: /**
151: * <p>
152: * Get the number of worker threads in the pool.
153: * </p>
154: */
155: public int getThreadCount() {
156: return count;
157: }
158:
159: /**
160: * <p>
161: * Set the thread priority of worker threads in the pool - has no effect
162: * after <code>initialize()</code> has been called.
163: * </p>
164: */
165: public void setThreadPriority(int prio) {
166: this .prio = prio;
167: }
168:
169: /**
170: * <p>
171: * Get the thread priority of worker threads in the pool.
172: * </p>
173: */
174: public int getThreadPriority() {
175: return prio;
176: }
177:
178: public void setThreadNamePrefix(String prfx) {
179: this .threadNamePrefix = prfx;
180: }
181:
182: public String getThreadNamePrefix() {
183: return threadNamePrefix;
184: }
185:
186: /**
187: * @return Returns the
188: * threadsInheritContextClassLoaderOfInitializingThread.
189: */
190: public boolean isThreadsInheritContextClassLoaderOfInitializingThread() {
191: return inheritLoader;
192: }
193:
194: /**
195: * @param inheritLoader
196: * The threadsInheritContextClassLoaderOfInitializingThread to
197: * set.
198: */
199: public void setThreadsInheritContextClassLoaderOfInitializingThread(
200: boolean inheritLoader) {
201: this .inheritLoader = inheritLoader;
202: }
203:
204: public boolean isThreadsInheritGroupOfInitializingThread() {
205: return inheritGroup;
206: }
207:
208: public void setThreadsInheritGroupOfInitializingThread(
209: boolean inheritGroup) {
210: this .inheritGroup = inheritGroup;
211: }
212:
213: /**
214: * @return Returns the value of makeThreadsDaemons.
215: */
216: public boolean isMakeThreadsDaemons() {
217: return makeThreadsDaemons;
218: }
219:
220: /**
221: * @param makeThreadsDaemons
222: * The value of makeThreadsDaemons to set.
223: */
224: public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
225: this .makeThreadsDaemons = makeThreadsDaemons;
226: }
227:
228: public void initialize() throws SchedulerConfigException {
229:
230: if (count <= 0) {
231: throw new SchedulerConfigException(
232: "Thread count must be > 0");
233: }
234: if (prio <= 0 || prio > 9) {
235: throw new SchedulerConfigException(
236: "Thread priority must be > 0 and <= 9");
237: }
238:
239: if (isThreadsInheritGroupOfInitializingThread()) {
240: threadGroup = Thread.currentThread().getThreadGroup();
241: } else {
242: // follow the threadGroup tree to the root thread group.
243: threadGroup = Thread.currentThread().getThreadGroup();
244: ThreadGroup parent = threadGroup;
245: while (!parent.getName().equals("main")) {
246: threadGroup = parent;
247: parent = threadGroup.getParent();
248: }
249: threadGroup = new ThreadGroup(parent, "SimpleThreadPool");
250: if (isMakeThreadsDaemons()) {
251: threadGroup.setDaemon(true);
252: }
253: }
254:
255: if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
256: getLog().info(
257: "Job execution threads will use class loader of thread: "
258: + Thread.currentThread().getName());
259: }
260:
261: // create the worker threads and start them
262: Iterator workerThreads = createWorkerThreads(count).iterator();
263: while (workerThreads.hasNext()) {
264: WorkerThread wt = (WorkerThread) workerThreads.next();
265: wt.start();
266: availWorkers.add(wt);
267: }
268: }
269:
270: protected List createWorkerThreads(int count) {
271: workers = new LinkedList();
272: for (int i = 1; i <= count; ++i) {
273: WorkerThread wt = new WorkerThread(this , threadGroup,
274: getThreadNamePrefix() + "-" + i,
275: getThreadPriority(), isMakeThreadsDaemons());
276: if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
277: wt.setContextClassLoader(Thread.currentThread()
278: .getContextClassLoader());
279: }
280: workers.add(wt);
281: }
282:
283: return workers;
284: }
285:
286: /**
287: * <p>
288: * Terminate any worker threads in this thread group.
289: * </p>
290: *
291: * <p>
292: * Jobs currently in progress will complete.
293: * </p>
294: */
295: public void shutdown() {
296: shutdown(true);
297: }
298:
299: /**
300: * <p>
301: * Terminate any worker threads in this thread group.
302: * </p>
303: *
304: * <p>
305: * Jobs currently in progress will complete.
306: * </p>
307: */
308: public void shutdown(boolean waitForJobsToComplete) {
309:
310: synchronized (nextRunnableLock) {
311: isShutdown = true;
312:
313: // signal each worker thread to shut down
314: Iterator workerThreads = workers.iterator();
315: while (workerThreads.hasNext()) {
316: WorkerThread wt = (WorkerThread) workerThreads.next();
317: wt.shutdown();
318: availWorkers.remove(wt);
319: }
320:
321: // Give waiting (wait(1000)) worker threads a chance to shut down.
322: // Active worker threads will shut down after finishing their
323: // current job.
324: nextRunnableLock.notifyAll();
325:
326: if (waitForJobsToComplete == true) {
327:
328: // wait for hand-off in runInThread to complete...
329: while (handoffPending)
330: try {
331: nextRunnableLock.wait(100);
332: } catch (Throwable t) {
333: }
334:
335: // Wait until all worker threads are shut down
336: while (busyWorkers.size() > 0) {
337: WorkerThread wt = (WorkerThread) busyWorkers
338: .getFirst();
339: try {
340: getLog().debug(
341: "Waiting for thread " + wt.getName()
342: + " to shut down");
343:
344: // note: with waiting infinite time the
345: // application may appear to 'hang'.
346: nextRunnableLock.wait(2000);
347: } catch (InterruptedException ex) {
348: }
349: }
350:
351: int activeCount = threadGroup.activeCount();
352: if (activeCount > 0) {
353: getLog()
354: .info(
355: "There are still "
356: + activeCount
357: + " worker threads active."
358: + " See javadoc runInThread(Runnable) for a possible explanation");
359: }
360:
361: getLog().debug("shutdown complete");
362: }
363: }
364: }
365:
366: /**
367: * <p>
368: * Run the given <code>Runnable</code> object in the next available
369: * <code>Thread</code>. If while waiting the thread pool is asked to
370: * shut down, the Runnable is executed immediately within a new additional
371: * thread.
372: * </p>
373: *
374: * @param runnable
375: * the <code>Runnable</code> to be added.
376: */
377: public boolean runInThread(Runnable runnable) {
378: if (runnable == null) {
379: return false;
380: }
381:
382: synchronized (nextRunnableLock) {
383:
384: handoffPending = true;
385:
386: // Wait until a worker thread is available
387: while ((availWorkers.size() < 1) && !isShutdown) {
388: try {
389: nextRunnableLock.wait(500);
390: } catch (InterruptedException ignore) {
391: }
392: }
393:
394: if (!isShutdown) {
395: WorkerThread wt = (WorkerThread) availWorkers
396: .removeFirst();
397: busyWorkers.add(wt);
398: wt.run(runnable);
399: } else {
400: // If the thread pool is going down, execute the Runnable
401: // within a new additional worker thread (no thread from the pool).
402: WorkerThread wt = new WorkerThread(this , threadGroup,
403: "WorkerThread-LastJob", prio,
404: isMakeThreadsDaemons(), runnable);
405: busyWorkers.add(wt);
406: workers.add(wt);
407: wt.start();
408: }
409: nextRunnableLock.notifyAll();
410: handoffPending = false;
411: }
412:
413: return true;
414: }
415:
416: public int blockForAvailableThreads() {
417: synchronized (nextRunnableLock) {
418:
419: while ((availWorkers.size() < 1 || handoffPending)
420: && !isShutdown) {
421: try {
422: nextRunnableLock.wait(500);
423: } catch (InterruptedException ignore) {
424: }
425: }
426:
427: return availWorkers.size();
428: }
429: }
430:
431: protected void makeAvailable(WorkerThread wt) {
432: synchronized (nextRunnableLock) {
433: if (!isShutdown)
434: availWorkers.add(wt);
435: busyWorkers.remove(wt);
436: nextRunnableLock.notifyAll();
437: }
438: }
439:
440: /*
441: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
442: *
443: * WorkerThread Class.
444: *
445: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
446: */
447:
448: /**
449: * <p>
450: * A Worker loops, waiting to execute tasks.
451: * </p>
452: */
453: class WorkerThread extends Thread {
454:
455: // A flag that signals the WorkerThread to terminate.
456: private boolean run = true;
457:
458: private SimpleThreadPool tp;
459:
460: private Runnable runnable = null;
461:
462: /**
463: * <p>
464: * Create a worker thread and start it. Waiting for the next Runnable,
465: * executing it, and waiting for the next Runnable, until the shutdown
466: * flag is set.
467: * </p>
468: */
469: WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup,
470: String name, int prio, boolean isDaemon) {
471:
472: this (tp, threadGroup, name, prio, isDaemon, null);
473: }
474:
475: /**
476: * <p>
477: * Create a worker thread, start it, execute the runnable and terminate
478: * the thread (one time execution).
479: * </p>
480: */
481: WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup,
482: String name, int prio, boolean isDaemon,
483: Runnable runnable) {
484:
485: super (threadGroup, name);
486: this .tp = tp;
487: this .runnable = runnable;
488: setPriority(prio);
489: setDaemon(isDaemon);
490: }
491:
492: /**
493: * <p>
494: * Signal the thread that it should terminate.
495: * </p>
496: */
497: void shutdown() {
498: run = false;
499:
500: // Javadoc mentions that it interrupts blocked I/O operations as
501: // well. Hence the job will most likely fail. I think we should
502: // shut the work thread gracefully, by letting the job finish
503: // uninterrupted. See SimpleThreadPool.shutdown()
504: //interrupt();
505: }
506:
507: public void run(Runnable newRunnable) {
508: synchronized (this ) {
509: if (runnable != null)
510: throw new IllegalStateException(
511: "Already running a Runnable!");
512:
513: runnable = newRunnable;
514: this .notifyAll();
515: }
516: }
517:
518: /**
519: * <p>
520: * Loop, executing targets as they are received.
521: * </p>
522: */
523: public void run() {
524: boolean runOnce = (runnable != null);
525:
526: boolean ran = false;
527: while (run) {
528: try {
529: synchronized (this ) {
530: while (runnable == null && run) {
531: this .wait(500);
532: }
533: }
534:
535: if (runnable != null) {
536: ran = true;
537: runnable.run();
538: }
539: } catch (InterruptedException unblock) {
540: // do nothing (loop will terminate if shutdown() was called
541: try {
542: getLog().error(
543: "worker threat got 'interrupt'ed.",
544: unblock);
545: } catch (Exception e) {
546: // ignore to help with a tomcat glitch
547: }
548: } catch (Exception exceptionInRunnable) {
549: try {
550: getLog().error(
551: "Error while executing the Runnable: ",
552: exceptionInRunnable);
553: } catch (Exception e) {
554: // ignore to help with a tomcat glitch
555: }
556: } finally {
557: runnable = null;
558: // repair the thread in case the runnable mucked it up...
559: if (getPriority() != tp.getThreadPriority())
560: setPriority(tp.getThreadPriority());
561:
562: if (runOnce) {
563: run = false;
564: } else if (ran) {
565: ran = false;
566: makeAvailable(this );
567: }
568:
569: }
570: }
571:
572: //if (log.isDebugEnabled())
573: try {
574: getLog().debug("WorkerThread is shutting down");
575: } catch (Exception e) {
576: // ignore to help with a tomcat glitch
577: }
578: }
579: }
580: }
|