001: /*
002: File: ConditionVariable.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: */
013:
014: package EDU.oswego.cs.dl.util.concurrent;
015:
016: /**
017: * This class is designed for fans of POSIX pthreads programming.
018: * If you restrict yourself to Mutexes and CondVars, you can
019: * use most of your favorite constructions. Don't randomly mix them
020: * with synchronized methods or blocks though.
021: * <p>
022: * Method names and behavior are as close as is reasonable to
023: * those in POSIX.
024: * <p>
025: * <b>Sample Usage.</b> Here is a full version of a bounded buffer
026: * that implements the BoundedChannel interface, written in
027: * a style reminscent of that in POSIX programming books.
028: * <pre>
029: * class CVBuffer implements BoundedChannel {
030: * private final Mutex mutex;
031: * private final CondVar notFull;
032: * private final CondVar notEmpty;
033: * private int count = 0;
034: * private int takePtr = 0;
035: * private int putPtr = 0;
036: * private final Object[] array;
037: *
038: * public CVBuffer(int capacity) {
039: * array = new Object[capacity];
040: * mutex = new Mutex();
041: * notFull = new CondVar(mutex);
042: * notEmpty = new CondVar(mutex);
043: * }
044: *
045: * public int capacity() { return array.length; }
046: *
047: * public void put(Object x) throws InterruptedException {
048: * mutex.acquire();
049: * try {
050: * while (count == array.length) {
051: * notFull.await();
052: * }
053: * array[putPtr] = x;
054: * putPtr = (putPtr + 1) % array.length;
055: * ++count;
056: * notEmpty.signal();
057: * }
058: * finally {
059: * mutex.release();
060: * }
061: * }
062: *
063: * public Object take() throws InterruptedException {
064: * Object x = null;
065: * mutex.acquire();
066: * try {
067: * while (count == 0) {
068: * notEmpty.await();
069: * }
070: * x = array[takePtr];
071: * array[takePtr] = null;
072: * takePtr = (takePtr + 1) % array.length;
073: * --count;
074: * notFull.signal();
075: * }
076: * finally {
077: * mutex.release();
078: * }
079: * return x;
080: * }
081: *
082: * public boolean offer(Object x, long msecs) throws InterruptedException {
083: * mutex.acquire();
084: * try {
085: * if (count == array.length) {
086: * notFull.timedwait(msecs);
087: * if (count == array.length)
088: * return false;
089: * }
090: * array[putPtr] = x;
091: * putPtr = (putPtr + 1) % array.length;
092: * ++count;
093: * notEmpty.signal();
094: * return true;
095: * }
096: * finally {
097: * mutex.release();
098: * }
099: * }
100: *
101: * public Object poll(long msecs) throws InterruptedException {
102: * Object x = null;
103: * mutex.acquire();
104: * try {
105: * if (count == 0) {
106: * notEmpty.timedwait(msecs);
107: * if (count == 0)
108: * return null;
109: * }
110: * x = array[takePtr];
111: * array[takePtr] = null;
112: * takePtr = (takePtr + 1) % array.length;
113: * --count;
114: * notFull.signal();
115: * }
116: * finally {
117: * mutex.release();
118: * }
119: * return x;
120: * }
121: * }
122: *
123: * </pre>
124: * @see Mutex
125: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
126:
127: **/
128:
129: public class CondVar {
130:
131: /** The mutex **/
132: protected final Sync mutex_;
133:
134: /**
135: * Create a new CondVar that relies on the given mutual
136: * exclusion lock.
137: * @param mutex A non-reentrant mutual exclusion lock.
138: * Standard usage is to supply an instance of <code>Mutex</code>,
139: * but, for example, a Semaphore initialized to 1 also works.
140: * On the other hand, many other Sync implementations would not
141: * work here, so some care is required to supply a sensible
142: * synchronization object.
143: * In normal use, the mutex should be one that is used for <em>all</em>
144: * synchronization of the object using the CondVar. Generally,
145: * to prevent nested monitor lockouts, this
146: * object should not use any native Java synchronized blocks.
147: **/
148:
149: public CondVar(Sync mutex) {
150: mutex_ = mutex;
151: }
152:
153: /**
154: * Wait for notification. This operation at least momentarily
155: * releases the mutex. The mutex is always held upon return,
156: * even if interrupted.
157: * @exception InterruptedException if the thread was interrupted
158: * before or during the wait. However, if the thread is interrupted
159: * after the wait but during mutex re-acquisition, the interruption
160: * is ignored, while still ensuring
161: * that the currentThread's interruption state stays true, so can
162: * be probed by callers.
163: **/
164: public void await() throws InterruptedException {
165: if (Thread.interrupted())
166: throw new InterruptedException();
167: try {
168: synchronized (this ) {
169: mutex_.release();
170: try {
171: wait();
172: } catch (InterruptedException ex) {
173: notify();
174: throw ex;
175: }
176: }
177: } finally {
178: // Must ignore interrupt on re-acquire
179: boolean interrupted = false;
180: for (;;) {
181: try {
182: mutex_.acquire();
183: break;
184: } catch (InterruptedException ex) {
185: interrupted = true;
186: }
187: }
188: if (interrupted) {
189: Thread.currentThread().interrupt();
190: }
191: }
192: }
193:
194: /**
195: * Wait for at most msecs for notification.
196: * This operation at least momentarily
197: * releases the mutex. The mutex is always held upon return,
198: * even if interrupted.
199: * @param msecs The time to wait. A value less than or equal to zero
200: * causes a momentarily release
201: * and re-acquire of the mutex, and always returns false.
202: * @return false if at least msecs have elapsed
203: * upon resumption; else true. A
204: * false return does NOT necessarily imply that the thread was
205: * not notified. For example, it might have been notified
206: * after the time elapsed but just before resuming.
207: * @exception InterruptedException if the thread was interrupted
208: * before or during the wait.
209: **/
210:
211: public boolean timedwait(long msecs) throws InterruptedException {
212: if (Thread.interrupted())
213: throw new InterruptedException();
214: boolean success = false;
215: try {
216: synchronized (this ) {
217: mutex_.release();
218: try {
219: if (msecs > 0) {
220: long start = System.currentTimeMillis();
221: wait(msecs);
222: success = System.currentTimeMillis() - start <= msecs;
223: }
224: } catch (InterruptedException ex) {
225: notify();
226: throw ex;
227: }
228: }
229: } finally {
230: // Must ignore interrupt on re-acquire
231: boolean interrupted = false;
232: for (;;) {
233: try {
234: mutex_.acquire();
235: break;
236: } catch (InterruptedException ex) {
237: interrupted = true;
238: }
239: }
240: if (interrupted) {
241: Thread.currentThread().interrupt();
242: }
243: }
244: return success;
245: }
246:
247: /**
248: * Notify a waiting thread.
249: * If one exists, a non-interrupted thread will return
250: * normally (i.e., not via InterruptedException) from await or timedwait.
251: **/
252: public synchronized void signal() {
253: notify();
254: }
255:
256: /** Notify all waiting threads **/
257: public synchronized void broadcast() {
258: notifyAll();
259: }
260: }
|