001: /*
002: based on file: QueuedSemaphore.java
003: Originally written by Doug Lea and released into the public domain.
004: This may be used for any purposes whatsoever without acknowledgment.
005: Thanks for the assistance and support of Sun Microsystems Labs,
006: and everyone contributing, testing, and using this code.
007: History:
008: Date Who What
009: 11Jun1998 dl Create public version
010: 5Aug1998 dl replaced int counters with longs
011: 24Aug1999 dl release(n): screen arguments
012: */
013:
014: package org.apache.beehive.netui.util.internal.concurrent;
015:
016: import java.util.*;
017:
018: /**
019: * Base class for internal queue classes for semaphores, etc.
020: * Relies on subclasses to actually implement queue mechanics.
021: * NOTE: this class is NOT present in java.util.concurrent.
022: **/
023:
024: abstract class WaitQueue {
025:
026: public abstract void insert(WaitNode w); // assumed not to block
027:
028: public abstract WaitNode extract(); // should return null if empty
029:
030: public abstract boolean hasNodes();
031:
032: public abstract int getLength();
033:
034: public abstract Collection getWaitingThreads();
035:
036: public abstract boolean isWaiting(Thread thread);
037:
038: static interface QueuedSync {
039: // invoked with sync on wait node, (atomically) just before enqueuing
040: boolean recheck(WaitNode node);
041:
042: // invoked with sync on wait node, (atomically) just before signalling
043: void takeOver(WaitNode node);
044: }
045:
046: static class WaitNode {
047: boolean waiting = true;
048: WaitNode next = null;
049: final Thread owner;
050:
051: public WaitNode() {
052: this .owner = Thread.currentThread();
053: }
054:
055: public Thread getOwner() {
056: return owner;
057: }
058:
059: public synchronized boolean signal(QueuedSync sync) {
060: boolean signalled = waiting;
061: if (signalled) {
062: waiting = false;
063: notify();
064: sync.takeOver(this );
065: }
066: return signalled;
067: }
068:
069: public synchronized boolean doTimedWait(QueuedSync sync,
070: long nanos) throws InterruptedException {
071: if (sync.recheck(this ) || !waiting)
072: return true;
073: else if (nanos <= 0) {
074: waiting = false;
075: return false;
076: } else {
077: long deadline = Utils.nanoTime() + nanos;
078: try {
079: for (;;) {
080: TimeUnit.NANOSECONDS.timedWait(this , nanos);
081: if (!waiting) // definitely signalled
082: return true;
083: else {
084: nanos = deadline - Utils.nanoTime();
085: if (nanos <= 0) { // timed out
086: waiting = false;
087: return false;
088: }
089: }
090: }
091: } catch (InterruptedException ex) {
092: if (waiting) { // no notification
093: waiting = false; // invalidate for the signaller
094: throw ex;
095: } else { // thread was interrupted after it was notified
096: Thread.currentThread().interrupt();
097: return true;
098: }
099: }
100: }
101: }
102:
103: public synchronized void doWait(QueuedSync sync)
104: throws InterruptedException {
105: if (!sync.recheck(this )) {
106: try {
107: while (waiting)
108: wait();
109: } catch (InterruptedException ex) {
110: if (waiting) { // no notification
111: waiting = false; // invalidate for the signaller
112: throw ex;
113: } else { // thread was interrupted after it was notified
114: Thread.currentThread().interrupt();
115: return;
116: }
117: }
118: }
119: }
120: }
121:
122: }
|