001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java $
003: * $Revision: 613298 $
004: * $Date: 2008-01-18 23:09:22 +0100 (Fri, 18 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.nio.protocol;
033:
034: import java.io.IOException;
035: import java.io.OutputStream;
036: import java.util.concurrent.Executor;
037:
038: import org.apache.http.ConnectionReuseStrategy;
039: import org.apache.http.HttpEntity;
040: import org.apache.http.HttpEntityEnclosingRequest;
041: import org.apache.http.HttpException;
042: import org.apache.http.HttpRequest;
043: import org.apache.http.HttpResponse;
044: import org.apache.http.HttpResponseFactory;
045: import org.apache.http.HttpStatus;
046: import org.apache.http.HttpVersion;
047: import org.apache.http.MethodNotSupportedException;
048: import org.apache.http.ProtocolVersion;
049: import org.apache.http.ProtocolException;
050: import org.apache.http.UnsupportedHttpVersionException;
051: import org.apache.http.entity.ByteArrayEntity;
052: import org.apache.http.nio.ContentDecoder;
053: import org.apache.http.nio.ContentEncoder;
054: import org.apache.http.nio.IOControl;
055: import org.apache.http.nio.NHttpConnection;
056: import org.apache.http.nio.NHttpServerConnection;
057: import org.apache.http.nio.entity.ContentBufferEntity;
058: import org.apache.http.nio.entity.ContentOutputStream;
059: import org.apache.http.nio.params.NIOReactorPNames;
060: import org.apache.http.nio.util.ByteBufferAllocator;
061: import org.apache.http.nio.util.ContentInputBuffer;
062: import org.apache.http.nio.util.ContentOutputBuffer;
063: import org.apache.http.nio.util.DirectByteBufferAllocator;
064: import org.apache.http.nio.util.SharedInputBuffer;
065: import org.apache.http.nio.util.SharedOutputBuffer;
066: import org.apache.http.params.HttpParams;
067: import org.apache.http.params.DefaultedHttpParams;
068: import org.apache.http.protocol.HttpContext;
069: import org.apache.http.protocol.ExecutionContext;
070: import org.apache.http.protocol.HttpProcessor;
071: import org.apache.http.protocol.HttpRequestHandler;
072: import org.apache.http.util.EncodingUtils;
073:
074: /**
075: * HTTP service handler implementation that allocates content buffers of limited
076: * size upon initialization and is capable of controlling the frequency of I/O
077: * events in order to guarantee those content buffers do not ever get overflown.
078: * This helps ensure near constant memory footprint of HTTP connections and to
079: * avoid the 'out of memory' condition while streaming out response content.
080: *
081: * <p>The service handler will delegate the task of processing requests and
082: * generating response content to an {@link Executor}, which is expected to
083: * perform those tasks using dedicated worker threads in order to avoid
084: * blocking the I/O thread.</p>
085: *
086: * @see NIOReactorPNames#CONTENT_BUFFER_SIZE
087: *
088: * @author <a href="mailto:oleg at ural.ru">Oleg Kalnichevski</a>
089: *
090: */
091: public class ThrottlingHttpServiceHandler extends
092: NHttpServiceHandlerBase {
093:
094: protected final Executor executor;
095:
096: public ThrottlingHttpServiceHandler(
097: final HttpProcessor httpProcessor,
098: final HttpResponseFactory responseFactory,
099: final ConnectionReuseStrategy connStrategy,
100: final ByteBufferAllocator allocator,
101: final Executor executor, final HttpParams params) {
102: super (httpProcessor, responseFactory, connStrategy, allocator,
103: params);
104: this .executor = executor;
105: }
106:
107: public ThrottlingHttpServiceHandler(
108: final HttpProcessor httpProcessor,
109: final HttpResponseFactory responseFactory,
110: final ConnectionReuseStrategy connStrategy,
111: final Executor executor, final HttpParams params) {
112: this (httpProcessor, responseFactory, connStrategy,
113: new DirectByteBufferAllocator(), executor, params);
114: }
115:
116: public void connected(final NHttpServerConnection conn) {
117: HttpContext context = conn.getContext();
118:
119: int bufsize = this .params.getIntParameter(
120: NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
121: ServerConnState connState = new ServerConnState(bufsize, conn,
122: allocator);
123: context.setAttribute(CONN_STATE, connState);
124:
125: if (this .eventListener != null) {
126: this .eventListener.connectionOpen(conn);
127: }
128: }
129:
130: public void closed(final NHttpServerConnection conn) {
131: if (this .eventListener != null) {
132: this .eventListener.connectionClosed(conn);
133: }
134: }
135:
136: public void exception(final NHttpServerConnection conn,
137: final HttpException httpex) {
138: if (conn.isResponseSubmitted()) {
139: if (eventListener != null) {
140: eventListener.fatalProtocolException(httpex, conn);
141: }
142: return;
143: }
144:
145: HttpContext context = conn.getContext();
146:
147: ServerConnState connState = (ServerConnState) context
148: .getAttribute(CONN_STATE);
149:
150: try {
151:
152: HttpResponse response = this .responseFactory
153: .newHttpResponse(HttpVersion.HTTP_1_0,
154: HttpStatus.SC_INTERNAL_SERVER_ERROR,
155: context);
156: response.setParams(new DefaultedHttpParams(response
157: .getParams(), this .params));
158: handleException(httpex, response);
159: response.setEntity(null);
160:
161: this .httpProcessor.process(response, context);
162:
163: synchronized (connState) {
164: connState.setResponse(response);
165: // Response is ready to be committed
166: conn.requestOutput();
167: }
168:
169: } catch (IOException ex) {
170: shutdownConnection(conn, ex);
171: if (eventListener != null) {
172: eventListener.fatalIOException(ex, conn);
173: }
174: } catch (HttpException ex) {
175: closeConnection(conn, ex);
176: if (eventListener != null) {
177: eventListener.fatalProtocolException(ex, conn);
178: }
179: }
180: }
181:
182: public void requestReceived(final NHttpServerConnection conn) {
183: HttpContext context = conn.getContext();
184:
185: final HttpRequest request = conn.getHttpRequest();
186: final ServerConnState connState = (ServerConnState) context
187: .getAttribute(CONN_STATE);
188:
189: synchronized (connState) {
190: boolean contentExpected = false;
191: if (request instanceof HttpEntityEnclosingRequest) {
192: HttpEntity entity = ((HttpEntityEnclosingRequest) request)
193: .getEntity();
194: if (entity != null) {
195: contentExpected = true;
196: }
197: }
198:
199: if (!contentExpected) {
200: conn.suspendInput();
201: }
202:
203: this .executor.execute(new Runnable() {
204:
205: public void run() {
206: try {
207:
208: handleRequest(request, connState, conn);
209:
210: } catch (IOException ex) {
211: shutdownConnection(conn, ex);
212: if (eventListener != null) {
213: eventListener.fatalIOException(ex, conn);
214: }
215: } catch (HttpException ex) {
216: shutdownConnection(conn, ex);
217: if (eventListener != null) {
218: eventListener.fatalProtocolException(ex,
219: conn);
220: }
221: }
222: }
223:
224: });
225:
226: connState.notifyAll();
227: }
228:
229: }
230:
231: public void inputReady(final NHttpServerConnection conn,
232: final ContentDecoder decoder) {
233: HttpContext context = conn.getContext();
234:
235: ServerConnState connState = (ServerConnState) context
236: .getAttribute(CONN_STATE);
237:
238: try {
239:
240: synchronized (connState) {
241: ContentInputBuffer buffer = connState.getInbuffer();
242:
243: buffer.consumeContent(decoder);
244: if (decoder.isCompleted()) {
245: connState
246: .setInputState(ServerConnState.REQUEST_BODY_DONE);
247: } else {
248: connState
249: .setInputState(ServerConnState.REQUEST_BODY_STREAM);
250: }
251:
252: connState.notifyAll();
253: }
254:
255: } catch (IOException ex) {
256: shutdownConnection(conn, ex);
257: if (this .eventListener != null) {
258: this .eventListener.fatalIOException(ex, conn);
259: }
260: }
261:
262: }
263:
264: public void responseReady(final NHttpServerConnection conn) {
265: HttpContext context = conn.getContext();
266:
267: ServerConnState connState = (ServerConnState) context
268: .getAttribute(CONN_STATE);
269:
270: try {
271:
272: synchronized (connState) {
273: HttpResponse response = connState.getResponse();
274: if (connState.getOutputState() == ServerConnState.READY
275: && response != null
276: && !conn.isResponseSubmitted()) {
277:
278: conn.submitResponse(response);
279: int statusCode = response.getStatusLine()
280: .getStatusCode();
281: HttpEntity entity = response.getEntity();
282:
283: if (statusCode >= 200 && entity == null) {
284: connState
285: .setOutputState(ServerConnState.RESPONSE_DONE);
286: if (!connState.isWorkerRunning()) {
287: connState.resetOutput();
288: connState.resetInput();
289: conn.requestInput();
290: }
291:
292: if (!this .connStrategy.keepAlive(response,
293: context)) {
294: conn.close();
295: }
296: } else {
297: connState
298: .setOutputState(ServerConnState.RESPONSE_SENT);
299: }
300: }
301:
302: connState.notifyAll();
303: }
304:
305: } catch (IOException ex) {
306: shutdownConnection(conn, ex);
307: if (eventListener != null) {
308: eventListener.fatalIOException(ex, conn);
309: }
310: } catch (HttpException ex) {
311: closeConnection(conn, ex);
312: if (eventListener != null) {
313: eventListener.fatalProtocolException(ex, conn);
314: }
315: }
316: }
317:
318: public void outputReady(final NHttpServerConnection conn,
319: final ContentEncoder encoder) {
320: HttpContext context = conn.getContext();
321:
322: ServerConnState connState = (ServerConnState) context
323: .getAttribute(CONN_STATE);
324:
325: try {
326:
327: synchronized (connState) {
328: HttpResponse response = connState.getResponse();
329: ContentOutputBuffer buffer = connState.getOutbuffer();
330:
331: buffer.produceContent(encoder);
332: if (encoder.isCompleted()) {
333: connState
334: .setOutputState(ServerConnState.RESPONSE_BODY_DONE);
335:
336: if (!connState.isWorkerRunning()) {
337: connState.resetOutput();
338: connState.resetInput();
339: conn.requestInput();
340: }
341:
342: if (!this .connStrategy.keepAlive(response, context)) {
343: conn.close();
344: }
345: } else {
346: connState
347: .setOutputState(ServerConnState.RESPONSE_BODY_STREAM);
348: }
349:
350: connState.notifyAll();
351: }
352:
353: } catch (IOException ex) {
354: shutdownConnection(conn, ex);
355: if (this .eventListener != null) {
356: this .eventListener.fatalIOException(ex, conn);
357: }
358: }
359: }
360:
361: private void handleException(final HttpException ex,
362: final HttpResponse response) {
363: if (ex instanceof MethodNotSupportedException) {
364: response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
365: } else if (ex instanceof UnsupportedHttpVersionException) {
366: response
367: .setStatusCode(HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED);
368: } else if (ex instanceof ProtocolException) {
369: response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
370: } else {
371: response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
372: }
373: byte[] msg = EncodingUtils.getAsciiBytes(ex.getMessage());
374: ByteArrayEntity entity = new ByteArrayEntity(msg);
375: entity.setContentType("text/plain; charset=US-ASCII");
376: response.setEntity(entity);
377: }
378:
379: private void handleRequest(final HttpRequest request,
380: final ServerConnState connState,
381: final NHttpServerConnection conn) throws HttpException,
382: IOException {
383:
384: HttpContext context = conn.getContext();
385:
386: // Block until previous request is fully processed and
387: // the worker thread no longer holds the shared buffer
388: synchronized (connState) {
389: try {
390: for (;;) {
391: int currentState = connState.getOutputState();
392: if (!connState.isWorkerRunning()) {
393: break;
394: }
395: if (currentState == ServerConnState.SHUTDOWN) {
396: return;
397: }
398: connState.wait();
399: }
400: } catch (InterruptedException ex) {
401: connState.shutdown();
402: return;
403: }
404: connState.setInputState(ServerConnState.REQUEST_RECEIVED);
405: connState.setRequest(request);
406: connState.setWorkerRunning(true);
407: }
408:
409: request.setParams(new DefaultedHttpParams(request.getParams(),
410: this .params));
411:
412: context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
413: context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
414:
415: ProtocolVersion ver = request.getRequestLine()
416: .getProtocolVersion();
417:
418: if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
419: // Downgrade protocol version if greater than HTTP/1.1
420: ver = HttpVersion.HTTP_1_1;
421: }
422:
423: HttpResponse response = null;
424:
425: if (request instanceof HttpEntityEnclosingRequest) {
426: HttpEntityEnclosingRequest eeRequest = (HttpEntityEnclosingRequest) request;
427:
428: if (eeRequest.expectContinue()) {
429: response = this .responseFactory.newHttpResponse(ver,
430: HttpStatus.SC_CONTINUE, context);
431: response.setParams(new DefaultedHttpParams(response
432: .getParams(), this .params));
433: if (this .expectationVerifier != null) {
434: try {
435: this .expectationVerifier.verify(request,
436: response, context);
437: } catch (HttpException ex) {
438: response = this .responseFactory
439: .newHttpResponse(
440: HttpVersion.HTTP_1_0,
441: HttpStatus.SC_INTERNAL_SERVER_ERROR,
442: context);
443: response.setParams(new DefaultedHttpParams(
444: response.getParams(), this .params));
445: handleException(ex, response);
446: }
447: }
448:
449: if (response.getStatusLine().getStatusCode() < 200) {
450:
451: // Send 1xx response indicating the server expections
452: // have been met
453: synchronized (connState) {
454: connState.setResponse(response);
455: conn.requestOutput();
456:
457: // Block until 1xx response is sent to the client
458: try {
459: for (;;) {
460: int currentState = connState
461: .getOutputState();
462: if (currentState == ServerConnState.RESPONSE_SENT) {
463: break;
464: }
465: if (currentState == ServerConnState.SHUTDOWN) {
466: return;
467: }
468: connState.wait();
469: }
470: } catch (InterruptedException ex) {
471: connState.shutdown();
472: return;
473: }
474: connState.resetOutput();
475: response = null;
476: }
477: } else {
478: // Discard request entity
479: conn.resetInput();
480: eeRequest.setEntity(null);
481: }
482:
483: }
484:
485: // Create a wrapper entity instead of the original one
486: if (eeRequest.getEntity() != null) {
487: eeRequest.setEntity(new ContentBufferEntity(eeRequest
488: .getEntity(), connState.getInbuffer()));
489: }
490:
491: }
492:
493: if (response == null) {
494: response = this .responseFactory.newHttpResponse(ver,
495: HttpStatus.SC_OK, context);
496: response.setParams(new DefaultedHttpParams(response
497: .getParams(), this .params));
498:
499: context.setAttribute(ExecutionContext.HTTP_RESPONSE,
500: response);
501:
502: try {
503:
504: this .httpProcessor.process(request, context);
505:
506: HttpRequestHandler handler = null;
507: if (this .handlerResolver != null) {
508: String requestURI = request.getRequestLine()
509: .getUri();
510: handler = this .handlerResolver.lookup(requestURI);
511: }
512: if (handler != null) {
513: handler.handle(request, response, context);
514: } else {
515: response
516: .setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
517: }
518:
519: } catch (HttpException ex) {
520: response = this .responseFactory.newHttpResponse(
521: HttpVersion.HTTP_1_0,
522: HttpStatus.SC_INTERNAL_SERVER_ERROR, context);
523: response.setParams(new DefaultedHttpParams(response
524: .getParams(), this .params));
525: handleException(ex, response);
526: }
527: }
528:
529: this .httpProcessor.process(response, context);
530:
531: if (!canResponseHaveBody(request, response)) {
532: response.setEntity(null);
533: }
534:
535: connState.setResponse(response);
536: // Response is ready to be committed
537: conn.requestOutput();
538:
539: if (response.getEntity() != null) {
540: ContentOutputBuffer buffer = connState.getOutbuffer();
541: OutputStream outstream = new ContentOutputStream(buffer);
542:
543: HttpEntity entity = response.getEntity();
544: entity.writeTo(outstream);
545: outstream.flush();
546: outstream.close();
547: }
548:
549: synchronized (connState) {
550: if (connState.getOutputState() == ServerConnState.RESPONSE_DONE
551: && conn.isOpen()) {
552: connState.resetInput();
553: connState.resetOutput();
554: conn.requestInput();
555: }
556: connState.setWorkerRunning(false);
557: connState.notifyAll();
558: }
559: }
560:
561: protected void shutdownConnection(final NHttpConnection conn,
562: final Throwable cause) {
563: HttpContext context = conn.getContext();
564:
565: ServerConnState connState = (ServerConnState) context
566: .getAttribute(CONN_STATE);
567:
568: super .shutdownConnection(conn, cause);
569:
570: if (connState != null) {
571: connState.shutdown();
572: }
573: }
574:
575: static class ServerConnState {
576:
577: public static final int SHUTDOWN = -1;
578: public static final int READY = 0;
579: public static final int REQUEST_RECEIVED = 1;
580: public static final int REQUEST_BODY_STREAM = 2;
581: public static final int REQUEST_BODY_DONE = 4;
582: public static final int RESPONSE_SENT = 8;
583: public static final int RESPONSE_BODY_STREAM = 16;
584: public static final int RESPONSE_BODY_DONE = 32;
585: public static final int RESPONSE_DONE = 32;
586:
587: private final SharedInputBuffer inbuffer;
588: private final SharedOutputBuffer outbuffer;
589:
590: private volatile int inputState;
591: private volatile int outputState;
592:
593: private volatile HttpRequest request;
594: private volatile HttpResponse response;
595:
596: private volatile boolean workerRunning;
597:
598: public ServerConnState(int bufsize, final IOControl ioControl,
599: final ByteBufferAllocator allocator) {
600: super ();
601: this .inbuffer = new SharedInputBuffer(bufsize, ioControl,
602: allocator);
603: this .outbuffer = new SharedOutputBuffer(bufsize, ioControl,
604: allocator);
605: this .inputState = READY;
606: this .outputState = READY;
607: }
608:
609: public ContentInputBuffer getInbuffer() {
610: return this .inbuffer;
611: }
612:
613: public ContentOutputBuffer getOutbuffer() {
614: return this .outbuffer;
615: }
616:
617: public int getInputState() {
618: return this .inputState;
619: }
620:
621: public void setInputState(int inputState) {
622: this .inputState = inputState;
623: }
624:
625: public int getOutputState() {
626: return this .outputState;
627: }
628:
629: public void setOutputState(int outputState) {
630: this .outputState = outputState;
631: }
632:
633: public HttpRequest getRequest() {
634: return this .request;
635: }
636:
637: public void setRequest(final HttpRequest request) {
638: this .request = request;
639: }
640:
641: public HttpResponse getResponse() {
642: return this .response;
643: }
644:
645: public void setResponse(final HttpResponse response) {
646: this .response = response;
647: }
648:
649: public boolean isWorkerRunning() {
650: return this .workerRunning;
651: }
652:
653: public void setWorkerRunning(boolean b) {
654: this .workerRunning = b;
655: }
656:
657: public void shutdown() {
658: this .inbuffer.shutdown();
659: this .outbuffer.shutdown();
660: this .inputState = SHUTDOWN;
661: this .outputState = SHUTDOWN;
662: }
663:
664: public void resetInput() {
665: this .inbuffer.reset();
666: this .request = null;
667: this .inputState = READY;
668: }
669:
670: public void resetOutput() {
671: this.outbuffer.reset();
672: this.response = null;
673: this.outputState = READY;
674: }
675:
676: }
677:
678: }
|