001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)WorkThreadPool.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.engine.xslt.framework;
030:
031: import java.util.ArrayList;
032: import java.util.Iterator;
033: import java.util.NoSuchElementException;
034: import java.util.logging.Logger;
035: import com.sun.jbi.engine.xslt.TransformationEngineContext;
036:
037: /**
038: * This class manages a pool of worker threads. The Work Manager uses this
039: * class to execute the command in different threads.
040: *
041: * @author Sun Microsystems, Inc.
042: */
043: class WorkThreadPool {
044: /**
045: * Default Maximum Thread count.
046: */
047: private static final int MAX_THREAD_COUNT = 10;
048:
049: /**
050: * Default Minimum Thread count.
051: */
052: private static final int MIN_THREAD_COUNT = 2;
053:
054: /**
055: * Handle to the busy thread pool.
056: */
057: private ArrayList mBusyThreadPool;
058:
059: /**
060: * Handle to the free thread pool.
061: */
062: private ArrayList mFreeThreadPool;
063:
064: /**
065: * Contains a list of threads on which we are waiting for response.
066: */
067: private ArrayList mThreadWaitList;
068:
069: /**
070: * Internal handle to the logger instance.
071: */
072: private Logger mLog;
073:
074: /**
075: * Internal variable which indicates that workethread pool has been
076: * instructed to stop all its threads.
077: */
078: private String mState;
079:
080: /**
081: * Maximum Thread count.
082: */
083: private int mMaxThreadCount = MAX_THREAD_COUNT;
084:
085: /**
086: * Minimum Thread count.
087: */
088: private int mMinThreadCount = MIN_THREAD_COUNT;
089:
090: /**
091: * Creates a new instance of WorkThreadPool.
092: *
093: * @param minThreadCount - minimum thread limit
094: * @param maxThreadCount - maximum thread limit
095: */
096: WorkThreadPool(int minThreadCount, int maxThreadCount) {
097: mLog = TransformationEngineContext.getInstance().getLogger("");
098: mMaxThreadCount = maxThreadCount;
099: mMinThreadCount = minThreadCount;
100: mThreadWaitList = new ArrayList();
101: mState = "NEW";
102: }
103:
104: /**
105: * Creates a new instance of WorkThreadPool.
106: */
107: WorkThreadPool() {
108: mLog = TransformationEngineContext.getInstance().getLogger("");
109: mThreadWaitList = new ArrayList();
110: mState = "NEW";
111: }
112:
113: /**
114: * Returns the count of threads in busy thread pool
115: *
116: * @return int count of number of busy threads
117: */
118: public int getBusyThreads() {
119: return mBusyThreadPool.size();
120: }
121:
122: /**
123: * Sets the log file.
124: *
125: * @param logFile DOCUMENT ME!
126: */
127: public void setLogger(String logFile) {
128: mLog = mLog.getLogger(logFile);
129: }
130:
131: /**
132: * DOCUMENT ME!
133: *
134: * @param count DOCUMENT ME!
135: */
136: public void setMaxThreads(int count) {
137: mMaxThreadCount = count;
138: }
139:
140: /**
141: * DOCUMENT ME!
142: *
143: * @param count DOCUMENT ME!
144: */
145: public void setMinThreads(int count) {
146: mMinThreadCount = count;
147: }
148:
149: /**
150: *
151: */
152: public synchronized void exitWhenBusyThreadsDone() {
153: while (mBusyThreadPool.size() != 0) {
154: try {
155: wait();
156: } catch (Exception e) {
157: mLog.info("GOt notification in exitWhenBusyThreadsDone"
158: + mBusyThreadPool.size());
159:
160: continue;
161: }
162: }
163: }
164:
165: /**
166: * Gets a free thread from the free thread pool.
167: *
168: * @return a worker thread instance if a free thread exists; otherwise
169: * null.
170: *
171: * @throws IllegalStateException when the thread pool is not running.
172: */
173: synchronized WorkThread getFreeThread() {
174: WorkThread workerThread = null;
175:
176: if (!mState.equals("START")) {
177: throw new IllegalStateException(
178: "Thread pool is not running");
179: }
180:
181: if (mFreeThreadPool.size() > 0) {
182: workerThread = (WorkThread) mFreeThreadPool.get(0);
183: mFreeThreadPool.remove(0);
184: mBusyThreadPool.add(workerThread);
185: } else if ((mBusyThreadPool.size()) < mMaxThreadCount) {
186: workerThread = new WorkThread(this );
187: mBusyThreadPool.add(workerThread);
188: new Thread(workerThread).start();
189: } else {
190: try {
191: mThreadWaitList.add(Thread.currentThread());
192: //wait();
193: mThreadWaitList.remove(Thread.currentThread());
194:
195: // There should always be a free thread now.
196: if (mFreeThreadPool.size() > 0) {
197: workerThread = (WorkThread) mFreeThreadPool.get(0);
198: mFreeThreadPool.remove(0);
199: mBusyThreadPool.add(workerThread);
200: } else {
201: // this should never happen.
202: mLog
203: .severe("Something is seriously wrong here. fix it");
204: }
205: }
206: /*catch (InterruptedException interupException)
207: {
208: // Threadpool has been instructed to cleanup
209: // do nothing
210: mLog.info("Received interrupt signal to cleanup");
211: workerThread = null;
212: }*/
213: catch (Exception exception) {
214: // This should not happen.
215: // Log warning
216: mLog
217: .warning("Exception thrown when waiting for a free thread");
218: mLog.warning("Details : " + exception.toString());
219: workerThread = null;
220: }
221: }
222:
223: return workerThread;
224: }
225:
226: /**
227: * Cleans up the free and busy threads.
228: *
229: * @throws IllegalStateException DOCUMENT ME!
230: */
231: synchronized void cleanup() {
232: mLog.info("Cleaning up the worker thread pool");
233:
234: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
235: throw new IllegalStateException(
236: "Thread Pool is still active");
237: }
238:
239: mFreeThreadPool.clear();
240: mBusyThreadPool.clear();
241: }
242:
243: /**
244: * Initializes the instance.
245: *
246: * @throws IllegalStateException
247: */
248: void init() {
249: if (!mState.equals("NEW")) {
250: throw new IllegalStateException(
251: "Threadpool has already been initialized");
252: }
253:
254: mFreeThreadPool = new ArrayList(mMaxThreadCount);
255: mBusyThreadPool = new ArrayList(mMaxThreadCount);
256:
257: for (int i = 0; i < mMinThreadCount; i++) {
258: mFreeThreadPool.add(new WorkThread(this ));
259: }
260:
261: mState = "INIT";
262: }
263:
264: /**
265: * Release the thread to the free pool. This method is used by worker
266: * threads to notify that it has completed processing its command.
267: *
268: * @param thread - worker thread instance.
269: *
270: * @throws IllegalStateException
271: * @throws NoSuchElementException DOCUMENT ME!
272: */
273: synchronized void releaseThread(WorkThread thread) {
274: if (!mState.equals("START")) {
275: throw new IllegalStateException(
276: "Thread pool is not running");
277: }
278:
279: int threadIndex = mBusyThreadPool.indexOf(thread);
280:
281: if (threadIndex != -1) {
282: mBusyThreadPool.remove(threadIndex);
283:
284: if (mFreeThreadPool.size() < mMinThreadCount) {
285: mFreeThreadPool.add(thread);
286:
287: // Notify the manager to indicate that a thread has been released
288: // This is ignored if no one are waiting for a free thread.
289: try {
290: notifyAll();
291: } catch (IllegalMonitorStateException exp) {
292: // This should not happen
293: mLog
294: .severe("Exception while notifying work manager");
295: mLog.severe("Details :" + exp.toString());
296: }
297: } else {
298: thread.stop();
299: }
300:
301: if (mBusyThreadPool.size() == 0) {
302: try {
303: notify();
304: } catch (IllegalMonitorStateException exp) {
305: // This should not happen
306: mLog
307: .severe("Exception while notifying COMPLETION OF BUSY THREADS");
308: mLog.severe("Details :" + exp.toString());
309: }
310: }
311: } else {
312: throw new NoSuchElementException("thread "
313: + thread.getName() + " cannot be found");
314: }
315: }
316:
317: /**
318: * Start the free threads.
319: *
320: * @throws IllegalStateException
321: */
322: void start() {
323: mLog.info("Starting the thread pool" + mState);
324:
325: WorkThread workerThread;
326:
327: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
328: throw new IllegalStateException(
329: "Thread pool has already been started");
330: }
331:
332: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
333: workerThread = (WorkThread) iter.next();
334: new Thread(workerThread).start();
335: }
336:
337: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
338: workerThread = (WorkThread) iter.next();
339: new Thread(workerThread).start();
340: }
341:
342: mState = "START";
343: }
344:
345: /**
346: * Stops the free and busy threads.
347: *
348: * @throws IllegalStateException DOCUMENT ME!
349: */
350: synchronized void stop() {
351: mLog.info("Stopping the worker thread pool");
352:
353: WorkThread workerThread;
354:
355: if (!mState.equals("START")) {
356: throw new IllegalStateException(
357: "Threadpool has not been started");
358: }
359:
360: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
361: workerThread = (WorkThread) iter.next();
362: workerThread.stop();
363: }
364:
365: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
366: workerThread = (WorkThread) iter.next();
367: workerThread.stop();
368: }
369:
370: // Interrupt all threads in which work manager is waiting for a free
371: // thread.
372: Thread waitThread = null;
373:
374: for (Iterator iter = mThreadWaitList.iterator(); iter.hasNext();) {
375: waitThread = (Thread) iter.next();
376: waitThread.interrupt();
377: }
378:
379: mState = "STOP";
380: mLog.info("Stopping the worker thread pool state =" + mState);
381: }
382: }
|