001: /*
002: * hgcommons 7
003: * Hammurapi Group Common Library
004: * Copyright (C) 2003 Hammurapi Group
005: *
006: * This program is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * URL: http://www.hammurapi.biz/hammurapi-biz/ef/xmenu/hammurapi-group/products/products/hgcommons/index.html
021: * e-Mail: support@hammurapi.biz
022: */
023: package biz.hammurapi.util;
024:
025: import java.util.LinkedList;
026:
027: import biz.hammurapi.config.ComponentBase;
028: import biz.hammurapi.config.ConfigurationException;
029:
030: /**
031: * Distributes work among multiple threads.
032: * @author Pavel Vlasov
033: * @revision $Revision$
034: */
035: public class ThreadPool extends ComponentBase implements Worker {
036:
037: private int numberOfThreads = 10;
038: private int priority = Thread.NORM_PRIORITY;
039: private ExceptionSink exceptionSink;
040: private int maxQueue = 10;
041: private boolean stopped;
042:
043: public ThreadPool() {
044: // Default constructor
045: }
046:
047: /**
048: * @param numberOfThreads Number of threads to create.
049: * @param priority Threads priority.
050: * @param maxQueue Maximum number of jobs in execution queue. When
051: * execution queue reaches its maximum post() processes job in the invoking thread. Values <1 mean no limit.
052: * @param exceptionSink
053: */
054: public ThreadPool(int numberOfThreads, int priority, int maxQueue,
055: ExceptionSink exceptionSink) {
056: super ();
057: this .numberOfThreads = numberOfThreads;
058: this .priority = priority;
059: this .exceptionSink = exceptionSink;
060: this .maxQueue = maxQueue;
061: }
062:
063: public ThreadPool(int numberOfThreads, int priority, int maxQueue,
064: ExceptionSink exceptionSink, String name) {
065: this (numberOfThreads, priority, maxQueue, exceptionSink);
066: if (name != null) {
067: this .name = name;
068: }
069: }
070:
071: private LinkedList jobQueue = new LinkedList();
072: private int[] threads = { 0 };
073: private String name = toString();
074:
075: public boolean post(Runnable job) {
076: int queueSize;
077: synchronized (jobQueue) {
078: if (stopped) {
079: return false;
080: }
081:
082: queueSize = jobQueue.size();
083:
084: if (maxQueue == 0 || queueSize < maxQueue) {
085: // Add job to processing queue
086: jobQueue.add(job);
087: jobQueue.notify(); // wake up one thread.
088:
089: addMeasurement("queue", queueSize, 0);
090: addMeasurement("post", 1, 0);
091: return true;
092: }
093: }
094:
095: addMeasurement("queue", queueSize, 0);
096: addMeasurement("post", 1, 0);
097:
098: // process in current thread.
099: process(job);
100: return true;
101: }
102:
103: private class WorkerThread extends Thread {
104:
105: public void run() {
106: synchronized (threads) {
107: threads[0]++;
108: }
109:
110: try {
111: while (true) {
112: Runnable job;
113: int queueSize;
114: synchronized (jobQueue) {
115: while (!stopped && jobQueue.isEmpty()) {
116: try {
117: jobQueue.wait();
118: } catch (InterruptedException e) {
119: if (exceptionSink == null) {
120: e.printStackTrace();
121: } else {
122: exceptionSink.consume(this , e);
123: }
124: return;
125: }
126: }
127:
128: if (stopped && jobQueue.isEmpty()) {
129: return;
130: }
131:
132: job = (Runnable) jobQueue.removeFirst();
133:
134: queueSize = jobQueue.size();
135: }
136:
137: addMeasurement("queue", queueSize, 0);
138:
139: process(job);
140: }
141: } finally {
142: synchronized (threads) {
143: threads[0]--;
144: if (threads[0] <= 0) {
145: threads.notifyAll();
146: }
147: }
148: }
149: }
150: }
151:
152: public void start() throws ConfigurationException {
153: for (int i = 0; i < numberOfThreads; i++) {
154: Thread th = new WorkerThread();
155: th.setPriority(priority);
156: th.setName(this .name + "-pool-thread-" + i);
157: th.start();
158: }
159: }
160:
161: /**
162: * Creates threads to replace terminated threads.
163: * Client posting jobs to thread pool may implement liveness check routines and
164: * terminate pool threads in a job hangs. This method allows the client code
165: * to replenish the thread pool after termination of one of worker threads.
166: */
167: public void replenish() {
168: if (!stopped) {
169: synchronized (threads) {
170: for (int i = threads[0]; i < numberOfThreads; i++) {
171: Thread th = new WorkerThread();
172: th.setPriority(priority);
173: th.setName(this .name + "-pool-thread-" + i);
174: th.start();
175: }
176: }
177: }
178: }
179:
180: public void stop() throws ConfigurationException {
181: synchronized (threads) {
182: stopped = true;
183: synchronized (jobQueue) {
184: jobQueue.notifyAll();
185: }
186:
187: while (threads[0] > 0) {
188: try {
189: threads.wait();
190: } catch (InterruptedException e) {
191: throw new ConfigurationException(
192: "Stop() interrupted", e);
193: }
194: }
195: }
196: }
197:
198: /**
199: * @param job
200: */
201: private void process(Runnable job) {
202: long start = System.currentTimeMillis();
203: try {
204: job.run();
205: } catch (Exception e) {
206: if (exceptionSink == null) {
207: e.printStackTrace();
208: } else {
209: exceptionSink.consume(job, e);
210: }
211: } finally {
212: long now = System.currentTimeMillis();
213: addMeasurement("run", now - start, now);
214: }
215: }
216:
217: public int getMaxQueue() {
218: return maxQueue;
219: }
220:
221: public void setMaxQueue(int maxQueue) {
222: this .maxQueue = maxQueue;
223: }
224:
225: public String getName() {
226: return name;
227: }
228:
229: public void setName(String name) {
230: this .name = name;
231: }
232:
233: public int getNumberOfThreads() {
234: return numberOfThreads;
235: }
236:
237: public void setNumberOfThreads(int numberOfThreads) {
238: this .numberOfThreads = numberOfThreads;
239: }
240:
241: public int getPriority() {
242: return priority;
243: }
244:
245: public void setPriority(int priority) {
246: this .priority = priority;
247: }
248:
249: public void setExceptionSink(ExceptionSink exceptionSink) {
250: this.exceptionSink = exceptionSink;
251: }
252:
253: }
|