001: /*
002: * $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0-beta1/module-nio/src/main/java/org/apache/http/nio/protocol/BufferingHttpClientHandler.java $
003: * $Revision: 613298 $
004: * $Date: 2008-01-18 23:09:22 +0100 (Fri, 18 Jan 2008) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031:
032: package org.apache.http.nio.protocol;
033:
034: import java.io.IOException;
035: import java.io.OutputStream;
036:
037: import org.apache.http.ConnectionReuseStrategy;
038: import org.apache.http.HttpEntity;
039: import org.apache.http.HttpEntityEnclosingRequest;
040: import org.apache.http.HttpException;
041: import org.apache.http.HttpRequest;
042: import org.apache.http.HttpResponse;
043: import org.apache.http.HttpStatus;
044: import org.apache.http.nio.ContentDecoder;
045: import org.apache.http.nio.ContentEncoder;
046: import org.apache.http.nio.NHttpClientConnection;
047: import org.apache.http.nio.entity.ContentBufferEntity;
048: import org.apache.http.nio.entity.ContentOutputStream;
049: import org.apache.http.nio.util.ByteBufferAllocator;
050: import org.apache.http.nio.util.ContentInputBuffer;
051: import org.apache.http.nio.util.ContentOutputBuffer;
052: import org.apache.http.nio.util.HeapByteBufferAllocator;
053: import org.apache.http.nio.util.SimpleInputBuffer;
054: import org.apache.http.nio.util.SimpleOutputBuffer;
055: import org.apache.http.params.HttpParams;
056: import org.apache.http.params.CoreProtocolPNames;
057: import org.apache.http.params.DefaultedHttpParams;
058: import org.apache.http.protocol.ExecutionContext;
059: import org.apache.http.protocol.HttpContext;
060: import org.apache.http.protocol.HttpProcessor;
061:
/**
 * HTTP client handler implementation that buffers the content of HTTP messages
 * entirely in memory and executes HTTP requests on the main I/O thread.
 *
 * <p>This service handler should be used only when dealing with HTTP messages
 * that are known to be limited in length.</p>
 *
 * @author <a href="mailto:oleg at ural.ru">Oleg Kalnichevski</a>
 *
 */
