001: /*
002: File: QueuedSemaphore.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 5Aug1998 dl replaced int counters with longs
013: 24Aug1999 dl release(n): screen arguments
014: */
015:
016: package EDU.oswego.cs.dl.util.concurrent;
017:
018: /**
019: * Abstract base class for semaphores relying on queued wait nodes.
020: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
021: **/
022:
023: public abstract class QueuedSemaphore extends Semaphore {
024:
025: protected final WaitQueue wq_;
026:
027: QueuedSemaphore(WaitQueue q, long initialPermits) {
028: super (initialPermits);
029: wq_ = q;
030: }
031:
032: public void acquire() throws InterruptedException {
033: if (Thread.interrupted())
034: throw new InterruptedException();
035: if (precheck())
036: return;
037: WaitQueue.WaitNode w = new WaitQueue.WaitNode();
038: w.doWait(this );
039: }
040:
041: public boolean attempt(long msecs) throws InterruptedException {
042: if (Thread.interrupted())
043: throw new InterruptedException();
044: if (precheck())
045: return true;
046: if (msecs <= 0)
047: return false;
048:
049: WaitQueue.WaitNode w = new WaitQueue.WaitNode();
050: return w.doTimedWait(this , msecs);
051: }
052:
053: protected synchronized boolean precheck() {
054: boolean pass = (permits_ > 0);
055: if (pass)
056: --permits_;
057: return pass;
058: }
059:
060: protected synchronized boolean recheck(WaitQueue.WaitNode w) {
061: boolean pass = (permits_ > 0);
062: if (pass)
063: --permits_;
064: else
065: wq_.insert(w);
066: return pass;
067: }
068:
069: protected synchronized WaitQueue.WaitNode getSignallee() {
070: WaitQueue.WaitNode w = wq_.extract();
071: if (w == null)
072: ++permits_; // if none, inc permits for new arrivals
073: return w;
074: }
075:
076: public void release() {
077: for (;;) {
078: WaitQueue.WaitNode w = getSignallee();
079: if (w == null)
080: return; // no one to signal
081: if (w.signal())
082: return; // notify if still waiting, else skip
083: }
084: }
085:
086: /** Release N permits **/
087: public void release(long n) {
088: if (n < 0)
089: throw new IllegalArgumentException("Negative argument");
090:
091: for (long i = 0; i < n; ++i)
092: release();
093: }
094:
095: /**
096: * Base class for internal queue classes for semaphores, etc.
097: * Relies on subclasses to actually implement queue mechanics
098: **/
099:
100: protected static abstract class WaitQueue {
101:
102: protected abstract void insert(WaitNode w);// assumed not to block
103:
104: protected abstract WaitNode extract(); // should return null if empty
105:
106: protected static class WaitNode {
107: boolean waiting = true;
108: WaitNode next = null;
109:
110: protected synchronized boolean signal() {
111: boolean signalled = waiting;
112: if (signalled) {
113: waiting = false;
114: notify();
115: }
116: return signalled;
117: }
118:
119: protected synchronized boolean doTimedWait(
120: QueuedSemaphore sem, long msecs)
121: throws InterruptedException {
122: if (sem.recheck(this ) || !waiting)
123: return true;
124: else if (msecs <= 0) {
125: waiting = false;
126: return false;
127: } else {
128: long waitTime = msecs;
129: long start = System.currentTimeMillis();
130:
131: try {
132: for (;;) {
133: wait(waitTime);
134: if (!waiting) // definitely signalled
135: return true;
136: else {
137: waitTime = msecs
138: - (System.currentTimeMillis() - start);
139: if (waitTime <= 0) { // timed out
140: waiting = false;
141: return false;
142: }
143: }
144: }
145: } catch (InterruptedException ex) {
146: if (waiting) { // no notification
147: waiting = false; // invalidate for the signaller
148: throw ex;
149: } else { // thread was interrupted after it was notified
150: Thread.currentThread().interrupt();
151: return true;
152: }
153: }
154: }
155: }
156:
157: protected synchronized void doWait(QueuedSemaphore sem)
158: throws InterruptedException {
159: if (!sem.recheck(this )) {
160: try {
161: while (waiting)
162: wait();
163: } catch (InterruptedException ex) {
164: if (waiting) { // no notification
165: waiting = false; // invalidate for the signaller
166: throw ex;
167: } else { // thread was interrupted after it was notified
168: Thread.currentThread().interrupt();
169: return;
170: }
171: }
172: }
173: }
174: }
175:
176: }
177:
178: }
|