001: /*
002: File: Channel.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 25aug1998 dl added peek
013: */
014:
015: package EDU.oswego.cs.dl.util.concurrent;
016:
017: /**
018: * Main interface for buffers, queues, pipes, conduits, etc.
019: * <p>
 * A Channel represents anything that you can put items
 * into and take them out of. As with the Sync
 * interface, both blocking (put(x), take())
 * and timeout (offer(x, msecs), poll(msecs)) policies
 * are provided. Using a
 * zero timeout for offer and poll results in a pure balking policy.
027: * <p>
 * To aid in efforts to use Channels in a more typesafe manner,
 * this interface extends Puttable and Takable. You can restrict
 * arguments or instance variables to these types as a way of
 * guaranteeing that producers never try to take, or consumers put.
 * For example:
033: * <pre>
 * class Producer implements Runnable {
 *   final Puttable chan;
 *   Producer(Puttable channel) { chan = channel; }
 *   public void run() {
 *     try {
 *       for(;;) { chan.put(produce()); }
 *     }
 *     catch (InterruptedException ex) {}
 *   }
 *   Object produce() { ... }
 * }
 *
 * class Consumer implements Runnable {
 *   final Takable chan;
 *   Consumer(Takable channel) { chan = channel; }
 *   public void run() {
 *     try {
 *       for(;;) { consume(chan.take()); }
 *     }
 *     catch (InterruptedException ex) {}
 *   }
 *   void consume(Object x) { ... }
 * }
 *
 * class Setup {
 *   void main() {
 *     Channel chan = new SomeChannelImplementation();
 *     Producer p = new Producer(chan);
 *     Consumer c = new Consumer(chan);
 *     new Thread(p).start();
 *     new Thread(c).start();
 *   }
 * }
068: * </pre>
069: * <p>
070: * A given channel implementation might or might not have bounded
071: * capacity or other insertion constraints, so in general, you cannot tell if
072: * a given put will block. However,
073: * Channels that are designed to
074: * have an element capacity (and so always block when full)
075: * should implement the
076: * BoundedChannel
077: * subinterface.
078: * <p>
079: * Channels may hold any kind of item. However,
080: * insertion of null is not in general supported. Implementations
081: * may (all currently do) throw IllegalArgumentExceptions upon attempts to
082: * insert null.
083: * <p>
084: * By design, the Channel interface does not support any methods to determine
085: * the current number of elements being held in the channel.
086: * This decision reflects the fact that in
087: * concurrent programming, such methods are so rarely useful
 * that including them invites misuse; at best they could
 * provide a snapshot of the current
 * state, which could change immediately after being reported.
091: * It is better practice to instead use poll and offer to try
092: * to take and put elements without blocking. For example,
093: * to empty out the current contents of a channel, you could write:
094: * <pre>
 * try {
 *   for (;;) {
 *     Object item = channel.poll(0);
 *     if (item != null)
 *       process(item);
 *     else
 *       break;
 *   }
 * }
 * catch(InterruptedException ex) { ... }
105: * </pre>
106: * <p>
107: * However, it is possible to determine whether an item
108: * exists in a Channel via <code>peek</code>, which returns
109: * but does NOT remove the next item that can be taken (or null
110: * if there is no such item). The peek operation has a limited
111: * range of applicability, and must be used with care. Unless it
112: * is known that a given thread is the only possible consumer
113: * of a channel, and that no time-out-based <code>offer</code> operations
114: * are ever invoked, there is no guarantee that the item returned
115: * by peek will be available for a subsequent take.
116: * <p>
117: * When appropriate, you can define an isEmpty method to
118: * return whether <code>peek</code> returns null.
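 * <p>
 * As a rough illustration of the isEmpty idea, the sketch below wraps
 * <code>peek</code> in a static helper. It is not part of this package:
 * the Channel interface shown is a local copy of the five method
 * signatures declared in this file, and QueueChannel, Channels and
 * IsEmptyDemo are hypothetical names. QueueChannel is backed by
 * java.util.concurrent.LinkedBlockingQueue purely so that the example
 * runs on its own.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Local copy of the Channel method signatures so the sketch compiles alone.
interface Channel {
  void put(Object item) throws InterruptedException;
  boolean offer(Object item, long msecs) throws InterruptedException;
  Object take() throws InterruptedException;
  Object poll(long msecs) throws InterruptedException;
  Object peek();
}

// Hypothetical unbounded implementation; an assumption, not a class of this package.
class QueueChannel implements Channel {
  private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

  public void put(Object item) throws InterruptedException { queue.put(item); }

  public boolean offer(Object item, long msecs) throws InterruptedException {
    return queue.offer(item, msecs, TimeUnit.MILLISECONDS);
  }

  public Object take() throws InterruptedException { return queue.take(); }

  public Object poll(long msecs) throws InterruptedException {
    return queue.poll(msecs, TimeUnit.MILLISECONDS);
  }

  public Object peek() { return queue.peek(); }
}

class Channels {
  // Reports whether peek() currently sees no item. This is a snapshot only:
  // another thread may put or take immediately after it returns.
  static boolean isEmpty(Channel c) {
    return c.peek() == null;
  }
}

class IsEmptyDemo {
  public static void main(String[] args) throws InterruptedException {
    Channel c = new QueueChannel();
    System.out.println(Channels.isEmpty(c)); // nothing inserted yet
    c.put("x");
    System.out.println(Channels.isEmpty(c)); // one item is waiting
  }
}
```

 * The result is only a snapshot; when other producers or consumers
 * are active it can be stale by the time the caller acts on it.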
119: * <p>
 * Also, as a compromise, even though it does not appear in the interface,
121: * implementation classes that can readily compute the number
122: * of elements support a <code>size()</code> method. This allows careful
123: * use, for example in queue length monitors, appropriate to the
124: * particular implementation constraints and properties.
125: * <p>
126: * All channels allow multiple producers and/or consumers.
127: * They do not support any kind of <em>close</em> method
128: * to shut down operation or indicate completion of particular
129: * producer or consumer threads.
130: * If you need to signal completion, one way to do it is to
131: * create a class such as
132: * <pre>
133: * class EndOfStream {
134: * // Application-dependent field/methods
135: * }
136: * </pre>
 * and have producers put an instance of this class into
138: * the channel when they are done. The consumer side can then
139: * check this via
140: * <pre>
 * Object x = aChannel.take();
 * if (x instanceof EndOfStream)
 *   // special actions; perhaps terminate
 * else
 *   // process normally
146: * </pre>
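 * <p>
 * The end-of-stream convention can be sketched as a complete program
 * along the following lines. This is an illustration under stated
 * assumptions rather than code from this package: the Channel interface
 * is a local copy of the method signatures declared in this file,
 * QueueChannel is a hypothetical implementation backed by
 * java.util.concurrent.LinkedBlockingQueue, and EndOfStreamDemo and
 * consumeAll are names invented for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Local copy of the Channel method signatures so the sketch compiles alone.
interface Channel {
  void put(Object item) throws InterruptedException;
  boolean offer(Object item, long msecs) throws InterruptedException;
  Object take() throws InterruptedException;
  Object poll(long msecs) throws InterruptedException;
  Object peek();
}

// Hypothetical unbounded implementation; an assumption, not a class of this package.
class QueueChannel implements Channel {
  private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

  public void put(Object item) throws InterruptedException { queue.put(item); }

  public boolean offer(Object item, long msecs) throws InterruptedException {
    return queue.offer(item, msecs, TimeUnit.MILLISECONDS);
  }

  public Object take() throws InterruptedException { return queue.take(); }

  public Object poll(long msecs) throws InterruptedException {
    return queue.poll(msecs, TimeUnit.MILLISECONDS);
  }

  public Object peek() { return queue.peek(); }
}

// Marker object that a producer puts last to signal completion.
class EndOfStream { }

class EndOfStreamDemo {
  // Starts one producer that puts n Integers followed by an EndOfStream,
  // then consumes on the calling thread until the marker arrives.
  // Returns the number of ordinary items consumed.
  static int consumeAll(final Channel chan, final int n) throws InterruptedException {
    Thread producer = new Thread(new Runnable() {
      public void run() {
        try {
          for (int i = 0; i < n; i++) {
            chan.put(Integer.valueOf(i));
          }
          chan.put(new EndOfStream());
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt(); // preserve interrupt status
        }
      }
    });
    producer.start();

    int count = 0;
    for (;;) {
      Object x = chan.take();
      if (x instanceof EndOfStream) {
        break; // special action: stop consuming
      }
      count++; // process normally
    }
    producer.join();
    return count;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(consumeAll(new QueueChannel(), 5));
  }
}
```

 * With several producers sharing one channel, each would put its own
 * marker, and the consumer would have to count markers before stopping.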
147: * <p>
 * In time-out based methods (poll(msecs) and offer(x, msecs)),
 * time bounds are interpreted in
 * a coarse-grained, best-effort fashion. Since there is no
 * way in Java to escape out of a wait for a synchronized
 * method/block, time bounds can sometimes be exceeded when
 * there is a lot of contention for the channel. Additionally,
154: * some Channel semantics entail a ``point of
155: * no return'' where, once some parts of the operation have completed,
156: * others must follow, regardless of time bound.
157: * <p>
158: * Interruptions are in general handled as early as possible
 * in all methods. Normally, InterruptedExceptions are thrown
160: * in put/take and offer(msec)/poll(msec) if interruption
161: * is detected upon entry to the method, as well as in any
162: * later context surrounding waits.
163: * <p>
 * If a put returns normally, an offer
 * returns true, or a take or poll returns non-null, the operation
166: * completed successfully.
167: * In all other cases, the operation fails cleanly -- the
168: * element is not put or taken.
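 * <p>
 * One way a caller might distinguish the three outcomes of a timed
 * offer is sketched below. The names are assumptions made for the
 * example: the Channel interface is a local copy of the method
 * signatures declared in this file, BoundedQueueChannel is a
 * hypothetical fixed-capacity implementation backed by
 * java.util.concurrent.LinkedBlockingQueue, and Outcomes.offerOutcome
 * is an invented helper.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Local copy of the Channel method signatures so the sketch compiles alone.
interface Channel {
  void put(Object item) throws InterruptedException;
  boolean offer(Object item, long msecs) throws InterruptedException;
  Object take() throws InterruptedException;
  Object poll(long msecs) throws InterruptedException;
  Object peek();
}

// Hypothetical fixed-capacity implementation; an assumption, not a class of this package.
class BoundedQueueChannel implements Channel {
  private final LinkedBlockingQueue<Object> queue;

  BoundedQueueChannel(int capacity) {
    queue = new LinkedBlockingQueue<Object>(capacity);
  }

  public void put(Object item) throws InterruptedException { queue.put(item); }

  public boolean offer(Object item, long msecs) throws InterruptedException {
    return queue.offer(item, msecs, TimeUnit.MILLISECONDS);
  }

  public Object take() throws InterruptedException { return queue.take(); }

  public Object poll(long msecs) throws InterruptedException {
    return queue.poll(msecs, TimeUnit.MILLISECONDS);
  }

  public Object peek() { return queue.peek(); }
}

class Outcomes {
  // Classifies the result of a timed offer. Only "accepted" means the
  // item is in the channel; in the other two cases it was not inserted.
  static String offerOutcome(Channel c, Object item, long msecs) {
    try {
      return c.offer(item, msecs) ? "accepted" : "not accepted";
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // restore status for callers
      return "interrupted";
    }
  }
}

class OutcomesDemo {
  public static void main(String[] args) {
    Channel c = new BoundedQueueChannel(1);
    System.out.println(Outcomes.offerOutcome(c, "a", 0)); // room for one item
    System.out.println(Outcomes.offerOutcome(c, "b", 0)); // channel is now full
  }
}
```

 * Because a failed offer leaves the channel untouched, the caller
 * still holds the item and is free to retry, reroute or drop it.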
169: * <p>
170: * As with Sync classes, spinloops are not directly supported,
171: * are not particularly recommended for routine use, but are not hard
172: * to construct. For example, here is an exponential backoff version:
173: * <pre>
 * Object backOffTake(Channel q) throws InterruptedException {
 *   long waitTime = 0;
 *   for (;;) {
 *     Object x = q.poll(0);
 *     if (x != null)
 *       return x;
 *     else {
 *       Thread.sleep(waitTime);
 *       waitTime = 3 * waitTime / 2 + 1;
 *     }
 *   }
 * }
185: * </pre>
186: * <p>
187: * <b>Sample Usage</b>. Here is a producer/consumer design
188: * where the channel is used to hold Runnable commands representing
189: * background tasks.
190: * <pre>
 * class Service {
 *   private final Channel channel = ... some Channel implementation;
 *
 *   private void backgroundTask(int taskParam) { ... }
 *
 *   public void action(final int arg) {
 *     Runnable command =
 *       new Runnable() {
 *         public void run() { backgroundTask(arg); }
 *       };
 *     try { channel.put(command); }
 *     catch (InterruptedException ex) {
 *       Thread.currentThread().interrupt(); // ignore but propagate
 *     }
 *   }
 *
 *   public Service() {
 *     Runnable backgroundLoop =
 *       new Runnable() {
 *         public void run() {
 *           for (;;) {
 *             try {
 *               Runnable task = (Runnable)(channel.take());
 *               task.run();
 *             }
 *             catch (InterruptedException ex) { return; }
 *           }
 *         }
 *       };
 *     new Thread(backgroundLoop).start();
 *   }
 * }
223: *
224: * </pre>
225: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
226: * @see Sync
227: * @see BoundedChannel
228: **/
229:
230: public interface Channel extends Puttable, Takable {
231:
232: /**
233: * Place item in the channel, possibly waiting indefinitely until
234: * it can be accepted. Channels implementing the BoundedChannel
235: * subinterface are generally guaranteed to block on puts upon
236: * reaching capacity, but other implementations may or may not block.
237: * @param item the element to be inserted. Should be non-null.
238: * @exception InterruptedException if the current thread has
239: * been interrupted at a point at which interruption
240: * is detected, in which case the element is guaranteed not
241: * to be inserted. Otherwise, on normal return, the element is guaranteed
242: * to have been inserted.
243: **/
244: public void put(Object item) throws InterruptedException;
245:
246: /**
247: * Place item in channel only if it can be accepted within
248: * msecs milliseconds. The time bound is interpreted in
249: * a coarse-grained, best-effort fashion.
250: * @param item the element to be inserted. Should be non-null.
251: * @param msecs the number of milliseconds to wait. If less than
252: * or equal to zero, the method does not perform any timed waits,
253: * but might still require
254: * access to a synchronization lock, which can impose unbounded
255: * delay if there is a lot of contention for the channel.
256: * @return true if accepted, else false
257: * @exception InterruptedException if the current thread has
258: * been interrupted at a point at which interruption
259: * is detected, in which case the element is guaranteed not
260: * to be inserted (i.e., is equivalent to a false return).
261: **/
262: public boolean offer(Object item, long msecs)
263: throws InterruptedException;
264:
265: /**
266: * Return and remove an item from channel,
267: * possibly waiting indefinitely until
268: * such an item exists.
 * @return some item from the channel. Different implementations
 * may guarantee various properties (such as FIFO) about that item.
 * @exception InterruptedException if the current thread has
 * been interrupted at a point at which interruption
 * is detected, in which case the state of the channel is unchanged.
274: *
275: **/
276: public Object take() throws InterruptedException;
277:
278: /**
 * Return and remove an item from channel only if one is available within
 * msecs milliseconds. The time bound is interpreted in a
 * coarse-grained, best-effort fashion.
282: * @param msecs the number of milliseconds to wait. If less than
283: * or equal to zero, the operation does not perform any timed waits,
284: * but might still require
285: * access to a synchronization lock, which can impose unbounded
286: * delay if there is a lot of contention for the channel.
 * @return some item, or null if none becomes available within the time bound.
 * @exception InterruptedException if the current thread has
 * been interrupted at a point at which interruption
 * is detected, in which case the state of the channel is unchanged
 * (i.e., equivalent to a null return).
292: **/
293:
294: public Object poll(long msecs) throws InterruptedException;
295:
296: /**
 * Return, but do not remove, the object at the head of the Channel,
 * or null if it is empty.
299: **/
300:
301: public Object peek();
302:
303: }
|