Source Code Cross Referenced for LinkedBlockingQueue.java in  » 6.0-JDK-Core » Collections-Jar-Zip-Logging-regex » java » util » concurrent » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Home
Java Source Code / Java Documentation
1.6.0 JDK Core
2.6.0 JDK Modules
3.6.0 JDK Modules com.sun
4.6.0 JDK Modules com.sun.java
5.6.0 JDK Modules sun
6.6.0 JDK Platform
7.Ajax
8.Apache Harmony Java SE
9.Aspect oriented
10.Authentication Authorization
11.Blogger System
12.Build
13.Byte Code
14.Cache
15.Chart
16.Chat
17.Code Analyzer
18.Collaboration
19.Content Management System
20.Database Client
21.Database DBMS
22.Database JDBC Connection Pool
23.Database ORM
24.Development
25.EJB Server
26.ERP CRM Financial
27.ESB
28.Forum
29.Game
30.GIS
31.Graphic 3D
32.Graphic Library
33.Groupware
34.HTML Parser
35.IDE
36.IDE Eclipse
37.IDE Netbeans
38.Installer
39.Internationalization Localization
40.Inversion of Control
41.Issue Tracking
42.J2EE
43.J2ME
44.JBoss
45.JMS
46.JMX
47.Library
48.Mail Clients
49.Music
50.Net
51.Parser
52.PDF
53.Portal
54.Profiler
55.Project Management
56.Report
57.RSS RDF
58.Rule Engine
59.Science
60.Scripting
61.Search Engine
62.Security
63.Sevlet Container
64.Source Control
65.Swing Library
66.Template Engine
67.Test Coverage
68.Testing
69.UML
70.Web Crawler
71.Web Framework
72.Web Mail
73.Web Server
74.Web Services
75.Web Services apache cxf 2.2.6
76.Web Services AXIS2
77.Wiki Engine
78.Workflow Engines
79.XML
80.XML UI
Java Source Code / Java Documentation » 6.0 JDK Core » Collections Jar Zip Logging regex » java.util.concurrent 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001        /*
002         * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
003         *
004         * This code is free software; you can redistribute it and/or modify it
005         * under the terms of the GNU General Public License version 2 only, as
006         * published by the Free Software Foundation.  Sun designates this
007         * particular file as subject to the "Classpath" exception as provided
008         * by Sun in the LICENSE file that accompanied this code.
009         *
010         * This code is distributed in the hope that it will be useful, but WITHOUT
011         * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012         * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
013         * version 2 for more details (a copy is included in the LICENSE file that
014         * accompanied this code).
015         *
016         * You should have received a copy of the GNU General Public License version
017         * 2 along with this work; if not, write to the Free Software Foundation,
018         * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
019         *
020         * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
021         * CA 95054 USA or visit www.sun.com if you need additional information or
022         * have any questions.
023         */
024
025        /*
026         * This file is available under and governed by the GNU General Public
027         * License version 2 only, as published by the Free Software Foundation.
028         * However, the following notice accompanied the original version of this
029         * file:
030         *
031         * Written by Doug Lea with assistance from members of JCP JSR-166
032         * Expert Group and released to the public domain, as explained at
033         * http://creativecommons.org/licenses/publicdomain
034         */
035
036        package java.util.concurrent;
037
038        import java.util.concurrent.atomic.*;
039        import java.util.concurrent.locks.*;
040        import java.util.*;
041
042        /**
043         * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
044         * linked nodes.
045         * This queue orders elements FIFO (first-in-first-out).
046         * The <em>head</em> of the queue is that element that has been on the
047         * queue the longest time.
048         * The <em>tail</em> of the queue is that element that has been on the
049         * queue the shortest time. New elements
050         * are inserted at the tail of the queue, and the queue retrieval
051         * operations obtain elements at the head of the queue.
052         * Linked queues typically have higher throughput than array-based queues but
053         * less predictable performance in most concurrent applications.
054         *
055         * <p> The optional capacity bound constructor argument serves as a
056         * way to prevent excessive queue expansion. The capacity, if unspecified,
057         * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
058         * dynamically created upon each insertion unless this would bring the
059         * queue above capacity.
060         *
061         * <p>This class and its iterator implement all of the
062         * <em>optional</em> methods of the {@link Collection} and {@link
063         * Iterator} interfaces.
064         *
065         * <p>This class is a member of the
066         * <a href="{@docRoot}/../technotes/guides/collections/index.html">
067         * Java Collections Framework</a>.
068         *
069         * @since 1.5
070         * @author Doug Lea
071         * @param <E> the type of elements held in this collection
072         *
073         */
074        public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements 
075                BlockingQueue<E>, java.io.Serializable {
076            private static final long serialVersionUID = -6903933977591709194L;
077
078            /*
079             * A variant of the "two lock queue" algorithm.  The putLock gates
080             * entry to put (and offer), and has an associated condition for
081             * waiting puts.  Similarly for the takeLock.  The "count" field
082             * that they both rely on is maintained as an atomic to avoid
083             * needing to get both locks in most cases. Also, to minimize need
084             * for puts to get takeLock and vice-versa, cascading notifies are
085             * used. When a put notices that it has enabled at least one take,
086             * it signals taker. That taker in turn signals others if more
087             * items have been entered since the signal. And symmetrically for
088             * takes signalling puts. Operations such as remove(Object) and
089             * iterators acquire both locks.
090             */
091
092            /**
093             * Linked list node class
094             */
095            static class Node<E> {
096                /** The item, volatile to ensure barrier separating write and read */
097                volatile E item;
098                Node<E> next;
099
100                Node(E x) {
101                    item = x;
102                }
103            }
104
105            /** The capacity bound, or Integer.MAX_VALUE if none */
106            private final int capacity;
107
108            /** Current number of elements */
109            private final AtomicInteger count = new AtomicInteger(0);
110
111            /** Head of linked list */
112            private transient Node<E> head;
113
114            /** Tail of linked list */
115            private transient Node<E> last;
116
117            /** Lock held by take, poll, etc */
118            private final ReentrantLock takeLock = new ReentrantLock();
119
120            /** Wait queue for waiting takes */
121            private final Condition notEmpty = takeLock.newCondition();
122
123            /** Lock held by put, offer, etc */
124            private final ReentrantLock putLock = new ReentrantLock();
125
126            /** Wait queue for waiting puts */
127            private final Condition notFull = putLock.newCondition();
128
129            /**
130             * Signals a waiting take. Called only from put/offer (which do not
131             * otherwise ordinarily lock takeLock.)
132             */
133            private void signalNotEmpty() {
134                final ReentrantLock takeLock = this .takeLock;
135                takeLock.lock();
136                try {
137                    notEmpty.signal();
138                } finally {
139                    takeLock.unlock();
140                }
141            }
142
143            /**
144             * Signals a waiting put. Called only from take/poll.
145             */
146            private void signalNotFull() {
147                final ReentrantLock putLock = this .putLock;
148                putLock.lock();
149                try {
150                    notFull.signal();
151                } finally {
152                    putLock.unlock();
153                }
154            }
155
156            /**
157             * Creates a node and links it at end of queue.
158             * @param x the item
159             */
160            private void insert(E x) {
161                last = last.next = new Node<E>(x);
162            }
163
164            /**
165             * Removes a node from head of queue,
166             * @return the node
167             */
168            private E extract() {
169                Node<E> first = head.next;
170                head = first;
171                E x = first.item;
172                first.item = null;
173                return x;
174            }
175
176            /**
177             * Lock to prevent both puts and takes.
178             */
179            private void fullyLock() {
180                putLock.lock();
181                takeLock.lock();
182            }
183
184            /**
185             * Unlock to allow both puts and takes.
186             */
187            private void fullyUnlock() {
188                takeLock.unlock();
189                putLock.unlock();
190            }
191
192            /**
193             * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
194             * {@link Integer#MAX_VALUE}.
195             */
196            public LinkedBlockingQueue() {
197                this (Integer.MAX_VALUE);
198            }
199
200            /**
201             * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
202             *
203             * @param capacity the capacity of this queue
204             * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
205             *         than zero
206             */
207            public LinkedBlockingQueue(int capacity) {
208                if (capacity <= 0)
209                    throw new IllegalArgumentException();
210                this .capacity = capacity;
211                last = head = new Node<E>(null);
212            }
213
214            /**
215             * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
216             * {@link Integer#MAX_VALUE}, initially containing the elements of the
217             * given collection,
218             * added in traversal order of the collection's iterator.
219             *
220             * @param c the collection of elements to initially contain
221             * @throws NullPointerException if the specified collection or any
222             *         of its elements are null
223             */
224            public LinkedBlockingQueue(Collection<? extends E> c) {
225                this (Integer.MAX_VALUE);
226                for (E e : c)
227                    add(e);
228            }
229
230            // this doc comment is overridden to remove the reference to collections
231            // greater in size than Integer.MAX_VALUE
232            /**
233             * Returns the number of elements in this queue.
234             *
235             * @return the number of elements in this queue
236             */
237            public int size() {
238                return count.get();
239            }
240
241            // this doc comment is a modified copy of the inherited doc comment,
242            // without the reference to unlimited queues.
243            /**
244             * Returns the number of additional elements that this queue can ideally
245             * (in the absence of memory or resource constraints) accept without
246             * blocking. This is always equal to the initial capacity of this queue
247             * less the current <tt>size</tt> of this queue.
248             *
249             * <p>Note that you <em>cannot</em> always tell if an attempt to insert
250             * an element will succeed by inspecting <tt>remainingCapacity</tt>
251             * because it may be the case that another thread is about to
252             * insert or remove an element.
253             */
254            public int remainingCapacity() {
255                return capacity - count.get();
256            }
257
258            /**
259             * Inserts the specified element at the tail of this queue, waiting if
260             * necessary for space to become available.
261             *
262             * @throws InterruptedException {@inheritDoc}
263             * @throws NullPointerException {@inheritDoc}
264             */
265            public void put(E e) throws InterruptedException {
266                if (e == null)
267                    throw new NullPointerException();
268                // Note: convention in all put/take/etc is to preset
269                // local var holding count  negative to indicate failure unless set.
270                int c = -1;
271                final ReentrantLock putLock = this .putLock;
272                final AtomicInteger count = this .count;
273                putLock.lockInterruptibly();
274                try {
275                    /*
276                     * Note that count is used in wait guard even though it is
277                     * not protected by lock. This works because count can
278                     * only decrease at this point (all other puts are shut
279                     * out by lock), and we (or some other waiting put) are
280                     * signalled if it ever changes from
281                     * capacity. Similarly for all other uses of count in
282                     * other wait guards.
283                     */
284                    try {
285                        while (count.get() == capacity)
286                            notFull.await();
287                    } catch (InterruptedException ie) {
288                        notFull.signal(); // propagate to a non-interrupted thread
289                        throw ie;
290                    }
291                    insert(e);
292                    c = count.getAndIncrement();
293                    if (c + 1 < capacity)
294                        notFull.signal();
295                } finally {
296                    putLock.unlock();
297                }
298                if (c == 0)
299                    signalNotEmpty();
300            }
301
302            /**
303             * Inserts the specified element at the tail of this queue, waiting if
304             * necessary up to the specified wait time for space to become available.
305             *
306             * @return <tt>true</tt> if successful, or <tt>false</tt> if
307             *         the specified waiting time elapses before space is available.
308             * @throws InterruptedException {@inheritDoc}
309             * @throws NullPointerException {@inheritDoc}
310             */
311            public boolean offer(E e, long timeout, TimeUnit unit)
312                    throws InterruptedException {
313
314                if (e == null)
315                    throw new NullPointerException();
316                long nanos = unit.toNanos(timeout);
317                int c = -1;
318                final ReentrantLock putLock = this .putLock;
319                final AtomicInteger count = this .count;
320                putLock.lockInterruptibly();
321                try {
322                    for (;;) {
323                        if (count.get() < capacity) {
324                            insert(e);
325                            c = count.getAndIncrement();
326                            if (c + 1 < capacity)
327                                notFull.signal();
328                            break;
329                        }
330                        if (nanos <= 0)
331                            return false;
332                        try {
333                            nanos = notFull.awaitNanos(nanos);
334                        } catch (InterruptedException ie) {
335                            notFull.signal(); // propagate to a non-interrupted thread
336                            throw ie;
337                        }
338                    }
339                } finally {
340                    putLock.unlock();
341                }
342                if (c == 0)
343                    signalNotEmpty();
344                return true;
345            }
346
347            /**
348             * Inserts the specified element at the tail of this queue if it is
349             * possible to do so immediately without exceeding the queue's capacity,
350             * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
351             * is full.
352             * When using a capacity-restricted queue, this method is generally
353             * preferable to method {@link BlockingQueue#add add}, which can fail to
354             * insert an element only by throwing an exception.
355             *
356             * @throws NullPointerException if the specified element is null
357             */
358            public boolean offer(E e) {
359                if (e == null)
360                    throw new NullPointerException();
361                final AtomicInteger count = this .count;
362                if (count.get() == capacity)
363                    return false;
364                int c = -1;
365                final ReentrantLock putLock = this .putLock;
366                putLock.lock();
367                try {
368                    if (count.get() < capacity) {
369                        insert(e);
370                        c = count.getAndIncrement();
371                        if (c + 1 < capacity)
372                            notFull.signal();
373                    }
374                } finally {
375                    putLock.unlock();
376                }
377                if (c == 0)
378                    signalNotEmpty();
379                return c >= 0;
380            }
381
382            public E take() throws InterruptedException {
383                E x;
384                int c = -1;
385                final AtomicInteger count = this .count;
386                final ReentrantLock takeLock = this .takeLock;
387                takeLock.lockInterruptibly();
388                try {
389                    try {
390                        while (count.get() == 0)
391                            notEmpty.await();
392                    } catch (InterruptedException ie) {
393                        notEmpty.signal(); // propagate to a non-interrupted thread
394                        throw ie;
395                    }
396
397                    x = extract();
398                    c = count.getAndDecrement();
399                    if (c > 1)
400                        notEmpty.signal();
401                } finally {
402                    takeLock.unlock();
403                }
404                if (c == capacity)
405                    signalNotFull();
406                return x;
407            }
408
409            public E poll(long timeout, TimeUnit unit)
410                    throws InterruptedException {
411                E x = null;
412                int c = -1;
413                long nanos = unit.toNanos(timeout);
414                final AtomicInteger count = this .count;
415                final ReentrantLock takeLock = this .takeLock;
416                takeLock.lockInterruptibly();
417                try {
418                    for (;;) {
419                        if (count.get() > 0) {
420                            x = extract();
421                            c = count.getAndDecrement();
422                            if (c > 1)
423                                notEmpty.signal();
424                            break;
425                        }
426                        if (nanos <= 0)
427                            return null;
428                        try {
429                            nanos = notEmpty.awaitNanos(nanos);
430                        } catch (InterruptedException ie) {
431                            notEmpty.signal(); // propagate to a non-interrupted thread
432                            throw ie;
433                        }
434                    }
435                } finally {
436                    takeLock.unlock();
437                }
438                if (c == capacity)
439                    signalNotFull();
440                return x;
441            }
442
443            public E poll() {
444                final AtomicInteger count = this .count;
445                if (count.get() == 0)
446                    return null;
447                E x = null;
448                int c = -1;
449                final ReentrantLock takeLock = this .takeLock;
450                takeLock.lock();
451                try {
452                    if (count.get() > 0) {
453                        x = extract();
454                        c = count.getAndDecrement();
455                        if (c > 1)
456                            notEmpty.signal();
457                    }
458                } finally {
459                    takeLock.unlock();
460                }
461                if (c == capacity)
462                    signalNotFull();
463                return x;
464            }
465
466            public E peek() {
467                if (count.get() == 0)
468                    return null;
469                final ReentrantLock takeLock = this .takeLock;
470                takeLock.lock();
471                try {
472                    Node<E> first = head.next;
473                    if (first == null)
474                        return null;
475                    else
476                        return first.item;
477                } finally {
478                    takeLock.unlock();
479                }
480            }
481
482            /**
483             * Removes a single instance of the specified element from this queue,
484             * if it is present.  More formally, removes an element <tt>e</tt> such
485             * that <tt>o.equals(e)</tt>, if this queue contains one or more such
486             * elements.
487             * Returns <tt>true</tt> if this queue contained the specified element
488             * (or equivalently, if this queue changed as a result of the call).
489             *
490             * @param o element to be removed from this queue, if present
491             * @return <tt>true</tt> if this queue changed as a result of the call
492             */
493            public boolean remove(Object o) {
494                if (o == null)
495                    return false;
496                boolean removed = false;
497                fullyLock();
498                try {
499                    Node<E> trail = head;
500                    Node<E> p = head.next;
501                    while (p != null) {
502                        if (o.equals(p.item)) {
503                            removed = true;
504                            break;
505                        }
506                        trail = p;
507                        p = p.next;
508                    }
509                    if (removed) {
510                        p.item = null;
511                        trail.next = p.next;
512                        if (last == p)
513                            last = trail;
514                        if (count.getAndDecrement() == capacity)
515                            notFull.signalAll();
516                    }
517                } finally {
518                    fullyUnlock();
519                }
520                return removed;
521            }
522
523            /**
524             * Returns an array containing all of the elements in this queue, in
525             * proper sequence.
526             *
527             * <p>The returned array will be "safe" in that no references to it are
528             * maintained by this queue.  (In other words, this method must allocate
529             * a new array).  The caller is thus free to modify the returned array.
530             *
531             * <p>This method acts as bridge between array-based and collection-based
532             * APIs.
533             *
534             * @return an array containing all of the elements in this queue
535             */
536            public Object[] toArray() {
537                fullyLock();
538                try {
539                    int size = count.get();
540                    Object[] a = new Object[size];
541                    int k = 0;
542                    for (Node<E> p = head.next; p != null; p = p.next)
543                        a[k++] = p.item;
544                    return a;
545                } finally {
546                    fullyUnlock();
547                }
548            }
549
550            /**
551             * Returns an array containing all of the elements in this queue, in
552             * proper sequence; the runtime type of the returned array is that of
553             * the specified array.  If the queue fits in the specified array, it
554             * is returned therein.  Otherwise, a new array is allocated with the
555             * runtime type of the specified array and the size of this queue.
556             *
557             * <p>If this queue fits in the specified array with room to spare
558             * (i.e., the array has more elements than this queue), the element in
559             * the array immediately following the end of the queue is set to
560             * <tt>null</tt>.
561             *
562             * <p>Like the {@link #toArray()} method, this method acts as bridge between
563             * array-based and collection-based APIs.  Further, this method allows
564             * precise control over the runtime type of the output array, and may,
565             * under certain circumstances, be used to save allocation costs.
566             *
567             * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
568             * The following code can be used to dump the queue into a newly
569             * allocated array of <tt>String</tt>:
570             *
571             * <pre>
572             *     String[] y = x.toArray(new String[0]);</pre>
573             *
574             * Note that <tt>toArray(new Object[0])</tt> is identical in function to
575             * <tt>toArray()</tt>.
576             *
577             * @param a the array into which the elements of the queue are to
578             *          be stored, if it is big enough; otherwise, a new array of the
579             *          same runtime type is allocated for this purpose
580             * @return an array containing all of the elements in this queue
581             * @throws ArrayStoreException if the runtime type of the specified array
582             *         is not a supertype of the runtime type of every element in
583             *         this queue
584             * @throws NullPointerException if the specified array is null
585             */
586            public <T> T[] toArray(T[] a) {
587                fullyLock();
588                try {
589                    int size = count.get();
590                    if (a.length < size)
591                        a = (T[]) java.lang.reflect.Array.newInstance(a
592                                .getClass().getComponentType(), size);
593
594                    int k = 0;
595                    for (Node p = head.next; p != null; p = p.next)
596                        a[k++] = (T) p.item;
597                    if (a.length > k)
598                        a[k] = null;
599                    return a;
600                } finally {
601                    fullyUnlock();
602                }
603            }
604
605            public String toString() {
606                fullyLock();
607                try {
608                    return super .toString();
609                } finally {
610                    fullyUnlock();
611                }
612            }
613
614            /**
615             * Atomically removes all of the elements from this queue.
616             * The queue will be empty after this call returns.
617             */
618            public void clear() {
619                fullyLock();
620                try {
621                    head.next = null;
622                    assert head.item == null;
623                    last = head;
624                    if (count.getAndSet(0) == capacity)
625                        notFull.signalAll();
626                } finally {
627                    fullyUnlock();
628                }
629            }
630
631            /**
632             * @throws UnsupportedOperationException {@inheritDoc}
633             * @throws ClassCastException            {@inheritDoc}
634             * @throws NullPointerException          {@inheritDoc}
635             * @throws IllegalArgumentException      {@inheritDoc}
636             */
637            public int drainTo(Collection<? super  E> c) {
638                if (c == null)
639                    throw new NullPointerException();
640                if (c == this )
641                    throw new IllegalArgumentException();
642                Node<E> first;
643                fullyLock();
644                try {
645                    first = head.next;
646                    head.next = null;
647                    assert head.item == null;
648                    last = head;
649                    if (count.getAndSet(0) == capacity)
650                        notFull.signalAll();
651                } finally {
652                    fullyUnlock();
653                }
654                // Transfer the elements outside of locks
655                int n = 0;
656                for (Node<E> p = first; p != null; p = p.next) {
657                    c.add(p.item);
658                    p.item = null;
659                    ++n;
660                }
661                return n;
662            }
663
664            /**
665             * @throws UnsupportedOperationException {@inheritDoc}
666             * @throws ClassCastException            {@inheritDoc}
667             * @throws NullPointerException          {@inheritDoc}
668             * @throws IllegalArgumentException      {@inheritDoc}
669             */
670            public int drainTo(Collection<? super  E> c, int maxElements) {
671                if (c == null)
672                    throw new NullPointerException();
673                if (c == this )
674                    throw new IllegalArgumentException();
675                fullyLock();
676                try {
677                    int n = 0;
678                    Node<E> p = head.next;
679                    while (p != null && n < maxElements) {
680                        c.add(p.item);
681                        p.item = null;
682                        p = p.next;
683                        ++n;
684                    }
685                    if (n != 0) {
686                        head.next = p;
687                        assert head.item == null;
688                        if (p == null)
689                            last = head;
690                        if (count.getAndAdd(-n) == capacity)
691                            notFull.signalAll();
692                    }
693                    return n;
694                } finally {
695                    fullyUnlock();
696                }
697            }
698
699            /**
700             * Returns an iterator over the elements in this queue in proper sequence.
701             * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
702             * will never throw {@link ConcurrentModificationException},
703             * and guarantees to traverse elements as they existed upon
704             * construction of the iterator, and may (but is not guaranteed to)
705             * reflect any modifications subsequent to construction.
706             *
707             * @return an iterator over the elements in this queue in proper sequence
708             */
709            public Iterator<E> iterator() {
710                return new Itr();
711            }
712
713            private class Itr implements  Iterator<E> {
714                /*
715                 * Basic weak-consistent iterator.  At all times hold the next
716                 * item to hand out so that if hasNext() reports true, we will
717                 * still have it to return even if lost race with a take etc.
718                 */
719                private Node<E> current;
720                private Node<E> lastRet;
721                private E currentElement;
722
723                Itr() {
724                    final ReentrantLock putLock = LinkedBlockingQueue.this .putLock;
725                    final ReentrantLock takeLock = LinkedBlockingQueue.this .takeLock;
726                    putLock.lock();
727                    takeLock.lock();
728                    try {
729                        current = head.next;
730                        if (current != null)
731                            currentElement = current.item;
732                    } finally {
733                        takeLock.unlock();
734                        putLock.unlock();
735                    }
736                }
737
738                public boolean hasNext() {
739                    return current != null;
740                }
741
742                public E next() {
743                    final ReentrantLock putLock = LinkedBlockingQueue.this .putLock;
744                    final ReentrantLock takeLock = LinkedBlockingQueue.this .takeLock;
745                    putLock.lock();
746                    takeLock.lock();
747                    try {
748                        if (current == null)
749                            throw new NoSuchElementException();
750                        E x = currentElement;
751                        lastRet = current;
752                        current = current.next;
753                        if (current != null)
754                            currentElement = current.item;
755                        return x;
756                    } finally {
757                        takeLock.unlock();
758                        putLock.unlock();
759                    }
760                }
761
762                public void remove() {
763                    if (lastRet == null)
764                        throw new IllegalStateException();
765                    final ReentrantLock putLock = LinkedBlockingQueue.this .putLock;
766                    final ReentrantLock takeLock = LinkedBlockingQueue.this .takeLock;
767                    putLock.lock();
768                    takeLock.lock();
769                    try {
770                        Node<E> node = lastRet;
771                        lastRet = null;
772                        Node<E> trail = head;
773                        Node<E> p = head.next;
774                        while (p != null && p != node) {
775                            trail = p;
776                            p = p.next;
777                        }
778                        if (p == node) {
779                            p.item = null;
780                            trail.next = p.next;
781                            if (last == p)
782                                last = trail;
783                            int c = count.getAndDecrement();
784                            if (c == capacity)
785                                notFull.signalAll();
786                        }
787                    } finally {
788                        takeLock.unlock();
789                        putLock.unlock();
790                    }
791                }
792            }
793
794            /**
795             * Save the state to a stream (that is, serialize it).
796             *
797             * @serialData The capacity is emitted (int), followed by all of
798             * its elements (each an <tt>Object</tt>) in the proper order,
799             * followed by a null
800             * @param s the stream
801             */
802            private void writeObject(java.io.ObjectOutputStream s)
803                    throws java.io.IOException {
804
805                fullyLock();
806                try {
807                    // Write out any hidden stuff, plus capacity
808                    s.defaultWriteObject();
809
810                    // Write out all elements in the proper order.
811                    for (Node<E> p = head.next; p != null; p = p.next)
812                        s.writeObject(p.item);
813
814                    // Use trailing null as sentinel
815                    s.writeObject(null);
816                } finally {
817                    fullyUnlock();
818                }
819            }
820
821            /**
822             * Reconstitute this queue instance from a stream (that is,
823             * deserialize it).
824             * @param s the stream
825             */
826            private void readObject(java.io.ObjectInputStream s)
827                    throws java.io.IOException, ClassNotFoundException {
828                // Read in capacity, and any hidden stuff
829                s.defaultReadObject();
830
831                count.set(0);
832                last = head = new Node<E>(null);
833
834                // Read in all elements and place in queue
835                for (;;) {
836                    E item = (E) s.readObject();
837                    if (item == null)
838                        break;
839                    add(item);
840                }
841            }
842        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.