001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java $
003: * $Revision: 613298 $
004: * $Date: 2008-01-18 23:09:22 +0100 (Fri, 18 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.impl.nio.reactor;
033:
034: import java.io.InterruptedIOException;
035: import java.nio.channels.CancelledKeyException;
036: import java.nio.channels.SelectionKey;
037: import java.util.HashSet;
038: import java.util.Iterator;
039: import java.util.Set;
040:
041: import org.apache.http.nio.reactor.EventMask;
042: import org.apache.http.nio.reactor.IOEventDispatch;
043: import org.apache.http.nio.reactor.IOReactorException;
044: import org.apache.http.nio.reactor.IOReactorExceptionHandler;
045: import org.apache.http.nio.reactor.IOSession;
046:
047: public class BaseIOReactor extends AbstractIOReactor {
048:
049: private final long timeoutCheckInterval;
050: private final Set<IOSession> bufferingSessions;
051:
052: private long lastTimeoutCheck;
053:
054: private IOReactorExceptionHandler exceptionHandler = null;
055: private IOEventDispatch eventDispatch = null;
056:
057: public BaseIOReactor(long selectTimeout) throws IOReactorException {
058: super (selectTimeout);
059: this .bufferingSessions = new HashSet<IOSession>();
060: this .timeoutCheckInterval = selectTimeout;
061: this .lastTimeoutCheck = System.currentTimeMillis();
062: }
063:
064: public void execute(final IOEventDispatch eventDispatch)
065: throws InterruptedIOException, IOReactorException {
066: if (eventDispatch == null) {
067: throw new IllegalArgumentException(
068: "Event dispatcher may not be null");
069: }
070: this .eventDispatch = eventDispatch;
071: execute();
072: }
073:
074: public void setExceptionHandler(
075: IOReactorExceptionHandler exceptionHandler) {
076: this .exceptionHandler = exceptionHandler;
077: }
078:
079: protected void handleRuntimeException(final RuntimeException ex) {
080: if (this .exceptionHandler == null
081: || !this .exceptionHandler.handle(ex)) {
082: throw ex;
083: }
084: }
085:
086: @Override
087: protected void acceptable(final SelectionKey key) {
088: }
089:
090: @Override
091: protected void connectable(final SelectionKey key) {
092: }
093:
094: @Override
095: protected void readable(final SelectionKey key) {
096: SessionHandle handle = (SessionHandle) key.attachment();
097: IOSession session = handle.getSession();
098: handle.resetLastRead();
099:
100: try {
101: this .eventDispatch.inputReady(session);
102: } catch (RuntimeException ex) {
103: handleRuntimeException(ex);
104: }
105: if (session.hasBufferedInput()) {
106: this .bufferingSessions.add(session);
107: }
108: }
109:
110: @Override
111: protected void writable(final SelectionKey key) {
112: SessionHandle handle = (SessionHandle) key.attachment();
113: IOSession session = handle.getSession();
114: handle.resetLastWrite();
115:
116: try {
117: this .eventDispatch.outputReady(session);
118: } catch (RuntimeException ex) {
119: handleRuntimeException(ex);
120: }
121: }
122:
123: @Override
124: protected void validate(final Set<SelectionKey> keys) {
125: long currentTime = System.currentTimeMillis();
126: if ((currentTime - this .lastTimeoutCheck) >= this .timeoutCheckInterval) {
127: this .lastTimeoutCheck = currentTime;
128: if (keys != null) {
129: for (Iterator<SelectionKey> it = keys.iterator(); it
130: .hasNext();) {
131: SelectionKey key = it.next();
132: timeoutCheck(key, currentTime);
133: }
134: }
135: }
136: if (!this .bufferingSessions.isEmpty()) {
137: for (Iterator<IOSession> it = this .bufferingSessions
138: .iterator(); it.hasNext();) {
139: IOSession session = it.next();
140: if (!session.hasBufferedInput()) {
141: it.remove();
142: continue;
143: }
144: try {
145: int ops = session.getEventMask();
146: if ((ops & EventMask.READ) > 0) {
147: try {
148: this .eventDispatch.inputReady(session);
149: } catch (RuntimeException ex) {
150: handleRuntimeException(ex);
151: }
152: if (!session.hasBufferedInput()) {
153: it.remove();
154: }
155: }
156: } catch (CancelledKeyException ex) {
157: it.remove();
158: }
159: }
160: }
161: }
162:
163: @Override
164: protected void timeoutCheck(final SelectionKey key, long now) {
165: Object attachment = key.attachment();
166: if (attachment instanceof SessionHandle) {
167: SessionHandle handle = (SessionHandle) key.attachment();
168: IOSession session = handle.getSession();
169: int timeout = session.getSocketTimeout();
170: if (timeout > 0) {
171: if (handle.getLastReadTime() + timeout < now) {
172: try {
173: this .eventDispatch.timeout(session);
174: } catch (RuntimeException ex) {
175: handleRuntimeException(ex);
176: }
177: }
178: }
179: }
180: }
181:
182: @Override
183: protected void keyCreated(final SelectionKey key,
184: final IOSession session) {
185: SessionHandle handle = new SessionHandle(session);
186: key.attach(handle);
187: try {
188: this .eventDispatch.connected(session);
189: } catch (RuntimeException ex) {
190: handleRuntimeException(ex);
191: }
192: }
193:
194: @Override
195: protected IOSession keyCancelled(final SelectionKey key) {
196: Object attachment = key.attachment();
197: if (attachment instanceof SessionHandle) {
198: SessionHandle handle = (SessionHandle) attachment;
199: return handle.getSession();
200: } else {
201: return null;
202: }
203: }
204:
205: @Override
206: protected void sessionClosed(final IOSession session) {
207: try {
208: this .eventDispatch.disconnected(session);
209: } catch (RuntimeException ex) {
210: handleRuntimeException(ex);
211: }
212: }
213:
214: }
|