001: /*
002: File: BoundedBuffer.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 17Jul1998 dl Simplified by eliminating wait counts
013: 25aug1998 dl added peek
014: 5May1999 dl replace % with conditional (slightly faster)
015: */
016:
017: package org.dbunit.util.concurrent;
018:
019: import org.slf4j.Logger;
020: import org.slf4j.LoggerFactory;
021:
022: /**
023: * Efficient array-based bounded buffer class.
024: * Adapted from CPJ, chapter 8, which describes design.
025: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
026: **/
027:
028: public class BoundedBuffer implements BoundedChannel {
029:
030: /**
031: * Logger for this class
032: */
033: private static final Logger logger = LoggerFactory
034: .getLogger(BoundedBuffer.class);
035:
036: protected final Object[] array_; // the elements
037:
038: protected int takePtr_ = 0; // circular indices
039: protected int putPtr_ = 0;
040:
041: protected int usedSlots_ = 0; // length
042: protected int emptySlots_; // capacity - length
043:
044: /**
045: * Helper monitor to handle puts.
046: **/
047: protected final Object putMonitor_ = new Object();
048:
049: /**
050: * Create a BoundedBuffer with the given capacity.
051: * @exception IllegalArgumentException if capacity less or equal to zero
052: **/
053: public BoundedBuffer(int capacity) throws IllegalArgumentException {
054: if (capacity <= 0)
055: throw new IllegalArgumentException();
056: array_ = new Object[capacity];
057: emptySlots_ = capacity;
058: }
059:
060: /**
061: * Create a buffer with the current default capacity
062: **/
063:
064: public BoundedBuffer() {
065: this (DefaultChannelCapacity.get());
066: }
067:
068: /**
069: * Return the number of elements in the buffer.
070: * This is only a snapshot value, that may change
071: * immediately after returning.
072: **/
073: public synchronized int size() {
074: logger.debug("size() - start");
075: return usedSlots_;
076: }
077:
078: public int capacity() {
079: logger.debug("capacity() - start");
080: return array_.length;
081: }
082:
083: protected void incEmptySlots() {
084: logger.debug("incEmptySlots() - start");
085:
086: synchronized (putMonitor_) {
087: ++emptySlots_;
088: putMonitor_.notify();
089: }
090: }
091:
092: protected synchronized void incUsedSlots() {
093: logger.debug("incUsedSlots() - start");
094:
095: ++usedSlots_;
096: notify();
097: }
098:
099: protected final void insert(Object x) {
100: logger.debug("insert(x=" + x + ") - start");
101: // mechanics of put
102: --emptySlots_;
103: array_[putPtr_] = x;
104: if (++putPtr_ >= array_.length)
105: putPtr_ = 0;
106: }
107:
108: protected final Object extract() {
109: logger.debug("extract() - start");
110: // mechanics of take
111: --usedSlots_;
112: Object old = array_[takePtr_];
113: array_[takePtr_] = null;
114: if (++takePtr_ >= array_.length)
115: takePtr_ = 0;
116: return old;
117: }
118:
119: public Object peek() {
120: logger.debug("peek() - start");
121:
122: synchronized (this ) {
123: if (usedSlots_ > 0)
124: return array_[takePtr_];
125: else
126: return null;
127: }
128: }
129:
130: public void put(Object x) throws InterruptedException {
131: logger.debug("put(x=" + x + ") - start");
132:
133: if (x == null)
134: throw new IllegalArgumentException();
135: if (Thread.interrupted())
136: throw new InterruptedException();
137:
138: synchronized (putMonitor_) {
139: while (emptySlots_ <= 0) {
140: try {
141: putMonitor_.wait();
142: } catch (InterruptedException ex) {
143: logger.error("put()", ex);
144:
145: putMonitor_.notify();
146: throw ex;
147: }
148: }
149: insert(x);
150: }
151: incUsedSlots();
152: }
153:
154: public boolean offer(Object x, long msecs)
155: throws InterruptedException {
156: logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
157:
158: if (x == null)
159: throw new IllegalArgumentException();
160: if (Thread.interrupted())
161: throw new InterruptedException();
162:
163: synchronized (putMonitor_) {
164: long start = (msecs <= 0) ? 0 : System.currentTimeMillis();
165: long waitTime = msecs;
166: while (emptySlots_ <= 0) {
167: if (waitTime <= 0)
168: return false;
169: try {
170: putMonitor_.wait(waitTime);
171: } catch (InterruptedException ex) {
172: logger.error("offer()", ex);
173:
174: putMonitor_.notify();
175: throw ex;
176: }
177: waitTime = msecs - (System.currentTimeMillis() - start);
178: }
179: insert(x);
180: }
181: incUsedSlots();
182: return true;
183: }
184:
185: public Object take() throws InterruptedException {
186: logger.debug("take() - start");
187:
188: if (Thread.interrupted())
189: throw new InterruptedException();
190: Object old = null;
191: synchronized (this ) {
192: while (usedSlots_ <= 0) {
193: try {
194: wait();
195: } catch (InterruptedException ex) {
196: logger.error("take()", ex);
197:
198: notify();
199: throw ex;
200: }
201: }
202: old = extract();
203: }
204: incEmptySlots();
205: return old;
206: }
207:
208: public Object poll(long msecs) throws InterruptedException {
209: logger.debug("poll(msecs=" + msecs + ") - start");
210:
211: if (Thread.interrupted())
212: throw new InterruptedException();
213: Object old = null;
214: synchronized (this ) {
215: long start = (msecs <= 0) ? 0 : System.currentTimeMillis();
216: long waitTime = msecs;
217:
218: while (usedSlots_ <= 0) {
219: if (waitTime <= 0)
220: return null;
221: try {
222: wait(waitTime);
223: } catch (InterruptedException ex) {
224: logger.error("poll()", ex);
225:
226: notify();
227: throw ex;
228: }
229: waitTime = msecs - (System.currentTimeMillis() - start);
230:
231: }
232: old = extract();
233: }
234: incEmptySlots();
235: return old;
236: }
237:
238: }
|