001: /*
002: File: Semaphore.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 5Aug1998 dl replaced int counters with longs
013: 24Aug1999 dl release(n): screen arguments
014: */
015:
016: package org.dbunit.util.concurrent;
017:
018: import org.slf4j.Logger;
019: import org.slf4j.LoggerFactory;
020:
021: /**
022: * Base class for counting semaphores.
023: * Conceptually, a semaphore maintains a set of permits.
024: * Each acquire() blocks if necessary
025: * until a permit is available, and then takes it.
026: * Each release adds a permit. However, no actual permit objects
027: * are used; the Semaphore just keeps a count of the number
028: * available and acts accordingly.
029: * <p>
030: * A semaphore initialized to 1 can serve as a mutual exclusion
031: * lock.
032: * <p>
033: * Different implementation subclasses may provide different
034: * ordering guarantees (or lack thereof) surrounding which
035: * threads will be resumed upon a signal.
036: * <p>
037: * The default implementation makes NO
038: * guarantees about the order in which threads will
039: * acquire permits. It is often faster than other implementations.
040: * <p>
041: * <b>Sample usage.</b> Here is a class that uses a semaphore to
042: * help manage access to a pool of items.
043: * <pre>
044: * class Pool {
045: * static final int MAX_AVAILABLE = 100;
046: * private final Semaphore available = new Semaphore(MAX_AVAILABLE);
047: *
048: * public Object getItem() throws InterruptedException { // no synch
049: * available.acquire();
050: * return getNextAvailableItem();
051: * }
052: *
053: * public void putItem(Object x) { // no synch
054: * if (markAsUnused(x))
055: * available.release();
056: * }
057: *
058: * // Not a particularly efficient data structure; just for demo
059: *
060: * protected Object[] items = ... whatever kinds of items being managed
061: * protected boolean[] used = new boolean[MAX_AVAILABLE];
062: *
063: * protected synchronized Object getNextAvailableItem() {
064: * for (int i = 0; i < MAX_AVAILABLE; ++i) {
065: * if (!used[i]) {
066: * used[i] = true;
067: * return items[i];
068: * }
069: * }
070: * return null; // not reached
071: * }
072: *
073: * protected synchronized boolean markAsUnused(Object item) {
074: * for (int i = 0; i < MAX_AVAILABLE; ++i) {
075: * if (item == items[i]) {
076: * if (used[i]) {
077: * used[i] = false;
078: * return true;
079: * }
080: * else
081: * return false;
082: * }
083: * }
084: * return false;
085: * }
086: *
087: * }
088: *</pre>
089: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
090: **/
091:
092: public class Semaphore implements Sync {
093:
094: /**
095: * Logger for this class
096: */
097: private static final Logger logger = LoggerFactory
098: .getLogger(Semaphore.class);
099:
100: /** current number of available permits **/
101: protected long permits_;
102:
103: /**
104: * Create a Semaphore with the given initial number of permits.
105: * Using a seed of one makes the semaphore act as a mutual exclusion lock.
106: * Negative seeds are also allowed, in which case no acquires will proceed
107: * until the number of releases has pushed the number of permits past 0.
108: **/
109: public Semaphore(long initialPermits) {
110: permits_ = initialPermits;
111: }
112:
113: /** Wait until a permit is available, and take one **/
114: public void acquire() throws InterruptedException {
115: logger.debug("acquire() - start");
116:
117: if (Thread.interrupted())
118: throw new InterruptedException();
119: synchronized (this ) {
120: try {
121: while (permits_ <= 0)
122: wait();
123: --permits_;
124: } catch (InterruptedException ex) {
125: logger.error("acquire()", ex);
126:
127: notify();
128: throw ex;
129: }
130: }
131: }
132:
133: /** Wait at most msecs millisconds for a permit. **/
134: public boolean attempt(long msecs) throws InterruptedException {
135: logger.debug("attempt(msecs=" + msecs + ") - start");
136:
137: if (Thread.interrupted())
138: throw new InterruptedException();
139:
140: synchronized (this ) {
141: if (permits_ > 0) {
142: --permits_;
143: return true;
144: } else if (msecs <= 0)
145: return false;
146: else {
147: try {
148: long startTime = System.currentTimeMillis();
149: long waitTime = msecs;
150:
151: for (;;) {
152: wait(waitTime);
153: if (permits_ > 0) {
154: --permits_;
155: return true;
156: } else {
157: waitTime = msecs
158: - (System.currentTimeMillis() - startTime);
159: if (waitTime <= 0)
160: return false;
161: }
162: }
163: } catch (InterruptedException ex) {
164: logger.error("attempt()", ex);
165:
166: notify();
167: throw ex;
168: }
169: }
170: }
171: }
172:
173: /** Release a permit **/
174: public synchronized void release() {
175: logger.debug("release() - start");
176:
177: ++permits_;
178: notify();
179: }
180:
181: /**
182: * Release N permits. <code>release(n)</code> is
183: * equivalent in effect to:
184: * <pre>
185: * for (int i = 0; i < n; ++i) release();
186: * </pre>
187: * <p>
188: * But may be more efficient in some semaphore implementations.
189: * @exception IllegalArgumentException if n is negative.
190: **/
191: public synchronized void release(long n) {
192: logger.debug("release(n=" + n + ") - start");
193:
194: if (n < 0)
195: throw new IllegalArgumentException("Negative argument");
196:
197: permits_ += n;
198: for (long i = 0; i < n; ++i)
199: notify();
200: }
201:
202: /**
203: * Return the current number of available permits.
204: * Returns an accurate, but possibly unstable value,
205: * that may change immediately after returning.
206: **/
207: public synchronized long permits() {
208: logger.debug("permits() - start");
209:
210: return permits_;
211: }
212:
213: }
|