001: /*
002: File: LinkedQueue.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 25aug1998 dl added peek
013: 10dec1998 dl added isEmpty
014: 10oct1999 dl lock on node object to ensure visibility
015: */
016:
017: package org.logicalcobwebs.concurrent;
018:
019: /**
020: * A linked list based channel implementation.
021: * The algorithm avoids contention between puts
022: * and takes when the queue is not empty.
023: * Normally a put and a take can proceed simultaneously.
024: * (Although it does not allow multiple concurrent puts or takes.)
025: * This class tends to perform more efficently than
026: * other Channel implementations in producer/consumer
027: * applications.
028: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
029: **/
030:
031: public class LinkedQueue implements Channel {
032:
033: /**
034: * Dummy header node of list. The first actual node, if it exists, is always
035: * at head_.next. After each take, the old first node becomes the head.
036: **/
037: protected LinkedNode head_;
038:
039: /**
040: * Helper monitor for managing access to last node.
041: **/
042: protected final Object putLock_ = new Object();
043:
044: /**
045: * The last node of list. Put() appends to list, so modifies last_
046: **/
047: protected LinkedNode last_;
048:
049: /**
050: * The number of threads waiting for a take.
051: * Notifications are provided in put only if greater than zero.
052: * The bookkeeping is worth it here since in reasonably balanced
053: * usages, the notifications will hardly ever be necessary, so
054: * the call overhead to notify can be eliminated.
055: **/
056: protected int waitingForTake_ = 0;
057:
058: public LinkedQueue() {
059: head_ = new LinkedNode(null);
060: last_ = head_;
061: }
062:
063: /** Main mechanics for put/offer **/
064: protected void insert(Object x) {
065: synchronized (putLock_) {
066: LinkedNode p = new LinkedNode(x);
067: synchronized (last_) {
068: last_.next = p;
069: last_ = p;
070: }
071: if (waitingForTake_ > 0)
072: putLock_.notify();
073: }
074: }
075:
076: /** Main mechanics for take/poll **/
077: protected synchronized Object extract() {
078: synchronized (head_) {
079: Object x = null;
080: LinkedNode first = head_.next;
081: if (first != null) {
082: x = first.value;
083: first.value = null;
084: head_ = first;
085: }
086: return x;
087: }
088: }
089:
090: public void put(Object x) throws InterruptedException {
091: if (x == null)
092: throw new IllegalArgumentException();
093: if (Thread.interrupted())
094: throw new InterruptedException();
095: insert(x);
096: }
097:
098: public boolean offer(Object x, long msecs)
099: throws InterruptedException {
100: if (x == null)
101: throw new IllegalArgumentException();
102: if (Thread.interrupted())
103: throw new InterruptedException();
104: insert(x);
105: return true;
106: }
107:
108: public Object take() throws InterruptedException {
109: if (Thread.interrupted())
110: throw new InterruptedException();
111: // try to extract. If fail, then enter wait-based retry loop
112: Object x = extract();
113: if (x != null)
114: return x;
115: else {
116: synchronized (putLock_) {
117: try {
118: ++waitingForTake_;
119: for (;;) {
120: x = extract();
121: if (x != null) {
122: --waitingForTake_;
123: return x;
124: } else {
125: putLock_.wait();
126: }
127: }
128: } catch (InterruptedException ex) {
129: --waitingForTake_;
130: putLock_.notify();
131: throw ex;
132: }
133: }
134: }
135: }
136:
137: public Object peek() {
138: synchronized (head_) {
139: LinkedNode first = head_.next;
140: if (first != null)
141: return first.value;
142: else
143: return null;
144: }
145: }
146:
147: public boolean isEmpty() {
148: synchronized (head_) {
149: return head_.next == null;
150: }
151: }
152:
153: public Object poll(long msecs) throws InterruptedException {
154: if (Thread.interrupted())
155: throw new InterruptedException();
156: Object x = extract();
157: if (x != null)
158: return x;
159: else {
160: synchronized (putLock_) {
161: try {
162: long waitTime = msecs;
163: long start = (msecs <= 0) ? 0 : System
164: .currentTimeMillis();
165: ++waitingForTake_;
166: for (;;) {
167: x = extract();
168: if (x != null || waitTime <= 0) {
169: --waitingForTake_;
170: return x;
171: } else {
172: putLock_.wait(waitTime);
173: waitTime = msecs
174: - (System.currentTimeMillis() - start);
175: }
176: }
177: } catch (InterruptedException ex) {
178: --waitingForTake_;
179: putLock_.notify();
180: throw ex;
181: }
182: }
183: }
184: }
185: }
|