001: /*
002: File: FJTaskRunner.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 7Jan1999 dl First public release
012: 13Jan1999 dl correct a stat counter update;
013: ensure inactive status on run termination;
                  misc minor cleanup
015: 14Jan1999 dl Use random starting point in scan;
016: variable renamings.
017: 18Jan1999 dl Runloop allowed to die on task exception;
018: remove useless timed join
019: 22Jan1999 dl Rework scan to allow use of priorities.
020: 6Feb1999 dl Documentation updates.
021: 7Mar1999 dl Add array-based coInvoke
022: 31Mar1999 dl Revise scan to remove need for NullTasks
023: 27Apr1999 dl Renamed
024: 23oct1999 dl Earlier detect of interrupt in scanWhileIdling
025: 24nov1999 dl Now works on JVMs that do not properly
026: implement read-after-write of 2 volatiles.
027: */
028:
029: package EDU.oswego.cs.dl.util.concurrent;
030:
031: import java.util.Random;
032:
033: /**
034: * Specialized Thread subclass for running FJTasks.
035: * <p>
036: * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
037: * Double-ended queues support stack-based operations
038: * push and pop, as well as queue-based operations put and take.
039: * Normally, threads run their own tasks. But they
 * may also steal tasks from each other's DEQs.
041: * <p>
042: * The algorithms are minor variants of those used
043: * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
044: * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
045: * to a lesser extent
046: * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
047: * but are adapted to work in Java.
048: * <p>
049: * The two most important capabilities are:
050: * <ul>
051: * <li> Fork a FJTask:
052: * <pre>
053: * Push task onto DEQ
054: * </pre>
055: * <li> Get a task to run (for example within taskYield)
056: * <pre>
057: * If DEQ is not empty,
058: * Pop a task and run it.
059: * Else if any other DEQ is not empty,
060: * Take ("steal") a task from it and run it.
061: * Else if the entry queue for our group is not empty,
062: * Take a task from it and run it.
063: * Else if current thread is otherwise idling
064: * If all threads are idling
065: * Wait for a task to be put on group entry queue
066: * Else
067: * Yield or Sleep for a while, and then retry
068: * </pre>
069: * </ul>
 * The push, pop, and put are designed to only ever be called by the
071: * current thread, and take (steal) is only ever called by
072: * other threads.
073: * All other operations are composites and variants of these,
074: * plus a few miscellaneous bookkeeping methods.
075: * <p>
076: * Implementations of the underlying representations and operations
077: * are geared for use on JVMs operating on multiple CPUs (although
078: * they should of course work fine on single CPUs as well).
079: * <p>
080: * A possible snapshot of a FJTaskRunner's DEQ is:
081: * <pre>
082: * 0 1 2 3 4 5 6 ...
083: * +-----+-----+-----+-----+-----+-----+-----+--
084: * | | t | t | t | t | | | ... deq array
085: * +-----+-----+-----+-----+-----+-----+-----+--
086: * ^ ^
087: * base top
088: * (incremented (incremented
089: * on take, on push
090: * decremented decremented
091: * on put) on pop)
092: * </pre>
093: * <p>
094: * FJTasks are held in elements of the DEQ.
095: * They are maintained in a bounded array that
096: * works similarly to a circular bounded buffer. To ensure
097: * visibility of stolen FJTasks across threads, the array elements
098: * must be <code>volatile</code>.
099: * Using volatile rather than synchronizing suffices here since
100: * each task accessed by a thread is either one that it
 * created or one that it has never seen before. Thus we cannot
102: * encounter any staleness problems executing run methods,
103: * although FJTask programmers must be still sure to either synch or use
104: * volatile for shared data within their run methods.
105: * <p>
106: * However, since there is no way
107: * to declare an array of volatiles in Java, the DEQ elements actually
108: * hold VolatileTaskRef objects, each of which in turn holds a
109: * volatile reference to a FJTask.
110: * Even with the double-indirection overhead of
111: * volatile refs, using an array for the DEQ works out
112: * better than linking them since fewer shared
113: * memory locations need to be
114: * touched or modified by the threads while using the DEQ.
115: * Further, the double indirection may alleviate cache-line
116: * sharing effects (which cannot otherwise be directly dealt with in Java).
117: * <p>
118: * The indices for the <code>base</code> and <code>top</code> of the DEQ
119: * are declared as volatile. The main contention point with
120: * multiple FJTaskRunner threads occurs when one thread is trying
121: * to pop its own stack while another is trying to steal from it.
122: * This is handled via a specialization of Dekker's algorithm,
123: * in which the popping thread pre-decrements <code>top</code>,
124: * and then checks it against <code>base</code>.
125: * To be conservative in the face of JVMs that only partially
126: * honor the specification for volatile, the pop proceeds
127: * without synchronization only if there are apparently enough
128: * items for both a simultaneous pop and take to succeed.
129: * It otherwise enters a
130: * synchronized lock to check if the DEQ is actually empty,
131: * if so failing. The stealing thread
132: * does almost the opposite, but is set up to be less likely
133: * to win in cases of contention: Steals always run under synchronized
134: * locks in order to avoid conflicts with other ongoing steals.
135: * They pre-increment <code>base</code>, and then check against
136: * <code>top</code>. They back out (resetting the base index
137: * and failing to steal) if the
138: * DEQ is empty or is about to become empty by an ongoing pop.
139: * <p>
140: * A push operation can normally run concurrently with a steal.
141: * A push enters a synch lock only if the DEQ appears full so must
142: * either be resized or have indices adjusted due to wrap-around
143: * of the bounded DEQ. The put operation always requires synchronization.
144: * <p>
145: * When a FJTaskRunner thread has no tasks of its own to run,
146: * it tries to be a good citizen.
147: * Threads run at lower priority while scanning for work.
148: * <p>
149: * If the task is currently waiting
150: * via yield, the thread alternates scans (starting at a randomly
151: * chosen victim) with Thread.yields. This is
152: * well-behaved so long as the JVM handles Thread.yield in a
153: * sensible fashion. (It need not. Thread.yield is so underspecified
154: * that it is legal for a JVM to treat it as a no-op.) This also
155: * keeps things well-behaved even if we are running on a uniprocessor
156: * JVM using a simple cooperative threading model.
157: * <p>
 * If a thread needing work is
 * otherwise idle (which occurs only in the main runloop), and
160: * there are no available tasks to steal or poll, it
161: * instead enters into a sleep-based (actually timed wait(msec))
162: * phase in which it progressively sleeps for longer durations
163: * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
164: * currently 100ms) between scans.
165: * If all threads in the group
166: * are idling, they further progress to a hard wait phase, suspending
167: * until a new task is entered into the FJTaskRunnerGroup entry queue.
168: * A sleeping FJTaskRunner thread may be awakened by a new
169: * task being put into the group entry queue or by another FJTaskRunner
170: * becoming active, but not merely by some DEQ becoming non-empty.
171: * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
172: * in cases where all but one worker thread start sleeping
173: * even though there will eventually be work produced
174: * by a thread that is taking a long time to place tasks in DEQ.
175: * These sleep mechanics are handled in the FJTaskRunnerGroup class.
176: * <p>
177: * Composite operations such as taskJoin include heavy
178: * manual inlining of the most time-critical operations
179: * (mainly FJTask.invoke).
180: * This opens up a few opportunities for further hand-optimizations.
181: * Until Java compilers get a lot smarter, these tweaks
182: * improve performance significantly enough for task-intensive
183: * programs to be worth the poorer maintainability and code duplication.
184: * <p>
185: * Because they are so fragile and performance-sensitive, nearly
186: * all methods are declared as final. However, nearly all fields
187: * and methods are also declared as protected, so it is possible,
188: * with much care, to extend functionality in subclasses. (Normally
189: * you would also need to subclass FJTaskRunnerGroup.)
190: * <p>
191: * None of the normal java.lang.Thread class methods should ever be called
192: * on FJTaskRunners. For this reason, it might have been nicer to
193: * declare FJTaskRunner as a Runnable to run within a Thread. However,
194: * this would have complicated many minor logistics. And since
195: * no FJTaskRunner methods should normally be called from outside the
196: * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
197: * usage.
198: * <p>
199: * You might think that layering this kind of framework on top of
200: * Java threads, which are already several levels removed from raw CPU
201: * scheduling on most systems, would lead to very poor performance.
202: * But on the platforms
203: * tested, the performance is quite good.
204: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
205: * @see FJTask
206: * @see FJTaskRunnerGroup
207: **/
208:
209: public class FJTaskRunner extends Thread {
210:
/** The group this runner belongs to; assigned once by the constructor. **/
protected final FJTaskRunnerGroup group;

/**
 * Creates a runner that is a member of the given group.
 * Intended to be called only while a FJTaskRunnerGroup is being
 * initialized.
 * <p>
 * The victim-selection random generator is seeded from this object's
 * identity hash code, the thread priority in effect at construction
 * is recorded as the run priority, and the thread is marked as a daemon.
 *
 * @param g the group this runner will belong to
 **/
protected FJTaskRunner(FJTaskRunnerGroup g) {
    this.group = g;
    this.victimRNG = new Random(System.identityHashCode(this));
    this.runPriority = getPriority();
    setDaemon(true);
}
224:
225: /**
226: * Return the FJTaskRunnerGroup of which this thread is a member
227: **/
228:
229: protected final FJTaskRunnerGroup getGroup() {
230: return group;
231: }
232:
233: /* ------------ DEQ Representation ------------------- */
234:
/**
 * FJTasks are held in an array-based DEQ that starts out with
 * INITIAL_CAPACITY elements. The DEQ is grown (by doubling, in
 * checkOverflow) if necessary, but the default value is
 * normally much more than sufficient unless there are
 * user programming errors or questionable operations generating
 * large numbers of tasks without running them.
 * Capacities must be a power of two, because slot indices are
 * computed by masking with <code>deq.length - 1</code>.
 **/

protected static final int INITIAL_CAPACITY = 4096;

/**
 * The maximum supported DEQ capacity.
 * checkOverflow throws an Error when doubling the array would
 * reach or exceed this value.
 **/

protected static final int MAX_CAPACITY = 1 << 30;
252:
253: /**
254: * An object holding a single volatile reference to a FJTask.
255: **/
256:
257: protected final static class VolatileTaskRef {
258: /** The reference **/
259: protected volatile FJTask ref;
260:
261: /** Set the reference **/
262: protected final void put(FJTask r) {
263: ref = r;
264: }
265:
266: /** Return the reference **/
267: protected final FJTask get() {
268: return ref;
269: }
270:
271: /** Return the reference and clear it **/
272: protected final FJTask take() {
273: FJTask r = ref;
274: ref = null;
275: return r;
276: }
277:
278: /**
279: * Initialization utility for constructing arrays.
280: * Make an array of given capacity and fill it with
281: * VolatileTaskRefs.
282: **/
283: protected static VolatileTaskRef[] newArray(int cap) {
284: VolatileTaskRef[] a = new VolatileTaskRef[cap];
285: for (int k = 0; k < cap; k++)
286: a[k] = new VolatileTaskRef();
287: return a;
288: }
289:
290: }
291:
292: /**
293: * The DEQ array.
294: **/
295:
296: protected VolatileTaskRef[] deq = VolatileTaskRef
297: .newArray(INITIAL_CAPACITY);
298:
299: /** Current size of the task DEQ **/
300: protected int deqSize() {
301: return deq.length;
302: }
303:
/**
 * Current top of DEQ. Generally acts just like a stack pointer in an
 * array-based stack, except that it circularly wraps around the
 * array, as in an array-based queue. The value is NOT
 * always kept within <code>0 ... deq.length</code> though.
 * Slots are always addressed as <code>top & (deq.length-1)</code>:
 * push stores into that slot and then increments top, while pop
 * pre-decrements top and then reads that slot.
 * To avoid integer overflow, top is reset down
 * within bounds whenever it is noticed to be out of bounds;
 * at worst when it is at <code>2 * deq.length</code>.
 **/
protected volatile int top = 0;

/**
 * Current base of DEQ. Acts like a take-pointer in an
 * array-based bounded queue. Same bounds and usage as top:
 * take reads slot <code>base & (deq.length-1)</code> and
 * increments base; put decrements it.
 **/

protected volatile int base = 0;

/**
 * An extra object to synchronize on in order to
 * achieve a memory barrier. It is locked in confirmTake
 * before top is re-read.
 **/

protected final Object barrier = new Object();
329:
330: /* ------------ Other BookKeeping ------------------- */
331:
/**
 * Record whether current thread may be processing a task
 * (i.e., has been started and is not in an idle wait).
 * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
 * stored here for simplicity. Nothing in this class reads or
 * writes it directly.
 **/

protected boolean active = false;

/**
 * Random starting point generator for scan() and scanWhileIdling();
 * seeded in the constructor.
 **/
protected final Random victimRNG;

/**
 * Priority to use while scanning for work. The thread switches to
 * this priority after its first failed steal attempt in a scan.
 **/
protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;

/**
 * Priority to use while running tasks. Initialized in the constructor
 * from the thread's priority at that time, and restored after a scan
 * that lowered the priority.
 **/
protected int runPriority;
349:
350: /**
351: * Set the priority to use while scanning.
352: * We do not bother synchronizing access, since
353: * by the time the value is needed, both this FJTaskRunner
354: * and its FJTaskRunnerGroup will
355: * necessarily have performed enough synchronization
356: * to avoid staleness problems of any consequence.
357: **/
358: protected void setScanPriority(int pri) {
359: scanPriority = pri;
360: }
361:
362: /**
363: * Set the priority to use while running tasks.
364: * Same usage and rationale as setScanPriority.
365: **/
366: protected void setRunPriority(int pri) {
367: runPriority = pri;
368: }
369:
/**
 * Compile-time constant for statistics gathering.
 * Every counter update in this class is guarded by
 * <code>if (COLLECT_STATS)</code>.
 * Even when set, reported values may not be accurate
 * since all are read and written without synchronization.
 **/

static final boolean COLLECT_STATS = true;
// static final boolean COLLECT_STATS = false;

// for stat collection

/** Total number of tasks run by this thread (incremented just before each task.run()) **/
protected int runs = 0;

/**
 * Total number of queues scanned for work: one per attempted take
 * from another runner, plus one per poll of the group entry queue.
 **/
protected int scans = 0;

/**
 * Total number of tasks obtained via scan, whether taken from
 * another runner's DEQ or polled from the group entry queue.
 **/
protected int steals = 0;
389:
390: /* ------------ DEQ operations ------------------- */
391:
392: /**
393: * Push a task onto DEQ.
394: * Called ONLY by current thread.
395: **/
396:
397: protected final void push(final FJTask r) {
398: int t = top;
399:
400: /*
401: This test catches both overflows and index wraps. It doesn't
402: really matter if base value is in the midst of changing in take.
403: As long as deq length is < 2^30, we are guaranteed to catch wrap in
404: time since base can only be incremented at most length times
405: between pushes (or puts).
406: */
407:
408: if (t < (base & (deq.length - 1)) + deq.length) {
409:
410: deq[t & (deq.length - 1)].put(r);
411: top = t + 1;
412: }
413:
414: else
415: // isolate slow case to increase chances push is inlined
416: slowPush(r); // check overflow and retry
417: }
418:
419: /**
420: * Handle slow case for push
421: **/
422:
423: protected synchronized void slowPush(final FJTask r) {
424: checkOverflow();
425: push(r); // just recurse -- this one is sure to succeed.
426: }
427:
428: /**
429: * Enqueue task at base of DEQ.
430: * Called ONLY by current thread.
431: * This method is currently not called from class FJTask. It could be used
432: * as a faster way to do FJTask.start, but most users would
433: * find the semantics too confusing and unpredictable.
434: **/
435:
436: protected final synchronized void put(final FJTask r) {
437: for (;;) {
438: int b = base - 1;
439: if (top < b + deq.length) {
440:
441: int newBase = b & (deq.length - 1);
442: deq[newBase].put(r);
443: base = newBase;
444:
445: if (b != newBase) { // Adjust for index underflow
446: int newTop = top & (deq.length - 1);
447: if (newTop < newBase)
448: newTop += deq.length;
449: top = newTop;
450: }
451: return;
452: } else {
453: checkOverflow();
454: // ... and retry
455: }
456: }
457: }
458:
459: /**
460: * Return a popped task, or null if DEQ is empty.
461: * Called ONLY by current thread.
462: * <p>
463: * This is not usually called directly but is
464: * instead inlined in callers. This version differs from the
465: * cilk algorithm in that pop does not fully back down and
466: * retry in the case of potential conflict with take. It simply
467: * rechecks under synch lock. This gives a preference
468: * for threads to run their own tasks, which seems to
469: * reduce flailing a bit when there are few tasks to run.
470: **/
471:
472: protected final FJTask pop() {
473: /*
474: Decrement top, to force a contending take to back down.
475: */
476:
477: int t = --top;
478:
479: /*
480: To avoid problems with JVMs that do not properly implement
481: read-after-write of a pair of volatiles, we conservatively
482: grab without lock only if the DEQ appears to have at least two
483: elements, thus guaranteeing that both a pop and take will succeed,
484: even if the pre-increment in take is not seen by current thread.
485: Otherwise we recheck under synch.
486: */
487:
488: if (base + 1 < t)
489: return deq[t & (deq.length - 1)].take();
490: else
491: return confirmPop(t);
492:
493: }
494:
495: /**
496: * Check under synch lock if DEQ is really empty when doing pop.
497: * Return task if not empty, else null.
498: **/
499:
500: protected final synchronized FJTask confirmPop(int provisionalTop) {
501: if (base <= provisionalTop)
502: return deq[provisionalTop & (deq.length - 1)].take();
503: else { // was empty
504: /*
505: Reset DEQ indices to zero whenever it is empty.
506: This both avoids unnecessary calls to checkOverflow
507: in push, and helps keep the DEQ from accumulating garbage
508: */
509:
510: top = base = 0;
511: return null;
512: }
513: }
514:
515: /**
516: * Take a task from the base of the DEQ.
517: * Always called by other threads via scan()
518: **/
519:
520: protected final synchronized FJTask take() {
521:
522: /*
523: Increment base in order to suppress a contending pop
524: */
525:
526: int b = base++;
527:
528: if (b < top)
529: return confirmTake(b);
530: else {
531: // back out
532: base = b;
533: return null;
534: }
535: }
536:
537: /**
538: * double-check a potential take
539: **/
540:
541: protected FJTask confirmTake(int oldBase) {
542:
543: /*
544: Use a second (guaranteed uncontended) synch
545: to serve as a barrier in case JVM does not
546: properly process read-after-write of 2 volatiles
547: */
548:
549: synchronized (barrier) {
550: if (oldBase < top) {
551: /*
552: We cannot call deq[oldBase].take here because of possible races when
553: nulling out versus concurrent push operations. Resulting
554: accumulated garbage is swept out periodically in
555: checkOverflow, or more typically, just by keeping indices
556: zero-based when found to be empty in pop, which keeps active
557: region small and constantly overwritten.
558: */
559:
560: return deq[oldBase & (deq.length - 1)].get();
561: } else {
562: base = oldBase;
563: return null;
564: }
565: }
566: }
567:
568: /**
569: * Adjust top and base, and grow DEQ if necessary.
570: * Called only while DEQ synch lock being held.
571: * We don't expect this to be called very often. In most
572: * programs using FJTasks, it is never called.
573: **/
574:
575: protected void checkOverflow() {
576: int t = top;
577: int b = base;
578:
579: if (t - b < deq.length - 1) { // check if just need an index reset
580:
581: int newBase = b & (deq.length - 1);
582: int newTop = top & (deq.length - 1);
583: if (newTop < newBase)
584: newTop += deq.length;
585: top = newTop;
586: base = newBase;
587:
588: /*
589: Null out refs to stolen tasks.
590: This is the only time we can safely do it.
591: */
592:
593: int i = newBase;
594: while (i != newTop && deq[i].ref != null) {
595: deq[i].ref = null;
596: i = (i - 1) & (deq.length - 1);
597: }
598:
599: } else { // grow by doubling array
600:
601: int newTop = t - b;
602: int oldcap = deq.length;
603: int newcap = oldcap * 2;
604:
605: if (newcap >= MAX_CAPACITY)
606: throw new Error(
607: "FJTask queue maximum capacity exceeded");
608:
609: VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
610:
611: // copy in bottom half of new deq with refs from old deq
612: for (int j = 0; j < oldcap; ++j)
613: newdeq[j] = deq[b++ & (oldcap - 1)];
614:
615: // fill top half of new deq with new refs
616: for (int j = oldcap; j < newcap; ++j)
617: newdeq[j] = new VolatileTaskRef();
618:
619: deq = newdeq;
620: base = 0;
621: top = newTop;
622: }
623: }
624:
625: /* ------------ Scheduling ------------------- */
626:
627: /**
628: * Do all but the pop() part of yield or join, by
629: * traversing all DEQs in our group looking for a task to
630: * steal. If none, it checks the entry queue.
631: * <p>
632: * Since there are no good, portable alternatives,
633: * we rely here on a mixture of Thread.yield and priorities
634: * to reduce wasted spinning, even though these are
635: * not well defined. We are hoping here that the JVM
636: * does something sensible.
637: * @param waitingFor if non-null, the current task being joined
638: **/
639:
640: protected void scan(final FJTask waitingFor) {
641:
642: FJTask task = null;
643:
644: // to delay lowering priority until first failure to steal
645: boolean lowered = false;
646:
647: /*
648: Circularly traverse from a random start index.
649:
650: This differs slightly from cilk version that uses a random index
651: for each attempted steal.
652: Exhaustive scanning might impede analytic tractablity of
653: the scheduling policy, but makes it much easier to deal with
654: startup and shutdown.
655: */
656:
657: FJTaskRunner[] ts = group.getArray();
658: int idx = victimRNG.nextInt(ts.length);
659:
660: for (int i = 0; i < ts.length; ++i) {
661:
662: FJTaskRunner t = ts[idx];
663: if (++idx >= ts.length)
664: idx = 0; // circularly traverse
665:
666: if (t != null && t != this ) {
667:
668: if (waitingFor != null && waitingFor.isDone()) {
669: break;
670: } else {
671: if (COLLECT_STATS)
672: ++scans;
673: task = t.take();
674: if (task != null) {
675: if (COLLECT_STATS)
676: ++steals;
677: break;
678: } else if (isInterrupted()) {
679: break;
680: } else if (!lowered) { // if this is first fail, lower priority
681: lowered = true;
682: setPriority(scanPriority);
683: } else { // otherwise we are at low priority; just yield
684: yield();
685: }
686: }
687: }
688:
689: }
690:
691: if (task == null) {
692: if (COLLECT_STATS)
693: ++scans;
694: task = group.pollEntryQueue();
695: if (COLLECT_STATS)
696: if (task != null)
697: ++steals;
698: }
699:
700: if (lowered)
701: setPriority(runPriority);
702:
703: if (task != null && !task.isDone()) {
704: if (COLLECT_STATS)
705: ++runs;
706: task.run();
707: task.setDone();
708: }
709:
710: }
711:
712: /**
713: * Same as scan, but called when current thread is idling.
714: * It repeatedly scans other threads for tasks,
715: * sleeping while none are available.
716: * <p>
717: * This differs from scan mainly in that
718: * since there is no reason to return to recheck any
719: * condition, we iterate until a task is found, backing
720: * off via sleeps if necessary.
721: **/
722:
723: protected void scanWhileIdling() {
724: FJTask task = null;
725:
726: boolean lowered = false;
727: long iters = 0;
728:
729: FJTaskRunner[] ts = group.getArray();
730: int idx = victimRNG.nextInt(ts.length);
731:
732: do {
733: for (int i = 0; i < ts.length; ++i) {
734:
735: FJTaskRunner t = ts[idx];
736: if (++idx >= ts.length)
737: idx = 0; // circularly traverse
738:
739: if (t != null && t != this ) {
740: if (COLLECT_STATS)
741: ++scans;
742:
743: task = t.take();
744: if (task != null) {
745: if (COLLECT_STATS)
746: ++steals;
747: if (lowered)
748: setPriority(runPriority);
749: group.setActive(this );
750: break;
751: }
752: }
753: }
754:
755: if (task == null) {
756: if (isInterrupted())
757: return;
758:
759: if (COLLECT_STATS)
760: ++scans;
761: task = group.pollEntryQueue();
762:
763: if (task != null) {
764: if (COLLECT_STATS)
765: ++steals;
766: if (lowered)
767: setPriority(runPriority);
768: group.setActive(this );
769: } else {
770: ++iters;
771: // Check here for yield vs sleep to avoid entering group synch lock
772: if (iters >= group.SCANS_PER_SLEEP) {
773: group.checkActive(this , iters);
774: if (isInterrupted())
775: return;
776: } else if (!lowered) {
777: lowered = true;
778: setPriority(scanPriority);
779: } else {
780: yield();
781: }
782: }
783: }
784: } while (task == null);
785:
786: if (!task.isDone()) {
787: if (COLLECT_STATS)
788: ++runs;
789: task.run();
790: task.setDone();
791: }
792:
793: }
794:
795: /* ------------ composite operations ------------------- */
796:
797: /**
798: * Main runloop
799: **/
800:
801: public void run() {
802: try {
803: while (!interrupted()) {
804:
805: FJTask task = pop();
806: if (task != null) {
807: if (!task.isDone()) {
808: // inline FJTask.invoke
809: if (COLLECT_STATS)
810: ++runs;
811: task.run();
812: task.setDone();
813: }
814: } else
815: scanWhileIdling();
816: }
817: } finally {
818: group.setInactive(this );
819: }
820: }
821:
822: /**
823: * Execute a task in this thread. Generally called when current task
824: * cannot otherwise continue.
825: **/
826:
827: protected final void taskYield() {
828: FJTask task = pop();
829: if (task != null) {
830: if (!task.isDone()) {
831: if (COLLECT_STATS)
832: ++runs;
833: task.run();
834: task.setDone();
835: }
836: } else
837: scan(null);
838: }
839:
840: /**
841: * Process tasks until w is done.
842: * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
843: **/
844:
845: protected final void taskJoin(final FJTask w) {
846:
847: while (!w.isDone()) {
848:
849: FJTask task = pop();
850: if (task != null) {
851: if (!task.isDone()) {
852: if (COLLECT_STATS)
853: ++runs;
854: task.run();
855: task.setDone();
856: if (task == w)
857: return; // fast exit if we just ran w
858: }
859: } else
860: scan(w);
861: }
862: }
863:
864: /**
865: * A specialized expansion of
866: * <code> w.fork(); invoke(v); w.join(); </code>
867: **/
868:
869: protected final void coInvoke(final FJTask w, final FJTask v) {
870:
871: // inline push
872:
873: int t = top;
874: if (t < (base & (deq.length - 1)) + deq.length) {
875:
876: deq[t & (deq.length - 1)].put(w);
877: top = t + 1;
878:
879: // inline invoke
880:
881: if (!v.isDone()) {
882: if (COLLECT_STATS)
883: ++runs;
884: v.run();
885: v.setDone();
886: }
887:
888: // inline taskJoin
889:
890: while (!w.isDone()) {
891: FJTask task = pop();
892: if (task != null) {
893: if (!task.isDone()) {
894: if (COLLECT_STATS)
895: ++runs;
896: task.run();
897: task.setDone();
898: if (task == w)
899: return; // fast exit if we just ran w
900: }
901: } else
902: scan(w);
903: }
904: }
905:
906: else
907: // handle non-inlinable cases
908: slowCoInvoke(w, v);
909: }
910:
911: /**
912: * Backup to handle noninlinable cases of coInvoke
913: **/
914:
915: protected void slowCoInvoke(final FJTask w, final FJTask v) {
916: push(w); // let push deal with overflow
917: FJTask.invoke(v);
918: taskJoin(w);
919: }
920:
921: /**
922: * Array-based version of coInvoke
923: **/
924:
925: protected final void coInvoke(FJTask[] tasks) {
926: int nforks = tasks.length - 1;
927:
928: // inline bulk push of all but one task
929:
930: int t = top;
931:
932: if (nforks >= 0
933: && t + nforks < (base & (deq.length - 1)) + deq.length) {
934: for (int i = 0; i < nforks; ++i) {
935: deq[t++ & (deq.length - 1)].put(tasks[i]);
936: top = t;
937: }
938:
939: // inline invoke of one task
940: FJTask v = tasks[nforks];
941: if (!v.isDone()) {
942: if (COLLECT_STATS)
943: ++runs;
944: v.run();
945: v.setDone();
946: }
947:
948: // inline taskJoins
949:
950: for (int i = 0; i < nforks; ++i) {
951: FJTask w = tasks[i];
952: while (!w.isDone()) {
953:
954: FJTask task = pop();
955: if (task != null) {
956: if (!task.isDone()) {
957: if (COLLECT_STATS)
958: ++runs;
959: task.run();
960: task.setDone();
961: }
962: } else
963: scan(w);
964: }
965: }
966: }
967:
968: else
969: // handle non-inlinable cases
970: slowCoInvoke(tasks);
971: }
972:
973: /**
974: * Backup to handle atypical or noninlinable cases of coInvoke
975: **/
976:
977: protected void slowCoInvoke(FJTask[] tasks) {
978: for (int i = 0; i < tasks.length; ++i)
979: push(tasks[i]);
980: for (int i = 0; i < tasks.length; ++i)
981: taskJoin(tasks[i]);
982: }
983:
984: }
|