001: package dalma.helpers;
002:
003: import dalma.Engine;
004: import dalma.Executor;
005:
006: import java.util.ArrayList;
007: import java.util.Collection;
008: import java.util.LinkedList;
009: import java.util.List;
010:
011: /**
012: * {@link Executor} implemented as a thread pool.
013: *
014: * @author Kohsuke Kawaguchi
015: */
016: public class ThreadPoolExecutor implements Executor {
017:
018: /**
019: * Queue of conversations that can be run.
020: * Needs to be synchronized before use.
021: */
022: private final List<Runnable> jobQueue = new LinkedList<Runnable>();
023:
024: private final Collection<WorkerThread> threads = new ArrayList<WorkerThread>();
025:
026: /**
027: * This object signals when all the threads terminate.
028: */
029: private final Object terminationSignal = new Object();
030:
031: /**
032: * True to make worker threads daemon thread.
033: */
034: private final boolean daemon;
035:
036: /**
037: * Set to true if we start stopping.
038: *
039: * All the worker threads are interrupted, but applications
040: * often eat interruption, so it's better to have this too, to be safe.
041: */
042: private boolean isStopping;
043:
044: /**
045: * Creates a new thread pool executor.
046: *
047: * @param nThreads
048: * number of worker threads to create.
049: * @param daemon
050: * true to make worker threads daemon threads.
051: * daemon threads allows the VM to shut down as soon as
052: * the application thread exits. Otherwise, you have to
053: * call {@link Engine#stop()} before your main thread exits,
054: * or else the JVM will run forever.
055: */
056: public ThreadPoolExecutor(int nThreads, boolean daemon) {
057: this .daemon = daemon;
058: synchronized (threads) {
059: for (int i = 0; i < nThreads; i++) {
060: WorkerThread thread = new WorkerThread();
061: thread.start();
062: threads.add(thread);
063: }
064: }
065: }
066:
067: public ThreadPoolExecutor(int nThreads) {
068: this (nThreads, false);
069: }
070:
071: protected void finalize() throws Throwable {
072: super .finalize();
073: sendKillSignal();
074: // let them die, but don't wait
075: }
076:
077: private void sendKillSignal() {
078: synchronized (threads) {
079: isStopping = true;
080: for (WorkerThread t : threads)
081: t.interrupt();
082: }
083: }
084:
085: public void execute(Runnable command) {
086: synchronized (jobQueue) {
087: jobQueue.add(command);
088: jobQueue.notify();
089: }
090: }
091:
092: public void stop(long timeout) throws InterruptedException {
093: synchronized (terminationSignal) {
094: sendKillSignal();
095: if (timeout == -1)
096: terminationSignal.wait();
097: else
098: terminationSignal.wait(timeout);
099: }
100: }
101:
102: private final class WorkerThread extends Thread {
103: public WorkerThread() {
104: super ("Dalma engine worker thread");
105: setDaemon(daemon);
106: }
107:
108: public void run() {
109: try {
110: while (!isStopping) {
111: Runnable job;
112: synchronized (jobQueue) {
113: while (jobQueue.isEmpty())
114: jobQueue.wait();
115: job = jobQueue.remove(0);
116: }
117:
118: job.run();
119: }
120: } catch (InterruptedException e) {
121: // treat this as a signal to die
122: } finally {
123: synchronized (threads) {
124: threads.remove(this);
125: if (threads.isEmpty()) {
126: synchronized (terminationSignal) {
127: terminationSignal.notifyAll();
128: }
129: }
130: }
131: }
132: }
133: }
134: }
|