001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import java.util.ArrayList;
030: import java.util.List;
031:
032: import org.cougaar.util.log.Logger;
033: import org.cougaar.util.log.Logging;
034:
035: /**
036: * A pool of native Java threads used by the standard implementation of the
037: * {@link ThreadService}. By default this pool has a fixed size.
038: */
039: class ThreadPool {
040:
041: static final class PooledThread extends Thread {
/**
 * Upper bound on how long one pass of continuationLoop() keeps picking
 * up further work. It is compared against differences of
 * System.currentTimeMillis(), so the unit is milliseconds.
 */
private static final long MAX_CONTINUATION_TIME = 100;

/** The work currently assigned to this thread, or null if there is none. */
private SchedulableObject schedulable;

/**
 * True while this thread is handed out by the pool. Set by
 * ThreadPool.getThread (and stopAllThreads), cleared at the end of
 * continuationLoop().
 */
private boolean in_use = false;

/** Set by stop_running() to ask run() to return. */
private boolean should_stop = false;

/** Reference to our thread pool (used for its logger and thread group). */
private final ThreadPool pool;

/**
 * Has this thread actually been started yet? Access needs to be
 * guarded by runLock.
 */
private boolean isStarted = false;

/**
 * Are we actively running the runnable? Set to true by start_running()
 * and back to false in continuationLoop() after each schedulable's run()
 * returns.
 */
private boolean isRunning = false;

/**
 * Guards isRunning. run() holds this monitor while executing and waits
 * on it when there is no work; start_running() and stop_running()
 * notify on it.
 */
private final Object runLock;
067:
068: SchedulableObject getSchedulable() {
069: return schedulable;
070: }
071:
/**
 * The only constructor. Creates a daemon thread in the pool's thread
 * group; the thread is not started here.
 *
 * @param p the owning pool
 * @param name the initial thread name, also used to label the run lock
 */
private PooledThread(ThreadPool p, String name) {
    super(p.getThreadGroup(), null, name);
    pool = p;
    runLock = new NamedLock(name);
    setDaemon(true);
}
079:
080: // Hook for subclasses
081: private void claim() {
082: schedulable.claim();
083: }
084:
085: // Keep running Schedulables as long as we have one to run,
086: // or we've been running for too long.
087: private void continuationLoop() {
088: SchedulableObject last_schedulable = null;
089: long continuation_start = System.currentTimeMillis();
090: while (schedulable != null) {
091: last_schedulable = schedulable;
092: claim();
093: try {
094: schedulable.run();
095: } catch (Throwable any_ex) {
096: pool.logger.error(
097: "Uncaught exception in pooled thread ("
098: + schedulable + ")", any_ex);
099: }
100:
101: isRunning = false;
102: long elapsed = System.currentTimeMillis()
103: - continuation_start;
104: if (elapsed < MAX_CONTINUATION_TIME) {
105: schedulable = reclaim(true);
106: } else {
107: // Too much time,
108: reclaim(false);
109: schedulable = null;
110: break; // don't look for anything else to run
111: }
112: }
113: in_use = false; // thread is now reusable
114: if (last_schedulable != null) {
115: last_schedulable.addToReclaimer();
116: }
117: }
118:
/**
 * Thread body. Holds runLock for the thread's whole life, releasing it
 * only inside runLock.wait(). Alternates between draining work via
 * continuationLoop() and waiting to be notified by start_running() or
 * stop_running(). Returns once should_stop is observed.
 */
public final void run() {
    synchronized (runLock) {
        while (true) {

            // Run schedulables as long as we have one to run
            continuationLoop();

            if (should_stop) {
                // Thread service is shutting down
                return;
            }

            // Wait for more work, ignoring spurious wakeups and interrupts
            do {
                try {
                    runLock.wait();
                    if (should_stop) {
                        // Thread service is shutting down
                        return;
                    }
                    if (schedulable == null) {
                        // Woken without work and without a stop request:
                        // log it and go back to waiting.
                        pool.logger
                                .warn("Spurious wake up (no work), runLock = "
                                        + runLock);
                    }
                } catch (InterruptedException ie) {
                    // Interrupts are logged and otherwise ignored; the
                    // loop condition decides whether to keep waiting.
                    pool.logger
                            .warn("Unexpected interrupt, thread="
                                    + this );
                }
            } while (schedulable == null);
        }
    }
}
153:
154: public void start() {
155: throw new RuntimeException(
156: "You can't call start() on a PooledThread");
157: }
158:
159: void start_running(SchedulableObject newSchedulable)
160: throws IllegalThreadStateException {
161: synchronized (runLock) {
162: schedulable = newSchedulable;
163: if (isRunning) {
164: throw new IllegalThreadStateException(
165: "PooledThread already started: "
166: + schedulable);
167: }
168: isRunning = true;
169:
170: if (!isStarted) {
171: isStarted = true;
172: super .start();
173: } else {
174: if (schedulable == null) {
175: pool.logger.warn(this
176: + " was started with no work");
177: }
178: runLock.notify(); // resume
179: }
180: }
181: }
182:
183: void stop_running() {
184: synchronized (runLock) {
185: should_stop = true;
186: runLock.notify();
187: }
188: try {
189: join();
190: } catch (InterruptedException ie) {
191: }
192: synchronized (runLock) {
193: should_stop = false;
194: isStarted = false;
195: }
196: }
197:
198: private SchedulableObject reclaim(boolean reuse) {
199: SchedulableObject new_schedulable = schedulable
200: .reclaim(reuse);
201: if (pool.logger.isInfoEnabled()) {
202: if (new_schedulable != null) {
203: setName(new_schedulable.getName());
204: } else {
205: setName("Reclaimed");
206: }
207: }
208: return new_schedulable;
209: }
210: }
211:
/**
 * The ThreadGroup of the pool - all threads in the pool must be members of
 * the same threadgroup.
 */
private final ThreadGroup group;

// NOTE(review): the comment below has no corresponding field in this
// class; it is kept for reference only.
// "The maximum number of unused threads to keep around in the pool.
// Anything beyond this may be destroyed or GCed."

/**
 * The actual pool: fixed-size array of thread slots, filled lazily by
 * getThread. Set to null by stopAllThreads.
 */
private PooledThread pool[];

/**
 * Overflow list used by getThread once the array is full. Only created
 * when the pool is constructed with a negative (unlimited) maximum size;
 * otherwise it stays null.
 */
private List<PooledThread> list_pool;

/** Logger for this pool; its info level also controls thread naming. */
private final Logger logger;

/** Counter used by nextName() to build unique names. */
private int index = 0;
227:
228: ThreadPool(int maximumSize, int initialSize, String name) {
229: // Maybe give each pool its own group?
230: group = new ThreadGroup(name);
231: // Thread.currentThread().getThreadGroup();
232:
233: logger = Logging.getLogger(getClass().getName());
234: if (maximumSize < 0) {
235: // Unlimited. Make an array of a somewhat arbitrary size
236: // (100 or initialSize, whichever is larger), and also
237: // make an ArrayList which will be used if the array runs
238: // out.
239: pool = new PooledThread[Math.max(initialSize, 100)];
240: list_pool = new ArrayList<PooledThread>(100);
241: } else {
242: if (initialSize > maximumSize) {
243: initialSize = maximumSize;
244: }
245: pool = new PooledThread[maximumSize];
246: }
247: for (int i = 0; i < initialSize; i++) {
248: pool[i] = constructReusableThread();
249: }
250: }
251:
252: private synchronized String nextName() {
253: return "CougaarPooledThread-" + index++;
254: }
255:
256: ThreadGroup getThreadGroup() {
257: return group;
258: }
259:
260: PooledThread getThread(String name) {
261: PooledThread thread = null;
262: PooledThread candidate = null;
263:
264: synchronized (this ) {
265: if (pool == null) {
266: throw new RuntimeException(
267: "The ThreadPool has been stopped");
268: }
269:
270: for (int i = 0; i < pool.length; i++) {
271: candidate = pool[i];
272: if (candidate == null) {
273: thread = constructReusableThread();
274: pool[i] = thread;
275: thread.in_use = true;
276: break;
277: } else if (!candidate.in_use) {
278: thread = candidate;
279: thread.in_use = true;
280: break;
281: }
282: }
283:
284: if (thread == null && list_pool != null) {
285: // Use the slow ArrayList. This is only enabled if
286: // there's no thread limit.
287: for (int i = 0, n = list_pool.size(); i < n; i++) {
288: candidate = list_pool.get(i);
289: if (!candidate.in_use) {
290: thread = candidate;
291: thread.in_use = true;
292: break;
293: }
294: }
295: if (thread == null) {
296: // None in the list either. Make one and add it,
297: thread = constructReusableThread();
298: thread.in_use = true;
299: list_pool.add(thread);
300: }
301: }
302: }
303:
304: if (thread == null) {
305: // None available. This is unrecoverable.
306: throw new RuntimeException("Exceeded ThreadPool max");
307: }
308:
309: if (logger.isInfoEnabled()) {
310: thread.setName(name);
311: }
312:
313: return thread;
314: }
315:
316: /** actually construct a new PooledThread * */
317: PooledThread constructReusableThread() {
318: // If info logging is enabled the thread's name will get set
319: // when it's run, so it can start out empty.
320: String name = "";
321: if (!logger.isInfoEnabled()) {
322: name = nextName();
323: }
324: return new PooledThread(this , name);
325: }
326:
327: String generateName() {
328: // Generate a name for a Schedulable. If info logging is
329: // enabled the name won't be used anywhere, so just return
330: // null. Otherwise make a unique one.
331: if (logger.isInfoEnabled()) {
332: return nextName();
333: } else {
334: return null;
335: }
336: }
337:
338: int iterateOverRunningThreads(ThreadStatusService.Body body) {
339: PooledThread[] p = pool;
340: if (p == null) {
341: return 0;
342: }
343: int count = 0;
344: for (int i = 0; i < p.length; i++) {
345: PooledThread thread = p[i];
346: if (thread == null || thread.isRunning) {
347: continue;
348: }
349: try {
350: SchedulableObject sched = thread.schedulable;
351: // Even though thread.isRunning was true just
352: // above, thread.schedulable could have become
353: // null by now (since iterateOverRunningThreads
354: // doesn't lock anything).
355: if (sched != null) {
356: Scheduler scheduler = sched.getScheduler();
357: String scheduler_name = null;
358: if (scheduler != null) {
359: scheduler_name = scheduler.getName();
360: }
361: body.run(scheduler_name, sched);
362: count++;
363: }
364: } catch (Throwable t) {
365: logger.error("ThreadStatusService error in body", t);
366: }
367: }
368: return count;
369: }
370:
371: void stopAllThreads() {
372: synchronized (this ) {
373: int n = pool == null ? 0 : pool.length;
374: for (int i = 0; i < n; i++) {
375: PooledThread thread = pool[i];
376: if (thread == null) {
377: continue;
378: }
379: thread.in_use = true;
380: thread.stop_running();
381: pool[i] = null;
382: }
383: pool = null;
384: }
385: }
386:
387: private static class NamedLock {
388: private final String name;
389:
390: NamedLock(String name) {
391: this .name = "Run lock " + name;
392: }
393:
394: public String toString() {
395: return name;
396: }
397: }
398: }
|