001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java $
003: * $Revision: 613298 $
004: * $Date: 2008-01-18 23:09:22 +0100 (Fri, 18 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.impl.nio.reactor;
033:
034: import java.io.IOException;
035: import java.io.InterruptedIOException;
036: import java.nio.channels.CancelledKeyException;
037: import java.nio.channels.Channel;
038: import java.nio.channels.ClosedSelectorException;
039: import java.nio.channels.SelectionKey;
040: import java.nio.channels.Selector;
041: import java.nio.channels.SocketChannel;
042: import java.util.Collections;
043: import java.util.HashSet;
044: import java.util.Iterator;
045: import java.util.Queue;
046: import java.util.Set;
047: import java.util.concurrent.ConcurrentLinkedQueue;
048:
049: import org.apache.http.nio.reactor.IOReactor;
050: import org.apache.http.nio.reactor.IOReactorException;
051: import org.apache.http.nio.reactor.IOReactorStatus;
052: import org.apache.http.nio.reactor.IOSession;
053:
054: public abstract class AbstractIOReactor implements IOReactor {
055:
056: private volatile IOReactorStatus status;
057:
058: private final Object shutdownMutex;
059: private final long selectTimeout;
060: private final Selector selector;
061: private final Set<IOSession> sessions;
062: private final Queue<IOSession> closedSessions;
063: private final Queue<ChannelEntry> newChannels;
064:
065: public AbstractIOReactor(long selectTimeout)
066: throws IOReactorException {
067: super ();
068: if (selectTimeout <= 0) {
069: throw new IllegalArgumentException(
070: "Select timeout may not be negative or zero");
071: }
072: this .selectTimeout = selectTimeout;
073: this .sessions = Collections
074: .synchronizedSet(new HashSet<IOSession>());
075: this .closedSessions = new ConcurrentLinkedQueue<IOSession>();
076: this .newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
077: try {
078: this .selector = Selector.open();
079: } catch (IOException ex) {
080: throw new IOReactorException("Failure opening selector", ex);
081: }
082: this .shutdownMutex = new Object();
083: this .status = IOReactorStatus.INACTIVE;
084: }
085:
086: protected abstract void acceptable(SelectionKey key);
087:
088: protected abstract void connectable(SelectionKey key);
089:
090: protected abstract void readable(SelectionKey key);
091:
092: protected abstract void writable(SelectionKey key);
093:
094: protected abstract void timeoutCheck(SelectionKey key, long now);
095:
096: protected abstract void validate(Set<SelectionKey> keys);
097:
098: protected abstract void keyCreated(SelectionKey key,
099: IOSession session);
100:
101: protected abstract IOSession keyCancelled(SelectionKey key);
102:
103: protected abstract void sessionClosed(IOSession session);
104:
105: public IOReactorStatus getStatus() {
106: return this .status;
107: }
108:
109: public void addChannel(final ChannelEntry channelEntry) {
110: if (channelEntry == null) {
111: throw new IllegalArgumentException(
112: "Channel entry may not be null");
113: }
114: this .newChannels.add(channelEntry);
115: this .selector.wakeup();
116: }
117:
118: protected void execute() throws InterruptedIOException,
119: IOReactorException {
120: this .status = IOReactorStatus.ACTIVE;
121:
122: try {
123: for (;;) {
124:
125: int readyCount;
126: try {
127: readyCount = this .selector
128: .select(this .selectTimeout);
129: } catch (InterruptedIOException ex) {
130: throw ex;
131: } catch (IOException ex) {
132: throw new IOReactorException(
133: "Unexpected selector failure", ex);
134: }
135:
136: if (this .status == IOReactorStatus.SHUT_DOWN) {
137: // Hard shut down. Exit select loop immediately
138: break;
139: }
140:
141: if (this .status == IOReactorStatus.SHUTTING_DOWN) {
142: // Graceful shutdown in process
143: // Try to close things out nicely
144: closeSessions();
145: closeNewChannels();
146: }
147:
148: // Process selected I/O events
149: if (readyCount > 0) {
150: processEvents(this .selector.selectedKeys());
151: }
152:
153: // Validate active channels
154: validate(this .selector.keys());
155:
156: // Process closed sessions
157: processClosedSessions();
158:
159: // If active process new channels
160: if (this .status == IOReactorStatus.ACTIVE) {
161: processNewChannels();
162: }
163:
164: // Exit select loop if graceful shutdown has been completed
165: if (this .status.compareTo(IOReactorStatus.ACTIVE) > 0
166: && this .sessions.isEmpty()) {
167: break;
168: }
169:
170: }
171:
172: // Close remaining active channels and the selector itself
173: closeActiveChannels();
174:
175: } catch (ClosedSelectorException ex) {
176: } finally {
177: synchronized (this .shutdownMutex) {
178: this .status = IOReactorStatus.SHUT_DOWN;
179: this .shutdownMutex.notifyAll();
180: }
181: }
182: }
183:
184: private void processEvents(final Set<SelectionKey> selectedKeys) {
185: for (Iterator<SelectionKey> it = selectedKeys.iterator(); it
186: .hasNext();) {
187:
188: SelectionKey key = it.next();
189: processEvent(key);
190:
191: }
192: selectedKeys.clear();
193: }
194:
195: protected void processEvent(final SelectionKey key) {
196: try {
197: if (key.isAcceptable()) {
198: acceptable(key);
199: }
200: if (key.isConnectable()) {
201: connectable(key);
202: }
203: if (key.isReadable()) {
204: readable(key);
205: }
206: if (key.isWritable()) {
207: writable(key);
208: }
209: } catch (CancelledKeyException ex) {
210: IOSession session = keyCancelled(key);
211: if (session != null) {
212: this .closedSessions.add(session);
213: }
214: key.attach(null);
215: }
216: }
217:
218: private void processNewChannels() throws IOReactorException {
219: ChannelEntry entry;
220: while ((entry = this .newChannels.poll()) != null) {
221:
222: SocketChannel channel;
223: SelectionKey key;
224: try {
225: channel = entry.getChannel();
226: channel.configureBlocking(false);
227: key = channel.register(this .selector, 0);
228: } catch (IOException ex) {
229: throw new IOReactorException(
230: "Failure registering channel "
231: + "with the selector", ex);
232: }
233:
234: IOSession session = new IOSessionImpl(key,
235: new SessionClosedCallback() {
236:
237: public void sessionClosed(IOSession session) {
238: closedSessions.add(session);
239: }
240:
241: });
242:
243: int timeout = 0;
244: try {
245: timeout = channel.socket().getSoTimeout();
246: } catch (IOException ex) {
247: // Very unlikely to happen and is not fatal
248: // as the protocol layer is expected to overwrite
249: // this value anyways
250: }
251:
252: session.setAttribute(IOSession.ATTACHMENT_KEY, entry
253: .getAttachment());
254: session.setSocketTimeout(timeout);
255: this .sessions.add(session);
256:
257: try {
258: keyCreated(key, session);
259:
260: SessionRequestImpl sessionRequest = entry
261: .getSessionRequest();
262: if (sessionRequest != null) {
263: sessionRequest.completed(session);
264: }
265: } catch (CancelledKeyException ex) {
266: this .closedSessions.add(session);
267: key.attach(null);
268: }
269: }
270: }
271:
272: private void processClosedSessions() {
273: IOSession session;
274: while ((session = this .closedSessions.poll()) != null) {
275: if (this .sessions.remove(session)) {
276: sessionClosed(session);
277: }
278: }
279: }
280:
281: protected void closeSessions() {
282: synchronized (this .sessions) {
283: for (Iterator<IOSession> it = this .sessions.iterator(); it
284: .hasNext();) {
285: IOSession session = it.next();
286: session.close();
287: }
288: }
289: }
290:
291: protected void closeNewChannels() throws IOReactorException {
292: ChannelEntry entry;
293: while ((entry = this .newChannels.poll()) != null) {
294: SessionRequestImpl sessionRequest = entry
295: .getSessionRequest();
296: if (sessionRequest != null) {
297: sessionRequest.cancel();
298: }
299: SocketChannel channel = entry.getChannel();
300: try {
301: channel.close();
302: } catch (IOException ignore) {
303: }
304: }
305: }
306:
307: protected void closeActiveChannels() throws IOReactorException {
308: Set<SelectionKey> keys = this .selector.keys();
309: for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
310: try {
311: SelectionKey key = it.next();
312: Channel channel = key.channel();
313: if (channel != null) {
314: channel.close();
315: }
316: } catch (IOException ignore) {
317: }
318: }
319: try {
320: this .selector.close();
321: } catch (IOException ignore) {
322: }
323: }
324:
325: public void gracefulShutdown() {
326: if (this .status != IOReactorStatus.ACTIVE) {
327: // Already shutting down
328: return;
329: }
330: this .status = IOReactorStatus.SHUTTING_DOWN;
331: this .selector.wakeup();
332: }
333:
334: public void hardShutdown() throws IOReactorException {
335: if (this .status == IOReactorStatus.SHUT_DOWN) {
336: // Already shut down
337: return;
338: }
339: this .status = IOReactorStatus.SHUT_DOWN;
340: closeNewChannels();
341: closeActiveChannels();
342: }
343:
344: public void awaitShutdown(long timeout) throws InterruptedException {
345: synchronized (this .shutdownMutex) {
346: long deadline = System.currentTimeMillis() + timeout;
347: long remaining = timeout;
348: while (this .status != IOReactorStatus.SHUT_DOWN) {
349: this .shutdownMutex.wait(remaining);
350: if (timeout > 0) {
351: remaining = deadline - System.currentTimeMillis();
352: if (remaining <= 0) {
353: break;
354: }
355: }
356: }
357: }
358: }
359:
360: public void shutdown(long gracePeriod) throws IOReactorException {
361: if (this .status != IOReactorStatus.INACTIVE) {
362: gracefulShutdown();
363: try {
364: awaitShutdown(gracePeriod);
365: } catch (InterruptedException ignore) {
366: }
367: }
368: if (this .status != IOReactorStatus.SHUT_DOWN) {
369: hardShutdown();
370: }
371: }
372:
373: public void shutdown() throws IOReactorException {
374: shutdown(1000);
375: }
376:
377: }
|