001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.components.thread;
018:
019: import org.apache.avalon.framework.logger.LogEnabled;
020: import org.apache.avalon.framework.logger.Logger;
021:
022: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
023:
024: /**
025: * The DefaultThreadPool class implements the {@link ThreadPool} interface.
026: * Instances of this class are made by the {@link RunnableManager} passing a
027: * configuration into the <code>configure</code> method.
028: *
029: * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a>
030: * @version CVS $Id: DefaultThreadPool.java 56843 2004-11-07 13:34:30Z giacomo $
031: */
032: public class DefaultThreadPool extends PooledExecutor implements
033: ThreadPool, LogEnabled {
034: //~ Static fields/initializers ---------------------------------------------
035:
036: /** Default ThreadPool block policy */
037: public static final String POLICY_DEFAULT = POLICY_RUN;
038:
039: //~ Instance fields --------------------------------------------------------
040:
041: /** Wrapps a channel */
042: private ChannelWrapper m_channelWrapper;
043:
044: /** Our logger */
045: private Logger m_logger;
046:
047: /** The Queue */
048: private Queue m_queue;
049:
050: /** The blocking policy */
051: private String m_blockPolicy;
052:
053: /** The name of this thread pool */
054: private String m_name;
055:
056: /** Should we wait for running jobs to terminate on shutdown ? */
057: private boolean m_shutdownGraceful;
058:
059: /** The maximum queue size */
060: private int m_queueSize;
061:
062: /** How long to wait for running jobs to terminate on disposition */
063: private int m_shutdownWaitTimeMs;
064:
065: //~ Constructors -----------------------------------------------------------
066:
067: /**
068: * Create a new pool.
069: */
070: DefaultThreadPool() {
071: this (new ChannelWrapper());
072: }
073:
074: /**
075: * Create a new pool.
076: *
077: * @param channel DOCUMENT ME!
078: */
079: private DefaultThreadPool(final ChannelWrapper channel) {
080: super (channel);
081: m_channelWrapper = channel;
082: }
083:
084: //~ Methods ----------------------------------------------------------------
085:
086: /**
087: * DOCUMENT ME!
088: *
089: * @return Returns the blockPolicy.
090: */
091: public String getBlockPolicy() {
092: return m_blockPolicy;
093: }
094:
095: /**
096: * DOCUMENT ME!
097: *
098: * @return maximum size of the queue (0 if isQueued() == false)
099: *
100: * @see org.apache.cocoon.components.thread.ThreadPool#getQueueSize()
101: */
102: public int getMaxQueueSize() {
103: return ((m_queueSize < 0) ? Integer.MAX_VALUE : m_queueSize);
104: }
105:
106: /**
107: * DOCUMENT ME!
108: *
109: * @return size of queue (0 if isQueued() == false)
110: *
111: * @see org.apache.cocoon.components.thread.ThreadPool#getQueueSize()
112: */
113: public int getMaximumQueueSize() {
114: return m_queueSize;
115: }
116:
117: /**
118: * @see org.apache.cocoon.components.thread.ThreadPool#getName()
119: */
120: public String getName() {
121: return m_name;
122: }
123:
124: /**
125: * Get hte priority used to create Threads
126: *
127: * @return {@link Thread#MIN_PRIORITY}, {@link Thread#NORM_PRIORITY}, or
128: * {@link Thread#MAX_PRIORITY}
129: */
130: public int getPriority() {
131: return ((ThreadFactory) super .getThreadFactory()).getPriority();
132: }
133:
134: /**
135: * DOCUMENT ME!
136: *
137: * @return current size of the queue (0 if isQueued() == false)
138: *
139: * @see org.apache.cocoon.components.thread.ThreadPool#getQueueSize()
140: */
141: public int getQueueSize() {
142: return m_queue.getQueueSize();
143: }
144:
145: /**
146: * Whether this DefaultThreadPool has a queue
147: *
148: * @return Returns the m_isQueued.
149: *
150: * @see org.apache.cocoon.components.thread.ThreadPool#isQueued()
151: */
152: public boolean isQueued() {
153: return m_queueSize != 0;
154: }
155:
156: /**
157: * Set the logger
158: *
159: * @param logger
160: *
161: * @see org.apache.avalon.framework.logger.LogEnabled#enableLogging(org.apache.avalon.framework.logger.Logger)
162: */
163: public void enableLogging(Logger logger) {
164: m_logger = logger;
165: }
166:
167: /**
168: * Execute a command
169: *
170: * @param command The {@link Runnable} to execute
171: *
172: * @throws InterruptedException In case of interruption
173: */
174: public void execute(Runnable command) throws InterruptedException {
175: if (getLogger().isDebugEnabled()) {
176: getLogger().debug(
177: "Executing Command: " + command.toString()
178: + ",pool=" + getName());
179: }
180:
181: super .execute(command);
182: }
183:
184: /**
185: * @see org.apache.cocoon.components.thread.ThreadPool#shutdown()
186: */
187: public void shutdown() {
188: if (m_shutdownGraceful) {
189: shutdownAfterProcessingCurrentlyQueuedTasks();
190: } else {
191: shutdownNow();
192: }
193:
194: try {
195: if (getShutdownWaitTimeMs() > 0) {
196: if (!awaitTerminationAfterShutdown(getShutdownWaitTimeMs())) {
197: getLogger()
198: .warn(
199: "running commands have not terminated within "
200: + getShutdownWaitTimeMs()
201: + "ms. Will shut them down by interruption");
202: interruptAll();
203: shutdownNow();
204: }
205: }
206:
207: awaitTerminationAfterShutdown();
208: } catch (final InterruptedException ie) {
209: getLogger().error("cannot shutdown ThreadPool", ie);
210: }
211: }
212:
213: /**
214: * Set the blocking policy
215: *
216: * @param blockPolicy The blocking policy value
217: */
218: void setBlockPolicy(final String blockPolicy) {
219: m_blockPolicy = blockPolicy;
220:
221: if (POLICY_ABORT.equalsIgnoreCase(blockPolicy)) {
222: abortWhenBlocked();
223: } else if (POLICY_DISCARD.equalsIgnoreCase(blockPolicy)) {
224: discardWhenBlocked();
225: } else if (POLICY_DISCARD_OLDEST.equalsIgnoreCase(blockPolicy)) {
226: discardOldestWhenBlocked();
227: } else if (POLICY_RUN.equalsIgnoreCase(blockPolicy)) {
228: runWhenBlocked();
229: } else if (POLICY_WAIT.equalsIgnoreCase(blockPolicy)) {
230: waitWhenBlocked();
231: } else {
232: final StringBuffer msg = new StringBuffer();
233: msg
234: .append(
235: "WARNING: Unknown block-policy configuration \"")
236: .append(blockPolicy);
237: msg.append("\". Should be one of \"").append(POLICY_ABORT);
238: msg.append("\",\"").append(POLICY_DISCARD);
239: msg.append("\",\"").append(POLICY_DISCARD_OLDEST);
240: msg.append("\",\"").append(POLICY_RUN);
241: msg.append("\",\"").append(POLICY_WAIT);
242: msg.append("\". Will use \"").append(POLICY_DEFAULT)
243: .append("\"");
244: getLogger().warn(msg.toString());
245: setBlockPolicy(POLICY_DEFAULT);
246: }
247: }
248:
249: /**
250: * DOCUMENT ME!
251: *
252: * @param name The name to set.
253: */
254: void setName(String name) {
255: m_name = name;
256: }
257:
258: /**
259: * DOCUMENT ME!
260: *
261: * @param queueSize DOCUMENT ME!
262: */
263: void setQueue(final int queueSize) {
264: if (queueSize != 0) {
265: if (queueSize > 0) {
266: m_queue = new BoundedQueue(queueSize);
267: } else {
268: m_queue = new LinkedQueue();
269: }
270: } else {
271: m_queue = new SynchronousChannel();
272: }
273:
274: m_queueSize = queueSize;
275: m_channelWrapper.setChannel(m_queue);
276: }
277:
278: /**
279: * DOCUMENT ME!
280: *
281: * @param shutdownGraceful The shutdownGraceful to set.
282: */
283: void setShutdownGraceful(boolean shutdownGraceful) {
284: m_shutdownGraceful = shutdownGraceful;
285: }
286:
287: /**
288: * DOCUMENT ME!
289: *
290: * @return Returns the shutdownGraceful.
291: */
292: boolean isShutdownGraceful() {
293: return m_shutdownGraceful;
294: }
295:
296: /**
297: * DOCUMENT ME!
298: *
299: * @param shutdownWaitTimeMs The shutdownWaitTimeMs to set.
300: */
301: void setShutdownWaitTimeMs(int shutdownWaitTimeMs) {
302: m_shutdownWaitTimeMs = shutdownWaitTimeMs;
303: }
304:
305: /**
306: * DOCUMENT ME!
307: *
308: * @return Returns the shutdownWaitTimeMs.
309: */
310: int getShutdownWaitTimeMs() {
311: return m_shutdownWaitTimeMs;
312: }
313:
314: /**
315: * Get our <code>Logger</code>
316: *
317: * @return our <code>Logger</code>
318: */
319: private Logger getLogger() {
320: return m_logger;
321: }
322: }
|