001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common
008: * Development and Distribution License("CDDL") (collectively, the
009: * "License"). You may not use this file except in compliance with the
010: * License. You can obtain a copy of the License at
011: * http://www.netbeans.org/cddl-gplv2.html
012: * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
013: * specific language governing permissions and limitations under the
014: * License. When distributing the software, include this License Header
015: * Notice in each file and include the License file at
016: * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
017: * particular file as subject to the "Classpath" exception as provided
018: * by Sun in the GPL Version 2 section of the License file that
019: * accompanied this code. If applicable, add the following below the
020: * License Header, with the fields enclosed by brackets [] replaced by
021: * your own identifying information:
022: * "Portions Copyrighted [year] [name of copyright owner]"
023: *
024: * Contributor(s):
025: *
026: * The Original Software is NetBeans. The Initial Developer of the Original
027: * Software is Sun Microsystems, Inc. Portions Copyright 1997-2006 Sun
028: * Microsystems, Inc. All Rights Reserved.
029: *
030: * If you wish your version of this file to be governed by only the CDDL
031: * or only the GPL Version 2, indicate your decision by adding
032: * "[Contributor] elects to include this software in this distribution
033: * under the [CDDL or GPL Version 2] license." If you do not indicate a
034: * single choice of license, a recipient has the option to distribute
035: * your version of this file under either the CDDL, the GPL Version 2 or
036: * to extend the choice of license to its licensees as provided above.
037: * However, if you add GPL Version 2 code and therefore, elected the GPL
038: * Version 2 license, then the option applies only if the new code is
039: * made subject to such option by the copyright holder.
040: */
041:
042: package org.netbeans.lib.collab.util;
043:
044: import java.util.ArrayList;
045: import java.util.LinkedList;
046:
047: /**
048: *
049: */
050: public class Worker implements Runnable {
051:
052: private LinkedList queue = new LinkedList();
053: private ArrayList threads;
054: private volatile boolean RUN = true;
055: private Runnable _lastRunnable = null;
056: private int maxThreads = 1;
057: private int waitingThreads;
058: private String thrNamePrefix = "org.netbeans.lib.collab.util.Worker";
059: private int _capacity = -1;
060:
061: private static final boolean DEBUG = false;
062:
063: /**
064: *
065: *
066: * @param
067: */
068: public Worker() {
069: init(maxThreads, maxThreads, -1, null);
070: }
071:
072: /**
073: * creates a Thread Pool
074: * @param threadCnt number of threads in this pool
075: */
076: public Worker(int threadCnt) {
077: init(threadCnt, threadCnt, -1, null);
078: }
079:
080: /**
081: * creates a Thread Pool
082: * @param minThreads initial number of threads
083: * @param maxThreads maximum number of threads
084: * In this implementation the number of threads in the pool
085: * never goes downn.
086: */
087: public Worker(int minThreads, int maxThreads) {
088: init(minThreads, maxThreads, -1, null);
089: }
090:
091: /**
092: * creates a Thread Pool
093: * @param minThreads initial number of threads
094: * @param maxThreads maximum number of threads
095: * @param capacity maximum number of pending jobs in the
096: * queue. if the queue size reaches this value,
097: * addRunnable blocks until the queue goes back below capacity.
098: */
099: public Worker(int minThreads, int maxThreads, int capacity) {
100: init(minThreads, maxThreads, capacity, null);
101: }
102:
103: /**
104: * creates a Thread Pool
105: * @param minThreads initial number of threads
106: * @param maxThreads maximum number of threads
107: * @param capacity maximum number of pending jobs in the
108: * queue. if the queue size reaches this value,
109: * addRunnable blocks until the queue goes back below capacity.
110: * @param thrNamePrefix prefix for pool thread names. This is useful
111: * for diagnostic if you have multiple Worker pools.
112: */
113: public Worker(int minThreads, int maxThreads, int capacity,
114: String thrNamePrefix) {
115: init(minThreads, maxThreads, capacity, thrNamePrefix);
116: }
117:
118: /**
119: * creates a Thread Pool
120: * @param minThreads initial number of threads
121: * @param maxThreads maximum number of threads
122: * @param thrNamePrefix prefix for pool thread names. This is useful
123: * for diagnostic if you have multiple Worker pools.
124: */
125: public Worker(int minThreads, int maxThreads, String thrNamePrefix) {
126: init(minThreads, maxThreads, -1, thrNamePrefix);
127: }
128:
129: private void init(int minThreads, int maxThreads, int capacity,
130: String thrNamePrefix) {
131: _capacity = capacity;
132: if (maxThreads < minThreads) {
133: this .maxThreads = minThreads;
134: } else {
135: this .maxThreads = maxThreads;
136: }
137:
138: if (thrNamePrefix != null) {
139: this .thrNamePrefix = thrNamePrefix;
140: }
141: threads = new ArrayList(maxThreads);
142: synchronized (this ) {
143: for (int i = 0; i < minThreads; i++) {
144: Thread t = new Thread(this , this .thrNamePrefix + " "
145: + i);
146: threads.add(t);
147: t.start();
148: waitingThreads++;
149: }
150: }
151: }
152:
153: /**
154: * @param r runnable to run
155: * @return number of elements in the job queue, including the
156: * one added by this method
157: */
158: public synchronized int addRunnable(Runnable r) {
159: if (!RUN)
160: return queue.size();
161:
162: queue.addLast(r);
163: if (waitingThreads > 0) {
164: notify();
165: }
166: if (threads.size() < maxThreads
167: && waitingThreads < queue.size()) {
168: Thread t = new Thread(this , thrNamePrefix + " "
169: + threads.size());
170: threads.add(t);
171: t.start();
172: // bump the count.
173: waitingThreads++;
174: }
175: return queue.size();
176: }
177:
178: /**
179: * @param r runnable to run
180: * @return number of elements in the job queue, including the
181: * one added by this method
182: */
183: public synchronized int addRunnableIfPossible(Runnable r) {
184: if (!RUN)
185: return queue.size();
186:
187: if (_capacity > maxThreads && queue.size() >= _capacity) {
188: Thread.yield();
189: return -1;
190: }
191:
192: return addRunnable(r);
193: }
194:
195: public synchronized boolean isFull() {
196: return (_capacity > maxThreads && queue.size() >= _capacity);
197: }
198:
199: public synchronized int backlog() {
200: return queue.size();
201: }
202:
203: /**
204: *
205: *
206: * @param
207: */
208: public void run() {
209: synchronized (this ) {
210: waitingThreads--;
211: }
212:
213: while (RUN) {
214: Runnable r = null;
215: synchronized (this ) {
216: while (queue.size() == 0) {
217: try {
218: waitingThreads++;
219: wait();
220: } catch (InterruptedException ex) {
221: ex.printStackTrace();
222: } finally {
223: waitingThreads--;
224: }
225: if (!RUN) {
226: return;
227: }
228: }
229: r = (Runnable) queue.removeFirst();
230: if (_lastRunnable == r) {
231: RUN = false;
232: // Wake up all threads.
233: notifyAll();
234: }
235: }
236:
237: try {
238: if (r != null) {
239: Thread t = null;
240: String oldName = null;
241: if (DEBUG) {
242: t = Thread.currentThread();
243: oldName = t.getName();
244: t.setName(r.toString());
245: }
246: r.run();
247: if (DEBUG) {
248: t.setName(oldName);
249: }
250: }
251: } catch (Throwable t) {
252: t.printStackTrace();
253: }
254: }
255: }
256:
257: /**
258: * Stops and joins all threads of this worker
259: */
260: final public void stop() {
261: RUN = false;
262: synchronized (this ) {
263: notifyAll();
264: }
265: }
266:
267: /**
268: * Stop the worker thread with this last runnable object
269: * @param runnable last job that will be run. After this runnable
270: * is dequeued, no other job is dequeued or enqueued.
271: */
272: final public void stop(Runnable r) {
273: addRunnable(_lastRunnable = r);
274: }
275:
276: //for test
277: ArrayList getThreads() {
278: return threads;
279: }
280:
281: @Override
282: public String toString() {
283: return thrNamePrefix + ":" + super.toString();
284: }
285:
286: }
|