001: /*
002: * Jython Database Specification API 2.0
003: *
004: * $Id: Queue.java 2414 2005-02-23 04:26:23Z bzimmer $
005: *
006: * Copyright (c) 2001 brian zimmer <bzimmer@ziclix.com>
007: *
008: */
009: package com.ziclix.python.sql.util;
010:
011: import java.util.LinkedList;
012:
013: /**
014: * This queue blocks until closed or an element is enqueued. If the queue
015: * reaches capacity, the dequeue thread gets priority in order to bring the
016: * queue size under a certain threshold.
017: *
018: * @author brian zimmer
019: * @version $Revision: 2414 $
020: */
021: public class Queue {
022:
023: /**
024: * Field closed
025: */
026: protected boolean closed;
027:
028: /**
029: * Field queue
030: */
031: protected LinkedList queue;
032:
033: /**
034: * Field capacity, threshold
035: */
036: protected int capacity, threshold;
037:
038: /**
039: * Instantiate a blocking queue with no bounded capacity.
040: */
041: public Queue() {
042: this (0);
043: }
044:
045: /**
046: * Instantiate a blocking queue with the specified capacity.
047: */
048: public Queue(int capacity) {
049:
050: this .closed = false;
051: this .capacity = capacity;
052: this .queue = new LinkedList();
053: this .threshold = (int) (this .capacity * 0.75f);
054: }
055:
056: /**
057: * Enqueue an object and notify all waiting Threads.
058: */
059: public synchronized void enqueue(Object element)
060: throws InterruptedException {
061:
062: if (closed) {
063: throw new QueueClosedException();
064: }
065:
066: this .queue.addLast(element);
067: this .notify();
068:
069: /*
070: * Block while the capacity of the queue has been breached.
071: */
072: while ((this .capacity > 0)
073: && (this .queue.size() >= this .capacity)) {
074: this .wait();
075:
076: if (closed) {
077: throw new QueueClosedException();
078: }
079: }
080: }
081:
082: /**
083: * Blocks until an object is dequeued or the queue is closed.
084: */
085: public synchronized Object dequeue() throws InterruptedException {
086:
087: while (this .queue.size() <= 0) {
088: this .wait();
089:
090: if (closed) {
091: throw new QueueClosedException();
092: }
093: }
094:
095: Object object = this .queue.removeFirst();
096:
097: // if space exists, notify the other threads
098: if (this .queue.size() < this .threshold) {
099: this .notify();
100: }
101:
102: return object;
103: }
104:
105: /**
106: * Close the queue and notify all waiting Threads.
107: */
108: public synchronized void close() {
109:
110: this .closed = true;
111:
112: this.notifyAll();
113: }
114: }
|