001: /*
002: * The Unified Mapping Platform (JUMP) is an extensible, interactive GUI
003: * for visualizing and manipulating spatial features with geometry and attributes.
004: *
005: * Copyright (C) 2003 Vivid Solutions
006: * Copyright (C) 2007 Intevation GmbH
007: *
008: * This program is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU General Public License
010: * as published by the Free Software Foundation; either version 2
011: * of the License, or (at your option) any later version.
012: *
013: * This program is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016: * GNU General Public License for more details.
017: *
018: * You should have received a copy of the GNU General Public License
019: * along with this program; if not, write to the Free Software
020: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
021: *
022: * Suite #1A
023: * 2328 Government Street
024: * Victoria BC V8T 5G5
025: * Canada
026: *
027: * (250)385-6040
028: * www.vividsolutions.com
029: */
030: package com.vividsolutions.jump.workbench.ui.renderer;
031:
032: import java.util.LinkedList;
033: import java.util.ArrayList;
034:
035: /**
036: * This thread queue executes at maximum N Runnables in parallel
037: * were N is a given number of worker threads that should be used.
038: * If N threads are running and busy each further incoming
039: * Runnable is queued until one of the threads has finished its current job.
040: * If a worker thread becomes idle (no more job in the queue)
041: * it is hold alive for 5 seconds. If during this period of time
042: * no new Runnable is enqueued the worker thread dies.
043: *
044: * @author Sascha L. Teichmann (sascha.teichmann@intevation.de)
045: */
046: public class ThreadQueue {
047: /** The time a worker thread stays alive if idle */
048: public static final long WORKER_STAY_ALIVE_TIME = 5000L;
049:
050: /**
051: * Worker thread. Fetches Runnable from the surrounding
052: * ThreadQueue instance.
053: */
054: protected class Worker extends Thread {
055: public void run() {
056: try {
057: for (;;) {
058: Runnable runnable;
059:
060: synchronized (queuedRunnables) {
061: if (queuedRunnables.isEmpty()) {
062:
063: ++waitingThreads;
064: try {
065: queuedRunnables
066: .wait(WORKER_STAY_ALIVE_TIME);
067: } catch (InterruptedException ie) {
068: } finally {
069: --waitingThreads;
070: }
071:
072: // if still empty -> die!
073: if (queuedRunnables.isEmpty())
074: break;
075: }
076: if (disposed)
077: break;
078: runnable = (Runnable) queuedRunnables.remove();
079: } // synchronized queuedRunnables
080:
081: try {
082: runnable.run();
083: } catch (Exception e) {
084: e.printStackTrace();
085: }
086:
087: // check if we are the last of the mohicans ...
088: boolean lastRunningThread;
089: synchronized (runningThreads) {
090: lastRunningThread = runningThreads[0] == 1;
091: }
092: if (lastRunningThread) {
093: fireAllRunningThreadsFinished();
094: }
095: } // for (;;)
096: } finally { // guarantee that counter goes down
097: synchronized (runningThreads) {
098: --runningThreads[0];
099: }
100: }
101: }
102: } // class Worker
103:
104: /**
105: * If the number of running threads goes down to zero
106: * implementations of this interface are able to be informed.
107: */
108: public interface Listener {
109: void allRunningThreadsFinished();
110: } // interface Listener
111:
112: /** Number of running threads */
113: protected int[] runningThreads = new int[1];
114:
115: /** max. Number of threads running parallel */
116: protected int maxRunningThreads;
117:
118: /** Number of threads that are currently idle */
119: protected int waitingThreads;
120:
121: /** The queue of Runnables jobs waiting to be run */
122: protected LinkedList queuedRunnables;
123:
124: /** Singals that the ThreadQueue is going to quit */
125: protected boolean disposed;
126:
127: /** List of Listeners */
128: protected ArrayList listeners = new ArrayList();
129:
130: /**
131: * Creates a ThreadQueue with one worker thread.
132: */
133: public ThreadQueue() {
134: this (1);
135: }
136:
137: /** Creates a ThreadQueue with a given number of worker threads.
138: * @param maxRunningThreads the max. number of threads to be run parallel.
139: */
140: public ThreadQueue(int maxRunningThreads) {
141: this .maxRunningThreads = Math.max(1, maxRunningThreads);
142: queuedRunnables = new LinkedList();
143: }
144:
145: /**
146: * Adds a Listener to this ThreadQueue.
147: * @param listener the listener to add.
148: */
149: public synchronized void add(Listener listener) {
150: if (listener != null)
151: listeners.add(listener);
152: }
153:
154: /**
155: * Removes a Listener from this ThreadQueue.
156: * @param listener the listener to be removed.
157: */
158: public synchronized void remove(Listener listener) {
159: if (listener != null)
160: listeners.remove(listener);
161: }
162:
163: /**
164: * Informs Listeners of the fact that the number of running threads
165: * went to zero.
166: */
167: protected void fireAllRunningThreadsFinished() {
168: ArrayList copy;
169: synchronized (this ) {
170: copy = new ArrayList(listeners);
171: }
172: for (int i = copy.size() - 1; i >= 0; --i)
173: ((Listener) copy.get(i)).allRunningThreadsFinished();
174: }
175:
176: /**
177: * The number of currently running worker threads.
178: * @return number of currently running worker threads.
179: */
180: public int runningThreads() {
181: synchronized (runningThreads) {
182: return runningThreads[0];
183: }
184: }
185:
186: /**
187: * The number of currently running worker threads.
188: * Alias for runningThreads()
189: * @return number of currently running worker threads.
190: */
191: public int getRunningThreads() {
192: return runningThreads();
193: }
194:
195: /**
196: * The number of currently waiting Runnables.
197: * @return number of currently waiting Runnables.
198: */
199: public int waitingRunnables() {
200: synchronized (runningThreads) {
201: return queuedRunnables.size();
202: }
203: }
204:
205: /**
206: * The number of currently idle worker threads.
207: * @return number of currently idle worker threads.
208: */
209: public int waitingThreads() {
210: synchronized (queuedRunnables) {
211: return waitingThreads;
212: }
213: }
214:
215: /**
216: * Adds a Runnables to the queue. It will be run in one
217: * of the worker threads.
218: * @param runnable The Runnables to add
219: */
220: public void add(Runnable runnable) {
221: int waiting;
222: synchronized (queuedRunnables) {
223: if (disposed)
224: return;
225: waiting = waitingThreads;
226: queuedRunnables.add(runnable);
227: queuedRunnables.notify();
228: } // synchronized (queuedRunnables)
229:
230: synchronized (runningThreads) {
231:
232: // if waitingThreads == 1 then
233: // the queuedRunnables.notify() should have waked it up.
234:
235: if (waitingThreads < 2
236: && runningThreads[0] < maxRunningThreads) {
237: ++runningThreads[0];
238: Worker w = new Worker();
239: w.setDaemon(true);
240: w.start();
241: }
242: } // synchronized (runningThreads)
243: }
244:
245: /**
246: * Empties the queue of waiting Runnables.
247: */
248: public void clear() {
249: synchronized (queuedRunnables) {
250: queuedRunnables.clear();
251: }
252: }
253:
254: /**
255: * Shuts down the ThreadQueue.
256: */
257: public void dispose() {
258: synchronized (queuedRunnables) {
259: disposed = true;
260: queuedRunnables.clear();
261: // wakeup idle threads
262: queuedRunnables.notifyAll();
263: }
264: synchronized (this ) {
265: listeners.clear();
266: }
267: }
268: }
269: // end of file
|