001: package org.jacorb.util.threadpool;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2004 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import java.util.*;
025: import org.jacorb.config.Configuration;
026: import org.omg.CORBA.NO_RESOURCES;
027: import org.apache.avalon.framework.logger.Logger;
028:
029: /**
030: * @author Nicolas Noffke
031: * @version $Id: ThreadPool.java,v 1.21 2006/07/12 11:34:43 alphonse.bendt Exp $
032: */
033: public class ThreadPool {
034: private final int max_threads;
035: private final int max_idle_threads;
036:
037: private int total_threads = 0;
038: private int idle_threads = 0;
039:
040: private final LinkedList job_queue;
041: private final ConsumerFactory factory;
042:
043: private final String namePrefix;
044: private int threadCount = 0;
045:
046: /**
047: * <code>logger</code> is the logger for threadpool.
048: */
049: private final Logger logger;
050:
051: /**
052: * <code>shutdown</code> denotes whether to shutdown the pool.
053: */
054: private boolean shutdown;
055:
056: public ThreadPool(Configuration configuration,
057: String threadNamePrefix, ConsumerFactory factory,
058: int max_threads, int max_idle_threads) {
059: namePrefix = threadNamePrefix;
060: this .job_queue = new LinkedList();
061: this .factory = factory;
062: this .max_threads = max_threads;
063: this .max_idle_threads = max_idle_threads;
064:
065: logger = configuration.getNamedLogger("jacorb.util.tpool");
066: }
067:
068: protected synchronized Object getJob() {
069: idle_threads++;
070:
071: /*
072: * Check job queue is empty before having surplus idle threads exit,
073: * otherwise (as was done previously) if a large number of jobs get
074: * enqueued just after a large number of previously non-idle threads
075: * complete their jobs, then all the newly created threads as well as
076: * the newly non-idle threads will exit until max_idle_threads is
077: * reached before serving up any jobs, and if there are more jobs than
078: * max_idle_threads despite the fact that enough threads once existed to
079: * handle the queued jobs, the excess jobs will be blocked until the
080: * jobs taking up the max_idle_threads complete.
081: *
082: * Also, checking the idle_thread count every time the thread is
083: * notified and the job queue is empty ensures that the surplus idle
084: * threads get cleaned up more quickly, otherwise idle threads can only
085: * be cleaned up after completing a job.
086: */
087: while ((!shutdown) && job_queue.isEmpty()) {
088: /*
089: * This tells the newly idle thread to exit, because
090: * there are already too much idle threads.
091: */
092: if (idle_threads > max_idle_threads) {
093: if (logger.isDebugEnabled()) {
094: logger
095: .debug("["
096: + idle_threads
097: + "/"
098: + total_threads
099: + "] Telling thread to exit (too many idle)");
100: }
101: return getShutdownJob();
102: }
103:
104: try {
105: if (logger.isDebugEnabled()) {
106: logger.debug("[" + idle_threads + "/"
107: + total_threads + "] job queue empty");
108: }
109: wait();
110: } catch (InterruptedException e) {
111: // ignored
112: }
113: }
114: //pool is to be shut down completely
115: if (shutdown) {
116: return getShutdownJob();
117: }
118:
119: idle_threads--;
120:
121: if (logger.isDebugEnabled()) {
122: logger.debug("[" + idle_threads + "/" + total_threads
123: + "] removed idle thread (job scheduled)");
124: }
125: return job_queue.removeFirst();
126: }
127:
128: /**
129: * the returned null will cause the ConsumerTie to exit.
130: * also decrement thread counters.
131: */
132: private Object getShutdownJob() {
133: total_threads--;
134: idle_threads--;
135: return null;
136: }
137:
138: public synchronized void putJob(Object job) {
139: job_queue.add(job);
140: notifyAll();
141:
142: /*
143: * Create a new thread if there aren't enough idle threads
144: * to handle all the jobs in the queue and we haven't reached
145: * the max thread limit. This ensures that there are always
146: * enough idle threads to handle all the jobs in the queue
147: * and no jobs get stuck blocked waiting for a thread to become
148: * idle while we are still below the max thread limit.
149: */
150: if ((job_queue.size() > idle_threads)
151: && (total_threads < max_threads)) {
152: createNewThread();
153: } else if (job_queue.size() > idle_threads) {
154: // more jobs than idle threads. however
155: // thread limit reached.
156: if (logger.isDebugEnabled()) {
157: logger
158: .debug("(Pool)["
159: + idle_threads
160: + "/"
161: + total_threads
162: + "] no idle threads but maximum number of threads reached ("
163: + max_threads + ")");
164: }
165:
166: job_queue.remove(job);
167:
168: throw new NO_RESOURCES(
169: "(Pool)["
170: + idle_threads
171: + "/"
172: + total_threads
173: + "] no idle threads but maximum number of threads reached ("
174: + max_threads + ")");
175: }
176: }
177:
178: private void createNewThread() {
179: if (logger.isDebugEnabled()) {
180: logger.debug("[" + idle_threads + "/" + total_threads
181: + "] creating new thread");
182: }
183:
184: Thread thread = new Thread(new ConsumerTie(this , factory
185: .create()));
186: thread.setName(namePrefix + (threadCount++));
187: thread.setDaemon(true);
188: thread.start();
189:
190: total_threads++;
191: }
192:
193: /**
194: * <code>getLogger</code> returns the threadpools logger.
195: *
196: * @return a <code>Logger</code> value
197: */
198: Logger getLogger() {
199: return logger;
200: }
201:
202: /**
203: * <code>shutdown</code> will shutdown the pool.
204: */
205: public synchronized void shutdown() {
206: if (logger.isDebugEnabled()) {
207: logger.debug("[" + idle_threads + "/" + total_threads
208: + "] shutting down pool");
209: }
210:
211: shutdown = true;
212: notifyAll();
213: }
214: } // ThreadPool
|