001: /*
002: * @(#)ChunkedInputStream.java 1.11 06/10/10
003: *
004: * Copyright 1990-2006 Sun Microsystems, Inc. All Rights Reserved.
005: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License version
009: * 2 only, as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful, but
012: * WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * General Public License version 2 for more details (a copy is
015: * included at /legal/license.txt).
016: *
017: * You should have received a copy of the GNU General Public License
018: * version 2 along with this work; if not, write to the Free Software
019: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA
021: *
022: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
023: * Clara, CA 95054 or visit www.sun.com if you need additional
024: * information or have any questions.
025: *
026: */
027: package sun.net.www.http;
028:
029: import java.io.*;
030: import java.util.*;
031:
032: import sun.net.www.*;
033:
034: /**
035: * A <code>ChunkedInputStream</code> provides a stream for reading a body of
036: * a http message that can be sent as a series of chunks, each with its own
037: * size indicator. Optionally the last chunk can be followed by trailers
038: * containing entity-header fields.
039: * <p>
040: * A <code>ChunkedInputStream</code> is also <code>Hurryable</code> so it
041: * can be hurried to the end of the stream if the bytes are available on
042: * the underlying stream.
043: */
044: public class ChunkedInputStream extends InputStream implements
045: Hurryable {
046:
047: /**
048: * The underlying stream
049: */
050: private InputStream in;
051:
052: /**
053: * The <code>HttpClient<code> that should be notified when the chunked stream has
054: * completed.
055: */
056: private HttpClient hc;
057:
058: /**
059: * The <code>MessageHeader</code> that is populated with any optional trailer
060: * that appear after the last chunk.
061: */
062: private MessageHeader responses;
063:
064: /**
065: * The size, in bytes, of the chunk that is currently being read.
066: * This size is only valid if the current position in the underlying
067: * input stream is inside a chunk (ie: state == STATE_READING_CHUNK).
068: */
069: private int chunkSize;
070:
071: /**
072: * The number of bytes read from the underlying stream for the current
073: * chunk. This value is always in the range <code>0</code> through to
074: * <code>chunkSize</code>
075: */
076: private int chunkRead;
077:
078: /**
079: * The internal buffer array where chunk data is available for the
080: * application to read.
081: */
082: private byte chunkData[] = new byte[4096];
083:
084: /**
085: * The current position in the buffer. It contains the index
086: * of the next byte to read from <code>chunkData</code>
087: */
088: private int chunkPos;
089:
090: /**
091: * The index one greater than the index of the last valid byte in the
092: * buffer. This value is always in the range <code>0</code> through
093: * <code>chunkData.length</code>.
094: */
095: private int chunkCount;
096:
097: /**
098: * The internal buffer where bytes from the underlying stream can be
099: * read. It may contain bytes representing chunk-size, chunk-data, or
100: * trailer fields.
101: */
102: private byte rawData[] = new byte[32];
103:
104: /**
105: * The current position in the buffer. It contains the index
106: * of the next byte to read from <code>rawData</code>
107: */
108: private int rawPos;
109:
110: /**
111: * The index one greater than the index of the last valid byte in the
112: * buffer. This value is always in the range <code>0</code> through
113: * <code>rawData.length</code>.
114: */
115: private int rawCount;
116:
117: /**
118: * Indicates if an error was encountered when processing the chunked
119: * stream.
120: */
121: private boolean error;
122:
123: /**
124: * Indicates if the chunked stream has been closed using the
125: * <code>close</close> method.
126: */
127: private boolean closed;
128:
129: /**
130: * State to indicate that next field should be :-
131: * chunk-size [ chunk-extension ] CRLF
132: */
133: static final int STATE_AWAITING_CHUNK_HEADER = 1;
134:
135: /**
136: * State to indicate that we are currently reading the chunk-data.
137: */
138: static final int STATE_READING_CHUNK = 2;
139:
140: /**
141: * Indicates that a chunk has been completely read and the next
142: * fields to be examine should be CRLF
143: */
144: static final int STATE_AWAITING_CHUNK_EOL = 3;
145:
146: /**
147: * Indicates that all chunks have been read and the next field
148: * should be optional trailers or an indication that the chunked
149: * stream is complete.
150: */
151: static final int STATE_AWAITING_TRAILERS = 4;
152:
153: /**
154: * State to indicate that the chunked stream is complete and
155: * no further bytes should be read from the underlying stream.
156: */
157: static final int STATE_DONE = 5;
158:
159: /**
160: * Indicates the current state.
161: */
162: private int state;
163:
164: /**
165: * Check to make sure that this stream has not been closed.
166: */
167: private void ensureOpen() throws IOException {
168: if (closed) {
169: throw new IOException("stream is closed");
170: }
171: }
172:
173: /**
174: * Ensures there is <code>size</code> bytes available in
175: * <code>rawData<code>. This requires that we either
176: * shift the bytes in use to the begining of the buffer
177: * or allocate a large buffer with sufficient space available.
178: */
179: private void ensureRawAvailable(int size) {
180: if (rawCount + size > rawData.length) {
181: int used = rawCount - rawPos;
182: if (used + size > rawData.length) {
183: byte tmp[] = new byte[used + size];
184: if (used > 0) {
185: System.arraycopy(rawData, rawPos, tmp, 0, used);
186: }
187: rawData = tmp;
188: } else {
189: if (used > 0) {
190: System.arraycopy(rawData, rawPos, rawData, 0, used);
191: }
192: }
193: rawCount = used;
194: rawPos = 0;
195: }
196: }
197:
198: /**
199: * Close the underlying input stream by either returning it to the
200: * keep alive cache or closing the stream.
201: * <p>
202: * As a chunked stream is inheritly persistent (see HTTP 1.1 RFC) the
203: * underlying stream can be returned to the keep alive cache if the
204: * stream can be completely read without error.
205: */
206: private void closeUnderlying() throws IOException {
207: if (in == null) {
208: return;
209: }
210:
211: if (!error && state == STATE_DONE) {
212: hc.finished();
213: } else {
214: if (!hurry()) {
215: hc.closeServer();
216: }
217: }
218:
219: in = null;
220: }
221:
222: /**
223: * Attempt to read the remainder of a chunk directly into the
224: * caller's buffer.
225: * <p>
226: * Return the number of bytes read.
227: */
228: private int fastRead(byte[] b, int off, int len) throws IOException {
229:
230: // assert state == STATE_READING_CHUNKS;
231:
232: int remaining = chunkSize - chunkRead;
233: int cnt = (remaining < len) ? remaining : len;
234: if (cnt > 0) {
235: int nread;
236: try {
237: nread = in.read(b, off, cnt);
238: } catch (IOException e) {
239: error = true;
240: throw e;
241: }
242: if (nread > 0) {
243: chunkRead += nread;
244: if (chunkRead >= chunkSize) {
245: state = STATE_AWAITING_CHUNK_EOL;
246: }
247: return nread;
248: }
249: error = true;
250: throw new IOException("Premature EOF");
251: } else {
252: return 0;
253: }
254: }
255:
256: /**
257: * Process any outstanding bytes that have already been read into
258: * <code>rawData</code>.
259: * <p>
260: * The parsing of the chunked stream is performed as a state machine with
261: * <code>state</code> representing the current state of the processing.
262: * <p>
263: * Returns when either all the outstanding bytes in rawData have been
264: * processed or there is insufficient bytes available to continue
265: * processing. When the latter occurs <code>rawPos</code> will not have
266: * been updated and thus the processing can be restarted once further
267: * bytes have been read into <code>rawData</code>.
268: */
269: private void processRaw() throws IOException {
270: int pos;
271: int i;
272:
273: while (state != STATE_DONE) {
274:
275: switch (state) {
276:
277: /**
278: * We are awaiting a line with a chunk header
279: */
280: case STATE_AWAITING_CHUNK_HEADER:
281: /*
282: * Find \n to indicate end of chunk header. If not found when there is
283: * insufficient bytes in the raw buffer to parse a chunk header.
284: */
285: pos = rawPos;
286: while (pos < rawCount) {
287: if (rawData[pos] == '\n') {
288: break;
289: }
290: pos++;
291: }
292: if (pos >= rawCount) {
293: return;
294: }
295:
296: /*
297: * Extract the chunk size from the header (ignoring extensions).
298: */
299: String header = new String(rawData, rawPos, pos
300: - rawPos + 1);
301: for (i = 0; i < header.length(); i++) {
302: if (Character.digit(header.charAt(i), 16) == -1)
303: break;
304: }
305: try {
306: chunkSize = Integer.parseInt(
307: header.substring(0, i), 16);
308: } catch (NumberFormatException e) {
309: error = true;
310: throw new IOException("Bogus chunk size");
311: }
312:
313: /*
314: * Chunk has been parsed so move rawPos to first byte of chunk
315: * data.
316: */
317: rawPos = pos + 1;
318: chunkRead = 0;
319:
320: /*
321: * A chunk size of 0 means EOF.
322: */
323: if (chunkSize > 0) {
324: state = STATE_READING_CHUNK;
325: } else {
326: state = STATE_AWAITING_TRAILERS;
327: }
328: break;
329:
330: /**
331: * We are awaiting raw entity data (some may have already been
332: * read). chunkSize is the size of the chunk; chunkRead is the
333: * total read from the underlying stream to date.
334: */
335: case STATE_READING_CHUNK:
336: /* no data available yet */
337: if (rawPos >= rawCount) {
338: return;
339: }
340:
341: /*
342: * Compute the number of bytes of chunk data available in the
343: * raw buffer.
344: */
345: int copyLen = Math.min(chunkSize - chunkRead, rawCount
346: - rawPos);
347:
348: /*
349: * Expand or compact chunkData if needed.
350: */
351: if (chunkData.length < chunkCount + copyLen) {
352: int cnt = chunkCount - chunkPos;
353: if (chunkData.length < cnt + copyLen) {
354: byte tmp[] = new byte[cnt + copyLen];
355: System.arraycopy(chunkData, chunkPos, tmp, 0,
356: cnt);
357: chunkData = tmp;
358: } else {
359: System.arraycopy(chunkData, chunkPos,
360: chunkData, 0, cnt);
361: }
362: chunkPos = 0;
363: chunkCount = cnt;
364: }
365:
366: /*
367: * Copy the chunk data into chunkData so that it's available
368: * to the read methods.
369: */
370: System.arraycopy(rawData, rawPos, chunkData,
371: chunkCount, copyLen);
372: rawPos += copyLen;
373: chunkCount += copyLen;
374: chunkRead += copyLen;
375:
376: /*
377: * If all the chunk has been copied into chunkData then the next
378: * token should be CRLF.
379: */
380: if (chunkSize - chunkRead <= 0) {
381: state = STATE_AWAITING_CHUNK_EOL;
382: } else {
383: return;
384: }
385: break;
386:
387: /**
388: * Awaiting CRLF after the chunk
389: */
390: case STATE_AWAITING_CHUNK_EOL:
391: /* not available yet */
392: if (rawPos + 1 >= rawCount) {
393: return;
394: }
395:
396: if (rawData[rawPos] != '\r') {
397: error = true;
398: throw new IOException("missing CR");
399: }
400: if (rawData[rawPos + 1] != '\n') {
401: error = true;
402: throw new IOException("missing LF");
403: }
404: rawPos += 2;
405:
406: /*
407: * Move onto the next chunk
408: */
409: state = STATE_AWAITING_CHUNK_HEADER;
410: break;
411:
412: /**
413: * Last chunk has been read so not we're waiting for optional
414: * trailers.
415: */
416: case STATE_AWAITING_TRAILERS:
417:
418: /*
419: * Do we have an entire line in the raw buffer?
420: */
421: pos = rawPos;
422: while (pos < rawCount) {
423: if (rawData[pos] == '\n') {
424: break;
425: }
426: pos++;
427: }
428: if (pos >= rawCount) {
429: return;
430: }
431:
432: if (pos == rawPos) {
433: error = true;
434: throw new IOException(
435: "LF should be proceeded by CR");
436: }
437: if (rawData[pos - 1] != '\r') {
438: error = true;
439: throw new IOException(
440: "LF should be proceeded by CR");
441: }
442:
443: /*
444: * Stream done so close underlying stream.
445: */
446: if (pos == (rawPos + 1)) {
447:
448: state = STATE_DONE;
449: closeUnderlying();
450:
451: return;
452: }
453:
454: /*
455: * Extract any tailers and append them to the message
456: * headers.
457: */
458: String trailer = new String(rawData, rawPos, pos
459: - rawPos);
460: i = trailer.indexOf(':');
461: if (i == -1) {
462: throw new IOException(
463: "Malformed tailer - format should be key:value");
464: }
465: String key = (trailer.substring(0, i)).trim();
466: String value = (trailer.substring(i + 1, trailer
467: .length())).trim();
468:
469: responses.add(key, value);
470:
471: /*
472: * Move onto the next trailer.
473: */
474: rawPos = pos + 1;
475: break;
476:
477: } /* switch */
478: }
479: }
480:
481: /**
482: * Reads any available bytes from the underlying stream into
483: * <code>rawData</code> and returns the number of bytes of
484: * chunk data available in <code>chunkData</code> that the
485: * application can read.
486: */
487: private int readAheadNonBlocking() throws IOException {
488:
489: /*
490: * If there's anything available on the underlying stream then we read
491: * it into the raw buffer and process it. Processing ensures that any
492: * available chunk data is made available in chunkData.
493: */
494: int avail = in.available();
495: if (avail > 0) {
496:
497: /* ensure that there is space in rawData to read the available */
498: ensureRawAvailable(avail);
499:
500: int nread;
501: try {
502: nread = in.read(rawData, rawCount, avail);
503: } catch (IOException e) {
504: error = true;
505: throw e;
506: }
507: if (nread < 0) {
508: error = true; /* premature EOF ? */
509: return -1;
510: }
511: rawCount += nread;
512:
513: /*
514: * Process the raw bytes that have been read.
515: */
516: processRaw();
517: }
518:
519: /*
520: * Return the number of chunked bytes available to read
521: */
522: return chunkCount - chunkPos;
523: }
524:
525: /**
526: * Reads from the underlying stream until there is chunk data
527: * available in <code>chunkData</code> for the application to
528: * read.
529: */
530: private int readAheadBlocking() throws IOException {
531:
532: do {
533: /*
534: * All of chunked response has been read to return EOF.
535: */
536: if (state == STATE_DONE) {
537: return -1;
538: }
539:
540: /*
541: * We must read into the raw buffer so make sure there is space
542: * available. We use a size of 32 to avoid too much chunk data
543: * being read into the raw buffer.
544: */
545: ensureRawAvailable(32);
546: int nread;
547: try {
548: nread = in.read(rawData, rawCount, rawData.length
549: - rawCount);
550: } catch (IOException e) {
551: error = true;
552: throw e;
553: }
554:
555: /**
556: * If we hit EOF it means there's a problem as we should never
557: * attempt to read once the last chunk and trailers have been
558: * received.
559: */
560: if (nread < 0) {
561: error = true;
562: throw new IOException("Premature EOF");
563: }
564:
565: /**
566: * Process the bytes from the underlying stream
567: */
568: rawCount += nread;
569: processRaw();
570:
571: } while (chunkCount <= 0);
572:
573: /*
574: * Return the number of chunked bytes available to read
575: */
576: return chunkCount - chunkPos;
577: }
578:
579: /**
580: * Read ahead in either blocking or non-blocking mode. This method
581: * is typically used when we run out of available bytes in
582: * <code>chunkData<code> or we need to determine how many bytes
583: * are available on the input stream.
584: */
585: private int readAhead(boolean allowBlocking) throws IOException {
586:
587: /*
588: * Last chunk already received - return EOF
589: */
590: if (state == STATE_DONE) {
591: return -1;
592: }
593:
594: /*
595: * Reset position/count if data in chunkData is exhausted.
596: */
597: if (chunkPos >= chunkCount) {
598: chunkCount = 0;
599: chunkPos = 0;
600: }
601:
602: /*
603: * Read ahead blocking or non-blocking
604: */
605: if (allowBlocking) {
606: return readAheadBlocking();
607: } else {
608: return readAheadNonBlocking();
609: }
610: }
611:
612: /**
613: * Creates a <code>ChunkedInputStream</code> and saves its arguments, for
614: * later use.
615: *
616: * @param in the underlying input stream.
617: * @param hc the HttpClient
618: * @param responses the MessageHeader that should be populated with optional
619: * trailers.
620: */
621: public ChunkedInputStream(InputStream in, HttpClient hc,
622: MessageHeader responses) throws IOException {
623:
624: /* save arguments */
625: this .in = in;
626: this .responses = responses;
627: this .hc = hc;
628:
629: /*
630: * Set our initial state to indicate that we are first starting to
631: * look for a chunk header.
632: */
633: state = STATE_AWAITING_CHUNK_HEADER;
634: }
635:
636: /**
637: * See
638: * the general contract of the <code>read</code>
639: * method of <code>InputStream</code>.
640: *
641: * @return the next byte of data, or <code>-1</code> if the end of the
642: * stream is reached.
643: * @exception IOException if an I/O error occurs.
644: * @see java.io.FilterInputStream#in
645: */
646: public synchronized int read() throws IOException {
647: ensureOpen();
648: if (chunkPos >= chunkCount) {
649: if (readAhead(true) <= 0) {
650: return -1;
651: }
652: }
653: return chunkData[chunkPos++] & 0xff;
654: }
655:
656: /**
657: * Reads bytes from this stream into the specified byte array, starting at
658: * the given offset.
659: *
660: * @param b destination buffer.
661: * @param off offset at which to start storing bytes.
662: * @param len maximum number of bytes to read.
663: * @return the number of bytes read, or <code>-1</code> if the end of
664: * the stream has been reached.
665: * @exception IOException if an I/O error occurs.
666: */
667: public synchronized int read(byte b[], int off, int len)
668: throws IOException {
669: ensureOpen();
670: if ((off < 0) || (off > b.length) || (len < 0)
671: || ((off + len) > b.length) || ((off + len) < 0)) {
672: throw new IndexOutOfBoundsException();
673: } else if (len == 0) {
674: return 0;
675: }
676:
677: int avail = chunkCount - chunkPos;
678: if (avail <= 0) {
679: /*
680: * Optimization: if we're in the middle of the chunk read
681: * directly from the underlying stream into the caller's
682: * buffer
683: */
684: if (state == STATE_READING_CHUNK) {
685: return fastRead(b, off, len);
686: }
687:
688: /*
689: * We're not in the middle of a chunk so we must read ahead
690: * until there is some chunk data available.
691: */
692: avail = readAhead(true);
693: if (avail < 0) {
694: return -1; /* EOF */
695: }
696: }
697: int cnt = (avail < len) ? avail : len;
698: System.arraycopy(chunkData, chunkPos, b, off, cnt);
699: chunkPos += cnt;
700:
701: return cnt;
702: }
703:
704: /**
705: * Returns the number of bytes that can be read from this input
706: * stream without blocking.
707: *
708: * @return the number of bytes that can be read from this input
709: * stream without blocking.
710: * @exception IOException if an I/O error occurs.
711: * @see java.io.FilterInputStream#in
712: */
713: public synchronized int available() throws IOException {
714: ensureOpen();
715: int avail = readAhead(false);
716: if (avail < 0) {
717: return 0;
718: } else {
719: return avail;
720: }
721: }
722:
723: /**
724: * Close the stream by either returning the connection to the
725: * keep alive cache or closing the underlying stream.
726: * <p>
727: * If the chunked response hasn't been completely read we
728: * try to "hurry" to the end of the response. If this is
729: * possible (without blocking) then the connection can be
730: * returned to the keep alive cache.
731: *
732: * @exception IOException if an I/O error occurs.
733: */
734: public synchronized void close() throws IOException {
735: if (closed) {
736: return;
737: }
738: closeUnderlying();
739: closed = true;
740: }
741:
742: /**
743: * Hurry the input stream by reading everything from the underlying
744: * stream. If the last chunk (and optional trailers) can be read without
745: * blocking then the stream is considered hurried.
746: * <p>
747: * Note that if an error has occured or we can't get to last chunk
748: * without blocking then this stream can't be hurried and should be
749: * closed.
750: */
751: public synchronized boolean hurry() {
752: if (in == null || error) {
753: return false;
754: }
755:
756: try {
757: readAhead(false);
758: } catch (Exception e) {
759: return false;
760: }
761:
762: if (error) {
763: return false;
764: }
765:
766: return (state == STATE_DONE);
767: }
768:
769: }
|