001: /*
002: File: SynchronousChannel.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 17Jul1998 dl Disabled direct semaphore permit check
013: 31Jul1998 dl Replaced main algorithm with one with
014: better scaling and fairness properties.
015: 25aug1998 dl added peek
016: 24Nov2001 dl Replaced main algorithm with faster one.
017: */
018:
019: package org.dbunit.util.concurrent;
020:
021: import org.slf4j.Logger;
022: import org.slf4j.LoggerFactory;
023:
024: /**
025: * A rendezvous channel, similar to those used in CSP and Ada. Each
026: * put must wait for a take, and vice versa. Synchronous channels
027: * are well suited for handoff designs, in which an object running in
028: * one thread must synch up with an object running in another thread
029: * in order to hand it some information, event, or task.
030: * <p> If you only need threads to synch up without
031: * exchanging information, consider using a Barrier. If you need
032: * bidirectional exchanges, consider using a Rendezvous.
033: *
034: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
035: * @see CyclicBarrier
036: * @see Rendezvous
037: **/
038:
039: public class SynchronousChannel implements BoundedChannel {
040:
041: /**
042: * Logger for this class
043: */
044: private static final Logger logger = LoggerFactory
045: .getLogger(SynchronousChannel.class);
046:
047: /*
048: This implementation divides actions into two cases for puts:
049:
050: * An arriving putter that does not already have a waiting taker
051: creates a node holding item, and then waits for a taker to take it.
052: * An arriving putter that does already have a waiting taker fills
053: the slot node created by the taker, and notifies it to continue.
054:
055: And symmetrically, two for takes:
056:
057: * An arriving taker that does not already have a waiting putter
058: creates an empty slot node, and then waits for a putter to fill it.
059: * An arriving taker that does already have a waiting putter takes
060: item from the node created by the putter, and notifies it to continue.
061:
062: This requires keeping two simple queues: waitingPuts and waitingTakes.
063:
064: When a put or take waiting for the actions of its counterpart
065: aborts due to interruption or timeout, it marks the node
066: it created as "CANCELLED", which causes its counterpart to retry
067: the entire put or take sequence.
068: */
069:
070: /**
071: * Special marker used in queue nodes to indicate that
072: * the thread waiting for a change in the node has timed out
073: * or been interrupted.
074: **/
075: protected static final Object CANCELLED = new Object();
076:
077: /**
078: * Simple FIFO queue class to hold waiting puts/takes.
079: **/
080: protected static class Queue {
081:
082: /**
083: * Logger for this class
084: */
085: private static final Logger logger = LoggerFactory
086: .getLogger(Queue.class);
087:
088: protected LinkedNode head;
089: protected LinkedNode last;
090:
091: protected void enq(LinkedNode p) {
092: logger.debug("enq(p=" + p + ") - start");
093:
094: if (last == null)
095: last = head = p;
096: else
097: last = last.next = p;
098: }
099:
100: protected LinkedNode deq() {
101: logger.debug("deq() - start");
102:
103: LinkedNode p = head;
104: if (p != null && (head = p.next) == null)
105: last = null;
106: return p;
107: }
108: }
109:
110: protected final Queue waitingPuts = new Queue();
111: protected final Queue waitingTakes = new Queue();
112:
113: /**
114: * @return zero --
115: * Synchronous channels have no internal capacity.
116: **/
117: public int capacity() {
118: logger.debug("capacity() - start");
119: return 0;
120: }
121:
122: /**
123: * @return null --
124: * Synchronous channels do not hold contents unless actively taken
125: **/
126: public Object peek() {
127: logger.debug("peek() - start");
128: return null;
129: }
130:
131: public void put(Object x) throws InterruptedException {
132: logger.debug("put(x=" + x + ") - start");
133:
134: if (x == null)
135: throw new IllegalArgumentException();
136:
137: // This code is conceptually straightforward, but messy
138: // because we need to intertwine handling of put-arrives first
139: // vs take-arrives first cases.
140:
141: // Outer loop is to handle retry due to cancelled waiting taker
142: for (;;) {
143:
144: // Get out now if we are interrupted
145: if (Thread.interrupted())
146: throw new InterruptedException();
147:
148: // Exactly one of item or slot will be nonnull at end of
149: // synchronized block, depending on whether a put or a take
150: // arrived first.
151: LinkedNode slot;
152: LinkedNode item = null;
153:
154: synchronized (this ) {
155: // Try to match up with a waiting taker; fill and signal it below
156: slot = waitingTakes.deq();
157:
158: // If no takers yet, create a node and wait below
159: if (slot == null)
160: waitingPuts.enq(item = new LinkedNode(x));
161: }
162:
163: if (slot != null) { // There is a waiting taker.
164: // Fill in the slot created by the taker and signal taker to
165: // continue.
166: synchronized (slot) {
167: if (slot.value != CANCELLED) {
168: slot.value = x;
169: slot.notify();
170: return;
171: }
172: // else the taker has cancelled, so retry outer loop
173: }
174: }
175:
176: else {
177: // Wait for a taker to arrive and take the item.
178: synchronized (item) {
179: try {
180: while (item.value != null)
181: item.wait();
182: return;
183: } catch (InterruptedException ie) {
184: logger.error("put()", ie);
185:
186: // If item was taken, return normally but set interrupt status
187: if (item.value == null) {
188: Thread.currentThread().interrupt();
189: return;
190: } else {
191: item.value = CANCELLED;
192: throw ie;
193: }
194: }
195: }
196: }
197: }
198: }
199:
200: public Object take() throws InterruptedException {
201: logger.debug("take() - start");
202:
203: // Entirely symmetric to put()
204:
205: for (;;) {
206: if (Thread.interrupted())
207: throw new InterruptedException();
208:
209: LinkedNode item;
210: LinkedNode slot = null;
211:
212: synchronized (this ) {
213: item = waitingPuts.deq();
214: if (item == null)
215: waitingTakes.enq(slot = new LinkedNode());
216: }
217:
218: if (item != null) {
219: synchronized (item) {
220: Object x = item.value;
221: if (x != CANCELLED) {
222: item.value = null;
223: item.next = null;
224: item.notify();
225: return x;
226: }
227: }
228: }
229:
230: else {
231: synchronized (slot) {
232: try {
233: for (;;) {
234: Object x = slot.value;
235: if (x != null) {
236: slot.value = null;
237: slot.next = null;
238: return x;
239: } else
240: slot.wait();
241: }
242: } catch (InterruptedException ie) {
243: logger.error("take()", ie);
244:
245: Object x = slot.value;
246: if (x != null) {
247: slot.value = null;
248: slot.next = null;
249: Thread.currentThread().interrupt();
250: return x;
251: } else {
252: slot.value = CANCELLED;
253: throw ie;
254: }
255: }
256: }
257: }
258: }
259: }
260:
261: /*
262: Offer and poll are just like put and take, except even messier.
263: */
264:
265: public boolean offer(Object x, long msecs)
266: throws InterruptedException {
267: logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
268:
269: if (x == null)
270: throw new IllegalArgumentException();
271: long waitTime = msecs;
272: long startTime = 0; // lazily initialize below if needed
273:
274: for (;;) {
275: if (Thread.interrupted())
276: throw new InterruptedException();
277:
278: LinkedNode slot;
279: LinkedNode item = null;
280:
281: synchronized (this ) {
282: slot = waitingTakes.deq();
283: if (slot == null) {
284: if (waitTime <= 0)
285: return false;
286: else
287: waitingPuts.enq(item = new LinkedNode(x));
288: }
289: }
290:
291: if (slot != null) {
292: synchronized (slot) {
293: if (slot.value != CANCELLED) {
294: slot.value = x;
295: slot.notify();
296: return true;
297: }
298: }
299: }
300:
301: long now = System.currentTimeMillis();
302: if (startTime == 0)
303: startTime = now;
304: else
305: waitTime = msecs - (now - startTime);
306:
307: if (item != null) {
308: synchronized (item) {
309: try {
310: for (;;) {
311: if (item.value == null)
312: return true;
313: if (waitTime <= 0) {
314: item.value = CANCELLED;
315: return false;
316: }
317: item.wait(waitTime);
318: waitTime = msecs
319: - (System.currentTimeMillis() - startTime);
320: }
321: } catch (InterruptedException ie) {
322: logger.error("offer()", ie);
323:
324: if (item.value == null) {
325: Thread.currentThread().interrupt();
326: return true;
327: } else {
328: item.value = CANCELLED;
329: throw ie;
330: }
331: }
332: }
333: }
334: }
335: }
336:
337: public Object poll(long msecs) throws InterruptedException {
338: logger.debug("poll(msecs=" + msecs + ") - start");
339:
340: long waitTime = msecs;
341: long startTime = 0;
342:
343: for (;;) {
344: if (Thread.interrupted())
345: throw new InterruptedException();
346:
347: LinkedNode item;
348: LinkedNode slot = null;
349:
350: synchronized (this ) {
351: item = waitingPuts.deq();
352: if (item == null) {
353: if (waitTime <= 0)
354: return null;
355: else
356: waitingTakes.enq(slot = new LinkedNode());
357: }
358: }
359:
360: if (item != null) {
361: synchronized (item) {
362: Object x = item.value;
363: if (x != CANCELLED) {
364: item.value = null;
365: item.next = null;
366: item.notify();
367: return x;
368: }
369: }
370: }
371:
372: long now = System.currentTimeMillis();
373: if (startTime == 0)
374: startTime = now;
375: else
376: waitTime = msecs - (now - startTime);
377:
378: if (slot != null) {
379: synchronized (slot) {
380: try {
381: for (;;) {
382: Object x = slot.value;
383: if (x != null) {
384: slot.value = null;
385: slot.next = null;
386: return x;
387: }
388: if (waitTime <= 0) {
389: slot.value = CANCELLED;
390: return null;
391: }
392: slot.wait(waitTime);
393: waitTime = msecs
394: - (System.currentTimeMillis() - startTime);
395: }
396: } catch (InterruptedException ie) {
397: logger.error("poll()", ie);
398:
399: Object x = slot.value;
400: if (x != null) {
401: slot.value = null;
402: slot.next = null;
403: Thread.currentThread().interrupt();
404: return x;
405: } else {
406: slot.value = CANCELLED;
407: throw ie;
408: }
409: }
410: }
411: }
412: }
413: }
414:
415: }
|