001: /*
002: File: LinkedQueue.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 11Jun1998 dl Create public version
012: 25aug1998 dl added peek
013: 10dec1998 dl added isEmpty
014: 10oct1999 dl lock on node object to ensure visibility
015: */
016:
017: package org.dbunit.util.concurrent;
018:
019: import org.slf4j.Logger;
020: import org.slf4j.LoggerFactory;
021:
022: /**
023: * A linked list based channel implementation.
024: * The algorithm avoids contention between puts
025: * and takes when the queue is not empty.
026: * Normally a put and a take can proceed simultaneously.
027: * (Although it does not allow multiple concurrent puts or takes.)
028: * This class tends to perform more efficently than
029: * other Channel implementations in producer/consumer
030: * applications.
031: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
032: **/
033:
034: public class LinkedQueue implements Channel {
035:
036: /**
037: * Logger for this class
038: */
039: private static final Logger logger = LoggerFactory
040: .getLogger(LinkedQueue.class);
041:
042: /**
043: * Dummy header node of list. The first actual node, if it exists, is always
044: * at head_.next. After each take, the old first node becomes the head.
045: **/
046: protected LinkedNode head_;
047:
048: /**
049: * Helper monitor for managing access to last node.
050: **/
051: protected final Object putLock_ = new Object();
052:
053: /**
054: * The last node of list. Put() appends to list, so modifies last_
055: **/
056: protected LinkedNode last_;
057:
058: /**
059: * The number of threads waiting for a take.
060: * Notifications are provided in put only if greater than zero.
061: * The bookkeeping is worth it here since in reasonably balanced
062: * usages, the notifications will hardly ever be necessary, so
063: * the call overhead to notify can be eliminated.
064: **/
065: protected int waitingForTake_ = 0;
066:
067: public LinkedQueue() {
068: head_ = new LinkedNode(null);
069: last_ = head_;
070: }
071:
072: /** Main mechanics for put/offer **/
073: protected void insert(Object x) {
074: logger.debug("insert(x=" + x + ") - start");
075:
076: synchronized (putLock_) {
077: LinkedNode p = new LinkedNode(x);
078: synchronized (last_) {
079: last_.next = p;
080: last_ = p;
081: }
082: if (waitingForTake_ > 0)
083: putLock_.notify();
084: }
085: }
086:
087: /** Main mechanics for take/poll **/
088: protected synchronized Object extract() {
089: logger.debug("extract() - start");
090:
091: synchronized (head_) {
092: Object x = null;
093: LinkedNode first = head_.next;
094: if (first != null) {
095: x = first.value;
096: first.value = null;
097: head_ = first;
098: }
099: return x;
100: }
101: }
102:
103: public void put(Object x) throws InterruptedException {
104: logger.debug("put(x=" + x + ") - start");
105:
106: if (x == null)
107: throw new IllegalArgumentException();
108: if (Thread.interrupted())
109: throw new InterruptedException();
110: insert(x);
111: }
112:
113: public boolean offer(Object x, long msecs)
114: throws InterruptedException {
115: logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
116:
117: if (x == null)
118: throw new IllegalArgumentException();
119: if (Thread.interrupted())
120: throw new InterruptedException();
121: insert(x);
122: return true;
123: }
124:
125: public Object take() throws InterruptedException {
126: logger.debug("take() - start");
127:
128: if (Thread.interrupted())
129: throw new InterruptedException();
130: // try to extract. If fail, then enter wait-based retry loop
131: Object x = extract();
132: if (x != null)
133: return x;
134: else {
135: synchronized (putLock_) {
136: try {
137: ++waitingForTake_;
138: for (;;) {
139: x = extract();
140: if (x != null) {
141: --waitingForTake_;
142: return x;
143: } else {
144: putLock_.wait();
145: }
146: }
147: } catch (InterruptedException ex) {
148: logger.error("take()", ex);
149:
150: --waitingForTake_;
151: putLock_.notify();
152: throw ex;
153: }
154: }
155: }
156: }
157:
158: public Object peek() {
159: logger.debug("peek() - start");
160:
161: synchronized (head_) {
162: LinkedNode first = head_.next;
163: if (first != null)
164: return first.value;
165: else
166: return null;
167: }
168: }
169:
170: public boolean isEmpty() {
171: logger.debug("isEmpty() - start");
172:
173: synchronized (head_) {
174: return head_.next == null;
175: }
176: }
177:
178: public Object poll(long msecs) throws InterruptedException {
179: logger.debug("poll(msecs=" + msecs + ") - start");
180:
181: if (Thread.interrupted())
182: throw new InterruptedException();
183: Object x = extract();
184: if (x != null)
185: return x;
186: else {
187: synchronized (putLock_) {
188: try {
189: long waitTime = msecs;
190: long start = (msecs <= 0) ? 0 : System
191: .currentTimeMillis();
192: ++waitingForTake_;
193: for (;;) {
194: x = extract();
195: if (x != null || waitTime <= 0) {
196: --waitingForTake_;
197: return x;
198: } else {
199: putLock_.wait(waitTime);
200: waitTime = msecs
201: - (System.currentTimeMillis() - start);
202: }
203: }
204: } catch (InterruptedException ex) {
205: logger.error("poll()", ex);
206:
207: --waitingForTake_;
208: putLock_.notify();
209: throw ex;
210: }
211: }
212: }
213: }
214: }
|