001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.components.thread;
018:
019: import org.apache.avalon.framework.activity.Disposable;
020: import org.apache.avalon.framework.activity.Startable;
021: import org.apache.avalon.framework.configuration.Configurable;
022: import org.apache.avalon.framework.configuration.Configuration;
023: import org.apache.avalon.framework.configuration.ConfigurationException;
024: import org.apache.avalon.framework.logger.AbstractLogEnabled;
025: import org.apache.avalon.framework.logger.Logger;
026: import org.apache.avalon.framework.thread.ThreadSafe;
027:
028: import java.util.HashMap;
029: import java.util.Iterator;
030: import java.util.Map;
031: import java.util.SortedSet;
032: import java.util.TreeSet;
033:
034: /**
035: * The DefaultRunnableManager implements the {@link RunnableManager} interface
036: * and is responsible for creating {@link ThreadPool}s and running
037: * {@link Runnable}s in them as background commands.
038: *
039: * <p>
040: * The configuration of the <code>DefaultRunnableManager</code>:
041: * <pre>
042: * &lt;thread-factory&gt;org.apache.cocoon.components.thread.DefaultThreadFactory&lt;/thread-factory&gt;
043: * &lt;thread-pools&gt;
044: * &lt;thread-pool&gt;
045: * &lt;name&gt;default&lt;/name&gt;
046: * &lt;priority&gt;NORM&lt;/priority&gt;
047: * &lt;daemon&gt;false&lt;/daemon&gt;
048: * &lt;queue-size&gt;-1&lt;/queue-size&gt;
049: * &lt;max-pool-size&gt;-1&lt;/max-pool-size&gt;
050: * &lt;min-pool-size&gt;2&lt;/min-pool-size&gt;
051: * &lt;keep-alive-time-ms&gt;20000&lt;/keep-alive-time-ms&gt;
052: * &lt;block-policy&gt;RUN&lt;/block-policy&gt;
053: * &lt;shutdown-graceful&gt;false&lt;/shutdown-graceful&gt;
054: * &lt;shutdown-wait-time-ms&gt;-1&lt;/shutdown-wait-time-ms&gt;
055: * &lt;/thread-pool&gt;
056: * &lt;/thread-pools&gt;
057: * </pre>
058: * </p>
059: *
060: * <p>
061: * Have a look at
062: * http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.html,
063: * {@link EDU.oswego.cs.dl.util.concurrent.PooledExecutor} or the cocoon.xconf
064: * file for more information.
065: * </p>
066: *
067: * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a>
068: * @version $Id: DefaultRunnableManager.java 56848 2004-11-07 14:09:23Z giacomo $
069: */
070: public class DefaultRunnableManager extends AbstractLogEnabled
071: implements RunnableManager, Configurable, Disposable, Runnable,
072: Startable, ThreadSafe {
073: //~ Static fields/initializers ---------------------------------------------
074:
075: /** The default {@link ThreadFactory} */
076: public static final String DEFAULT_THREAD_FACTORY = DefaultThreadFactory.class
077: .getName();
078:
079: /** The default queue size */
080: public static final int DEFAULT_QUEUE_SIZE = -1;
081:
082: /** The default maximum pool size */
083: public static final int DEFAULT_MAX_POOL_SIZE = 5;
084:
085: /** The default minimum pool size */
086: public static final int DEFAULT_MIN_POOL_SIZE = 5;
087:
088: /** The default thread priority */
089: public static final String DEFAULT_THREAD_PRIORITY = "NORM";
090:
091: /** The default daemon mode */
092: public static final boolean DEFAULT_DAEMON_MODE = false;
093:
094: /** The default keep alive time */
095: public static final long DEFAULT_KEEP_ALIVE_TIME = 60000L;
096:
097: /** The default way to shutdown gracefully */
098: public static final boolean DEFAULT_SHUTDOWN_GRACEFUL = false;
099:
100: /** The default shutdown waittime time */
101: public static final int DEFAULT_SHUTDOWN_WAIT_TIME = -1;
102:
103: /** The default shutdown waittime time */
104: public static final String DEFAULT_THREADPOOL_NAME = "default";
105:
106: //~ Instance fields --------------------------------------------------------
107:
108: /**
109: * Sorted set of <code>ExecutionInfo</code> instances, based on their next
110: * execution time.
111: */
112: protected SortedSet m_commandStack = new TreeSet();
113:
114: /** The managed thread pools */
115: final Map m_pools = new HashMap();
116:
117: /** The configured default ThreadFactory class instance */
118: private Class m_defaultThreadFactoryClass;
119:
120: /** Keep us running? */
121: private boolean m_keepRunning = false;
122:
123: //~ Methods ----------------------------------------------------------------
124:
125: /**
126: * @see org.apache.avalon.framework.configuration.Configurable#configure(org.apache.avalon.framework.configuration.Configuration)
127: */
128: public void configure(final Configuration config)
129: throws ConfigurationException {
130: final String defaultThreadFactoryName = config.getChild(
131: "thread-factory").getValue(DEFAULT_THREAD_FACTORY);
132:
133: try {
134: m_defaultThreadFactoryClass = Thread.currentThread()
135: .getContextClassLoader().loadClass(
136: defaultThreadFactoryName);
137: } catch (final Exception ex) {
138: throw new ConfigurationException(
139: "Cannot create instance of default thread factory "
140: + defaultThreadFactoryName, ex);
141: }
142:
143: final Configuration[] threadpools = config.getChild(
144: "thread-pools").getChildren("thread-pool");
145:
146: for (int i = 0; i < threadpools.length; i++) {
147: final DefaultThreadPool pool = configThreadPool(threadpools[i]);
148: }
149:
150: // Check if a "default" pool has been created
151: final ThreadPool defaultThreadPool = (ThreadPool) m_pools
152: .get(DEFAULT_THREADPOOL_NAME);
153:
154: if (null == defaultThreadPool) {
155: createPool(DEFAULT_THREADPOOL_NAME, DEFAULT_QUEUE_SIZE,
156: DEFAULT_MAX_POOL_SIZE, DEFAULT_MIN_POOL_SIZE,
157: getPriority(DEFAULT_THREAD_PRIORITY),
158: DEFAULT_DAEMON_MODE, DEFAULT_KEEP_ALIVE_TIME,
159: DefaultThreadPool.POLICY_DEFAULT,
160: DEFAULT_SHUTDOWN_GRACEFUL,
161: DEFAULT_SHUTDOWN_WAIT_TIME);
162: }
163: }
164:
165: /**
166: * Create a shared ThreadPool
167: *
168: * @param name The name of the thread pool
169: * @param queueSize The size of the queue
170: * @param maxPoolSize The maximum number of threads
171: * @param minPoolSize The maximum number of threads
172: * @param priority The priority of threads created by this pool. This is
173: * one of {@link Thread#MIN_PRIORITY}, {@link
174: * Thread#NORM_PRIORITY}, or {@link Thread#MAX_PRIORITY}
175: * @param isDaemon Whether or not thread from the pool should run in daemon
176: * mode
177: * @param keepAliveTime How long should a thread be alive for new work to
178: * be done before it is GCed
179: * @param blockPolicy What's the blocking policy is resources are exhausted
180: * @param shutdownGraceful Should we wait for the queue to finish all
181: * pending commands?
182: * @param shutdownWaitTime After what time a normal shutdown should take
183: * into account if a graceful shutdown has not come to an end
184: *
185: * @throws IllegalArgumentException If the pool already exists
186: */
187: public void createPool(final String name, final int queueSize,
188: final int maxPoolSize, final int minPoolSize,
189: final int priority, final boolean isDaemon,
190: final long keepAliveTime, final String blockPolicy,
191: final boolean shutdownGraceful, final int shutdownWaitTime) {
192: if (null != m_pools.get(name)) {
193: throw new IllegalArgumentException("ThreadPool \"" + name
194: + "\" already exists");
195: }
196:
197: createPool(new DefaultThreadPool(), name, queueSize,
198: maxPoolSize, minPoolSize, priority, isDaemon,
199: keepAliveTime, blockPolicy, shutdownGraceful,
200: shutdownWaitTime);
201: }
202:
203: /**
204: * Create a private ThreadPool
205: *
206: * @param queueSize The size of the queue
207: * @param maxPoolSize The maximum number of threads
208: * @param minPoolSize The maximum number of threads
209: * @param priority The priority of threads created by this pool. This is
210: * one of {@link Thread#MIN_PRIORITY}, {@link
211: * Thread#NORM_PRIORITY}, or {@link Thread#MAX_PRIORITY}
212: * @param isDaemon Whether or not thread from the pool should run in daemon
213: * mode
214: * @param keepAliveTime How long should a thread be alive for new work to
215: * be done before it is GCed
216: * @param blockPolicy What's the blocking policy is resources are exhausted
217: * @param shutdownGraceful Should we wait for the queue to finish all
218: * pending commands?
219: * @param shutdownWaitTime After what time a normal shutdown should take
220: * into account if a graceful shutdown has not come to an end
221: *
222: * @return A newly created <code>ThreadPool</code>
223: */
224: public ThreadPool createPool(final int queueSize,
225: final int maxPoolSize, final int minPoolSize,
226: final int priority, final boolean isDaemon,
227: final long keepAliveTime, final String blockPolicy,
228: final boolean shutdownGraceful, final int shutdownWaitTime) {
229: final DefaultThreadPool pool = new DefaultThreadPool();
230: final String name = "anon-" + pool.hashCode();
231:
232: return createPool(pool, name, queueSize, maxPoolSize,
233: minPoolSize, priority, isDaemon, keepAliveTime,
234: blockPolicy, shutdownGraceful, shutdownWaitTime);
235: }
236:
237: /**
238: * @see org.apache.avalon.framework.activity.Disposable#dispose()
239: */
240: public void dispose() {
241: if (getLogger().isDebugEnabled()) {
242: getLogger().debug("Disposing all thread pools");
243: }
244:
245: for (final Iterator i = m_pools.keySet().iterator(); i
246: .hasNext();) {
247: final String poolName = (String) i.next();
248: final DefaultThreadPool pool = (DefaultThreadPool) m_pools
249: .get(poolName);
250:
251: if (getLogger().isDebugEnabled()) {
252: getLogger().debug(
253: "Disposing thread pool " + pool.getName());
254: }
255:
256: pool.shutdown();
257:
258: if (getLogger().isDebugEnabled()) {
259: getLogger().debug(
260: "Thread pool " + pool.getName() + " disposed");
261: }
262: }
263:
264: try {
265: m_pools.clear();
266: } catch (final Throwable t) {
267: getLogger().error("Cannot dispose", t);
268: }
269: }
270:
271: /**
272: * Run a {@link Runnable} in the background using a {@link ThreadPool}
273: *
274: * @param threadPoolName The thread pool name to be used
275: * @param command The {@link Runnable} to execute
276: * @param delay the delay befor first run
277: * @param interval The interval for repeated runs
278: *
279: * @throws IllegalArgumentException DOCUMENT ME!
280: */
281: public void execute(final String threadPoolName,
282: final Runnable command, final long delay, long interval) {
283: if (delay < 0) {
284: throw new IllegalArgumentException("delay < 0");
285: }
286:
287: if (interval < 0) {
288: throw new IllegalArgumentException("interval < 0");
289: }
290:
291: ThreadPool pool = (ThreadPool) m_pools.get(threadPoolName);
292:
293: if (null == pool) {
294: getLogger().warn(
295: "ThreadPool \"" + threadPoolName
296: + "\" is not known. Will use ThreadPool \""
297: + DEFAULT_THREADPOOL_NAME + "\"");
298: pool = (ThreadPool) m_pools.get(DEFAULT_THREADPOOL_NAME);
299: }
300:
301: if (getLogger().isDebugEnabled()) {
302: getLogger().debug(
303: "Command entered: " + command.toString()
304: + ", pool=" + pool.getName() + ", delay="
305: + delay + ", interval=" + interval);
306: }
307:
308: new ExecutionInfo(pool, command, delay, interval, getLogger());
309: }
310:
311: /**
312: * Run a {@link Runnable} in the background using a {@link ThreadPool}
313: *
314: * @param command The {@link Runnable} to execute
315: * @param delay the delay befor first run
316: * @param interval The interval for repeated runs
317: */
318: public void execute(final Runnable command, final long delay,
319: final long interval) {
320: execute(DEFAULT_THREADPOOL_NAME, command, delay, interval);
321: }
322:
323: /**
324: * Run a {@link Runnable} in the background using a {@link ThreadPool}
325: *
326: * @param command The {@link Runnable} to execute
327: * @param delay the delay befor first run
328: */
329: public void execute(final Runnable command, final long delay) {
330: execute(DEFAULT_THREADPOOL_NAME, command, delay, 0);
331: }
332:
333: /**
334: * Run a {@link Runnable} in the background using a {@link ThreadPool}
335: *
336: * @param command The {@link Runnable} to execute
337: */
338: public void execute(final Runnable command) {
339: execute(DEFAULT_THREADPOOL_NAME, command, 0, 0);
340: }
341:
342: /**
343: * Run a {@link Runnable} in the background using a {@link ThreadPool}
344: *
345: * @param threadPoolName The thread pool name to be used
346: * @param command The {@link Runnable} to execute
347: * @param delay the delay befor first run
348: */
349: public void execute(final String threadPoolName,
350: final Runnable command, final long delay) {
351: execute(threadPoolName, command, delay, 0);
352: }
353:
354: /**
355: * Run a {@link Runnable} in the background using a {@link ThreadPool}
356: *
357: * @param threadPoolName The thread pool name to be used
358: * @param command The {@link Runnable} to execute
359: */
360: public void execute(final String threadPoolName,
361: final Runnable command) {
362: execute(threadPoolName, command, 0, 0);
363: }
364:
365: /**
366: * Remove a <code>Runnable</code> from the command stack
367: *
368: * @param command The <code>Runnable</code> to be removed
369: */
370: public void remove(Runnable command) {
371: synchronized (m_commandStack) {
372: for (final Iterator i = m_commandStack.iterator(); i
373: .hasNext();) {
374: final ExecutionInfo info = (ExecutionInfo) i.next();
375:
376: if (info.m_command == command) {
377: i.remove();
378: m_commandStack.notifyAll();
379:
380: return;
381: }
382: }
383: }
384:
385: getLogger().warn(
386: "Could not find command " + command + " for removal");
387: }
388:
389: /**
390: * The heart of the command manager
391: */
392: public void run() {
393: if (getLogger().isDebugEnabled()) {
394: getLogger().debug("Entering loop");
395: }
396:
397: while (m_keepRunning) {
398: synchronized (m_commandStack) {
399: try {
400: if (m_commandStack.size() > 0) {
401: final ExecutionInfo info = (ExecutionInfo) m_commandStack
402: .first();
403: final long delay = info.m_nextRun
404: - System.currentTimeMillis();
405:
406: if (delay > 0) {
407: m_commandStack.wait(delay);
408: }
409: } else {
410: if (getLogger().isDebugEnabled()) {
411: getLogger()
412: .debug(
413: "No commands available. Will just wait for one");
414: }
415:
416: m_commandStack.wait();
417: }
418: } catch (final InterruptedException ie) {
419: if (getLogger().isDebugEnabled()) {
420: getLogger().debug("I've been interrupted");
421: }
422: }
423:
424: if (m_keepRunning) {
425: if (m_commandStack.size() > 0) {
426: final ExecutionInfo info = (ExecutionInfo) m_commandStack
427: .first();
428: final long delay = info.m_nextRun
429: - System.currentTimeMillis();
430:
431: if (delay < 0) {
432: info.execute();
433: }
434: }
435: }
436: }
437: }
438:
439: if (getLogger().isDebugEnabled()) {
440: getLogger().debug("Exiting loop");
441: }
442: }
443:
444: /**
445: * Start the managing thread
446: *
447: * @throws Exception DOCUMENT ME!
448: */
449: public void start() throws Exception {
450: if (getLogger().isDebugEnabled()) {
451: getLogger().debug("Starting the heart");
452: }
453:
454: m_keepRunning = true;
455: ((ThreadPool) m_pools.get(DEFAULT_THREADPOOL_NAME))
456: .execute(this );
457: }
458:
459: /**
460: * Stop the managing thread
461: *
462: * @throws Exception DOCUMENT ME!
463: */
464: public void stop() throws Exception {
465: m_keepRunning = false;
466:
467: synchronized (m_commandStack) {
468: m_commandStack.notifyAll();
469: }
470: }
471:
472: /**
473: * DOCUMENT ME!
474: *
475: * @param priority The priority to set as string value.
476: *
477: * @return The priority as int value.
478: */
479: private int getPriority(final String priority) {
480: if ("MIN".equalsIgnoreCase(priority)) {
481: return Thread.MIN_PRIORITY;
482: } else if ("NORM".equalsIgnoreCase(priority)) {
483: return Thread.NORM_PRIORITY;
484: } else if ("MAX".equalsIgnoreCase(priority)) {
485: return Thread.MAX_PRIORITY;
486: } else {
487: getLogger().warn(
488: "Unknown thread priority \"" + priority
489: + "\". Set to \"NORM\".");
490:
491: return Thread.NORM_PRIORITY;
492: }
493: }
494:
495: /**
496: * DOCUMENT ME!
497: *
498: * @param config DOCUMENT ME!
499: *
500: * @return DOCUMENT ME!
501: *
502: * @throws ConfigurationException DOCUMENT ME!
503: */
504: private DefaultThreadPool configThreadPool(
505: final Configuration config) throws ConfigurationException {
506: final String name = config.getChild("name").getValue();
507: final int queueSize = config.getChild("queue-size")
508: .getValueAsInteger(DEFAULT_QUEUE_SIZE);
509: final int maxPoolSize = config.getChild("max-pool-size")
510: .getValueAsInteger(DEFAULT_MAX_POOL_SIZE);
511: int minPoolSize = config.getChild("min-pool-size")
512: .getValueAsInteger(DEFAULT_MIN_POOL_SIZE);
513:
514: // make sure we have enough threads for the default thread pool as we
515: // need one for ourself
516: if (DEFAULT_THREADPOOL_NAME.equals(name)
517: && ((minPoolSize > 0) && (minPoolSize < DEFAULT_MIN_POOL_SIZE))) {
518: minPoolSize = DEFAULT_MIN_POOL_SIZE;
519: }
520:
521: final String priority = config.getChild("priority").getValue(
522: DEFAULT_THREAD_PRIORITY);
523: final boolean isDaemon = config.getChild("daemon")
524: .getValueAsBoolean(DEFAULT_DAEMON_MODE);
525: final long keepAliveTime = config
526: .getChild("keep-alive-time-ms").getValueAsLong(
527: DEFAULT_KEEP_ALIVE_TIME);
528: final String blockPolicy = config.getChild("block-policy")
529: .getValue(DefaultThreadPool.POLICY_DEFAULT);
530: final boolean shutdownGraceful = config.getChild(
531: "shutdown-graceful").getValueAsBoolean(
532: DEFAULT_SHUTDOWN_GRACEFUL);
533: final int shutdownWaitTime = config.getChild(
534: "shutdown-wait-time-ms").getValueAsInteger(
535: DEFAULT_SHUTDOWN_WAIT_TIME);
536:
537: return createPool(new DefaultThreadPool(), name, queueSize,
538: maxPoolSize, minPoolSize, getPriority(priority),
539: isDaemon, keepAliveTime, blockPolicy, shutdownGraceful,
540: shutdownWaitTime);
541: }
542:
543: /**
544: * Create a ThreadPool
545: *
546: * @param pool DOCUMENT ME!
547: * @param name DOCUMENT ME!
548: * @param queueSize The size of the queue
549: * @param maxPoolSize The maximum number of threads
550: * @param minPoolSize The maximum number of threads
551: * @param priority The priority of threads created by this pool. This is
552: * one of {@link Thread#MIN_PRIORITY}, {@link
553: * Thread#NORM_PRIORITY}, or {@link Thread#MAX_PRIORITY}
554: * @param isDaemon Whether or not thread from the pool should run in daemon
555: * mode
556: * @param keepAliveTime How long should a thread be alive for new work to
557: * be done before it is GCed
558: * @param blockPolicy What's the blocking policy is resources are exhausted
559: * @param shutdownGraceful Should we wait for the queue to finish all
560: * pending commands?
561: * @param shutdownWaitTime After what time a normal shutdown should take
562: * into account if a graceful shutdown has not come to an end
563: *
564: * @return A newly created <code>ThreadPool</code>
565: */
566: private DefaultThreadPool createPool(final DefaultThreadPool pool,
567: final String name, final int queueSize,
568: final int maxPoolSize, final int minPoolSize,
569: final int priority, final boolean isDaemon,
570: final long keepAliveTime, final String blockPolicy,
571: final boolean shutdownGraceful, final int shutdownWaitTime) {
572: pool.enableLogging(getLogger().getChildLogger(name));
573: pool.setName(name);
574:
575: ThreadFactory factory = null;
576: try {
577: factory = (ThreadFactory) m_defaultThreadFactoryClass
578: .newInstance();
579: } catch (final Exception ex) {
580: getLogger().warn(
581: "Cannot instantiate a ThreadFactory from class "
582: + m_defaultThreadFactoryClass.getName()
583: + ". Will use a "
584: + DefaultThreadFactory.class.getName(), ex);
585: factory = new DefaultThreadFactory();
586: }
587:
588: factory.setPriority(priority);
589: factory.setDaemon(isDaemon);
590: pool.setThreadFactory(factory);
591: pool.setQueue(queueSize);
592: pool.setMaximumPoolSize((maxPoolSize < 0) ? Integer.MAX_VALUE
593: : maxPoolSize);
594:
595: if (minPoolSize < 1) {
596: getLogger().warn(
597: "min-pool-size < 1 for pool \"" + name
598: + "\". Set to 1");
599: }
600:
601: pool.setMinimumPoolSize((minPoolSize < 1) ? 1 : minPoolSize);
602:
603: if (keepAliveTime < 0) {
604: getLogger().warn(
605: "keep-alive-time-ms < 0 for pool \"" + name
606: + "\". Set to 1000");
607: }
608:
609: pool.setKeepAliveTime((keepAliveTime < 0) ? 1000
610: : keepAliveTime);
611: pool.setBlockPolicy(blockPolicy);
612: pool.setShutdownGraceful(shutdownGraceful);
613: pool.setShutdownWaitTimeMs(shutdownWaitTime);
614:
615: synchronized (m_pools) {
616: m_pools.put(name, pool);
617: }
618:
619: printPoolInfo(pool);
620: return pool;
621: }
622:
623: /**
624: * DOCUMENT ME!
625: *
626: * @param pool DOCUMENT ME!
627: */
628: private void printPoolInfo(final DefaultThreadPool pool) {
629: if (getLogger().isInfoEnabled()) {
630: if (pool.isQueued()) {
631: final StringBuffer msg = new StringBuffer();
632: msg.append("ThreadPool named \"")
633: .append(pool.getName());
634: msg.append("\" created with maximum queue-size=");
635: msg.append(pool.getMaxQueueSize());
636: msg.append(",max-pool-size=").append(
637: pool.getMaximumPoolSize());
638: msg.append(",min-pool-size=").append(
639: pool.getMinimumPoolSize());
640: msg.append(",priority=").append(pool.getPriority());
641: msg.append(",isDaemon=").append(
642: ((ThreadFactory) pool.getThreadFactory())
643: .isDaemon());
644: msg.append(",keep-alive-time-ms=").append(
645: pool.getKeepAliveTime());
646: msg.append(",block-policy=\"").append(
647: pool.getBlockPolicy());
648: msg.append("\",shutdown-wait-time-ms=").append(
649: pool.getShutdownWaitTimeMs());
650: getLogger().info(msg.toString());
651: } else {
652: final StringBuffer msg = new StringBuffer();
653: msg.append("ThreadPool named \"")
654: .append(pool.getName());
655: msg.append("\" created with no queue,max-pool-size=")
656: .append(pool.getMaximumPoolSize());
657: msg.append(",min-pool-size=").append(
658: pool.getMinimumPoolSize());
659: msg.append(",priority=").append(pool.getPriority());
660: msg.append(",isDaemon=").append(
661: ((ThreadFactory) pool.getThreadFactory())
662: .isDaemon());
663: msg.append(",keep-alive-time-ms=").append(
664: pool.getKeepAliveTime());
665: msg.append(",block-policy=").append(
666: pool.getBlockPolicy());
667: msg.append(",shutdown-wait-time-ms=").append(
668: pool.getShutdownWaitTimeMs());
669: getLogger().info(msg.toString());
670: }
671: }
672: }
673:
674: //~ Inner Classes ----------------------------------------------------------
675:
676: /**
677: * The $classType$ class ...
678: *
679: * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a>
680: * @version $Id: DefaultRunnableManager.java 56848 2004-11-07 14:09:23Z giacomo $
681: */
682: private class ExecutionInfo implements Comparable {
683: //~ Instance fields ----------------------------------------------------
684:
685: /** Our logger */
686: final Logger m_logger;
687:
688: /** DOCUMENT ME! */
689: final Runnable m_command;
690:
691: /** DOCUMENT ME! */
692: final ThreadPool m_pool;
693:
694: /** DOCUMENT ME! */
695: final long m_delay;
696:
697: /** DOCUMENT ME! */
698: final long m_interval;
699:
700: /** DOCUMENT ME! */
701: long m_nextRun = 0;
702:
703: //~ Constructors -------------------------------------------------------
704:
705: /**
706: * Creates a new ExecutionInfo object.
707: *
708: * @param pool DOCUMENT ME!
709: * @param command DOCUMENT ME!
710: * @param delay DOCUMENT ME!
711: * @param interval DOCUMENT ME!
712: * @param logger DOCUMENT ME!
713: */
714: ExecutionInfo(final ThreadPool pool, final Runnable command,
715: final long delay, final long interval,
716: final Logger logger) {
717: m_pool = pool;
718: m_command = command;
719: m_delay = delay;
720: m_interval = interval;
721: m_logger = logger;
722: m_nextRun = System.currentTimeMillis() + delay;
723:
724: synchronized (m_commandStack) {
725: m_commandStack.add(this );
726: m_commandStack.notifyAll();
727: }
728: Thread.yield(); // Give others a chance to run
729: }
730:
731: //~ Methods ------------------------------------------------------------
732:
733: /**
734: * DOCUMENT ME!
735: *
736: * @param other DOCUMENT ME!
737: *
738: * @return DOCUMENT ME!
739: */
740: public int compareTo(final Object other) {
741: final ExecutionInfo otherInfo = (ExecutionInfo) other;
742: int diff = (int) (m_nextRun - otherInfo.m_nextRun);
743: if (diff == 0) {
744: if (this == other) {
745: // Same object, return 0.
746: return 0;
747: } else {
748: // NOT the same object, MUST return non-0 value.
749: return System.identityHashCode(this )
750: - System.identityHashCode(other);
751: }
752: }
753: return diff;
754: }
755:
756: /**
757: * DOCUMENT ME!
758: */
759: void execute() {
760: if (m_logger.isDebugEnabled()) {
761: m_logger.debug("Executing command " + m_command
762: + " in pool \"" + m_pool.getName()
763: + "\", schedule with interval=" + m_interval);
764: }
765:
766: synchronized (m_commandStack) {
767: m_commandStack.remove(this );
768: if (m_interval > 0) {
769: m_nextRun = System.currentTimeMillis() + m_interval;
770: m_commandStack.add(this );
771: }
772: }
773:
774: try {
775: m_pool.execute(m_command);
776: } catch (final InterruptedException ie) {
777: if (m_logger.isDebugEnabled()) {
778: m_logger.debug("Interrupted executing command + "
779: + m_command);
780: }
781: } catch (final Throwable t) {
782: m_logger.error("Exception executing command "
783: + m_command, t);
784: }
785: }
786: }
787: }
|