001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpClientHandler.java $
003: * $Revision: 612997 $
004: * $Date: 2008-01-17 23:42:36 +0100 (Thu, 17 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.nio.protocol;
033:
034: import java.io.IOException;
035: import java.io.OutputStream;
036: import java.util.concurrent.Executor;
037:
038: import org.apache.http.ConnectionReuseStrategy;
039: import org.apache.http.HttpEntity;
040: import org.apache.http.HttpEntityEnclosingRequest;
041: import org.apache.http.HttpException;
042: import org.apache.http.HttpRequest;
043: import org.apache.http.HttpResponse;
044: import org.apache.http.HttpStatus;
045: import org.apache.http.nio.ContentDecoder;
046: import org.apache.http.nio.ContentEncoder;
047: import org.apache.http.nio.IOControl;
048: import org.apache.http.nio.NHttpClientConnection;
049: import org.apache.http.nio.NHttpConnection;
050: import org.apache.http.nio.entity.ContentBufferEntity;
051: import org.apache.http.nio.entity.ContentOutputStream;
052: import org.apache.http.nio.params.NIOReactorPNames;
053: import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
054: import org.apache.http.nio.util.ByteBufferAllocator;
055: import org.apache.http.nio.util.ContentInputBuffer;
056: import org.apache.http.nio.util.ContentOutputBuffer;
057: import org.apache.http.nio.util.DirectByteBufferAllocator;
058: import org.apache.http.nio.util.SharedInputBuffer;
059: import org.apache.http.nio.util.SharedOutputBuffer;
060: import org.apache.http.params.HttpParams;
061: import org.apache.http.params.CoreProtocolPNames;
062: import org.apache.http.params.DefaultedHttpParams;
063: import org.apache.http.protocol.ExecutionContext;
064: import org.apache.http.protocol.HttpContext;
065: import org.apache.http.protocol.HttpProcessor;
066:
067: /**
068: * HTTP client handler implementation that allocates content buffers of limited
069: * size upon initialization and is capable of controlling the frequency of I/O
070: * events in order to guarantee that those content buffers never overflow.
071: * This helps ensure a near constant memory footprint of HTTP connections and
072: * avoids the 'out of memory' condition while streaming request and response content.
073: *
074: * <p>The client handler will delegate the tasks of sending entity enclosing
075: * HTTP requests and processing of HTTP responses to an {@link Executor},
076: * which is expected to perform those tasks using dedicated worker threads in
077: * order to avoid blocking the I/O thread.</p>
078: *
079: * @see NIOReactorPNames#CONTENT_BUFFER_SIZE
080: *
081: * @author <a href="mailto:oleg at ural.ru">Oleg Kalnichevski</a>
082: *
083: */
084: public class ThrottlingHttpClientHandler extends NHttpClientHandlerBase {
085:
086: protected final Executor executor;
087:
/**
 * Creates a handler that uses the given buffer allocator and executor.
 *
 * @param httpProcessor passed through to the superclass constructor
 * @param execHandler passed through to the superclass constructor
 * @param connStrategy passed through to the superclass constructor
 * @param allocator passed through to the superclass constructor
 * @param executor executor on which request body and response handling
 *        tasks are scheduled; must not be <code>null</code>
 * @param params passed through to the superclass constructor
 * @throws IllegalArgumentException if <code>executor</code> is <code>null</code>
 */
public ThrottlingHttpClientHandler(
        final HttpProcessor httpProcessor,
        final HttpRequestExecutionHandler execHandler,
        final ConnectionReuseStrategy connStrategy,
        final ByteBufferAllocator allocator,
        final Executor executor,
        final HttpParams params) {
    super(httpProcessor, execHandler, connStrategy, allocator, params);
    if (null == executor) {
        throw new IllegalArgumentException("Executor may not be null");
    }
    this.executor = executor;
}
102:
/**
 * Creates a handler that uses a new {@link DirectByteBufferAllocator} as
 * its buffer allocator; all other arguments are forwarded unchanged to the
 * six-argument constructor.
 *
 * @param httpProcessor forwarded to the six-argument constructor
 * @param execHandler forwarded to the six-argument constructor
 * @param connStrategy forwarded to the six-argument constructor
 * @param executor forwarded to the six-argument constructor
 * @param params forwarded to the six-argument constructor
 */
public ThrottlingHttpClientHandler(
        final HttpProcessor httpProcessor,
        final HttpRequestExecutionHandler execHandler,
        final ConnectionReuseStrategy connStrategy,
        final Executor executor,
        final HttpParams params) {
    this(
            httpProcessor,
            execHandler,
            connStrategy,
            new DirectByteBufferAllocator(),
            executor,
            params);
}
111:
112: public void connected(final NHttpClientConnection conn,
113: final Object attachment) {
114: HttpContext context = conn.getContext();
115:
116: initialize(conn, attachment);
117:
118: int bufsize = this .params.getIntParameter(
119: NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
120: ClientConnState connState = new ClientConnState(bufsize, conn,
121: this .allocator);
122: context.setAttribute(CONN_STATE, connState);
123:
124: if (this .eventListener != null) {
125: this .eventListener.connectionOpen(conn);
126: }
127:
128: requestReady(conn);
129: }
130:
131: @Override
132: public void closed(final NHttpClientConnection conn) {
133: HttpContext context = conn.getContext();
134:
135: this .execHandler.finalizeContext(context);
136:
137: // TODO - replace with super.closed(conn); ?
138: if (this .eventListener != null) {
139: this .eventListener.connectionClosed(conn);
140: }
141: }
142:
143: public void requestReady(final NHttpClientConnection conn) {
144: HttpContext context = conn.getContext();
145:
146: ClientConnState connState = (ClientConnState) context
147: .getAttribute(CONN_STATE);
148:
149: try {
150:
151: synchronized (connState) {
152: if (connState.getOutputState() != ClientConnState.READY) {
153: return;
154: }
155:
156: HttpRequest request = this .execHandler
157: .submitRequest(context);
158: if (request == null) {
159: return;
160: }
161:
162: request.setParams(new DefaultedHttpParams(request
163: .getParams(), this .params));
164:
165: context.setAttribute(ExecutionContext.HTTP_REQUEST,
166: request);
167: this .httpProcessor.process(request, context);
168: connState.setRequest(request);
169: conn.submitRequest(request);
170: connState.setOutputState(ClientConnState.REQUEST_SENT);
171:
172: conn.requestInput();
173:
174: if (request instanceof HttpEntityEnclosingRequest) {
175: if (((HttpEntityEnclosingRequest) request)
176: .expectContinue()) {
177: int timeout = conn.getSocketTimeout();
178: connState.setTimeout(timeout);
179: timeout = this .params.getIntParameter(
180: CoreProtocolPNames.WAIT_FOR_CONTINUE,
181: 3000);
182: conn.setSocketTimeout(timeout);
183: connState
184: .setOutputState(ClientConnState.EXPECT_CONTINUE);
185: } else {
186: sendRequestBody(
187: (HttpEntityEnclosingRequest) request,
188: connState, conn);
189: }
190: }
191:
192: connState.notifyAll();
193: }
194:
195: } catch (IOException ex) {
196: shutdownConnection(conn, ex);
197: if (this .eventListener != null) {
198: this .eventListener.fatalIOException(ex, conn);
199: }
200: } catch (HttpException ex) {
201: closeConnection(conn, ex);
202: if (this .eventListener != null) {
203: this .eventListener.fatalProtocolException(ex, conn);
204: }
205: }
206: }
207:
208: public void outputReady(final NHttpClientConnection conn,
209: final ContentEncoder encoder) {
210: HttpContext context = conn.getContext();
211:
212: ClientConnState connState = (ClientConnState) context
213: .getAttribute(CONN_STATE);
214:
215: try {
216:
217: synchronized (connState) {
218: if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
219: conn.suspendOutput();
220: return;
221: }
222: ContentOutputBuffer buffer = connState.getOutbuffer();
223: buffer.produceContent(encoder);
224: if (encoder.isCompleted()) {
225: connState
226: .setInputState(ClientConnState.REQUEST_BODY_DONE);
227: } else {
228: connState
229: .setInputState(ClientConnState.REQUEST_BODY_STREAM);
230: }
231:
232: connState.notifyAll();
233: }
234:
235: } catch (IOException ex) {
236: shutdownConnection(conn, ex);
237: if (this .eventListener != null) {
238: this .eventListener.fatalIOException(ex, conn);
239: }
240: }
241: }
242:
243: public void responseReceived(final NHttpClientConnection conn) {
244: HttpContext context = conn.getContext();
245: ClientConnState connState = (ClientConnState) context
246: .getAttribute(CONN_STATE);
247:
248: try {
249:
250: synchronized (connState) {
251: HttpResponse response = conn.getHttpResponse();
252: response.setParams(new DefaultedHttpParams(response
253: .getParams(), this .params));
254:
255: HttpRequest request = connState.getRequest();
256:
257: int statusCode = response.getStatusLine()
258: .getStatusCode();
259: if (statusCode < HttpStatus.SC_OK) {
260: // 1xx intermediate response
261: if (statusCode == HttpStatus.SC_CONTINUE
262: && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
263: connState
264: .setOutputState(ClientConnState.REQUEST_SENT);
265: continueRequest(conn, connState);
266: }
267: return;
268: } else {
269: connState.setResponse(response);
270: connState
271: .setInputState(ClientConnState.RESPONSE_RECEIVED);
272:
273: if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
274: int timeout = connState.getTimeout();
275: conn.setSocketTimeout(timeout);
276: conn.resetOutput();
277: }
278: }
279:
280: if (!canResponseHaveBody(request, response)) {
281: conn.resetInput();
282: response.setEntity(null);
283: connState
284: .setInputState(ClientConnState.RESPONSE_DONE);
285:
286: if (!this .connStrategy.keepAlive(response, context)) {
287: conn.close();
288: }
289: }
290:
291: if (response.getEntity() != null) {
292: response.setEntity(new ContentBufferEntity(response
293: .getEntity(), connState.getInbuffer()));
294: }
295:
296: context.setAttribute(ExecutionContext.HTTP_RESPONSE,
297: response);
298:
299: this .httpProcessor.process(response, context);
300:
301: handleResponse(response, connState, conn);
302:
303: connState.notifyAll();
304: }
305:
306: } catch (IOException ex) {
307: shutdownConnection(conn, ex);
308: if (this .eventListener != null) {
309: this .eventListener.fatalIOException(ex, conn);
310: }
311: } catch (HttpException ex) {
312: closeConnection(conn, ex);
313: if (this .eventListener != null) {
314: this .eventListener.fatalProtocolException(ex, conn);
315: }
316: }
317: }
318:
319: public void inputReady(final NHttpClientConnection conn,
320: final ContentDecoder decoder) {
321: HttpContext context = conn.getContext();
322:
323: ClientConnState connState = (ClientConnState) context
324: .getAttribute(CONN_STATE);
325: try {
326:
327: synchronized (connState) {
328: HttpResponse response = connState.getResponse();
329: ContentInputBuffer buffer = connState.getInbuffer();
330:
331: buffer.consumeContent(decoder);
332: if (decoder.isCompleted()) {
333: connState
334: .setInputState(ClientConnState.RESPONSE_BODY_DONE);
335:
336: if (!this .connStrategy.keepAlive(response, context)) {
337: conn.close();
338: }
339: } else {
340: connState
341: .setInputState(ClientConnState.RESPONSE_BODY_STREAM);
342: }
343:
344: connState.notifyAll();
345: }
346:
347: } catch (IOException ex) {
348: shutdownConnection(conn, ex);
349: if (this .eventListener != null) {
350: this .eventListener.fatalIOException(ex, conn);
351: }
352: }
353: }
354:
355: public void timeout(final NHttpClientConnection conn) {
356: HttpContext context = conn.getContext();
357: ClientConnState connState = (ClientConnState) context
358: .getAttribute(CONN_STATE);
359:
360: try {
361:
362: synchronized (connState) {
363: if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
364: connState
365: .setOutputState(ClientConnState.REQUEST_SENT);
366: continueRequest(conn, connState);
367:
368: connState.notifyAll();
369: return;
370: }
371: }
372:
373: } catch (IOException ex) {
374: shutdownConnection(conn, ex);
375: if (this .eventListener != null) {
376: this .eventListener.fatalIOException(ex, conn);
377: }
378: }
379:
380: handleTimeout(conn);
381: }
382:
383: private void initialize(final NHttpClientConnection conn,
384: final Object attachment) {
385: HttpContext context = conn.getContext();
386:
387: context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
388: this .execHandler.initalizeContext(context, attachment);
389: }
390:
391: private void continueRequest(final NHttpClientConnection conn,
392: final ClientConnState connState) throws IOException {
393:
394: HttpRequest request = connState.getRequest();
395:
396: int timeout = connState.getTimeout();
397: conn.setSocketTimeout(timeout);
398:
399: sendRequestBody((HttpEntityEnclosingRequest) request,
400: connState, conn);
401: }
402:
403: private void sendRequestBody(
404: final HttpEntityEnclosingRequest request,
405: final ClientConnState connState,
406: final NHttpClientConnection conn) throws IOException {
407: HttpEntity entity = request.getEntity();
408: if (entity != null) {
409:
410: this .executor.execute(new Runnable() {
411:
412: public void run() {
413: try {
414:
415: // Block until previous request is fully processed and
416: // the worker thread no longer holds the shared buffer
417: synchronized (connState) {
418: try {
419: for (;;) {
420: int currentState = connState
421: .getOutputState();
422: if (!connState.isWorkerRunning()) {
423: break;
424: }
425: if (currentState == ServerConnState.SHUTDOWN) {
426: return;
427: }
428: connState.wait();
429: }
430: } catch (InterruptedException ex) {
431: connState.shutdown();
432: return;
433: }
434: connState.setWorkerRunning(true);
435: }
436:
437: HttpEntity entity = request.getEntity();
438: OutputStream outstream = new ContentOutputStream(
439: connState.getOutbuffer());
440: entity.writeTo(outstream);
441: outstream.flush();
442: outstream.close();
443:
444: synchronized (connState) {
445: connState.setWorkerRunning(false);
446: connState.notifyAll();
447: }
448:
449: } catch (IOException ex) {
450: shutdownConnection(conn, ex);
451: if (eventListener != null) {
452: eventListener.fatalIOException(ex, conn);
453: }
454: }
455: }
456:
457: });
458: }
459: }
460:
461: private void handleResponse(final HttpResponse response,
462: final ClientConnState connState,
463: final NHttpClientConnection conn) {
464:
465: final HttpContext context = conn.getContext();
466:
467: this .executor.execute(new Runnable() {
468:
469: public void run() {
470: try {
471:
472: // Block until previous request is fully processed and
473: // the worker thread no longer holds the shared buffer
474: synchronized (connState) {
475: try {
476: for (;;) {
477: int currentState = connState
478: .getOutputState();
479: if (!connState.isWorkerRunning()) {
480: break;
481: }
482: if (currentState == ServerConnState.SHUTDOWN) {
483: return;
484: }
485: connState.wait();
486: }
487: } catch (InterruptedException ex) {
488: connState.shutdown();
489: return;
490: }
491: connState.setWorkerRunning(true);
492: }
493:
494: execHandler.handleResponse(response, context);
495:
496: synchronized (connState) {
497:
498: try {
499: for (;;) {
500: int currentState = connState
501: .getInputState();
502: if (currentState == ClientConnState.RESPONSE_DONE) {
503: break;
504: }
505: if (currentState == ServerConnState.SHUTDOWN) {
506: return;
507: }
508: connState.wait();
509: }
510: } catch (InterruptedException ex) {
511: connState.shutdown();
512: }
513:
514: connState.resetInput();
515: connState.resetOutput();
516: if (conn.isOpen()) {
517: conn.requestOutput();
518: }
519: connState.setWorkerRunning(false);
520: connState.notifyAll();
521: }
522:
523: } catch (IOException ex) {
524: shutdownConnection(conn, ex);
525: if (eventListener != null) {
526: eventListener.fatalIOException(ex, conn);
527: }
528: }
529: }
530:
531: });
532:
533: }
534:
535: protected void shutdownConnection(final NHttpConnection conn,
536: final Throwable cause) {
537: HttpContext context = conn.getContext();
538:
539: ClientConnState connState = (ClientConnState) context
540: .getAttribute(CONN_STATE);
541:
542: super .shutdownConnection(conn, cause);
543:
544: if (connState != null) {
545: connState.shutdown();
546: }
547: }
548:
549: static class ClientConnState {
550:
551: public static final int SHUTDOWN = -1;
552: public static final int READY = 0;
553: public static final int REQUEST_SENT = 1;
554: public static final int EXPECT_CONTINUE = 2;
555: public static final int REQUEST_BODY_STREAM = 4;
556: public static final int REQUEST_BODY_DONE = 8;
557: public static final int RESPONSE_RECEIVED = 16;
558: public static final int RESPONSE_BODY_STREAM = 32;
559: public static final int RESPONSE_BODY_DONE = 64;
560: public static final int RESPONSE_DONE = 64;
561:
562: private final SharedInputBuffer inbuffer;
563: private final SharedOutputBuffer outbuffer;
564:
565: private volatile int inputState;
566: private volatile int outputState;
567:
568: private volatile HttpRequest request;
569: private volatile HttpResponse response;
570:
571: private volatile int timeout;
572:
573: private volatile boolean workerRunning;
574:
575: public ClientConnState(int bufsize, final IOControl ioControl,
576: final ByteBufferAllocator allocator) {
577: super ();
578: this .inbuffer = new SharedInputBuffer(bufsize, ioControl,
579: allocator);
580: this .outbuffer = new SharedOutputBuffer(bufsize, ioControl,
581: allocator);
582: this .inputState = READY;
583: this .outputState = READY;
584: }
585:
586: public ContentInputBuffer getInbuffer() {
587: return this .inbuffer;
588: }
589:
590: public ContentOutputBuffer getOutbuffer() {
591: return this .outbuffer;
592: }
593:
594: public int getInputState() {
595: return this .inputState;
596: }
597:
598: public void setInputState(int inputState) {
599: this .inputState = inputState;
600: }
601:
602: public int getOutputState() {
603: return this .outputState;
604: }
605:
606: public void setOutputState(int outputState) {
607: this .outputState = outputState;
608: }
609:
610: public HttpRequest getRequest() {
611: return this .request;
612: }
613:
614: public void setRequest(final HttpRequest request) {
615: this .request = request;
616: }
617:
618: public HttpResponse getResponse() {
619: return this .response;
620: }
621:
622: public void setResponse(final HttpResponse response) {
623: this .response = response;
624: }
625:
626: public int getTimeout() {
627: return this .timeout;
628: }
629:
630: public void setTimeout(int timeout) {
631: this .timeout = timeout;
632: }
633:
634: public boolean isWorkerRunning() {
635: return this .workerRunning;
636: }
637:
638: public void setWorkerRunning(boolean b) {
639: this .workerRunning = b;
640: }
641:
642: public void shutdown() {
643: this .inbuffer.shutdown();
644: this .outbuffer.shutdown();
645: this .inputState = SHUTDOWN;
646: this .outputState = SHUTDOWN;
647: }
648:
649: public void resetInput() {
650: this .inbuffer.reset();
651: this .request = null;
652: this .inputState = READY;
653: }
654:
655: public void resetOutput() {
656: this.outbuffer.reset();
657: this.response = null;
658: this.outputState = READY;
659: }
660:
661: }
662:
663: }
|