001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.util.concurrent.Executor;
023: import java.util.concurrent.ExecutorService;
024: import java.util.concurrent.Executors;
025: import java.util.concurrent.atomic.AtomicInteger;
026:
027: import org.slf4j.Logger;
028: import org.slf4j.LoggerFactory;
029:
030: /**
031: * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
032: * {@link IoProcessor}s. Most current transport implementations use this pool internally
033: * to perform better in a multi-core environment, and therefore, you won't need to
034: * use this pool directly unless you are running multiple {@link IoService}s in the
035: * same JVM.
036: * <p>
037: * If you are running multiple {@link IoService}s, you could want to share the pool
038: * among all services. To do so, you can create a new {@link SimpleIoProcessorPool}
039: * instance by yourself and provide the pool as a constructor parameter when you
040: * create the services.
041: * <p>
042: * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
043: * It tries to instantiate the processor in the following order:
044: * <ol>
045: * <li>A public constructor with one {@link ExecutorService} parameter.</li>
046: * <li>A public constructor with one {@link Executor} parameter.</li>
047: * <li>A public default constructor</li>
048: * </ol>
049: * The following is an example for the NIO socket transport:
050: * <pre><code>
051: * // Create a shared pool.
052: * SimpleIoProcessorPool<NioSession> pool =
053: * new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16);
054: *
055: * // Create two services that share the same pool.
056: * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
057: * SocketConnector connector = new NioSocketConnector(pool);
058: *
059: * ...
060: *
061: * // Release related resources.
062: * connector.dispose();
063: * acceptor.dispose();
064: * pool.dispose();
065: * </code></pre>
066: *
067: * @author The Apache MINA Project (dev@mina.apache.org)
068: * @version $Rev: 609876 $, $Date: 2008-01-07 22:40:38 -0700 (Mon, 07 Jan 2008) $
069: *
070: * @param <T> the type of the {@link IoSession} to be managed by the specified
071: * {@link IoProcessor}.
072: */
073: public class SimpleIoProcessorPool<T extends AbstractIoSession>
074: implements IoProcessor<T> {
075:
076: private static final int DEFAULT_SIZE = Runtime.getRuntime()
077: .availableProcessors() + 1;
078: private static final AttributeKey PROCESSOR = new AttributeKey(
079: SimpleIoProcessorPool.class, "processor");
080:
081: private final Logger logger = LoggerFactory.getLogger(getClass());
082:
083: private final IoProcessor<T>[] pool;
084: private final AtomicInteger processorDistributor = new AtomicInteger();
085: private final Executor executor;
086: private final boolean createdExecutor;
087:
088: private final Object disposalLock = new Object();
089: private volatile boolean disposing;
090: private volatile boolean disposed;
091:
092: public SimpleIoProcessorPool(
093: Class<? extends IoProcessor<T>> processorType) {
094: this (processorType, null, DEFAULT_SIZE);
095: }
096:
097: public SimpleIoProcessorPool(
098: Class<? extends IoProcessor<T>> processorType, int size) {
099: this (processorType, null, size);
100: }
101:
102: public SimpleIoProcessorPool(
103: Class<? extends IoProcessor<T>> processorType,
104: Executor executor) {
105: this (processorType, executor, DEFAULT_SIZE);
106: }
107:
108: @SuppressWarnings("unchecked")
109: public SimpleIoProcessorPool(
110: Class<? extends IoProcessor<T>> processorType,
111: Executor executor, int size) {
112: if (processorType == null) {
113: throw new NullPointerException("processorType");
114: }
115: if (size <= 0) {
116: throw new IllegalArgumentException("size: " + size
117: + " (expected: positive integer)");
118: }
119:
120: if (executor == null) {
121: this .executor = executor = Executors.newCachedThreadPool();
122: this .createdExecutor = true;
123: } else {
124: this .executor = executor;
125: this .createdExecutor = false;
126: }
127:
128: pool = new IoProcessor[size];
129:
130: boolean success = false;
131: try {
132: for (int i = 0; i < pool.length; i++) {
133: IoProcessor<T> processor = null;
134:
135: // Try to create a new processor with a proper constructor.
136: try {
137: try {
138: processor = processorType.getConstructor(
139: ExecutorService.class).newInstance(
140: executor);
141: } catch (NoSuchMethodException e) {
142: // To the next step...
143: }
144:
145: if (processor == null) {
146: try {
147: processor = processorType.getConstructor(
148: Executor.class).newInstance(
149: executor);
150: } catch (NoSuchMethodException e) {
151: // To the next step...
152: }
153: }
154:
155: if (processor == null) {
156: try {
157: processor = processorType.getConstructor()
158: .newInstance();
159: } catch (NoSuchMethodException e) {
160: // To the next step...
161: }
162: }
163: } catch (RuntimeException e) {
164: throw e;
165: } catch (Exception e) {
166: throw new RuntimeIoException(
167: "Failed to create a new instance of "
168: + processorType.getName(), e);
169: }
170:
171: // Raise an exception if no proper constructor is found.
172: if (processor == null) {
173: throw new IllegalArgumentException(
174: String.valueOf(processorType)
175: + " must have a public constructor "
176: + "with one "
177: + ExecutorService.class
178: .getSimpleName()
179: + " parameter, "
180: + "a public constructor with one "
181: + Executor.class.getSimpleName()
182: + " parameter or a public default constructor.");
183: }
184:
185: pool[i] = processor;
186: }
187:
188: success = true;
189: } finally {
190: if (!success) {
191: dispose();
192: }
193: }
194: }
195:
196: public final void add(T session) {
197: getProcessor(session).add(session);
198: }
199:
200: public final void flush(T session) {
201: getProcessor(session).flush(session);
202: }
203:
204: public final void remove(T session) {
205: getProcessor(session).remove(session);
206: }
207:
208: public final void updateTrafficMask(T session) {
209: getProcessor(session).updateTrafficMask(session);
210: }
211:
212: public boolean isDisposed() {
213: return disposed;
214: }
215:
216: public boolean isDisposing() {
217: return disposing;
218: }
219:
220: public final void dispose() {
221: if (disposed) {
222: return;
223: }
224:
225: synchronized (disposalLock) {
226: if (!disposing) {
227: disposing = true;
228: for (int i = pool.length - 1; i >= 0; i--) {
229: if (pool[i] == null || pool[i].isDisposing()) {
230: continue;
231: }
232:
233: try {
234: pool[i].dispose();
235: } catch (Exception e) {
236: logger.warn("Failed to dispose a "
237: + pool[i].getClass().getSimpleName()
238: + " at index " + i + ".", e);
239: } finally {
240: pool[i] = null;
241: }
242: }
243:
244: if (createdExecutor) {
245: ((ExecutorService) executor).shutdown();
246: }
247: }
248: }
249:
250: disposed = true;
251: }
252:
253: @SuppressWarnings("unchecked")
254: private IoProcessor<T> getProcessor(T session) {
255: IoProcessor<T> p = (IoProcessor<T>) session
256: .getAttribute(PROCESSOR);
257: if (p == null) {
258: p = nextProcessor();
259: IoProcessor<T> oldp = (IoProcessor<T>) session
260: .setAttributeIfAbsent(PROCESSOR, p);
261: if (oldp != null) {
262: p = oldp;
263: }
264: }
265:
266: return p;
267: }
268:
269: private IoProcessor<T> nextProcessor() {
270: checkDisposal();
271: return pool[Math.abs(processorDistributor.getAndIncrement())
272: % pool.length];
273: }
274:
275: private void checkDisposal() {
276: if (disposed) {
277: throw new IllegalStateException(
278: "A disposed processor cannot be accessed.");
279: }
280: }
281: }
|