001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.scheduling.concurrent;
018:
019: import java.util.concurrent.BlockingQueue;
020: import java.util.concurrent.Executor;
021: import java.util.concurrent.Executors;
022: import java.util.concurrent.LinkedBlockingQueue;
023: import java.util.concurrent.RejectedExecutionException;
024: import java.util.concurrent.RejectedExecutionHandler;
025: import java.util.concurrent.SynchronousQueue;
026: import java.util.concurrent.ThreadFactory;
027: import java.util.concurrent.ThreadPoolExecutor;
028: import java.util.concurrent.TimeUnit;
029:
030: import org.apache.commons.logging.Log;
031: import org.apache.commons.logging.LogFactory;
032:
033: import org.springframework.beans.factory.BeanNameAware;
034: import org.springframework.beans.factory.DisposableBean;
035: import org.springframework.beans.factory.InitializingBean;
036: import org.springframework.core.task.TaskRejectedException;
037: import org.springframework.scheduling.SchedulingTaskExecutor;
038: import org.springframework.util.Assert;
039:
040: /**
041: * JavaBean that allows for configuring a JDK 1.5 {@link java.util.concurrent.ThreadPoolExecutor}
042: * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
043: * properties), exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
044: * This is an alternative to configuring a ThreadPoolExecutor instance directly using
045: * constructor injection, with a separate {@link ConcurrentTaskExecutor} adapter wrapping it.
046: *
047: * <p>For any custom needs, in particular for defining a
048: * {@link java.util.concurrent.ScheduledThreadPoolExecutor}, it is recommended to
049: * use a straight definition of the Executor instance or a factory method definition
050: * that points to the JDK 1.5 {@link java.util.concurrent.Executors} class.
051: * To expose such a raw Executor as a Spring {@link org.springframework.core.task.TaskExecutor},
052: * simply wrap it with a {@link ConcurrentTaskExecutor} adapter.
053: *
054: * <p><b>NOTE:</b> This class implements Spring's
055: * {@link org.springframework.core.task.TaskExecutor} interface as well as the JDK 1.5
056: * {@link java.util.concurrent.Executor} interface, with the former being the primary
057: * interface, the other just serving as secondary convenience. For this reason, the
058: * exception handling follows the TaskExecutor contract rather than the Executor contract,
059: * in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
060: *
061: * @author Juergen Hoeller
062: * @since 2.0
063: * @see org.springframework.core.task.TaskExecutor
064: * @see java.util.concurrent.Executor
065: * @see java.util.concurrent.ThreadPoolExecutor
066: * @see java.util.concurrent.ScheduledThreadPoolExecutor
067: * @see java.util.concurrent.Executors
068: * @see ConcurrentTaskExecutor
069: */
070: public class ThreadPoolTaskExecutor implements SchedulingTaskExecutor,
071: Executor, BeanNameAware, InitializingBean, DisposableBean {
072:
073: protected final Log logger = LogFactory.getLog(getClass());
074:
075: private final Object poolSizeMonitor = new Object();
076:
077: private int corePoolSize = 1;
078:
079: private int maxPoolSize = Integer.MAX_VALUE;
080:
081: private int keepAliveSeconds = 60;
082:
083: private int queueCapacity = Integer.MAX_VALUE;
084:
085: private ThreadFactory threadFactory = Executors
086: .defaultThreadFactory();
087:
088: private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
089:
090: private String beanName;
091:
092: private ThreadPoolExecutor threadPoolExecutor;
093:
094: /**
095: * Set the ThreadPoolExecutor's core pool size.
096: * Default is 1.
097: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
098: */
099: public void setCorePoolSize(int corePoolSize) {
100: synchronized (this .poolSizeMonitor) {
101: this .corePoolSize = corePoolSize;
102: if (this .threadPoolExecutor != null) {
103: this .threadPoolExecutor.setCorePoolSize(corePoolSize);
104: }
105: }
106: }
107:
108: /**
109: * Return the ThreadPoolExecutor's core pool size.
110: */
111: public int getCorePoolSize() {
112: synchronized (this .poolSizeMonitor) {
113: return this .corePoolSize;
114: }
115: }
116:
117: /**
118: * Set the ThreadPoolExecutor's maximum pool size.
119: * Default is <code>Integer.MAX_VALUE</code>.
120: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
121: */
122: public void setMaxPoolSize(int maxPoolSize) {
123: synchronized (this .poolSizeMonitor) {
124: this .maxPoolSize = maxPoolSize;
125: if (this .threadPoolExecutor != null) {
126: this .threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
127: }
128: }
129: }
130:
131: /**
132: * Return the ThreadPoolExecutor's maximum pool size.
133: */
134: public int getMaxPoolSize() {
135: synchronized (this .poolSizeMonitor) {
136: return this .maxPoolSize;
137: }
138: }
139:
140: /**
141: * Set the ThreadPoolExecutor's keep-alive seconds.
142: * Default is 60.
143: * <p><b>This setting can be modified at runtime, for example through JMX.</b>
144: */
145: public void setKeepAliveSeconds(int keepAliveSeconds) {
146: synchronized (this .poolSizeMonitor) {
147: this .keepAliveSeconds = keepAliveSeconds;
148: if (this .threadPoolExecutor != null) {
149: this .threadPoolExecutor.setKeepAliveTime(
150: keepAliveSeconds, TimeUnit.SECONDS);
151: }
152: }
153: }
154:
155: /**
156: * Return the ThreadPoolExecutor's keep-alive seconds.
157: */
158: public int getKeepAliveSeconds() {
159: synchronized (this .poolSizeMonitor) {
160: return this .keepAliveSeconds;
161: }
162: }
163:
164: /**
165: * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
166: * Default is <code>Integer.MAX_VALUE</code>.
167: * <p>Any positive value will lead to a LinkedBlockingQueue instance;
168: * any other value will lead to a SynchronousQueue instance.
169: * @see java.util.concurrent.LinkedBlockingQueue
170: * @see java.util.concurrent.SynchronousQueue
171: */
172: public void setQueueCapacity(int queueCapacity) {
173: this .queueCapacity = queueCapacity;
174: }
175:
176: /**
177: * Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
178: * Default is the ThreadPoolExecutor's default thread factory.
179: * @see java.util.concurrent.Executors#defaultThreadFactory()
180: */
181: public void setThreadFactory(ThreadFactory threadFactory) {
182: this .threadFactory = (threadFactory != null ? threadFactory
183: : Executors.defaultThreadFactory());
184: }
185:
186: /**
187: * Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
188: * Default is the ThreadPoolExecutor's default abort policy.
189: * @see java.util.concurrent.ThreadPoolExecutor.AbortPolicy
190: */
191: public void setRejectedExecutionHandler(
192: RejectedExecutionHandler rejectedExecutionHandler) {
193: this .rejectedExecutionHandler = (rejectedExecutionHandler != null ? rejectedExecutionHandler
194: : new ThreadPoolExecutor.AbortPolicy());
195: }
196:
197: public void setBeanName(String name) {
198: this .beanName = name;
199: }
200:
201: /**
202: * Calls <code>initialize()</code> after the container applied all property values.
203: * @see #initialize()
204: */
205: public void afterPropertiesSet() {
206: initialize();
207: }
208:
209: /**
210: * Creates the BlockingQueue and the ThreadPoolExecutor.
211: * @see #createQueue
212: */
213: public void initialize() {
214: if (logger.isInfoEnabled()) {
215: logger.info("Initializing ThreadPoolExecutor"
216: + (this .beanName != null ? " '" + this .beanName
217: + "'" : ""));
218: }
219: BlockingQueue queue = createQueue(this .queueCapacity);
220: this .threadPoolExecutor = new ThreadPoolExecutor(
221: this .corePoolSize, this .maxPoolSize,
222: this .keepAliveSeconds, TimeUnit.SECONDS, queue,
223: this .threadFactory, this .rejectedExecutionHandler);
224: }
225:
226: /**
227: * Create the BlockingQueue to use for the ThreadPoolExecutor.
228: * <p>A LinkedBlockingQueue instance will be created for a positive
229: * capacity value; a SynchronousQueue else.
230: * @param queueCapacity the specified queue capacity
231: * @return the BlockingQueue instance
232: * @see java.util.concurrent.LinkedBlockingQueue
233: * @see java.util.concurrent.SynchronousQueue
234: */
235: protected BlockingQueue createQueue(int queueCapacity) {
236: if (queueCapacity > 0) {
237: return new LinkedBlockingQueue(queueCapacity);
238: } else {
239: return new SynchronousQueue();
240: }
241: }
242:
243: /**
244: * Return the underlying ThreadPoolExecutor for native access.
245: * @return the underlying ThreadPoolExecutor (never <code>null</code>)
246: * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
247: */
248: public ThreadPoolExecutor getThreadPoolExecutor()
249: throws IllegalStateException {
250: Assert.state(this .threadPoolExecutor != null,
251: "ThreadPoolTaskExecutor not initialized");
252: return this .threadPoolExecutor;
253: }
254:
255: /**
256: * Implementation of both the JDK 1.5 Executor interface and the Spring
257: * TaskExecutor interface, delegating to the ThreadPoolExecutor instance.
258: * @see java.util.concurrent.Executor#execute(Runnable)
259: * @see org.springframework.core.task.TaskExecutor#execute(Runnable)
260: */
261: public void execute(Runnable task) {
262: Executor executor = getThreadPoolExecutor();
263: try {
264: executor.execute(task);
265: } catch (RejectedExecutionException ex) {
266: throw new TaskRejectedException("Executor [" + executor
267: + "] did not accept task: " + task, ex);
268: }
269: }
270:
271: /**
272: * This task executor prefers short-lived work units.
273: */
274: public boolean prefersShortLivedTasks() {
275: return true;
276: }
277:
278: /**
279: * Return the current pool size.
280: * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
281: */
282: public int getPoolSize() {
283: return getThreadPoolExecutor().getPoolSize();
284: }
285:
286: /**
287: * Return the number of currently active threads.
288: * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
289: */
290: public int getActiveCount() {
291: return getThreadPoolExecutor().getActiveCount();
292: }
293:
294: /**
295: * Calls <code>shutdown</code> when the BeanFactory destroys
296: * the task executor instance.
297: * @see #shutdown()
298: */
299: public void destroy() {
300: shutdown();
301: }
302:
303: /**
304: * Perform a shutdown on the ThreadPoolExecutor.
305: * @see java.util.concurrent.ThreadPoolExecutor#shutdown()
306: */
307: public void shutdown() {
308: if (logger.isInfoEnabled()) {
309: logger.info("Shutting down ThreadPoolExecutor"
310: + (this .beanName != null ? " '" + this .beanName
311: + "'" : ""));
312: }
313: this.threadPoolExecutor.shutdown();
314: }
315:
316: }
|