Source Code Cross Referenced for ArrayBlockingQueue.java in  » Apache-Harmony-Java-SE » java-package » java » util » concurrent » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Apache Harmony Java SE » java package » java.util.concurrent 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Written by Doug Lea with assistance from members of JCP JSR-166
003:         * Expert Group and released to the public domain, as explained at
004:         * http://creativecommons.org/licenses/publicdomain
005:         */
006:
007:        package java.util.concurrent;
008:
009:        import java.util.concurrent.locks.*;
010:        import java.util.*;
011:
012:        /**
013:         * A bounded {@linkplain BlockingQueue blocking queue} backed by an
014:         * array.  This queue orders elements FIFO (first-in-first-out).  The
015:         * <em>head</em> of the queue is that element that has been on the
016:         * queue the longest time.  The <em>tail</em> of the queue is that
017:         * element that has been on the queue the shortest time. New elements
018:         * are inserted at the tail of the queue, and the queue retrieval
019:         * operations obtain elements at the head of the queue.
020:         *
021:         * <p>This is a classic &quot;bounded buffer&quot;, in which a
022:         * fixed-sized array holds elements inserted by producers and
023:         * extracted by consumers.  Once created, the capacity cannot be
024:         * increased.  Attempts to offer an element to a full queue will
025:         * result in the offer operation blocking; attempts to retrieve an
026:         * element from an empty queue will similarly block.
027:         *
028:         * <p> This class supports an optional fairness policy for ordering
029:         * waiting producer and consumer threads.  By default, this ordering
030:         * is not guaranteed. However, a queue constructed with fairness set
031:         * to <tt>true</tt> grants threads access in FIFO order. Fairness
032:         * generally decreases throughput but reduces variability and avoids
033:         * starvation.
034:         *
035:         * <p>This class implements all of the <em>optional</em> methods
036:         * of the {@link Collection} and {@link Iterator} interfaces.
037:         *
038:         * <p>This class is a member of the
039:         * <a href="{@docRoot}/../guide/collections/index.html">
040:         * Java Collections Framework</a>.
041:         *
042:         * @since 1.5
043:         * @author Doug Lea
044:         * @param <E> the type of elements held in this collection
045:         */
046:        public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements 
047:                BlockingQueue<E>, java.io.Serializable {
048:
049:            /**
050:             * Serialization ID. This class relies on default serialization
051:             * even for the items array, which is default-serialized, even if
052:             * it is empty. Otherwise it could not be declared final, which is
053:             * necessary here.
054:             */
055:            private static final long serialVersionUID = -817911632652898426L;
056:
057:            /** The queued items  */
058:            private final E[] items;
059:            /** items index for next take, poll or remove */
060:            private transient int takeIndex;
061:            /** items index for next put, offer, or add. */
062:            private transient int putIndex;
063:            /** Number of items in the queue */
064:            private int count;
065:
066:            /*
067:             * Concurrency control uses the classic two-condition algorithm
068:             * found in any textbook.
069:             */
070:
071:            /** Main lock guarding all access */
072:            private final ReentrantLock lock;
073:            /** Condition for waiting takes */
074:            private final Condition notEmpty;
075:            /** Condition for waiting puts */
076:            private final Condition notFull;
077:
078:            // Internal helper methods
079:
080:            /**
081:             * Circularly increment i.
082:             */
083:            final int inc(int i) {
084:                return (++i == items.length) ? 0 : i;
085:            }
086:
087:            /**
088:             * Insert element at current put position, advance, and signal.
089:             * Call only when holding lock.
090:             */
091:            private void insert(E x) {
092:                items[putIndex] = x;
093:                putIndex = inc(putIndex);
094:                ++count;
095:                notEmpty.signal();
096:            }
097:
098:            /**
099:             * Extract element at current take position, advance, and signal.
100:             * Call only when holding lock.
101:             */
102:            private E extract() {
103:                final E[] items = this .items;
104:                E x = items[takeIndex];
105:                items[takeIndex] = null;
106:                takeIndex = inc(takeIndex);
107:                --count;
108:                notFull.signal();
109:                return x;
110:            }
111:
112:            /**
113:             * Utility for remove and iterator.remove: Delete item at position i.
114:             * Call only when holding lock.
115:             */
116:            void removeAt(int i) {
117:                final E[] items = this .items;
118:                // if removing front item, just advance
119:                if (i == takeIndex) {
120:                    items[takeIndex] = null;
121:                    takeIndex = inc(takeIndex);
122:                } else {
123:                    // slide over all others up through putIndex.
124:                    for (;;) {
125:                        int nexti = inc(i);
126:                        if (nexti != putIndex) {
127:                            items[i] = items[nexti];
128:                            i = nexti;
129:                        } else {
130:                            items[i] = null;
131:                            putIndex = i;
132:                            break;
133:                        }
134:                    }
135:                }
136:                --count;
137:                notFull.signal();
138:            }
139:
140:            /**
141:             * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
142:             * capacity and default access policy.
143:             * @param capacity the capacity of this queue
144:             * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
145:             */
146:            public ArrayBlockingQueue(int capacity) {
147:                this (capacity, false);
148:            }
149:
150:            /**
151:             * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
152:             * capacity and the specified access policy.
153:             * @param capacity the capacity of this queue
154:             * @param fair if <tt>true</tt> then queue accesses for threads blocked
155:             * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
156:             * the access order is unspecified.
157:             * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
158:             */
159:            public ArrayBlockingQueue(int capacity, boolean fair) {
160:                if (capacity <= 0)
161:                    throw new IllegalArgumentException();
162:                this .items = (E[]) new Object[capacity];
163:                lock = new ReentrantLock(fair);
164:                notEmpty = lock.newCondition();
165:                notFull = lock.newCondition();
166:            }
167:
168:            /**
169:             * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
170:             * capacity, the specified access policy and initially containing the
171:             * elements of the given collection,
172:             * added in traversal order of the collection's iterator.
173:             * @param capacity the capacity of this queue
174:             * @param fair if <tt>true</tt> then queue accesses for threads blocked
175:             * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
176:             * the access order is unspecified.
177:             * @param c the collection of elements to initially contain
178:             * @throws IllegalArgumentException if <tt>capacity</tt> is less than
179:             * <tt>c.size()</tt>, or less than 1.
180:             * @throws NullPointerException if <tt>c</tt> or any element within it
181:             * is <tt>null</tt>
182:             */
183:            public ArrayBlockingQueue(int capacity, boolean fair,
184:                    Collection<? extends E> c) {
185:                this (capacity, fair);
186:                if (capacity < c.size())
187:                    throw new IllegalArgumentException();
188:
189:                for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
190:                    add(it.next());
191:            }
192:
193:            /**
194:             * Inserts the specified element at the tail of this queue if possible,
195:             * returning immediately if this queue is full.
196:             *
197:             * @param o the element to add.
198:             * @return <tt>true</tt> if it was possible to add the element to
199:             *         this queue, else <tt>false</tt>
200:             * @throws NullPointerException if the specified element is <tt>null</tt>
201:             */
202:            public boolean offer(E o) {
203:                if (o == null)
204:                    throw new NullPointerException();
205:                final ReentrantLock lock = this .lock;
206:                lock.lock();
207:                try {
208:                    if (count == items.length)
209:                        return false;
210:                    else {
211:                        insert(o);
212:                        return true;
213:                    }
214:                } finally {
215:                    lock.unlock();
216:                }
217:            }
218:
219:            /**
220:             * Inserts the specified element at the tail of this queue, waiting if
221:             * necessary up to the specified wait time for space to become available.
222:             * @param o the element to add
223:             * @param timeout how long to wait before giving up, in units of
224:             * <tt>unit</tt>
225:             * @param unit a <tt>TimeUnit</tt> determining how to interpret the
226:             * <tt>timeout</tt> parameter
227:             * @return <tt>true</tt> if successful, or <tt>false</tt> if
228:             * the specified waiting time elapses before space is available.
229:             * @throws InterruptedException if interrupted while waiting.
230:             * @throws NullPointerException if the specified element is <tt>null</tt>.
231:             */
232:            public boolean offer(E o, long timeout, TimeUnit unit)
233:                    throws InterruptedException {
234:
235:                if (o == null)
236:                    throw new NullPointerException();
237:                final ReentrantLock lock = this .lock;
238:                lock.lockInterruptibly();
239:                try {
240:                    long nanos = unit.toNanos(timeout);
241:                    for (;;) {
242:                        if (count != items.length) {
243:                            insert(o);
244:                            return true;
245:                        }
246:                        if (nanos <= 0)
247:                            return false;
248:                        try {
249:                            nanos = notFull.awaitNanos(nanos);
250:                        } catch (InterruptedException ie) {
251:                            notFull.signal(); // propagate to non-interrupted thread
252:                            throw ie;
253:                        }
254:                    }
255:                } finally {
256:                    lock.unlock();
257:                }
258:            }
259:
260:            public E poll() {
261:                final ReentrantLock lock = this .lock;
262:                lock.lock();
263:                try {
264:                    if (count == 0)
265:                        return null;
266:                    E x = extract();
267:                    return x;
268:                } finally {
269:                    lock.unlock();
270:                }
271:            }
272:
273:            public E poll(long timeout, TimeUnit unit)
274:                    throws InterruptedException {
275:                final ReentrantLock lock = this .lock;
276:                lock.lockInterruptibly();
277:                try {
278:                    long nanos = unit.toNanos(timeout);
279:                    for (;;) {
280:                        if (count != 0) {
281:                            E x = extract();
282:                            return x;
283:                        }
284:                        if (nanos <= 0)
285:                            return null;
286:                        try {
287:                            nanos = notEmpty.awaitNanos(nanos);
288:                        } catch (InterruptedException ie) {
289:                            notEmpty.signal(); // propagate to non-interrupted thread
290:                            throw ie;
291:                        }
292:
293:                    }
294:                } finally {
295:                    lock.unlock();
296:                }
297:            }
298:
299:            public boolean remove(Object o) {
300:                if (o == null)
301:                    return false;
302:                final E[] items = this .items;
303:                final ReentrantLock lock = this .lock;
304:                lock.lock();
305:                try {
306:                    int i = takeIndex;
307:                    int k = 0;
308:                    for (;;) {
309:                        if (k++ >= count)
310:                            return false;
311:                        if (o.equals(items[i])) {
312:                            removeAt(i);
313:                            return true;
314:                        }
315:                        i = inc(i);
316:                    }
317:
318:                } finally {
319:                    lock.unlock();
320:                }
321:            }
322:
323:            public E peek() {
324:                final ReentrantLock lock = this .lock;
325:                lock.lock();
326:                try {
327:                    return (count == 0) ? null : items[takeIndex];
328:                } finally {
329:                    lock.unlock();
330:                }
331:            }
332:
333:            public E take() throws InterruptedException {
334:                final ReentrantLock lock = this .lock;
335:                lock.lockInterruptibly();
336:                try {
337:                    try {
338:                        while (count == 0)
339:                            notEmpty.await();
340:                    } catch (InterruptedException ie) {
341:                        notEmpty.signal(); // propagate to non-interrupted thread
342:                        throw ie;
343:                    }
344:                    E x = extract();
345:                    return x;
346:                } finally {
347:                    lock.unlock();
348:                }
349:            }
350:
351:            /**
352:             * Adds the specified element to the tail of this queue, waiting if
353:             * necessary for space to become available.
354:             * @param o the element to add
355:             * @throws InterruptedException if interrupted while waiting.
356:             * @throws NullPointerException if the specified element is <tt>null</tt>.
357:             */
358:            public void put(E o) throws InterruptedException {
359:                if (o == null)
360:                    throw new NullPointerException();
361:                final E[] items = this .items;
362:                final ReentrantLock lock = this .lock;
363:                lock.lockInterruptibly();
364:                try {
365:                    try {
366:                        while (count == items.length)
367:                            notFull.await();
368:                    } catch (InterruptedException ie) {
369:                        notFull.signal(); // propagate to non-interrupted thread
370:                        throw ie;
371:                    }
372:                    insert(o);
373:                } finally {
374:                    lock.unlock();
375:                }
376:            }
377:
378:            // this doc comment is overridden to remove the reference to collections
379:            // greater in size than Integer.MAX_VALUE
380:            /**
381:             * Returns the number of elements in this queue.
382:             *
383:             * @return  the number of elements in this queue.
384:             */
385:            public int size() {
386:                final ReentrantLock lock = this .lock;
387:                lock.lock();
388:                try {
389:                    return count;
390:                } finally {
391:                    lock.unlock();
392:                }
393:            }
394:
395:            // this doc comment is a modified copy of the inherited doc comment,
396:            // without the reference to unlimited queues.
397:            /**
398:             * Returns the number of elements that this queue can ideally (in
399:             * the absence of memory or resource constraints) accept without
400:             * blocking. This is always equal to the initial capacity of this queue
401:             * less the current <tt>size</tt> of this queue.
402:             * <p>Note that you <em>cannot</em> always tell if
403:             * an attempt to <tt>add</tt> an element will succeed by
404:             * inspecting <tt>remainingCapacity</tt> because it may be the
405:             * case that a waiting consumer is ready to <tt>take</tt> an
406:             * element out of an otherwise full queue.
407:             */
408:            public int remainingCapacity() {
409:                final ReentrantLock lock = this .lock;
410:                lock.lock();
411:                try {
412:                    return items.length - count;
413:                } finally {
414:                    lock.unlock();
415:                }
416:            }
417:
418:            public boolean contains(Object o) {
419:                if (o == null)
420:                    return false;
421:                final E[] items = this .items;
422:                final ReentrantLock lock = this .lock;
423:                lock.lock();
424:                try {
425:                    int i = takeIndex;
426:                    int k = 0;
427:                    while (k++ < count) {
428:                        if (o.equals(items[i]))
429:                            return true;
430:                        i = inc(i);
431:                    }
432:                    return false;
433:                } finally {
434:                    lock.unlock();
435:                }
436:            }
437:
438:            public Object[] toArray() {
439:                final E[] items = this .items;
440:                final ReentrantLock lock = this .lock;
441:                lock.lock();
442:                try {
443:                    Object[] a = new Object[count];
444:                    int k = 0;
445:                    int i = takeIndex;
446:                    while (k < count) {
447:                        a[k++] = items[i];
448:                        i = inc(i);
449:                    }
450:                    return a;
451:                } finally {
452:                    lock.unlock();
453:                }
454:            }
455:
456:            public <T> T[] toArray(T[] a) {
457:                final E[] items = this .items;
458:                final ReentrantLock lock = this .lock;
459:                lock.lock();
460:                try {
461:                    if (a.length < count)
462:                        a = (T[]) java.lang.reflect.Array.newInstance(a
463:                                .getClass().getComponentType(), count);
464:
465:                    int k = 0;
466:                    int i = takeIndex;
467:                    while (k < count) {
468:                        a[k++] = (T) items[i];
469:                        i = inc(i);
470:                    }
471:                    if (a.length > count)
472:                        a[count] = null;
473:                    return a;
474:                } finally {
475:                    lock.unlock();
476:                }
477:            }
478:
479:            public String toString() {
480:                final ReentrantLock lock = this .lock;
481:                lock.lock();
482:                try {
483:                    return super .toString();
484:                } finally {
485:                    lock.unlock();
486:                }
487:            }
488:
489:            public void clear() {
490:                final E[] items = this .items;
491:                final ReentrantLock lock = this .lock;
492:                lock.lock();
493:                try {
494:                    int i = takeIndex;
495:                    int k = count;
496:                    while (k-- > 0) {
497:                        items[i] = null;
498:                        i = inc(i);
499:                    }
500:                    count = 0;
501:                    putIndex = 0;
502:                    takeIndex = 0;
503:                    notFull.signalAll();
504:                } finally {
505:                    lock.unlock();
506:                }
507:            }
508:
509:            public int drainTo(Collection<? super  E> c) {
510:                if (c == null)
511:                    throw new NullPointerException();
512:                if (c == this )
513:                    throw new IllegalArgumentException();
514:                final E[] items = this .items;
515:                final ReentrantLock lock = this .lock;
516:                lock.lock();
517:                try {
518:                    int i = takeIndex;
519:                    int n = 0;
520:                    int max = count;
521:                    while (n < max) {
522:                        c.add(items[i]);
523:                        items[i] = null;
524:                        i = inc(i);
525:                        ++n;
526:                    }
527:                    if (n > 0) {
528:                        count = 0;
529:                        putIndex = 0;
530:                        takeIndex = 0;
531:                        notFull.signalAll();
532:                    }
533:                    return n;
534:                } finally {
535:                    lock.unlock();
536:                }
537:            }
538:
539:            public int drainTo(Collection<? super  E> c, int maxElements) {
540:                if (c == null)
541:                    throw new NullPointerException();
542:                if (c == this )
543:                    throw new IllegalArgumentException();
544:                if (maxElements <= 0)
545:                    return 0;
546:                final E[] items = this .items;
547:                final ReentrantLock lock = this .lock;
548:                lock.lock();
549:                try {
550:                    int i = takeIndex;
551:                    int n = 0;
552:                    int sz = count;
553:                    int max = (maxElements < count) ? maxElements : count;
554:                    while (n < max) {
555:                        c.add(items[i]);
556:                        items[i] = null;
557:                        i = inc(i);
558:                        ++n;
559:                    }
560:                    if (n > 0) {
561:                        count -= n;
562:                        takeIndex = i;
563:                        notFull.signalAll();
564:                    }
565:                    return n;
566:                } finally {
567:                    lock.unlock();
568:                }
569:            }
570:
571:            /**
572:             * Returns an iterator over the elements in this queue in proper sequence.
573:             * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
574:             * will never throw {@link java.util.ConcurrentModificationException},
575:             * and guarantees to traverse elements as they existed upon
576:             * construction of the iterator, and may (but is not guaranteed to)
577:             * reflect any modifications subsequent to construction.
578:             *
579:             * @return an iterator over the elements in this queue in proper sequence.
580:             */
581:            public Iterator<E> iterator() {
582:                final ReentrantLock lock = this .lock;
583:                lock.lock();
584:                try {
585:                    return new Itr();
586:                } finally {
587:                    lock.unlock();
588:                }
589:            }
590:
591:            /**
592:             * Iterator for ArrayBlockingQueue
593:             */
594:            private class Itr implements  Iterator<E> {
595:                /**
596:                 * Index of element to be returned by next,
597:                 * or a negative number if no such.
598:                 */
599:                private int nextIndex;
600:
601:                /**
602:                 * nextItem holds on to item fields because once we claim
603:                 * that an element exists in hasNext(), we must return it in
604:                 * the following next() call even if it was in the process of
605:                 * being removed when hasNext() was called.
606:                 **/
607:                private E nextItem;
608:
609:                /**
610:                 * Index of element returned by most recent call to next.
611:                 * Reset to -1 if this element is deleted by a call to remove.
612:                 */
613:                private int lastRet;
614:
615:                Itr() {
616:                    lastRet = -1;
617:                    if (count == 0)
618:                        nextIndex = -1;
619:                    else {
620:                        nextIndex = takeIndex;
621:                        nextItem = items[takeIndex];
622:                    }
623:                }
624:
625:                public boolean hasNext() {
626:                    /*
627:                     * No sync. We can return true by mistake here
628:                     * only if this iterator passed across threads,
629:                     * which we don't support anyway.
630:                     */
631:                    return nextIndex >= 0;
632:                }
633:
634:                /**
635:                 * Check whether nextIndex is valid; if so setting nextItem.
636:                 * Stops iterator when either hits putIndex or sees null item.
637:                 */
638:                private void checkNext() {
639:                    if (nextIndex == putIndex) {
640:                        nextIndex = -1;
641:                        nextItem = null;
642:                    } else {
643:                        nextItem = items[nextIndex];
644:                        if (nextItem == null)
645:                            nextIndex = -1;
646:                    }
647:                }
648:
649:                public E next() {
650:                    final ReentrantLock lock = ArrayBlockingQueue.this .lock;
651:                    lock.lock();
652:                    try {
653:                        if (nextIndex < 0)
654:                            throw new NoSuchElementException();
655:                        lastRet = nextIndex;
656:                        E x = nextItem;
657:                        nextIndex = inc(nextIndex);
658:                        checkNext();
659:                        return x;
660:                    } finally {
661:                        lock.unlock();
662:                    }
663:                }
664:
665:                public void remove() {
666:                    final ReentrantLock lock = ArrayBlockingQueue.this .lock;
667:                    lock.lock();
668:                    try {
669:                        int i = lastRet;
670:                        if (i == -1)
671:                            throw new IllegalStateException();
672:                        lastRet = -1;
673:
674:                        int ti = takeIndex;
675:                        removeAt(i);
676:                        // back up cursor (reset to front if was first element)
677:                        nextIndex = (i == ti) ? takeIndex : i;
678:                        checkNext();
679:                    } finally {
680:                        lock.unlock();
681:                    }
682:                }
683:            }
684:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.