001: /*_############################################################################
002: _##
003: _## SNMP4J - ThreadPool.java
004: _##
005: _## Copyright (C) 2003-2008 Frank Fock and Jochen Katz (SNMP4J.org)
006: _##
007: _## Licensed under the Apache License, Version 2.0 (the "License");
008: _## you may not use this file except in compliance with the License.
009: _## You may obtain a copy of the License at
010: _##
011: _## http://www.apache.org/licenses/LICENSE-2.0
012: _##
013: _## Unless required by applicable law or agreed to in writing, software
014: _## distributed under the License is distributed on an "AS IS" BASIS,
015: _## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: _## See the License for the specific language governing permissions and
017: _## limitations under the License.
018: _##
019: _##########################################################################*/
020:
021: package org.snmp4j.util;
022:
023: import java.util.*;
024:
025: /**
026: * The <code>ThreadPool</code> provides a pool of a fixed number of threads
027: * that are capable to execute tasks that implement the <code>Runnable</code>
028: * interface concurrently. The ThreadPool blocks when all threads are busy
029: * with tasks and an additional task is added.
030: *
031: * @author Frank Fock
032: * @version 1.6
033: * @since 1.0.2
034: */
035: public class ThreadPool implements WorkerPool {
036:
037: protected Vector taskManagers;
038: protected String name = "ThreadPool";
039: protected volatile boolean stop = false;
040: protected boolean respawnThreads = false;
041:
042: protected ThreadPool() {
043: }
044:
045: protected String getTaskManagerName(String prefix, int index) {
046: return prefix + "." + index;
047: }
048:
049: protected void setup(String name, int size) {
050: this .name = name;
051: taskManagers = new Vector(size);
052: for (int i = 0; i < size; i++) {
053: TaskManager tm = new TaskManager(
054: getTaskManagerName(name, i));
055: taskManagers.add(tm);
056: tm.start();
057: }
058: }
059:
060: /**
061: * Creates a thread pool with the supplied name and size.
062: * @param name
063: * the name prefix for the threads in this pool.
064: * @param size
065: * the number of threads in this pool. This number also specifies the
066: * number of concurrent tasks that can be executed with this pool.
067: * @return
068: * a <code>ThreadPool</code> instance.
069: */
070: public static ThreadPool create(String name, int size) {
071: ThreadPool pool = new ThreadPool();
072: pool.setup(name, size);
073: return pool;
074: }
075:
076: /**
077: * Executes a task on behalf of this thread pool. If all threads are currently
078: * busy, this method call blocks until a thread gets idle again which is when
079: * the call returns immediately.
080: * @param task
081: * a <code>Runnable</code> to execute.
082: */
083: public synchronized void execute(WorkerTask task) {
084: while (true) {
085: for (int i = 0; i < taskManagers.size(); i++) {
086: TaskManager tm = (TaskManager) taskManagers.get(i);
087: if ((respawnThreads) && (!tm.isAlive())) {
088: tm = new TaskManager(getTaskManagerName(name, i));
089: }
090: if (tm.isIdle()) {
091: tm.execute(task);
092: return;
093: }
094: }
095: try {
096: wait();
097: } catch (InterruptedException ex) {
098: // ignore
099: }
100: }
101: }
102:
103: /**
104: * Tries to execute a task on behalf of this thread pool. If all threads are
105: * currently busy, this method returns <code>false</code>. Otherwise the task
106: * is executed in background.
107: * @param task
108: * a <code>Runnable</code> to execute.
109: * @return
110: * <code>true</code> if the task is executing.
111: * @since 1.6
112: */
113: public synchronized boolean tryToExecute(WorkerTask task) {
114: for (int i = 0; i < taskManagers.size(); i++) {
115: TaskManager tm = (TaskManager) taskManagers.get(i);
116: if ((respawnThreads) && (!tm.isAlive())) {
117: tm = new TaskManager(getTaskManagerName(name, i));
118: }
119: if (tm.isIdle()) {
120: tm.execute(task);
121: return true;
122: }
123: }
124: return false;
125: }
126:
127: /**
128: * Tests if the threads are respawn (recreates) when they have been stopped
129: * or canceled.
130: * @return
131: * <code>true</code> if threads are respawn.
132: */
133: public boolean isRespawnThreads() {
134: return respawnThreads;
135: }
136:
137: /**
138: * Specifies whether threads are respawned by this thread pool after they
139: * have been stopped or not. Default is no respawning.
140: * @param respawnThreads
141: * if <code>true</code> then threads will be respawn.
142: */
143: public void setRespawnThreads(boolean respawnThreads) {
144: this .respawnThreads = respawnThreads;
145: }
146:
147: /**
148: * Returns the name of the thread pool.
149: * @return
150: * the name of this thread pool.
151: */
152: public String getName() {
153: return name;
154: }
155:
156: /**
157: * Stops all threads in this thread pool gracefully. This method will not
158: * return until all threads have been terminated and joined successfully.
159: */
160: public void stop() {
161: List tms;
162: synchronized (this ) {
163: stop = true;
164: tms = (List) taskManagers.clone();
165: }
166: for (int i = 0; i < tms.size(); i++) {
167: TaskManager tm = (TaskManager) tms.get(i);
168: tm.terminate();
169: synchronized (tm) {
170: tm.notify();
171: }
172: try {
173: tm.join();
174: } catch (InterruptedException ex) {
175: //ignore
176: }
177: }
178: }
179:
180: /**
181: * Cancels all threads non-blocking by interrupting them.
182: */
183: public synchronized void cancel() {
184: stop = true;
185: for (int i = 0; i < taskManagers.size(); i++) {
186: TaskManager tm = (TaskManager) taskManagers.get(i);
187: tm.terminate();
188: tm.interrupt();
189: }
190: }
191:
192: /**
193: * Interrupts all threads in the pool.
194: * @since 1.6
195: */
196: public synchronized void interrupt() {
197: for (int i = 0; i < taskManagers.size(); i++) {
198: TaskManager tm = (TaskManager) taskManagers.get(i);
199: tm.interrupt();
200: }
201: }
202:
203: /**
204: * Checks if all threads of the pool are idle.
205: * @return
206: * <code>true</code> if all threads are idle.
207: * @since 1.6
208: */
209: public synchronized boolean isIdle() {
210: for (int i = 0; i < taskManagers.size(); i++) {
211: TaskManager tm = (TaskManager) taskManagers.get(i);
212: if (!tm.isIdle()) {
213: return false;
214: }
215: }
216: return true;
217: }
218:
219: /**
220: * The <code>TaskManager</code> executes tasks in a thread.
221: *
222: * @author Frank Fock
223: * @version 1.9
224: * @since 1.0.2
225: */
226: class TaskManager extends Thread {
227:
228: private WorkerTask task = null;
229: private volatile boolean run = true;
230:
231: public TaskManager(String name) {
232: super (name);
233: }
234:
235: public synchronized void run() {
236: while ((!stop) && run) {
237: if (task != null) {
238: task.run();
239: synchronized (ThreadPool.this ) {
240: task = null;
241: ThreadPool.this .notify();
242: }
243: } else {
244: try {
245: wait();
246: } catch (InterruptedException ex) {
247: run = respawnThreads;
248: break;
249: }
250: }
251: }
252: }
253:
254: public boolean isIdle() {
255: return ((task == null) && run);
256: }
257:
258: public boolean isStopped() {
259: return stop;
260: }
261:
262: public void terminate() {
263: stop = true;
264: WorkerTask t;
265: if ((t = task) != null) {
266: t.terminate();
267: }
268: }
269:
270: public synchronized void execute(WorkerTask task) {
271: if (this .task == null) {
272: this .task = task;
273: notify();
274: } else {
275: throw new IllegalStateException(
276: "TaskManager is not idle");
277: }
278: }
279: }
280: }
|