001: /*
002: * @(#)ThreadPool.java 0.9.0 06/04/2000 - 15:13:54
003: *
004: * Copyright (C) 2000,,2003 2002 Matt Albrecht
005: * groboclown@users.sourceforge.net
006: * http://groboutils.sourceforge.net
007: *
008: * Permission is hereby granted, free of charge, to any person obtaining a
009: * copy of this software and associated documentation files (the "Software"),
010: * to deal in the Software without restriction, including without limitation
011: * the rights to use, copy, modify, merge, publish, distribute, sublicense,
012: * and/or sell copies of the Software, and to permit persons to whom the
013: * Software is furnished to do so, subject to the following conditions:
014: *
015: * The above copyright notice and this permission notice shall be included in
016: * all copies or substantial portions of the Software.
017: *
018: * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
019: * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
020: * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
021: * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
022: * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
023: * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
024: * DEALINGS IN THE SOFTWARE.
025: */
026:
027: package net.sourceforge.groboutils.util.thread.v1;
028:
029: import net.sourceforge.groboutils.util.datastruct.v1.SynchQueue;
030:
031: import java.util.Vector;
032:
033: /**
034: * A pool of QueueThread instances, each containing an instance of
035: * an ObjectListener implemented class. The Class to be the listener
036: * is passed into the constructor. Requirements for the Class are:
037: * 1. it implements IObjectListener, 2. it has a public
038: * constructor without any parameters.
039: * <P>
040: * The pool handles menial tasks such as:
041: * <ol>
042: * <li>Growing the thread pool if the number of waiting objects
043: * is above a threshold number, up to a maximum number of
044: * threads.
045: * <li>Finding the thread with the fewest number of waiting objects.
046: * <li>Optimization of determining which thread to pass events to.
047: * </ol>
048: * <P>
049: * The pool gets much of its functionality by sharing a single SynchQueue
050: * between all of its threads.
051: *
052: * @author Matt Albrecht <a href="mailto:groboclown@users.sourceforge.net">groboclown@users.sourceforge.net</a>
053: * @since June 4, 2000
054: * @version $Date: 2003/02/10 22:52:49 $
055: */
056: public class ThreadPool {
057: //----------------------------
058: // Public data
059:
060: //----------------------------
061: // Private data
062:
063: private Class m_objListenerClass = null;
064: private Object m_objListenerInitData = null;
065:
066: private QueueThread[] m_pool = null;
067: private SynchQueue m_sharedQueue = new SynchQueue();
068:
069: private int m_maxThreads = 10;
070: private int m_numThreads = 0;
071: private int m_depthThreshold = 5;
072:
073: //----------------------------
074: // constructors
075:
076: /**
077: * Default constructor
078: */
079: public ThreadPool(Class objectListenerClass) {
080: this (objectListenerClass, null, 1, 10);
081: }
082:
083: /**
084: *
085: */
086: public ThreadPool(Class objectListenerClass, int maxThreads) {
087: this (objectListenerClass, null, 1, maxThreads);
088: }
089:
090: /**
091: *
092: * @param initData if the given objectListenerClass is an instance
093: * of ThreadObjectListener, then the initData will be passed
094: * into the initialize( Object ) method.
095: */
096: public ThreadPool(Class objectListenerClass, Object initData) {
097: this (objectListenerClass, initData, 1, 10);
098: }
099:
100: /**
101: *
102: */
103: public ThreadPool(Class objectListenerClass, Object initData,
104: int maxThreads) {
105: this (objectListenerClass, initData, 1, maxThreads);
106: }
107:
108: /**
109: *
110: */
111: public ThreadPool(Class objectListenerClass, Object initData,
112: int startingThreadCount, int maxThreads) {
113: this .m_objListenerClass = objectListenerClass;
114: this .m_objListenerInitData = initData;
115: try {
116: createObjectListenerInstance();
117: } catch (Exception ex) {
118: ex.printStackTrace();
119: throw new IllegalArgumentException("Class "
120: + objectListenerClass
121: + " does not create ObjectListener instances");
122: }
123:
124: setMaximumThreadCount(maxThreads);
125: this .m_pool = new QueueThread[maxThreads];
126: while (this .m_numThreads < startingThreadCount) {
127: addNewThread();
128: }
129: }
130:
131: //----------------------------
132: // Public methods
133:
134: /**
135: *
136: */
137: public void setDepthThreshold(int threshold) {
138: if (threshold < 1) {
139: throw new IllegalArgumentException("threshold " + threshold
140: + " is too low");
141: }
142: this .m_depthThreshold = threshold;
143: }
144:
145: /**
146: *
147: */
148: public int getObjectDepth() {
149: return this .m_sharedQueue.size();
150: }
151:
152: /**
153: * Adds the given object into the shared queue, so that the next
154: * available thread will process it.
155: */
156: public void addObject(Object o) {
157: checkThreshold();
158: this .m_sharedQueue.enqueue(o);
159: }
160:
161: /**
162: *
163: */
164: public int getThreadCount() {
165: return this .m_numThreads;
166: }
167:
168: /**
169: *
170: */
171: public int getMaximumThreadCount() {
172: return this .m_maxThreads;
173: }
174:
175: /**
176: *
177: */
178: public void setMaximumThreadCount(int max) {
179: if (max < 1) {
180: throw new IllegalArgumentException("maximum count " + max
181: + " is out of bounds");
182: }
183: this .m_maxThreads = max;
184: }
185:
186: /**
187: * Waits for all expecting objects in the queue to be processed,
188: * and for each thread to finish processing an object.
189: */
190: public void waitForThreadsToFinish() {
191: // wait for the SynchQueue to empty
192: while (getObjectDepth() > 0) {
193: Thread.yield();
194: }
195:
196: Vector v = new Vector();
197: synchronized (v) {
198: // find all threads which are still processing objects
199: for (int i = this .m_numThreads; --i >= 0;) {
200: if (this .m_pool[i].isProcessingObjects()) {
201: v.addElement(this .m_pool[i]);
202: }
203: }
204:
205: // wait for all threads to finish processing their objects
206: QueueThread qt;
207: while (v.size() > 0) {
208: Thread.yield();
209: for (int i = v.size(); --i >= 0;) {
210: qt = (QueueThread) v.elementAt(i);
211: if (!qt.isProcessingObjects()) {
212: v.removeElementAt(i);
213: // don't need to adjust i because
214: // we're procressing backwards.
215: }
216: }
217: }
218: }
219: }
220:
221: /**
222: * Stops all threads.
223: */
224: public synchronized void stopThreads() {
225: for (int i = this .m_numThreads; --i >= 0;) {
226: if (this .m_pool[i] != null)
227: this .m_pool[i].stop();
228: }
229: }
230:
231: /**
232: * Suspends all threads.
233: */
234: public synchronized void suspendThreads() {
235: for (int i = this .m_numThreads; --i >= 0;) {
236: if (this .m_pool[i] != null)
237: this .m_pool[i].suspend();
238: }
239: }
240:
241: /**
242: * Resumes all threads.
243: */
244: public synchronized void resumeThreads() {
245: for (int i = this .m_numThreads; --i >= 0;) {
246: if (this .m_pool[i] != null)
247: this .m_pool[i].resume();
248: }
249: }
250:
251: //----------------------------
252: // Protected methods
253:
254: /**
255: * If there are not enough threads, then add one into the
256: * internal array, start the thread, and return the created
257: * thread.
258: *
259: * @return the new thread, or <tt>null</tt> if the pool has
260: * exceeded its maximum thread count.
261: */
262: protected synchronized QueueThread addNewThread() {
263: QueueThread qt = null;
264: if (this .m_numThreads < this .m_maxThreads) {
265: qt = this .m_pool[this .m_numThreads++] = new QueueThread(
266: createObjectListenerInstance(), this .m_sharedQueue);
267: qt.start();
268: }
269: return qt;
270: }
271:
272: /**
273: * Checks if the depth on the shared queue is too deep (beyond the
274: * threshold), and if so, creates a new thread to help deal with the
275: * situation.
276: */
277: protected void checkThreshold() {
278: if (this .m_sharedQueue.size() > this .m_depthThreshold) {
279: addNewThread();
280: }
281: }
282:
283: /**
284: * Create an instance of the basic object listener class, as given
285: * in the constructor.
286: *
287: * @exception IllegalStateException thrown if there is an error
288: * creating a new instance of the class.
289: */
290: protected IObjectListener createObjectListenerInstance() {
291: try {
292: //System.out.println("Creating an instance of class "+this.m_objListenerClass+
293: //", modifiers = "+(java.lang.reflect.Modifier.toString(
294: //this.m_objListenerClass.getModifiers())));
295: IObjectListener ol = (IObjectListener) this .m_objListenerClass
296: .newInstance();
297: if (ol instanceof IThreadObjectListener) {
298: ((IThreadObjectListener) ol)
299: .initialize(this .m_objListenerInitData);
300: }
301: return ol;
302: } catch (InstantiationException ie) {
303: throw new IllegalStateException(
304: "could not instantiate from class "
305: + this .m_objListenerClass.getName()
306: + ": general instantiation exception "
307: + ie.getMessage());
308: } catch (IllegalAccessException iae) {
309: throw new IllegalStateException(
310: "could not instantiate from class "
311: + this .m_objListenerClass.getName()
312: + ": could not access constructor "
313: + iae.getMessage());
314: } catch (ClassCastException cce) {
315: throw new IllegalStateException(
316: "could not instantiate from class "
317: + this .m_objListenerClass.getName()
318: + ": instance of wrong type "
319: + cce.getMessage());
320: }
321: }
322:
323: //----------------------------
324: // Private methods
325: }
|