| java.lang.Object EDU.oswego.cs.dl.util.concurrent.FJTaskRunnerGroup
FJTaskRunnerGroup | public class FJTaskRunnerGroup implements Executor(Code) | | A stripped down analog of a ThreadGroup used for
establishing and managing FJTaskRunner threads.
FJTaskRunnerGroups serve as the control boundary separating
the general world of normal threads from the specialized world
of FJTasks.
By intent, this class does not subclass java.lang.ThreadGroup, and
does not support most methods found in ThreadGroups, since they
would make no sense for FJTaskRunner threads. In fact, the class
does not deal with ThreadGroups at all. If you want to restrict
a FJTaskRunnerGroup to a particular ThreadGroup, you can create
it from within that ThreadGroup.
The main contextual parameter for a FJTaskRunnerGroup is
the group size, established in the constructor.
Groups must be of a fixed size.
There is no way to dynamically increase or decrease the number
of threads in an existing group.
In general, the group size should be equal to the number
of CPUs on the system. (Unfortunately, there is no portable
means of automatically detecting the number of CPUs on a JVM, so there is
no good way to automate defaults.) In principle, when
FJTasks are used for computation-intensive tasks, having only
as many threads as CPUs should minimize bookkeeping overhead
and contention, and so maximize throughput. However, because
FJTaskRunners lie atop Java threads, and in turn operating system
thread support and scheduling policies,
it is very possible that using more threads
than CPUs will improve overall throughput even though it adds
to overhead. This will always be so if FJTasks are I/O bound.
So it may pay to experiment a bit when tuning on particular platforms.
You can also use setRunPriorities to either
increase or decrease the priorities of active threads, which
may interact with group size choice.
In any case, overestimating group sizes never
seriously degrades performance (at least within reasonable bounds).
You can also use a value
less than the number of CPUs in order to reserve processing
for unrelated threads.
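The sizing advice above can be sketched in plain Java. JVMs from Java 1.4 onward expose Runtime.availableProcessors(), which postdates the remark about detecting CPUs. The helper name groupSize and its reserved parameter are hypothetical; they only illustrate holding some processors back for unrelated threads.

```java
class GroupSizing {
    // Hypothetical helper: one thread per CPU, minus any CPUs held back
    // for unrelated threads, but never fewer than one thread.
    static int groupSize(int cpus, int reserved) {
        return Math.max(1, cpus - reserved);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("CPUs reported: " + cpus);
        System.out.println("Group size, none reserved: " + groupSize(cpus, 0));
        System.out.println("Group size, one reserved: " + groupSize(cpus, 1));
    }
}
```

The result would be passed to the FJTaskRunnerGroup constructor. Since the text notes that I/O-bound tasks can benefit from more threads than CPUs, treat the value as a starting point for tuning.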
There are two general styles for using a FJTaskRunnerGroup.
You can create one group per entire program execution, for example
as a static singleton, and use it for all parallel tasks:
class Tasks {
  static FJTaskRunnerGroup group;
  public static void initialize(int groupSize) {
    group = new FJTaskRunnerGroup(groupSize);
  }
  // ...
}
Alternatively, you can make new groups on the fly and use them only for
particular task sets. This is more flexible,
and leads to more controllable and deterministic execution patterns,
but it encounters greater overhead on startup. Also, to reclaim
system resources, you should
call FJTaskRunnerGroup.interruptAll when you are done
using one-shot groups. Otherwise, because FJTaskRunners set
Thread.isDaemon
status, they will not normally be reclaimed until program termination.
The main supported methods are execute,
which starts a task processed by FJTaskRunner threads,
and invoke, which starts one and waits for completion.
For example, you might extend the above Tasks
class to support a task-based computation, say, the
Fib class from the FJTask documentation:
class Tasks { // continued
  // ...
  static int fib(int n) {
    try {
      Fib f = new Fib(n);
      group.invoke(f);
      return f.getAnswer();
    }
    catch (InterruptedException ex) {
      throw new Error("Interrupted during computation");
    }
  }
}
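For readers without the EDU.oswego.cs.dl.util.concurrent jar at hand, the same invoke-and-wait pattern can be tried with the JDK's later java.util.concurrent.ForkJoinPool, which uses a comparable work-stealing design. This is an analog under that assumption, not the FJTask API. FibSketch and its nested Fib are illustrative names, and the Fib class from the FJTask documentation is not reproduced here.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FibSketch {
    // Illustrative recursive task; deliberately naive to generate many subtasks.
    static class Fib extends RecursiveTask<Integer> {
        final int n;
        Fib(int n) { this.n = n; }

        @Override
        protected Integer compute() {
            if (n <= 1) return n;
            Fib left = new Fib(n - 1);
            left.fork();                           // run asynchronously in the pool
            int right = new Fib(n - 2).compute();  // run directly in this worker
            return right + left.join();            // wait for the forked half
        }
    }

    // One-shot style: create a pool for this computation, then release it.
    static int fib(int n) {
        ForkJoinPool pool =
            new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            return pool.invoke(new Fib(n));        // start the task and wait it out
        } finally {
            pool.shutdownNow();                    // reclaim the worker threads
        }
    }

    public static void main(String[] args) {
        System.out.println("fib(20) = " + fib(20));
    }
}
```

The finally block plays the role that interruptAll plays for one-shot FJTaskRunnerGroups: the workers are released as soon as the task set is finished rather than at program termination.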
Method stats() can be used to monitor performance.
Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
the compile-time constant COLLECT_STATS set to false. In this
case, various simple counts reported in stats() are not collected.
On platforms tested,
this leads to such a tiny performance improvement that there is
very little motivation to bother.
See Also: FJTask, FJTaskRunner |
Inner Class: final protected static class InvokableFJTask extends FJTask | |
Field Summary | |
final static boolean | COLLECT_STATS Compile-time constant. | final static int | DEFAULT_SCAN_PRIORITY | final static long | MAX_SLEEP_TIME The maximum time (in msecs) to sleep when a thread is idle,
yet others are not, so may eventually generate work that
the current thread can steal. | final static long | SCANS_PER_SLEEP The number of times to scan other threads for tasks
before transitioning to a mode where scans are
interleaved with sleeps (actually timed waits).
Upon transition, sleeps are for duration of
scans / SCANS_PER_SLEEP milliseconds.
This is not treated as a user-tunable parameter because
good values do not appear to vary much across JVMs or
applications. | protected int | activeCount | int | entries | final protected LinkedQueue | entryQueue | long | initTime | protected int | nstarted Number of threads that have been started. | final protected FJTaskRunner[] | threads |
Constructor Summary | |
public | FJTaskRunnerGroup(int groupSize) Create a FJTaskRunnerGroup with the indicated number
of FJTaskRunner threads. |
Method Summary | |
protected synchronized void | checkActive(FJTaskRunner t, long scans) Set active status of thread t to false, and
then wait until: (a) there is a task in the entry
queue, or (b) other threads are active, or (c) the current
thread is interrupted. | public void | execute(Runnable r) Arrange for execution of the given task
by placing it in a work queue. | public void | executeTask(FJTask t) | protected synchronized boolean | getActive(FJTaskRunner t) Return active status of t. | public synchronized int | getActiveCount() Return the number of threads that are not idly waiting for work. | protected FJTaskRunner[] | getArray() Return the array of threads in this group. | protected void | initializeThreads() Create all FJTaskRunner threads in this group. | public void | interruptAll() Try to shut down all FJTaskRunner threads in this group
by interrupting them all. | public void | invoke(Runnable r) Start a task and wait it out. | protected FJTask | pollEntryQueue() Return a task from entry queue, or null if empty. | protected synchronized void | setActive(FJTaskRunner t) Set active status of thread t to true, and notify others
that might be waiting for work. | protected synchronized void | setInactive(FJTaskRunner t) Set active status of thread t to false. | public synchronized void | setRunPriorities(int pri) Set the priority to use while a FJTaskRunner is
actively running tasks. | public synchronized void | setScanPriorities(int pri) Set the priority to use while a FJTaskRunner is
polling for new tasks to perform. | protected synchronized void | signalNewTask() | public int | size() | public void | stats() Prints various snapshot statistics to System.out.
- For each FJTaskRunner thread (labeled as Tn, for
n from zero to group size - 1):
- A star "*" is printed if the thread is currently active;
that is, not sleeping while waiting for work.
|
COLLECT_STATS | final static boolean COLLECT_STATS(Code) | | Compile-time constant. If true, various counts of
runs, waits, etc., are maintained. These are NOT
updated with synchronization, so statistics reports
might not be accurate.
|
DEFAULT_SCAN_PRIORITY | final static int DEFAULT_SCAN_PRIORITY(Code) | | |
MAX_SLEEP_TIME | final static long MAX_SLEEP_TIME(Code) | | The maximum time (in msecs) to sleep when a thread is idle,
yet others are not, so may eventually generate work that
the current thread can steal. This value reflects the maximum time
that a thread may sleep when it possibly should not, because there
are other active threads that might generate work. In practice,
designs in which some threads become stalled because others
are running yet not generating tasks are not likely to work
well in this framework anyway, so the exact value does not matter
too much. However, keeping it in the sub-second range does
help smooth out startup and shutdown effects.
|
SCANS_PER_SLEEP | final static long SCANS_PER_SLEEP(Code) | | The number of times to scan other threads for tasks
before transitioning to a mode where scans are
interleaved with sleeps (actually timed waits).
Upon transition, sleeps are for duration of
scans / SCANS_PER_SLEEP milliseconds.
This is not treated as a user-tunable parameter because
good values do not appear to vary much across JVMs or
applications. Its main role is to help avoid some
useless spinning and contention during task startup.
|
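The relationship between the two constants above can be sketched as a small calculation. The numeric values below are placeholders chosen for illustration, since the documentation does not state the actual constants. Capping the quotient at MAX_SLEEP_TIME is an assumption drawn from its description as the maximum sleep time.

```java
class IdleBackoff {
    // Placeholder values for illustration only; not taken from the documentation.
    static final long SCANS_PER_SLEEP = 15;
    static final long MAX_SLEEP_TIME = 100; // msecs, kept in the sub-second range

    // Sleep duration once scans are interleaved with sleeps:
    // scans / SCANS_PER_SLEEP milliseconds, capped at MAX_SLEEP_TIME.
    static long sleepMillis(long scans) {
        return Math.min(scans / SCANS_PER_SLEEP, MAX_SLEEP_TIME);
    }

    public static void main(String[] args) {
        long[] samples = {10, 150, 1500, 15000};
        for (long scans : samples) {
            System.out.println(scans + " scans -> sleep " + sleepMillis(scans) + " ms");
        }
    }
}
```

With these placeholder values, a thread that has scanned only a few times sleeps for essentially no time, and the sleep grows linearly with the scan count until it reaches the cap.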
activeCount | protected int activeCount(Code) | | Number of threads that are not waiting for work
|
entries | int entries(Code) | | Total number of executes or invokes
|
entryQueue | final protected LinkedQueue entryQueue(Code) | | Group-wide queue for tasks entered via execute()
|
initTime | long initTime(Code) | | The time at which this FJTaskRunnerGroup was constructed
|
nstarted | protected int nstarted(Code) | | Number of threads that have been started. Used to avoid
unnecessary contention during startup of task sets.
|
FJTaskRunnerGroup | public FJTaskRunnerGroup(int groupSize)(Code) | | Create a FJTaskRunnerGroup with the indicated number
of FJTaskRunner threads. Normally, the best size to use is
the number of CPUs on the system.
The threads in a FJTaskRunnerGroup are created with their
isDaemon status set, so do not normally need to be
shut down manually upon program termination.
|
checkActive | protected synchronized void checkActive(FJTaskRunner t, long scans)(Code) | | Set active status of thread t to false, and
then wait until: (a) there is a task in the entry
queue, or (b) other threads are active, or (c) the current
thread is interrupted. Upon return, it
is not certain that there will be work available.
The thread must itself check.
The main underlying reason
for these mechanics is that threads do not
signal each other when they add elements to their queues.
(This would add to task overhead, reduce locality,
and increase contention.)
So we must rely on a tamed form of polling. However, tasks
inserted into the entry queue do result in signals, so
threads can wait on these if all of them are otherwise idle.
|
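The wait-then-recheck mechanics described for checkActive can be modeled with an ordinary Java monitor. This is a simplified sketch, not the class's implementation: IdleMonitor, its counters, and the bounded wait are hypothetical stand-ins for the entry queue, the active count, and the timed waits mentioned elsewhere on this page.

```java
class IdleMonitor {
    private int queuedTasks;    // stand-in for the group-wide entry queue
    private int activeThreads;  // stand-in for activeCount

    synchronized void setActive() { activeThreads++; notifyAll(); }
    synchronized void setInactive() { activeThreads--; }

    // Entering a task signals waiters; pushes onto per-thread queues would not.
    synchronized void signalNewTask() { queuedTasks++; notifyAll(); }

    synchronized boolean pollTask() {
        if (queuedTasks == 0) return false;
        queuedTasks--;
        return true;
    }

    // Wait (bounded) unless a task is already queued or another thread is
    // active. Returning does NOT guarantee work exists; callers re-check.
    synchronized void checkActive(long maxWaitMillis) throws InterruptedException {
        if (queuedTasks == 0 && activeThreads == 0) {
            wait(maxWaitMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        IdleMonitor m = new IdleMonitor();
        Thread worker = new Thread(() -> {
            try {
                // Tamed polling: look for work, otherwise wait briefly and retry.
                while (!m.pollTask()) {
                    m.checkActive(50);
                }
                System.out.println("worker found a task");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        m.signalNewTask();
        worker.join();
    }
}
```

The loop around checkActive reflects the caution above: a return from the wait may be a timeout or a notification for work that another thread has already taken.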
execute | public void execute(Runnable r) throws InterruptedException(Code) | | Arrange for execution of the given task
by placing it in a work queue. If the argument
is not of type FJTask, it is embedded in a FJTask via
FJTask.Wrap.
exception: InterruptedException - if current Thread is currently interrupted |
executeTask | public void executeTask(FJTask t)(Code) | | Specialized form of execute called only from within FJTasks
|
getActive | protected synchronized boolean getActive(FJTaskRunner t)(Code) | | Return active status of t.
Per-thread active status can only be accessed and
modified via synchronized methods here in the group class.
|
getActiveCount | public synchronized int getActiveCount()(Code) | | Return the number of threads that are not idly waiting for work.
Beware that even active threads might not be doing any useful
work, but just spinning waiting for other dependent tasks.
Also, since this is just a snapshot value, some threads
may be in the process of becoming idle.
|
getArray | protected FJTaskRunner[] getArray()(Code) | | Return the array of threads in this group.
Called only by FJTaskRunner.scan().
|
initializeThreads | protected void initializeThreads()(Code) | | Create all FJTaskRunner threads in this group.
|
interruptAll | public void interruptAll()(Code) | | Try to shut down all FJTaskRunner threads in this group
by interrupting them all. This method is designed
to be used during cleanup when it is somehow known
that all threads are idle.
FJTaskRunners only
check for interruption when they are not otherwise
processing a task (and its generated subtasks,
if any), so if any threads are active, shutdown may
take a while, and may lead to unpredictable
task processing.
|
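The shutdown behavior described for interruptAll, where interruption is noticed only between tasks, can be illustrated with a plain worker thread. This is a minimal sketch using standard JDK classes, not FJTaskRunner itself. InterruptBetweenTasks and startWorker are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class InterruptBetweenTasks {
    // Starts a daemon worker that checks for interruption only while idle
    // or between tasks; a task already running is allowed to finish.
    static Thread startWorker(BlockingQueue<Runnable> queue) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Runnable task = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        task.run();
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while idle: fall through and let the thread exit.
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = startWorker(queue);
        queue.add(() -> { System.out.println("task ran"); done.countDown(); });
        done.await();          // wait until the worker is idle again
        worker.interrupt();    // analogous to interrupting an idle group
        worker.join();
        System.out.println("worker alive: " + worker.isAlive());
    }
}
```

Interrupting only after the latch opens mirrors the advice above to call interruptAll once all threads are known to be idle.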
pollEntryQueue | protected FJTask pollEntryQueue()(Code) | | Return a task from entry queue, or null if empty.
Called only by FJTaskRunner.scan().
|
setActive | protected synchronized void setActive(FJTaskRunner t)(Code) | | Set active status of thread t to true, and notify others
that might be waiting for work.
|
setInactive | protected synchronized void setInactive(FJTaskRunner t)(Code) | | Set active status of thread t to false.
|
setRunPriorities | public synchronized void setRunPriorities(int pri)(Code) | | Set the priority to use while a FJTaskRunner is
actively running tasks. Default
is the priority that was in effect for the thread that
constructed this FJTaskRunnerGroup. Setting this value
while threads are running may momentarily result in
them running at this priority even when idly waiting for work.
|
setScanPriorities | public synchronized void setScanPriorities(int pri)(Code) | | Set the priority to use while a FJTaskRunner is
polling for new tasks to perform. Default
is currently Thread.MIN_PRIORITY+1. The value
set may not go into effect immediately, but
will be used at least the next time a thread scans for work.
|
signalNewTask | protected synchronized void signalNewTask()(Code) | | Start or wake up any threads waiting for work
|
size | public int size()(Code) | | Return the number of FJTaskRunner threads in this group
|
stats | public void stats()(Code) | | Prints various snapshot statistics to System.out.
- For each FJTaskRunner thread (labeled as Tn, for
n from zero to group size - 1):
- A star "*" is printed if the thread is currently active;
that is, not sleeping while waiting for work. Because
threads gradually enter sleep modes, an active thread
may in fact be about to sleep (or wake up).
- Q Cap The current capacity of its task queue.
- Run The total number of tasks that have been run.
- New The number of these tasks that were
taken from either the entry queue or from other
thread queues; that is, the number of tasks run
that were not forked by the thread itself.
- Scan The number of times other task
queues or the entry queue were polled for tasks.
- Execute The total number of tasks entered
(but not necessarily yet run) via execute or invoke.
- Time Time in seconds since construction of this
FJTaskRunnerGroup.
- Rate The total number of tasks processed
per second across all threads. This
may be useful as a simple throughput indicator
if all processed tasks take approximately the
same time to run.
Cautions: Some statistics are updated and gathered
without synchronization,
so may not be accurate. However, reported counts may be considered
as lower bounds of actual values.
Some values may be zero if classes are compiled
with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
classes can be independently compiled with different values of
COLLECT_STATS.) Also, the counts are maintained as ints so could
overflow in exceptionally long-lived applications.
These statistics can be useful when tuning algorithms or diagnosing
problems. For example:
- High numbers of scans may mean that there is insufficient
parallelism to keep threads busy. However, high scan rates
are expected if the number
of Executes is also high or there is a lot of global
synchronization in the application, and the system is not otherwise
busy. Threads may scan
for work hundreds of times upon startup, shutdown, and
global synch points of task sets.
- Large imbalances in tasks run across different threads might
just reflect contention with unrelated threads on a system
(possibly including JVM threads such as GC), but may also
indicate some systematic bias in how you generate tasks.
- Large task queue capacities may mean that too many tasks are being
generated before they can be run.
Capacities are reported rather than current numbers of tasks
in queues because they are better indicators of the existence
of these kinds of possibly-transient problems.
Queue capacities are
resized on demand from their initial value of 4096 elements,
which is much more than sufficient for the kinds of
applications that this framework is intended to best support.
|
|
|
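The Rate figure described for stats() is a simple quotient, sketched here under the assumption that it is derived from the per-thread Run counts and the time since construction. StatsRate and its method are hypothetical names and do not appear in the class.

```java
class StatsRate {
    // Tasks processed per second across all threads:
    // sum of per-thread run counts divided by elapsed seconds.
    static double rate(int[] runsPerThread, long elapsedMillis) {
        long total = 0;
        for (int runs : runsPerThread) {
            total += runs;   // accumulate in a long to avoid int overflow
        }
        double seconds = elapsedMillis / 1000.0;
        return seconds > 0 ? total / seconds : 0.0;
    }

    public static void main(String[] args) {
        int[] runs = {120000, 118500, 121300, 40200}; // made-up sample counts
        System.out.println("Rate: " + rate(runs, 2500) + " tasks/sec");
    }
}
```

In the made-up sample, the fourth count is far lower than the others, which is the kind of imbalance the tuning notes above suggest investigating.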