001: // $Id: Queue.java,v 1.29 2006/08/08 15:34:22 belaban Exp $
002:
003: package org.jgroups.util;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.TimeoutException;
008:
009: import java.util.LinkedList;
010: import java.util.Collection;
011: import java.util.Iterator;
012: import java.util.Enumeration;
013:
014: /**
015: * Elements are added at the tail and removed from the head. Class is thread-safe in that
016: * 1 producer and 1 consumer may add/remove elements concurrently. The class is not
017: * explicitely designed for multiple producers or consumers. Implemented as a linked
018: * list, so that removal of an element at the head does not cause a right-shift of the
019: * remaining elements (as in a Vector-based implementation).
020: * @author Bela Ban
021: */
022: public class Queue {
023:
024: /*head and the tail of the list so that we can easily add and remove objects*/
025: private Element head = null, tail = null;
026:
027: /*flag to determine the state of the queue*/
028: private boolean closed = false;
029:
030: /*current size of the queue*/
031: private int size = 0;
032:
033: /* Lock object for synchronization. Is notified when element is added */
034: private final Object mutex = new Object();
035:
036: /** Lock object for syncing on removes. It is notified when an object is removed */
037: // Object remove_mutex=new Object();
038: /*the number of end markers that have been added*/
039: private int num_markers = 0;
040:
041: /**
042: * if the queue closes during the runtime
043: * an endMarker object is added to the end of the queue to indicate that
044: * the queue will close automatically when the end marker is encountered
045: * This allows for a "soft" close.
046: * @see Queue#close
047: */
048: private static final Object endMarker = new Object();
049:
050: protected static final Log log = LogFactory.getLog(Queue.class);
051:
052: /**
053: * the class Element indicates an object in the queue.
054: * This element allows for the linked list algorithm by always holding a
055: * reference to the next element in the list.
056: * if Element.next is null, then this element is the tail of the list.
057: */
058: static class Element {
059: /*the actual value stored in the queue*/
060: Object obj = null;
061: /*pointer to the next item in the (queue) linked list*/
062: Element next = null;
063:
064: /**
065: * creates an Element object holding its value
066: * @param o - the object to be stored in the queue position
067: */
068: Element(Object o) {
069: obj = o;
070: }
071:
072: /**
073: * prints out the value of the object
074: */
075: public String toString() {
076: return obj != null ? obj.toString() : "null";
077: }
078: }
079:
080: /**
081: * creates an empty queue
082: */
083: public Queue() {
084: }
085:
086: /**
087: * Returns the first element. Returns null if no elements are available.
088: */
089: public Object getFirst() {
090: synchronized (mutex) {
091: return head != null ? head.obj : null;
092: }
093: }
094:
095: /**
096: * Returns the last element. Returns null if no elements are available.
097: */
098: public Object getLast() {
099: synchronized (mutex) {
100: return tail != null ? tail.obj : null;
101: }
102: }
103:
104: /**
105: * returns true if the Queue has been closed
106: * however, this method will return false if the queue has been closed
107: * using the close(true) method and the last element has yet not been received.
108: * @return true if the queue has been closed
109: */
110: public boolean closed() {
111: synchronized (mutex) {
112: return closed;
113: }
114: }
115:
116: /**
117: * adds an object to the tail of this queue
118: * If the queue has been closed with close(true) no exception will be
119: * thrown if the queue has not been flushed yet.
120: * @param obj - the object to be added to the queue
121: * @exception QueueClosedException exception if closed() returns true
122: */
123: public void add(Object obj) throws QueueClosedException {
124: if (obj == null) {
125: if (log.isErrorEnabled())
126: log.error("argument must not be null");
127: return;
128: }
129:
130: /*lock the queue from other threads*/
131: synchronized (mutex) {
132: if (closed)
133: throw new QueueClosedException();
134: if (this .num_markers > 0)
135: throw new QueueClosedException(
136: "queue has been closed. You can not add more elements. "
137: + "Waiting for removal of remaining elements.");
138: addInternal(obj);
139:
140: /*wake up all the threads that are waiting for the lock to be released*/
141: mutex.notifyAll();
142: }
143: }
144:
145: public void addAll(Collection c) throws QueueClosedException {
146: if (c == null) {
147: if (log.isErrorEnabled())
148: log.error("argument must not be null");
149: return;
150: }
151:
152: /*lock the queue from other threads*/
153: synchronized (mutex) {
154: if (closed)
155: throw new QueueClosedException();
156: if (this .num_markers > 0)
157: throw new QueueClosedException(
158: "queue has been closed. You can not add more elements. "
159: + "Waiting for removal of remaining elements.");
160:
161: Object obj;
162: for (Iterator it = c.iterator(); it.hasNext();) {
163: obj = it.next();
164: if (obj != null)
165: addInternal(obj);
166: }
167:
168: /*wake up all the threads that are waiting for the lock to be released*/
169: mutex.notifyAll();
170: }
171: }
172:
173: public void addAll(List l) throws QueueClosedException {
174: if (l == null) {
175: if (log.isErrorEnabled())
176: log.error("argument must not be null");
177: return;
178: }
179:
180: /*lock the queue from other threads*/
181: synchronized (mutex) {
182: if (closed)
183: throw new QueueClosedException();
184: if (this .num_markers > 0)
185: throw new QueueClosedException(
186: "queue has been closed. You can not add more elements. "
187: + "Waiting for removal of remaining elements.");
188:
189: Object obj;
190: for (Enumeration en = l.elements(); en.hasMoreElements();) {
191: obj = en.nextElement();
192: if (obj != null)
193: addInternal(obj);
194: }
195:
196: /*wake up all the threads that are waiting for the lock to be released*/
197: mutex.notifyAll();
198: }
199: }
200:
201: /**
202: * Adds a new object to the head of the queue
203: * basically (obj.equals(queue.remove(queue.add(obj)))) returns true
204: * If the queue has been closed with close(true) no exception will be
205: * thrown if the queue has not been flushed yet.
206: * @param obj - the object to be added to the queue
207: * @exception QueueClosedException exception if closed() returns true
208: */
209: public void addAtHead(Object obj) throws QueueClosedException {
210: if (obj == null) {
211: if (log.isErrorEnabled())
212: log.error("argument must not be null");
213: return;
214: }
215:
216: /*lock the queue from other threads*/
217: synchronized (mutex) {
218: if (closed)
219: throw new QueueClosedException();
220: if (this .num_markers > 0)
221: throw new QueueClosedException(
222: "Queue.addAtHead(): queue has been closed. You can not add more elements. "
223: + "Waiting for removal of remaining elements.");
224:
225: Element el = new Element(obj);
226: /*check the head element in the list*/
227: if (head == null) {
228: /*this is the first object, we could have done add(obj) here*/
229: head = el;
230: tail = head;
231: size = 1;
232: } else {
233: /*set the head element to be the child of this one*/
234: el.next = head;
235: /*set the head to point to the recently added object*/
236: head = el;
237: /*increase the size*/
238: size++;
239: }
240: /*wake up all the threads that are waiting for the lock to be released*/
241: mutex.notifyAll();
242: }
243: }
244:
245: /**
246: * Removes 1 element from head or <B>blocks</B>
247: * until next element has been added or until queue has been closed
248: * @return the first element to be taken of the queue
249: */
250: public Object remove() throws QueueClosedException {
251: Object retval;
252: synchronized (mutex) {
253: /*wait as long as the queue is empty. return when an element is present or queue is closed*/
254: while (size == 0) {
255: if (closed)
256: throw new QueueClosedException();
257: try {
258: mutex.wait();
259: } catch (InterruptedException ex) {
260: }
261: }
262:
263: if (closed)
264: throw new QueueClosedException();
265:
266: /*remove the head from the queue, if we make it to this point, retval should not be null !*/
267: retval = removeInternal();
268: if (retval == null)
269: if (log.isErrorEnabled())
270: log
271: .error("element was null, should never be the case");
272: }
273:
274: /*
275: * we ran into an Endmarker, which means that the queue was closed before
276: * through close(true)
277: */
278: // if(retval == endMarker) {
279: // close(false); // mark queue as closed
280: // throw new QueueClosedException();
281: // }
282: return retval;
283: }
284:
285: /**
286: * Removes 1 element from the head.
287: * If the queue is empty the operation will wait for timeout ms.
288: * if no object is added during the timeout time, a Timout exception is thrown
289: * @param timeout - the number of milli seconds this operation will wait before it times out
290: * @return the first object in the queue
291: */
292: public Object remove(long timeout) throws QueueClosedException,
293: TimeoutException {
294: Object retval;
295:
296: synchronized (mutex) {
297: if (closed)
298: throw new QueueClosedException();
299:
300: /*if the queue size is zero, we want to wait until a new object is added*/
301: if (size == 0) {
302: try {
303: /*release the mutex lock and wait no more than timeout ms*/
304: mutex.wait(timeout);
305: } catch (InterruptedException ex) {
306: }
307: }
308: /*we either timed out, or got notified by the mutex lock object*/
309: if (closed)
310: throw new QueueClosedException();
311:
312: /*get the next value*/
313: retval = removeInternal();
314: /*null result means we timed out*/
315: if (retval == null)
316: throw new TimeoutException("timeout=" + timeout + "ms");
317:
318: /*if we reached an end marker we are going to close the queue*/
319: // if(retval == endMarker) {
320: // close(false);
321: // throw new QueueClosedException();
322: // }
323: /*at this point we actually did receive a value from the queue, return it*/
324: return retval;
325: }
326: }
327:
328: /**
329: * removes a specific object from the queue.
330: * the object is matched up using the Object.equals method.
331: * @param obj the actual object to be removed from the queue
332: */
333: public void removeElement(Object obj) throws QueueClosedException {
334: Element el, tmp_el;
335:
336: if (obj == null) {
337: if (log.isErrorEnabled())
338: log.error("argument must not be null");
339: return;
340: }
341:
342: synchronized (mutex) {
343: if (closed) /*check to see if the queue is closed*/
344: throw new QueueClosedException();
345:
346: el = head;
347:
348: /*the queue is empty*/
349: if (el == null)
350: return;
351:
352: /*check to see if the head element is the one to be removed*/
353: if (el.obj.equals(obj)) {
354: /*the head element matched we will remove it*/
355: head = el.next;
356: el.next = null;
357: el.obj = null;
358: /*check if we only had one object left
359: *at this time the queue becomes empty
360: *this will set the tail=head=null
361: */
362: if (size == 1)
363: tail = head; // null
364: decrementSize();
365: return;
366: }
367:
368: /*look through the other elements*/
369: while (el.next != null) {
370: if (el.next.obj.equals(obj)) {
371: tmp_el = el.next;
372: if (tmp_el == tail) // if it is the last element, move tail one to the left (bela Sept 20 2002)
373: tail = el;
374: el.next.obj = null;
375: el.next = el.next.next; // point to the el past the next one. can be null.
376: tmp_el.next = null;
377: tmp_el.obj = null;
378: decrementSize();
379: break;
380: }
381: el = el.next;
382: }
383: }
384: }
385:
386: /**
387: * returns the first object on the queue, without removing it.
388: * If the queue is empty this object blocks until the first queue object has
389: * been added
390: * @return the first object on the queue
391: */
392: public Object peek() throws QueueClosedException {
393: Object retval;
394:
395: synchronized (mutex) {
396: while (size == 0) {
397: if (closed)
398: throw new QueueClosedException();
399: try {
400: mutex.wait();
401: } catch (InterruptedException ex) {
402: }
403: }
404:
405: if (closed)
406: throw new QueueClosedException();
407:
408: retval = (head != null) ? head.obj : null;
409: }
410:
411: if (retval == endMarker) {
412: close(false); // mark queue as closed
413: throw new QueueClosedException();
414: }
415:
416: return retval;
417: }
418:
419: /**
420: * returns the first object on the queue, without removing it.
421: * If the queue is empty this object blocks until the first queue object has
422: * been added or the operation times out
423: * @param timeout how long in milli seconds will this operation wait for an object to be added to the queue
424: * before it times out
425: * @return the first object on the queue
426: */
427: public Object peek(long timeout) throws QueueClosedException,
428: TimeoutException {
429: Object retval;
430:
431: synchronized (mutex) {
432: if (size == 0) {
433: if (closed)
434: throw new QueueClosedException();
435: try {
436: mutex.wait(timeout);
437: } catch (InterruptedException ex) {
438: }
439: }
440: if (closed)
441: throw new QueueClosedException();
442:
443: retval = head != null ? head.obj : null;
444:
445: if (retval == null)
446: throw new TimeoutException("timeout=" + timeout + "ms");
447:
448: if (retval == endMarker) {
449: close(false);
450: throw new QueueClosedException();
451: }
452: return retval;
453: }
454: }
455:
456: /** Removes all elements from the queue. This method can succeed even when the queue is closed */
457: public void clear() {
458: synchronized (mutex) {
459: head = tail = null;
460: size = 0;
461: num_markers = 0;
462: mutex.notifyAll();
463: }
464: }
465:
466: /**
467: Marks the queues as closed. When an <code>add</code> or <code>remove</code> operation is
468: attempted on a closed queue, an exception is thrown.
469: @param flush_entries When true, a end-of-entries marker is added to the end of the queue.
470: Entries may be added and removed, but when the end-of-entries marker
471: is encountered, the queue is marked as closed. This allows to flush
472: pending messages before closing the queue.
473: */
474: public void close(boolean flush_entries) {
475: synchronized (mutex) {
476: if (flush_entries && size > 0) {
477: try {
478: add(endMarker); // add an end-of-entries marker to the end of the queue
479: num_markers++;
480: } catch (QueueClosedException closed_ex) {
481: }
482: return;
483: }
484: closed = true;
485: mutex.notifyAll();
486: }
487: }
488:
489: /** Waits until the queue has been closed. Returns immediately if already closed
490: * @param timeout Number of milliseconds to wait. A value <= 0 means to wait forever
491: */
492: public void waitUntilClosed(long timeout) {
493: synchronized (mutex) {
494: if (closed)
495: return;
496: try {
497: mutex.wait(timeout);
498: } catch (InterruptedException e) {
499: }
500: }
501: }
502:
503: /**
504: * resets the queue.
505: * This operation removes all the objects in the queue and marks the queue open
506: */
507: public void reset() {
508: synchronized (mutex) {
509: num_markers = 0;
510: if (!closed)
511: close(false);
512: size = 0;
513: head = null;
514: tail = null;
515: closed = false;
516: mutex.notifyAll();
517: }
518: }
519:
520: /**
521: * Returns all the elements of the queue
522: * @return A copy of the queue
523: */
524: public LinkedList values() {
525: LinkedList retval = new LinkedList();
526: synchronized (mutex) {
527: Element el = head;
528: while (el != null) {
529: retval.add(el.obj);
530: el = el.next;
531: }
532: }
533: return retval;
534: }
535:
536: /**
537: * returns the number of objects that are currently in the queue
538: */
539: public int size() {
540: synchronized (mutex) {
541: return size - num_markers;
542: }
543: }
544:
545: /**
546: * prints the size of the queue
547: */
548: public String toString() {
549: return "Queue (" + size() + ") elements";
550: }
551:
552: /* ------------------------------------- Private Methods ----------------------------------- */
553:
554: private final void addInternal(Object obj) {
555: /*create a new linked list element*/
556: Element el = new Element(obj);
557: /*check the first element*/
558: if (head == null) {
559: /*the object added is the first element*/
560: /*set the head to be this object*/
561: head = el;
562: /*set the tail to be this object*/
563: tail = head;
564: /*set the size to be one, since the queue was empty*/
565: size = 1;
566: } else {
567: /*add the object to the end of the linked list*/
568: tail.next = el;
569: /*set the tail to point to the last element*/
570: tail = el;
571: /*increase the size*/
572: size++;
573: }
574: }
575:
576: /**
577: * Removes the first element. Returns null if no elements in queue.
578: * Always called with mutex locked (we don't have to lock mutex ourselves)
579: */
580: private Object removeInternal() {
581: Element retval;
582: Object obj;
583:
584: /*if the head is null, the queue is empty*/
585: if (head == null)
586: return null;
587:
588: retval = head; // head must be non-null now
589:
590: head = head.next;
591: if (head == null)
592: tail = null;
593:
594: decrementSize();
595: if (head != null && head.obj == endMarker) {
596: closed = true;
597: mutex.notifyAll();
598: }
599:
600: retval.next = null;
601: obj = retval.obj;
602: retval.obj = null;
603: return obj;
604: }
605:
606: /** Doesn't need to be synchronized; is always called from synchronized methods */
607: final private void decrementSize() {
608: size--;
609: if (size < 0)
610: size = 0;
611: }
612:
613: /* ---------------------------------- End of Private Methods -------------------------------- */
614:
615: }
|