001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java $
003: * $Revision: 613298 $
004: * $Date: 2008-01-18 23:09:22 +0100 (Fri, 18 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.impl.nio.reactor;
033:
034: import java.io.IOException;
035: import java.io.InterruptedIOException;
036: import java.net.Socket;
037: import java.nio.channels.Channel;
038: import java.nio.channels.ClosedChannelException;
039: import java.nio.channels.ClosedSelectorException;
040: import java.nio.channels.SelectableChannel;
041: import java.nio.channels.SelectionKey;
042: import java.nio.channels.Selector;
043: import java.util.Iterator;
044: import java.util.Set;
045: import java.util.concurrent.ThreadFactory;
046:
047: import org.apache.http.nio.params.NIOReactorParams;
048: import org.apache.http.nio.reactor.IOEventDispatch;
049: import org.apache.http.nio.reactor.IOReactor;
050: import org.apache.http.nio.reactor.IOReactorException;
051: import org.apache.http.nio.reactor.IOReactorExceptionHandler;
052: import org.apache.http.nio.reactor.IOReactorStatus;
053: import org.apache.http.params.HttpConnectionParams;
054: import org.apache.http.params.HttpParams;
055:
056: public abstract class AbstractMultiworkerIOReactor implements IOReactor {
057:
058: protected volatile IOReactorStatus status;
059:
060: protected final HttpParams params;
061: protected final Selector selector;
062: protected final long selectTimeout;
063:
064: private final int workerCount;
065: private final ThreadFactory threadFactory;
066: private final BaseIOReactor[] dispatchers;
067: private final Worker[] workers;
068: private final Thread[] threads;
069: private final long gracePeriod;
070: private final Object shutdownMutex;
071:
072: protected IOReactorExceptionHandler exceptionHandler;
073:
074: private int currentWorker = 0;
075:
/**
 * Creates an inactive reactor with the given number of workers.
 * <p>
 * The main selector is opened here; the dispatchers, workers and threads
 * are only allocated as empty arrays and are populated later by
 * {@link #execute(IOEventDispatch)}.
 *
 * @param workerCount   number of I/O dispatchers / worker threads; must be positive
 * @param threadFactory factory for the worker threads, or {@code null} to use
 *                      a {@link DefaultThreadFactory}
 * @param params        HTTP parameters; must not be {@code null}
 * @throws IllegalArgumentException if {@code workerCount} is not positive
 *                                  or {@code params} is {@code null}
 * @throws IOReactorException       if the selector cannot be opened
 */
public AbstractMultiworkerIOReactor(int workerCount,
        final ThreadFactory threadFactory, final HttpParams params)
        throws IOReactorException {
    super();
    if (workerCount <= 0) {
        throw new IllegalArgumentException(
                "Worker count may not be negative or zero");
    }
    if (params == null) {
        // The message used to be a copy of the worker-count message.
        throw new IllegalArgumentException(
                "HTTP parameters may not be null");
    }
    try {
        this.selector = Selector.open();
    } catch (IOException ex) {
        throw new IOReactorException("Failure opening selector", ex);
    }
    this.params = params;
    this.selectTimeout = NIOReactorParams.getSelectInterval(params);
    this.gracePeriod = NIOReactorParams.getGracePeriod(params);
    this.shutdownMutex = new Object();
    this.workerCount = workerCount;
    if (threadFactory != null) {
        this.threadFactory = threadFactory;
    } else {
        this.threadFactory = new DefaultThreadFactory();
    }
    this.dispatchers = new BaseIOReactor[workerCount];
    this.workers = new Worker[workerCount];
    this.threads = new Thread[workerCount];
    this.status = IOReactorStatus.INACTIVE;
}
108:
109: public IOReactorStatus getStatus() {
110: return this .status;
111: }
112:
113: public void setExceptionHandler(
114: final IOReactorExceptionHandler exceptionHandler) {
115: this .exceptionHandler = exceptionHandler;
116: }
117:
118: protected abstract void processEvents(int count)
119: throws IOReactorException;
120:
121: protected abstract void cancelRequests() throws IOReactorException;
122:
123: public void execute(final IOEventDispatch eventDispatch)
124: throws InterruptedIOException, IOReactorException {
125: if (eventDispatch == null) {
126: throw new IllegalArgumentException(
127: "Event dispatcher may not be null");
128: }
129:
130: this .status = IOReactorStatus.ACTIVE;
131:
132: // Start I/O dispatchers
133: for (int i = 0; i < this .dispatchers.length; i++) {
134: BaseIOReactor dispatcher = new BaseIOReactor(
135: this .selectTimeout);
136: dispatcher.setExceptionHandler(exceptionHandler);
137: this .dispatchers[i] = dispatcher;
138: }
139: for (int i = 0; i < this .workerCount; i++) {
140: BaseIOReactor dispatcher = this .dispatchers[i];
141: this .workers[i] = new Worker(dispatcher, eventDispatch);
142: this .threads[i] = this .threadFactory
143: .newThread(this .workers[i]);
144: }
145: for (int i = 0; i < this .workerCount; i++) {
146: if (this .status != IOReactorStatus.ACTIVE) {
147: return;
148: }
149: this .threads[i].start();
150: }
151:
152: try {
153:
154: for (;;) {
155: int readyCount;
156: try {
157: readyCount = this .selector
158: .select(this .selectTimeout);
159: } catch (InterruptedIOException ex) {
160: throw ex;
161: } catch (IOException ex) {
162: throw new IOReactorException(
163: "Unexpected selector failure", ex);
164: }
165:
166: if (this .status.compareTo(IOReactorStatus.ACTIVE) > 0) {
167: break;
168: }
169: processEvents(readyCount);
170:
171: // Verify I/O dispatchers
172: for (int i = 0; i < this .workerCount; i++) {
173: Worker worker = this .workers[i];
174: Thread thread = this .threads[i];
175: if (!thread.isAlive()) {
176: Exception ex = worker.getException();
177: if (ex instanceof IOReactorException) {
178: throw (IOReactorException) ex;
179: } else if (ex instanceof InterruptedIOException) {
180: throw (InterruptedIOException) ex;
181: } else if (ex instanceof RuntimeException) {
182: throw (RuntimeException) ex;
183: } else if (ex != null) {
184: throw new IOReactorException(ex
185: .getMessage(), ex);
186: }
187: }
188: }
189: }
190:
191: } catch (ClosedSelectorException ex) {
192: } finally {
193: // Shutdown
194: try {
195: doShutdown();
196: } catch (IOException ex) {
197: throw new IOReactorException(ex.getMessage(), ex);
198: }
199: }
200: }
201:
202: protected void doShutdown() throws IOException {
203: if (this .status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
204: return;
205: }
206: this .status = IOReactorStatus.SHUTTING_DOWN;
207: cancelRequests();
208: this .selector.wakeup();
209:
210: // Close out all channels
211: if (this .selector.isOpen()) {
212: Set<SelectionKey> keys = this .selector.keys();
213: for (Iterator<SelectionKey> it = keys.iterator(); it
214: .hasNext();) {
215: try {
216: SelectionKey key = it.next();
217: Channel channel = key.channel();
218: if (channel != null) {
219: channel.close();
220: }
221: } catch (IOException ignore) {
222: }
223: }
224: // Stop dispatching I/O events
225: this .selector.close();
226: }
227:
228: // Attempt to shut down I/O dispatchers gracefully
229: for (int i = 0; i < this .workerCount; i++) {
230: BaseIOReactor dispatcher = this .dispatchers[i];
231: dispatcher.gracefulShutdown();
232: }
233:
234: try {
235: // Force shut down I/O dispatchers if they fail to terminate
236: // in time
237: for (int i = 0; i < this .workerCount; i++) {
238: BaseIOReactor dispatcher = this .dispatchers[i];
239: if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
240: dispatcher.awaitShutdown(this .gracePeriod);
241: }
242: if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
243: dispatcher.hardShutdown();
244: }
245: }
246: // Join worker threads
247: for (int i = 0; i < this .workerCount; i++) {
248: Thread t = this .threads[i];
249: if (t != null) {
250: t.join(this .gracePeriod);
251: }
252: }
253: } catch (InterruptedException ex) {
254: throw new InterruptedIOException(ex.getMessage());
255: } finally {
256: synchronized (this .shutdownMutex) {
257: this .status = IOReactorStatus.SHUT_DOWN;
258: this .shutdownMutex.notifyAll();
259: }
260: }
261: }
262:
263: protected void addChannel(final ChannelEntry entry) {
264: // Distribute new channels among the workers
265: this .dispatchers[this .currentWorker++ % this .workerCount]
266: .addChannel(entry);
267: }
268:
269: protected SelectionKey registerChannel(
270: final SelectableChannel channel, int ops)
271: throws ClosedChannelException {
272: return channel.register(this .selector, ops);
273: }
274:
275: protected void prepareSocket(final Socket socket)
276: throws IOException {
277: socket.setTcpNoDelay(HttpConnectionParams
278: .getTcpNoDelay(this .params));
279: socket.setSoTimeout(HttpConnectionParams
280: .getSoTimeout(this .params));
281: int linger = HttpConnectionParams.getLinger(this .params);
282: if (linger >= 0) {
283: socket.setSoLinger(linger > 0, linger);
284: }
285: }
286:
287: protected void awaitShutdown(long timeout)
288: throws InterruptedException {
289: synchronized (this .shutdownMutex) {
290: long deadline = System.currentTimeMillis() + timeout;
291: long remaining = timeout;
292: while (this .status != IOReactorStatus.SHUT_DOWN) {
293: this .shutdownMutex.wait(remaining);
294: if (timeout > 0) {
295: remaining = deadline - System.currentTimeMillis();
296: if (remaining <= 0) {
297: break;
298: }
299: }
300: }
301: }
302: }
303:
304: public void shutdown() throws IOException {
305: shutdown(2000);
306: }
307:
308: public void shutdown(long waitMs) throws IOException {
309: if (this .status != IOReactorStatus.ACTIVE) {
310: return;
311: }
312: this .status = IOReactorStatus.SHUTDOWN_REQUEST;
313: this .selector.wakeup();
314: try {
315: awaitShutdown(waitMs);
316: } catch (InterruptedException ignore) {
317: }
318: }
319:
320: static class Worker implements Runnable {
321:
322: final BaseIOReactor dispatcher;
323: final IOEventDispatch eventDispatch;
324:
325: private volatile Exception exception;
326:
327: public Worker(final BaseIOReactor dispatcher,
328: final IOEventDispatch eventDispatch) {
329: super ();
330: this .dispatcher = dispatcher;
331: this .eventDispatch = eventDispatch;
332: }
333:
334: public void run() {
335: try {
336: this .dispatcher.execute(this .eventDispatch);
337: } catch (InterruptedIOException ex) {
338: this .exception = ex;
339: } catch (IOReactorException ex) {
340: this .exception = ex;
341: } catch (RuntimeException ex) {
342: this .exception = ex;
343: }
344: }
345:
346: public Exception getException() {
347: return this .exception;
348: }
349:
350: }
351:
352: static class DefaultThreadFactory implements ThreadFactory {
353:
354: private static int COUNT = 0;
355:
356: public Thread newThread(final Runnable r) {
357: return new Thread(r, "I/O dispatcher " + (++COUNT));
358: }
359:
360: }
361:
362: }
|