001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */package org.apache.geronimo.pool;
017:
018: import java.util.Collections;
019: import java.util.HashMap;
020: import java.util.Iterator;
021: import java.util.Map;
022:
023: import javax.management.MalformedObjectNameException;
024: import javax.management.ObjectName;
025: import javax.management.j2ee.statistics.BoundedRangeStatistic;
026: import javax.management.j2ee.statistics.CountStatistic;
027: import javax.management.j2ee.statistics.Stats;
028:
029: import java.util.concurrent.ThreadPoolExecutor;
030: import java.util.concurrent.RejectedExecutionHandler;
031: import java.util.concurrent.RejectedExecutionException;
032: import java.util.concurrent.ThreadFactory;
033: import java.util.concurrent.SynchronousQueue;
034: import java.util.concurrent.TimeUnit;
035:
036: import org.apache.geronimo.gbean.GBeanInfo;
037: import org.apache.geronimo.gbean.GBeanInfoBuilder;
038: import org.apache.geronimo.gbean.GBeanLifecycle;
039:
040: import org.apache.geronimo.management.J2EEManagedObject;
041: import org.apache.geronimo.management.StatisticsProvider;
042: import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
043: import org.apache.geronimo.management.stats.BoundedRangeStatisticImpl;
044: import org.apache.geronimo.management.stats.CountStatisticImpl;
045: import org.apache.geronimo.management.stats.StatsImpl;
046:
047: /**
048: * @version $Rev: 598759 $ $Date: 2007-11-27 12:32:26 -0800 (Tue, 27 Nov 2007) $
049: */
050: public class ThreadPool implements GeronimoExecutor, GBeanLifecycle,
051: J2EEManagedObject, StatisticsProvider {
052: private ThreadPoolExecutor executor;
053: private ClassLoader classLoader;
054: private ObjectName objectName;
055: private boolean waitWhenBlocked;
056:
057: // Statistics-related fields follow
058: private boolean statsActive = true;
059: private PoolStatsImpl stats = new PoolStatsImpl();
060: private Map clients = new HashMap();
061:
062: public ThreadPool(int minPoolSize, int maxPoolSize,
063: String poolName, long keepAliveTime,
064: ClassLoader classLoader, String objectName) {
065: ThreadPoolExecutor p = new ThreadPoolExecutor(
066: minPoolSize, // core size
067: maxPoolSize, // max size
068: keepAliveTime, TimeUnit.MILLISECONDS,
069: new SynchronousQueue());
070:
071: p
072: .setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
073: p.setThreadFactory(new ThreadPoolThreadFactory(poolName,
074: classLoader));
075:
076: try {
077: this .objectName = ObjectName.getInstance(objectName);
078: } catch (MalformedObjectNameException e) {
079: throw new IllegalStateException(
080: "Bad object name injected: " + e.getMessage(), e);
081: }
082:
083: executor = p;
084: this .classLoader = classLoader;
085:
086: // set pool stats start time
087: stats.setStartTime();
088: }
089:
090: public String getName() {
091: return objectName.getKeyProperty("name");
092: }
093:
094: public String getObjectName() {
095: return objectName.getCanonicalName();
096: }
097:
098: public boolean isEventProvider() {
099: return true;
100: }
101:
102: public boolean isStateManageable() {
103: return true;
104: }
105:
106: public boolean isStatisticsProvider() {
107: return true;
108: }
109:
110: public Stats getStats() {
111: stats.threadsInUse.setLowerBound(0);
112: stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
113: int inUse = executor.getPoolSize();
114: stats.threadsInUse.setCurrent(inUse);
115: if (inUse < stats.threadsInUse.getLowWaterMark()) {
116: stats.threadsInUse.setLowWaterMark(inUse);
117: }
118: if (inUse > stats.threadsInUse.getHighWaterMark()) {
119: stats.threadsInUse.setHighWaterMark(inUse);
120: }
121: if (statsActive) {
122: synchronized (this ) {
123: stats.prepareConsumers(clients);
124: }
125: } else {
126: stats.prepareConsumers(Collections.EMPTY_MAP);
127: }
128: // set last sapmle time
129: stats.setLastSampleTime();
130: return stats;
131: }
132:
133: /**
134: * Reset all statistics in PoolStatsImpl object
135: */
136: public void resetStats() {
137: stats.threadsInUse.setLowerBound(0);
138: stats.threadsInUse.setUpperBound(0);
139: stats.threadsInUse.setCurrent(0);
140: stats.threadsInUse.setLowWaterMark(0);
141: stats.threadsInUse.setHighWaterMark(0);
142: stats.setStartTime();
143: }
144:
145: public static class PoolStatsImpl extends StatsImpl implements
146: ThreadPoolStats {
147: private BoundedRangeStatisticImpl threadsInUse = new BoundedRangeStatisticImpl(
148: "Threads In Use", "",
149: "The number of threads in use by this thread pool");
150: private Map consumers = new HashMap();
151:
152: public PoolStatsImpl() {
153: addStat(threadsInUse.getName(), threadsInUse);
154: }
155:
156: public BoundedRangeStatistic getThreadsInUse() {
157: return threadsInUse;
158: }
159:
160: public CountStatistic getCountForConsumer(String consumer) {
161: return (CountStatistic) consumers.get(consumer);
162: }
163:
164: public String[] getThreadConsumers() {
165: return (String[]) consumers.keySet().toArray(
166: new String[consumers.size()]);
167: }
168:
169: public void prepareConsumers(Map clients) {
170: Map result = new HashMap();
171: for (Iterator it = clients.keySet().iterator(); it
172: .hasNext();) {
173: String client = (String) it.next();
174: Integer count = (Integer) clients.get(client);
175: CountStatisticImpl stat = (CountStatisticImpl) consumers
176: .get(client);
177: if (stat == null) {
178: stat = new CountStatisticImpl("Threads for "
179: + client, "",
180: "The number of threads used by the client known as '"
181: + client + "'", count.intValue());
182: addStat(stat.getName(), stat);
183: } else {
184: consumers.remove(client);
185: stat.setCount(count.intValue());
186: }
187: result.put(client, stat);
188: }
189: for (Iterator it = consumers.keySet().iterator(); it
190: .hasNext();) {
191: String client = (String) it.next();
192: removeStat(((CountStatisticImpl) consumers.get(client))
193: .getName());
194: }
195: consumers = result;
196: }
197: }
198:
199: public int getPoolSize() {
200: return executor.getPoolSize();
201: }
202:
203: public int getMaximumPoolSize() {
204: return executor.getMaximumPoolSize();
205: }
206:
207: public int getActiveCount() {
208: return executor.getActiveCount();
209: }
210:
211: public boolean awaitTermination(long timeout, TimeUnit unit)
212: throws InterruptedException {
213: return executor.awaitTermination(timeout, unit);
214: }
215:
216: public void execute(Runnable command) {
217: execute("Unknown", command);
218: }
219:
220: public void execute(final String consumerName,
221: final Runnable runnable) {
222: Runnable command;
223: if (statsActive) {
224: command = new Runnable() {
225: public void run() {
226: startWork(consumerName);
227: try {
228: runnable.run();
229: } finally {
230: finishWork(consumerName);
231: }
232: }
233: };
234: } else {
235: command = runnable;
236: }
237:
238: ThreadPoolExecutor p;
239: synchronized (this ) {
240: p = executor;
241: }
242: if (p == null) {
243: throw new IllegalStateException(
244: "ThreadPool has been stopped");
245: }
246: Runnable task = new ContextClassLoaderRunnable(command,
247: classLoader);
248: p.execute(task);
249: }
250:
251: private synchronized void startWork(String consumerName) {
252: Integer test = (Integer) clients.get(consumerName);
253: if (test == null) {
254: clients.put(consumerName, new Integer(1));
255: } else {
256: clients.put(consumerName, new Integer(test.intValue() + 1));
257: }
258: }
259:
260: private synchronized void finishWork(String consumerName) {
261: Integer test = (Integer) clients.get(consumerName);
262: if (test.intValue() == 1) {
263: clients.remove(consumerName);
264: } else {
265: clients.put(consumerName, new Integer(test.intValue() - 1));
266: }
267: }
268:
269: private static class WaitWhenBlockedPolicy implements
270: RejectedExecutionHandler {
271: public void rejectedExecution(Runnable r,
272: ThreadPoolExecutor executor)
273: throws RejectedExecutionException {
274: try {
275: executor.getQueue().put(r);
276: } catch (InterruptedException e) {
277: throw new RejectedExecutionException(e);
278: }
279: }
280: }
281:
282: public void setWaitWhenBlocked(boolean wait) {
283: waitWhenBlocked = wait;
284: if (wait) {
285: executor
286: .setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
287: } else {
288: executor
289: .setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
290: }
291: }
292:
293: public boolean isWaitWhenBlocked() {
294: return waitWhenBlocked;
295: }
296:
297: public void doStart() throws Exception {
298: }
299:
300: public void doStop() throws Exception {
301: ThreadPoolExecutor p;
302: synchronized (this ) {
303: p = executor;
304: executor = null;
305: classLoader = null;
306: }
307: if (p != null) {
308: p.shutdownNow();
309: }
310: }
311:
312: public void doFail() {
313: try {
314: doStop();
315: } catch (Exception e) {
316: }
317: }
318:
319: private static final class ThreadPoolThreadFactory implements
320: ThreadFactory {
321: private final String poolName;
322: private final ClassLoader classLoader;
323:
324: private int nextWorkerID = 0;
325:
326: public ThreadPoolThreadFactory(String poolName,
327: ClassLoader classLoader) {
328: this .poolName = poolName;
329: this .classLoader = classLoader;
330: }
331:
332: public Thread newThread(Runnable arg0) {
333: Thread thread = new Thread(arg0, poolName + " "
334: + getNextWorkerID());
335: thread.setContextClassLoader(classLoader);
336: return thread;
337: }
338:
339: private synchronized int getNextWorkerID() {
340: return nextWorkerID++;
341: }
342: }
343:
344: private static final class ContextClassLoaderRunnable implements
345: Runnable {
346: private Runnable task;
347: private ClassLoader classLoader;
348:
349: public ContextClassLoaderRunnable(Runnable task,
350: ClassLoader classLoader) {
351: this .task = task;
352: this .classLoader = classLoader;
353: }
354:
355: public void run() {
356: Runnable myTask = task;
357: ClassLoader myClassLoader = classLoader;
358:
359: // clear fields so they can be garbage collected
360: task = null;
361: classLoader = null;
362:
363: if (myClassLoader != null) {
364: // we asumme the thread classloader is already set to our final class loader
365: // because the only to access the thread is wrapped with the Runnable or via the initial thread pool
366: try {
367: myTask.run();
368: } finally {
369: Thread.currentThread().setContextClassLoader(
370: myClassLoader);
371: }
372: }
373: }
374: }
375:
/** GBean metadata for {@link ThreadPool}, built once when the class is initialized. */
public static final GBeanInfo GBEAN_INFO;

static {
    GBeanInfoBuilder builder = GBeanInfoBuilder.createStatic(
            ThreadPool.class, "GBean");

    // attributes registered with the flag set to true
    builder.addAttribute("minPoolSize", int.class, true);
    builder.addAttribute("maxPoolSize", int.class, true);
    builder.addAttribute("poolName", String.class, true);
    builder.addAttribute("keepAliveTime", long.class, true);
    builder.addAttribute("waitWhenBlocked", boolean.class, true);

    // attributes registered with the flag set to false
    builder.addAttribute("objectName", String.class, false);
    builder.addAttribute("classLoader", ClassLoader.class, false);

    builder.addInterface(GeronimoExecutor.class);

    // names listed in the same order as the ThreadPool constructor parameters
    String[] constructorArgs = new String[] { "minPoolSize",
            "maxPoolSize", "poolName", "keepAliveTime",
            "classLoader", "objectName" };
    builder.setConstructor(constructorArgs);

    GBEAN_INFO = builder.getBeanInfo();
}
401:
402: public static GBeanInfo getGBeanInfo() {
403: return GBEAN_INFO;
404: }
405: }
|