001: /*
002: File: WaiterPreferenceSemaphore.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 5Aug1998 dl replaced int counters with longs
013: */
014:
015: package EDU.oswego.cs.dl.util.concurrent;
016:
017: /**
018: * An implementation of counting Semaphores that
019: * enforces enough fairness for applications that
020: * need to avoid indefinite overtaking without
021: * necessarily requiring FIFO ordered access.
022: * Empirically, very little is paid for this property
023: * unless there is a lot of contention among threads
024: * or very unfair JVM scheduling.
025: * The acquire method waits even if there are permits
026: * available that have not yet been claimed by threads that have
027: * been notified but not yet resumed. This makes the semaphore
028: * almost as fair as the underlying Java primitives allow.
029: * So, if synch lock entry and notify are both fair
030: * so is the semaphore -- almost: Rewaits stemming
031: * from timeouts in attempt, along with potentials for
032: * interrupted threads to be notified can compromise fairness,
033: * possibly allowing later-arriving threads to pass before
034: * earlier-arriving ones. However, in no case can a newly
035: * entering thread obtain a permit if there are still others waiting.
036: * Also, signalling order need not coincide with
037: * resumption order. Later-arriving threads might get permits
038: * and continue before other resumable threads are actually resumed.
039: * However, all of these potential fairness breaches are
040: * very rare in practice unless the underlying JVM
041: * performs strictly LIFO notifications (which has, sadly enough,
042: * been known to occur) in which case you need to use
043: * a FIFOSemaphore to maintain a reasonable approximation
044: * of fairness.
045: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
046: **/
047:
048: public final class WaiterPreferenceSemaphore extends Semaphore {
049:
050: /**
051: * Create a Semaphore with the given initial number of permits.
052: **/
053:
054: public WaiterPreferenceSemaphore(long initial) {
055: super (initial);
056: }
057:
058: /** Number of waiting threads **/
059: protected long waits_ = 0;
060:
061: public void acquire() throws InterruptedException {
062: if (Thread.interrupted())
063: throw new InterruptedException();
064: synchronized (this ) {
065: /*
066: Only take if there are more permits than threads waiting
067: for permits. This prevents infinite overtaking.
068: */
069: if (permits_ > waits_) {
070: --permits_;
071: return;
072: } else {
073: ++waits_;
074: try {
075: for (;;) {
076: wait();
077: if (permits_ > 0) {
078: --waits_;
079: --permits_;
080: return;
081: }
082: }
083: } catch (InterruptedException ex) {
084: --waits_;
085: notify();
086: throw ex;
087: }
088: }
089: }
090: }
091:
092: public boolean attempt(long msecs) throws InterruptedException {
093: if (Thread.interrupted())
094: throw new InterruptedException();
095:
096: synchronized (this ) {
097: if (permits_ > waits_) {
098: --permits_;
099: return true;
100: } else if (msecs <= 0)
101: return false;
102: else {
103: ++waits_;
104:
105: long startTime = System.currentTimeMillis();
106: long waitTime = msecs;
107:
108: try {
109: for (;;) {
110: wait(waitTime);
111: if (permits_ > 0) {
112: --waits_;
113: --permits_;
114: return true;
115: } else { // got a time-out or false-alarm notify
116: waitTime = msecs
117: - (System.currentTimeMillis() - startTime);
118: if (waitTime <= 0) {
119: --waits_;
120: return false;
121: }
122: }
123: }
124: } catch (InterruptedException ex) {
125: --waits_;
126: notify();
127: throw ex;
128: }
129: }
130: }
131: }
132:
133: public synchronized void release() {
134: ++permits_;
135: notify();
136: }
137:
138: /** Release N permits **/
139: public synchronized void release(long n) {
140: permits_ += n;
141: for (long i = 0; i < n; ++i)
142: notify();
143: }
144:
145: }
|