001: /*
002: File: BoundedLinkedQueue.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 17Jul1998 dl Simplified by eliminating wait counts
013: 25aug1998 dl added peek
014: 10oct1999 dl lock on node object to ensure visibility
015: 27jan2000 dl setCapacity forces immediate permit reconcile
016: */
017:
018: package org.dbunit.util.concurrent;
019:
020: import org.slf4j.Logger;
021: import org.slf4j.LoggerFactory;
022:
023: /**
024: * A bounded variant of
025: * LinkedQueue
026: * class. This class may be
027: * preferable to
028: * BoundedBuffer
029: * because it allows a bit more
030: * concurency among puts and takes, because it does not
031: * pre-allocate fixed storage for elements, and allows
032: * capacity to be dynamically reset.
033: * On the other hand, since it allocates a node object
034: * on each put, it can be slow on systems with slow
035: * allocation and GC.
036: * Also, it may be
037: * preferable to
038: * LinkedQueue
039: * when you need to limit
040: * the capacity to prevent resource exhaustion. This protection
041: * normally does not hurt much performance-wise: When the
042: * queue is not empty or full, most puts and
043: * takes are still usually able to execute concurrently.
044: * @see LinkedQueue
045: * @see BoundedBuffer
046: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
047: **/
048:
049: public class BoundedLinkedQueue implements BoundedChannel {
050:
051: /**
052: * Logger for this class
053: */
054: private static final Logger logger = LoggerFactory
055: .getLogger(BoundedLinkedQueue.class);
056:
057: /*
058: * It might be a bit nicer if this were declared as
059: * a subclass of LinkedQueue, or a sibling class of
060: * a common abstract class. It shares much of the
061: * basic design and bookkeeping fields. But too
062: * many details differ to make this worth doing.
063: */
064:
065: /**
066: * Dummy header node of list. The first actual node, if it exists, is always
067: * at head_.next. After each take, the old first node becomes the head.
068: **/
069: protected LinkedNode head_;
070:
071: /**
072: * The last node of list. Put() appends to list, so modifies last_
073: **/
074: protected LinkedNode last_;
075:
076: /**
077: * Helper monitor. Ensures that only one put at a time executes.
078: **/
079:
080: protected final Object putGuard_ = new Object();
081:
082: /**
083: * Helper monitor. Protects and provides wait queue for takes
084: **/
085:
086: protected final Object takeGuard_ = new Object();
087:
088: /** Number of elements allowed **/
089: protected int capacity_;
090:
091: /**
092: * One side of a split permit count.
093: * The counts represent permits to do a put. (The queue is full when zero).
094: * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
095: * (The length is never separately recorded, so this cannot be
096: * checked explicitly.)
097: * To minimize contention between puts and takes, the
098: * put side uses up all of its permits before transfering them from
099: * the take side. The take side just increments the count upon each take.
100: * Thus, most puts and take can run independently of each other unless
101: * the queue is empty or full.
102: * Initial value is queue capacity.
103: **/
104:
105: protected int putSidePutPermits_;
106:
107: /** Number of takes since last reconcile **/
108: protected int takeSidePutPermits_ = 0;
109:
110: /**
111: * Create a queue with the given capacity
112: * @exception IllegalArgumentException if capacity less or equal to zero
113: **/
114: public BoundedLinkedQueue(int capacity) {
115: if (capacity <= 0)
116: throw new IllegalArgumentException();
117: capacity_ = capacity;
118: putSidePutPermits_ = capacity;
119: head_ = new LinkedNode(null);
120: last_ = head_;
121: }
122:
123: /**
124: * Create a queue with the current default capacity
125: **/
126:
127: public BoundedLinkedQueue() {
128: this (DefaultChannelCapacity.get());
129: }
130:
131: /**
132: * Move put permits from take side to put side;
133: * return the number of put side permits that are available.
134: * Call only under synch on puGuard_ AND this.
135: **/
136: protected final int reconcilePutPermits() {
137: logger.debug("reconcilePutPermits() - start");
138:
139: putSidePutPermits_ += takeSidePutPermits_;
140: takeSidePutPermits_ = 0;
141: return putSidePutPermits_;
142: }
143:
144: /** Return the current capacity of this queue **/
145: public synchronized int capacity() {
146: logger.debug("capacity() - start");
147: return capacity_;
148: }
149:
150: /**
151: * Return the number of elements in the queue.
152: * This is only a snapshot value, that may be in the midst
153: * of changing. The returned value will be unreliable in the presence of
154: * active puts and takes, and should only be used as a heuristic
155: * estimate, for example for resource monitoring purposes.
156: **/
157: public synchronized int size() {
158: logger.debug("size() - start");
159:
160: /*
161: This should ideally synch on putGuard_, but
162: doing so would cause it to block waiting for an in-progress
163: put, which might be stuck. So we instead use whatever
164: value of putSidePutPermits_ that we happen to read.
165: */
166: return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
167: }
168:
169: /**
170: * Reset the capacity of this queue.
171: * If the new capacity is less than the old capacity,
172: * existing elements are NOT removed, but
173: * incoming puts will not proceed until the number of elements
174: * is less than the new capacity.
175: * @exception IllegalArgumentException if capacity less or equal to zero
176: **/
177:
178: public void setCapacity(int newCapacity) {
179: logger.debug("setCapacity(newCapacity=" + newCapacity
180: + ") - start");
181:
182: if (newCapacity <= 0)
183: throw new IllegalArgumentException();
184: synchronized (putGuard_) {
185: synchronized (this ) {
186: takeSidePutPermits_ += (newCapacity - capacity_);
187: capacity_ = newCapacity;
188:
189: // Force immediate reconcilation.
190: reconcilePutPermits();
191: notifyAll();
192: }
193: }
194: }
195:
196: /** Main mechanics for take/poll **/
197: protected synchronized Object extract() {
198: logger.debug("extract() - start");
199:
200: synchronized (head_) {
201: Object x = null;
202: LinkedNode first = head_.next;
203: if (first != null) {
204: x = first.value;
205: first.value = null;
206: head_ = first;
207: ++takeSidePutPermits_;
208: notify();
209: }
210: return x;
211: }
212: }
213:
214: public Object peek() {
215: logger.debug("peek() - start");
216:
217: synchronized (head_) {
218: LinkedNode first = head_.next;
219: if (first != null)
220: return first.value;
221: else
222: return null;
223: }
224: }
225:
226: public Object take() throws InterruptedException {
227: logger.debug("take() - start");
228:
229: if (Thread.interrupted())
230: throw new InterruptedException();
231: Object x = extract();
232: if (x != null)
233: return x;
234: else {
235: synchronized (takeGuard_) {
236: try {
237: for (;;) {
238: x = extract();
239: if (x != null) {
240: return x;
241: } else {
242: takeGuard_.wait();
243: }
244: }
245: } catch (InterruptedException ex) {
246: logger.error("take()", ex);
247:
248: takeGuard_.notify();
249: throw ex;
250: }
251: }
252: }
253: }
254:
255: public Object poll(long msecs) throws InterruptedException {
256: logger.debug("poll(msecs=" + msecs + ") - start");
257:
258: if (Thread.interrupted())
259: throw new InterruptedException();
260: Object x = extract();
261: if (x != null)
262: return x;
263: else {
264: synchronized (takeGuard_) {
265: try {
266: long waitTime = msecs;
267: long start = (msecs <= 0) ? 0 : System
268: .currentTimeMillis();
269: for (;;) {
270: x = extract();
271: if (x != null || waitTime <= 0) {
272: return x;
273: } else {
274: takeGuard_.wait(waitTime);
275: waitTime = msecs
276: - (System.currentTimeMillis() - start);
277: }
278: }
279: } catch (InterruptedException ex) {
280: logger.error("poll()", ex);
281:
282: takeGuard_.notify();
283: throw ex;
284: }
285: }
286: }
287: }
288:
289: /** Notify a waiting take if needed **/
290: protected final void allowTake() {
291: logger.debug("allowTake() - start");
292:
293: synchronized (takeGuard_) {
294: takeGuard_.notify();
295: }
296: }
297:
298: /**
299: * Create and insert a node.
300: * Call only under synch on putGuard_
301: **/
302: protected void insert(Object x) {
303: logger.debug("insert(x=" + x + ") - start");
304:
305: --putSidePutPermits_;
306: LinkedNode p = new LinkedNode(x);
307: synchronized (last_) {
308: last_.next = p;
309: last_ = p;
310: }
311: }
312:
313: /*
314: put and offer(ms) differ only in policy before insert/allowTake
315: */
316:
317: public void put(Object x) throws InterruptedException {
318: logger.debug("put(x=" + x + ") - start");
319:
320: if (x == null)
321: throw new IllegalArgumentException();
322: if (Thread.interrupted())
323: throw new InterruptedException();
324:
325: synchronized (putGuard_) {
326:
327: if (putSidePutPermits_ <= 0) { // wait for permit.
328: synchronized (this ) {
329: if (reconcilePutPermits() <= 0) {
330: try {
331: for (;;) {
332: wait();
333: if (reconcilePutPermits() > 0) {
334: break;
335: }
336: }
337: } catch (InterruptedException ex) {
338: logger.error("put()", ex);
339:
340: notify();
341: throw ex;
342: }
343: }
344: }
345: }
346: insert(x);
347: }
348: // call outside of lock to loosen put/take coupling
349: allowTake();
350: }
351:
352: public boolean offer(Object x, long msecs)
353: throws InterruptedException {
354: logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
355:
356: if (x == null)
357: throw new IllegalArgumentException();
358: if (Thread.interrupted())
359: throw new InterruptedException();
360:
361: synchronized (putGuard_) {
362:
363: if (putSidePutPermits_ <= 0) {
364: synchronized (this ) {
365: if (reconcilePutPermits() <= 0) {
366: if (msecs <= 0)
367: return false;
368: else {
369: try {
370: long waitTime = msecs;
371: long start = System.currentTimeMillis();
372:
373: for (;;) {
374: wait(waitTime);
375: if (reconcilePutPermits() > 0) {
376: break;
377: } else {
378: waitTime = msecs
379: - (System
380: .currentTimeMillis() - start);
381: if (waitTime <= 0) {
382: return false;
383: }
384: }
385: }
386: } catch (InterruptedException ex) {
387: logger.error("offer()", ex);
388:
389: notify();
390: throw ex;
391: }
392: }
393: }
394: }
395: }
396:
397: insert(x);
398: }
399:
400: allowTake();
401: return true;
402: }
403:
404: public boolean isEmpty() {
405: logger.debug("isEmpty() - start");
406:
407: synchronized (head_) {
408: return head_.next == null;
409: }
410: }
411:
412: }
|