001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java $
003: * $Revision: 613612 $
004: * $Date: 2008-01-20 18:03:05 +0100 (Sun, 20 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.impl.nio.reactor;
033:
034: import java.io.IOException;
035: import java.net.SocketAddress;
036: import java.nio.channels.CancelledKeyException;
037: import java.nio.channels.SelectionKey;
038: import java.nio.channels.ServerSocketChannel;
039: import java.nio.channels.SocketChannel;
040: import java.util.Collections;
041: import java.util.HashSet;
042: import java.util.Iterator;
043: import java.util.Queue;
044: import java.util.Set;
045: import java.util.concurrent.ConcurrentLinkedQueue;
046: import java.util.concurrent.ThreadFactory;
047:
048: import org.apache.http.nio.reactor.IOReactorException;
049: import org.apache.http.nio.reactor.IOReactorStatus;
050: import org.apache.http.nio.reactor.ListenerEndpoint;
051: import org.apache.http.nio.reactor.ListeningIOReactor;
052: import org.apache.http.params.HttpParams;
053:
054: public class DefaultListeningIOReactor extends
055: AbstractMultiworkerIOReactor implements ListeningIOReactor {
056:
057: private final Queue<ListenerEndpointImpl> requestQueue;
058: private final Set<ListenerEndpointImpl> endpoints;
059: private final Set<SocketAddress> pausedEndpoints;
060:
061: private volatile boolean paused;
062:
063: public DefaultListeningIOReactor(int workerCount,
064: final ThreadFactory threadFactory, final HttpParams params)
065: throws IOReactorException {
066: super (workerCount, threadFactory, params);
067: this .requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
068: this .endpoints = Collections
069: .synchronizedSet(new HashSet<ListenerEndpointImpl>());
070: this .pausedEndpoints = new HashSet<SocketAddress>();
071: }
072:
073: public DefaultListeningIOReactor(int workerCount,
074: final HttpParams params) throws IOReactorException {
075: this (workerCount, null, params);
076: }
077:
078: @Override
079: protected void cancelRequests() throws IOReactorException {
080: ListenerEndpointImpl request;
081: while ((request = this .requestQueue.poll()) != null) {
082: request.cancel();
083: }
084: }
085:
086: @Override
087: protected void processEvents(int readyCount)
088: throws IOReactorException {
089: if (!this .paused) {
090: processSessionRequests();
091: }
092:
093: if (readyCount > 0) {
094: Set<SelectionKey> selectedKeys = this .selector
095: .selectedKeys();
096: for (Iterator<SelectionKey> it = selectedKeys.iterator(); it
097: .hasNext();) {
098:
099: SelectionKey key = it.next();
100: processEvent(key);
101:
102: }
103: selectedKeys.clear();
104: }
105: }
106:
107: private void processEvent(final SelectionKey key)
108: throws IOReactorException {
109: try {
110:
111: if (key.isAcceptable()) {
112:
113: ServerSocketChannel serverChannel = (ServerSocketChannel) key
114: .channel();
115: SocketChannel socketChannel = null;
116: try {
117: socketChannel = serverChannel.accept();
118: } catch (IOException ex) {
119: if (this .exceptionHandler == null
120: || !this .exceptionHandler.handle(ex)) {
121: throw new IOReactorException(
122: "Failure accepting connection", ex);
123: }
124: }
125:
126: if (socketChannel != null) {
127: try {
128: prepareSocket(socketChannel.socket());
129: } catch (IOException ex) {
130: if (this .exceptionHandler == null
131: || !this .exceptionHandler.handle(ex)) {
132: throw new IOReactorException(
133: "Failure initalizing socket", ex);
134: }
135: }
136: ChannelEntry entry = new ChannelEntry(socketChannel);
137: addChannel(entry);
138: }
139: }
140:
141: } catch (CancelledKeyException ex) {
142: ListenerEndpoint endpoint = (ListenerEndpoint) key
143: .attachment();
144: this .endpoints.remove(endpoint);
145: key.attach(null);
146: }
147: }
148:
149: private ListenerEndpointImpl createEndpoint(
150: final SocketAddress address) {
151: ListenerEndpointImpl endpoint = new ListenerEndpointImpl(
152: address, new ListenerEndpointClosedCallback() {
153:
154: public void endpointClosed(
155: final ListenerEndpoint endpoint) {
156: endpoints.remove(endpoint);
157: }
158:
159: });
160: return endpoint;
161: }
162:
163: public ListenerEndpoint listen(final SocketAddress address) {
164: if (this .status.compareTo(IOReactorStatus.ACTIVE) > 0) {
165: throw new IllegalStateException(
166: "I/O reactor has been shut down");
167: }
168: ListenerEndpointImpl request = createEndpoint(address);
169: this .requestQueue.add(request);
170: this .selector.wakeup();
171: return request;
172: }
173:
174: private void processSessionRequests() throws IOReactorException {
175: ListenerEndpointImpl request;
176: while ((request = this .requestQueue.poll()) != null) {
177: SocketAddress address = request.getAddress();
178: ServerSocketChannel serverChannel;
179: try {
180: serverChannel = ServerSocketChannel.open();
181: serverChannel.configureBlocking(false);
182: } catch (IOException ex) {
183: throw new IOReactorException(
184: "Failure opening server socket", ex);
185: }
186: try {
187: serverChannel.socket().bind(address);
188: } catch (IOException ex) {
189: request.failed(ex);
190: if (this .exceptionHandler == null
191: || !this .exceptionHandler.handle(ex)) {
192: throw new IOReactorException(
193: "Failure binding socket to address "
194: + address, ex);
195: } else {
196: return;
197: }
198: }
199: try {
200: SelectionKey key = serverChannel.register(
201: this .selector, SelectionKey.OP_ACCEPT);
202: key.attach(request);
203: request.setKey(key);
204: } catch (IOException ex) {
205: throw new IOReactorException(
206: "Failure registering channel "
207: + "with the selector", ex);
208: }
209:
210: this .endpoints.add(request);
211: request.completed(serverChannel.socket()
212: .getLocalSocketAddress());
213: }
214: }
215:
216: public Set<ListenerEndpoint> getEndpoints() {
217: Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
218: synchronized (this .endpoints) {
219: Iterator<ListenerEndpointImpl> it = this .endpoints
220: .iterator();
221: while (it.hasNext()) {
222: ListenerEndpoint endpoint = it.next();
223: if (!endpoint.isClosed()) {
224: set.add(endpoint);
225: } else {
226: it.remove();
227: }
228: }
229: }
230: return set;
231: }
232:
233: public void pause() throws IOException {
234: if (this .paused) {
235: return;
236: }
237: this .paused = true;
238: synchronized (this .endpoints) {
239: Iterator<ListenerEndpointImpl> it = this .endpoints
240: .iterator();
241: while (it.hasNext()) {
242: ListenerEndpoint endpoint = it.next();
243: if (!endpoint.isClosed()) {
244: endpoint.close();
245: this .pausedEndpoints.add(endpoint.getAddress());
246: }
247: }
248: this .endpoints.clear();
249: }
250: }
251:
252: public void resume() throws IOException {
253: if (!this .paused) {
254: return;
255: }
256: this .paused = false;
257: for (SocketAddress address : this .pausedEndpoints) {
258: ListenerEndpointImpl request = createEndpoint(address);
259: this.requestQueue.add(request);
260: }
261: this.pausedEndpoints.clear();
262: this.selector.wakeup();
263: }
264:
265: }
|