001: /*
002: File: FJTaskRunnerGroup.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 7Jan1999 dl First public release
012: 12Jan1999 dl made getActiveCount public; misc minor cleanup.
013: 14Jan1999 dl Added executeTask
014: 20Jan1999 dl Allow use of priorities; reformat stats
015: 6Feb1999 dl Lazy thread starts
016: 27Apr1999 dl Renamed
017: */
018:
019: package org.logicalcobwebs.concurrent;
020:
021: /**
022: * A stripped down analog of a ThreadGroup used for
023: * establishing and managing FJTaskRunner threads.
024: * ThreadRunnerGroups serve as the control boundary separating
025: * the general world of normal threads from the specialized world
026: * of FJTasks.
027: * <p>
028: * By intent, this class does not subclass java.lang.ThreadGroup, and
029: * does not support most methods found in ThreadGroups, since they
030: * would make no sense for FJTaskRunner threads. In fact, the class
031: * does not deal with ThreadGroups at all. If you want to restrict
032: * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
033: * it from within that ThreadGroup.
034: * <p>
035: * The main contextual parameter for a FJTaskRunnerGroup is
036: * the group size, established in the constructor.
037: * Groups must be of a fixed size.
038: * There is no way to dynamically increase or decrease the number
039: * of threads in an existing group.
040: * <p>
041: * In general, the group size should be equal to the number
042: * of CPUs on the system. (Unfortunately, there is no portable
043: * means of automatically detecting the number of CPUs on a JVM, so there is
044: * no good way to automate defaults.) In principle, when
045: * FJTasks are used for computation-intensive tasks, having only
046: * as many threads as CPUs should minimize bookkeeping overhead
047: * and contention, and so maximize throughput. However, because
048: * FJTaskRunners lie atop Java threads, and in turn operating system
049: * thread support and scheduling policies,
050: * it is very possible that using more threads
051: * than CPUs will improve overall throughput even though it adds
052: * to overhead. This will always be so if FJTasks are I/O bound.
053: * So it may pay to experiment a bit when tuning on particular platforms.
054: * You can also use <code>setRunPriorities</code> to either
055: * increase or decrease the priorities of active threads, which
056: * may interact with group size choice.
057: * <p>
058: * In any case, overestimating group sizes never
059: * seriously degrades performance (at least within reasonable bounds).
060: * You can also use a value
061: * less than the number of CPUs in order to reserve processing
062: * for unrelated threads.
063: * <p>
064: * There are two general styles for using a FJTaskRunnerGroup.
065: * You can create one group per entire program execution, for example
066: * as a static singleton, and use it for all parallel tasks:
067: * <pre>
068: * class Tasks {
069: * static FJTaskRunnerGroup group;
070: * public void initialize(int groupsize) {
071: * group = new FJTaskRunnerGroup(groupSize);
072: * }
073: * // ...
074: * }
075: * </pre>
076: * Alternatively, you can make new groups on the fly and use them only for
077: * particular task sets. This is more flexible,,
078: * and leads to more controllable and deterministic execution patterns,
079: * but it encounters greater overhead on startup. Also, to reclaim
080: * system resources, you should
081: * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
082: * using one-shot groups. Otherwise, because FJTaskRunners set
083: * <code>Thread.isDaemon</code>
084: * status, they will not normally be reclaimed until program termination.
085: * <p>
086: * The main supported methods are <code>execute</code>,
087: * which starts a task processed by FJTaskRunner threads,
088: * and <code>invoke</code>, which starts one and waits for completion.
089: * For example, you might extend the above <code>FJTasks</code>
090: * class to support a task-based computation, say, the
091: * <code>Fib</code> class from the <code>FJTask</code> documentation:
092: * <pre>
093: * class Tasks { // continued
094: * // ...
095: * static int fib(int n) {
096: * try {
097: * Fib f = new Fib(n);
098: * group.invoke(f);
099: * return f.getAnswer();
100: * }
101: * catch (InterruptedException ex) {
102: * throw new Error("Interrupted during computation");
103: * }
104: * }
105: * }
106: * </pre>
107: * <p>
108: * Method <code>stats()</code> can be used to monitor performance.
109: * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
110: * the compile-time constant COLLECT_STATS set to false. In this
111: * case, various simple counts reported in stats() are not collected.
112: * On platforms tested,
113: * this leads to such a tiny performance improvement that there is
114: * very little motivation to bother.
115: *
116: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
117: * <p>
118: * @see FJTask
119: * @see FJTaskRunner
120: **/
121:
122: public class FJTaskRunnerGroup implements Executor {
123:
124: /** The threads in this group **/
125: protected final FJTaskRunner[] threads;
126:
127: /** Group-wide queue for tasks entered via execute() **/
128: protected final LinkedQueue entryQueue = new LinkedQueue();
129:
130: /** Number of threads that are not waiting for work **/
131: protected int activeCount = 0;
132:
133: /** Number of threads that have been started. Used to avoid
134: unecessary contention during startup of task sets.
135: **/
136: protected int nstarted = 0;
137:
138: /**
139: * Compile-time constant. If true, various counts of
140: * runs, waits, etc., are maintained. These are NOT
141: * updated with synchronization, so statistics reports
142: * might not be accurate.
143: **/
144:
145: static final boolean COLLECT_STATS = true;
146: // static final boolean COLLECT_STATS = false;
147:
148: // for stats
149:
150: /** The time at which this ThreadRunnerGroup was constructed **/
151: long initTime = 0;
152:
153: /** Total number of executes or invokes **/
154: int entries = 0;
155:
156: static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY + 1;
157:
158: /**
159: * Create a FJTaskRunnerGroup with the indicated number
160: * of FJTaskRunner threads. Normally, the best size to use is
161: * the number of CPUs on the system.
162: * <p>
163: * The threads in a FJTaskRunnerGroup are created with their
164: * isDaemon status set, so do not normally need to be
165: * shut down manually upon program termination.
166: **/
167:
168: public FJTaskRunnerGroup(int groupSize) {
169: threads = new FJTaskRunner[groupSize];
170: initializeThreads();
171: initTime = System.currentTimeMillis();
172: }
173:
174: /**
175: * Arrange for execution of the given task
176: * by placing it in a work queue. If the argument
177: * is not of type FJTask, it is embedded in a FJTask via
178: * <code>FJTask.Wrap</code>.
179: * @exception InterruptedException if current Thread is
180: * currently interrupted
181: **/
182:
183: public void execute(Runnable r) throws InterruptedException {
184: if (r instanceof FJTask) {
185: entryQueue.put((FJTask) r);
186: } else {
187: entryQueue.put(new FJTask.Wrap(r));
188: }
189: signalNewTask();
190: }
191:
192: /**
193: * Specialized form of execute called only from within FJTasks
194: **/
195: public void executeTask(FJTask t) {
196: try {
197: entryQueue.put(t);
198: signalNewTask();
199: } catch (InterruptedException ex) {
200: Thread.currentThread().interrupt();
201: }
202: }
203:
204: /**
205: * Start a task and wait it out. Returns when the task completes.
206: * @exception InterruptedException if current Thread is
207: * interrupted before completion of the task.
208: **/
209:
210: public void invoke(Runnable r) throws InterruptedException {
211: InvokableFJTask w = new InvokableFJTask(r);
212: entryQueue.put(w);
213: signalNewTask();
214: w.awaitTermination();
215: }
216:
217: /**
218: * Try to shut down all FJTaskRunner threads in this group
219: * by interrupting them all. This method is designed
220: * to be used during cleanup when it is somehow known
221: * that all threads are idle.
222: * FJTaskRunners only
223: * check for interruption when they are not otherwise
224: * processing a task (and its generated subtasks,
225: * if any), so if any threads are active, shutdown may
226: * take a while, and may lead to unpredictable
227: * task processing.
228: **/
229:
230: public void interruptAll() {
231: // paranoically interrupt current thread last if in group.
232: Thread current = Thread.currentThread();
233: boolean stopCurrent = false;
234:
235: for (int i = 0; i < threads.length; ++i) {
236: Thread t = threads[i];
237: if (t == current)
238: stopCurrent = true;
239: else
240: t.interrupt();
241: }
242: if (stopCurrent)
243: current.interrupt();
244: }
245:
246: /**
247: * Set the priority to use while a FJTaskRunner is
248: * polling for new tasks to perform. Default
249: * is currently Thread.MIN_PRIORITY+1. The value
250: * set may not go into effect immediately, but
251: * will be used at least the next time a thread scans for work.
252: **/
253: public synchronized void setScanPriorities(int pri) {
254: for (int i = 0; i < threads.length; ++i) {
255: FJTaskRunner t = threads[i];
256: t.setScanPriority(pri);
257: if (!t.active)
258: t.setPriority(pri);
259: }
260: }
261:
262: /**
263: * Set the priority to use while a FJTaskRunner is
264: * actively running tasks. Default
265: * is the priority that was in effect by the thread that
266: * constructed this FJTaskRunnerGroup. Setting this value
267: * while threads are running may momentarily result in
268: * them running at this priority even when idly waiting for work.
269: **/
270: public synchronized void setRunPriorities(int pri) {
271: for (int i = 0; i < threads.length; ++i) {
272: FJTaskRunner t = threads[i];
273: t.setRunPriority(pri);
274: if (t.active)
275: t.setPriority(pri);
276: }
277: }
278:
279: /** Return the number of FJTaskRunner threads in this group **/
280:
281: public int size() {
282: return threads.length;
283: }
284:
285: /**
286: * Return the number of threads that are not idly waiting for work.
287: * Beware that even active threads might not be doing any useful
288: * work, but just spinning waiting for other dependent tasks.
289: * Also, since this is just a snapshot value, some tasks
290: * may be in the process of becoming idle.
291: **/
292: public synchronized int getActiveCount() {
293: return activeCount;
294: }
295:
296: /**
297: * Prints various snapshot statistics to System.out.
298: * <ul>
299: * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
300: * <em>n</em> from zero to group size - 1):
301: * <ul>
302: * <li> A star "*" is printed if the thread is currently active;
303: * that is, not sleeping while waiting for work. Because
304: * threads gradually enter sleep modes, an active thread
305: * may in fact be about to sleep (or wake up).
306: * <li> <em>Q Cap</em> The current capacity of its task queue.
307: * <li> <em>Run</em> The total number of tasks that have been run.
308: * <li> <em>New</em> The number of these tasks that were
309: * taken from either the entry queue or from other
310: * thread queues; that is, the number of tasks run
311: * that were <em>not</em> forked by the thread itself.
312: * <li> <em>Scan</em> The number of times other task
313: * queues or the entry queue were polled for tasks.
314: * </ul>
315: * <li> <em>Execute</em> The total number of tasks entered
316: * (but not necessarily yet run) via execute or invoke.
317: * <li> <em>Time</em> Time in seconds since construction of this
318: * FJTaskRunnerGroup.
319: * <li> <em>Rate</em> The total number of tasks processed
320: * per second across all threads. This
321: * may be useful as a simple throughput indicator
322: * if all processed tasks take approximately the
323: * same time to run.
324: * </ul>
325: * <p>
326: * Cautions: Some statistics are updated and gathered
327: * without synchronization,
328: * so may not be accurate. However, reported counts may be considered
329: * as lower bounds of actual values.
330: * Some values may be zero if classes are compiled
331: * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
332: * classes can be independently compiled with different values of
333: * COLLECT_STATS.) Also, the counts are maintained as ints so could
334: * overflow in exceptionally long-lived applications.
335: * <p>
336: * These statistics can be useful when tuning algorithms or diagnosing
337: * problems. For example:
338: * <ul>
339: * <li> High numbers of scans may mean that there is insufficient
340: * parallelism to keep threads busy. However, high scan rates
341: * are expected if the number
342: * of Executes is also high or there is a lot of global
343: * synchronization in the application, and the system is not otherwise
344: * busy. Threads may scan
345: * for work hundreds of times upon startup, shutdown, and
346: * global synch points of task sets.
347: * <li> Large imbalances in tasks run across different threads might
348: * just reflect contention with unrelated threads on a system
349: * (possibly including JVM threads such as GC), but may also
350: * indicate some systematic bias in how you generate tasks.
351: * <li> Large task queue capacities may mean that too many tasks are being
352: * generated before they can be run.
353: * Capacities are reported rather than current numbers of tasks
354: * in queues because they are better indicators of the existence
355: * of these kinds of possibly-transient problems.
356: * Queue capacities are
357: * resized on demand from their initial value of 4096 elements,
358: * which is much more than sufficient for the kinds of
359: * applications that this framework is intended to best support.
360: * </ul>
361: **/
362:
363: public void stats() {
364: long time = System.currentTimeMillis() - initTime;
365: double secs = ((double) time) / 1000.0;
366: long totalRuns = 0;
367: long totalScans = 0;
368: long totalSteals = 0;
369:
370: System.out.print("Thread" + "\tQ Cap" + "\tScans" + "\tNew"
371: + "\tRuns" + "\n");
372:
373: for (int i = 0; i < threads.length; ++i) {
374: FJTaskRunner t = threads[i];
375: int truns = t.runs;
376: totalRuns += truns;
377:
378: int tscans = t.scans;
379: totalScans += tscans;
380:
381: int tsteals = t.steals;
382: totalSteals += tsteals;
383:
384: String star = (getActive(t)) ? "*" : " ";
385:
386: System.out.print("T" + i + star + "\t" + t.deqSize() + "\t"
387: + tscans + "\t" + tsteals + "\t" + truns + "\n");
388: }
389:
390: System.out.print("Total" + "\t " + "\t" + totalScans + "\t"
391: + totalSteals + "\t" + totalRuns + "\n");
392:
393: System.out.print("Execute: " + entries);
394:
395: System.out.print("\tTime: " + secs);
396:
397: long rps = 0;
398: if (secs != 0)
399: rps = Math.round((double) (totalRuns) / secs);
400:
401: System.out.println("\tRate: " + rps);
402: }
403:
404: /* ------------ Methods called only by FJTaskRunners ------------- */
405:
406: /**
407: * Return the array of threads in this group.
408: * Called only by FJTaskRunner.scan().
409: **/
410:
411: protected FJTaskRunner[] getArray() {
412: return threads;
413: }
414:
415: /**
416: * Return a task from entry queue, or null if empty.
417: * Called only by FJTaskRunner.scan().
418: **/
419:
420: protected FJTask pollEntryQueue() {
421: try {
422: FJTask t = (FJTask) (entryQueue.poll(0));
423: return t;
424: } catch (InterruptedException ex) { // ignore interrupts
425: Thread.currentThread().interrupt();
426: return null;
427: }
428: }
429:
430: /**
431: * Return active status of t.
432: * Per-thread active status can only be accessed and
433: * modified via synchronized method here in the group class.
434: **/
435:
436: protected synchronized boolean getActive(FJTaskRunner t) {
437: return t.active;
438: }
439:
440: /**
441: * Set active status of thread t to true, and notify others
442: * that might be waiting for work.
443: **/
444:
445: protected synchronized void setActive(FJTaskRunner t) {
446: if (!t.active) {
447: t.active = true;
448: ++activeCount;
449: if (nstarted < threads.length)
450: threads[nstarted++].start();
451: else
452: notifyAll();
453: }
454: }
455:
456: /**
457: * Set active status of thread t to false.
458: **/
459:
460: protected synchronized void setInactive(FJTaskRunner t) {
461: if (t.active) {
462: t.active = false;
463: --activeCount;
464: }
465: }
466:
467: /**
468: * The number of times to scan other threads for tasks
469: * before transitioning to a mode where scans are
470: * interleaved with sleeps (actually timed waits).
471: * Upon transition, sleeps are for duration of
472: * scans / SCANS_PER_SLEEP milliseconds.
473: * <p>
474: * This is not treated as a user-tunable parameter because
475: * good values do not appear to vary much across JVMs or
476: * applications. Its main role is to help avoid some
477: * useless spinning and contention during task startup.
478: **/
479: static final long SCANS_PER_SLEEP = 15;
480:
481: /**
482: * The maximum time (in msecs) to sleep when a thread is idle,
483: * yet others are not, so may eventually generate work that
484: * the current thread can steal. This value reflects the maximum time
485: * that a thread may sleep when it possibly should not, because there
486: * are other active threads that might generate work. In practice,
487: * designs in which some threads become stalled because others
488: * are running yet not generating tasks are not likely to work
489: * well in this framework anyway, so the exact value does not matter
490: * too much. However, keeping it in the sub-second range does
491: * help smooth out startup and shutdown effects.
492: **/
493:
494: static final long MAX_SLEEP_TIME = 100;
495:
496: /**
497: * Set active status of thread t to false, and
498: * then wait until: (a) there is a task in the entry
499: * queue, or (b) other threads are active, or (c) the current
500: * thread is interrupted. Upon return, it
501: * is not certain that there will be work available.
502: * The thread must itself check.
503: * <p>
504: * The main underlying reason
505: * for these mechanics is that threads do not
506: * signal each other when they add elements to their queues.
507: * (This would add to task overhead, reduce locality.
508: * and increase contention.)
509: * So we must rely on a tamed form of polling. However, tasks
510: * inserted into the entry queue do result in signals, so
511: * tasks can wait on these if all of them are otherwise idle.
512: **/
513:
514: protected synchronized void checkActive(FJTaskRunner t, long scans) {
515:
516: setInactive(t);
517:
518: try {
519: // if nothing available, do a hard wait
520: if (activeCount == 0 && entryQueue.peek() == null) {
521: wait();
522: } else {
523: // If there is possibly some work,
524: // sleep for a while before rechecking
525:
526: long msecs = scans / SCANS_PER_SLEEP;
527: if (msecs > MAX_SLEEP_TIME)
528: msecs = MAX_SLEEP_TIME;
529: int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
530: wait(msecs, nsecs);
531: }
532: } catch (InterruptedException ex) {
533: notify(); // avoid lost notifies on interrupts
534: Thread.currentThread().interrupt();
535: }
536: }
537:
538: /* ------------ Utility methods ------------- */
539:
540: /**
541: * Start or wake up any threads waiting for work
542: **/
543:
544: protected synchronized void signalNewTask() {
545: if (COLLECT_STATS)
546: ++entries;
547: if (nstarted < threads.length)
548: threads[nstarted++].start();
549: else
550: notify();
551: }
552:
553: /**
554: * Create all FJTaskRunner threads in this group.
555: **/
556:
557: protected void initializeThreads() {
558: for (int i = 0; i < threads.length; ++i)
559: threads[i] = new FJTaskRunner(this );
560: }
561:
562: /**
563: * Wrap wait/notify mechanics around a task so that
564: * invoke() can wait it out
565: **/
566: protected static final class InvokableFJTask extends FJTask {
567: protected final Runnable wrapped;
568: protected boolean terminated = false;
569:
570: protected InvokableFJTask(Runnable r) {
571: wrapped = r;
572: }
573:
574: public void run() {
575: try {
576: if (wrapped instanceof FJTask)
577: FJTask.invoke((FJTask) (wrapped));
578: else
579: wrapped.run();
580: } finally {
581: setTerminated();
582: }
583: }
584:
585: protected synchronized void setTerminated() {
586: terminated = true;
587: notifyAll();
588: }
589:
590: protected synchronized void awaitTermination()
591: throws InterruptedException {
592: while (!terminated)
593: wait();
594: }
595: }
596:
597: }
|