001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import java.util.ArrayList;
030: import java.util.List;
031: import org.cougaar.bootstrap.SystemProperties;
032: import org.cougaar.util.log.Logger;
033: import org.cougaar.util.log.Logging;
034:
035: /**
036: * A simple pool of Java threads, used by the trivial {@link ThreadService}. The
037: * pool can grown without bounds, since there are no limits on the number of
038: * concurrent threads.
039: */
040: class TrivialThreadPool {
041: // As noted in Starter, we need at least one thread to be non-daemon
042: // to keep our JVM alive. We mark the first ThreadRunner as non-daemon.
043: private static final boolean DAEMON = SystemProperties
044: .getBoolean("org.cougaar.core.thread.daemon");
045:
046: private static TrivialThreadPool singleton;
047:
048: private final Logger logger = Logging.getLogger(getClass()
049: .getName());
050: private final List<ThreadRunner> list_pool = new ArrayList<ThreadRunner>();
051: private ThreadRunner[] pool = new ThreadRunner[100];
052: int anon_count = 0;
053:
054: private TrivialThreadPool() {
055: if (!DAEMON) {
056: // Start our keep-alive
057: pool[0] = new ThreadRunner(false);
058: }
059: }
060:
061: synchronized String generateName() {
062: return "TrivialThread-" + anon_count++;
063: }
064:
065: Thread getThread(TrivialSchedulable schedulable, Runnable runnable,
066: String name) {
067: ThreadRunner result = null;
068:
069: synchronized (this ) {
070: if (pool == null) {
071: throw new RuntimeException(
072: "The ThreadPool has been stopped");
073: }
074: for (int i = 0; i < pool.length; i++) {
075: ThreadRunner candidate = pool[i];
076: if (candidate == null) {
077: result = new ThreadRunner();
078: pool[i] = result;
079: result.in_use = true;
080: break;
081: } else if (!candidate.in_use) {
082: result = candidate;
083: result.in_use = true;
084: break;
085: }
086: }
087:
088: if (result == null && list_pool != null) {
089: // Use the slow ArrayList.
090: for (int i = 0; i < list_pool.size(); i++) {
091: ThreadRunner candidate = list_pool.get(i);
092: if (!candidate.in_use) {
093: result = candidate;
094: result.in_use = true;
095: break;
096: }
097: }
098: }
099:
100: if (result == null) {
101: // None in the list either. Make one and add it,
102: result = new ThreadRunner();
103: result.in_use = true;
104: list_pool.add(result);
105: }
106: }
107:
108: result.configure(schedulable, runnable, name);
109:
110: return result;
111: }
112:
113: // Unsynchronized read access to list_pool. This is by design
114: // (this operation cannot block the entire service) and should be
115: // ok. If it isn't, the list could be copied.
116: int iterateOverRunningThreads(ThreadStatusService.Body body) {
117: int count = 0;
118: int n = pool == null ? 0 : pool.length;
119: for (int i = 0; i < n; i++) {
120: ThreadRunner thread = pool[i];
121: count += runBody(thread, body);
122: }
123: if (list_pool != null) {
124: for (int i = 0, size = list_pool.size(); i < size; i++) {
125: ThreadRunner thread = null;
126: try {
127: thread = list_pool.get(i);
128: } catch (Exception ex) {
129: // list_pool size has changed - doesn't matter
130: if (logger.isDebugEnabled()) {
131: logger.debug("list_pool size changed");
132: }
133: }
134: count += runBody(thread, body);
135: }
136: }
137: return count;
138: }
139:
140: private int runBody(ThreadRunner thread,
141: ThreadStatusService.Body body) {
142: if (thread != null && thread.in_use) {
143: try {
144: TrivialSchedulable sched = thread.schedulable;
145: if (sched != null) {
146: body.run("root", sched);
147: return 1; // one Schedulable processed
148: }
149: } catch (Throwable t) {
150: logger.error("ThreadStatusService error in body", t);
151: }
152: }
153:
154: // No Schedulable processed
155: return 0;
156: }
157:
158: void stopAllThreads() {
159: synchronized (this ) {
160: int n = pool == null ? 0 : pool.length;
161: for (int i = 0; i < n; i++) {
162: ThreadRunner thread = pool[i];
163: if (thread == null) {
164: continue;
165: }
166: thread.in_use = true;
167: thread.stop_running();
168: pool[i] = null;
169: }
170: pool = null;
171: }
172: }
173:
174: static void makePool() {
175: if (singleton == null) {
176: singleton = new TrivialThreadPool();
177: }
178: }
179:
180: static TrivialThreadPool pool() {
181: return singleton;
182: }
183:
184: private class ThreadRunner extends Thread {
185: TrivialSchedulable schedulable;
186: Runnable body;
187: boolean in_use;
188: boolean should_stop;
189: final Object lock = new Object();
190:
191: ThreadRunner() {
192: this (true);
193: }
194:
195: ThreadRunner(boolean daemon) {
196: super ("ThreadRunner");
197: setDaemon(daemon);
198: super .start();
199: }
200:
201: public void start() {
202: throw new RuntimeException(
203: "You can't call start() on a PooledThread");
204: }
205:
206: void configure(TrivialSchedulable schedulable, Runnable body,
207: String name) {
208: synchronized (lock) {
209: this .schedulable = schedulable;
210: this .body = body;
211: // thread.setName(name);
212: lock.notify();
213: }
214: }
215:
216: public void run() {
217: while (true) {
218: synchronized (lock) {
219: if (body != null) {
220: body.run();
221: }
222: if (schedulable != null) {
223: schedulable.thread_stop();
224: }
225: in_use = false;
226: if (should_stop) {
227: break;
228: }
229: try {
230: lock.wait();
231: } catch (InterruptedException ex) {
232: }
233: if (should_stop) {
234: break;
235: }
236: }
237:
238: }
239: }
240:
241: void stop_running() {
242: synchronized (lock) {
243: should_stop = true;
244: lock.notify();
245: }
246: try {
247: join();
248: } catch (InterruptedException ie) {
249: }
250: synchronized (lock) {
251: should_stop = false;
252: }
253: }
254: }
255: }
|