001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import java.util.ArrayList;
030: import java.util.Comparator;
031: import java.util.Iterator;
032: import java.util.List;
033: import java.util.RandomAccess;
034:
035: import org.cougaar.util.UnaryPredicate;
036: import org.cougaar.util.log.Logger;
037: import org.cougaar.util.log.Logging;
038:
039: /**
040: * The base class of thread-scheduler. It allows a certain maximum
041: * number of threads to running, queuing any requests beyond that. An
042: * items is dequeued when a running thread stops. The maximum is per
043: * thread-service, not global. This scheduler is not used by default
044: * (the PropagatingScheduler extension is the default).
045: *
046: * @property org.cougaar.thread.running.max specifies the maximum
047: * number of running threads. A negative number is interpreted to
048: * mean that there is no maximum. The precise meaning of 'maximum' is
049: * varies by scheduler class.
050: *
051: */
052: public class Scheduler {
053: private DynamicSortedQueue<SchedulableObject> pendingThreads;
054: private List<SchedulableObject> disqualified = new ArrayList<SchedulableObject>();
055: private UnaryPredicate qualifier;
056: private UnaryPredicate childQualifier;
057: private ThreadListenerProxy listenerProxy;
058: private String printName;
059: private TreeNode treeNode;
060: private int absoluteMax;
061: private int maxRunningThreads = 0;
062: private int runningThreadCount = 0;
063: private int lane;
064: protected Logger logger = Logging.getLogger(getClass().getName());
065: protected int rightsRequestCount = 0;
066:
067: private static Logger _logger = Logging.getLogger(Scheduler.class);
068:
069: private Comparator<SchedulableObject> timeComparator = new Comparator<SchedulableObject>() {
070: public boolean equals(Object x) {
071: return x == this ;
072: }
073:
074: public int compare(SchedulableObject x, SchedulableObject y) {
075: long t1 = x.getTimestamp();
076: long t2 = y.getTimestamp();
077: if (t1 < t2)
078: return -1;
079: else if (t1 > t2)
080: return 1;
081: else
082: return 0;
083: }
084: };
085:
086: public Scheduler(ThreadListenerProxy listenerProxy) {
087: pendingThreads = new DynamicSortedQueue<SchedulableObject>(
088: timeComparator);
089: this .listenerProxy = listenerProxy;
090: }
091:
092: void setAbsoluteMax(int absoluteMax) {
093: this .absoluteMax = absoluteMax;
094: maxRunningThreads = absoluteMax;
095: if (_logger.isInfoEnabled()) {
096: _logger.info("Initialized maxRunningThreads to "
097: + maxRunningThreads);
098: }
099: }
100:
101: // NB: By design this method is NOT synchronized! It should only
102: // be used by the ThreadStatusService and is intended to provide
103: // a best-effort snapshot. Failures are normal.
104: int iterateOverQueuedThreads(ThreadStatusService.Body body) {
105: // could copy the queue in a synchronized block, on the off
106: // chance that read-access is unsafe otherwise. But there are
107: // no indications this is really an issue.
108: try {
109: return pendingThreads.processEach(body, getName(), _logger);
110: } catch (IndexOutOfBoundsException r) {
111: // this may happen if pendingThreads was modified during the traversal
112: if (_logger.isDebugEnabled()) {
113: _logger
114: .debug(
115: this
116: + ": SortedQueue.processEach detected a collision",
117: r);
118: }
119: } catch (Throwable r) {
120: _logger
121: .error(
122: this
123: + ": SortedQueue.processEach threw an uncaught exception",
124: r);
125: }
126: return 0;
127: }
128:
129: private Logger getLogger() {
130: return logger;
131: }
132:
133: public void setRightsSelector(RightsSelector selector) {
134: // error? no-op?
135: }
136:
137: public String toString() {
138: return printName;
139: }
140:
141: void setTreeNode(TreeNode treeNode) {
142: this .treeNode = treeNode;
143: printName = "<Scheduler " + treeNode.getName() + " [" + lane
144: + "]>";
145: }
146:
147: TreeNode getTreeNode() {
148: return treeNode;
149: }
150:
151: int getLane() {
152: return lane;
153: }
154:
155: void setLane(int lane) {
156: this .lane = lane;
157: }
158:
159: String getName() {
160: return getTreeNode().getName();
161: }
162:
163: // ThreadControlService
164: synchronized public void setQueueComparator(
165: final Comparator<Schedulable> comparator) {
166: if (comparator != null) {
167: Comparator<SchedulableObject> c = new Comparator<SchedulableObject>() {
168: public int compare(SchedulableObject o1,
169: SchedulableObject o2) {
170: return comparator.compare(o1, o2);
171: }
172: };
173: pendingThreads.setComparator(c);
174: } else {
175: pendingThreads.setComparator(timeComparator);
176: }
177: }
178:
179: public int maxRunningThreadCount() {
180: return maxRunningThreads;
181: }
182:
183: public int pendingThreadCount() {
184: return pendingThreads.size();
185: }
186:
187: public int runningThreadCount() {
188: return runningThreadCount;
189: }
190:
191: // synchronize to keep the two addends consistent
192: synchronized public int activeThreadCount() {
193: return runningThreadCount + pendingThreads.size();
194: }
195:
196: public boolean setChildQualifier(UnaryPredicate predicate) {
197: if (predicate == null) {
198: childQualifier = null;
199: return true;
200: } else if (childQualifier == null) {
201: childQualifier = predicate;
202: return true;
203: } else {
204: // log an error
205: Logger logger = getLogger();
206: if (logger.isErrorEnabled()) {
207: logger.error("ChildQualifier is already set");
208: }
209: return false;
210: }
211: }
212:
213: boolean allowRightFor(Scheduler child) {
214: if (child == this || childQualifier == null) {
215: // Don't run this on yourself, leave it to the parent.
216: return true;
217: } else {
218: return childQualifier.execute(child);
219: }
220: }
221:
222: synchronized private boolean addPendingThreadSync(
223: SchedulableObject thread) {
224: if (pendingThreads.contains(thread)) {
225: return false;
226: }
227: thread.setQueued(true);
228: pendingThreads.add(thread);
229: return true;
230: }
231:
232: void addPendingThread(SchedulableObject thread) {
233: if (addPendingThreadSync(thread)) {
234: listenerProxy.notifyQueued(thread);
235: }
236: }
237:
238: synchronized void dequeue(SchedulableObject thread) {
239: pendingThreads.remove(thread);
240: threadDequeued(thread);
241: }
242:
243: void threadDequeued(SchedulableObject thread) {
244: listenerProxy.notifyDequeued(thread);
245: }
246:
247: // Called within the thread itself as the first thing it does.
248: void threadClaimed(SchedulableObject thread) {
249: listenerProxy.notifyStart(thread);
250: }
251:
252: synchronized private SchedulableObject getNextSync() {
253: return pendingThreads.next();
254: }
255:
256: // Called within the thread itself as the last thing it does.
257: SchedulableObject threadReclaimed(SchedulableObject thread,
258: boolean reuse) {
259: listenerProxy.notifyEnd(thread);
260: return reuse ? getNextSync() : null;
261: }
262:
263: // Suspend/Resume "hints" -- not used yet.
264: void threadResumed(SchedulableObject thread) {
265: }
266:
267: void threadSuspended(SchedulableObject thread) {
268: }
269:
270: synchronized void incrementRunCount(Scheduler consumer) {
271: ++runningThreadCount;
272: listenerProxy.notifyRightGiven(consumer);
273: }
274:
275: synchronized void decrementRunCount(Scheduler consumer) {
276: --runningThreadCount;
277: if (runningThreadCount < 0) {
278: StringBuffer buf = new StringBuffer();
279: buf.append(this .toString());
280: buf.append(" thread count is ").append(
281: Integer.toString(runningThreadCount));
282: TreeNode node = getTreeNode();
283: TreeNode parent = node.getParent();
284: if (parent != null) {
285: buf.append(" parent ").append(parent.getName());
286: }
287: List<TreeNode> children = node.getChildren();
288: if (children != null && !children.isEmpty()) {
289: buf.append(" children");
290: for (TreeNode child : children) {
291: buf.append(" " + child.getName());
292: }
293: }
294: logger.error(buf.toString());
295: runningThreadCount = 0;
296: }
297: listenerProxy.notifyRightReturned(consumer);
298: }
299:
300: SchedulableObject getNextPending() {
301: return popQueue();
302: }
303:
304: synchronized private SchedulableObject popQueueSync() {
305: if (!checkLocalRights()) {
306: return null;
307: }
308: SchedulableObject thread = pendingThreads.next();
309: if (thread != null) {
310: incrementRunCount(this );
311: }
312: return thread;
313: }
314:
315: SchedulableObject popQueue() {
316: SchedulableObject thread = popQueueSync();
317: // Notify listeners
318: if (thread != null) {
319: threadDequeued(thread);
320: }
321:
322: return thread;
323: }
324:
325: // Caller should synchronize.
326: boolean checkLocalRights() {
327: if (maxRunningThreads < 0) {
328: return true;
329: }
330: return runningThreadCount + rightsRequestCount < maxRunningThreads;
331: }
332:
333: synchronized boolean requestRights(Scheduler requestor) {
334: if (maxRunningThreads < 0
335: || runningThreadCount < maxRunningThreads) {
336: incrementRunCount(requestor);
337: return true;
338: }
339: return false;
340: }
341:
342: synchronized void releaseRights(Scheduler consumer) {
343: // If the max has recently decreased it may be lower than the
344: // running count. In that case don't do a handoff.
345: decrementRunCount(consumer);
346: SchedulableObject handoff = null;
347:
348: if (runningThreadCount < maxRunningThreads) {
349: handoff = getNextPending();
350: if (handoff != null) {
351: handoff.thread_start();
352: }
353: } else {
354: if (logger.isErrorEnabled()) {
355: logger
356: .error("Decreased thread count prevented handoff "
357: + runningThreadCount
358: + ">"
359: + maxRunningThreads);
360: }
361: }
362: }
363:
364: synchronized private SchedulableObject getNextPendingSync() {
365: return getNextPending();
366: }
367:
368: public void setMaxRunningThreadCount(int requested_max) {
369: int count = requested_max;
370: Logger logger = getLogger();
371: if (requested_max > absoluteMax) {
372: if (logger.isErrorEnabled()) {
373: logger.error("Attempt to set maxRunningThreadCount to "
374: + requested_max
375: + " which is greater than the absolute max of "
376: + absoluteMax);
377: }
378: count = absoluteMax;
379: } else {
380: if (_logger.isInfoEnabled()) {
381: _logger.info(this
382: + ": Setting maxRunningThreadCount to "
383: + requested_max + " from " + maxRunningThreads);
384: }
385: }
386: int additionalThreads = count - maxRunningThreads;
387: maxRunningThreads = count;
388: TreeNode parent_node = getTreeNode().getParent();
389: if (parent_node != null) {
390: return;
391: }
392:
393: // If we get here, we're the root node. Try to run more
394: // threads if the count has gone up.
395: for (int i = 0; i < additionalThreads; i++) {
396: SchedulableObject schedulable = getNextPendingSync();
397: if (schedulable == null) {
398: return;
399: }
400: // System.err.println("Increased thread count let me start one!");
401: schedulable.thread_start();
402: }
403: }
404:
405: private boolean qualified(SchedulableObject thread) {
406: return qualifier == null || qualifier.execute(thread);
407: }
408:
409: synchronized private void setNewQualifier(UnaryPredicate predicate) {
410: qualifier = predicate;
411: List<SchedulableObject> bad = pendingThreads.filter(predicate);
412: // move any disqualified items on the queue to the
413: // disqualified list
414: if (bad instanceof RandomAccess) {
415: Iterator<SchedulableObject> itr = bad.iterator();
416: while (itr.hasNext()) {
417: disqualify(itr.next());
418: }
419: } else {
420: for (SchedulableObject sched : bad) {
421: disqualify(sched);
422: }
423: }
424: }
425:
426: synchronized private List<SchedulableObject> resetQualifier() {
427: qualifier = null;
428: List<SchedulableObject> requeue = new ArrayList<SchedulableObject>(
429: disqualified);
430: disqualified.clear();
431: return requeue;
432: }
433:
434: public boolean setQualifier(UnaryPredicate predicate) {
435: if (predicate == null) {
436: List<SchedulableObject> requeue = resetQualifier();
437: Logger logger = getLogger();
438: if (logger.isDebugEnabled()) {
439: logger.debug("Restoring " + requeue.size()
440: + " previously disqualified threads");
441: }
442: for (int i = 0, n = requeue.size(); i < n; i++) {
443: SchedulableObject sched = requeue.get(i);
444: SchedulableStateChangeQueue.pushStart(sched);
445: }
446: return true;
447: } else if (qualifier == null) {
448: setNewQualifier(predicate);
449: return true;
450: } else {
451: Logger logger = getLogger();
452: if (logger.isErrorEnabled()) {
453: logger.error("Qualifier is already set");
454: }
455: return false;
456: }
457: }
458:
459: private void disqualify(SchedulableObject sched) {
460: sched.setDisqualified(true);
461: if (!disqualified.contains(sched)) {
462: disqualified.add(sched);
463: }
464: }
465:
466: synchronized boolean checkQualification(SchedulableObject thread) {
467: if (!qualified(thread)) {
468: disqualify(thread);
469: return false;
470: }
471: if (pendingThreadCount() > 0) {
472: addPendingThread(thread);
473: return false;
474: }
475: return true;
476: }
477:
478: void startOrQueue(SchedulableObject thread) {
479: // If the queue isn't empty, queue this one too.
480: if (!checkQualification(thread)) {
481: return;
482: }
483: boolean can_run = requestRights(this);
484: if (can_run) {
485: thread.thread_start();
486: } else {
487: addPendingThread(thread);
488: }
489: }
490: }
|