001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.executors.impl;
018:
019: import java.lang.reflect.Method;
020: import java.util.HashMap;
021: import java.util.Map;
022: import java.util.concurrent.ArrayBlockingQueue;
023: import java.util.concurrent.BlockingQueue;
024: import java.util.concurrent.LinkedBlockingQueue;
025: import java.util.concurrent.RejectedExecutionHandler;
026: import java.util.concurrent.SynchronousQueue;
027: import java.util.concurrent.ThreadFactory;
028: import java.util.concurrent.ThreadPoolExecutor;
029: import java.util.concurrent.TimeUnit;
030: import java.util.concurrent.atomic.AtomicInteger;
031:
032: import org.apache.servicemix.executors.Executor;
033: import org.apache.servicemix.executors.ExecutorFactory;
034:
035: /**
036: * Default implementation of the ExecutorFactory.
037: *
038: * Configuration can be done hierachically.
039: * When an executor is created with an id of <code>foo.bar</code>,
040: * the factory will look for a configuration in the following
041: * way:
042: * <ul>
043: * <li>configs.get("foo.bar")</li>
044: * <li>configs.get("foo")</li>
045: * <li>defaultConfig</li>
046: * </ul>
047: *
048: * @author <a href="mailto:gnodet [at] gmail.com">Guillaume Nodet</a>
049: */
050: public class ExecutorFactoryImpl implements ExecutorFactory {
051:
052: private ExecutorConfig defaultConfig = new ExecutorConfig();
053:
054: private Map<String, ExecutorConfig> configs = new HashMap<String, ExecutorConfig>();
055:
056: public Executor createExecutor(String id) {
057: ExecutorConfig config = getConfig(id);
058: return new ExecutorImpl(createService(id, config), config
059: .getShutdownDelay());
060: }
061:
062: protected ExecutorConfig getConfig(String id) {
063: ExecutorConfig config = null;
064: if (configs != null) {
065: config = configs.get(id);
066: while (config == null && id.indexOf('.') > 0) {
067: id = id.substring(0, id.lastIndexOf('.'));
068: config = configs.get(id);
069: }
070: }
071: if (config == null) {
072: config = defaultConfig;
073: }
074: return config;
075: }
076:
077: protected ThreadPoolExecutor createService(String id,
078: ExecutorConfig config) {
079: if (config.getQueueSize() != 0 && config.getCorePoolSize() == 0) {
080: throw new IllegalArgumentException(
081: "CorePoolSize must be > 0 when using a capacity queue");
082: }
083: BlockingQueue<Runnable> queue;
084: if (config.getQueueSize() == 0) {
085: queue = new SynchronousQueue<Runnable>();
086: } else if (config.getQueueSize() < 0
087: || config.getQueueSize() == Integer.MAX_VALUE) {
088: queue = new LinkedBlockingQueue<Runnable>();
089: } else {
090: queue = new ArrayBlockingQueue<Runnable>(config
091: .getQueueSize());
092: }
093: ThreadFactory factory = new DefaultThreadFactory(id, config
094: .isThreadDaemon(), config.getThreadPriority());
095: RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
096: ThreadPoolExecutor service = new ThreadPoolExecutor(config
097: .getCorePoolSize(),
098: config.getMaximumPoolSize() < 0 ? Integer.MAX_VALUE
099: : config.getMaximumPoolSize(), config
100: .getKeepAliveTime(), TimeUnit.MILLISECONDS,
101: queue, factory, handler);
102: if (config.isAllowCoreThreadsTimeout()) {
103: try {
104: Method mth = service.getClass().getMethod(
105: "allowCoreThreadTimeOut",
106: new Class[] { boolean.class });
107: mth.invoke(service, new Object[] { Boolean.TRUE });
108: } catch (Throwable t) {
109: // Do nothing
110: }
111: }
112: return service;
113: }
114:
115: /**
116: * The default thread factory
117: */
118: static class DefaultThreadFactory implements ThreadFactory {
119: final ThreadGroup group;
120:
121: final AtomicInteger threadNumber = new AtomicInteger(1);
122:
123: final String namePrefix;
124:
125: final boolean daemon;
126:
127: final int priority;
128:
129: DefaultThreadFactory(String id, boolean daemon, int priority) {
130: SecurityManager s = System.getSecurityManager();
131: group = (s != null) ? s.getThreadGroup() : Thread
132: .currentThread().getThreadGroup();
133: namePrefix = "pool-" + id + "-thread-";
134: this .daemon = daemon;
135: this .priority = priority;
136: }
137:
138: public Thread newThread(Runnable r) {
139: Thread t = new Thread(group, r, namePrefix
140: + threadNumber.getAndIncrement(), 0);
141: if (t.isDaemon() != daemon) {
142: t.setDaemon(daemon);
143: }
144: if (t.getPriority() != priority) {
145: t.setPriority(priority);
146: }
147: return t;
148: }
149: }
150:
151: /**
152: * @return the configs
153: */
154: public Map<String, ExecutorConfig> getConfigs() {
155: return configs;
156: }
157:
158: /**
159: * @param configs the configs to set
160: */
161: public void setConfigs(Map<String, ExecutorConfig> configs) {
162: this .configs = configs;
163: }
164:
165: /**
166: * @return the defaultConfig
167: */
168: public ExecutorConfig getDefaultConfig() {
169: return defaultConfig;
170: }
171:
172: /**
173: * @param defaultConfig the defaultConfig to set
174: */
175: public void setDefaultConfig(ExecutorConfig defaultConfig) {
176: this.defaultConfig = defaultConfig;
177: }
178:
179: }
|