072: public class BufferingHttpClientHandler extends NHttpClientHandlerBase {
073:
/**
 * Creates a handler with an explicitly supplied buffer allocator.
 *
 * <p>All arguments are forwarded unchanged to the
 * {@code NHttpClientHandlerBase} constructor.</p>
 *
 * @param httpProcessor protocol processor for requests and responses
 * @param execHandler   callback that supplies requests and consumes responses
 * @param connStrategy  strategy deciding whether a connection is kept alive
 * @param allocator     allocator used for the per-connection content buffers
 * @param params        handler-level HTTP parameters
 */
public BufferingHttpClientHandler(
        final HttpProcessor httpProcessor,
        final HttpRequestExecutionHandler execHandler,
        final ConnectionReuseStrategy connStrategy,
        final ByteBufferAllocator allocator,
        final HttpParams params) {
    super(httpProcessor, execHandler, connStrategy, allocator, params);
}
082:
/**
 * Creates a handler that allocates its content buffers through a new
 * {@link HeapByteBufferAllocator}.
 *
 * @param httpProcessor protocol processor for requests and responses
 * @param execHandler   callback that supplies requests and consumes responses
 * @param connStrategy  strategy deciding whether a connection is kept alive
 * @param params        handler-level HTTP parameters
 */
public BufferingHttpClientHandler(
        final HttpProcessor httpProcessor,
        final HttpRequestExecutionHandler execHandler,
        final ConnectionReuseStrategy connStrategy,
        final HttpParams params) {
    this(
            httpProcessor,
            execHandler,
            connStrategy,
            new HeapByteBufferAllocator(),
            params);
}
091:
092: public void connected(final NHttpClientConnection conn,
093: final Object attachment) {
094: HttpContext context = conn.getContext();
095:
096: initialize(conn, attachment);
097:
098: ClientConnState connState = new ClientConnState(allocator);
099: context.setAttribute(CONN_STATE, connState);
100:
101: if (this .eventListener != null) {
102: this .eventListener.connectionOpen(conn);
103: }
104:
105: requestReady(conn);
106: }
107:
108: @Override
109: public void closed(final NHttpClientConnection conn) {
110: HttpContext context = conn.getContext();
111:
112: this .execHandler.finalizeContext(context);
113:
114: // TODO - replace with super.closed(conn); ?
115: if (this .eventListener != null) {
116: this .eventListener.connectionClosed(conn);
117: }
118: }
119:
120: public void requestReady(final NHttpClientConnection conn) {
121: HttpContext context = conn.getContext();
122:
123: ClientConnState connState = (ClientConnState) context
124: .getAttribute(CONN_STATE);
125: if (connState.getOutputState() != ClientConnState.READY) {
126: return;
127: }
128:
129: try {
130:
131: HttpRequest request = this .execHandler
132: .submitRequest(context);
133: if (request == null) {
134: return;
135: }
136:
137: request.setParams(new DefaultedHttpParams(request
138: .getParams(), this .params));
139:
140: context
141: .setAttribute(ExecutionContext.HTTP_REQUEST,
142: request);
143: this .httpProcessor.process(request, context);
144: connState.setRequest(request);
145: conn.submitRequest(request);
146: connState.setOutputState(ClientConnState.REQUEST_SENT);
147:
148: if (request instanceof HttpEntityEnclosingRequest) {
149: if (((HttpEntityEnclosingRequest) request)
150: .expectContinue()) {
151: int timeout = conn.getSocketTimeout();
152: connState.setTimeout(timeout);
153: timeout = this .params.getIntParameter(
154: CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
155: conn.setSocketTimeout(timeout);
156: connState
157: .setOutputState(ClientConnState.EXPECT_CONTINUE);
158: } else {
159: prepareRequestBody(
160: (HttpEntityEnclosingRequest) request,
161: connState);
162: }
163: }
164:
165: } catch (IOException ex) {
166: shutdownConnection(conn, ex);
167: if (this .eventListener != null) {
168: this .eventListener.fatalIOException(ex, conn);
169: }
170: } catch (HttpException ex) {
171: closeConnection(conn, ex);
172: if (this .eventListener != null) {
173: this .eventListener.fatalProtocolException(ex, conn);
174: }
175: }
176: }
177:
178: public void inputReady(final NHttpClientConnection conn,
179: final ContentDecoder decoder) {
180: HttpContext context = conn.getContext();
181:
182: ClientConnState connState = (ClientConnState) context
183: .getAttribute(CONN_STATE);
184: ContentInputBuffer buffer = connState.getInbuffer();
185:
186: try {
187:
188: buffer.consumeContent(decoder);
189: if (decoder.isCompleted()) {
190: connState
191: .setInputState(ClientConnState.RESPONSE_BODY_DONE);
192: processResponse(conn, connState);
193: } else {
194: connState
195: .setInputState(ClientConnState.RESPONSE_BODY_STREAM);
196: }
197:
198: } catch (IOException ex) {
199: shutdownConnection(conn, ex);
200: if (this .eventListener != null) {
201: this .eventListener.fatalIOException(ex, conn);
202: }
203: } catch (HttpException ex) {
204: closeConnection(conn, ex);
205: if (this .eventListener != null) {
206: this .eventListener.fatalProtocolException(ex, conn);
207: }
208: }
209: }
210:
211: public void outputReady(final NHttpClientConnection conn,
212: final ContentEncoder encoder) {
213: HttpContext context = conn.getContext();
214:
215: ClientConnState connState = (ClientConnState) context
216: .getAttribute(CONN_STATE);
217: ContentOutputBuffer buffer = connState.getOutbuffer();
218:
219: try {
220:
221: if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
222: conn.suspendOutput();
223: return;
224: }
225:
226: buffer.produceContent(encoder);
227: if (encoder.isCompleted()) {
228: connState
229: .setInputState(ClientConnState.REQUEST_BODY_DONE);
230: } else {
231: connState
232: .setInputState(ClientConnState.REQUEST_BODY_STREAM);
233: }
234:
235: } catch (IOException ex) {
236: shutdownConnection(conn, ex);
237: if (this .eventListener != null) {
238: this .eventListener.fatalIOException(ex, conn);
239: }
240: }
241: }
242:
243: public void responseReceived(final NHttpClientConnection conn) {
244: HttpContext context = conn.getContext();
245: ClientConnState connState = (ClientConnState) context
246: .getAttribute(CONN_STATE);
247:
248: HttpResponse response = conn.getHttpResponse();
249: response.setParams(new DefaultedHttpParams(
250: response.getParams(), this .params));
251:
252: HttpRequest request = connState.getRequest();
253:
254: try {
255:
256: int statusCode = response.getStatusLine().getStatusCode();
257: if (statusCode < HttpStatus.SC_OK) {
258: // 1xx intermediate response
259: if (statusCode == HttpStatus.SC_CONTINUE
260: && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
261: continueRequest(conn, connState);
262: }
263: return;
264: } else {
265: connState.setResponse(response);
266: connState
267: .setInputState(ClientConnState.RESPONSE_RECEIVED);
268:
269: if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
270: cancelRequest(conn, connState);
271: }
272: }
273: if (!canResponseHaveBody(request, response)) {
274: conn.resetInput();
275: response.setEntity(null);
276: processResponse(conn, connState);
277: }
278:
279: } catch (IOException ex) {
280: shutdownConnection(conn, ex);
281: if (this .eventListener != null) {
282: this .eventListener.fatalIOException(ex, conn);
283: }
284: } catch (HttpException ex) {
285: closeConnection(conn, ex);
286: if (this .eventListener != null) {
287: this .eventListener.fatalProtocolException(ex, conn);
288: }
289: }
290: }
291:
292: public void timeout(final NHttpClientConnection conn) {
293: HttpContext context = conn.getContext();
294: ClientConnState connState = (ClientConnState) context
295: .getAttribute(CONN_STATE);
296:
297: try {
298:
299: if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
300: continueRequest(conn, connState);
301: return;
302: }
303:
304: } catch (IOException ex) {
305: shutdownConnection(conn, ex);
306: if (this .eventListener != null) {
307: this .eventListener.fatalIOException(ex, conn);
308: }
309: }
310:
311: handleTimeout(conn);
312: }
313:
314: private void initialize(final NHttpClientConnection conn,
315: final Object attachment) {
316: HttpContext context = conn.getContext();
317:
318: context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
319: this .execHandler.initalizeContext(context, attachment);
320: }
321:
322: private void continueRequest(final NHttpClientConnection conn,
323: final ClientConnState connState) throws IOException {
324:
325: HttpRequest request = connState.getRequest();
326:
327: int timeout = connState.getTimeout();
328: conn.setSocketTimeout(timeout);
329:
330: prepareRequestBody((HttpEntityEnclosingRequest) request,
331: connState);
332: conn.requestOutput();
333: connState.setOutputState(ClientConnState.REQUEST_SENT);
334: }
335:
336: private void cancelRequest(final NHttpClientConnection conn,
337: final ClientConnState connState) throws IOException {
338:
339: int timeout = connState.getTimeout();
340: conn.setSocketTimeout(timeout);
341:
342: conn.resetOutput();
343: connState.resetOutput();
344: }
345:
346: private void prepareRequestBody(
347: final HttpEntityEnclosingRequest request,
348: final ClientConnState connState) throws IOException {
349: HttpEntity entity = request.getEntity();
350: if (entity != null) {
351: OutputStream outstream = new ContentOutputStream(connState
352: .getOutbuffer());
353: entity.writeTo(outstream);
354: outstream.flush();
355: outstream.close();
356: }
357: }
358:
359: private void processResponse(final NHttpClientConnection conn,
360: final ClientConnState connState) throws IOException,
361: HttpException {
362:
363: HttpContext context = conn.getContext();
364: HttpResponse response = connState.getResponse();
365:
366: if (response.getEntity() != null) {
367: response.setEntity(new ContentBufferEntity(response
368: .getEntity(), connState.getInbuffer()));
369: }
370:
371: context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
372:
373: this .httpProcessor.process(response, context);
374:
375: this .execHandler.handleResponse(response, context);
376:
377: if (!this .connStrategy.keepAlive(response, context)) {
378: conn.close();
379: } else {
380: // Ready for another request
381: connState.resetInput();
382: connState.resetOutput();
383: conn.requestOutput();
384: }
385: }
386:
387: static class ClientConnState {
388:
389: public static final int READY = 0;
390: public static final int REQUEST_SENT = 1;
391: public static final int EXPECT_CONTINUE = 2;
392: public static final int REQUEST_BODY_STREAM = 4;
393: public static final int REQUEST_BODY_DONE = 8;
394: public static final int RESPONSE_RECEIVED = 16;
395: public static final int RESPONSE_BODY_STREAM = 32;
396: public static final int RESPONSE_BODY_DONE = 64;
397:
398: private SimpleInputBuffer inbuffer;
399: private ContentOutputBuffer outbuffer;
400:
401: private int inputState;
402: private int outputState;
403:
404: private HttpRequest request;
405: private HttpResponse response;
406:
407: private int timeout;
408: private final ByteBufferAllocator allocator;
409:
410: public ClientConnState(final ByteBufferAllocator allocator) {
411: super ();
412: this .allocator = allocator;
413: }
414:
415: public ContentInputBuffer getInbuffer() {
416: if (this .inbuffer == null) {
417: this .inbuffer = new SimpleInputBuffer(2048, allocator);
418: }
419: return this .inbuffer;
420: }
421:
422: public ContentOutputBuffer getOutbuffer() {
423: if (this .outbuffer == null) {
424: this .outbuffer = new SimpleOutputBuffer(2048, allocator);
425: }
426: return this .outbuffer;
427: }
428:
429: public int getInputState() {
430: return this .inputState;
431: }
432:
433: public void setInputState(int inputState) {
434: this .inputState = inputState;
435: }
436:
437: public int getOutputState() {
438: return this .outputState;
439: }
440:
441: public void setOutputState(int outputState) {
442: this .outputState = outputState;
443: }
444:
445: public HttpRequest getRequest() {
446: return this .request;
447: }
448:
449: public void setRequest(final HttpRequest request) {
450: this .request = request;
451: }
452:
453: public HttpResponse getResponse() {
454: return this .response;
455: }
456:
457: public void setResponse(final HttpResponse response) {
458: this .response = response;
459: }
460:
461: public int getTimeout() {
462: return this .timeout;
463: }
464:
465: public void setTimeout(int timeout) {
466: this .timeout = timeout;
467: }
468:
469: public void resetInput() {
470: this .inbuffer = null;
471: this .response = null;
472: this .inputState = READY;
473: }
474:
475: public void resetOutput() {
476: this.outbuffer = null;
477: this.request = null;
478: this.outputState = READY;
479: }
480: }
481:
482: }
|