001: /*
002: File: FJTaskRunner.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 7Jan1999 dl First public release
012: 13Jan1999 dl correct a stat counter update;
013: ensure inactive status on run termination;
misc minor cleanup
015: 14Jan1999 dl Use random starting point in scan;
016: variable renamings.
017: 18Jan1999 dl Runloop allowed to die on task exception;
018: remove useless timed join
019: 22Jan1999 dl Rework scan to allow use of priorities.
020: 6Feb1999 dl Documentation updates.
021: 7Mar1999 dl Add array-based coInvoke
022: 31Mar1999 dl Revise scan to remove need for NullTasks
023: 27Apr1999 dl Renamed
23oct1999 dl Earlier detection of interrupt in scanWhileIdling
025: 24nov1999 dl Now works on JVMs that do not properly
026: implement read-after-write of 2 volatiles.
027: */
028:
029: package org.logicalcobwebs.concurrent;
030:
031: import java.util.Random;
032:
033: /**
034: * Specialized Thread subclass for running FJTasks.
035: * <p>
036: * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
037: * Double-ended queues support stack-based operations
038: * push and pop, as well as queue-based operations put and take.
 * Normally, threads run their own tasks. But they
 * may also steal tasks from each other's DEQs.
041: * <p>
042: * The algorithms are minor variants of those used
043: * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
044: * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
045: * to a lesser extent
046: * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
047: * but are adapted to work in Java.
048: * <p>
049: * The two most important capabilities are:
050: * <ul>
051: * <li> Fork a FJTask:
052: * <pre>
053: * Push task onto DEQ
054: * </pre>
055: * <li> Get a task to run (for example within taskYield)
 * <pre>
 *      If DEQ is not empty,
 *         Pop a task and run it.
 *      Else if any other DEQ is not empty,
 *         Take ("steal") a task from it and run it.
 *      Else if the entry queue for our group is not empty,
 *         Take a task from it and run it.
 *      Else if current thread is otherwise idling
 *         If all threads are idling
 *            Wait for a task to be put on group entry queue
 *         Else
 *            Yield or Sleep for a while, and then retry
 * </pre>
069: * </ul>
 * The push, pop, and put operations are designed to only ever be called by the
 * current thread, and take (steal) is only ever called by
 * other threads.
073: * All other operations are composites and variants of these,
074: * plus a few miscellaneous bookkeeping methods.
075: * <p>
076: * Implementations of the underlying representations and operations
077: * are geared for use on JVMs operating on multiple CPUs (although
078: * they should of course work fine on single CPUs as well).
079: * <p>
 * A possible snapshot of a FJTaskRunner's DEQ is:
 * <pre>
 *     0     1     2     3     4     5     6    ...
 *  +-----+-----+-----+-----+-----+-----+-----+--
 *  |     |  t  |  t  |  t  |  t  |     |     | ...  deq array
 *  +-----+-----+-----+-----+-----+-----+-----+--
 *           ^                       ^
 *          base                    top
 *    (incremented            (incremented
 *       on take,                on push
 *     decremented             decremented
 *       on put)                 on pop)
 * </pre>
093: * <p>
094: * FJTasks are held in elements of the DEQ.
095: * They are maintained in a bounded array that
096: * works similarly to a circular bounded buffer. To ensure
097: * visibility of stolen FJTasks across threads, the array elements
098: * must be <code>volatile</code>.
 * Using volatile rather than synchronizing suffices here since
 * each task accessed by a thread is either one that it
 * created or one that it has never seen before. Thus we cannot
 * encounter any staleness problems executing run methods,
 * although FJTask programmers must still be sure to either synch or use
 * volatile for shared data within their run methods.
105: * <p>
106: * However, since there is no way
107: * to declare an array of volatiles in Java, the DEQ elements actually
108: * hold VolatileTaskRef objects, each of which in turn holds a
109: * volatile reference to a FJTask.
110: * Even with the double-indirection overhead of
111: * volatile refs, using an array for the DEQ works out
112: * better than linking them since fewer shared
113: * memory locations need to be
114: * touched or modified by the threads while using the DEQ.
115: * Further, the double indirection may alleviate cache-line
116: * sharing effects (which cannot otherwise be directly dealt with in Java).
117: * <p>
118: * The indices for the <code>base</code> and <code>top</code> of the DEQ
119: * are declared as volatile. The main contention point with
120: * multiple FJTaskRunner threads occurs when one thread is trying
121: * to pop its own stack while another is trying to steal from it.
122: * This is handled via a specialization of Dekker's algorithm,
123: * in which the popping thread pre-decrements <code>top</code>,
124: * and then checks it against <code>base</code>.
125: * To be conservative in the face of JVMs that only partially
126: * honor the specification for volatile, the pop proceeds
127: * without synchronization only if there are apparently enough
128: * items for both a simultaneous pop and take to succeed.
129: * It otherwise enters a
130: * synchronized lock to check if the DEQ is actually empty,
131: * if so failing. The stealing thread
132: * does almost the opposite, but is set up to be less likely
133: * to win in cases of contention: Steals always run under synchronized
134: * locks in order to avoid conflicts with other ongoing steals.
135: * They pre-increment <code>base</code>, and then check against
136: * <code>top</code>. They back out (resetting the base index
137: * and failing to steal) if the
138: * DEQ is empty or is about to become empty by an ongoing pop.
139: * <p>
140: * A push operation can normally run concurrently with a steal.
141: * A push enters a synch lock only if the DEQ appears full so must
142: * either be resized or have indices adjusted due to wrap-around
143: * of the bounded DEQ. The put operation always requires synchronization.
144: * <p>
145: * When a FJTaskRunner thread has no tasks of its own to run,
146: * it tries to be a good citizen.
147: * Threads run at lower priority while scanning for work.
148: * <p>
149: * If the task is currently waiting
150: * via yield, the thread alternates scans (starting at a randomly
151: * chosen victim) with Thread.yields. This is
152: * well-behaved so long as the JVM handles Thread.yield in a
153: * sensible fashion. (It need not. Thread.yield is so underspecified
154: * that it is legal for a JVM to treat it as a no-op.) This also
155: * keeps things well-behaved even if we are running on a uniprocessor
156: * JVM using a simple cooperative threading model.
157: * <p>
 * If a thread needing work is
 * otherwise idle (which occurs only in the main runloop), and
160: * there are no available tasks to steal or poll, it
161: * instead enters into a sleep-based (actually timed wait(msec))
162: * phase in which it progressively sleeps for longer durations
163: * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
164: * currently 100ms) between scans.
165: * If all threads in the group
166: * are idling, they further progress to a hard wait phase, suspending
167: * until a new task is entered into the FJTaskRunnerGroup entry queue.
168: * A sleeping FJTaskRunner thread may be awakened by a new
169: * task being put into the group entry queue or by another FJTaskRunner
170: * becoming active, but not merely by some DEQ becoming non-empty.
171: * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
172: * in cases where all but one worker thread start sleeping
173: * even though there will eventually be work produced
174: * by a thread that is taking a long time to place tasks in DEQ.
175: * These sleep mechanics are handled in the FJTaskRunnerGroup class.
176: * <p>
177: * Composite operations such as taskJoin include heavy
178: * manual inlining of the most time-critical operations
179: * (mainly FJTask.invoke).
180: * This opens up a few opportunities for further hand-optimizations.
181: * Until Java compilers get a lot smarter, these tweaks
182: * improve performance significantly enough for task-intensive
183: * programs to be worth the poorer maintainability and code duplication.
184: * <p>
185: * Because they are so fragile and performance-sensitive, nearly
186: * all methods are declared as final. However, nearly all fields
187: * and methods are also declared as protected, so it is possible,
188: * with much care, to extend functionality in subclasses. (Normally
189: * you would also need to subclass FJTaskRunnerGroup.)
190: * <p>
191: * None of the normal java.lang.Thread class methods should ever be called
192: * on FJTaskRunners. For this reason, it might have been nicer to
193: * declare FJTaskRunner as a Runnable to run within a Thread. However,
194: * this would have complicated many minor logistics. And since
195: * no FJTaskRunner methods should normally be called from outside the
196: * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
197: * usage.
198: * <p>
199: * You might think that layering this kind of framework on top of
200: * Java threads, which are already several levels removed from raw CPU
201: * scheduling on most systems, would lead to very poor performance.
202: * But on the platforms
203: * tested, the performance is quite good.
204: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
205: * @see FJTask
206: * @see FJTaskRunnerGroup
207: **/
208:
209: public class FJTaskRunner extends Thread {
210:
211: /** The group of which this FJTaskRunner is a member **/
212: protected final FJTaskRunnerGroup group;
213:
214: /**
215: * Constructor called only during FJTaskRunnerGroup initialization
216: **/
217:
218: protected FJTaskRunner(FJTaskRunnerGroup g) {
219: group = g;
220: victimRNG = new Random(System.identityHashCode(this ));
221: runPriority = getPriority();
222: setDaemon(true);
223: }
224:
225: /**
226: * Return the FJTaskRunnerGroup of which this thread is a member
227: **/
228:
229: protected final FJTaskRunnerGroup getGroup() {
230: return group;
231: }
232:
233: /* ------------ DEQ Representation ------------------- */
234:
235: /**
236: * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
237: * elements. The DEQ is grown if necessary, but default value is
238: * normally much more than sufficient unless there are
239: * user programming errors or questionable operations generating
240: * large numbers of Tasks without running them.
241: * Capacities must be a power of two.
242: **/
243:
244: protected static final int INITIAL_CAPACITY = 4096;
245:
246: /**
247: * The maximum supported DEQ capacity.
248: * When exceeded, FJTaskRunner operations throw Errors
249: **/
250:
251: protected static final int MAX_CAPACITY = 1 << 30;
252:
253: /**
254: * An object holding a single volatile reference to a FJTask.
255: **/
256:
257: protected final static class VolatileTaskRef {
258: /** The reference **/
259: protected volatile FJTask ref;
260:
261: /** Set the reference **/
262: protected final void put(FJTask r) {
263: ref = r;
264: }
265:
266: /** Return the reference **/
267: protected final FJTask get() {
268: return ref;
269: }
270:
271: /** Return the reference and clear it **/
272: protected final FJTask take() {
273: FJTask r = ref;
274: ref = null;
275: return r;
276: }
277:
278: /**
279: * Initialization utility for constructing arrays.
280: * Make an array of given capacity and fill it with
281: * VolatileTaskRefs.
282: **/
283: protected static VolatileTaskRef[] newArray(int cap) {
284: VolatileTaskRef[] a = new VolatileTaskRef[cap];
285: for (int k = 0; k < cap; k++)
286: a[k] = new VolatileTaskRef();
287: return a;
288: }
289:
290: }
291:
292: /**
293: * The DEQ array.
294: **/
295:
296: protected VolatileTaskRef[] deq = VolatileTaskRef
297: .newArray(INITIAL_CAPACITY);
298:
299: /** Current size of the task DEQ **/
300: protected int deqSize() {
301: return deq.length;
302: }
303:
304: /**
305: * Current top of DEQ. Generally acts just like a stack pointer in an
306: * array-based stack, except that it circularly wraps around the
307: * array, as in an array-based queue. The value is NOT
308: * always kept within <code>0 ... deq.length</code> though.
309: * The current top element is always at <code>top & (deq.length-1)</code>.
310: * To avoid integer overflow, top is reset down
311: * within bounds whenever it is noticed to be out out bounds;
312: * at worst when it is at <code>2 * deq.length</code>.
313: **/
314: protected volatile int top = 0;
315:
316: /**
317: * Current base of DEQ. Acts like a take-pointer in an
318: * array-based bounded queue. Same bounds and usage as top.
319: **/
320:
321: protected volatile int base = 0;
322:
323: /**
324: * An extra object to synchronize on in order to
325: * achieve a memory barrier.
326: **/
327:
328: protected final Object barrier = new Object();
329:
330: /* ------------ Other BookKeeping ------------------- */
331:
332: /**
333: * Record whether current thread may be processing a task
334: * (i.e., has been started and is not in an idle wait).
335: * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
336: * stored here for simplicity.
337: **/
338:
339: protected boolean active = false;
340:
341: /** Random starting point generator for scan() **/
342: protected final Random victimRNG;
343:
344: /** Priority to use while scanning for work **/
345: protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;
346:
347: /** Priority to use while running tasks **/
348: protected int runPriority;
349:
350: /**
351: * Set the priority to use while scanning.
352: * We do not bother synchronizing access, since
353: * by the time the value is needed, both this FJTaskRunner
354: * and its FJTaskRunnerGroup will
355: * necessarily have performed enough synchronization
356: * to avoid staleness problems of any consequence.
357: **/
358: protected void setScanPriority(int pri) {
359: scanPriority = pri;
360: }
361:
362: /**
363: * Set the priority to use while running tasks.
364: * Same usage and rationale as setScanPriority.
365: **/
366: protected void setRunPriority(int pri) {
367: runPriority = pri;
368: }
369:
370: /**
371: * Compile-time constant for statistics gathering.
372: * Even when set, reported values may not be accurate
373: * since all are read and written without synchronization.
374: **/
375:
376: static final boolean COLLECT_STATS = true;
377: // static final boolean COLLECT_STATS = false;
378:
379: // for stat collection
380:
381: /** Total number of tasks run **/
382: protected int runs = 0;
383:
384: /** Total number of queues scanned for work **/
385: protected int scans = 0;
386:
387: /** Total number of tasks obtained via scan **/
388: protected int steals = 0;
389:
390: /* ------------ DEQ operations ------------------- */
391:
392: /**
393: * Push a task onto DEQ.
394: * Called ONLY by current thread.
395: **/
396:
397: protected final void push(final FJTask r) {
398: int t = top;
399:
400: /*
401: This test catches both overflows and index wraps. It doesn't
402: really matter if base value is in the midst of changing in take.
403: As long as deq length is < 2^30, we are guaranteed to catch wrap in
404: time since base can only be incremented at most length times
405: between pushes (or puts).
406: */
407:
408: if (t < (base & (deq.length - 1)) + deq.length) {
409:
410: deq[t & (deq.length - 1)].put(r);
411: top = t + 1;
412: } else
413: // isolate slow case to increase chances push is inlined
414: slowPush(r); // check overflow and retry
415: }
416:
417: /**
418: * Handle slow case for push
419: **/
420:
421: protected synchronized void slowPush(final FJTask r) {
422: checkOverflow();
423: push(r); // just recurse -- this one is sure to succeed.
424: }
425:
426: /**
427: * Enqueue task at base of DEQ.
428: * Called ONLY by current thread.
429: * This method is currently not called from class FJTask. It could be used
430: * as a faster way to do FJTask.start, but most users would
431: * find the semantics too confusing and unpredictable.
432: **/
433:
434: protected final synchronized void put(final FJTask r) {
435: for (;;) {
436: int b = base - 1;
437: if (top < b + deq.length) {
438:
439: int newBase = b & (deq.length - 1);
440: deq[newBase].put(r);
441: base = newBase;
442:
443: if (b != newBase) { // Adjust for index underflow
444: int newTop = top & (deq.length - 1);
445: if (newTop < newBase)
446: newTop += deq.length;
447: top = newTop;
448: }
449: return;
450: } else {
451: checkOverflow();
452: // ... and retry
453: }
454: }
455: }
456:
457: /**
458: * Return a popped task, or null if DEQ is empty.
459: * Called ONLY by current thread.
460: * <p>
461: * This is not usually called directly but is
462: * instead inlined in callers. This version differs from the
463: * cilk algorithm in that pop does not fully back down and
464: * retry in the case of potential conflict with take. It simply
465: * rechecks under synch lock. This gives a preference
466: * for threads to run their own tasks, which seems to
467: * reduce flailing a bit when there are few tasks to run.
468: **/
469:
470: protected final FJTask pop() {
471: /*
472: Decrement top, to force a contending take to back down.
473: */
474:
475: int t = --top;
476:
477: /*
478: To avoid problems with JVMs that do not properly implement
479: read-after-write of a pair of volatiles, we conservatively
480: grab without lock only if the DEQ appears to have at least two
481: elements, thus guaranteeing that both a pop and take will succeed,
482: even if the pre-increment in take is not seen by current thread.
483: Otherwise we recheck under synch.
484: */
485:
486: if (base + 1 < t)
487: return deq[t & (deq.length - 1)].take();
488: else
489: return confirmPop(t);
490:
491: }
492:
493: /**
494: * Check under synch lock if DEQ is really empty when doing pop.
495: * Return task if not empty, else null.
496: **/
497:
498: protected final synchronized FJTask confirmPop(int provisionalTop) {
499: if (base <= provisionalTop)
500: return deq[provisionalTop & (deq.length - 1)].take();
501: else { // was empty
502: /*
503: Reset DEQ indices to zero whenever it is empty.
504: This both avoids unnecessary calls to checkOverflow
505: in push, and helps keep the DEQ from accumulating garbage
506: */
507:
508: top = base = 0;
509: return null;
510: }
511: }
512:
513: /**
514: * Take a task from the base of the DEQ.
515: * Always called by other threads via scan()
516: **/
517:
518: protected final synchronized FJTask take() {
519:
520: /*
521: Increment base in order to suppress a contending pop
522: */
523:
524: int b = base++;
525:
526: if (b < top)
527: return confirmTake(b);
528: else {
529: // back out
530: base = b;
531: return null;
532: }
533: }
534:
535: /**
536: * double-check a potential take
537: **/
538:
539: protected FJTask confirmTake(int oldBase) {
540:
541: /*
542: Use a second (guaranteed uncontended) synch
543: to serve as a barrier in case JVM does not
544: properly process read-after-write of 2 volatiles
545: */
546:
547: synchronized (barrier) {
548: if (oldBase < top) {
549: /*
550: We cannot call deq[oldBase].take here because of possible races when
551: nulling out versus concurrent push operations. Resulting
552: accumulated garbage is swept out periodically in
553: checkOverflow, or more typically, just by keeping indices
554: zero-based when found to be empty in pop, which keeps active
555: region small and constantly overwritten.
556: */
557:
558: return deq[oldBase & (deq.length - 1)].get();
559: } else {
560: base = oldBase;
561: return null;
562: }
563: }
564: }
565:
566: /**
567: * Adjust top and base, and grow DEQ if necessary.
568: * Called only while DEQ synch lock being held.
569: * We don't expect this to be called very often. In most
570: * programs using FJTasks, it is never called.
571: **/
572:
573: protected void checkOverflow() {
574: int t = top;
575: int b = base;
576:
577: if (t - b < deq.length - 1) { // check if just need an index reset
578:
579: int newBase = b & (deq.length - 1);
580: int newTop = top & (deq.length - 1);
581: if (newTop < newBase)
582: newTop += deq.length;
583: top = newTop;
584: base = newBase;
585:
586: /*
587: Null out refs to stolen tasks.
588: This is the only time we can safely do it.
589: */
590:
591: int i = newBase;
592: while (i != newTop && deq[i].ref != null) {
593: deq[i].ref = null;
594: i = (i - 1) & (deq.length - 1);
595: }
596:
597: } else { // grow by doubling array
598:
599: int newTop = t - b;
600: int oldcap = deq.length;
601: int newcap = oldcap * 2;
602:
603: if (newcap >= MAX_CAPACITY)
604: throw new Error(
605: "FJTask queue maximum capacity exceeded");
606:
607: VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
608:
609: // copy in bottom half of new deq with refs from old deq
610: for (int j = 0; j < oldcap; ++j)
611: newdeq[j] = deq[b++ & (oldcap - 1)];
612:
613: // fill top half of new deq with new refs
614: for (int j = oldcap; j < newcap; ++j)
615: newdeq[j] = new VolatileTaskRef();
616:
617: deq = newdeq;
618: base = 0;
619: top = newTop;
620: }
621: }
622:
623: /* ------------ Scheduling ------------------- */
624:
625: /**
626: * Do all but the pop() part of yield or join, by
627: * traversing all DEQs in our group looking for a task to
628: * steal. If none, it checks the entry queue.
629: * <p>
630: * Since there are no good, portable alternatives,
631: * we rely here on a mixture of Thread.yield and priorities
632: * to reduce wasted spinning, even though these are
633: * not well defined. We are hoping here that the JVM
634: * does something sensible.
635: * @param waitingFor if non-null, the current task being joined
636: **/
637:
638: protected void scan(final FJTask waitingFor) {
639:
640: FJTask task = null;
641:
642: // to delay lowering priority until first failure to steal
643: boolean lowered = false;
644:
645: /*
646: Circularly traverse from a random start index.
647:
648: This differs slightly from cilk version that uses a random index
649: for each attempted steal.
650: Exhaustive scanning might impede analytic tractablity of
651: the scheduling policy, but makes it much easier to deal with
652: startup and shutdown.
653: */
654:
655: FJTaskRunner[] ts = group.getArray();
656: int idx = victimRNG.nextInt(ts.length);
657:
658: for (int i = 0; i < ts.length; ++i) {
659:
660: FJTaskRunner t = ts[idx];
661: if (++idx >= ts.length)
662: idx = 0; // circularly traverse
663:
664: if (t != null && t != this ) {
665:
666: if (waitingFor != null && waitingFor.isDone()) {
667: break;
668: } else {
669: if (COLLECT_STATS)
670: ++scans;
671: task = t.take();
672: if (task != null) {
673: if (COLLECT_STATS)
674: ++steals;
675: break;
676: } else if (isInterrupted()) {
677: break;
678: } else if (!lowered) { // if this is first fail, lower priority
679: lowered = true;
680: setPriority(scanPriority);
681: } else { // otherwise we are at low priority; just yield
682: yield();
683: }
684: }
685: }
686:
687: }
688:
689: if (task == null) {
690: if (COLLECT_STATS)
691: ++scans;
692: task = group.pollEntryQueue();
693: if (COLLECT_STATS)
694: if (task != null)
695: ++steals;
696: }
697:
698: if (lowered)
699: setPriority(runPriority);
700:
701: if (task != null && !task.isDone()) {
702: if (COLLECT_STATS)
703: ++runs;
704: task.run();
705: task.setDone();
706: }
707:
708: }
709:
710: /**
711: * Same as scan, but called when current thread is idling.
712: * It repeatedly scans other threads for tasks,
713: * sleeping while none are available.
714: * <p>
715: * This differs from scan mainly in that
716: * since there is no reason to return to recheck any
717: * condition, we iterate until a task is found, backing
718: * off via sleeps if necessary.
719: **/
720:
721: protected void scanWhileIdling() {
722: FJTask task = null;
723:
724: boolean lowered = false;
725: long iters = 0;
726:
727: FJTaskRunner[] ts = group.getArray();
728: int idx = victimRNG.nextInt(ts.length);
729:
730: do {
731: for (int i = 0; i < ts.length; ++i) {
732:
733: FJTaskRunner t = ts[idx];
734: if (++idx >= ts.length)
735: idx = 0; // circularly traverse
736:
737: if (t != null && t != this ) {
738: if (COLLECT_STATS)
739: ++scans;
740:
741: task = t.take();
742: if (task != null) {
743: if (COLLECT_STATS)
744: ++steals;
745: if (lowered)
746: setPriority(runPriority);
747: group.setActive(this );
748: break;
749: }
750: }
751: }
752:
753: if (task == null) {
754: if (isInterrupted())
755: return;
756:
757: if (COLLECT_STATS)
758: ++scans;
759: task = group.pollEntryQueue();
760:
761: if (task != null) {
762: if (COLLECT_STATS)
763: ++steals;
764: if (lowered)
765: setPriority(runPriority);
766: group.setActive(this );
767: } else {
768: ++iters;
769: // Check here for yield vs sleep to avoid entering group synch lock
770: if (iters >= group.SCANS_PER_SLEEP) {
771: group.checkActive(this , iters);
772: if (isInterrupted())
773: return;
774: } else if (!lowered) {
775: lowered = true;
776: setPriority(scanPriority);
777: } else {
778: yield();
779: }
780: }
781: }
782: } while (task == null);
783:
784: if (!task.isDone()) {
785: if (COLLECT_STATS)
786: ++runs;
787: task.run();
788: task.setDone();
789: }
790:
791: }
792:
793: /* ------------ composite operations ------------------- */
794:
795: /**
796: * Main runloop
797: **/
798:
799: public void run() {
800: try {
801: while (!interrupted()) {
802:
803: FJTask task = pop();
804: if (task != null) {
805: if (!task.isDone()) {
806: // inline FJTask.invoke
807: if (COLLECT_STATS)
808: ++runs;
809: task.run();
810: task.setDone();
811: }
812: } else
813: scanWhileIdling();
814: }
815: } finally {
816: group.setInactive(this );
817: }
818: }
819:
820: /**
821: * Execute a task in this thread. Generally called when current task
822: * cannot otherwise continue.
823: **/
824:
825: protected final void taskYield() {
826: FJTask task = pop();
827: if (task != null) {
828: if (!task.isDone()) {
829: if (COLLECT_STATS)
830: ++runs;
831: task.run();
832: task.setDone();
833: }
834: } else
835: scan(null);
836: }
837:
838: /**
839: * Process tasks until w is done.
840: * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
841: **/
842:
843: protected final void taskJoin(final FJTask w) {
844:
845: while (!w.isDone()) {
846:
847: FJTask task = pop();
848: if (task != null) {
849: if (!task.isDone()) {
850: if (COLLECT_STATS)
851: ++runs;
852: task.run();
853: task.setDone();
854: if (task == w)
855: return; // fast exit if we just ran w
856: }
857: } else
858: scan(w);
859: }
860: }
861:
862: /**
863: * A specialized expansion of
864: * <code> w.fork(); invoke(v); w.join(); </code>
865: **/
866:
867: protected final void coInvoke(final FJTask w, final FJTask v) {
868:
869: // inline push
870:
871: int t = top;
872: if (t < (base & (deq.length - 1)) + deq.length) {
873:
874: deq[t & (deq.length - 1)].put(w);
875: top = t + 1;
876:
877: // inline invoke
878:
879: if (!v.isDone()) {
880: if (COLLECT_STATS)
881: ++runs;
882: v.run();
883: v.setDone();
884: }
885:
886: // inline taskJoin
887:
888: while (!w.isDone()) {
889: FJTask task = pop();
890: if (task != null) {
891: if (!task.isDone()) {
892: if (COLLECT_STATS)
893: ++runs;
894: task.run();
895: task.setDone();
896: if (task == w)
897: return; // fast exit if we just ran w
898: }
899: } else
900: scan(w);
901: }
902: } else
903: // handle non-inlinable cases
904: slowCoInvoke(w, v);
905: }
906:
907: /**
908: * Backup to handle noninlinable cases of coInvoke
909: **/
910:
911: protected void slowCoInvoke(final FJTask w, final FJTask v) {
912: push(w); // let push deal with overflow
913: FJTask.invoke(v);
914: taskJoin(w);
915: }
916:
917: /**
918: * Array-based version of coInvoke
919: **/
920:
921: protected final void coInvoke(FJTask[] tasks) {
922: int nforks = tasks.length - 1;
923:
924: // inline bulk push of all but one task
925:
926: int t = top;
927:
928: if (nforks >= 0
929: && t + nforks < (base & (deq.length - 1)) + deq.length) {
930: for (int i = 0; i < nforks; ++i) {
931: deq[t++ & (deq.length - 1)].put(tasks[i]);
932: top = t;
933: }
934:
935: // inline invoke of one task
936: FJTask v = tasks[nforks];
937: if (!v.isDone()) {
938: if (COLLECT_STATS)
939: ++runs;
940: v.run();
941: v.setDone();
942: }
943:
944: // inline taskJoins
945:
946: for (int i = 0; i < nforks; ++i) {
947: FJTask w = tasks[i];
948: while (!w.isDone()) {
949:
950: FJTask task = pop();
951: if (task != null) {
952: if (!task.isDone()) {
953: if (COLLECT_STATS)
954: ++runs;
955: task.run();
956: task.setDone();
957: }
958: } else
959: scan(w);
960: }
961: }
962: } else
963: // handle non-inlinable cases
964: slowCoInvoke(tasks);
965: }
966:
967: /**
968: * Backup to handle atypical or noninlinable cases of coInvoke
969: **/
970:
971: protected void slowCoInvoke(FJTask[] tasks) {
972: for (int i = 0; i < tasks.length; ++i)
973: push(tasks[i]);
974: for (int i = 0; i < tasks.length; ++i)
975: taskJoin(tasks[i]);
976: }
977:
978: }
|