001: /*
002: File: BoundedLinkedQueue.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 17Jul1998 dl Simplified by eliminating wait counts
013: 25aug1998 dl added peek
014: 10oct1999 dl lock on node object to ensure visibility
015: 27jan2000 dl setCapacity forces immediate permit reconcile
016: */
017:
018: package EDU.oswego.cs.dl.util.concurrent;
019:
020: /**
021: * A bounded variant of
022: * LinkedQueue
023: * class. This class may be
024: * preferable to
025: * BoundedBuffer
026: * because it allows a bit more
027: * concurrency among puts and takes, because it does not
028: * pre-allocate fixed storage for elements, and allows
029: * capacity to be dynamically reset.
030: * On the other hand, since it allocates a node object
031: * on each put, it can be slow on systems with slow
032: * allocation and GC.
033: * Also, it may be
034: * preferable to
035: * LinkedQueue
036: * when you need to limit
037: * the capacity to prevent resource exhaustion. This protection
038: * normally does not hurt much performance-wise: When the
039: * queue is not empty or full, most puts and
040: * takes are still usually able to execute concurrently.
041: * @see LinkedQueue
042: * @see BoundedBuffer
043: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
044: **/
045:
046: public class BoundedLinkedQueue implements BoundedChannel {
047:
048: /*
049: * It might be a bit nicer if this were declared as
050: * a subclass of LinkedQueue, or a sibling class of
051: * a common abstract class. It shares much of the
052: * basic design and bookkeeping fields. But too
053: * many details differ to make this worth doing.
054: */
055:
056: /**
057: * Dummy header node of list. The first actual node, if it exists, is always
058: * at head_.next. After each take, the old first node becomes the head.
059: **/
060: protected LinkedNode head_;
061:
062: /**
063: * The last node of list. Put() appends to list, so modifies last_
064: **/
065: protected LinkedNode last_;
066:
067: /**
068: * Helper monitor. Ensures that only one put at a time executes.
069: **/
070:
071: protected final Object putGuard_ = new Object();
072:
073: /**
074: * Helper monitor. Protects and provides wait queue for takes
075: **/
076:
077: protected final Object takeGuard_ = new Object();
078:
079: /** Number of elements allowed **/
080: protected int capacity_;
081:
082: /**
083: * One side of a split permit count.
084: * The counts represent permits to do a put. (The queue is full when zero).
085: * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
086: * (The length is never separately recorded, so this cannot be
087: * checked explicitly.)
088: * To minimize contention between puts and takes, the
089: * put side uses up all of its permits before transfering them from
090: * the take side. The take side just increments the count upon each take.
091: * Thus, most puts and take can run independently of each other unless
092: * the queue is empty or full.
093: * Initial value is queue capacity.
094: **/
095:
096: protected int putSidePutPermits_;
097:
098: /** Number of takes since last reconcile **/
099: protected int takeSidePutPermits_ = 0;
100:
101: /**
102: * Create a queue with the given capacity
103: * @exception IllegalArgumentException if capacity less or equal to zero
104: **/
105: public BoundedLinkedQueue(int capacity) {
106: if (capacity <= 0)
107: throw new IllegalArgumentException();
108: capacity_ = capacity;
109: putSidePutPermits_ = capacity;
110: head_ = new LinkedNode(null);
111: last_ = head_;
112: }
113:
114: /**
115: * Create a queue with the current default capacity
116: **/
117:
118: public BoundedLinkedQueue() {
119: this (DefaultChannelCapacity.get());
120: }
121:
122: /**
123: * Move put permits from take side to put side;
124: * return the number of put side permits that are available.
125: * Call only under synch on puGuard_ AND this.
126: **/
127: protected final int reconcilePutPermits() {
128: putSidePutPermits_ += takeSidePutPermits_;
129: takeSidePutPermits_ = 0;
130: return putSidePutPermits_;
131: }
132:
133: /** Return the current capacity of this queue **/
134: public synchronized int capacity() {
135: return capacity_;
136: }
137:
138: /**
139: * Return the number of elements in the queue.
140: * This is only a snapshot value, that may be in the midst
141: * of changing. The returned value will be unreliable in the presence of
142: * active puts and takes, and should only be used as a heuristic
143: * estimate, for example for resource monitoring purposes.
144: **/
145: public synchronized int size() {
146: /*
147: This should ideally synch on putGuard_, but
148: doing so would cause it to block waiting for an in-progress
149: put, which might be stuck. So we instead use whatever
150: value of putSidePutPermits_ that we happen to read.
151: */
152: return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
153: }
154:
155: /**
156: * Reset the capacity of this queue.
157: * If the new capacity is less than the old capacity,
158: * existing elements are NOT removed, but
159: * incoming puts will not proceed until the number of elements
160: * is less than the new capacity.
161: * @exception IllegalArgumentException if capacity less or equal to zero
162: **/
163:
164: public void setCapacity(int newCapacity) {
165: if (newCapacity <= 0)
166: throw new IllegalArgumentException();
167: synchronized (putGuard_) {
168: synchronized (this ) {
169: takeSidePutPermits_ += (newCapacity - capacity_);
170: capacity_ = newCapacity;
171:
172: // Force immediate reconcilation.
173: reconcilePutPermits();
174: notifyAll();
175: }
176: }
177: }
178:
179: /** Main mechanics for take/poll **/
180: protected synchronized Object extract() {
181: synchronized (head_) {
182: Object x = null;
183: LinkedNode first = head_.next;
184: if (first != null) {
185: x = first.value;
186: first.value = null;
187: head_ = first;
188: ++takeSidePutPermits_;
189: notify();
190: }
191: return x;
192: }
193: }
194:
195: public Object peek() {
196: synchronized (head_) {
197: LinkedNode first = head_.next;
198: if (first != null)
199: return first.value;
200: else
201: return null;
202: }
203: }
204:
205: public Object take() throws InterruptedException {
206: if (Thread.interrupted())
207: throw new InterruptedException();
208: Object x = extract();
209: if (x != null)
210: return x;
211: else {
212: synchronized (takeGuard_) {
213: try {
214: for (;;) {
215: x = extract();
216: if (x != null) {
217: return x;
218: } else {
219: takeGuard_.wait();
220: }
221: }
222: } catch (InterruptedException ex) {
223: takeGuard_.notify();
224: throw ex;
225: }
226: }
227: }
228: }
229:
230: public Object poll(long msecs) throws InterruptedException {
231: if (Thread.interrupted())
232: throw new InterruptedException();
233: Object x = extract();
234: if (x != null)
235: return x;
236: else {
237: synchronized (takeGuard_) {
238: try {
239: long waitTime = msecs;
240: long start = (msecs <= 0) ? 0 : System
241: .currentTimeMillis();
242: for (;;) {
243: x = extract();
244: if (x != null || waitTime <= 0) {
245: return x;
246: } else {
247: takeGuard_.wait(waitTime);
248: waitTime = msecs
249: - (System.currentTimeMillis() - start);
250: }
251: }
252: } catch (InterruptedException ex) {
253: takeGuard_.notify();
254: throw ex;
255: }
256: }
257: }
258: }
259:
260: /** Notify a waiting take if needed **/
261: protected final void allowTake() {
262: synchronized (takeGuard_) {
263: takeGuard_.notify();
264: }
265: }
266:
267: /**
268: * Create and insert a node.
269: * Call only under synch on putGuard_
270: **/
271: protected void insert(Object x) {
272: --putSidePutPermits_;
273: LinkedNode p = new LinkedNode(x);
274: synchronized (last_) {
275: last_.next = p;
276: last_ = p;
277: }
278: }
279:
280: /*
281: put and offer(ms) differ only in policy before insert/allowTake
282: */
283:
284: public void put(Object x) throws InterruptedException {
285: if (x == null)
286: throw new IllegalArgumentException();
287: if (Thread.interrupted())
288: throw new InterruptedException();
289:
290: synchronized (putGuard_) {
291:
292: if (putSidePutPermits_ <= 0) { // wait for permit.
293: synchronized (this ) {
294: if (reconcilePutPermits() <= 0) {
295: try {
296: for (;;) {
297: wait();
298: if (reconcilePutPermits() > 0) {
299: break;
300: }
301: }
302: } catch (InterruptedException ex) {
303: notify();
304: throw ex;
305: }
306: }
307: }
308: }
309: insert(x);
310: }
311: // call outside of lock to loosen put/take coupling
312: allowTake();
313: }
314:
315: public boolean offer(Object x, long msecs)
316: throws InterruptedException {
317: if (x == null)
318: throw new IllegalArgumentException();
319: if (Thread.interrupted())
320: throw new InterruptedException();
321:
322: synchronized (putGuard_) {
323:
324: if (putSidePutPermits_ <= 0) {
325: synchronized (this ) {
326: if (reconcilePutPermits() <= 0) {
327: if (msecs <= 0)
328: return false;
329: else {
330: try {
331: long waitTime = msecs;
332: long start = System.currentTimeMillis();
333:
334: for (;;) {
335: wait(waitTime);
336: if (reconcilePutPermits() > 0) {
337: break;
338: } else {
339: waitTime = msecs
340: - (System
341: .currentTimeMillis() - start);
342: if (waitTime <= 0) {
343: return false;
344: }
345: }
346: }
347: } catch (InterruptedException ex) {
348: notify();
349: throw ex;
350: }
351: }
352: }
353: }
354: }
355:
356: insert(x);
357: }
358:
359: allowTake();
360: return true;
361: }
362:
363: public boolean isEmpty() {
364: synchronized (head_) {
365: return head_.next == null;
366: }
367: }
368:
369: }
|