001: /*
002: File: BoundedBuffer.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 17Jul1998 dl Simplified by eliminating wait counts
013: 25aug1998 dl added peek
014: 5May1999 dl replace % with conditional (slightly faster)
015: */
016:
017: package EDU.oswego.cs.dl.util.concurrent;
018:
019: /**
020: * Efficient array-based bounded buffer class.
021: * Adapted from CPJ, chapter 8, which describes design.
022: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
023: **/
024:
025: public class BoundedBuffer implements BoundedChannel {
026:
027: protected final Object[] array_; // the elements
028:
029: protected int takePtr_ = 0; // circular indices
030: protected int putPtr_ = 0;
031:
032: protected int usedSlots_ = 0; // length
033: protected int emptySlots_; // capacity - length
034:
035: /**
036: * Helper monitor to handle puts.
037: **/
038: protected final Object putMonitor_ = new Object();
039:
040: /**
041: * Create a BoundedBuffer with the given capacity.
042: * @exception IllegalArgumentException if capacity less or equal to zero
043: **/
044: public BoundedBuffer(int capacity) throws IllegalArgumentException {
045: if (capacity <= 0)
046: throw new IllegalArgumentException();
047: array_ = new Object[capacity];
048: emptySlots_ = capacity;
049: }
050:
051: /**
052: * Create a buffer with the current default capacity
053: **/
054:
055: public BoundedBuffer() {
056: this (DefaultChannelCapacity.get());
057: }
058:
059: /**
060: * Return the number of elements in the buffer.
061: * This is only a snapshot value, that may change
062: * immediately after returning.
063: **/
064: public synchronized int size() {
065: return usedSlots_;
066: }
067:
068: public int capacity() {
069: return array_.length;
070: }
071:
072: protected void incEmptySlots() {
073: synchronized (putMonitor_) {
074: ++emptySlots_;
075: putMonitor_.notify();
076: }
077: }
078:
079: protected synchronized void incUsedSlots() {
080: ++usedSlots_;
081: notify();
082: }
083:
084: protected final void insert(Object x) { // mechanics of put
085: --emptySlots_;
086: array_[putPtr_] = x;
087: if (++putPtr_ >= array_.length)
088: putPtr_ = 0;
089: }
090:
091: protected final Object extract() { // mechanics of take
092: --usedSlots_;
093: Object old = array_[takePtr_];
094: array_[takePtr_] = null;
095: if (++takePtr_ >= array_.length)
096: takePtr_ = 0;
097: return old;
098: }
099:
100: public Object peek() {
101: synchronized (this ) {
102: if (usedSlots_ > 0)
103: return array_[takePtr_];
104: else
105: return null;
106: }
107: }
108:
109: public void put(Object x) throws InterruptedException {
110: if (x == null)
111: throw new IllegalArgumentException();
112: if (Thread.interrupted())
113: throw new InterruptedException();
114:
115: synchronized (putMonitor_) {
116: while (emptySlots_ <= 0) {
117: try {
118: putMonitor_.wait();
119: } catch (InterruptedException ex) {
120: putMonitor_.notify();
121: throw ex;
122: }
123: }
124: insert(x);
125: }
126: incUsedSlots();
127: }
128:
129: public boolean offer(Object x, long msecs)
130: throws InterruptedException {
131: if (x == null)
132: throw new IllegalArgumentException();
133: if (Thread.interrupted())
134: throw new InterruptedException();
135:
136: synchronized (putMonitor_) {
137: long start = (msecs <= 0) ? 0 : System.currentTimeMillis();
138: long waitTime = msecs;
139: while (emptySlots_ <= 0) {
140: if (waitTime <= 0)
141: return false;
142: try {
143: putMonitor_.wait(waitTime);
144: } catch (InterruptedException ex) {
145: putMonitor_.notify();
146: throw ex;
147: }
148: waitTime = msecs - (System.currentTimeMillis() - start);
149: }
150: insert(x);
151: }
152: incUsedSlots();
153: return true;
154: }
155:
156: public Object take() throws InterruptedException {
157: if (Thread.interrupted())
158: throw new InterruptedException();
159: Object old = null;
160: synchronized (this ) {
161: while (usedSlots_ <= 0) {
162: try {
163: wait();
164: } catch (InterruptedException ex) {
165: notify();
166: throw ex;
167: }
168: }
169: old = extract();
170: }
171: incEmptySlots();
172: return old;
173: }
174:
175: public Object poll(long msecs) throws InterruptedException {
176: if (Thread.interrupted())
177: throw new InterruptedException();
178: Object old = null;
179: synchronized (this ) {
180: long start = (msecs <= 0) ? 0 : System.currentTimeMillis();
181: long waitTime = msecs;
182:
183: while (usedSlots_ <= 0) {
184: if (waitTime <= 0)
185: return null;
186: try {
187: wait(waitTime);
188: } catch (InterruptedException ex) {
189: notify();
190: throw ex;
191: }
192: waitTime = msecs - (System.currentTimeMillis() - start);
193:
194: }
195: old = extract();
196: }
197: incEmptySlots();
198: return old;
199: }
200:
201: }
|