001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.core;
019:
020: import java.util.Collection;
021: import java.util.concurrent.LinkedBlockingQueue;
022: import java.util.concurrent.ThreadFactory;
023: import java.util.concurrent.ThreadPoolExecutor;
024: import java.util.concurrent.TimeUnit;
025: import java.util.concurrent.atomic.AtomicInteger;
026:
027: import org.apache.catalina.Executor;
028: import org.apache.catalina.LifecycleException;
029: import org.apache.catalina.LifecycleListener;
030: import org.apache.catalina.util.LifecycleSupport;
031: import java.util.concurrent.RejectedExecutionException;
032:
033: public class StandardThreadExecutor implements Executor {
034:
035: // ---------------------------------------------- Properties
036: protected int threadPriority = Thread.NORM_PRIORITY;
037:
038: protected boolean daemon = true;
039:
040: protected String namePrefix = "tomcat-exec-";
041:
042: protected int maxThreads = 200;
043:
044: protected int minSpareThreads = 25;
045:
046: protected int maxIdleTime = 60000;
047:
048: protected ThreadPoolExecutor executor = null;
049:
050: protected String name;
051:
052: private LifecycleSupport lifecycle = new LifecycleSupport(this );
053:
054: // ---------------------------------------------- Constructors
/**
 * Creates an executor with the default property values. All configuration
 * is expected to arrive afterwards through the setters.
 */
public StandardThreadExecutor() {
    super();
    // Deliberately empty: a no-argument constructor is kept for the digester.
}
058:
059: // ---------------------------------------------- Public Methods
060: public void start() throws LifecycleException {
061: lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null);
062: TaskQueue taskqueue = new TaskQueue();
063: TaskThreadFactory tf = new TaskThreadFactory(namePrefix);
064: lifecycle.fireLifecycleEvent(START_EVENT, null);
065: executor = new ThreadPoolExecutor(getMinSpareThreads(),
066: getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,
067: taskqueue, tf);
068: taskqueue.setParent((ThreadPoolExecutor) executor);
069: lifecycle.fireLifecycleEvent(AFTER_START_EVENT, null);
070: }
071:
072: public void stop() throws LifecycleException {
073: lifecycle.fireLifecycleEvent(BEFORE_STOP_EVENT, null);
074: lifecycle.fireLifecycleEvent(STOP_EVENT, null);
075: if (executor != null)
076: executor.shutdown();
077: executor = null;
078: lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, null);
079: }
080:
081: public void execute(Runnable command) {
082: if (executor != null) {
083: try {
084: executor.execute(command);
085: } catch (RejectedExecutionException rx) {
086: //there could have been contention around the queue
087: if (!((TaskQueue) executor.getQueue()).force(command))
088: throw new RejectedExecutionException();
089: }
090: } else
091: throw new IllegalStateException(
092: "StandardThreadPool not started.");
093: }
094:
095: public int getThreadPriority() {
096: return threadPriority;
097: }
098:
099: public boolean isDaemon() {
100:
101: return daemon;
102: }
103:
104: public String getNamePrefix() {
105: return namePrefix;
106: }
107:
108: public int getMaxIdleTime() {
109: return maxIdleTime;
110: }
111:
112: public int getMaxThreads() {
113: return maxThreads;
114: }
115:
116: public int getMinSpareThreads() {
117: return minSpareThreads;
118: }
119:
120: public String getName() {
121: return name;
122: }
123:
124: public void setThreadPriority(int threadPriority) {
125: this .threadPriority = threadPriority;
126: }
127:
128: public void setDaemon(boolean daemon) {
129: this .daemon = daemon;
130: }
131:
132: public void setNamePrefix(String namePrefix) {
133: this .namePrefix = namePrefix;
134: }
135:
136: public void setMaxIdleTime(int maxIdleTime) {
137: this .maxIdleTime = maxIdleTime;
138: }
139:
140: public void setMaxThreads(int maxThreads) {
141: this .maxThreads = maxThreads;
142: }
143:
144: public void setMinSpareThreads(int minSpareThreads) {
145: this .minSpareThreads = minSpareThreads;
146: }
147:
148: public void setName(String name) {
149: this .name = name;
150: }
151:
152: /**
153: * Add a LifecycleEvent listener to this component.
154: *
155: * @param listener The listener to add
156: */
157: public void addLifecycleListener(LifecycleListener listener) {
158: lifecycle.addLifecycleListener(listener);
159: }
160:
161: /**
162: * Get the lifecycle listeners associated with this lifecycle. If this
163: * Lifecycle has no listeners registered, a zero-length array is returned.
164: */
165: public LifecycleListener[] findLifecycleListeners() {
166: return lifecycle.findLifecycleListeners();
167: }
168:
169: /**
170: * Remove a LifecycleEvent listener from this component.
171: *
172: * @param listener The listener to remove
173: */
174: public void removeLifecycleListener(LifecycleListener listener) {
175: lifecycle.removeLifecycleListener(listener);
176: }
177:
178: // Statistics from the thread pool
179: public int getActiveCount() {
180: return (executor != null) ? executor.getActiveCount() : 0;
181: }
182:
183: public long getCompletedTaskCount() {
184: return (executor != null) ? executor.getCompletedTaskCount()
185: : 0;
186: }
187:
188: public int getCorePoolSize() {
189: return (executor != null) ? executor.getCorePoolSize() : 0;
190: }
191:
192: public int getLargestPoolSize() {
193: return (executor != null) ? executor.getLargestPoolSize() : 0;
194: }
195:
196: public int getPoolSize() {
197: return (executor != null) ? executor.getPoolSize() : 0;
198: }
199:
200: // ---------------------------------------------- TaskQueue Inner Class
/**
 * Work queue for the thread pool. When a parent pool is attached,
 * {@link #offer(Runnable)} declines a task while the pool has no idle
 * thread and is still below its maximum size, which makes the pool start
 * another thread instead of queueing. {@link #force(Runnable)} bypasses
 * that logic and always tries to enqueue.
 */
