001: // $Id: LinkedListQueue.java,v 1.6 2004/09/23 16:29:56 belaban Exp $
002:
003: package org.jgroups.util;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.TimeoutException;
008:
009: import java.util.Iterator;
010: import java.util.LinkedList;
011: import java.util.NoSuchElementException;
012: import java.util.Vector;
013:
014: /**
015: * LinkedListQueue implementation based on java.util.Queue. Can be renamed to Queue.java and compiled if someone wants to
016: * use this implementation rather than the original Queue. However, a simple insertion and removal of 1 million
017: * objects into this queue shoed that it was ca. 15-20% slower than the original queue. We just include it in the
018: * JGroups distribution to maybe use it at a later point when it has become faster.
019: *
020: * @author Bela Ban
021: */
022: public class LinkedListQueue {
023:
024: final LinkedList l = new LinkedList();
025:
026: /*flag to determine the state of the Queue*/
027: boolean closed = false;
028:
029: /*lock object for synchronization*/
030: final Object mutex = new Object();
031:
032: /*the number of end markers that have been added*/
033: int num_markers = 0;
034:
035: /**
036: * if the queue closes during the runtime
037: * an endMarker object is added to the end of the queue to indicate that
038: * the queue will close automatically when the end marker is encountered
039: * This allows for a "soft" close.
040: *
041: * @see LinkedListQueue#close
042: */
043: private static final Object endMarker = new Object();
044:
045: protected static final Log log = LogFactory
046: .getLog(LinkedListQueue.class);
047:
048: /**
049: * creates an empty queue
050: */
051: public LinkedListQueue() {
052: }
053:
054: /**
055: * returns true if the Queue has been closed
056: * however, this method will return false if the queue has been closed
057: * using the close(true) method and the last element has yet not been received.
058: *
059: * @return true if the queue has been closed
060: */
061: public boolean closed() {
062: return closed;
063: }
064:
065: /**
066: * adds an object to the tail of this queue
067: * If the queue has been closed with close(true) no exception will be
068: * thrown if the queue has not been flushed yet.
069: *
070: * @param obj - the object to be added to the queue
071: * @throws QueueClosed exception if closed() returns true
072: */
073: public void add(Object obj) throws QueueClosedException {
074: if (closed)
075: throw new QueueClosedException();
076: if (this .num_markers > 0)
077: throw new QueueClosedException(
078: "LinkedListQueue.add(): queue has been closed. You can not add more elements. "
079: + "Waiting for removal of remaining elements.");
080:
081: /*lock the queue from other threads*/
082: synchronized (mutex) {
083: l.add(obj);
084:
085: /*wake up all the threads that are waiting for the lock to be released*/
086: mutex.notifyAll();
087: }
088: }
089:
090: /**
091: * Adds a new object to the head of the queue
092: * basically (obj.equals(LinkedListQueue.remove(LinkedListQueue.add(obj)))) returns true
093: * If the queue has been closed with close(true) no exception will be
094: * thrown if the queue has not been flushed yet.
095: *
096: * @param obj - the object to be added to the queue
097: * @throws QueueClosed exception if closed() returns true
098: */
099: public void addAtHead(Object obj) throws QueueClosedException {
100: if (closed)
101: throw new QueueClosedException();
102: if (this .num_markers > 0)
103: throw new QueueClosedException(
104: "LinkedListQueue.addAtHead(): queue has been closed. You can not add more elements. "
105: + "Waiting for removal of remaining elements.");
106:
107: /*lock the queue from other threads*/
108: synchronized (mutex) {
109: l.addFirst(obj);
110:
111: /*wake up all the threads that are waiting for the lock to be released*/
112: mutex.notifyAll();
113: }
114: }
115:
116: /**
117: * Removes 1 element from head or <B>blocks</B>
118: * until next element has been added
119: *
120: * @return the first element to be taken of the queue
121: */
122: public Object remove() throws QueueClosedException {
123: Object retval = null;
124:
125: /*lock the queue*/
126: synchronized (mutex) {
127: /*wait as long as the queue is empty*/
128: while (l.size() == 0) {
129: if (closed)
130: throw new QueueClosedException();
131: try {
132: mutex.wait();
133: } catch (InterruptedException ex) {
134: }
135: }
136:
137: if (closed)
138: throw new QueueClosedException();
139:
140: /*remove the head from the queue*/
141: try {
142: retval = l.removeFirst();
143: if (l.size() == 1 && l.getFirst().equals(endMarker))
144: closed = true;
145: } catch (NoSuchElementException ex) {
146: if (log.isErrorEnabled())
147: log.error("retval == null, size()=" + l.size());
148: return null;
149: }
150:
151: // we ran into an Endmarker, which means that the queue was closed before
152: // through close(true)
153: if (retval == endMarker) {
154: close(false); // mark queue as closed
155: throw new QueueClosedException();
156: }
157: }
158:
159: /*return the object, should be never null*/
160: return retval;
161: }
162:
163: /**
164: * Removes 1 element from the head.
165: * If the queue is empty the operation will wait for timeout ms.
166: * if no object is added during the timeout time, a Timout exception is thrown
167: *
168: * @param timeout - the number of milli seconds this operation will wait before it times out
169: * @return the first object in the queue
170: */
171: public Object remove(long timeout) throws QueueClosedException,
172: TimeoutException {
173: Object retval = null;
174:
175: /*lock the queue*/
176: synchronized (mutex) {
177: /*if the queue size is zero, we want to wait until a new object is added*/
178: if (l.size() == 0) {
179: if (closed)
180: throw new QueueClosedException();
181: try {
182: /*release the mutex lock and wait no more than timeout ms*/
183: mutex.wait(timeout);
184: } catch (InterruptedException ex) {
185: }
186: }
187: /*we either timed out, or got notified by the mutex lock object*/
188:
189: /*check to see if the object closed*/
190: if (closed)
191: throw new QueueClosedException();
192:
193: /*get the next value*/
194: try {
195: retval = l.removeFirst();
196: if (l.size() == 1 && l.getFirst().equals(endMarker))
197: closed = true;
198: } catch (NoSuchElementException ex) {
199: /*null result means we timed out*/
200: throw new TimeoutException();
201: }
202:
203: /*if we reached an end marker we are going to close the queue*/
204: if (retval == endMarker) {
205: close(false);
206: throw new QueueClosedException();
207: }
208: /*at this point we actually did receive a value from the queue, return it*/
209: return retval;
210: }
211: }
212:
213: /**
214: * removes a specific object from the queue.
215: * the object is matched up using the Object.equals method.
216: *
217: * @param obj the actual object to be removed from the queue
218: */
219: public void removeElement(Object obj) throws QueueClosedException {
220: boolean removed;
221:
222: if (obj == null)
223: return;
224:
225: /*lock the queue*/
226: synchronized (mutex) {
227: removed = l.remove(obj);
228: if (!removed)
229: if (log.isWarnEnabled())
230: log.warn("element " + obj
231: + " was not found in the queue");
232: }
233: }
234:
235: /**
236: * returns the first object on the queue, without removing it.
237: * If the queue is empty this object blocks until the first queue object has
238: * been added
239: *
240: * @return the first object on the queue
241: */
242: public Object peek() throws QueueClosedException {
243: Object retval = null;
244:
245: synchronized (mutex) {
246: while (l.size() == 0) {
247: if (closed)
248: throw new QueueClosedException();
249: try {
250: mutex.wait();
251: } catch (InterruptedException ex) {
252: }
253: }
254:
255: if (closed)
256: throw new QueueClosedException();
257:
258: try {
259: retval = l.getFirst();
260: } catch (NoSuchElementException ex) {
261: if (log.isErrorEnabled())
262: log.error("retval == null, size()=" + l.size());
263: return null;
264: }
265: }
266:
267: if (retval == endMarker) {
268: close(false); // mark queue as closed
269: throw new QueueClosedException();
270: }
271:
272: return retval;
273: }
274:
275: /**
276: * returns the first object on the queue, without removing it.
277: * If the queue is empty this object blocks until the first queue object has
278: * been added or the operation times out
279: *
280: * @param timeout how long in milli seconds will this operation wait for an object to be added to the queue
281: * before it times out
282: * @return the first object on the queue
283: */
284:
285: public Object peek(long timeout) throws QueueClosedException,
286: TimeoutException {
287: Object retval = null;
288:
289: synchronized (mutex) {
290: if (l.size() == 0) {
291: if (closed)
292: throw new QueueClosedException();
293: try {
294: mutex.wait(timeout);
295: } catch (InterruptedException ex) {
296: }
297: }
298: if (closed)
299: throw new QueueClosedException();
300:
301: try {
302: retval = l.getFirst();
303: } catch (NoSuchElementException ex) {
304: /*null result means we timed out*/
305: throw new TimeoutException();
306: }
307:
308: if (retval == endMarker) {
309: close(false);
310: throw new QueueClosedException();
311: }
312: return retval;
313: }
314: }
315:
316: /**
317: * Marks the queues as closed. When an <code>add</code> or <code>remove</code> operation is
318: * attempted on a closed queue, an exception is thrown.
319: *
320: * @param flush_entries When true, a end-of-entries marker is added to the end of the queue.
321: * Entries may be added and removed, but when the end-of-entries marker
322: * is encountered, the queue is marked as closed. This allows to flush
323: * pending messages before closing the queue.
324: */
325: public void close(boolean flush_entries) {
326: if (flush_entries) {
327: try {
328: add(endMarker); // add an end-of-entries marker to the end of the queue
329: num_markers++;
330: } catch (QueueClosedException closed) {
331: }
332: return;
333: }
334:
335: synchronized (mutex) {
336: closed = true;
337: try {
338: mutex.notifyAll();
339: } catch (Exception e) {
340: if (log.isErrorEnabled())
341: log.error("exception=" + e);
342: }
343: }
344: }
345:
346: /**
347: * resets the queue.
348: * This operation removes all the objects in the queue and marks the queue open
349: */
350: public void reset() {
351: num_markers = 0;
352: if (!closed)
353: close(false);
354:
355: synchronized (mutex) {
356: l.clear();
357: closed = false;
358: }
359: }
360:
361: /**
362: * returns the number of objects that are currently in the queue
363: */
364: public int size() {
365: return l.size() - num_markers;
366: }
367:
368: /**
369: * prints the size of the queue
370: */
371: public String toString() {
372: return "LinkedListQueue (" + size() + ") messages [closed="
373: + closed + ']';
374: }
375:
376: /**
377: * returns a vector with all the objects currently in the queue
378: */
379: public Vector getContents() {
380: Vector retval = new Vector();
381:
382: synchronized (mutex) {
383: for (Iterator it = l.iterator(); it.hasNext();) {
384: retval.addElement(it.next());
385: }
386: }
387: return retval;
388: }
389:
390: }
|