001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java $
003: * $Revision: 613298 $
004: * $Date: 2008-01-18 23:09:22 +0100 (Fri, 18 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.impl.nio.reactor;
033:
034: import java.io.IOException;
035: import java.net.InetSocketAddress;
036: import java.net.SocketAddress;
037: import java.net.UnknownHostException;
038: import java.nio.channels.CancelledKeyException;
039: import java.nio.channels.SelectionKey;
040: import java.nio.channels.SocketChannel;
041: import java.util.Iterator;
042: import java.util.Queue;
043: import java.util.Set;
044: import java.util.concurrent.ConcurrentLinkedQueue;
045: import java.util.concurrent.ThreadFactory;
046:
047: import org.apache.http.nio.reactor.ConnectingIOReactor;
048: import org.apache.http.nio.reactor.IOReactorException;
049: import org.apache.http.nio.reactor.IOReactorStatus;
050: import org.apache.http.nio.reactor.SessionRequest;
051: import org.apache.http.nio.reactor.SessionRequestCallback;
052: import org.apache.http.params.HttpConnectionParams;
053: import org.apache.http.params.HttpParams;
054:
055: public class DefaultConnectingIOReactor extends
056: AbstractMultiworkerIOReactor implements ConnectingIOReactor {
057:
058: private final Queue<SessionRequestImpl> requestQueue;
059:
060: private long lastTimeoutCheck;
061:
062: public DefaultConnectingIOReactor(int workerCount,
063: final ThreadFactory threadFactory, final HttpParams params)
064: throws IOReactorException {
065: super (workerCount, threadFactory, params);
066: this .requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
067: this .lastTimeoutCheck = System.currentTimeMillis();
068: }
069:
070: public DefaultConnectingIOReactor(int workerCount,
071: final HttpParams params) throws IOReactorException {
072: this (workerCount, null, params);
073: }
074:
075: @Override
076: protected void cancelRequests() throws IOReactorException {
077: SessionRequestImpl request;
078: while ((request = this .requestQueue.poll()) != null) {
079: request.cancel();
080: }
081: }
082:
083: @Override
084: protected void processEvents(int readyCount)
085: throws IOReactorException {
086: processSessionRequests();
087:
088: if (readyCount > 0) {
089: Set<SelectionKey> selectedKeys = this .selector
090: .selectedKeys();
091: for (Iterator<SelectionKey> it = selectedKeys.iterator(); it
092: .hasNext();) {
093:
094: SelectionKey key = it.next();
095: processEvent(key);
096:
097: }
098: selectedKeys.clear();
099: }
100:
101: long currentTime = System.currentTimeMillis();
102: if ((currentTime - this .lastTimeoutCheck) >= this .selectTimeout) {
103: this .lastTimeoutCheck = currentTime;
104: Set<SelectionKey> keys = this .selector.keys();
105: processTimeouts(keys);
106: }
107: }
108:
109: private void processEvent(final SelectionKey key) {
110: try {
111:
112: if (key.isConnectable()) {
113:
114: SocketChannel channel = (SocketChannel) key.channel();
115: // Get request handle
116: SessionRequestHandle requestHandle = (SessionRequestHandle) key
117: .attachment();
118: SessionRequestImpl sessionRequest = requestHandle
119: .getSessionRequest();
120:
121: // Finish connection process
122: try {
123: channel.finishConnect();
124: } catch (IOException ex) {
125: sessionRequest.failed(ex);
126: }
127: key.cancel();
128: if (channel.isConnected()) {
129: try {
130: try {
131: prepareSocket(channel.socket());
132: } catch (IOException ex) {
133: if (this .exceptionHandler == null
134: || !this .exceptionHandler
135: .handle(ex)) {
136: throw new IOReactorException(
137: "Failure initalizing socket",
138: ex);
139: }
140: }
141: ChannelEntry entry = new ChannelEntry(channel,
142: sessionRequest);
143: addChannel(entry);
144: } catch (IOException ex) {
145: sessionRequest.failed(ex);
146: }
147: }
148: }
149:
150: } catch (CancelledKeyException ex) {
151: key.attach(null);
152: }
153: }
154:
155: private void processTimeouts(final Set<SelectionKey> keys) {
156: long now = System.currentTimeMillis();
157: for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
158: SelectionKey key = it.next();
159: Object attachment = key.attachment();
160:
161: if (attachment instanceof SessionRequestHandle) {
162: SessionRequestHandle handle = (SessionRequestHandle) key
163: .attachment();
164: SessionRequestImpl sessionRequest = handle
165: .getSessionRequest();
166: int timeout = sessionRequest.getConnectTimeout();
167: if (timeout > 0) {
168: if (handle.getRequestTime() + timeout < now) {
169: sessionRequest.timeout();
170: }
171: }
172: }
173:
174: }
175: }
176:
177: public SessionRequest connect(final SocketAddress remoteAddress,
178: final SocketAddress localAddress, final Object attachment,
179: final SessionRequestCallback callback) {
180:
181: if (this .status.compareTo(IOReactorStatus.ACTIVE) > 0) {
182: throw new IllegalStateException(
183: "I/O reactor has been shut down");
184: }
185: SessionRequestImpl sessionRequest = new SessionRequestImpl(
186: remoteAddress, localAddress, attachment, callback);
187: sessionRequest.setConnectTimeout(HttpConnectionParams
188: .getConnectionTimeout(this .params));
189:
190: this .requestQueue.add(sessionRequest);
191: this .selector.wakeup();
192:
193: return sessionRequest;
194: }
195:
196: private void validateAddress(final SocketAddress address)
197: throws UnknownHostException {
198: if (address == null) {
199: return;
200: }
201: if (address instanceof InetSocketAddress) {
202: InetSocketAddress endpoint = (InetSocketAddress) address;
203: if (endpoint.isUnresolved()) {
204: throw new UnknownHostException(endpoint.getHostName());
205: }
206: }
207: }
208:
209: private void processSessionRequests() throws IOReactorException {
210: SessionRequestImpl request;
211: while ((request = this .requestQueue.poll()) != null) {
212: if (request.isCompleted()) {
213: continue;
214: }
215: SocketChannel socketChannel;
216: try {
217: socketChannel = SocketChannel.open();
218: socketChannel.configureBlocking(false);
219: } catch (IOException ex) {
220: throw new IOReactorException("Failure opening socket",
221: ex);
222: }
223: try {
224: validateAddress(request.getLocalAddress());
225: validateAddress(request.getRemoteAddress());
226:
227: if (request.getLocalAddress() != null) {
228: socketChannel.socket().bind(
229: request.getLocalAddress());
230: }
231: boolean connected = socketChannel.connect(request
232: .getRemoteAddress());
233: if (connected) {
234: prepareSocket(socketChannel.socket());
235: ChannelEntry entry = new ChannelEntry(
236: socketChannel, request);
237: addChannel(entry);
238: return;
239: }
240: } catch (IOException ex) {
241: request.failed(ex);
242: return;
243: }
244:
245: SelectionKey key;
246: try {
247: key = socketChannel.register(this .selector, 0);
248: request.setKey(key);
249: } catch (IOException ex) {
250: throw new IOReactorException(
251: "Failure registering channel "
252: + "with the selector", ex);
253: }
254:
255: SessionRequestHandle requestHandle = new SessionRequestHandle(
256: request);
257: try {
258: key.attach(requestHandle);
259: key.interestOps(SelectionKey.OP_CONNECT);
260: } catch (CancelledKeyException ex) {
261: // Ignore cancelled keys
262: }
263: }
264: }
265:
266: }
|