001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)WorkThreadPool.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.jms.framework;
030:
031: import java.util.ArrayList;
032: import java.util.Iterator;
033: import java.util.NoSuchElementException;
034:
035: import java.util.logging.Logger;
036:
037: /**
038: * This class manages a pool of worker threads. The Work Manager uses this
039: * class to execute the command in different threads.
040: *
041: * @author Sun Microsystems, Inc.
042: */
043: class WorkThreadPool {
044: /**
045: * Default Maximum Thread count.
046: */
047: private static final int MAX_THREAD_COUNT = 10;
048:
049: /**
050: * Default Minimum Thread count.
051: */
052: private static final int MIN_THREAD_COUNT = 1;
053:
054: /**
055: * Handle to the busy thread pool.
056: */
057: private ArrayList mBusyThreadPool;
058:
059: /**
060: * Handle to the free thread pool.
061: */
062: private ArrayList mFreeThreadPool;
063:
064: /**
065: * Contains a list of threads on which we are waiting for response.
066: */
067: private ArrayList mThreadWaitList;
068:
069: /**
070: * Internal handle to the logger instance.
071: */
072: private Logger mLog;
073:
074: /**
075: * Internal variable which indicates that workethread pool has been
076: * instructed to stop all its threads.
077: */
078: private String mState;
079:
080: /**
081: * Maximum Thread count.
082: */
083: private int mMaxThreadCount = MAX_THREAD_COUNT;
084:
085: /**
086: * Minimum Thread count.
087: */
088: private int mMinThreadCount = MIN_THREAD_COUNT;
089:
090: /**
091: * Creates a new instance of WorkThreadPool.
092: *
093: * @param minThreadCount - minimum thread limit
094: * @param maxThreadCount - maximum thread limit
095: */
096: WorkThreadPool(int minThreadCount, int maxThreadCount) {
097: mLog = Logger.getLogger(this .getClass().getPackage().getName());
098: mMaxThreadCount = maxThreadCount;
099: mMinThreadCount = minThreadCount;
100: mThreadWaitList = new ArrayList();
101: mState = "NEW";
102: }
103:
104: /**
105: * Creates a new instance of WorkThreadPool.
106: */
107: WorkThreadPool() {
108: mLog = Logger.getLogger(this .getClass().getPackage().getName());
109: mThreadWaitList = new ArrayList();
110: mState = "NEW";
111: }
112:
113: /**
114: * Returns the count of threads in busy thread pool
115: *
116: * @return int count of number of busy threads
117: */
118: public int getBusyThreads() {
119: return mBusyThreadPool.size();
120: }
121:
122: /**
123: * Sets the log file.
124: *
125: * @param logFile DOCUMENT ME!
126: */
127: public void setLogger(String logFile) {
128: mLog = mLog.getLogger(logFile);
129: }
130:
131: /**
132: * DOCUMENT ME!
133: *
134: * @param count DOCUMENT ME!
135: */
136: public void setMaxThreads(int count) {
137: mMaxThreadCount = count;
138: }
139:
140: /**
141: * DOCUMENT ME!
142: *
143: * @param count DOCUMENT ME!
144: */
145: public void setMinThreads(int count) {
146: mMinThreadCount = count;
147: }
148:
149: /**
150: *
151: */
152: public synchronized void exitWhenBusyThreadsDone() {
153: while (mBusyThreadPool.size() != 0) {
154: try {
155: wait();
156: } catch (Exception e) {
157: mLog.info("GOt notification in exitWhenBusyThreadsDone"
158: + mBusyThreadPool.size());
159:
160: continue;
161: }
162: }
163: }
164:
165: /**
166: * Gets a free thread from the free thread pool.
167: *
168: * @return a worker thread instance if a free thread exists; otherwise
169: * null.
170: *
171: * @throws IllegalStateException when the thread pool is not running.
172: */
173: synchronized WorkThread getFreeThread() {
174: WorkThread workerThread = null;
175:
176: if (!mState.equals("START")) {
177: throw new IllegalStateException(
178: "Thread pool is not running");
179: }
180:
181: if (mFreeThreadPool.size() > 0) {
182: workerThread = (WorkThread) mFreeThreadPool.get(0);
183: mFreeThreadPool.remove(0);
184: mBusyThreadPool.add(workerThread);
185: } else if ((mBusyThreadPool.size()) < mMaxThreadCount) {
186: workerThread = new WorkThread(this );
187: mBusyThreadPool.add(workerThread);
188: new Thread(workerThread).start();
189: } else {
190: try {
191: mThreadWaitList.add(Thread.currentThread());
192: wait();
193: mThreadWaitList.remove(Thread.currentThread());
194:
195: // There should always be a free thread now.
196: if (mFreeThreadPool.size() > 0) {
197: workerThread = (WorkThread) mFreeThreadPool.get(0);
198: mFreeThreadPool.remove(0);
199: mBusyThreadPool.add(workerThread);
200: } else {
201: // this should never happen.
202: mLog
203: .severe("Something is seriously wrong here. fix it");
204: }
205: } catch (InterruptedException interupException) {
206: // Threadpool has been instructed to cleanup
207: // do nothing
208: mLog.fine("Received interrupt signal to cleanup");
209: workerThread = null;
210: } catch (Exception exception) {
211: // This should not happen.
212: // Log warning
213: mLog
214: .warning("Exception thrown when waiting for a free thread");
215: mLog.warning("Details : " + exception.toString());
216: workerThread = null;
217: }
218: }
219:
220: return workerThread;
221: }
222:
223: /**
224: * Cleans up the free and busy threads.
225: *
226: * @throws IllegalStateException DOCUMENT ME!
227: */
228: synchronized void cleanup() {
229: mLog.fine("Cleaning up the worker thread pool");
230:
231: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
232: throw new IllegalStateException(
233: "Thread Pool is still active");
234: }
235:
236: mFreeThreadPool.clear();
237: mBusyThreadPool.clear();
238: }
239:
240: /**
241: * Initializes the instance.
242: *
243: * @throws IllegalStateException
244: */
245: void init() {
246: if (!mState.equals("NEW")) {
247: throw new IllegalStateException(
248: "Threadpool has already been initialized");
249: }
250:
251: mFreeThreadPool = new ArrayList(mMaxThreadCount);
252: mBusyThreadPool = new ArrayList(mMaxThreadCount);
253:
254: for (int i = 0; i < mMinThreadCount; i++) {
255: mFreeThreadPool.add(new WorkThread(this ));
256: }
257:
258: mState = "INIT";
259: }
260:
261: /**
262: * Release the thread to the free pool. This method is used by worker
263: * threads to notify that it has completed processing its command.
264: *
265: * @param thread - worker thread instance.
266: *
267: * @throws IllegalStateException
268: * @throws NoSuchElementException DOCUMENT ME!
269: */
270: synchronized void releaseThread(WorkThread thread) {
271: if (!mState.equals("START")) {
272: throw new IllegalStateException(
273: "Thread pool is not running");
274: }
275:
276: int threadIndex = mBusyThreadPool.indexOf(thread);
277:
278: if (threadIndex != -1) {
279: mBusyThreadPool.remove(threadIndex);
280:
281: if (mFreeThreadPool.size() < mMinThreadCount) {
282: mFreeThreadPool.add(thread);
283:
284: // Notify the manager to indicate that a thread has been released
285: // This is ignored if no one are waiting for a free thread.
286: try {
287: notifyAll();
288: } catch (IllegalMonitorStateException exp) {
289: // This should not happen
290: mLog
291: .severe("Exception while notifying work manager");
292: mLog.severe("Details :" + exp.toString());
293: }
294: } else {
295: thread.stop();
296: }
297:
298: if (mBusyThreadPool.size() == 0) {
299: try {
300: notify();
301: } catch (IllegalMonitorStateException exp) {
302: // This should not happen
303: mLog
304: .severe("Exception while notifying COMPLETION OF BUSY THREADS");
305: mLog.severe("Details :" + exp.toString());
306: }
307: }
308: } else {
309: throw new NoSuchElementException("thread "
310: + thread.getName() + " cannot be found");
311: }
312: }
313:
314: /**
315: * Start the free threads.
316: *
317: * @throws IllegalStateException
318: */
319: void start() {
320: mLog.fine("Starting the thread pool");
321:
322: WorkThread workerThread;
323:
324: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
325: throw new IllegalStateException(
326: "Thread pool has already been started");
327: }
328:
329: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
330: workerThread = (WorkThread) iter.next();
331: new Thread(workerThread).start();
332: }
333:
334: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
335: workerThread = (WorkThread) iter.next();
336: new Thread(workerThread).start();
337: }
338:
339: mState = "START";
340: }
341:
342: /**
343: * Stops the free and busy threads.
344: *
345: * @throws IllegalStateException DOCUMENT ME!
346: */
347: synchronized void stop() {
348: mLog.fine("Stopping the worker thread pool");
349:
350: WorkThread workerThread;
351:
352: if (!mState.equals("START")) {
353: throw new IllegalStateException(
354: "Threadpool has not been started");
355: }
356:
357: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
358: workerThread = (WorkThread) iter.next();
359: workerThread.stop();
360: }
361:
362: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
363: workerThread = (WorkThread) iter.next();
364: workerThread.stop();
365: }
366:
367: // Interrupt all threads in which work manager is waiting for a free
368: // thread.
369: Thread waitThread = null;
370:
371: for (Iterator iter = mThreadWaitList.iterator(); iter.hasNext();) {
372: waitThread = (Thread) iter.next();
373: waitThread.interrupt();
374: }
375:
376: mState = "STOP";
377: }
378: }
|