001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.transport.nhttp;
020:
021: import org.apache.axis2.context.ConfigurationContext;
022: import org.apache.axis2.transport.nhttp.util.PipeImpl;
023: import org.apache.axis2.transport.nhttp.util.WorkerPool;
024: import org.apache.axis2.transport.nhttp.util.WorkerPoolFactory;
025: import org.apache.http.*;
026: import org.apache.http.entity.BasicHttpEntity;
027: import org.apache.http.entity.ByteArrayEntity;
028: import org.apache.http.impl.DefaultConnectionReuseStrategy;
029: import org.apache.http.impl.DefaultHttpResponseFactory;
030: import org.apache.http.nio.ContentDecoder;
031: import org.apache.http.nio.ContentEncoder;
032: import org.apache.http.nio.NHttpServerConnection;
033: import org.apache.http.nio.NHttpServiceHandler;
034: import org.apache.http.params.HttpParams;
035: import org.apache.http.protocol.*;
036: import org.apache.http.util.EncodingUtils;
037: import org.apache.commons.logging.Log;
038: import org.apache.commons.logging.LogFactory;
039:
040: import java.io.IOException;
041: import java.nio.ByteBuffer;
042: import java.nio.channels.Channels;
043: import java.nio.channels.WritableByteChannel;
044: import java.nio.channels.ReadableByteChannel;
045:
046: /**
047: * The server connection handler. An instance of this class is used by each IOReactor, to
048: * process every connection. Hence this class should not store any data related to a single
049: * connection - as this is being shared.
050: */
051: public class ServerHandler implements NHttpServiceHandler {
052:
053: private static final Log log = LogFactory
054: .getLog(ServerHandler.class);
055:
056: /** the HTTP protocol parameters to adhere to */
057: private final HttpParams params;
058: /** the factory to create HTTP responses */
059: private final HttpResponseFactory responseFactory;
060: /** the HTTP response processor */
061: private final HttpProcessor httpProcessor;
062: /** the strategy to re-use connections */
063: private final ConnectionReuseStrategy connStrategy;
064:
065: /** the Axis2 configuration context */
066: ConfigurationContext cfgCtx = null;
067: /** the nhttp configuration */
068: private NHttpConfiguration cfg = null;
069: /** is this https? */
070: private boolean isHttps = false;
071:
072: /** the thread pool to process requests */
073: private WorkerPool workerPool = null;
074:
075: private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
076: private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
077: private static final String REQUEST_BUFFER = "request-buffer";
078: private static final String RESPONSE_BUFFER = "response-buffer";
079:
080: public ServerHandler(final ConfigurationContext cfgCtx,
081: final HttpParams params, final boolean isHttps) {
082: super ();
083: this .cfgCtx = cfgCtx;
084: this .params = params;
085: this .isHttps = isHttps;
086: this .responseFactory = new DefaultHttpResponseFactory();
087: this .httpProcessor = getHttpProcessor();
088: this .connStrategy = new DefaultConnectionReuseStrategy();
089:
090: this .cfg = NHttpConfiguration.getInstance();
091: this .workerPool = WorkerPoolFactory.getWorkerPool(cfg
092: .getServerCoreThreads(), cfg.getServerMaxThreads(), cfg
093: .getServerKeepalive(), cfg.getServerQueueLen(),
094: "Server Worker thread group", "HttpServerWorker");
095: }
096:
097: /**
098: * Process a new incoming request
099: * @param conn the connection
100: */
101: public void requestReceived(final NHttpServerConnection conn) {
102:
103: HttpContext context = conn.getContext();
104: HttpRequest request = conn.getHttpRequest();
105: context.setAttribute(HttpContext.HTTP_REQUEST, request);
106:
107: // allocate temporary buffers to process this request
108: context.setAttribute(REQUEST_BUFFER, ByteBuffer.allocate(cfg
109: .getBufferZise()));
110: context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(cfg
111: .getBufferZise()));
112:
113: try {
114: PipeImpl requestPipe = new PipeImpl(); // the pipe used to process the request
115: PipeImpl responsePipe = new PipeImpl(); // the pipe used to process the response
116: context.setAttribute(REQUEST_SINK_CHANNEL, requestPipe
117: .sink());
118: context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe
119: .source());
120:
121: // create the default response to this request
122: HttpVersion httpVersion = request.getRequestLine()
123: .getHttpVersion();
124: HttpResponse response = responseFactory.newHttpResponse(
125: httpVersion, HttpStatus.SC_OK, context);
126: response.setParams(this .params);
127:
128: // create a basic HttpEntity using the source channel of the response pipe
129: BasicHttpEntity entity = new BasicHttpEntity();
130: entity.setContent(Channels.newInputStream(responsePipe
131: .source()));
132: if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
133: entity.setChunked(true);
134: }
135: response.setEntity(entity);
136:
137: // hand off processing of the request to a thread off the pool
138: workerPool.execute(new ServerWorker(cfgCtx, conn, isHttps,
139: this , request, Channels.newInputStream(requestPipe
140: .source()), response, Channels
141: .newOutputStream(responsePipe.sink())));
142:
143: } catch (IOException e) {
144: handleException("Error processing request received for : "
145: + request.getRequestLine().getUri(), e, conn);
146: } catch (Exception e) {
147: handleException("Error processing request received for : "
148: + request.getRequestLine().getUri(), e, conn);
149: }
150: }
151:
152: /**
153: * Process ready input by writing it into the Pipe
154: * @param conn the connection being processed
155: * @param decoder the content decoder in use
156: */
157: public void inputReady(final NHttpServerConnection conn,
158: final ContentDecoder decoder) {
159:
160: HttpContext context = conn.getContext();
161: WritableByteChannel sink = (WritableByteChannel) context
162: .getAttribute(REQUEST_SINK_CHANNEL);
163: ByteBuffer inbuf = (ByteBuffer) context
164: .getAttribute(REQUEST_BUFFER);
165:
166: try {
167: while (decoder.read(inbuf) > 0) {
168: inbuf.flip();
169: sink.write(inbuf);
170: inbuf.compact();
171: }
172:
173: if (decoder.isCompleted()) {
174: sink.close();
175: }
176:
177: } catch (IOException e) {
178: handleException("I/O Error : " + e.getMessage(), e, conn);
179: }
180: }
181:
182: public void responseReady(NHttpServerConnection conn) {
183: // New API method - should not require
184: }
185:
186: /**
187: * Process ready output by writing into the channel
188: * @param conn the connection being processed
189: * @param encoder the content encoder in use
190: */
191: public void outputReady(final NHttpServerConnection conn,
192: final ContentEncoder encoder) {
193:
194: HttpContext context = conn.getContext();
195: HttpResponse response = conn.getHttpResponse();
196: ReadableByteChannel source = (ReadableByteChannel) context
197: .getAttribute(RESPONSE_SOURCE_CHANNEL);
198: ByteBuffer outbuf = (ByteBuffer) context
199: .getAttribute(RESPONSE_BUFFER);
200:
201: try {
202: int bytesRead = source.read(outbuf);
203: if (bytesRead == -1) {
204: encoder.complete();
205: } else {
206: outbuf.flip();
207: encoder.write(outbuf);
208: outbuf.compact();
209: }
210:
211: if (encoder.isCompleted()) {
212: source.close();
213: if (!connStrategy.keepAlive(response, context)) {
214: conn.close();
215: }
216: }
217:
218: } catch (IOException e) {
219: handleException("I/O Error : " + e.getMessage(), e, conn);
220: }
221: }
222:
223: /**
224: * Commit the response to the connection. Processes the response through the configured
225: * HttpProcessor and submits it to be sent out
226: * @param conn the connection being processed
227: * @param response the response to commit over the connection
228: */
229: public void commitResponse(final NHttpServerConnection conn,
230: final HttpResponse response) {
231: try {
232: httpProcessor.process(response, conn.getContext());
233: conn.submitResponse(response);
234: } catch (HttpException e) {
235: handleException("Unexpected HTTP protocol error : "
236: + e.getMessage(), e, conn);
237: } catch (IOException e) {
238: handleException("IO error submiting response : "
239: + e.getMessage(), e, conn);
240: }
241: }
242:
243: /**
244: * Handle connection timeouts by shutting down the connections
245: * @param conn the connection being processed
246: */
247: public void timeout(final NHttpServerConnection conn) {
248: HttpRequest req = (HttpRequest) conn.getContext().getAttribute(
249: HttpContext.HTTP_REQUEST);
250: if (req != null) {
251: log.debug("Connection Timeout for request to : "
252: + req.getRequestLine().getUri()
253: + " Probably the keepalive connection was closed");
254: } else {
255: log.warn("Connection Timeout");
256: }
257: shutdownConnection(conn);
258: }
259:
260: public void connected(final NHttpServerConnection conn) {
261: log.trace("New incoming connection");
262: }
263:
264: public void closed(final NHttpServerConnection conn) {
265: log.trace("Connection closed");
266: }
267:
268: /**
269: * Handle HTTP Protocol violations with an error response
270: * @param conn the connection being processed
271: * @param e the exception encountered
272: */
273: public void exception(final NHttpServerConnection conn,
274: final HttpException e) {
275: HttpContext context = conn.getContext();
276: HttpRequest request = conn.getHttpRequest();
277: HttpVersion ver = request.getRequestLine().getHttpVersion();
278: HttpResponse response = responseFactory.newHttpResponse(ver,
279: HttpStatus.SC_BAD_REQUEST, context);
280: byte[] msg = EncodingUtils
281: .getAsciiBytes("Malformed HTTP request: "
282: + e.getMessage());
283: ByteArrayEntity entity = new ByteArrayEntity(msg);
284: entity.setContentType("text/plain; charset=US-ASCII");
285: response.setEntity(entity);
286: commitResponse(conn, response);
287: }
288:
289: /**
290: * Handle IO errors while reading or writing to underlying channels
291: * @param conn the connection being processed
292: * @param e the exception encountered
293: */
294: public void exception(NHttpServerConnection conn, IOException e) {
295: if (e instanceof ConnectionClosedException
296: || e.getMessage().indexOf("Connection reset by peer") > 0
297: || e.getMessage().indexOf("forcibly closed") > 0) {
298: log
299: .debug("I/O error (Probably the keepalive connection was closed):"
300: + e.getMessage());
301: } else {
302: log.error("I/O error: " + e.getMessage());
303: }
304: shutdownConnection(conn);
305: }
306:
307: // ----------- utility methods -----------
308:
309: private void handleException(String msg, Exception e,
310: NHttpServerConnection conn) {
311: log.error(msg, e);
312: if (conn != null) {
313: shutdownConnection(conn);
314: }
315: }
316:
317: /**
318: * Shutdown the connection ignoring any IO errors during the process
319: * @param conn the connection to be shutdown
320: */
321: private void shutdownConnection(final HttpConnection conn) {
322: try {
323: conn.shutdown();
324: } catch (IOException ignore) {
325: }
326: }
327:
328: /**
329: * Return the HttpProcessor for responses
330: * @return the HttpProcessor that processes HttpResponses of this server
331: */
332: private HttpProcessor getHttpProcessor() {
333: BasicHttpProcessor httpProcessor = new BasicHttpProcessor();
334: httpProcessor.addInterceptor(new ResponseDate());
335: httpProcessor.addInterceptor(new ResponseServer());
336: httpProcessor.addInterceptor(new ResponseContent());
337: httpProcessor.addInterceptor(new ResponseConnControl());
338: return httpProcessor;
339: }
340: }
|