001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.scheduling.backportconcurrent;
018:
019: import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
020: import edu.emory.mathcs.backport.java.util.concurrent.Executor;
021: import edu.emory.mathcs.backport.java.util.concurrent.Executors;
022: import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
023: import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
024: import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
025: import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
026: import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
027: import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
028: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
029: import org.apache.commons.logging.Log;
030: import org.apache.commons.logging.LogFactory;
031:
032: import org.springframework.beans.factory.BeanNameAware;
033: import org.springframework.beans.factory.DisposableBean;
034: import org.springframework.beans.factory.InitializingBean;
035: import org.springframework.core.task.TaskRejectedException;
036: import org.springframework.scheduling.SchedulingTaskExecutor;
037: import org.springframework.util.Assert;
038:
039: /**
040: * JavaBean that allows for configuring a JSR-166 backport
041: * {@link edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor} in bean
042: * style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
043: * properties), exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
044: * This is an alternative to configuring a ThreadPoolExecutor instance directly using
045: * constructor injection, with a separate {@link ConcurrentTaskExecutor} adapter wrapping it.
046: *
047: * <p>For any custom needs, in particular for defining a
048: * {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor},
049: * it is recommended to use a straight definition of the Executor instance or a
050: * factory method definition that points to the JSR-166 backport
051: * {@link edu.emory.mathcs.backport.java.util.concurrent.Executors} class.
052: * To expose such a raw Executor as a Spring {@link org.springframework.core.task.TaskExecutor},
053: * simply wrap it with a {@link ConcurrentTaskExecutor} adapter.
054: *
055: * <p><b>NOTE:</b> This class implements Spring's
056: * {@link org.springframework.core.task.TaskExecutor} interface as well as
057: * the JSR-166 {@link edu.emory.mathcs.backport.java.util.concurrent.Executor}
058: * interface, with the former being the primary interface, the other just
059: * serving as secondary convenience. For this reason, the exception handling
060: * follows the TaskExecutor contract rather than the Executor contract, in
061: * particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
062: *
063: * @author Juergen Hoeller
064: * @since 2.0.3
065: * @see org.springframework.core.task.TaskExecutor
066: * @see edu.emory.mathcs.backport.java.util.concurrent.Executor
067: * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor
068: * @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor
069: * @see edu.emory.mathcs.backport.java.util.concurrent.Executors
070: * @see ConcurrentTaskExecutor
071: */
072: public class ThreadPoolTaskExecutor implements SchedulingTaskExecutor,
073: Executor, BeanNameAware, InitializingBean, DisposableBean {
074:
075: protected final Log logger = LogFactory.getLog(getClass());
076:
077: private final Object poolSizeMonitor = new Object();
078:
079: private int corePoolSize = 1;
080:
081: private int maxPoolSize = Integer.MAX_VALUE;
082:
083: private int keepAliveSeconds = 60;
084:
085: private int queueCapacity = Integer.MAX_VALUE;
086:
087: private ThreadFactory threadFactory = Executors
088: .defaultThreadFactory();
089:
090: private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
091:
092: private String beanName;
093:
094: private ThreadPoolExecutor threadPoolExecutor;
095:
096: /**
097: * Set the ThreadPoolExecutor's core pool size.
098: * Default is 1.
099: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
100: */
101: public void setCorePoolSize(int corePoolSize) {
102: synchronized (this .poolSizeMonitor) {
103: this .corePoolSize = corePoolSize;
104: if (this .threadPoolExecutor != null) {
105: this .threadPoolExecutor.setCorePoolSize(corePoolSize);
106: }
107: }
108: }
109:
110: /**
111: * Return the ThreadPoolExecutor's core pool size.
112: */
113: public int getCorePoolSize() {
114: synchronized (this .poolSizeMonitor) {
115: return this .corePoolSize;
116: }
117: }
118:
119: /**
120: * Set the ThreadPoolExecutor's maximum pool size.
121: * Default is <code>Integer.MAX_VALUE</code>.
122: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
123: */
124: public void setMaxPoolSize(int maxPoolSize) {
125: synchronized (this .poolSizeMonitor) {
126: this .maxPoolSize = maxPoolSize;
127: if (this .threadPoolExecutor != null) {
128: this .threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
129: }
130: }
131: }
132:
133: /**
134: * Return the ThreadPoolExecutor's maximum pool size.
135: */
136: public int getMaxPoolSize() {
137: synchronized (this .poolSizeMonitor) {
138: return this .maxPoolSize;
139: }
140: }
141:
142: /**
143: * Set the ThreadPoolExecutor's keep-alive seconds.
144: * Default is 60.
145: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
146: */
147: public void setKeepAliveSeconds(int keepAliveSeconds) {
148: synchronized (this .poolSizeMonitor) {
149: this .keepAliveSeconds = keepAliveSeconds;
150: if (this .threadPoolExecutor != null) {
151: this .threadPoolExecutor.setKeepAliveTime(
152: keepAliveSeconds, TimeUnit.SECONDS);
153: }
154: }
155: }
156:
157: /**
158: * Return the ThreadPoolExecutor's keep-alive seconds.
159: */
160: public int getKeepAliveSeconds() {
161: synchronized (this .poolSizeMonitor) {
162: return this .keepAliveSeconds;
163: }
164: }
165:
166: /**
167: * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
168: * Default is <code>Integer.MAX_VALUE</code>.
169: * <p>Any positive value will lead to a LinkedBlockingQueue instance;
170: * any other value will lead to a SynchronousQueue instance.
171: * @see edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue
172: * @see edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue
173: */
174: public void setQueueCapacity(int queueCapacity) {
175: this .queueCapacity = queueCapacity;
176: }
177:
178: /**
179: * Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
180: * Default is the ThreadPoolExecutor's default thread factory.
181: * @see edu.emory.mathcs.backport.java.util.concurrent.Executors#defaultThreadFactory()
182: */
183: public void setThreadFactory(ThreadFactory threadFactory) {
184: this .threadFactory = (threadFactory != null ? threadFactory
185: : Executors.defaultThreadFactory());
186: }
187:
188: /**
189: * Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
190: * Default is the ThreadPoolExecutor's default abort policy.
191: * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.AbortPolicy
192: */
193: public void setRejectedExecutionHandler(
194: RejectedExecutionHandler rejectedExecutionHandler) {
195: this .rejectedExecutionHandler = (rejectedExecutionHandler != null ? rejectedExecutionHandler
196: : new ThreadPoolExecutor.AbortPolicy());
197: }
198:
199: public void setBeanName(String name) {
200: this .beanName = name;
201: }
202:
203: /**
204: * Calls <code>initialize()</code> after the container applied all property values.
205: * @see #initialize()
206: */
207: public void afterPropertiesSet() {
208: initialize();
209: }
210:
211: /**
212: * Creates the BlockingQueue and the ThreadPoolExecutor.
213: * @see #createQueue
214: */
215: public void initialize() {
216: if (logger.isInfoEnabled()) {
217: logger.info("Initializing ThreadPoolExecutor"
218: + (this .beanName != null ? " '" + this .beanName
219: + "'" : ""));
220: }
221: BlockingQueue queue = createQueue(this .queueCapacity);
222: this .threadPoolExecutor = new ThreadPoolExecutor(
223: this .corePoolSize, this .maxPoolSize,
224: this .keepAliveSeconds, TimeUnit.SECONDS, queue,
225: this .threadFactory, this .rejectedExecutionHandler);
226: }
227:
228: /**
229: * Create the BlockingQueue to use for the ThreadPoolExecutor.
230: * <p>A LinkedBlockingQueue instance will be created for a positive
231: * capacity value; a SynchronousQueue else.
232: * @param queueCapacity the specified queue capacity
233: * @return the BlockingQueue instance
234: * @see edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue
235: * @see edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue
236: */
237: protected BlockingQueue createQueue(int queueCapacity) {
238: if (queueCapacity > 0) {
239: return new LinkedBlockingQueue(queueCapacity);
240: } else {
241: return new SynchronousQueue();
242: }
243: }
244:
245: /**
246: * Return the underlying ThreadPoolExecutor for native access.
247: * @return the underlying ThreadPoolExecutor (never <code>null</code>)
248: * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
249: */
250: public ThreadPoolExecutor getThreadPoolExecutor()
251: throws IllegalStateException {
252: Assert.state(this .threadPoolExecutor != null,
253: "ThreadPoolTaskExecutor not initialized");
254: return this .threadPoolExecutor;
255: }
256:
257: /**
258: * Implementation of both the JSR-166 backport Executor interface and the Spring
259: * TaskExecutor interface, delegating to the ThreadPoolExecutor instance.
260: * @see edu.emory.mathcs.backport.java.util.concurrent.Executor#execute(Runnable)
261: * @see org.springframework.core.task.TaskExecutor#execute(Runnable)
262: */
263: public void execute(Runnable task) {
264: Executor executor = getThreadPoolExecutor();
265: try {
266: executor.execute(task);
267: } catch (RejectedExecutionException ex) {
268: throw new TaskRejectedException("Executor [" + executor
269: + "] did not accept task: " + task, ex);
270: }
271: }
272:
273: /**
274: * This task executor prefers short-lived work units.
275: */
276: public boolean prefersShortLivedTasks() {
277: return true;
278: }
279:
280: /**
281: * Return the current pool size.
282: * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#getPoolSize()
283: */
284: public int getPoolSize() {
285: return getThreadPoolExecutor().getPoolSize();
286: }
287:
288: /**
289: * Return the number of currently active threads.
290: * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#getActiveCount()
291: */
292: public int getActiveCount() {
293: return getThreadPoolExecutor().getActiveCount();
294: }
295:
296: /**
297: * Calls <code>shutdown</code> when the BeanFactory destroys
298: * the task executor instance.
299: * @see #shutdown()
300: */
301: public void destroy() {
302: shutdown();
303: }
304:
305: /**
306: * Perform a shutdown on the ThreadPoolExecutor.
307: * @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdown()
308: */
309: public void shutdown() {
310: if (logger.isInfoEnabled()) {
311: logger.info("Shutting down ThreadPoolExecutor"
312: + (this .beanName != null ? " '" + this .beanName
313: + "'" : ""));
314: }
315: this.threadPoolExecutor.shutdown();
316: }
317:
318: }
|