001: /*
002: * Written by Doug Lea with assistance from members of JCP JSR-166
003: * Expert Group and released to the public domain, as explained at
004: * http://creativecommons.org/licenses/publicdomain
005: */
006:
007: package org.drools.util.concurrent.locks;
008:
009: import java.util.Collection;
010: import java.util.Iterator;
011: import java.util.NoSuchElementException;
012:
013: /**
014: * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
015: * linked nodes.
016: * This queue orders elements FIFO (first-in-first-out).
017: * The <em>head</em> of the queue is that element that has been on the
018: * queue the longest time.
019: * The <em>tail</em> of the queue is that element that has been on the
020: * queue the shortest time. New elements
021: * are inserted at the tail of the queue, and the queue retrieval
022: * operations obtain elements at the head of the queue.
023: * Linked queues typically have higher throughput than array-based queues but
024: * less predictable performance in most concurrent applications.
025: *
026: * <p> The optional capacity bound constructor argument serves as a
027: * way to prevent excessive queue expansion. The capacity, if unspecified,
028: * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
029: * dynamically created upon each insertion unless this would bring the
030: * queue above capacity.
031: *
032: * <p>This class and its iterator implement all of the
033: * <em>optional</em> methods of the {@link Collection} and {@link
034: * Iterator} interfaces.
035: *
036: * <p>This class is a member of the
037: * <a href="{@docRoot}/../technotes/guides/collections/index.html">
038: * Java Collections Framework</a>.
039: *
040: * @since 1.5
041: * @author Doug Lea
042: *
043: */
044: public class LinkedBlockingQueue extends AbstractQueue implements
045: BlockingQueue, java.io.Serializable {
046: private static final long serialVersionUID = 400L;
047:
048: /*
049: * A variant of the "two lock queue" algorithm. The putLock gates
050: * entry to put (and offer), and has an associated condition for
051: * waiting puts. Similarly for the takeLock. The "count" field
052: * that they both rely on is maintained as an atomic to avoid
053: * needing to get both locks in most cases. Also, to minimize need
054: * for puts to get takeLock and vice-versa, cascading notifies are
055: * used. When a put notices that it has enabled at least one take,
056: * it signals taker. That taker in turn signals others if more
057: * items have been entered since the signal. And symmetrically for
058: * takes signalling puts. Operations such as remove(Object) and
059: * iterators acquire both locks.
060: */
061:
062: /**
063: * Linked list node class
064: */
065: static class Node {
066: /** The item, volatile to ensure barrier separating write and read */
067: volatile Object item;
068: Node next;
069:
070: Node(Object x) {
071: item = x;
072: }
073: }
074:
075: /** The capacity bound, or Integer.MAX_VALUE if none */
076: private final int capacity;
077:
078: /** Current number of elements */
079: private volatile int count = 0;
080:
081: /** Head of linked list */
082: private transient Node head;
083:
084: /** Tail of linked list */
085: private transient Node last;
086:
087: /** Lock held by take, poll, etc */
088: private final Object takeLock = new SerializableLock();
089:
090: /** Lock held by put, offer, etc */
091: private final Object putLock = new SerializableLock();
092:
093: /**
094: * Signals a waiting take. Called only from put/offer (which do not
095: * otherwise ordinarily lock takeLock.)
096: */
097: private void signalNotEmpty() {
098: synchronized (takeLock) {
099: takeLock.notify();
100: }
101: }
102:
103: /**
104: * Signals a waiting put. Called only from take/poll.
105: */
106: private void signalNotFull() {
107: synchronized (putLock) {
108: putLock.notify();
109: }
110: }
111:
112: /**
113: * Creates a node and links it at end of queue.
114: * @param x the item
115: */
116: private void insert(Object x) {
117: last = last.next = new Node(x);
118: }
119:
120: /**
121: * Removes a node from head of queue,
122: * @return the node
123: */
124: private Object extract() {
125: Node first = head.next;
126: head = first;
127: Object x = first.item;
128: first.item = null;
129: return x;
130: }
131:
132: /**
133: * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
134: * {@link Integer#MAX_VALUE}.
135: */
136: public LinkedBlockingQueue() {
137: this (Integer.MAX_VALUE);
138: }
139:
140: /**
141: * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
142: *
143: * @param capacity the capacity of this queue
144: * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
145: * than zero
146: */
147: public LinkedBlockingQueue(int capacity) {
148: if (capacity <= 0)
149: throw new IllegalArgumentException();
150: this .capacity = capacity;
151: last = head = new Node(null);
152: }
153:
154: /**
155: * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
156: * {@link Integer#MAX_VALUE}, initially containing the elements of the
157: * given collection,
158: * added in traversal order of the collection's iterator.
159: *
160: * @param c the collection of elements to initially contain
161: * @throws NullPointerException if the specified collection or any
162: * of its elements are null
163: */
164: public LinkedBlockingQueue(Collection c) {
165: this (Integer.MAX_VALUE);
166: for (Iterator itr = c.iterator(); itr.hasNext();) {
167: Object e = itr.next();
168: add(e);
169: }
170: }
171:
172: // this doc comment is overridden to remove the reference to collections
173: // greater in size than Integer.MAX_VALUE
174: /**
175: * Returns the number of elements in this queue.
176: *
177: * @return the number of elements in this queue
178: */
179: public int size() {
180: return count;
181: }
182:
183: // this doc comment is a modified copy of the inherited doc comment,
184: // without the reference to unlimited queues.
185: /**
186: * Returns the number of additional elements that this queue can ideally
187: * (in the absence of memory or resource constraints) accept without
188: * blocking. This is always equal to the initial capacity of this queue
189: * less the current <tt>size</tt> of this queue.
190: *
191: * <p>Note that you <em>cannot</em> always tell if an attempt to insert
192: * an element will succeed by inspecting <tt>remainingCapacity</tt>
193: * because it may be the case that another thread is about to
194: * insert or remove an element.
195: */
196: public int remainingCapacity() {
197: return capacity - count;
198: }
199:
200: /**
201: * Inserts the specified element at the tail of this queue, waiting if
202: * necessary for space to become available.
203: *
204: * @throws InterruptedException {@inheritDoc}
205: * @throws NullPointerException {@inheritDoc}
206: */
207: public void put(Object e) throws InterruptedException {
208: if (e == null)
209: throw new NullPointerException();
210: // Note: convention in all put/take/etc is to preset
211: // local var holding count negative to indicate failure unless set.
212: int c = -1;
213: synchronized (putLock) {
214: /*
215: * Note that count is used in wait guard even though it is
216: * not protected by lock. This works because count can
217: * only decrease at this point (all other puts are shut
218: * out by lock), and we (or some other waiting put) are
219: * signalled if it ever changes from
220: * capacity. Similarly for all other uses of count in
221: * other wait guards.
222: */
223: try {
224: while (count == capacity)
225: putLock.wait();
226: } catch (InterruptedException ie) {
227: putLock.notify(); // propagate to a non-interrupted thread
228: throw ie;
229: }
230: insert(e);
231: synchronized (this ) {
232: c = count++;
233: }
234: if (c + 1 < capacity)
235: putLock.notify();
236: }
237:
238: if (c == 0)
239: signalNotEmpty();
240: }
241:
242: /**
243: * Inserts the specified element at the tail of this queue, waiting if
244: * necessary up to the specified wait time for space to become available.
245: *
246: * @return <tt>true</tt> if successful, or <tt>false</tt> if
247: * the specified waiting time elapses before space is available.
248: * @throws InterruptedException {@inheritDoc}
249: * @throws NullPointerException {@inheritDoc}
250: */
251: public boolean offer(Object e, long timeout, TimeUnit unit)
252: throws InterruptedException {
253:
254: if (e == null)
255: throw new NullPointerException();
256: long nanos = unit.toNanos(timeout);
257: int c = -1;
258: synchronized (putLock) {
259: long deadline = Utils.nanoTime() + nanos;
260: for (;;) {
261: if (count < capacity) {
262: insert(e);
263: synchronized (this ) {
264: c = count++;
265: }
266: if (c + 1 < capacity)
267: putLock.notify();
268: break;
269: }
270: if (nanos <= 0)
271: return false;
272: try {
273: TimeUnit.NANOSECONDS.timedWait(putLock, nanos);
274: nanos = deadline - Utils.nanoTime();
275: } catch (InterruptedException ie) {
276: putLock.notify(); // propagate to a non-interrupted thread
277: throw ie;
278: }
279: }
280: }
281: if (c == 0)
282: signalNotEmpty();
283: return true;
284: }
285:
286: /**
287: * Inserts the specified element at the tail of this queue if it is
288: * possible to do so immediately without exceeding the queue's capacity,
289: * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
290: * is full.
291: * When using a capacity-restricted queue, this method is generally
292: * preferable to method {@link BlockingQueue#add add}, which can fail to
293: * insert an element only by throwing an exception.
294: *
295: * @throws NullPointerException if the specified element is null
296: */
297: public boolean offer(Object e) {
298: if (e == null)
299: throw new NullPointerException();
300: if (count == capacity)
301: return false;
302: int c = -1;
303: synchronized (putLock) {
304: if (count < capacity) {
305: insert(e);
306: synchronized (this ) {
307: c = count++;
308: }
309: if (c + 1 < capacity)
310: putLock.notify();
311: }
312: }
313: if (c == 0)
314: signalNotEmpty();
315: return c >= 0;
316: }
317:
318: public Object take() throws InterruptedException {
319: Object x;
320: int c = -1;
321: synchronized (takeLock) {
322: try {
323: while (count == 0)
324: takeLock.wait();
325: } catch (InterruptedException ie) {
326: takeLock.notify(); // propagate to a non-interrupted thread
327: throw ie;
328: }
329:
330: x = extract();
331: synchronized (this ) {
332: c = count--;
333: }
334: if (c > 1)
335: takeLock.notify();
336: }
337: if (c == capacity)
338: signalNotFull();
339: return x;
340: }
341:
342: public Object poll(long timeout, TimeUnit unit)
343: throws InterruptedException {
344: Object x = null;
345: int c = -1;
346: long nanos = unit.toNanos(timeout);
347: synchronized (takeLock) {
348: long deadline = Utils.nanoTime() + nanos;
349: for (;;) {
350: if (count > 0) {
351: x = extract();
352: synchronized (this ) {
353: c = count--;
354: }
355: if (c > 1)
356: takeLock.notify();
357: break;
358: }
359: if (nanos <= 0)
360: return null;
361: try {
362: TimeUnit.NANOSECONDS.timedWait(takeLock, nanos);
363: nanos = deadline - Utils.nanoTime();
364: } catch (InterruptedException ie) {
365: takeLock.notify(); // propagate to a non-interrupted thread
366: throw ie;
367: }
368: }
369: }
370: if (c == capacity)
371: signalNotFull();
372: return x;
373: }
374:
375: public Object poll() {
376: if (count == 0)
377: return null;
378: Object x = null;
379: int c = -1;
380: synchronized (takeLock) {
381: if (count > 0) {
382: x = extract();
383: synchronized (this ) {
384: c = count--;
385: }
386: if (c > 1)
387: takeLock.notify();
388: }
389: }
390: if (c == capacity)
391: signalNotFull();
392: return x;
393: }
394:
395: public Object peek() {
396: if (count == 0)
397: return null;
398: synchronized (takeLock) {
399: Node first = head.next;
400: if (first == null)
401: return null;
402: else
403: return first.item;
404: }
405: }
406:
407: /**
408: * Removes a single instance of the specified element from this queue,
409: * if it is present. More formally, removes an element <tt>e</tt> such
410: * that <tt>o.equals(e)</tt>, if this queue contains one or more such
411: * elements.
412: * Returns <tt>true</tt> if this queue contained the specified element
413: * (or equivalently, if this queue changed as a result of the call).
414: *
415: * @param o element to be removed from this queue, if present
416: * @return <tt>true</tt> if this queue changed as a result of the call
417: */
418: public boolean remove(Object o) {
419: if (o == null)
420: return false;
421: boolean removed = false;
422: synchronized (putLock) {
423: synchronized (takeLock) {
424: Node trail = head;
425: Node p = head.next;
426: while (p != null) {
427: if (o.equals(p.item)) {
428: removed = true;
429: break;
430: }
431: trail = p;
432: p = p.next;
433: }
434: if (removed) {
435: p.item = null;
436: trail.next = p.next;
437: if (last == p)
438: last = trail;
439: synchronized (this ) {
440: if (count-- == capacity)
441: putLock.notifyAll();
442: }
443: }
444: }
445: }
446: return removed;
447: }
448:
449: /**
450: * Returns an array containing all of the elements in this queue, in
451: * proper sequence.
452: *
453: * <p>The returned array will be "safe" in that no references to it are
454: * maintained by this queue. (In other words, this method must allocate
455: * a new array). The caller is thus free to modify the returned array.
456: *
457: * <p>This method acts as bridge between array-based and collection-based
458: * APIs.
459: *
460: * @return an array containing all of the elements in this queue
461: */
462: public Object[] toArray() {
463: synchronized (putLock) {
464: synchronized (takeLock) {
465: int size = count;
466: Object[] a = new Object[size];
467: int k = 0;
468: for (Node p = head.next; p != null; p = p.next)
469: a[k++] = p.item;
470: return a;
471: }
472: }
473: }
474:
475: /**
476: * Returns an array containing all of the elements in this queue, in
477: * proper sequence; the runtime type of the returned array is that of
478: * the specified array. If the queue fits in the specified array, it
479: * is returned therein. Otherwise, a new array is allocated with the
480: * runtime type of the specified array and the size of this queue.
481: *
482: * <p>If this queue fits in the specified array with room to spare
483: * (i.e., the array has more elements than this queue), the element in
484: * the array immediately following the end of the queue is set to
485: * <tt>null</tt>.
486: *
487: * <p>Like the {@link #toArray()} method, this method acts as bridge between
488: * array-based and collection-based APIs. Further, this method allows
489: * precise control over the runtime type of the output array, and may,
490: * under certain circumstances, be used to save allocation costs.
491: *
492: * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
493: * The following code can be used to dump the queue into a newly
494: * allocated array of <tt>String</tt>:
495: *
496: * <pre>
497: * String[] y = x.toArray(new String[0]);</pre>
498: *
499: * Note that <tt>toArray(new Object[0])</tt> is identical in function to
500: * <tt>toArray()</tt>.
501: *
502: * @param a the array into which the elements of the queue are to
503: * be stored, if it is big enough; otherwise, a new array of the
504: * same runtime type is allocated for this purpose
505: * @return an array containing all of the elements in this queue
506: * @throws ArrayStoreException if the runtime type of the specified array
507: * is not a supertype of the runtime type of every element in
508: * this queue
509: * @throws NullPointerException if the specified array is null
510: */
511: public Object[] toArray(Object[] a) {
512: synchronized (putLock) {
513: synchronized (takeLock) {
514: int size = count;
515: if (a.length < size)
516: a = (Object[]) java.lang.reflect.Array.newInstance(
517: a.getClass().getComponentType(), size);
518:
519: int k = 0;
520: for (Node p = head.next; p != null; p = p.next)
521: a[k++] = (Object) p.item;
522: if (a.length > k)
523: a[k] = null;
524: return a;
525: }
526: }
527: }
528:
529: public String toString() {
530: synchronized (putLock) {
531: synchronized (takeLock) {
532: return super .toString();
533: }
534: }
535: }
536:
537: /**
538: * Atomically removes all of the elements from this queue.
539: * The queue will be empty after this call returns.
540: */
541: public void clear() {
542: synchronized (putLock) {
543: synchronized (takeLock) {
544: head.next = null;
545: //assert head.item == null;
546: last = head;
547: int c;
548: synchronized (this ) {
549: c = count;
550: count = 0;
551: }
552: if (c == capacity)
553: putLock.notifyAll();
554: }
555: }
556: }
557:
558: /**
559: * @throws UnsupportedOperationException {@inheritDoc}
560: * @throws ClassCastException {@inheritDoc}
561: * @throws NullPointerException {@inheritDoc}
562: * @throws IllegalArgumentException {@inheritDoc}
563: */
564: public int drainTo(Collection c) {
565: if (c == null)
566: throw new NullPointerException();
567: if (c == this )
568: throw new IllegalArgumentException();
569: Node first;
570: synchronized (putLock) {
571: synchronized (takeLock) {
572: first = head.next;
573: head.next = null;
574: //assert head.item == null;
575: last = head;
576: int cold;
577: synchronized (this ) {
578: cold = count;
579: count = 0;
580: }
581: if (cold == capacity)
582: putLock.notifyAll();
583: }
584: }
585: // Transfer the elements outside of locks
586: int n = 0;
587: for (Node p = first; p != null; p = p.next) {
588: c.add(p.item);
589: p.item = null;
590: ++n;
591: }
592: return n;
593: }
594:
595: /**
596: * @throws UnsupportedOperationException {@inheritDoc}
597: * @throws ClassCastException {@inheritDoc}
598: * @throws NullPointerException {@inheritDoc}
599: * @throws IllegalArgumentException {@inheritDoc}
600: */
601: public int drainTo(Collection c, int maxElements) {
602: if (c == null)
603: throw new NullPointerException();
604: if (c == this )
605: throw new IllegalArgumentException();
606: synchronized (putLock) {
607: synchronized (takeLock) {
608: int n = 0;
609: Node p = head.next;
610: while (p != null && n < maxElements) {
611: c.add(p.item);
612: p.item = null;
613: p = p.next;
614: ++n;
615: }
616: if (n != 0) {
617: head.next = p;
618: //assert head.item == null;
619: if (p == null)
620: last = head;
621: int cold;
622: synchronized (this ) {
623: cold = count;
624: count -= n;
625: }
626: if (cold == capacity)
627: putLock.notifyAll();
628: }
629: return n;
630: }
631: }
632: }
633:
634: /**
635: * Returns an iterator over the elements in this queue in proper sequence.
636: * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
637: * will never throw {@link java.util.ConcurrentModificationException},
638: * and guarantees to traverse elements as they existed upon
639: * construction of the iterator, and may (but is not guaranteed to)
640: * reflect any modifications subsequent to construction.
641: *
642: * @return an iterator over the elements in this queue in proper sequence
643: */
644: public Iterator iterator() {
645: return new Itr();
646: }
647:
648: private class Itr implements Iterator {
649: /*
650: * Basic weak-consistent iterator. At all times hold the next
651: * item to hand out so that if hasNext() reports true, we will
652: * still have it to return even if lost race with a take etc.
653: */
654: private Node current;
655: private Node lastRet;
656: private Object currentElement;
657:
658: Itr() {
659: synchronized (putLock) {
660: synchronized (takeLock) {
661: current = head.next;
662: if (current != null)
663: currentElement = current.item;
664: }
665: }
666: }
667:
668: public boolean hasNext() {
669: return current != null;
670: }
671:
672: public Object next() {
673: synchronized (putLock) {
674: synchronized (takeLock) {
675: if (current == null)
676: throw new NoSuchElementException();
677: Object x = currentElement;
678: lastRet = current;
679: current = current.next;
680: if (current != null)
681: currentElement = current.item;
682: return x;
683: }
684: }
685: }
686:
687: public void remove() {
688: if (lastRet == null)
689: throw new IllegalStateException();
690: synchronized (putLock) {
691: synchronized (takeLock) {
692: Node node = lastRet;
693: lastRet = null;
694: Node trail = head;
695: Node p = head.next;
696: while (p != null && p != node) {
697: trail = p;
698: p = p.next;
699: }
700: if (p == node) {
701: p.item = null;
702: trail.next = p.next;
703: if (last == p)
704: last = trail;
705: int c;
706: synchronized (this ) {
707: c = count--;
708: }
709: if (c == capacity)
710: putLock.notifyAll();
711: }
712: }
713: }
714: }
715: }
716:
717: /**
718: * Save the state to a stream (that is, serialize it).
719: *
720: * @serialData The capacity is emitted (int), followed by all of
721: * its elements (each an <tt>Object</tt>) in the proper order,
722: * followed by a null
723: * @param s the stream
724: */
725: private void writeObject(java.io.ObjectOutputStream s)
726: throws java.io.IOException {
727:
728: synchronized (putLock) {
729: synchronized (takeLock) {
730: // Write out any hidden stuff, plus capacity
731: s.defaultWriteObject();
732:
733: // Write out all elements in the proper order.
734: for (Node p = head.next; p != null; p = p.next)
735: s.writeObject(p.item);
736:
737: // Use trailing null as sentinel
738: s.writeObject(null);
739: }
740: }
741: }
742:
743: /**
744: * Reconstitute this queue instance from a stream (that is,
745: * deserialize it).
746: * @param s the stream
747: */
748: private void readObject(java.io.ObjectInputStream s)
749: throws java.io.IOException, ClassNotFoundException {
750: // Read in capacity, and any hidden stuff
751: s.defaultReadObject();
752:
753: synchronized (this ) {
754: count = 0;
755: }
756: last = head = new Node(null);
757:
758: // Read in all elements and place in queue
759: for (;;) {
760: Object item = (Object) s.readObject();
761: if (item == null)
762: break;
763: add(item);
764: }
765: }
766:
767: private static class SerializableLock implements
768: java.io.Serializable {
769: private final static long serialVersionUID = 400L;
770: }
771: }
|