001: /*
002: * @(#)ThreadPool.java 1.2 04/12/06
003: *
004: * Copyright (c) 2002,2003 Sun Microsystems, Inc. All Rights Reserved.
005: *
006: * See the file "LICENSE.txt" for information on usage and redistribution
007: * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
008: */
009: package org.pnuts.multithread;
010:
011: import java.util.Vector;
012: import java.util.Stack;
013: import java.util.Enumeration;
014:
015: /**
016: * A simple, general purpose thread pool implementation
017: */
018: public class ThreadPool {
019: private Queue tasks = new Queue();
020: private Vector workers = new Vector();
021: private Stack idleWorkers = new Stack();
022: private int maxThreads;
023: private int minThreads;
024: private long timeout;
025: private int taskRequests = 0;
026: private int priority = Thread.NORM_PRIORITY;
027: private ThreadGroup threadGroup;
028: private volatile boolean terminated = false;
029:
030: public ThreadPool(int maxThreads) {
031: this (maxThreads, 1, -1);
032: }
033:
034: public ThreadPool(int maxThreads, int minThreads, long timeout) {
035: this .maxThreads = maxThreads;
036: this .minThreads = minThreads;
037: this .timeout = timeout;
038: this .threadGroup = new WorkerThreadGroup();
039: }
040:
041: public void addTask(Runnable task) {
042: tasks.enqueue(task);
043: if (idleWorkers.isEmpty()) {
044: int n_workers = workers.size();
045: if (taskRequests == 0 && n_workers < maxThreads) {
046: Thread th = new WorkerThread();
047: synchronized (this ) {
048: workers.addElement(th);
049: }
050: th.setPriority(priority);
051: th.setDaemon(true);
052: th.start();
053: }
054: } else {
055: WorkerThread th = (WorkerThread) idleWorkers.pop();
056: synchronized (th) {
057: th.notify();
058: }
059: }
060: }
061:
062: public synchronized void shutdown() {
063: terminated = true;
064: threadGroup.interrupt();
065: }
066:
067: synchronized void removeWorker(WorkerThread th) {
068: workers.remove(th);
069: }
070:
071: Runnable getTask() {
072: try {
073: synchronized (this ) {
074: taskRequests++;
075: }
076: return (Runnable) tasks.dequeue(timeout);
077: } catch (InterruptedException e) {
078: return null;
079: } finally {
080: synchronized (this ) {
081: taskRequests--;
082: }
083: }
084: }
085:
086: public synchronized void setPriority(int prio) {
087: this .priority = prio;
088: for (Enumeration e = workers.elements(); e.hasMoreElements();) {
089: Thread th = (Thread) e.nextElement();
090: th.setPriority(prio);
091: }
092: }
093:
094: static class WorkerThreadGroup extends ThreadGroup {
095:
096: public WorkerThreadGroup() {
097: super ("Worker");
098: }
099:
100: public void uncaughtException(Thread t, Throwable e) {
101: if (!(e instanceof ThreadDeath)) {
102: System.err.println("Uncaught Exception: " + e + " by "
103: + t);
104: }
105: }
106: }
107:
108: static int worker_id = 0;
109:
110: class WorkerThread extends Thread {
111: public WorkerThread() {
112: super (threadGroup, "Worker-" + (worker_id++));
113: }
114:
115: public void run() {
116: while (true) {
117: if (tasks.isEmpty() && terminated) {
118: return;
119: }
120: Runnable task = getTask();
121: if (task == null) {
122: if (terminated) {
123: return;
124: }
125: if (!idleWorkers.isEmpty()
126: && workers.size() > minThreads) {
127: removeWorker(this );
128: return;
129: } else {
130: idleWorkers.push(this );
131: synchronized (this ) {
132: try {
133: wait();
134: } catch (InterruptedException e) {
135: idleWorkers.remove(this);
136: return;
137: }
138: }
139: }
140: } else {
141: task.run();
142: }
143: }
144: }
145: }
146: }
|