001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)WorkThreadPool.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.engine.sequencing.framework.threads;
030:
031: import com.sun.jbi.engine.sequencing.SequencingEngineContext;
032:
033: import java.util.ArrayList;
034: import java.util.Iterator;
035: import java.util.NoSuchElementException;
036: import java.util.logging.Logger;
037:
038: /**
039: * This class manages a pool of worker threads. The Work Manager uses this
040: * class to execute the command in different threads.
041: *
042: * @author Sun Microsystems, Inc.
043: */
044: class WorkThreadPool {
045: /**
046: * Default Maximum Thread count.
047: */
048: private static final int MAX_THREAD_COUNT = 10;
049:
050: /**
051: * Default Minimum Thread count.
052: */
053: private static final int MIN_THREAD_COUNT = 2;
054:
055: /**
056: * Handle to the busy thread pool.
057: */
058: private ArrayList mBusyThreadPool;
059:
060: /**
061: * Handle to the free thread pool.
062: */
063: private ArrayList mFreeThreadPool;
064:
065: /**
066: * Contains a list of threads on which we are waiting for response.
067: */
068: private ArrayList mThreadWaitList;
069:
070: /**
071: * Internal handle to the logger instance.
072: */
073: private Logger mLog;
074:
075: /**
076: * Internal variable which indicates that workethread pool has been
077: * instructed to stop all its threads.
078: */
079: private String mState;
080:
081: /**
082: * Maximum Thread count.
083: */
084: private int mMaxThreadCount = MAX_THREAD_COUNT;
085:
086: /**
087: * Minimum Thread count.
088: */
089: private int mMinThreadCount = MIN_THREAD_COUNT;
090:
091: /**
092: * Creates a new instance of WorkThreadPool.
093: *
094: * @param minThreadCount - minimum thread limit
095: * @param maxThreadCount - maximum thread limit
096: */
097: WorkThreadPool(int minThreadCount, int maxThreadCount) {
098: mLog = SequencingEngineContext.getInstance().getLogger();
099: mMaxThreadCount = maxThreadCount;
100: mMinThreadCount = minThreadCount;
101: mThreadWaitList = new ArrayList();
102: mState = "NEW";
103: }
104:
105: /**
106: * Creates a new instance of WorkThreadPool.
107: */
108: WorkThreadPool() {
109: mLog = SequencingEngineContext.getInstance().getLogger();
110: mThreadWaitList = new ArrayList();
111: mState = "NEW";
112: }
113:
114: /**
115: * Returns the count of threads in busy thread pool
116: *
117: * @return int count of number of busy threads
118: */
119: public int getBusyThreads() {
120: return mBusyThreadPool.size();
121: }
122:
123: /**
124: * Sets the log file.
125: *
126: * @param logFile log file.
127: */
128: public void setLogger(String logFile) {
129: mLog = mLog.getLogger(logFile);
130: }
131:
132: /**
133: * Sets the max threads.
134: *
135: * @param count max threads.
136: */
137: public void setMaxThreads(int count) {
138: mMaxThreadCount = count;
139: }
140:
141: /**
142: * Sets the minimum threads.
143: *
144: * @param count min threads.
145: */
146: public void setMinThreads(int count) {
147: mMinThreadCount = count;
148: }
149:
150: /**
151: *
152: */
153: public synchronized void exitWhenBusyThreadsDone() {
154: while (mBusyThreadPool.size() != 0) {
155: try {
156: wait();
157: } catch (Exception e) {
158: mLog.info("GOt notification in exitWhenBusyThreadsDone"
159: + mBusyThreadPool.size());
160:
161: continue;
162: }
163: }
164: }
165:
166: /**
167: * Gets a free thread from the free thread pool.
168: *
169: * @return a worker thread instance if a free thread exists; otherwise
170: * null.
171: *
172: * @throws IllegalStateException when the thread pool is not running.
173: */
174: synchronized WorkThread getFreeThread() {
175: WorkThread workerThread = null;
176:
177: if (!mState.equals("START")) {
178: throw new IllegalStateException(
179: "Thread pool is not running");
180: }
181:
182: if (mFreeThreadPool.size() > 0) {
183: workerThread = (WorkThread) mFreeThreadPool.get(0);
184: mFreeThreadPool.remove(0);
185: mBusyThreadPool.add(workerThread);
186: } else if ((mBusyThreadPool.size()) < mMaxThreadCount) {
187: workerThread = new WorkThread(this );
188: mBusyThreadPool.add(workerThread);
189: new Thread(workerThread).start();
190: } else {
191: try {
192: mThreadWaitList.add(Thread.currentThread());
193: wait();
194: mThreadWaitList.remove(Thread.currentThread());
195:
196: // There should always be a free thread now.
197: if (mFreeThreadPool.size() > 0) {
198: workerThread = (WorkThread) mFreeThreadPool.get(0);
199: mFreeThreadPool.remove(0);
200: mBusyThreadPool.add(workerThread);
201: } else {
202: // this should never happen.
203: mLog
204: .severe("Something is seriously wrong here. fix it");
205: }
206: } catch (InterruptedException interupException) {
207: // Threadpool has been instructed to cleanup
208: // do nothing
209: mLog.info("Received interrupt signal to cleanup");
210: workerThread = null;
211: } catch (Exception exception) {
212: // This should not happen.
213: // Log warning
214: mLog
215: .warning("Exception thrown when waiting for a free thread");
216: mLog.warning("Details : " + exception.toString());
217: workerThread = null;
218: }
219: }
220:
221: return workerThread;
222: }
223:
224: /**
225: * Cleans up the free and busy threads.
226: *
227: * @throws IllegalStateException exception.
228: */
229: synchronized void cleanup() {
230: mLog.info("Cleaning up the worker thread pool");
231:
232: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
233: throw new IllegalStateException(
234: "Thread Pool is still active");
235: }
236:
237: mFreeThreadPool.clear();
238: mBusyThreadPool.clear();
239: }
240:
241: /**
242: * Initializes the instance.
243: *
244: * @throws IllegalStateException
245: */
246: void init() {
247: if (!mState.equals("NEW")) {
248: throw new IllegalStateException(
249: "Threadpool has already been initialized");
250: }
251:
252: mFreeThreadPool = new ArrayList(mMaxThreadCount);
253: mBusyThreadPool = new ArrayList(mMaxThreadCount);
254:
255: for (int i = 0; i < mMinThreadCount; i++) {
256: mFreeThreadPool.add(new WorkThread(this ));
257: }
258:
259: mState = "INIT";
260: }
261:
262: /**
263: * Release the thread to the free pool. This method is used by worker
264: * threads to notify that it has completed processing its command.
265: *
266: * @param thread - worker thread instance.
267: *
268: * @throws IllegalStateException
269: * @throws NoSuchElementException exception
270: */
271: synchronized void releaseThread(WorkThread thread) {
272: if (!mState.equals("START")) {
273: throw new IllegalStateException(
274: "Thread pool is not running");
275: }
276:
277: int threadIndex = mBusyThreadPool.indexOf(thread);
278:
279: if (threadIndex != -1) {
280: mBusyThreadPool.remove(threadIndex);
281:
282: if (mFreeThreadPool.size() < mMinThreadCount) {
283: mFreeThreadPool.add(thread);
284:
285: // Notify the manager to indicate that a thread has been released
286: // This is ignored if no one are waiting for a free thread.
287: try {
288: notifyAll();
289: } catch (IllegalMonitorStateException exp) {
290: // This should not happen
291: mLog
292: .severe("Exception while notifying work manager");
293: mLog.severe("Details :" + exp.toString());
294: }
295: } else {
296: thread.stop();
297: }
298:
299: if (mBusyThreadPool.size() == 0) {
300: try {
301: notify();
302: } catch (IllegalMonitorStateException exp) {
303: // This should not happen
304: mLog
305: .severe("Exception while notifying COMPLETION OF BUSY THREADS");
306: mLog.severe("Details :" + exp.toString());
307: }
308: }
309: } else {
310: throw new NoSuchElementException("thread "
311: + thread.getName() + " cannot be found");
312: }
313: }
314:
315: /**
316: * Start the free threads.
317: *
318: * @throws IllegalStateException
319: */
320: void start() {
321: mLog.info("Starting the thread pool");
322:
323: WorkThread workerThread;
324:
325: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
326: throw new IllegalStateException(
327: "Thread pool has already been started");
328: }
329:
330: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
331: workerThread = (WorkThread) iter.next();
332: new Thread(workerThread).start();
333: }
334:
335: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
336: workerThread = (WorkThread) iter.next();
337: new Thread(workerThread).start();
338: }
339:
340: mState = "START";
341: }
342:
343: /**
344: * Stops the free and busy threads.
345: *
346: * @throws IllegalStateException exception.
347: */
348: synchronized void stop() {
349: mLog.info("Stopping the worker thread pool");
350:
351: WorkThread workerThread;
352:
353: if (!mState.equals("START")) {
354: throw new IllegalStateException(
355: "Threadpool has not been started");
356: }
357:
358: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
359: workerThread = (WorkThread) iter.next();
360: workerThread.stop();
361: }
362:
363: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
364: workerThread = (WorkThread) iter.next();
365: workerThread.stop();
366: }
367:
368: // Interrupt all threads in which work manager is waiting for a free
369: // thread.
370: Thread waitThread = null;
371:
372: for (Iterator iter = mThreadWaitList.iterator(); iter.hasNext();) {
373: waitThread = (Thread) iter.next();
374: waitThread.interrupt();
375: }
376:
377: mState = "STOP";
378: }
379: }
|