001: // Copyright (c) 2004-2005 Sun Microsystems Inc., All Rights Reserved.
002:
003: /*
004: * WorkThreadPool.java
005: *
006: * SUN PROPRIETARY/CONFIDENTIAL
007: * This software is the proprietary information of Sun Microsystems, Inc.
008: * Use is subject to license terms
009: */
010: package com.sun.jbi.binding.file.framework;
011:
012: import com.sun.jbi.binding.file.FileBindingContext;
013: import com.sun.jbi.binding.file.FileBindingResources;
014: import com.sun.jbi.binding.file.util.StringTranslator;
015:
016: import java.util.ArrayList;
017: import java.util.Iterator;
018: import java.util.NoSuchElementException;
019: import java.util.logging.Logger;
020:
021: /**
022: * This class manages a pool of worker threads. The Work Manager uses this
023: * class to execute the command in different threads.
024: *
025: * @author Sun Microsystems, Inc.
026: */
027: class WorkThreadPool implements FileBindingResources {
028: /**
029: * Default Maximum Thread count.
030: */
031: private static final int MAX_THREAD_COUNT = 10;
032:
033: /**
034: * Default Minimum Thread count.
035: */
036: private static final int MIN_THREAD_COUNT = 1;
037:
038: /**
039: * Handle to the busy thread pool.
040: */
041: private ArrayList mBusyThreadPool;
042:
043: /**
044: * Handle to the free thread pool.
045: */
046: private ArrayList mFreeThreadPool;
047:
048: /**
049: * Contains a list of threads on which we are waiting for response.
050: */
051: private ArrayList mThreadWaitList;
052:
053: /**
054: * Internal handle to the logger instance.
055: */
056: private Logger mLog;
057:
058: /**
059: * Internal variable which indicates that workethread pool has been
060: * instructed to stop all its threads.
061: */
062: private String mState;
063:
064: /**
065: * i18n
066: */
067: private StringTranslator mTranslator;
068:
069: /**
070: * Maximum Thread count.
071: */
072: private int mMaxThreadCount = MAX_THREAD_COUNT;
073:
074: /**
075: * Minimum Thread count.
076: */
077: private int mMinThreadCount = MIN_THREAD_COUNT;
078:
079: /**
080: * Creates a new instance of WorkThreadPool.
081: *
082: * @param minThreadCount - minimum thread limit
083: * @param maxThreadCount - maximum thread limit
084: */
085: WorkThreadPool(int minThreadCount, int maxThreadCount) {
086: mLog = FileBindingContext.getInstance().getLogger();
087: mMaxThreadCount = maxThreadCount;
088: mMinThreadCount = minThreadCount;
089: mThreadWaitList = new ArrayList();
090: mTranslator = new StringTranslator();
091: mState = "NEW";
092: }
093:
094: /**
095: * Creates a new instance of WorkThreadPool.
096: */
097: WorkThreadPool() {
098: mLog = Logger.getLogger(this .getClass().getPackage().getName());
099: mTranslator = new StringTranslator();
100: mThreadWaitList = new ArrayList();
101: mState = "NEW";
102: }
103:
104: /**
105: * Returns the count of threads in busy thread pool
106: *
107: * @return int count of number of busy threads
108: */
109: public int getBusyThreads() {
110: return mBusyThreadPool.size();
111: }
112:
113: /**
114: * Sets the log file.
115: *
116: * @param logFile log file.
117: */
118: public void setLogger(String logFile) {
119: mLog = mLog.getLogger(logFile);
120: }
121:
122: /**
123: * Sets the maximum thread count
124: *
125: * @param count max thread.
126: */
127: public void setMaxThreads(int count) {
128: mMaxThreadCount = count;
129: }
130:
131: /**
132: * Sets the minimum thread count.
133: *
134: * @param count min thread count.
135: */
136: public void setMinThreads(int count) {
137: mMinThreadCount = count;
138: }
139:
140: /**
141: * This method exits when all the threds complete.
142: */
143: public synchronized void exitWhenBusyThreadsDone() {
144: while (mBusyThreadPool.size() != 0) {
145: try {
146: wait();
147: } catch (Exception e) {
148: mLog.severe(mTranslator.getString(
149: FBC_THREADS_BUSYTHREADS, String
150: .valueOf(mBusyThreadPool.size())));
151:
152: continue;
153: }
154: }
155: }
156:
157: /**
158: * Gets a free thread from the free thread pool.
159: *
160: * @return a worker thread instance if a free thread exists; otherwise
161: * null.
162: *
163: * @throws IllegalStateException when the thread pool is not running.
164: */
165: synchronized WorkThread getFreeThread() {
166: WorkThread workerThread = null;
167:
168: if (!mState.equals("START")) {
169: throw new IllegalStateException(
170: "Thread pool is not running");
171: }
172:
173: if (mFreeThreadPool.size() > 0) {
174: workerThread = (WorkThread) mFreeThreadPool.get(0);
175: mFreeThreadPool.remove(0);
176: mBusyThreadPool.add(workerThread);
177: } else if ((mBusyThreadPool.size()) < mMaxThreadCount) {
178: workerThread = new WorkThread(this );
179: mBusyThreadPool.add(workerThread);
180: new Thread(workerThread).start();
181: } else {
182: try {
183: mThreadWaitList.add(Thread.currentThread());
184: wait();
185: mThreadWaitList.remove(Thread.currentThread());
186:
187: if (mFreeThreadPool.size() > 0) {
188: workerThread = (WorkThread) mFreeThreadPool.get(0);
189: mFreeThreadPool.remove(0);
190: mBusyThreadPool.add(workerThread);
191: }
192: } catch (InterruptedException interupException) {
193: workerThread = null;
194: } catch (Exception exception) {
195: mLog.warning("Details : " + exception.toString());
196: workerThread = null;
197: }
198: }
199:
200: return workerThread;
201: }
202:
203: /**
204: * Cleans up the free and busy threads.
205: *
206: * @throws IllegalStateException exception.
207: */
208: synchronized void cleanup() {
209: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
210: throw new IllegalStateException(
211: "Thread Pool is still active");
212: }
213:
214: mFreeThreadPool.clear();
215: mBusyThreadPool.clear();
216: }
217:
218: /**
219: * Initializes the instance.
220: *
221: * @throws IllegalStateException
222: */
223: void init() {
224: if (!mState.equals("NEW")) {
225: throw new IllegalStateException(
226: "Threadpool has already been initialized");
227: }
228:
229: mFreeThreadPool = new ArrayList(mMaxThreadCount);
230: mBusyThreadPool = new ArrayList(mMaxThreadCount);
231:
232: for (int i = 0; i < mMinThreadCount; i++) {
233: mFreeThreadPool.add(new WorkThread(this ));
234: }
235:
236: mState = "INIT";
237: }
238:
239: /**
240: * Release the thread to the free pool. This method is used by worker
241: * threads to notify that it has completed processing its command.
242: *
243: * @param thread - worker thread instance.
244: *
245: * @throws IllegalStateException
246: * @throws NoSuchElementException
247: */
248: synchronized void releaseThread(WorkThread thread) {
249: if (!mState.equals("START")) {
250: throw new IllegalStateException(
251: "Thread pool is not running");
252: }
253:
254: int threadIndex = mBusyThreadPool.indexOf(thread);
255:
256: if (threadIndex != -1) {
257: mBusyThreadPool.remove(threadIndex);
258:
259: if (mFreeThreadPool.size() < mMinThreadCount) {
260: mFreeThreadPool.add(thread);
261:
262: // Notify the manager to indicate that a thread has been released
263: // This is ignored if no one are waiting for a free thread.
264: try {
265: notifyAll();
266: } catch (IllegalMonitorStateException exp) {
267: mLog.severe("Details :" + exp.toString());
268: }
269: } else {
270: thread.stop();
271: }
272:
273: if (mBusyThreadPool.size() == 0) {
274: try {
275: notify();
276: } catch (IllegalMonitorStateException exp) {
277: mLog.severe("Details :" + exp.toString());
278: }
279: }
280: } else {
281: throw new NoSuchElementException("Thread "
282: + thread.getName() + " cannot be found");
283: }
284: }
285:
286: /**
287: * Start the free threads.
288: *
289: * @throws IllegalStateException
290: */
291: void start() {
292: mLog.info("Starting the thread pool");
293:
294: WorkThread workerThread;
295:
296: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
297: throw new IllegalStateException(
298: "Thread pool has already been started");
299: }
300:
301: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
302: workerThread = (WorkThread) iter.next();
303: new Thread(workerThread).start();
304: }
305:
306: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
307: workerThread = (WorkThread) iter.next();
308: new Thread(workerThread).start();
309: }
310:
311: mState = "START";
312: }
313:
314: /**
315: * Stops the free and busy threads.
316: *
317: * @throws IllegalStateException exception.
318: */
319: synchronized void stop() {
320: mLog.fine(mTranslator.getString(FBC_THREADS_POOL_STOP));
321:
322: WorkThread workerThread;
323:
324: if (!mState.equals("START")) {
325: throw new IllegalStateException(
326: "Threadpool has not been started");
327: }
328:
329: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
330: workerThread = (WorkThread) iter.next();
331: workerThread.stop();
332: }
333:
334: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
335: workerThread = (WorkThread) iter.next();
336: workerThread.stop();
337: }
338:
339: Thread waitThread = null;
340:
341: for (Iterator iter = mThreadWaitList.iterator(); iter.hasNext();) {
342: waitThread = (Thread) iter.next();
343: waitThread.interrupt();
344: }
345:
346: mState = "STOP";
347: }
348: }
|