001: /*
002: *
003: * Jsmtpd, Java SMTP daemon
004: * Copyright (C) 2005 Jean-Francois POUX, jf.poux@laposte.net
005: *
006: * This program is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU General Public License
008: * as published by the Free Software Foundation; either version 2
009: * of the License, or (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
019: *
020: */
021: package org.jsmtpd.generic.threadpool;
022:
023: import java.util.Iterator;
024: import java.util.LinkedList;
025:
026: import org.apache.commons.logging.Log;
027: import org.apache.commons.logging.LogFactory;
028:
029: /**
030: * A generic, fixed-size thread pooler
031: * each not busy thread are kept waiting
032: *
033: * Things to do to add auto-grow ability :
034: * Record number of active th over time (at each assign free th, or a controling thread)
035: * if thread pool is inactive during a while, and curent size is bigger than min size,
036: * try to remove free thds from the pool. => synchronize the collection
037: *
038: *
039: * Add a max thread int
040: * add min thread int
041: *
042: *
043: * Instead of throwing an exception directly when pool is exhausted
044: * try to increase by 10% the number of thd, if under max th., then re assign a thread.
045: *
046: *
047: *
048: * @author Jean-Francois POUX
049: * @see org.jsmtpd.generic.threadpool.IThreadedClass
050: */
051: public class GenericThreadPool implements ThreadPool {
052: private Log log = LogFactory.getLog(GenericThreadPool.class);
053: private LinkedList<ThreadWorker> threads = new LinkedList<ThreadWorker>();
054:
055: /**
056: *
057: * @param numThreads number of threads to be spawned
058: * @param threadClassName name of the class to be threaded, must impletement IThreadedClass
059: * @throws InstantiationException
060: * @throws IllegalAccessException
061: * @throws ClassNotFoundException
062: */
063: public GenericThreadPool(int numThreads, String threadClassName,
064: String displayThreadName) throws InstantiationException,
065: IllegalAccessException, ClassNotFoundException {
066: ThreadWorker tmp;
067: IThreadedClass cls;
068: log
069: .debug("Starting a fixed pool of " + numThreads
070: + " threads");
071: for (int i = 0; i < numThreads; i++) {
072: tmp = new ThreadWorker();
073: cls = (IThreadedClass) Class.forName(threadClassName)
074: .newInstance();
075: tmp.setWorker(cls);
076: tmp.setName(displayThreadName + "#" + tmp.getId());
077: tmp.start();
078: while (!tmp.isFree()) {
079: //wait for thread to be up
080: Thread.yield();
081: log.debug("Thread " + tmp.getName() + " ready");
082: }
083: threads.add(tmp);
084: }
085: }
086:
087: /**
088: * Will gracefully shutdown each running thread
089: *
090: */
091: public void gracefullShutdown() {
092: log.debug("Gracefull shutdown ...");
093: ThreadWorker tmp;
094: for (int i = 0; i < threads.size(); i++) {
095: tmp = (ThreadWorker) threads.get(i);
096: tmp.gracefullShutdown();
097: }
098: }
099:
100: /**
101: * Will force each thread to shutdown
102: *
103: */
104: public void forceShutdown() {
105: log.debug("Forcing shutdown ...");
106: ThreadWorker tmp;
107: for (int i = 0; i < threads.size(); i++) {
108: tmp = (ThreadWorker) threads.get(i);
109: tmp.forceShutdown();
110: }
111: }
112:
113: /**
114: *
115: * @return true if any free thread
116: */
117: public synchronized boolean hasFreeThread() {
118: for (Iterator iter = threads.iterator(); iter.hasNext();) {
119: ThreadWorker element = (ThreadWorker) iter.next();
120: if (element.isFree())
121: return true;
122: }
123: return false;
124: }
125:
126: /**
127: *
128: */
129: public synchronized int countFreeThread() {
130: int count = 0;
131: for (Iterator iter = threads.iterator(); iter.hasNext();) {
132: ThreadWorker element = (ThreadWorker) iter.next();
133: if (element.isFree())
134: count++;
135: }
136: return count;
137: }
138:
139: /**
140: * passes the obj parameter to the thread instance, and runs its doJob mehtod
141: * @param obj the object to pass
142: * @throws BusyThreadPoolException when the pool is exhausted
143: */
144: public synchronized void assignFreeThread(Object obj)
145: throws BusyThreadPoolException {
146: int i = 0;
147: for (Iterator iter = threads.iterator(); iter.hasNext();) {
148: ThreadWorker element = (ThreadWorker) iter.next();
149: if (element.isFree()) {
150: log.debug("Worker " + element.getName()
151: + " is free, assigning job");
152: element.setParam(obj);
153: element.wake();
154: return;
155: }
156: i++;
157: }
158: log.warn("Thread pool exhausted !");
159: throw new BusyThreadPoolException();
160: }
161:
162: }
|