001: /*
002: * CoadunationLib: The coadunation libraries.
003: * Copyright (C) 2007 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * ThreadPoolManager.java
020: */
021:
022: // package path
023: package com.rift.coad.lib.thread.pool;
024:
025: // java imports
026: import java.util.Vector;
027: import java.util.Iterator;
028: import java.util.concurrent.atomic.AtomicInteger;
029:
030: // logging import
031: import org.apache.log4j.Logger;
032:
033: // coadunation imports
034: import com.rift.coad.lib.common.ClassUtil;
035: import com.rift.coad.lib.thread.CoadunationThread;
036: import com.rift.coad.lib.thread.ThreadStateMonitor;
037:
038: /**
039: * This object is responsible for managing a pool of threads assigned to process
040: * a task object.
041: *
042: * @author Brett Chaldecott
043: */
044: public class ThreadPoolManager {
045:
046: /**
047: * This class is responsible for processing the tasks.
048: */
049: public class PoolThread extends CoadunationThread {
050:
051: // The classes private member variables
052: private ThreadStateMonitor state = new ThreadStateMonitor();
053: private ThreadPoolManager threadPoolManager = null;
054: private Class taskClass = null;
055:
056: /**
057: * The constructor of pool thread.
058: *
059: * @param threadPool The reference to the thread pool.
060: * @param taskClass The task object to process.
061: * @exception Exception
062: */
063: public PoolThread(ThreadPoolManager threadPoolManager,
064: Class taskClass) throws Exception {
065: this .threadPoolManager = threadPoolManager;
066: this .taskClass = taskClass;
067: }
068:
069: /**
070: * This method replaces the run method in the BasicThread.
071: *
072: * @exception Exception
073: */
074: public void process() throws Exception {
075: while (!state.isTerminated()) {
076: if (!monitor()) {
077: break;
078: }
079: try {
080: Task task = (Task) taskClass.newInstance();
081: task.process(threadPoolManager);
082: } catch (Exception ex) {
083: log.error("Failed to process a task : "
084: + ex.getMessage(), ex);
085: }
086: processing.decrementAndGet();
087: }
088: removeThread(this );
089: log.debug("Pool thread exiting");
090: }
091:
092: /**
093: * This method will be implemented by child objects to terminate the
094: * processing of this thread.
095: */
096: public void terminate() {
097: state.terminate(true);
098: }
099:
100: }
101:
102: // the logger reference
103: protected Logger log = Logger.getLogger(ThreadPoolManager.class
104: .getName());
105:
106: // privat member variables
107: private AtomicInteger processing = new AtomicInteger(0);
108: private int currentSize = 0;
109: private int minSize = 0;
110: private int maxSize = 0;
111: private Class taskClass = null;
112: private String username = null;
113: private Vector threadList = new Vector();
114: private ThreadStateMonitor state = new ThreadStateMonitor();
115: private int releaseThread = 1;
116:
117: /**
118: * Creates a new instance of ThreadPoolManager
119: *
120: * @param size The size of this thread pool.
121: * @param taskClass The class that implements the task interface.
122: * @param username The name of the user that the threads will run as.
123: * @exception PoolException
124: */
125: public ThreadPoolManager(int size, Class taskClass, String username)
126: throws PoolException {
127: validateTask(taskClass);
128: this .minSize = size;
129: this .maxSize = size;
130: this .taskClass = taskClass;
131: this .username = username;
132: startThreads(minSize);
133: }
134:
135: /**
136: * Creates a new instance of ThreadPoolManager
137: *
138: * @param minSize The minimum size of this thread pool.
139: * @param maxSize The maximum size of this thread pool.
140: * @param taskClass The class that implements the task interface.
141: * @param username The name of the user that the threads will run as.
142: * @exception PoolException
143: */
144: public ThreadPoolManager(int minSize, int maxSize, Class taskClass,
145: String username) throws PoolException {
146: validateTask(taskClass);
147: this .minSize = minSize;
148: this .maxSize = maxSize;
149: this .taskClass = taskClass;
150: this .username = username;
151: startThreads(minSize);
152: }
153:
154: /**
155: * This method returns the min size.
156: *
157: * @return The minimum size of the thread pool.
158: */
159: public synchronized int getMinSize() {
160: return minSize;
161: }
162:
163: /**
164: * This method sets the minum size of the thread pool.
165: *
166: * @param minSize The minimum size of the pool.
167: * @exception PoolException
168: */
169: public synchronized void setMinSize(int minSize)
170: throws PoolException {
171: checkState();
172: if (minSize > maxSize) {
173: throw new PoolException(
174: "Min size must be smaller than max size.");
175: }
176: this .minSize = minSize;
177: if (currentSize < minSize) {
178: startThreads(minSize - currentSize);
179: }
180: notifyAll();
181: }
182:
183: /**
184: * This method returns the max size of the thread pool.
185: *
186: * @return The maximum size of the thread pool.
187: */
188: public synchronized int getMaxSize() {
189: return maxSize;
190: }
191:
192: /**
193: * This method sets the maximum size of the thread pool.
194: *
195: * @param maxSize The maximum size of the thread pool.
196: */
197: public synchronized void setMaxSize(int maxSize)
198: throws PoolException {
199: checkState();
200: if (maxSize < minSize) {
201: throw new PoolException(
202: "Max size must be greater than min size.");
203: }
204: this .maxSize = maxSize;
205: notifyAll();
206: }
207:
208: /**
209: * This method returns the size of the thread pool.
210: *
211: * @return The size of the thread pool.
212: */
213: public synchronized int getSize() {
214: return maxSize;
215: }
216:
217: /**
218: * This method sets the size of the thread pool.
219: *
220: * @param size The size of the thread pool.
221: * @exception PoolException
222: */
223: public synchronized void setSize(int size) throws PoolException {
224: checkState();
225: this .minSize = size;
226: this .maxSize = size;
227: if (currentSize < size) {
228: startThreads(size - currentSize);
229: }
230: notifyAll();
231: }
232:
233: /**
234: * This method releases threads a thread from the pool.
235: *
236: * @exception PoolException
237: */
238: public synchronized void releaseThread() throws PoolException {
239: int processing = this .processing.get();
240: this .releaseThread++;
241: processing += releaseThread;
242: if (processing > minSize && processing <= maxSize) {
243: startThreads(1);
244: }
245: notify();
246: }
247:
248: /**
249: * This method is called to terminate the thread pool.
250: */
251: public void terminate() throws PoolException {
252: state.terminate(true);
253: Vector threadListCopy = null;
254: synchronized (this ) {
255: threadListCopy = new Vector(threadList);
256: }
257: for (Iterator iter = threadListCopy.iterator(); iter.hasNext();) {
258: CoadunationThread thread = (CoadunationThread) iter.next();
259: thread.terminate();
260: }
261:
262: synchronized (this ) {
263: notifyAll();
264: }
265: }
266:
267: /**
268: * This method validates the task object.
269: *
270: * @param taskClass The class to test.
271: * @exception PoolException
272: */
273: private void validateTask(Class taskClass) throws PoolException {
274: if (!ClassUtil.testForParent(taskClass, Task.class)) {
275: throw new PoolException("Task class ["
276: + taskClass.getName() + "] does not inherit from ["
277: + Task.class.getName() + "]");
278: }
279: }
280:
281: /**
282: * This method is called to start the threads
283: *
284: * @param size The number of threads to release.
285: * @exception PoolException
286: */
287: private void startThreads(int size) throws PoolException {
288: try {
289: for (int count = 0; count < size; count++) {
290: PoolThread thread = new PoolThread(this , taskClass);
291: thread.start(username);
292: addThread(thread);
293: }
294: } catch (Exception ex) {
295: log.error("Failed to start the threads : "
296: + ex.getMessage(), ex);
297: throw new PoolException("Failed to start the threads : "
298: + ex.getMessage(), ex);
299: }
300: }
301:
302: /**
303: * This method is call by the pool threads to monitor the processing.
304: *
305: * @return TRUE if processing, should continue, FALSE if not.
306: * @exception PoolException
307: */
308: private synchronized boolean monitor() throws PoolException {
309: while (true) {
310: if (currentSize > maxSize) {
311: currentSize--;
312: return false;
313: } else if (releaseThread > 0) {
314: releaseThread--;
315: processing.incrementAndGet();
316: return true;
317: } else if (currentSize > minSize) {
318: currentSize--;
319: return false;
320: } else if (state.isTerminated()) {
321: currentSize--;
322: return false;
323: }
324: try {
325: wait();
326: } catch (Exception ex) {
327: log.error("Wait failed : " + ex.getMessage());
328: }
329: }
330: }
331:
332: /**
333: * This method adds a thread to the list of threads
334: */
335: private synchronized void addThread(PoolThread thread) {
336: currentSize++;
337: threadList.add(thread);
338: }
339:
340: /**
341: * This method is called to remove a thread from the list.
342: *
343: * @param thread The thread to remove
344: */
345: private synchronized void removeThread(PoolThread thread) {
346: threadList.remove(thread);
347: }
348:
349: /**
350: * This method is used to check the state of this pool.
351: *
352: * @exception PoolException
353: */
354: private void checkState() throws PoolException {
355: if (state.isTerminated()) {
356: throw new PoolException(
357: "The thread pool has been terminated");
358: }
359: }
360: }
|