001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)WorkThreadPool.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.file.framework;
030:
031: import com.sun.jbi.binding.file.FileBindingContext;
032: import com.sun.jbi.binding.file.FileBindingResources;
033: import com.sun.jbi.binding.file.util.StringTranslator;
034:
035: import java.util.ArrayList;
036: import java.util.Iterator;
037: import java.util.NoSuchElementException;
038: import java.util.logging.Logger;
039:
040: /**
041: * This class manages a pool of worker threads. The Work Manager uses this
042: * class to execute the command in different threads.
043: *
044: * @author Sun Microsystems, Inc.
045: */
046: class WorkThreadPool implements FileBindingResources {
047: /**
048: * Default Maximum Thread count.
049: */
050: private static final int MAX_THREAD_COUNT = 10;
051:
052: /**
053: * Default Minimum Thread count.
054: */
055: private static final int MIN_THREAD_COUNT = 1;
056:
057: /**
058: * Handle to the busy thread pool.
059: */
060: private ArrayList mBusyThreadPool;
061:
062: /**
063: * Handle to the free thread pool.
064: */
065: private ArrayList mFreeThreadPool;
066:
067: /**
068: * Contains a list of threads on which we are waiting for response.
069: */
070: private ArrayList mThreadWaitList;
071:
072: /**
073: * Internal handle to the logger instance.
074: */
075: private Logger mLog;
076:
077: /**
078: * Internal variable which indicates that workethread pool has been
079: * instructed to stop all its threads.
080: */
081: private String mState;
082:
083: /**
084: * i18n
085: */
086: private StringTranslator mTranslator;
087:
088: /**
089: * Maximum Thread count.
090: */
091: private int mMaxThreadCount = MAX_THREAD_COUNT;
092:
093: /**
094: * Minimum Thread count.
095: */
096: private int mMinThreadCount = MIN_THREAD_COUNT;
097:
098: /**
099: * Creates a new instance of WorkThreadPool.
100: *
101: * @param minThreadCount - minimum thread limit
102: * @param maxThreadCount - maximum thread limit
103: */
104: WorkThreadPool(int minThreadCount, int maxThreadCount) {
105: mLog = FileBindingContext.getInstance().getLogger();
106: mMaxThreadCount = maxThreadCount;
107: mMinThreadCount = minThreadCount;
108: mThreadWaitList = new ArrayList();
109: mTranslator = new StringTranslator();
110: mState = "NEW";
111: }
112:
113: /**
114: * Creates a new instance of WorkThreadPool.
115: */
116: WorkThreadPool() {
117: mLog = FileBindingContext.getInstance().getLogger();
118: mTranslator = new StringTranslator();
119: mThreadWaitList = new ArrayList();
120: mState = "NEW";
121: }
122:
123: /**
124: * Returns the count of threads in busy thread pool
125: *
126: * @return int count of number of busy threads
127: */
128: public int getBusyThreads() {
129: return mBusyThreadPool.size();
130: }
131:
132: /**
133: * Sets the log file.
134: *
135: * @param logFile log file.
136: */
137: public void setLogger(String logFile) {
138: mLog = mLog.getLogger(logFile);
139: }
140:
141: /**
142: * Sets the maximum thread count
143: *
144: * @param count max thread.
145: */
146: public void setMaxThreads(int count) {
147: mMaxThreadCount = count;
148: }
149:
150: /**
151: * Sets the minimum thread count.
152: *
153: * @param count min thread count.
154: */
155: public void setMinThreads(int count) {
156: mMinThreadCount = count;
157: }
158:
159: /**
160: * This method exits when all the threds complete.
161: */
162: public synchronized void exitWhenBusyThreadsDone() {
163: while (mBusyThreadPool.size() != 0) {
164: try {
165: wait();
166: } catch (Exception e) {
167: mLog.severe(mTranslator.getString(
168: FBC_THREADS_BUSYTHREADS, String
169: .valueOf(mBusyThreadPool.size())));
170:
171: continue;
172: }
173: }
174: }
175:
176: /**
177: * Gets a free thread from the free thread pool.
178: *
179: * @return a worker thread instance if a free thread exists; otherwise
180: * null.
181: *
182: * @throws IllegalStateException when the thread pool is not running.
183: */
184: synchronized WorkThread getFreeThread() {
185: WorkThread workerThread = null;
186:
187: if (!mState.equals("START")) {
188: throw new IllegalStateException(
189: "Thread pool is not running");
190: }
191:
192: if (mFreeThreadPool.size() > 0) {
193: workerThread = (WorkThread) mFreeThreadPool.get(0);
194: mFreeThreadPool.remove(0);
195: mBusyThreadPool.add(workerThread);
196: } else if ((mBusyThreadPool.size()) < mMaxThreadCount) {
197: workerThread = new WorkThread(this );
198: mBusyThreadPool.add(workerThread);
199: new Thread(workerThread).start();
200: } else {
201: try {
202: mThreadWaitList.add(Thread.currentThread());
203: wait();
204: mThreadWaitList.remove(Thread.currentThread());
205:
206: if (mFreeThreadPool.size() > 0) {
207: workerThread = (WorkThread) mFreeThreadPool.get(0);
208: mFreeThreadPool.remove(0);
209: mBusyThreadPool.add(workerThread);
210: }
211: } catch (InterruptedException interupException) {
212: workerThread = null;
213: } catch (Exception exception) {
214: mLog.warning("Details : " + exception.toString());
215: workerThread = null;
216: }
217: }
218:
219: return workerThread;
220: }
221:
222: /**
223: * Cleans up the free and busy threads.
224: *
225: * @throws IllegalStateException exception.
226: */
227: synchronized void cleanup() {
228: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
229: throw new IllegalStateException(
230: "Thread Pool is still active");
231: }
232:
233: mFreeThreadPool.clear();
234: mBusyThreadPool.clear();
235: }
236:
237: /**
238: * Initializes the instance.
239: *
240: * @throws IllegalStateException
241: */
242: void init() {
243: if (!mState.equals("NEW")) {
244: throw new IllegalStateException(
245: "Threadpool has already been initialized");
246: }
247:
248: mFreeThreadPool = new ArrayList(mMaxThreadCount);
249: mBusyThreadPool = new ArrayList(mMaxThreadCount);
250:
251: for (int i = 0; i < mMinThreadCount; i++) {
252: mFreeThreadPool.add(new WorkThread(this ));
253: }
254:
255: mState = "INIT";
256: }
257:
258: /**
259: * Release the thread to the free pool. This method is used by worker
260: * threads to notify that it has completed processing its command.
261: *
262: * @param thread - worker thread instance.
263: *
264: * @throws IllegalStateException
265: * @throws NoSuchElementException
266: */
267: synchronized void releaseThread(WorkThread thread) {
268: if (!mState.equals("START")) {
269: throw new IllegalStateException(
270: "Thread pool is not running");
271: }
272:
273: int threadIndex = mBusyThreadPool.indexOf(thread);
274:
275: if (threadIndex != -1) {
276: mBusyThreadPool.remove(threadIndex);
277:
278: if (mFreeThreadPool.size() < mMinThreadCount) {
279: mFreeThreadPool.add(thread);
280:
281: // Notify the manager to indicate that a thread has been released
282: // This is ignored if no one are waiting for a free thread.
283: try {
284: notifyAll();
285: } catch (IllegalMonitorStateException exp) {
286: mLog.severe("Details :" + exp.toString());
287: }
288: } else {
289: thread.stop();
290: }
291:
292: if (mBusyThreadPool.size() == 0) {
293: try {
294: notify();
295: } catch (IllegalMonitorStateException exp) {
296: mLog.severe("Details :" + exp.toString());
297: }
298: }
299: } else {
300: throw new NoSuchElementException("Thread "
301: + thread.getName() + " cannot be found");
302: }
303: }
304:
305: /**
306: * Start the free threads.
307: *
308: * @throws IllegalStateException
309: */
310: void start() {
311: WorkThread workerThread;
312:
313: if (!(mState.equals("INIT") || mState.equals("STOP"))) {
314: throw new IllegalStateException(
315: "Thread pool has already been started");
316: }
317:
318: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
319: workerThread = (WorkThread) iter.next();
320: new Thread(workerThread).start();
321: }
322:
323: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
324: workerThread = (WorkThread) iter.next();
325: new Thread(workerThread).start();
326: }
327:
328: mState = "START";
329: }
330:
331: /**
332: * Stops the free and busy threads.
333: *
334: * @throws IllegalStateException exception.
335: */
336: synchronized void stop() {
337: mLog.fine(mTranslator.getString(FBC_THREADS_POOL_STOP));
338:
339: WorkThread workerThread;
340:
341: if (!mState.equals("START")) {
342: throw new IllegalStateException(
343: "Threadpool has not been started");
344: }
345:
346: for (Iterator iter = mFreeThreadPool.iterator(); iter.hasNext();) {
347: workerThread = (WorkThread) iter.next();
348: workerThread.stop();
349: }
350:
351: for (Iterator iter = mBusyThreadPool.iterator(); iter.hasNext();) {
352: workerThread = (WorkThread) iter.next();
353: workerThread.stop();
354: }
355:
356: Thread waitThread = null;
357:
358: for (Iterator iter = mThreadWaitList.iterator(); iter.hasNext();) {
359: waitThread = (Thread) iter.next();
360: waitThread.interrupt();
361: }
362:
363: mState = "STOP";
364: }
365: }
|