class TaskQueue extends LinkedBlockingQueue<Runnable> {
    /** Pool this queue feeds; {@code null} until {@link #setParent} is called. */
    ThreadPoolExecutor parent = null;

    /** Creates a queue with the default capacity of LinkedBlockingQueue. */
    public TaskQueue() {
        super();
    }

    /**
     * Creates a queue with the given capacity.
     *
     * @param initialCapacity the capacity handed to LinkedBlockingQueue
     */
    public TaskQueue(int initialCapacity) {
        super(initialCapacity);
    }

    /**
     * Creates a queue initially holding the given tasks.
     *
     * @param c the tasks to add initially
     */
    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    /**
     * Attaches the pool whose size and activity {@link #offer(Runnable)}
     * inspects.
     *
     * @param tp the pool that consumes this queue
     */
    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    /**
     * Puts a task on the queue without the pool-size checks of
     * {@link #offer(Runnable)}; intended for tasks the pool has rejected.
     *
     * @param o the task to enqueue
     * @return {@code true} if the task was added to the queue
     * @throws RejectedExecutionException if no pool is attached or the
     *         attached pool has been shut down
     */
    public boolean force(Runnable o) {
        // A missing parent is treated like a stopped one, mirroring the
        // null handling in offer() instead of failing with a NullPointerException.
        if (parent == null || parent.isShutdown()) {
            throw new RejectedExecutionException(
                    "Executor not running, can't force a command into the queue");
        }
        // Forces the item onto the queue, to be used if the task is rejected.
        return super.offer(o);
    }

    /**
     * Decides whether a task is queued or declined so that the pool
     * creates a new thread for it.
     *
     * @param o the task being submitted
     * @return {@code false} to make the pool try to start a new thread,
     *         otherwise the result of queueing the task
     */
    @Override
    public boolean offer(Runnable o) {
        // Without a parent no checks are possible.
        if (parent == null) {
            return super.offer(o);
        }
        // Maxed out on threads: simply queue the task.
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }
        // There are idle threads, so just add it to the queue.
        // This is an approximation and could use some tuning.
        if (parent.getActiveCount() < parent.getPoolSize()) {
            return super.offer(o);
        }
        // Fewer threads than the maximum: force creation of a new thread.
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false;
        }
        // Anything else ends up on the queue.
        return super.offer(o);
    }
}
245:
246: // ---------------------------------------------- ThreadFactory Inner Class
247: class TaskThreadFactory implements ThreadFactory {
248: final ThreadGroup group;
249: final AtomicInteger threadNumber = new AtomicInteger(1);
250: final String namePrefix;
251:
252: TaskThreadFactory(String namePrefix) {
253: SecurityManager s = System.getSecurityManager();
254: group = (s != null) ? s.getThreadGroup() : Thread
255: .currentThread().getThreadGroup();
256: this .namePrefix = namePrefix;
257: }
258:
259: public Thread newThread(Runnable r) {
260: Thread t = new Thread(group, r, namePrefix
261: + threadNumber.getAndIncrement());
262: t.setDaemon(daemon);
263: t.setPriority(getThreadPriority());
264: return t;
265: }
266: }
267:
268: }
|