001: /*
002: * Copyright (c) 1998 - 2005 Versant Corporation
003: * All rights reserved. This program and the accompanying materials
004: * are made available under the terms of the Eclipse Public License v1.0
005: * which accompanies this distribution, and is available at
006: * http://www.eclipse.org/legal/epl-v10.html
007: *
008: * Contributors:
009: * Versant Corporation - initial API and implementation
010: */
011: package com.versant.core.util;
012:
013: import java.util.LinkedList;
014: import java.util.Iterator;
015: import java.util.HashSet;
016:
017: /**
018: * Simple pool of Threads.
019: */
020: public class ThreadPool {
021:
022: private String name;
023: private HashSet active = new HashSet();
024: private LinkedList idle = new LinkedList();
025: private int idleCount;
026: private int maxActive = 10;
027: private int maxIdle = 3;
028: private int lastThreadId;
029: private boolean closed;
030:
031: public ThreadPool(String name) {
032: this .name = name;
033: }
034:
035: public int getMaxActive() {
036: return maxActive;
037: }
038:
039: public void setMaxActive(int maxActive) {
040: this .maxActive = maxActive;
041: }
042:
043: public int getMaxIdle() {
044: return maxIdle;
045: }
046:
047: public void setMaxIdle(int maxIdle) {
048: this .maxIdle = maxIdle;
049: }
050:
051: public synchronized int getActiveCount() {
052: return active.size();
053: }
054:
055: public synchronized int getIdleCount() {
056: return idleCount;
057: }
058:
059: /**
060: * Close the pool, stopping all threads. This does not wait for the
061: * threads to actually stop before returning. This is a NOP if the
062: * pool has already been closed.
063: */
064: public synchronized void close() {
065: if (closed) {
066: return;
067: }
068: closed = true;
069: for (Iterator i = idle.iterator(); i.hasNext();) {
070: Worker w = (Worker) i.next();
071: w.terminate();
072: }
073: idle = null;
074: idleCount = 0;
075: for (Iterator i = active.iterator(); i.hasNext();) {
076: Worker w = (Worker) i.next();
077: w.terminate();
078: }
079: active = null;
080: }
081:
082: /**
083: * Executed runnable using a Thread from the pool. This will block for
084: * timeoutMs and forever if this is 0. Returns true if the task is
085: * being executed (i.e. a Thread was available) or false if not (i.e.
086: * pool full).
087: */
088: public synchronized boolean execute(Runnable runnable, int timeoutMs) {
089: if (closed) {
090: throw new IllegalStateException("Pool has been closed");
091: }
092: Worker t;
093: if (idleCount == 0) {
094: for (; isFull();) {
095: try {
096: wait(timeoutMs);
097: if (isFull()) {
098: return false;
099: }
100: } catch (InterruptedException e) {
101: // ignore
102: }
103: }
104: t = new Worker();
105: } else {
106: t = (Worker) idle.removeFirst();
107: --idleCount;
108: }
109: active.add(t);
110: t.execute(runnable);
111: return true;
112: }
113:
114: protected boolean isFull() {
115: return active.size() >= maxActive;
116: }
117:
118: private synchronized void finishedWork(Worker t) {
119: if (!closed) {
120: active.remove(t);
121: if (idleCount >= maxIdle) {
122: t.terminate();
123: } else {
124: idle.addLast(t);
125: ++idleCount;
126: }
127: }
128: }
129:
130: private class Worker extends Thread {
131:
132: private boolean stopFlag;
133: private Runnable runnable;
134:
135: public Worker() {
136: super (name + " " + ++lastThreadId);
137: setDaemon(true);
138: }
139:
140: /**
141: * Executed runnable.
142: */
143: public void execute(Runnable runnable) {
144: this .runnable = runnable;
145: if (!isAlive()) {
146: start();
147: } else {
148: synchronized (this ) {
149: notify();
150: }
151: }
152: }
153:
154: /**
155: * Stop this thread as soon as possible.
156: */
157: public void terminate() {
158: stopFlag = true;
159: interrupt();
160: }
161:
162: public void run() {
163: for (; !stopFlag;) {
164: try {
165: runnable.run();
166: } catch (Throwable e) {
167: if (e instanceof ThreadDeath) {
168: throw (ThreadDeath) e;
169: }
170: }
171: runnable = null;
172: finishedWork(this );
173: if (stopFlag)
174: break;
175: synchronized (this ) {
176: try {
177: wait();
178: } catch (InterruptedException e) {
179: // ignore
180: }
181: }
182: }
183: }
184:
185: }
186:
187: }
|