001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package java.util.concurrent;
008:
009: import java.util.*;
010: import java.util.concurrent.locks.*;
011: import java.util.concurrent.atomic.*;
012:
013: /**
014: * A counting semaphore. Conceptually, a semaphore maintains a set of
015: * permits. Each {@link #acquire} blocks if necessary until a permit is
016: * available, and then takes it. Each {@link #release} adds a permit,
017: * potentially releasing a blocking acquirer.
018: * However, no actual permit objects are used; the <tt>Semaphore</tt> just
019: * keeps a count of the number available and acts accordingly.
020: *
021: * <p>Semaphores are often used to restrict the number of threads than can
022: * access some (physical or logical) resource. For example, here is
023: * a class that uses a semaphore to control access to a pool of items:
024: * <pre>
025: * class Pool {
026: * private static final MAX_AVAILABLE = 100;
027: * private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
028: *
029: * public Object getItem() throws InterruptedException {
030: * available.acquire();
031: * return getNextAvailableItem();
032: * }
033: *
034: * public void putItem(Object x) {
035: * if (markAsUnused(x))
036: * available.release();
037: * }
038: *
039: * // Not a particularly efficient data structure; just for demo
040: *
041: * protected Object[] items = ... whatever kinds of items being managed
042: * protected boolean[] used = new boolean[MAX_AVAILABLE];
043: *
044: * protected synchronized Object getNextAvailableItem() {
045: * for (int i = 0; i < MAX_AVAILABLE; ++i) {
046: * if (!used[i]) {
047: * used[i] = true;
048: * return items[i];
049: * }
050: * }
051: * return null; // not reached
052: * }
053: *
054: * protected synchronized boolean markAsUnused(Object item) {
055: * for (int i = 0; i < MAX_AVAILABLE; ++i) {
056: * if (item == items[i]) {
057: * if (used[i]) {
058: * used[i] = false;
059: * return true;
060: * } else
061: * return false;
062: * }
063: * }
064: * return false;
065: * }
066: *
067: * }
068: * </pre>
069: *
070: * <p>Before obtaining an item each thread must acquire a permit from
071: * the semaphore, guaranteeing that an item is available for use. When
072: * the thread has finished with the item it is returned back to the
073: * pool and a permit is returned to the semaphore, allowing another
074: * thread to acquire that item. Note that no synchronization lock is
075: * held when {@link #acquire} is called as that would prevent an item
076: * from being returned to the pool. The semaphore encapsulates the
077: * synchronization needed to restrict access to the pool, separately
078: * from any synchronization needed to maintain the consistency of the
079: * pool itself.
080: *
081: * <p>A semaphore initialized to one, and which is used such that it
082: * only has at most one permit available, can serve as a mutual
083: * exclusion lock. This is more commonly known as a <em>binary
084: * semaphore</em>, because it only has two states: one permit
085: * available, or zero permits available. When used in this way, the
086: * binary semaphore has the property (unlike many {@link Lock}
087: * implementations), that the "lock" can be released by a
088: * thread other than the owner (as semaphores have no notion of
089: * ownership). This can be useful in some specialized contexts, such
090: * as deadlock recovery.
091: *
092: * <p> The constructor for this class optionally accepts a
093: * <em>fairness</em> parameter. When set false, this class makes no
094: * guarantees about the order in which threads acquire permits. In
095: * particular, <em>barging</em> is permitted, that is, a thread
096: * invoking {@link #acquire} can be allocated a permit ahead of a
097: * thread that has been waiting. When fairness is set true, the
098: * semaphore guarantees that threads invoking any of the {@link
099: * #acquire() acquire} methods are allocated permits in the order in
100: * which their invocation of those methods was processed
101: * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
102: * applies to specific internal points of execution within these
103: * methods. So, it is possible for one thread to invoke
104: * <tt>acquire</tt> before another, but reach the ordering point after
105: * the other, and similarly upon return from the method.
106: *
107: * <p>Generally, semaphores used to control resource access should be
108: * initialized as fair, to ensure that no thread is starved out from
109: * accessing a resource. When using semaphores for other kinds of
110: * synchronization control, the throughput advantages of non-fair
111: * ordering often outweigh fairness considerations.
112: *
113: * <p>This class also provides convenience methods to {@link
114: * #acquire(int) acquire} and {@link #release(int) release} multiple
115: * permits at a time. Beware of the increased risk of indefinite
116: * postponement when these methods are used without fairness set true.
117: *
118: * @since 1.5
119: * @author Doug Lea
120: *
121: */
122:
123: public class Semaphore implements java.io.Serializable {
124: private static final long serialVersionUID = -3222578661600680210L;
125: /** All mechanics via AbstractQueuedSynchronizer subclass */
126: private final Sync sync;
127:
128: /**
129: * Synchronization implementation for semaphore. Uses AQS state
130: * to represent permits. Subclassed into fair and nonfair
131: * versions.
132: */
133: abstract static class Sync extends AbstractQueuedSynchronizer {
134: Sync(int permits) {
135: setState(permits);
136: }
137:
138: final int getPermits() {
139: return getState();
140: }
141:
142: final int nonfairTryAcquireShared(int acquires) {
143: for (;;) {
144: int available = getState();
145: int remaining = available - acquires;
146: if (remaining < 0
147: || compareAndSetState(available, remaining))
148: return remaining;
149: }
150: }
151:
152: protected final boolean tryReleaseShared(int releases) {
153: for (;;) {
154: int p = getState();
155: if (compareAndSetState(p, p + releases))
156: return true;
157: }
158: }
159:
160: final void reducePermits(int reductions) {
161: for (;;) {
162: int current = getState();
163: int next = current - reductions;
164: if (compareAndSetState(current, next))
165: return;
166: }
167: }
168:
169: final int drainPermits() {
170: for (;;) {
171: int current = getState();
172: if (current == 0 || compareAndSetState(current, 0))
173: return current;
174: }
175: }
176: }
177:
178: /**
179: * NonFair version
180: */
181: final static class NonfairSync extends Sync {
182: NonfairSync(int permits) {
183: super (permits);
184: }
185:
186: protected int tryAcquireShared(int acquires) {
187: return nonfairTryAcquireShared(acquires);
188: }
189: }
190:
191: /**
192: * Fair version
193: */
194: final static class FairSync extends Sync {
195: FairSync(int permits) {
196: super (permits);
197: }
198:
199: protected int tryAcquireShared(int acquires) {
200: Thread current = Thread.currentThread();
201: for (;;) {
202: Thread first = getFirstQueuedThread();
203: if (first != null && first != current)
204: return -1;
205: int available = getState();
206: int remaining = available - acquires;
207: if (remaining < 0
208: || compareAndSetState(available, remaining))
209: return remaining;
210: }
211: }
212: }
213:
214: /**
215: * Creates a <tt>Semaphore</tt> with the given number of
216: * permits and nonfair fairness setting.
217: * @param permits the initial number of permits available. This
218: * value may be negative, in which case releases must
219: * occur before any acquires will be granted.
220: */
221: public Semaphore(int permits) {
222: sync = new NonfairSync(permits);
223: }
224:
225: /**
226: * Creates a <tt>Semaphore</tt> with the given number of
227: * permits and the given fairness setting.
228: * @param permits the initial number of permits available. This
229: * value may be negative, in which case releases must
230: * occur before any acquires will be granted.
231: * @param fair true if this semaphore will guarantee first-in
232: * first-out granting of permits under contention, else false.
233: */
234: public Semaphore(int permits, boolean fair) {
235: sync = (fair) ? new FairSync(permits)
236: : new NonfairSync(permits);
237: }
238:
239: /**
240: * Acquires a permit from this semaphore, blocking until one is
241: * available, or the thread is {@link Thread#interrupt interrupted}.
242: *
243: * <p>Acquires a permit, if one is available and returns immediately,
244: * reducing the number of available permits by one.
245: * <p>If no permit is available then the current thread becomes
246: * disabled for thread scheduling purposes and lies dormant until
247: * one of two things happens:
248: * <ul>
249: * <li>Some other thread invokes the {@link #release} method for this
250: * semaphore and the current thread is next to be assigned a permit; or
251: * <li>Some other thread {@link Thread#interrupt interrupts} the current
252: * thread.
253: * </ul>
254: *
255: * <p>If the current thread:
256: * <ul>
257: * <li>has its interrupted status set on entry to this method; or
258: * <li>is {@link Thread#interrupt interrupted} while waiting
259: * for a permit,
260: * </ul>
261: * then {@link InterruptedException} is thrown and the current thread's
262: * interrupted status is cleared.
263: *
264: * @throws InterruptedException if the current thread is interrupted
265: *
266: * @see Thread#interrupt
267: */
268: public void acquire() throws InterruptedException {
269: sync.acquireSharedInterruptibly(1);
270: }
271:
272: /**
273: * Acquires a permit from this semaphore, blocking until one is
274: * available.
275: *
276: * <p>Acquires a permit, if one is available and returns immediately,
277: * reducing the number of available permits by one.
278: * <p>If no permit is available then the current thread becomes
279: * disabled for thread scheduling purposes and lies dormant until
280: * some other thread invokes the {@link #release} method for this
281: * semaphore and the current thread is next to be assigned a permit.
282: *
283: * <p>If the current thread
284: * is {@link Thread#interrupt interrupted} while waiting
285: * for a permit then it will continue to wait, but the time at which
286: * the thread is assigned a permit may change compared to the time it
287: * would have received the permit had no interruption occurred. When the
288: * thread does return from this method its interrupt status will be set.
289: *
290: */
291: public void acquireUninterruptibly() {
292: sync.acquireShared(1);
293: }
294:
295: /**
296: * Acquires a permit from this semaphore, only if one is available at the
297: * time of invocation.
298: * <p>Acquires a permit, if one is available and returns immediately,
299: * with the value <tt>true</tt>,
300: * reducing the number of available permits by one.
301: *
302: * <p>If no permit is available then this method will return
303: * immediately with the value <tt>false</tt>.
304: *
305: * <p>Even when this semaphore has been set to use a
306: * fair ordering policy, a call to <tt>tryAcquire()</tt> <em>will</em>
307: * immediately acquire a permit if one is available, whether or not
308: * other threads are currently waiting.
309: * This "barging" behavior can be useful in certain
310: * circumstances, even though it breaks fairness. If you want to honor
311: * the fairness setting, then use
312: * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
313: * which is almost equivalent (it also detects interruption).
314: *
315: * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
316: * otherwise.
317: */
318: public boolean tryAcquire() {
319: return sync.nonfairTryAcquireShared(1) >= 0;
320: }
321:
322: /**
323: * Acquires a permit from this semaphore, if one becomes available
324: * within the given waiting time and the
325: * current thread has not been {@link Thread#interrupt interrupted}.
326: * <p>Acquires a permit, if one is available and returns immediately,
327: * with the value <tt>true</tt>,
328: * reducing the number of available permits by one.
329: * <p>If no permit is available then
330: * the current thread becomes disabled for thread scheduling
331: * purposes and lies dormant until one of three things happens:
332: * <ul>
333: * <li>Some other thread invokes the {@link #release} method for this
334: * semaphore and the current thread is next to be assigned a permit; or
335: * <li>Some other thread {@link Thread#interrupt interrupts} the current
336: * thread; or
337: * <li>The specified waiting time elapses.
338: * </ul>
339: * <p>If a permit is acquired then the value <tt>true</tt> is returned.
340: * <p>If the current thread:
341: * <ul>
342: * <li>has its interrupted status set on entry to this method; or
343: * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
344: * a permit,
345: * </ul>
346: * then {@link InterruptedException} is thrown and the current thread's
347: * interrupted status is cleared.
348: * <p>If the specified waiting time elapses then the value <tt>false</tt>
349: * is returned.
350: * If the time is less than or equal to zero, the method will not wait
351: * at all.
352: *
353: * @param timeout the maximum time to wait for a permit
354: * @param unit the time unit of the <tt>timeout</tt> argument.
355: * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
356: * if the waiting time elapsed before a permit was acquired.
357: *
358: * @throws InterruptedException if the current thread is interrupted
359: *
360: * @see Thread#interrupt
361: *
362: */
363: public boolean tryAcquire(long timeout, TimeUnit unit)
364: throws InterruptedException {
365: return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
366: }
367:
368: /**
369: * Releases a permit, returning it to the semaphore.
370: * <p>Releases a permit, increasing the number of available permits
371: * by one.
372: * If any threads are blocking trying to acquire a permit, then one
373: * is selected and given the permit that was just released.
374: * That thread is re-enabled for thread scheduling purposes.
375: * <p>There is no requirement that a thread that releases a permit must
376: * have acquired that permit by calling {@link #acquire}.
377: * Correct usage of a semaphore is established by programming convention
378: * in the application.
379: */
380: public void release() {
381: sync.releaseShared(1);
382: }
383:
384: /**
385: * Acquires the given number of permits from this semaphore,
386: * blocking until all are available,
387: * or the thread is {@link Thread#interrupt interrupted}.
388: *
389: * <p>Acquires the given number of permits, if they are available,
390: * and returns immediately,
391: * reducing the number of available permits by the given amount.
392: *
393: * <p>If insufficient permits are available then the current thread becomes
394: * disabled for thread scheduling purposes and lies dormant until
395: * one of two things happens:
396: * <ul>
397: * <li>Some other thread invokes one of the {@link #release() release}
398: * methods for this semaphore, the current thread is next to be assigned
399: * permits and the number of available permits satisfies this request; or
400: * <li>Some other thread {@link Thread#interrupt interrupts} the current
401: * thread.
402: * </ul>
403: *
404: * <p>If the current thread:
405: * <ul>
406: * <li>has its interrupted status set on entry to this method; or
407: * <li>is {@link Thread#interrupt interrupted} while waiting
408: * for a permit,
409: * </ul>
410: * then {@link InterruptedException} is thrown and the current thread's
411: * interrupted status is cleared.
412: * Any permits that were to be assigned to this thread are instead
413: * assigned to the next waiting thread(s), as if
414: * they had been made available by a call to {@link #release()}.
415: *
416: * @param permits the number of permits to acquire
417: *
418: * @throws InterruptedException if the current thread is interrupted
419: * @throws IllegalArgumentException if permits less than zero.
420: *
421: * @see Thread#interrupt
422: */
423: public void acquire(int permits) throws InterruptedException {
424: if (permits < 0)
425: throw new IllegalArgumentException();
426: sync.acquireSharedInterruptibly(permits);
427: }
428:
429: /**
430: * Acquires the given number of permits from this semaphore,
431: * blocking until all are available.
432: *
433: * <p>Acquires the given number of permits, if they are available,
434: * and returns immediately,
435: * reducing the number of available permits by the given amount.
436: *
437: * <p>If insufficient permits are available then the current thread becomes
438: * disabled for thread scheduling purposes and lies dormant until
439: * some other thread invokes one of the {@link #release() release}
440: * methods for this semaphore, the current thread is next to be assigned
441: * permits and the number of available permits satisfies this request.
442: *
443: * <p>If the current thread
444: * is {@link Thread#interrupt interrupted} while waiting
445: * for permits then it will continue to wait and its position in the
446: * queue is not affected. When the
447: * thread does return from this method its interrupt status will be set.
448: *
449: * @param permits the number of permits to acquire
450: * @throws IllegalArgumentException if permits less than zero.
451: *
452: */
453: public void acquireUninterruptibly(int permits) {
454: if (permits < 0)
455: throw new IllegalArgumentException();
456: sync.acquireShared(permits);
457: }
458:
459: /**
460: * Acquires the given number of permits from this semaphore, only
461: * if all are available at the time of invocation.
462: *
463: * <p>Acquires the given number of permits, if they are available, and
464: * returns immediately, with the value <tt>true</tt>,
465: * reducing the number of available permits by the given amount.
466: *
467: * <p>If insufficient permits are available then this method will return
468: * immediately with the value <tt>false</tt> and the number of available
469: * permits is unchanged.
470: *
471: * <p>Even when this semaphore has been set to use a fair ordering
472: * policy, a call to <tt>tryAcquire</tt> <em>will</em>
473: * immediately acquire a permit if one is available, whether or
474: * not other threads are currently waiting. This
475: * "barging" behavior can be useful in certain
476: * circumstances, even though it breaks fairness. If you want to
477: * honor the fairness setting, then use {@link #tryAcquire(int,
478: * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
479: * which is almost equivalent (it also detects interruption).
480: *
481: * @param permits the number of permits to acquire
482: *
483: * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
484: * otherwise.
485: * @throws IllegalArgumentException if permits less than zero.
486: */
487: public boolean tryAcquire(int permits) {
488: if (permits < 0)
489: throw new IllegalArgumentException();
490: return sync.nonfairTryAcquireShared(permits) >= 0;
491: }
492:
493: /**
494: * Acquires the given number of permits from this semaphore, if all
495: * become available within the given waiting time and the
496: * current thread has not been {@link Thread#interrupt interrupted}.
497: * <p>Acquires the given number of permits, if they are available and
498: * returns immediately, with the value <tt>true</tt>,
499: * reducing the number of available permits by the given amount.
500: * <p>If insufficient permits are available then
501: * the current thread becomes disabled for thread scheduling
502: * purposes and lies dormant until one of three things happens:
503: * <ul>
504: * <li>Some other thread invokes one of the {@link #release() release}
505: * methods for this semaphore, the current thread is next to be assigned
506: * permits and the number of available permits satisfies this request; or
507: * <li>Some other thread {@link Thread#interrupt interrupts} the current
508: * thread; or
509: * <li>The specified waiting time elapses.
510: * </ul>
511: * <p>If the permits are acquired then the value <tt>true</tt> is returned.
512: * <p>If the current thread:
513: * <ul>
514: * <li>has its interrupted status set on entry to this method; or
515: * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
516: * the permits,
517: * </ul>
518: * then {@link InterruptedException} is thrown and the current thread's
519: * interrupted status is cleared.
520: * Any permits that were to be assigned to this thread, are instead
521: * assigned to the next waiting thread(s), as if
522: * they had been made available by a call to {@link #release()}.
523: *
524: * <p>If the specified waiting time elapses then the value <tt>false</tt>
525: * is returned.
526: * If the time is
527: * less than or equal to zero, the method will not wait at all.
528: * Any permits that were to be assigned to this thread, are instead
529: * assigned to the next waiting thread(s), as if
530: * they had been made available by a call to {@link #release()}.
531: *
532: * @param permits the number of permits to acquire
533: * @param timeout the maximum time to wait for the permits
534: * @param unit the time unit of the <tt>timeout</tt> argument.
535: * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
536: * if the waiting time elapsed before all permits were acquired.
537: *
538: * @throws InterruptedException if the current thread is interrupted
539: * @throws IllegalArgumentException if permits less than zero.
540: *
541: * @see Thread#interrupt
542: *
543: */
544: public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
545: throws InterruptedException {
546: if (permits < 0)
547: throw new IllegalArgumentException();
548: return sync.tryAcquireSharedNanos(permits, unit
549: .toNanos(timeout));
550: }
551:
552: /**
553: * Releases the given number of permits, returning them to the semaphore.
554: * <p>Releases the given number of permits, increasing the number of
555: * available permits by that amount.
556: * If any threads are blocking trying to acquire permits, then the
557: * one that has been waiting the longest
558: * is selected and given the permits that were just released.
559: * If the number of available permits satisfies that thread's request
560: * then that thread is re-enabled for thread scheduling purposes; otherwise
561: * the thread continues to wait. If there are still permits available
562: * after the first thread's request has been satisfied, then those permits
563: * are assigned to the next waiting thread. If it is satisfied then it is
564: * re-enabled for thread scheduling purposes. This continues until there
565: * are insufficient permits to satisfy the next waiting thread, or there
566: * are no more waiting threads.
567: *
568: * <p>There is no requirement that a thread that releases a permit must
569: * have acquired that permit by calling {@link Semaphore#acquire acquire}.
570: * Correct usage of a semaphore is established by programming convention
571: * in the application.
572: *
573: * @param permits the number of permits to release
574: * @throws IllegalArgumentException if permits less than zero.
575: */
576: public void release(int permits) {
577: if (permits < 0)
578: throw new IllegalArgumentException();
579: sync.releaseShared(permits);
580: }
581:
582: /**
583: * Returns the current number of permits available in this semaphore.
584: * <p>This method is typically used for debugging and testing purposes.
585: * @return the number of permits available in this semaphore.
586: */
587: public int availablePermits() {
588: return sync.getPermits();
589: }
590:
591: /**
592: * Acquire and return all permits that are immediately available.
593: * @return the number of permits
594: */
595: public int drainPermits() {
596: return sync.drainPermits();
597: }
598:
599: /**
600: * Shrinks the number of available permits by the indicated
601: * reduction. This method can be useful in subclasses that use
602: * semaphores to track resources that become unavailable. This
603: * method differs from <tt>acquire</tt> in that it does not block
604: * waiting for permits to become available.
605: * @param reduction the number of permits to remove
606: * @throws IllegalArgumentException if reduction is negative
607: */
608: protected void reducePermits(int reduction) {
609: if (reduction < 0)
610: throw new IllegalArgumentException();
611: sync.reducePermits(reduction);
612: }
613:
614: /**
615: * Returns true if this semaphore has fairness set true.
616: * @return true if this semaphore has fairness set true.
617: */
618: public boolean isFair() {
619: return sync instanceof FairSync;
620: }
621:
622: /**
623: * Queries whether any threads are waiting to acquire. Note that
624: * because cancellations may occur at any time, a <tt>true</tt>
625: * return does not guarantee that any other thread will ever
626: * acquire. This method is designed primarily for use in
627: * monitoring of the system state.
628: *
629: * @return true if there may be other threads waiting to acquire
630: * the lock.
631: */
632: public final boolean hasQueuedThreads() {
633: return sync.hasQueuedThreads();
634: }
635:
636: /**
637: * Returns an estimate of the number of threads waiting to
638: * acquire. The value is only an estimate because the number of
639: * threads may change dynamically while this method traverses
640: * internal data structures. This method is designed for use in
641: * monitoring of the system state, not for synchronization
642: * control.
643: * @return the estimated number of threads waiting for this lock
644: */
645: public final int getQueueLength() {
646: return sync.getQueueLength();
647: }
648:
649: /**
650: * Returns a collection containing threads that may be waiting to
651: * acquire. Because the actual set of threads may change
652: * dynamically while constructing this result, the returned
653: * collection is only a best-effort estimate. The elements of the
654: * returned collection are in no particular order. This method is
655: * designed to facilitate construction of subclasses that provide
656: * more extensive monitoring facilities.
657: * @return the collection of threads
658: */
659: protected Collection<Thread> getQueuedThreads() {
660: return sync.getQueuedThreads();
661: }
662:
663: /**
664: * Returns a string identifying this semaphore, as well as its state.
665: * The state, in brackets, includes the String
666: * "Permits =" followed by the number of permits.
667: * @return a string identifying this semaphore, as well as its
668: * state
669: */
670: public String toString() {
671: return super .toString() + "[Permits = " + sync.getPermits()
672: + "]";
673: }
674:
675: }
|