001: // $Id: Queue2.java,v 1.5 2004/12/31 14:10:40 belaban Exp $
002:
003: package org.jgroups.util;
004:
005: import EDU.oswego.cs.dl.util.concurrent.CondVar;
006: import EDU.oswego.cs.dl.util.concurrent.Mutex;
007: import EDU.oswego.cs.dl.util.concurrent.Sync;
008: import org.apache.commons.logging.Log;
009: import org.apache.commons.logging.LogFactory;
010: import org.jgroups.TimeoutException;
011:
012: import java.util.Vector;
013:
014: /**
015: * Elements are added at the tail and removed from the head. Class is thread-safe in that
016: * 1 producer and 1 consumer may add/remove elements concurrently. The class is not
017: * explicitely designed for multiple producers or consumers. Implemented as a linked
018: * list, so that removal of an element at the head does not cause a right-shift of the
019: * remaining elements (as in a Vector-based implementation).
020: * <p>Implementation is based on util.concurrent.* classes
021: * @author Bela Ban
022: * @author Filip Hanik
023: */
024: public class Queue2 {
025: /*head and the tail of the list so that we can easily add and remove objects*/
026: Element head = null, tail = null;
027:
028: /*flag to determine the state of the queue*/
029: boolean closed = false;
030:
031: /*current size of the queue*/
032: int size = 0;
033:
034: /* Lock object for synchronization. Is notified when element is added */
035: final Sync mutex = new Mutex();
036:
037: /** Signals to listeners when an element has been added */
038: final CondVar add_condvar = new CondVar(mutex);
039:
040: /** Signals to listeners when an element has been removed */
041: final CondVar remove_condvar = new CondVar(mutex);
042:
043: /*the number of end markers that have been added*/
044: int num_markers = 0;
045:
046: protected static final Log log = LogFactory.getLog(Queue2.class);
047:
048: /**
049: * if the queue closes during the runtime
050: * an endMarker object is added to the end of the queue to indicate that
051: * the queue will close automatically when the end marker is encountered
052: * This allows for a "soft" close.
053: * @see Queue#close
054: */
055: private static final Object endMarker = new Object();
056:
057: /**
058: * the class Element indicates an object in the queue.
059: * This element allows for the linked list algorithm by always holding a
060: * reference to the next element in the list.
061: * if Element.next is null, then this element is the tail of the list.
062: */
063: class Element {
064: /*the actual value stored in the queue*/
065: Object obj = null;
066: /*pointer to the next item in the (queue) linked list*/
067: Element next = null;
068:
069: /**
070: * creates an Element object holding its value
071: * @param o - the object to be stored in the queue position
072: */
073: Element(Object o) {
074: obj = o;
075: }
076:
077: /**
078: * prints out the value of the object
079: */
080: public String toString() {
081: return obj != null ? obj.toString() : "null";
082: }
083: }
084:
085: /**
086: * creates an empty queue
087: */
088: public Queue2() {
089: }
090:
091: /**
092: * Returns the first element. Returns null if no elements are available.
093: */
094: public Object getFirst() {
095: return head != null ? head.obj : null;
096: }
097:
098: /**
099: * Returns the last element. Returns null if no elements are available.
100: */
101: public Object getLast() {
102: return tail != null ? tail.obj : null;
103: }
104:
105: /**
106: * returns true if the Queue has been closed
107: * however, this method will return false if the queue has been closed
108: * using the close(true) method and the last element has yet not been received.
109: * @return true if the queue has been closed
110: */
111: public boolean closed() {
112: return closed;
113: }
114:
115: /**
116: * adds an object to the tail of this queue
117: * If the queue has been closed with close(true) no exception will be
118: * thrown if the queue has not been flushed yet.
119: * @param obj - the object to be added to the queue
120: * @exception QueueClosedException exception if closed() returns true
121: */
122: public void add(Object obj) throws QueueClosedException {
123: if (obj == null) {
124: if (log.isErrorEnabled())
125: log.error("argument must not be null");
126: return;
127: }
128: if (closed)
129: throw new QueueClosedException();
130: if (this .num_markers > 0)
131: throw new QueueClosedException(
132: "Queue2.add(): queue has been closed. You can not add more elements. "
133: + "Waiting for removal of remaining elements.");
134:
135: try {
136: mutex.acquire();
137:
138: /*create a new linked list element*/
139: Element el = new Element(obj);
140: /*check the first element*/
141: if (head == null) {
142: /*the object added is the first element*/
143: /*set the head to be this object*/
144: head = el;
145: /*set the tail to be this object*/
146: tail = head;
147: /*set the size to be one, since the queue was empty*/
148: size = 1;
149: } else {
150: /*add the object to the end of the linked list*/
151: tail.next = el;
152: /*set the tail to point to the last element*/
153: tail = el;
154: /*increase the size*/
155: size++;
156: }
157: /*wake up all the threads that are waiting for the lock to be released*/
158: add_condvar.broadcast(); // todo: maybe signal is all we need ?
159: } catch (InterruptedException e) {
160: } finally {
161: mutex.release();
162: }
163: }
164:
165: /**
166: * Adds a new object to the head of the queue
167: * basically (obj.equals(queue.remove(queue.add(obj)))) returns true
168: * If the queue has been closed with close(true) no exception will be
169: * thrown if the queue has not been flushed yet.
170: * @param obj - the object to be added to the queue
171: * @exception QueueClosedException exception if closed() returns true
172: *
173: */
174: public void addAtHead(Object obj) throws QueueClosedException {
175: if (obj == null) {
176: if (log.isErrorEnabled())
177: log.error("argument must not be null");
178: return;
179: }
180: if (closed)
181: throw new QueueClosedException();
182: if (this .num_markers > 0)
183: throw new QueueClosedException(
184: "Queue2.addAtHead(): queue has been closed. You can not add more elements. "
185: + "Waiting for removal of remaining elements.");
186:
187: try {
188: mutex.acquire();
189: Element el = new Element(obj);
190: /*check the head element in the list*/
191: if (head == null) {
192: /*this is the first object, we could have done add(obj) here*/
193: head = el;
194: tail = head;
195: size = 1;
196: } else {
197: /*set the head element to be the child of this one*/
198: el.next = head;
199: /*set the head to point to the recently added object*/
200: head = el;
201: /*increase the size*/
202: size++;
203: }
204: /*wake up all the threads that are waiting for the lock to be released*/
205: add_condvar.broadcast();
206: } catch (InterruptedException e) {
207:
208: } finally {
209: mutex.release();
210: }
211: }
212:
213: /**
214: * Removes 1 element from head or <B>blocks</B>
215: * until next element has been added or until queue has been closed
216: * @return the first element to be taken of the queue
217: */
218: public Object remove() throws QueueClosedException {
219: /*initialize the return value*/
220: Object retval = null;
221:
222: try {
223: mutex.acquire();
224: /*wait as long as the queue is empty. return when an element is present or queue is closed*/
225: while (size == 0) {
226: if (closed)
227: throw new QueueClosedException();
228: try {
229: add_condvar.await();
230: } catch (InterruptedException ex) {
231: }
232: }
233:
234: if (closed)
235: throw new QueueClosedException();
236:
237: /*remove the head from the queue, if we make it to this point, retval should not be null !*/
238: retval = removeInternal();
239: if (retval == null)
240: if (log.isErrorEnabled())
241: log
242: .error("element was null, should never be the case");
243: } catch (InterruptedException e) {
244: ;
245: } finally {
246: mutex.release();
247: }
248:
249: /*
250: * we ran into an Endmarker, which means that the queue was closed before
251: * through close(true)
252: */
253: if (retval == endMarker) {
254: close(false); // mark queue as closed
255: throw new QueueClosedException();
256: }
257:
258: /*return the object*/
259: return retval;
260: }
261:
262: /**
263: * Removes 1 element from the head.
264: * If the queue is empty the operation will wait for timeout ms.
265: * if no object is added during the timeout time, a Timout exception is thrown
266: * @param timeout - the number of milli seconds this operation will wait before it times out
267: * @return the first object in the queue
268: */
269: public Object remove(long timeout) throws QueueClosedException,
270: TimeoutException {
271: Object retval = null;
272:
273: try {
274: mutex.acquire();
275: /*if the queue size is zero, we want to wait until a new object is added*/
276: if (size == 0) {
277: if (closed)
278: throw new QueueClosedException();
279: try {
280: /*release the mutex lock and wait no more than timeout ms*/
281: add_condvar.timedwait(timeout);
282: } catch (InterruptedException ex) {
283: }
284: }
285: /*we either timed out, or got notified by the mutex lock object*/
286:
287: /*check to see if the object closed*/
288: if (closed)
289: throw new QueueClosedException();
290:
291: /*get the next value*/
292: retval = removeInternal();
293: /*null result means we timed out*/
294: if (retval == null)
295: throw new TimeoutException();
296:
297: /*if we reached an end marker we are going to close the queue*/
298: if (retval == endMarker) {
299: close(false);
300: throw new QueueClosedException();
301: }
302: } catch (InterruptedException e) {
303:
304: } finally {
305: mutex.release();
306: }
307:
308: /*at this point we actually did receive a value from the queue, return it*/
309: return retval;
310: }
311:
312: /**
313: * removes a specific object from the queue.
314: * the object is matched up using the Object.equals method.
315: * @param obj the actual object to be removed from the queue
316: */
317: public void removeElement(Object obj) throws QueueClosedException {
318: Element el, tmp_el;
319: boolean removed = false;
320:
321: if (obj == null) {
322: if (log.isErrorEnabled())
323: log.error("argument must not be null");
324: return;
325: }
326:
327: try {
328: mutex.acquire();
329: el = head;
330:
331: /*the queue is empty*/
332: if (el == null)
333: return;
334:
335: /*check to see if the head element is the one to be removed*/
336: if (el.obj.equals(obj)) {
337: /*the head element matched we will remove it*/
338: head = el.next;
339: el.next = null;
340: /*check if we only had one object left
341: *at this time the queue becomes empty
342: *this will set the tail=head=null
343: */
344: if (size == 1)
345: tail = head; // null
346: decrementSize();
347: removed = true;
348:
349: /*and end the operation, it was successful*/
350: return;
351: }
352:
353: /*look through the other elements*/
354: while (el.next != null) {
355: if (el.next.obj.equals(obj)) {
356: tmp_el = el.next;
357: if (tmp_el == tail) // if it is the last element, move tail one to the left (bela Sept 20 2002)
358: tail = el;
359: el.next = el.next.next; // point to the el past the next one. can be null.
360: tmp_el.next = null;
361: decrementSize();
362: removed = true;
363: break;
364: }
365: el = el.next;
366: }
367: } catch (InterruptedException e) {
368:
369: } finally {
370: if (removed)
371: remove_condvar.broadcast();
372: mutex.release();
373: }
374: }
375:
376: /**
377: * returns the first object on the queue, without removing it.
378: * If the queue is empty this object blocks until the first queue object has
379: * been added
380: * @return the first object on the queue
381: */
382: public Object peek() throws QueueClosedException {
383: Object retval = null;
384:
385: try {
386: mutex.acquire();
387: while (size == 0) {
388: if (closed)
389: throw new QueueClosedException();
390: try {
391: add_condvar.await();
392: } catch (InterruptedException ex) {
393: }
394: }
395:
396: if (closed)
397: throw new QueueClosedException();
398:
399: retval = (head != null) ? head.obj : null;
400:
401: // @remove:
402: if (retval == null) {
403: // print some diagnostics
404: if (log.isErrorEnabled())
405: log.error("retval is null: head=" + head
406: + ", tail=" + tail + ", size()=" + size()
407: + ", num_markers=" + num_markers
408: + ", closed()=" + closed());
409: }
410: } catch (InterruptedException e) {
411:
412: } finally {
413: mutex.release();
414: }
415:
416: if (retval == endMarker) {
417: close(false); // mark queue as closed
418: throw new QueueClosedException();
419: }
420:
421: return retval;
422: }
423:
424: /**
425: * returns the first object on the queue, without removing it.
426: * If the queue is empty this object blocks until the first queue object has
427: * been added or the operation times out
428: * @param timeout how long in milli seconds will this operation wait for an object to be added to the queue
429: * before it times out
430: * @return the first object on the queue
431: */
432:
433: public Object peek(long timeout) throws QueueClosedException,
434: TimeoutException {
435: Object retval = null;
436:
437: try {
438: mutex.acquire();
439: if (size == 0) {
440: if (closed)
441: throw new QueueClosedException();
442: try {
443: mutex.wait(timeout);
444: } catch (InterruptedException ex) {
445: }
446: }
447: if (closed)
448: throw new QueueClosedException();
449:
450: retval = head != null ? head.obj : null;
451:
452: if (retval == null)
453: throw new TimeoutException();
454:
455: if (retval == endMarker) {
456: close(false);
457: throw new QueueClosedException();
458: }
459: } catch (InterruptedException e) {
460:
461: } finally {
462: mutex.release();
463: }
464: return retval;
465: }
466:
467: /**
468: Marks the queues as closed. When an <code>add</code> or <code>remove</code> operation is
469: attempted on a closed queue, an exception is thrown.
470: @param flush_entries When true, a end-of-entries marker is added to the end of the queue.
471: Entries may be added and removed, but when the end-of-entries marker
472: is encountered, the queue is marked as closed. This allows to flush
473: pending messages before closing the queue.
474: */
475: public void close(boolean flush_entries) {
476: if (flush_entries) {
477: try {
478: add(endMarker); // add an end-of-entries marker to the end of the queue
479: num_markers++;
480: } catch (QueueClosedException closed) {
481: }
482: return;
483: }
484:
485: try {
486: mutex.acquire();
487: closed = true;
488: try {
489: add_condvar.broadcast();
490: remove_condvar.broadcast();
491: } catch (Exception e) {
492: if (log.isErrorEnabled())
493: log.error("exception=" + e);
494: }
495: } catch (InterruptedException e) {
496:
497: } finally {
498: mutex.release();
499: }
500: }
501:
502: /**
503: * resets the queue.
504: * This operation removes all the objects in the queue and marks the queue open
505: */
506: public void reset() {
507: num_markers = 0;
508: if (!closed)
509: close(false);
510:
511: try {
512: mutex.acquire();
513: size = 0;
514: head = null;
515: tail = null;
516: closed = false;
517: } catch (InterruptedException e) {
518:
519: } finally {
520: mutex.release();
521: }
522: }
523:
524: /**
525: * returns the number of objects that are currently in the queue
526: */
527: public int size() {
528: return size - num_markers;
529: }
530:
531: /**
532: * prints the size of the queue
533: */
534: public String toString() {
535: return "Queue2 (" + size() + ") messages";
536: }
537:
538: /**
539: * Dumps internal state @remove
540: */
541: public String debug() {
542: return toString() + ", head=" + head + ", tail=" + tail
543: + ", closed()=" + closed() + ", contents="
544: + getContents();
545: }
546:
547: /**
548: * returns a vector with all the objects currently in the queue
549: */
550: public Vector getContents() {
551: Vector retval = new Vector();
552: Element el;
553:
554: try {
555: mutex.acquire();
556: el = head;
557: while (el != null) {
558: retval.addElement(el.obj);
559: el = el.next;
560: }
561: } catch (InterruptedException e) {
562:
563: } finally {
564: mutex.release();
565: }
566: return retval;
567: }
568:
569: /**
570: * Blocks until the queue has no elements left. If the queue is empty, the call will return
571: * immediately
572: * @param timeout Call returns if timeout has elapsed (number of milliseconds). 0 means to wait forever
573: * @throws QueueClosedException Thrown if queue has been closed
574: * @throws TimeoutException Thrown if timeout has elapsed
575: */
576: public void waitUntilEmpty(long timeout)
577: throws QueueClosedException, TimeoutException {
578: long time_to_wait = timeout >= 0 ? timeout : 0; // eliminate negative values
579:
580: try {
581: mutex.acquire();
582:
583: if (timeout == 0) {
584: while (size > 0 && closed == false) {
585: remove_condvar.await();
586: }
587: } else {
588: long start_time = System.currentTimeMillis();
589:
590: while (time_to_wait > 0 && size > 0 && closed == false) {
591: try {
592: remove_condvar.timedwait(time_to_wait);
593: } catch (InterruptedException ex) {
594:
595: }
596: time_to_wait = timeout
597: - (System.currentTimeMillis() - start_time);
598: }
599:
600: if (size > 0)
601: throw new TimeoutException("queue has " + size
602: + " elements");
603: }
604:
605: if (closed)
606: throw new QueueClosedException();
607: } catch (InterruptedException e) {
608:
609: } finally {
610: mutex.release();
611: }
612: }
613:
614: /* ------------------------------------- Private Methods ----------------------------------- */
615:
616: /**
617: * Removes the first element. Returns null if no elements in queue.
618: * Always called with mutex locked (we don't have to lock mutex ourselves)
619: */
620: private Object removeInternal() {
621: Element retval;
622:
623: /*if the head is null, the queue is empty*/
624: if (head == null)
625: return null;
626:
627: retval = head; // head must be non-null now
628:
629: head = head.next;
630: if (head == null)
631: tail = null;
632:
633: decrementSize();
634:
635: remove_condvar.broadcast(); // todo: correct ?
636:
637: if (head != null && head.obj == endMarker) {
638: closed = true;
639: }
640:
641: retval.next = null;
642: return retval.obj;
643: }
644:
645: void decrementSize() {
646: size--;
647: if (size < 0)
648: size = 0;
649: }
650:
651: /* ---------------------------------- End of Private Methods -------------------------------- */
652:
653: }
|