001: /* Copyright 2001 The JA-SIG Collaborative. All rights reserved.
002: * See license distributed with this file and
003: * available online at http://www.uportal.org/license.html
004: */
005:
006: package org.jasig.portal.utils;
007:
008: import java.util.ArrayList;
009: import java.util.Collections;
010: import java.util.EmptyStackException;
011: import java.util.List;
012:
013: import org.apache.commons.logging.Log;
014: import org.apache.commons.logging.LogFactory;
015:
016: /**
017: * A thread pool implementation with a few extra kinks,
018: * such as ThreadPoolReceipt.
019: * @author Peter Kharchenko {@link <a href="mailto:pkharchenko@interactivebusiness.com">pkharchenko@interactivebusiness.com</a>}
020: * @version $Revision: 36690 $
021: */
022: public class ThreadPool extends ThreadGroup {
023:
024: private static final Log log = LogFactory.getLog(ThreadPool.class);
025:
026: BlockingStack idleWorkers;
027: List workers;
028: ResourceLimits limits;
029:
030: public ThreadPool(String name, ResourceLimits rl) {
031: super (name);
032: if (rl == null) {
033: // use default resource limits
034: limits = new ResourceLimits();
035: rl.maxSize = 10;
036: rl.optimalSize = 3;
037: } else {
038: limits = rl;
039: }
040:
041: idleWorkers = new BlockingStack(); // doesn't make sense to put an upper limit on this stack
042: // workers = new Vector(limits.optimalSize);
043: workers = Collections.synchronizedList(new ArrayList(
044: limits.optimalSize));
045:
046: // initialize some workers
047: for (int i = 0; i < limits.optimalSize; i++) {
048: ThreadPoolWorker w = new ThreadPoolWorker(this );
049: workers.add(w);
050: w.start();
051: }
052: }
053:
054: public ThreadPoolReceipt execute(Runnable target)
055: throws InterruptedException {
056: // try growing workers if the stack is empty
057: if (idleWorkers.empty())
058: addWorker();
059: // block on waiting for the next available worker
060: // ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.pop();
061: // start the process and return a receipt
062: return (((ThreadPoolWorker) idleWorkers.pop()).process(target));
063: }
064:
065: /**
066: * Adjust the size of the worker pool.
067: * Adjustment is done by growing/shrinking the idle worker pool.
068: * Active workers will not be affected by this
069: */
070: protected synchronized void adjustSize(int newSize) {
071: // see if an adjustment can be done
072: if (newSize <= limits.maxSize) {
073: synchronized (workers) {
074: // determine the adjustment
075: int adjustment = newSize - workers.size();
076: // System.out.println("ThreadPool:adjustSize() : requested="+newSize+", current="+workers.size()+", adj="+adjustment);
077: if (adjustment < 0) {
078: // prune some idle workers
079: while (adjustment++ < 0) {
080: if (!idleWorkers.empty()) {
081: try {
082: releaseWorker((ThreadPoolWorker) idleWorkers
083: .nonBlockingPop());
084: } catch (EmptyStackException ese) {
085: adjustment--;
086: }
087: } else {
088: // signal some active workers to go
089: for (int i = 0; i < workers.size()
090: && adjustment < 0; i++) {
091: ThreadPoolWorker w = (ThreadPoolWorker) workers
092: .get(i);
093: if (releaseWorker(w))
094: adjustment++;
095: }
096: break;
097: }
098: }
099: }
100: if (adjustment > 0) {
101: // add some idle workers
102: for (int i = 0; i < adjustment; i++)
103: addNewWorker();
104: }
105: }
106: }
107: }
108:
109: /**
110: * Grow the size of the worker pool.
111: * This will "attempt" to add another worker, but
112: * unlike addNewWorker(), the resource limits are checked.
113: */
114: protected synchronized void addWorker() {
115: adjustSize(workers.size() + 1);
116: }
117:
118: /**
119: * Signals the worker that it try to interrupt
120: * the current job and quit.
121: */
122: protected void stopWorker(ThreadPoolWorker worker) {
123: // System.out.println("ThreadPool::stopWorker()");
124: worker.stopRequest();
125: }
126:
127: /**
128: * Signals the worker that it should quite as soon
129: * as a job (if any) is complete
130: * @return false if the worker has already been released
131: */
132: protected boolean releaseWorker(ThreadPoolWorker worker) {
133: // System.out.println("ThreadPool::releaseWorker()");
134: return worker.completeRequest();
135: }
136:
137: /**
138: * Adds a new worker. Doesn't check anything, just adds.
139: */
140: protected void addNewWorker() {
141: // System.out.println("ThreadPool::addNewWorker()");
142: ThreadPoolWorker w = new ThreadPoolWorker(this );
143: workers.add(w);
144: w.start();
145: }
146:
147: /**
148: * Clears all of the idle workers
149: */
150: public void clearIdle() {
151: try {
152: Object[] idle = new Object[idleWorkers.size()
153: - idleWorkers.getMinSize()];
154: int index = 0;
155: while (index < idle.length) {
156: idle[index++] = idleWorkers.pop();
157: }
158: for (int i = 0; i < idle.length; i++) {
159: ((ThreadPoolWorker) idle[i]).stopRequest();
160: }
161: } catch (InterruptedException x) {
162: Thread.currentThread().interrupt(); // re-assert
163: }
164: }
165:
166: /**
167: * Clears all of the workers.
168: */
169: public void clear() {
170: // Stop the idle one's first since that won't interfere with anything
171: // productive.
172: clearIdle();
173:
174: // give the idle workers a quick chance to die
175: try {
176: Thread.sleep(250);
177: } catch (InterruptedException x) {
178: }
179:
180: // Step through the list of ALL workers that are still alive.
181: for (int i = 0; i < workers.size(); i++) {
182: if (((ThreadPoolWorker) workers.get(i)).isAlive()) {
183: ((ThreadPoolWorker) workers.get(i)).stopRequest();
184: }
185: }
186: }
187:
188: /**
189: * Handle the case when some worker crashes
190: */
191: public void uncaughtException(Thread t, Throwable e) {
192: log.error("Registered an uncaught exception by thread "
193: + t.getName(), e);
194: if (t instanceof ThreadPoolWorker
195: && !(e instanceof ThreadDeath)) {
196: ThreadPoolWorker w = (ThreadPoolWorker) t;
197: // clean up currentReceipt if the thread didn't do it
198: try {
199: if (w.currentReceipt != null) {
200: w.currentReceipt.updateStatus(null, true, false, e);
201: }
202: } catch (Exception bad) {
203: }
204: ;
205:
206: notifyWorkerRestart(w);
207: }
208: }
209:
210: /**
211: * Notifies the pool that a certain worker is done and
212: * wants to have a replacement started.
213: */
214: protected void notifyWorkerRestart(ThreadPoolWorker pw) {
215: notifyWorkerFinished(pw);
216: this .addWorker();
217: }
218:
219: protected void killWorkerThread(ThreadPoolWorker pw) {
220: // check the receipt
221: try {
222: if (pw.currentReceipt != null) {
223: pw.currentReceipt.updateStatus(null, true, false, null);
224: }
225: } catch (Exception bad) {
226: }
227: ;
228:
229: notifyWorkerFinished(pw);
230: // hopefully all of the locks are released
231: pw.interrupt();
232: // pw.stop();
233: this .addWorker();
234: // System.out.println("Removed and stopped worker "+pw.getName());
235: }
236:
237: /**
238: * Notifies the pool that a certain worker has finished.
239: */
240: protected void notifyWorkerFinished(ThreadPoolWorker pw) {
241: // clean up
242: // System.out.println("ThreadPool::notifyWorkerFinished().");
243: idleWorkers.remove(pw);
244: synchronized (workers) {
245: int index = workers.indexOf(pw);
246: if (index != -1)
247: workers.remove(index);
248: }
249: }
250: }
|