Main interface for buffers, queues, pipes, conduits, etc.
A Channel represents anything that you can put items
into and take them out of. As with the Sync
interface, both
blocking (put(x), take),
and timeouts (offer(x, msecs), poll(msecs)) policies
are provided. Using a
zero timeout for offer and poll results in a pure balking policy.
To aid in efforts to use Channels in a more typesafe manner,
this interface extends Puttable and Takable. You can restrict
arguments of instance variables to this type as a way of
guaranteeing that producers never try to take, or consumers put.
for example:
class Producer implements Runnable {
final Puttable chan;
Producer(Puttable channel) { chan = channel; }
public void run() {
try {
for(;;) { chan.put(produce()); }
}
catch (InterruptedException ex) {}
}
Object produce() { ... }
}
class Consumer implements Runnable {
final Takable chan;
Consumer(Takable channel) { chan = channel; }
public void run() {
try {
for(;;) { consume(chan.take()); }
}
catch (InterruptedException ex) {}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
Channel chan = new SomeChannelImplementation();
Producer p = new Producer(chan);
Consumer c = new Consumer(chan);
new Thread(p).start();
new Thread(c).start();
}
}
A given channel implementation might or might not have bounded
capacity or other insertion constraints, so in general, you cannot tell if
a given put will block. However,
Channels that are designed to
have an element capacity (and so always block when full)
should implement the
BoundedChannel
subinterface.
Channels may hold any kind of item. However,
insertion of null is not in general supported. Implementations
may (all currently do) throw IllegalArgumentExceptions upon attempts to
insert null.
By design, the Channel interface does not support any methods to determine
the current number of elements being held in the channel.
This decision reflects the fact that in
concurrent programming, such methods are so rarely useful
that including them invites misuse; at best they could
provide a snapshot of current
state, that could change immediately after being reported.
It is better practice to instead use poll and offer to try
to take and put elements without blocking. For example,
to empty out the current contents of a channel, you could write:
try {
for (;;) {
Object item = channel.poll(0);
if (item != null)
process(item);
else
break;
}
}
catch(InterruptedException ex) { ... }
However, it is possible to determine whether an item
exists in a Channel via peek , which returns
but does NOT remove the next item that can be taken (or null
if there is no such item). The peek operation has a limited
range of applicability, and must be used with care. Unless it
is known that a given thread is the only possible consumer
of a channel, and that no time-out-based offer operations
are ever invoked, there is no guarantee that the item returned
by peek will be available for a subsequent take.
When appropriate, you can define an isEmpty method to
return whether peek returns null.
Also, as a compromise, even though it does not appear in interface,
implementation classes that can readily compute the number
of elements support a size() method. This allows careful
use, for example in queue length monitors, appropriate to the
particular implementation constraints and properties.
All channels allow multiple producers and/or consumers.
They do not support any kind of close method
to shut down operation or indicate completion of particular
producer or consumer threads.
If you need to signal completion, one way to do it is to
create a class such as
class EndOfStream {
// Application-dependent field/methods
}
And to have producers put an instance of this class into
the channel when they are done. The consumer side can then
check this via
Object x = aChannel.take();
if (x instanceof EndOfStream)
// special actions; perhaps terminate
else
// process normally
In time-out based methods (poll(msecs) and offer(x, msecs),
time bounds are interpreted in
a coarse-grained, best-effort fashion. Since there is no
way in Java to escape out of a wait for a synchronized
method/block, time bounds can sometimes be exceeded when
there is a lot contention for the channel. Additionally,
some Channel semantics entail a ``point of
no return'' where, once some parts of the operation have completed,
others must follow, regardless of time bound.
Interruptions are in general handled as early as possible
in all methods. Normally, InterruptionExceptions are thrown
in put/take and offer(msec)/poll(msec) if interruption
is detected upon entry to the method, as well as in any
later context surrounding waits.
If a put returns normally, an offer
returns true, or a put or poll returns non-null, the operation
completed successfully.
In all other cases, the operation fails cleanly -- the
element is not put or taken.
As with Sync classes, spinloops are not directly supported,
are not particularly recommended for routine use, but are not hard
to construct. For example, here is an exponential backoff version:
Object backOffTake(Channel q) throws InterruptedException {
long waitTime = 0;
for (;;) {
Object x = q.poll(0);
if (x != null)
return x;
else {
Thread.sleep(waitTime);
waitTime = 3 * waitTime / 2 + 1;
}
}
Sample Usage. Here is a producer/consumer design
where the channel is used to hold Runnable commands representing
background tasks.
class Service {
private final Channel channel = ... some Channel implementation;
private void backgroundTask(int taskParam) { ... }
public void action(final int arg) {
Runnable command =
new Runnable() {
public void run() { backgroundTask(arg); }
};
try { channel.put(command) }
catch (InterruptedException ex) {
Thread.currentThread().interrupt(); // ignore but propagate
}
}
public Service() {
Runnable backgroundLoop =
new Runnable() {
public void run() {
for (;;) {
try {
Runnable task = (Runnable)(channel.take());
task.run();
}
catch (InterruptedException ex) { return; }
}
}
};
new Thread(backgroundLoop).start();
}
}
[ Introduction to this package. ]
See Also: Sync See Also: BoundedChannel |