001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.agent.service.scheduler;
028:
029: import java.io.File;
030: import java.io.FileOutputStream;
031: import java.io.IOException;
032: import java.io.PrintStream;
033: import java.util.ArrayList;
034: import java.util.HashMap;
035: import java.util.HashSet;
036: import java.util.Iterator;
037: import java.util.Map;
038:
039: import org.cougaar.bootstrap.SystemProperties;
040: import org.cougaar.core.component.ServiceBroker;
041: import org.cougaar.core.component.ServiceProvider;
042: import org.cougaar.core.service.LoggingService;
043: import org.cougaar.core.service.SchedulerService;
044: import org.cougaar.core.service.ThreadService;
045: import org.cougaar.core.thread.Schedulable;
046: import org.cougaar.util.CircularQueue;
047: import org.cougaar.util.Trigger;
048:
049: /**
050: * Package-private scheduler service provider implementation, for
051: * use by the SchedulerServiceComponent.
052: * <p>
053: * The standard "normal" scheduler just wraps the new ThreadService.
054: * In fact, all SchedulerService clients will likely be ported to
055: * use the ThreadService directly, making this class obsolete.
056: * <p>
057: * Scheduler that runs its schedulees in a shared thread.<br>
058: * The schedulees tell the Scheduler they want to be run via a Trigger.<br>
059: * The schedulees pass in a Trigger that the Scheduler calls to activate them.
060: * <p>
061: * Debugging and behavior parameters: The basic idea is to watch how simple
062: * plugins are scheduled: we can watch how long "Shared Thread"
063: * plugins take to execute and keep statistics. We also watch to see
064: * if plugins block or otherwise fail to return from execute().
065: * <p>
066: * @property org.cougaar.core.service.SchedulerService.statistics=false Set
067: * it to true to collect plugin statistics.
068: * @property org.cougaar.core.service.SchedulerService.dumpStatistics=false Set
069: * it to true to get periodic dumps of plugin statistics to the file
070: * NODENAME.statistics in the current directory.
071: * @property org.cougaar.core.service.SchedulerService.watching=true Set it
072: * to false to disable the watcher (default is enabled). When enabled,
073: * will complain whenever it sees a plugin run or block for more than
074: * warningTime milliseconds (default two minutes). It will also cause the
075: * above statistics file(s) to be (re)generated approximately every two
076: * minutes. The watcher is one thread per vm, so it isn't too expensive.
077: * @property org.cougaar.core.service.SchedulerService.staticScheduler=false
078: * When true does shared-thread scheduling over the whole node/vm rather than
079: * per-agent. Uses the MultiScheduler instead of SimpleScheduler to support
080: * multiple worker threads. Set to false to get the pre-8.6.1 scheduler behavior.
081: * @property org.cougaar.core.service.SchedulerService.schedulerThreads=4
082: * The number of threads to use as workers for the MultiScheduler - these
083: * threads are shared among all scheduled components in the node.
084: */
085: class SchedulerServiceProvider implements ServiceProvider {
/** Whether per-trigger runtime statistics are collected. */
static boolean keepingStatistics = false;

/** Whether the collected statistics are periodically dumped to a file. */
static boolean dumpingStatistics = false;

/** Whether the watcher checks workers for long-running triggers. */
static boolean isWatching = true;

/** Milliseconds a trigger may run before the watcher complains (default two minutes). */
static long warningTime = 2 * 60 * 1000L;

/**
 * True to share one scheduler across the whole node/vm, false to create a
 * separate scheduler for each provider instance.
 */
static boolean staticScheduler = false;

/** Number of worker threads the MultiScheduler starts (default 4). */
static int nThreads = 4;

static final Class[] emptyTypeArray = new Class[0];

static final Object[] emptyObjectArray = new Object[0];

/*
 * Overrides the defaults above from system properties named
 * "org.cougaar.core.service.SchedulerService.<key>". Each lookup passes the
 * current value as the fallback.
 */
static {
  final String prefix = "org.cougaar.core.service.SchedulerService.";

  keepingStatistics =
      SystemProperties.getBoolean(prefix + "statistics", keepingStatistics);
  dumpingStatistics =
      SystemProperties.getBoolean(prefix + "dumpStatistics", dumpingStatistics);
  // Dumping implies collecting: there would be nothing to dump otherwise.
  keepingStatistics = keepingStatistics || dumpingStatistics;

  isWatching = SystemProperties.getBoolean(prefix + "watching", isWatching);
  warningTime = SystemProperties.getLong(prefix + "warningTime", warningTime);
  staticScheduler =
      SystemProperties.getBoolean(prefix + "staticScheduler", staticScheduler);
  nThreads = SystemProperties.getInt(prefix + "schedulerThreads", nThreads);
}
125:
126: private SchedulerBase scheduler;
127:
128: public SchedulerServiceProvider(ThreadService threadService,
129: LoggingService log) {
130: this (threadService, log, "Anonymous");
131: }
132:
133: public SchedulerServiceProvider(ThreadService threadService,
134: LoggingService log, String name) {
135: scheduler = createScheduler(threadService, name, log);
136: }
137:
138: private static final Object ssLock = new Object();
139: private static SchedulerBase singletonScheduler = null;
140:
141: protected SchedulerBase createScheduler(
142: ThreadService threadService, String id, LoggingService log) {
143: if (staticScheduler) {
144: synchronized (ssLock) {
145: if (singletonScheduler == null) {
146: //singletonScheduler = new SimpleScheduler(SystemProperties.getProperty("org.cougaar.core.node.Node.name", "unknown"));
147: singletonScheduler = new MultiScheduler(
148: SystemProperties.getProperty(
149: "org.cougaar.core.node.Node.name",
150: "unknown"));
151: }
152: return singletonScheduler;
153: }
154: } else {
155: return new NormalScheduler(threadService, log);
156: }
157: }
158:
159: // ServiceProvider methods
160: public Object getService(ServiceBroker sb, Object requestor,
161: Class serviceClass) {
162: return new SchedulerProxy(scheduler, requestor);
163: }
164:
165: public void releaseService(ServiceBroker sb, Object requestor,
166: Class serviceClass, Object service) {
167: }
168:
169: public void suspend() {
170: scheduler.suspend();
171: }
172:
173: public void resume() {
174: scheduler.resume();
175: }
176:
177: static abstract class SchedulerBase {
178: public Trigger register(Trigger manageMe, Object req) {
179: assureStarted();
180: addClient(manageMe, req);
181: return new SchedulerCallback(manageMe);
182: }
183:
184: public void unregister(Trigger stopPokingMe) {
185: removeClient(stopPokingMe);
186: }
187:
188: /** Lazy startup of scheduler threads */
189: abstract void assureStarted();
190:
191: /** add a client to the schedule list */
192: abstract void addClient(Trigger client, Object requestor);
193:
194: /** called to request that client stop being scheduled */
195: abstract void removeClient(Trigger client);
196:
197: /** called to request that client be activated asap */
198: abstract void scheduleClient(Trigger client);
199:
200: void suspend() {
201: }
202:
203: void resume() {
204: }
205:
206: /**
207: * Components hook into me
208: */
209: class SchedulerCallback implements Trigger {
210: private Trigger client;
211:
212: public SchedulerCallback(Trigger manageMe) {
213: client = manageMe;
214: }
215:
216: /**
217: * Add component to the list of pokables to be triggerd
218: */
219: public void trigger() {
220: scheduleClient(client);
221: }
222: }
223:
224: }
225:
226: public static class WorkerBase {
227: private long t0 = 0;
228: private Trigger currentTrigger = null;
229:
230: public boolean runTrigger(Trigger t) {
231: try {
232: t0 = System.currentTimeMillis();
233: currentTrigger = t;
234: t.trigger();
235: } catch (Throwable die) {
236: System.err.println("\nWarning Trigger " + t + " threw "
237: + die);
238: die.printStackTrace();
239: return true;
240: } finally {
241: if (keepingStatistics) {
242: long delta = System.currentTimeMillis() - t0;
243: accumulateStatistics(currentTrigger, delta);
244: }
245: currentTrigger = null;
246: t0 = 0;
247: }
248: return false;
249: }
250:
251: public void checkHealth() {
252: long tx = t0;
253: if (tx > 0) {
254: long delta = System.currentTimeMillis() - tx;
255: if (delta >= warningTime) {
256: System.err.println("Warning: Trigger "
257: + currentTrigger + " has been running for "
258: + (delta / 1000.0) + " seconds.");
259: }
260: }
261: }
262: }
263:
264: /**
265: * NormalScheduler applies threads from a ThreadService to scheduled
266: * clients. Requests are handled in the order they are requested.
267: */
268: static class NormalScheduler extends SchedulerBase {
269: private ThreadService threadService;
270:
271: /**
272: * Maps client Triggers to Worker instances
273: */
274: private final Map clients = new HashMap(13);
275:
276: NormalScheduler(ThreadService threadService, LoggingService log) {
277: this .threadService = threadService;
278: }
279:
280: void addClient(Trigger client, Object requestor) {
281: synchronized (clients) {
282: if (!clients.containsKey(client)) {
283: clients.put(client, new Worker(client, requestor));
284: }
285: }
286: }
287:
288: void removeClient(Trigger client) {
289: synchronized (clients) {
290: clients.remove(client);
291: }
292: }
293:
294: void scheduleClient(Trigger client) {
295: Worker worker;
296: synchronized (clients) {
297: worker = (Worker) clients.get(client);
298: if (worker == null) {
299: throw new IllegalArgumentException(
300: "Attempt to schedule unregistered client: "
301: + client);
302: }
303: }
304: worker.start();
305: }
306:
307: void assureStarted() {
308: }
309:
310: void suspend() {
311: }
312:
313: void resume() {
314: }
315:
316: class Worker extends WorkerBase implements Runnable {
317: private Trigger client;
318: private Schedulable schedulable;
319:
320: Worker(Trigger client, Object requestor) {
321: this .client = client;
322: String name = requestor.toString();
323: // invokeMethod("getBlackboardClientName",
324: // requestor,
325: // requestor.getClass().getName()).toString();
326: schedulable = threadService.getThread(requestor, this ,
327: name);
328: }
329:
330: public void start() {
331: schedulable.start();
332: }
333:
334: public void run() {
335: runTrigger(client);
336: }
337: }
338: }
339:
340: /** MultiScheduler applies a fixed set of workers against scheduled
341: * clients. Requests are handled in the order they are requested.
342: */
343: static class MultiScheduler extends SchedulerBase {
344: private final String id;
345:
346: MultiScheduler(String id) {
347: this .id = id;
348: }
349:
350: private final HashSet clients = new HashSet(13);
351:
352: private final CircularQueue runnables = new CircularQueue(32);
353:
354: void addClient(Trigger client, Object requestor) {
355: synchronized (clients) {
356: // only put it on the list if it hasn't already been scheduled
357: // Note that this will still allow rescheduling when it is currently
358: // being run! Might be better to have three well-managed states:
359: // idle, pending, running
360: // as is, the trigger probably needs to be synchronized to be
361: // safe.
362: if (!clients.contains(client)) {
363: clients.add(client);
364: }
365: }
366: }
367:
368: void removeClient(Trigger client) {
369: synchronized (clients) {
370: clients.remove(client);
371: }
372: }
373:
374: void scheduleClient(Trigger client) {
375: synchronized (runnables) {
376: // if (!runnables.contains(client)) {
377: runnables.add(client);
378: runnables.notifyAll();
379: // }
380: }
381: }
382:
383: /** the scheduler instance (if started) */
384: private boolean running = false;
385:
386: private ArrayList threads = null;
387:
388: synchronized void assureStarted() {
389: if (threads == null) {
390: threads = new ArrayList(nThreads);
391: for (int i = 0; i < nThreads; i++) {
392: Worker scheduler = new Worker(i);
393: if (isWatching) {
394: getWatcher().register(scheduler);
395: }
396: String name = "MultiScheduler/" + id + "(" + i
397: + ")";
398: Thread thread = new Thread(scheduler, name);
399: running = true;
400: thread.start();
401: threads.add(thread);
402: }
403: }
404: }
405:
406: synchronized void suspend() {
407: if (running) {
408: // BUG 842: disable MultiScheduler suspend. This is the low-risk
409: // solution until the better fix is ready and well tested. See
410: // the bug report for further details.
411: System.err
412: .println("Warning: Bug 842"
413: + " (scheduler \"suspend\" disabled, okay to proceed)");
414: /*
415: running = false;
416: synchronized (runnables) {
417: runnables.notifyAll();
418: }
419: try {
420: for (int i =0;i<nThreads;i++) {
421: ((Thread)threads.get(i)).join(60000);
422: }
423: } catch (InterruptedException ie) {
424: }
425: //schedulerThread = null;
426: threads = null;
427: */
428: }
429: }
430:
431: synchronized void resume() {
432: if (!running) {
433: assureStarted();
434: }
435: }
436:
437: class Worker extends WorkerBase implements Runnable {
438: Worker(int i) {
439: }
440:
441: public void run() {
442: while (true) {
443: Trigger t;
444: synchronized (runnables) {
445: while (true) {
446: if (!(running)) {
447: return;
448: }
449: t = (Trigger) runnables.next();
450: if (t != null) {
451: break;
452: }
453: try {
454: runnables.wait();
455: } catch (InterruptedException ie) {
456: }
457: }
458: }
459: runTrigger(t);
460: }
461: }
462: }
463: }
464:
465: /** SimpleScheduler is a simple on-demand scheduler of trigger requests.
466: * Requests are handled in the order they are requested.
467: */
468: static class SimpleScheduler extends SchedulerBase {
469: private final String id;
470:
471: SimpleScheduler(String id) {
472: this .id = id;
473: }
474:
475: private final HashSet clients = new HashSet(13);
476: private final Semaphore sem = new Semaphore();
477:
478: private final Object runnableLock = new Object();
479: private ArrayList runnables = new ArrayList(13);
480: private ArrayList working = new ArrayList(13);
481:
482: void addClient(Trigger client, Object requestor) {
483: synchronized (clients) {
484: clients.add(client);
485: }
486: }
487:
488: void removeClient(Trigger client) {
489: synchronized (clients) {
490: clients.remove(client);
491: }
492: }
493:
494: void scheduleClient(Trigger client) {
495: synchronized (runnableLock) {
496: runnables.add(client);
497: }
498: sem.set();
499: }
500:
501: /** the scheduler instance (if started) */
502: private Thread schedulerThread = null;
503: private boolean running = false;
504:
505: synchronized void assureStarted() {
506: if (schedulerThread == null) {
507: Worker scheduler = new Worker();
508: if (isWatching) {
509: getWatcher().register(scheduler);
510: }
511: String name = "SimpleScheduler/" + id;
512: schedulerThread = new Thread(scheduler, name);
513: running = true;
514: schedulerThread.start();
515: }
516: }
517:
518: synchronized void suspend() {
519: if (running) {
520: running = false;
521: sem.set();
522: try {
523: schedulerThread.join(60000);
524: } catch (InterruptedException ie) {
525: }
526: schedulerThread = null;
527: }
528: }
529:
530: synchronized void resume() {
531: if (!running) {
532: assureStarted();
533: }
534: }
535:
536: class Worker extends WorkerBase implements Runnable {
537: public void run() {
538: while (running) {
539: sem.waitForSet();
540: synchronized (runnableLock) {
541: // swap runnables and working arrays, leaving runnables clear
542: ArrayList tmp = runnables;
543: runnables = working;
544: runnables.clear();
545: working = tmp;
546: }
547:
548: int l = working.size();
549: for (int i = 0; i < l; i++) {
550: Trigger pc = (Trigger) working.get(i);
551: runTrigger(pc);
552: }
553: working.clear();
554: }
555: }
556: }
557: }
558:
559: // watcher and statistics support
560:
561: private static Watcher watcher = null;
562:
563: private static synchronized Watcher getWatcher() {
564: if (watcher == null) {
565: watcher = new Watcher();
566: new Thread(watcher, "SchedulerService.Watcher").start();
567: }
568: return watcher;
569: }
570:
571: private static class Watcher implements Runnable {
572: private long reportTime = 0;
573:
574: public void run() {
575: while (true) {
576: try {
577: Thread.sleep(10 * 1000L); // sleep for a 10 seconds at a time
578: long now = System.currentTimeMillis();
579:
580: if (isWatching)
581: check();
582:
583: // no more often then every two minutes
584: if (dumpingStatistics && keepingStatistics
585: && (now - reportTime) >= 120 * 1000L) {
586: reportTime = now;
587: report();
588: }
589: } catch (Throwable t) {
590: t.printStackTrace();
591: }
592: }
593: }
594:
595: /** List<WorkerBase> */
596: private ArrayList pims = new ArrayList();
597:
598: synchronized void register(WorkerBase worker) {
599: pims.add(worker);
600: }
601:
602: /** dump reports on plugin usage */
603: private synchronized void report() {
604:
605: String nodeName = SystemProperties.getProperty(
606: "org.cougaar.core.node.Node.name", "unknown");
607: try {
608: File f = new File(nodeName + ".statistics");
609: FileOutputStream fos = new FileOutputStream(f);
610: PrintStream ps = new PrintStream(fos);
611: reportStatistics(ps);
612: ps.close();
613: } catch (IOException e) {
614: e.printStackTrace();
615: }
616: }
617:
618: /** check the health of each plugin manager, reporting problems */
619: private synchronized void check() {
620: for (Iterator i = pims.iterator(); i.hasNext();) {
621: WorkerBase pim = (WorkerBase) i.next();
622: pim.checkHealth();
623: }
624: }
625: }
626:
627: // statistics keeper
628:
629: private static final HashMap statistics = new HashMap(29);
630:
631: static void accumulateStatistics(Trigger trig, long elapsed) {
632: synchronized (statistics) {
633: InvocationStatistics is = (InvocationStatistics) statistics
634: .get(trig);
635: if (is == null) {
636: is = new InvocationStatistics(trig);
637: statistics.put(trig, is);
638: }
639: is.accumulate(elapsed);
640: }
641: }
642:
643: public static void reportStatistics(PrintStream os) {
644: // the cid should be part of the stats toString
645: //os.println(cid.toString());
646: synchronized (statistics) {
647: for (Iterator i = statistics.values().iterator(); i
648: .hasNext();) {
649: InvocationStatistics is = (InvocationStatistics) i
650: .next();
651: os.println(is.toString());
652: }
653: }
654: }
655:
656: public static class InvocationStatistics {
657: private int count = 0;
658: private long millis = 0L;
659:
660: Trigger trigger;
661:
662: InvocationStatistics(Trigger p) {
663: trigger = p;
664: }
665:
666: synchronized void accumulate(long elapsed) {
667: count++;
668: millis += elapsed;
669: }
670:
671: public synchronized String toString() {
672: double mean = ((millis / count) / 1000.0);
673: return trigger.toString() + "\t" + count + "\t" + mean;
674: }
675: }
676:
677: // support classes
678:
679: /** proxy class to shield the real scheduler from clients */
680: static final class SchedulerProxy implements SchedulerService {
681: private final SchedulerBase scheduler;
682: private final Object requestor;
683:
684: SchedulerProxy(SchedulerBase r, Object req) {
685: scheduler = r;
686: requestor = req;
687: }
688:
689: public Trigger register(Trigger manageMe) {
690: return scheduler.register(manageMe, requestor);
691: }
692:
693: public void unregister(Trigger stopPokingMe) {
694: scheduler.unregister(stopPokingMe);
695: }
696: }
697:
698: public static final class Semaphore {
699: public Semaphore() {
700: }
701:
702: private boolean attention = false;
703:
704: public synchronized boolean isSet() {
705: if (attention) {
706: attention = false;
707: return true;
708: } else {
709: return false;
710: }
711: }
712:
713: public synchronized void set() {
714: attention = true;
715: notifyAll();
716: }
717:
718: public synchronized void waitForSet() {
719: while (!attention) {
720: try {
721: wait();
722: } catch (InterruptedException ie) {
723: }
724: }
725: attention = false;
726: }
727: }
728: }
|