001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import java.util.*;
032:
033: /**
034: * ThreadPool is a generic class which provides a thread pool.
035: *
036: * @author Matt Welsh
037: */
038:
039: public class ThreadPool implements ProfilableIF {
040:
041: private static final boolean DEBUG = false;
042:
043: private StageWrapperIF stage;
044: private ManagerIF mgr;
045: private String poolname;
046: private ThreadGroup pooltg;
047: private Runnable runnable;
048: private Vector threads, stoppedThreads;
049:
050: int minThreads, maxThreads;
051:
052: private int maxAggregation;
053: private int blockTime = 1000;
054: private int idleTimeThreshold;
055: private AggThrottle aggThrottle;
056:
057: /**
058: * Create a thread pool for the given stage, manager and runnable,
059: * with the thread pool controller determining the number of threads
060: * used.
061: */
062: public ThreadPool(StageWrapperIF stage, ManagerIF mgr,
063: Runnable runnable) {
064: this .stage = stage;
065: this .poolname = stage.getStage().getName();
066: this .mgr = mgr;
067: this .runnable = runnable;
068:
069: SandstormConfig config = mgr.getConfig();
070: if (config.getBoolean("global.batchController.enable")) {
071: aggThrottle = new AggThrottle(stage, mgr);
072: } else {
073: this .maxAggregation = config
074: .getInt("global.batchController.maxBatch");
075: }
076:
077: threads = new Vector();
078: stoppedThreads = new Vector();
079:
080: // First look for stages.[stageName] options, then global options
081: String tag = "stages." + (stage.getStage().getName())
082: + ".threadPool.";
083: String globaltag = "global.threadPool.";
084:
085: int initialSize = config.getInt(tag + "initialThreads");
086: if (initialSize < 1) {
087: initialSize = config.getInt(globaltag + "initialThreads");
088: if (initialSize < 1)
089: initialSize = 1;
090: }
091: minThreads = config.getInt(tag + "minThreads");
092: if (minThreads < 1) {
093: minThreads = config.getInt(globaltag + "minThreads");
094: if (minThreads < 1)
095: minThreads = 1;
096: }
097: maxThreads = config.getInt(tag + "maxThreads");
098: if (maxThreads < 1) {
099: maxThreads = config.getInt(globaltag + "maxThreads");
100: if (maxThreads < 1)
101: maxThreads = -1; // Infinite
102: }
103:
104: this .blockTime = config.getInt(tag + "blockTime", config
105: .getInt(globaltag + "blockTime", blockTime));
106: this .idleTimeThreshold = config.getInt(tag
107: + "sizeController.idleTimeThreshold", config.getInt(
108: globaltag + "sizeController.idleTimeThreshold",
109: blockTime));
110:
111: if (DEBUG)
112: System.err.println("TP <" + poolname + ">: initial "
113: + initialSize + ", min " + minThreads + ", max "
114: + maxThreads + ", blockTime " + blockTime
115: + ", idleTime " + idleTimeThreshold);
116:
117: addThreads(initialSize, false);
118: mgr.getProfiler().add("ThreadPool <" + poolname + ">", this );
119: pooltg = new ThreadGroup("TP <" + poolname + ">");
120: }
121:
122: /**
123: * Create a thread pool with the given name, manager, runnable,
124: * and thread sizing parameters.
125: */
126: public ThreadPool(StageWrapperIF stage, ManagerIF mgr,
127: Runnable runnable, int initialThreads, int minThreads,
128: int maxThreads, int blockTime, int idleTimeThreshold) {
129: this .stage = stage;
130: this .poolname = stage.getStage().getName();
131: this .mgr = mgr;
132: this .runnable = runnable;
133:
134: SandstormConfig config = mgr.getConfig();
135: if (config.getBoolean("global.batchController.enable")) {
136: aggThrottle = new AggThrottle(stage, mgr);
137: } else {
138: this .maxAggregation = config
139: .getInt("global.batchController.maxBatch");
140: }
141:
142: threads = new Vector();
143: stoppedThreads = new Vector();
144: if (initialThreads < 1)
145: initialThreads = 1;
146: this .minThreads = minThreads;
147: if (this .minThreads < 1)
148: this .minThreads = 1;
149: this .maxThreads = maxThreads;
150: //if (this.maxThreads < 1) this.maxThreads = initialThreads;
151: this .blockTime = blockTime;
152: this .idleTimeThreshold = idleTimeThreshold;
153:
154: addThreads(initialThreads, false);
155: mgr.getProfiler().add("ThreadPool <" + poolname + ">", this );
156: pooltg = new ThreadGroup("TP <" + poolname + ">");
157: }
158:
159: /**
160: * Create a thread pool with the given name, manager, runnable,
161: * and a fixed number of threads.
162: */
163: public ThreadPool(StageWrapperIF stage, ManagerIF mgr,
164: Runnable runnable, int numThreads) {
165: this .stage = stage;
166: this .poolname = stage.getStage().getName();
167: this .mgr = mgr;
168: this .runnable = runnable;
169:
170: SandstormConfig config = mgr.getConfig();
171: if (config.getBoolean("global.batchController.enable")) {
172: aggThrottle = new AggThrottle(stage, mgr);
173: } else {
174: this .maxAggregation = config
175: .getInt("global.batchController.maxBatch");
176: }
177:
178: threads = new Vector();
179: stoppedThreads = new Vector();
180: maxThreads = minThreads = numThreads;
181: addThreads(numThreads, false);
182: mgr.getProfiler().add("ThreadPool <" + poolname + ">", this );
183: pooltg = new ThreadGroup("TP <" + poolname + ">");
184: }
185:
186: /**
187: * Start the thread pool.
188: */
189: public void start() {
190: if (DEBUG) {
191: System.err.print("TP <" + poolname + ">: Starting "
192: + numThreads() + " threads");
193: if (aggThrottle != null) {
194: System.err.println(", batchController enabled");
195: } else {
196: System.err.println(", maxBatch=" + maxAggregation);
197: }
198: }
199:
200: for (int i = 0; i < threads.size(); i++) {
201: Thread t = (Thread) threads.elementAt(i);
202: t.start();
203: }
204: }
205:
206: /**
207: * Stop the thread pool.
208: */
209: public void stop() {
210: pooltg.stop();
211: }
212:
213: /**
214: * Add threads to this pool.
215: */
216: void addThreads(int num, boolean start) {
217: synchronized (this ) {
218: int numToAdd;
219: if (maxThreads < 0) {
220: numToAdd = num;
221: } else {
222: int numTotal = Math.min(maxThreads, numThreads() + num);
223: numToAdd = numTotal - numThreads();
224: }
225: if ((maxThreads < 0) || (numToAdd < maxThreads)) {
226: if (DEBUG)
227: System.err.println("TP <" + poolname + ">: Adding "
228: + numToAdd + " threads to pool, size "
229: + (numThreads() + numToAdd));
230: }
231: for (int i = 0; i < numToAdd; i++) {
232: String name = "TP-" + numThreads() + " <" + poolname
233: + ">";
234: Thread t = new Thread(pooltg, runnable, name);
235: threads.addElement(t);
236: mgr.getProfiler().getGraphProfiler()
237: .addThread(t, stage);
238: if (start)
239: t.start();
240: }
241: }
242: }
243:
244: /**
245: * Remove threads from pool.
246: */
247: void removeThreads(int num) {
248: if (DEBUG)
249: System.err.print("TP <" + poolname + ">: Removing " + num
250: + " threads from pool, ");
251: synchronized (this ) {
252: for (int i = 0; (i < num) && (numThreads() > minThreads); i++) {
253: Thread t = (Thread) threads.firstElement();
254: stopThread(t);
255: }
256: }
257: System.err.println("size " + numThreads());
258: }
259:
260: /**
261: * Cause the given thread to stop execution.
262: */
263: void stopThread(Thread t) {
264: synchronized (this ) {
265: threads.removeElement(t);
266: stoppedThreads.addElement(t);
267: }
268: if (DEBUG)
269: System.err.println("TP <" + poolname
270: + ">: stopping thread, size " + numThreads());
271: }
272:
273: /**
274: * Return the number of threads in this pool.
275: */
276: int numThreads() {
277: synchronized (this ) {
278: return threads.size();
279: }
280: }
281:
282: /**
283: * Used by a thread to determine its queue block time.
284: */
285: public long getBlockTime() {
286: return blockTime;
287: }
288:
289: /**
290: * Used by a thread to request its aggregation target from the pool.
291: */
292: public synchronized int getAggregationTarget() {
293: if (aggThrottle != null) {
294: return aggThrottle.getAggTarget();
295: } else {
296: return maxAggregation;
297: }
298: }
299:
300: /**
301: * Used by a thread to determine whether it should exit.
302: */
303: public boolean timeToStop(long idleTime) {
304: synchronized (this ) {
305: if ((idleTime > idleTimeThreshold)
306: && (numThreads() > minThreads)) {
307: stopThread(Thread.currentThread());
308: }
309: if (stoppedThreads.contains(Thread.currentThread()))
310: return true;
311: }
312: return false;
313: }
314:
315: public String toString() {
316: return "TP (size=" + numThreads() + ") for <" + poolname + ">";
317: }
318:
319: public String getName() {
320: return "ThreadPool <" + poolname + ">";
321: }
322:
323: public int profileSize() {
324: return numThreads();
325: }
326:
327: }
|