001: /*
002: * Copyright 1999-2006 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025: package sun.net.www.http;
026:
027: import java.io.*;
028: import java.util.*;
029:
030: import sun.net.*;
031: import sun.net.www.*;
032:
033: /**
034: * A <code>ChunkedInputStream</code> provides a stream for reading a body of
035: * a http message that can be sent as a series of chunks, each with its own
036: * size indicator. Optionally the last chunk can be followed by trailers
037: * containing entity-header fields.
038: * <p>
039: * A <code>ChunkedInputStream</code> is also <code>Hurryable</code> so it
040: * can be hurried to the end of the stream if the bytes are available on
041: * the underlying stream.
042: */
public class ChunkedInputStream extends InputStream implements
    Hurryable {

    /**
     * The underlying stream from which the raw chunked-encoded bytes
     * are read.  Set to {@code null} once the stream has been handed
     * back to the keep-alive cache or closed (see closeUnderlying).
     */
    private InputStream in;

    /**
     * The <code>HttpClient</code> that should be notified when the chunked stream has
     * completed: {@code finished()} returns the connection to the
     * keep-alive cache, {@code closeServer()} tears it down.
     */
    private HttpClient hc;

    /**
     * The <code>MessageHeader</code> that is populated with any optional trailer
     * fields that appear after the last chunk.
     */
    private MessageHeader responses;

    /**
     * The size, in bytes, of the chunk that is currently being read.
     * This size is only valid if the current position in the underlying
     * input stream is inside a chunk (ie: state == STATE_READING_CHUNK).
     */
    private int chunkSize;

    /**
     * The number of bytes read from the underlying stream for the current
     * chunk. This value is always in the range <code>0</code> through to
     * <code>chunkSize</code>
     */
    private int chunkRead;

    /**
     * The internal buffer array where decoded chunk data is available for
     * the application to read.  Grows on demand in processRaw().
     */
    private byte chunkData[] = new byte[4096];

    /**
     * The current position in the buffer. It contains the index
     * of the next byte to read from <code>chunkData</code>
     */
    private int chunkPos;

    /**
     * The index one greater than the index of the last valid byte in the
     * buffer. This value is always in the range <code>0</code> through
     * <code>chunkData.length</code>.
     */
    private int chunkCount;

    /**
     * The internal buffer where raw bytes from the underlying stream are
     * staged before parsing. It may contain bytes representing chunk-size,
     * chunk-data, or trailer fields.  Grows on demand in
     * ensureRawAvailable().
     */
    private byte rawData[] = new byte[32];

    /**
     * The current position in the buffer. It contains the index
     * of the next byte to read from <code>rawData</code>
     */
    private int rawPos;

    /**
     * The index one greater than the index of the last valid byte in the
     * buffer. This value is always in the range <code>0</code> through
     * <code>rawData.length</code>.
     */
    private int rawCount;

    /**
     * Indicates if an error was encountered when processing the chunked
     * stream.  Once set, the connection is never returned to the
     * keep-alive cache.
     */
    private boolean error;

    /**
     * Indicates if the chunked stream has been closed using the
     * <code>close</code> method.
     */
    private boolean closed;

    /**
     * State to indicate that next field should be :-
     * chunk-size [ chunk-extension ] CRLF
     */
    static final int STATE_AWAITING_CHUNK_HEADER = 1;

    /**
     * State to indicate that we are currently reading the chunk-data.
     */
    static final int STATE_READING_CHUNK = 2;

    /**
     * Indicates that a chunk has been completely read and the next
     * field to be examined should be CRLF.
     */
    static final int STATE_AWAITING_CHUNK_EOL = 3;

    /**
     * Indicates that all chunks have been read and the next field
     * should be optional trailers or an indication that the chunked
     * stream is complete.
     */
    static final int STATE_AWAITING_TRAILERS = 4;

    /**
     * State to indicate that the chunked stream is complete and
     * no further bytes should be read from the underlying stream.
     */
    static final int STATE_DONE = 5;

    /**
     * Indicates the current state of the parsing state machine
     * (one of the STATE_* constants above).
     */
    private int state;

    /**
     * Check to make sure that this stream has not been closed.
     *
     * @throws IOException if {@link #close} has already been called.
     */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("stream is closed");
        }
    }

    /**
     * Ensures there is <code>size</code> bytes of free space available in
     * <code>rawData</code>. This requires that we either
     * shift the bytes in use to the beginning of the buffer
     * or allocate a larger buffer with sufficient space available.
     * After a shift/reallocation the unread bytes start at index 0,
     * so <code>rawPos</code> is reset and <code>rawCount</code> becomes
     * the number of unread bytes.
     */
    private void ensureRawAvailable(int size) {
        if (rawCount + size > rawData.length) {
            int used = rawCount - rawPos;
            if (used + size > rawData.length) {
                // Even after compaction the buffer would be too small:
                // allocate a new one just big enough.
                byte tmp[] = new byte[used + size];
                if (used > 0) {
                    System.arraycopy(rawData, rawPos, tmp, 0, used);
                }
                rawData = tmp;
            } else {
                // Compact in place: slide the unread bytes to the front.
                if (used > 0) {
                    System.arraycopy(rawData, rawPos, rawData, 0, used);
                }
            }
            rawCount = used;
            rawPos = 0;
        }
    }

    /**
     * Close the underlying input stream by either returning it to the
     * keep alive cache or closing the stream.
     * <p>
     * As a chunked stream is inherently persistent (see HTTP 1.1 RFC) the
     * underlying stream can be returned to the keep alive cache if the
     * stream can be completely read without error.
     */
    private void closeUnderlying() throws IOException {
        if (in == null) {
            return;
        }

        if (!error && state == STATE_DONE) {
            // Clean completion: connection may be reused.
            hc.finished();
        } else {
            // Try to drain to the end without blocking; if that fails
            // the connection cannot be reused and must be closed.
            if (!hurry()) {
                hc.closeServer();
            }
        }

        in = null;
    }

    /**
     * Attempt to read the remainder of a chunk directly into the
     * caller's buffer, bypassing <code>chunkData</code>.
     * <p>
     * Return the number of bytes read.
     *
     * @throws IOException if the underlying read fails or EOF is hit
     *         before the chunk is complete (in both cases the
     *         <code>error</code> flag is set first).
     */
    private int fastRead(byte[] b, int off, int len) throws IOException {

        // assert state == STATE_READING_CHUNK;

        int remaining = chunkSize - chunkRead;
        int cnt = (remaining < len) ? remaining : len;
        if (cnt > 0) {
            int nread;
            try {
                nread = in.read(b, off, cnt);
            } catch (IOException e) {
                error = true;
                throw e;
            }
            if (nread > 0) {
                chunkRead += nread;
                if (chunkRead >= chunkSize) {
                    // Whole chunk consumed: the CRLF terminator is next.
                    state = STATE_AWAITING_CHUNK_EOL;
                }
                return nread;
            }
            // EOF in the middle of a chunk is a protocol violation.
            error = true;
            throw new IOException("Premature EOF");
        } else {
            return 0;
        }
    }

    /**
     * Process any outstanding bytes that have already been read into
     * <code>rawData</code>.
     * <p>
     * The parsing of the chunked stream is performed as a state machine with
     * <code>state</code> representing the current state of the processing.
     * <p>
     * Returns when either all the outstanding bytes in rawData have been
     * processed or there is insufficient bytes available to continue
     * processing. When the latter occurs <code>rawPos</code> will not have
     * been updated and thus the processing can be restarted once further
     * bytes have been read into <code>rawData</code>.
     */
    private void processRaw() throws IOException {
        int pos;
        int i;

        while (state != STATE_DONE) {

            switch (state) {

            /**
             * We are awaiting a line with a chunk header
             */
            case STATE_AWAITING_CHUNK_HEADER:
                /*
                 * Find \n to indicate end of chunk header. If not found then
                 * there are insufficient bytes in the raw buffer to parse a
                 * chunk header - return and wait for more input.
                 */
                pos = rawPos;
                while (pos < rawCount) {
                    if (rawData[pos] == '\n') {
                        break;
                    }
                    pos++;
                }
                if (pos >= rawCount) {
                    return;
                }

                /*
                 * Extract the chunk size from the header (ignoring extensions).
                 * The size is the leading run of hex digits; parsing the
                 * substring catches both empty and overflowing values via
                 * NumberFormatException.
                 */
                String header = new String(rawData, rawPos, pos
                        - rawPos + 1, "US-ASCII");
                for (i = 0; i < header.length(); i++) {
                    if (Character.digit(header.charAt(i), 16) == -1)
                        break;
                }
                try {
                    chunkSize = Integer.parseInt(
                            header.substring(0, i), 16);
                } catch (NumberFormatException e) {
                    error = true;
                    throw new IOException("Bogus chunk size");
                }

                /*
                 * Chunk header has been parsed so move rawPos to the first
                 * byte of chunk data.
                 */
                rawPos = pos + 1;
                chunkRead = 0;

                /*
                 * A chunk size of 0 means EOF (the "last-chunk" marker).
                 */
                if (chunkSize > 0) {
                    state = STATE_READING_CHUNK;
                } else {
                    state = STATE_AWAITING_TRAILERS;
                }
                break;

            /**
             * We are awaiting raw entity data (some may have already been
             * read). chunkSize is the size of the chunk; chunkRead is the
             * total read from the underlying stream to date.
             */
            case STATE_READING_CHUNK:
                /* no data available yet */
                if (rawPos >= rawCount) {
                    return;
                }

                /*
                 * Compute the number of bytes of chunk data available in the
                 * raw buffer.
                 */
                int copyLen = Math.min(chunkSize - chunkRead, rawCount
                        - rawPos);

                /*
                 * Expand or compact chunkData if needed so the copy fits.
                 */
                if (chunkData.length < chunkCount + copyLen) {
                    int cnt = chunkCount - chunkPos;
                    if (chunkData.length < cnt + copyLen) {
                        byte tmp[] = new byte[cnt + copyLen];
                        System.arraycopy(chunkData, chunkPos, tmp, 0,
                                cnt);
                        chunkData = tmp;
                    } else {
                        System.arraycopy(chunkData, chunkPos,
                                chunkData, 0, cnt);
                    }
                    chunkPos = 0;
                    chunkCount = cnt;
                }

                /*
                 * Copy the chunk data into chunkData so that it's available
                 * to the read methods.
                 */
                System.arraycopy(rawData, rawPos, chunkData,
                        chunkCount, copyLen);
                rawPos += copyLen;
                chunkCount += copyLen;
                chunkRead += copyLen;

                /*
                 * If all the chunk has been copied into chunkData then the next
                 * token should be CRLF.
                 */
                if (chunkSize - chunkRead <= 0) {
                    state = STATE_AWAITING_CHUNK_EOL;
                } else {
                    return;
                }
                break;

            /**
             * Awaiting CRLF after the chunk
             */
            case STATE_AWAITING_CHUNK_EOL:
                /* both CR and LF not available yet */
                if (rawPos + 1 >= rawCount) {
                    return;
                }

                if (rawData[rawPos] != '\r') {
                    error = true;
                    throw new IOException("missing CR");
                }
                if (rawData[rawPos + 1] != '\n') {
                    error = true;
                    throw new IOException("missing LF");
                }
                rawPos += 2;

                /*
                 * Move onto the next chunk
                 */
                state = STATE_AWAITING_CHUNK_HEADER;
                break;

            /**
             * Last chunk has been read so now we're waiting for optional
             * trailers.
             */
            case STATE_AWAITING_TRAILERS:

                /*
                 * Do we have an entire line in the raw buffer?
                 */
                pos = rawPos;
                while (pos < rawCount) {
                    if (rawData[pos] == '\n') {
                        break;
                    }
                    pos++;
                }
                if (pos >= rawCount) {
                    return;
                }

                // (message text is historical: "proceeded" should read
                // "preceded" but is preserved for compatibility)
                if (pos == rawPos) {
                    error = true;
                    throw new IOException(
                            "LF should be proceeded by CR");
                }
                if (rawData[pos - 1] != '\r') {
                    error = true;
                    throw new IOException(
                            "LF should be proceeded by CR");
                }

                /*
                 * A bare CRLF line terminates the trailers - the stream is
                 * done, so close the underlying stream.
                 */
                if (pos == (rawPos + 1)) {

                    state = STATE_DONE;
                    closeUnderlying();

                    return;
                }

                /*
                 * Extract any trailers and append them to the message
                 * headers.
                 */
                String trailer = new String(rawData, rawPos, pos
                        - rawPos, "US-ASCII");
                i = trailer.indexOf(':');
                if (i == -1) {
                    throw new IOException(
                            "Malformed tailer - format should be key:value");
                }
                String key = (trailer.substring(0, i)).trim();
                String value = (trailer.substring(i + 1, trailer
                        .length())).trim();

                responses.add(key, value);

                /*
                 * Move onto the next trailer.
                 */
                rawPos = pos + 1;
                break;

            } /* switch */
        }
    }

    /**
     * Reads any available bytes from the underlying stream into
     * <code>rawData</code> and returns the number of bytes of
     * chunk data available in <code>chunkData</code> that the
     * application can read.  Never blocks: only reads what
     * <code>in.available()</code> reports.
     */
    private int readAheadNonBlocking() throws IOException {

        /*
         * If there's anything available on the underlying stream then we read
         * it into the raw buffer and process it. Processing ensures that any
         * available chunk data is made available in chunkData.
         */
        int avail = in.available();
        if (avail > 0) {

            /* ensure that there is space in rawData to read the available */
            ensureRawAvailable(avail);

            int nread;
            try {
                nread = in.read(rawData, rawCount, avail);
            } catch (IOException e) {
                error = true;
                throw e;
            }
            if (nread < 0) {
                error = true; /* premature EOF ? */
                return -1;
            }
            rawCount += nread;

            /*
             * Process the raw bytes that have been read.
             */
            processRaw();
        }

        /*
         * Return the number of chunked bytes available to read
         */
        return chunkCount - chunkPos;
    }

    /**
     * Reads from the underlying stream until there is chunk data
     * available in <code>chunkData</code> for the application to
     * read.
     *
     * @return the number of bytes available, or <code>-1</code> if the
     *         chunked stream has completed (STATE_DONE).
     * @throws IOException on a read error or premature EOF (the
     *         <code>error</code> flag is set before throwing).
     */
    private int readAheadBlocking() throws IOException {

        do {
            /*
             * All of the chunked response has been read so return EOF.
             */
            if (state == STATE_DONE) {
                return -1;
            }

            /*
             * We must read into the raw buffer so make sure there is space
             * available. We use a size of 32 to avoid too much chunk data
             * being read into the raw buffer.
             */
            ensureRawAvailable(32);
            int nread;
            try {
                nread = in.read(rawData, rawCount, rawData.length
                        - rawCount);
            } catch (IOException e) {
                error = true;
                throw e;
            }

            /**
             * If we hit EOF it means there's a problem as we should never
             * attempt to read once the last chunk and trailers have been
             * received.
             */
            if (nread < 0) {
                error = true;
                throw new IOException("Premature EOF");
            }

            /**
             * Process the bytes from the underlying stream
             */
            rawCount += nread;
            processRaw();

        } while (chunkCount <= 0);

        /*
         * Return the number of chunked bytes available to read
         */
        return chunkCount - chunkPos;
    }

    /**
     * Read ahead in either blocking or non-blocking mode. This method
     * is typically used when we run out of available bytes in
     * <code>chunkData</code> or we need to determine how many bytes
     * are available on the input stream.
     *
     * @param allowBlocking if <code>true</code> block until chunk data is
     *        available (or EOF); otherwise consume only what is already
     *        buffered on the underlying stream.
     */
    private int readAhead(boolean allowBlocking) throws IOException {

        /*
         * Last chunk already received - return EOF
         */
        if (state == STATE_DONE) {
            return -1;
        }

        /*
         * Reset position/count if data in chunkData is exhausted.
         */
        if (chunkPos >= chunkCount) {
            chunkCount = 0;
            chunkPos = 0;
        }

        /*
         * Read ahead blocking or non-blocking
         */
        if (allowBlocking) {
            return readAheadBlocking();
        } else {
            return readAheadNonBlocking();
        }
    }

    /**
     * Creates a <code>ChunkedInputStream</code> and saves its arguments, for
     * later use.
     *
     * @param in the underlying input stream.
     * @param hc the HttpClient
     * @param responses the MessageHeader that should be populated with optional
     * trailers.
     */
    public ChunkedInputStream(InputStream in, HttpClient hc,
            MessageHeader responses) throws IOException {

        /* save arguments */
        this.in = in;
        this.responses = responses;
        this.hc = hc;

        /*
         * Set our initial state to indicate that we are first starting to
         * look for a chunk header.
         */
        state = STATE_AWAITING_CHUNK_HEADER;
    }

    /**
     * See
     * the general contract of the <code>read</code>
     * method of <code>InputStream</code>.
     *
     * @return the next byte of data, or <code>-1</code> if the end of the
     * stream is reached.
     * @exception IOException if an I/O error occurs.
     * @see java.io.FilterInputStream#in
     */
    public synchronized int read() throws IOException {
        ensureOpen();
        if (chunkPos >= chunkCount) {
            if (readAhead(true) <= 0) {
                return -1;
            }
        }
        // Mask to return the byte as an unsigned value per the
        // InputStream.read() contract.
        return chunkData[chunkPos++] & 0xff;
    }

    /**
     * Reads bytes from this stream into the specified byte array, starting at
     * the given offset.
     *
     * @param b destination buffer.
     * @param off offset at which to start storing bytes.
     * @param len maximum number of bytes to read.
     * @return the number of bytes read, or <code>-1</code> if the end of
     * the stream has been reached.
     * @exception IOException if an I/O error occurs.
     */
    public synchronized int read(byte b[], int off, int len)
            throws IOException {
        ensureOpen();
        // Standard InputStream bounds check; the (off + len) < 0 clause
        // guards against int overflow of off + len.
        if ((off < 0) || (off > b.length) || (len < 0)
                || ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int avail = chunkCount - chunkPos;
        if (avail <= 0) {
            /*
             * Optimization: if we're in the middle of the chunk read
             * directly from the underlying stream into the caller's
             * buffer
             */
            if (state == STATE_READING_CHUNK) {
                return fastRead(b, off, len);
            }

            /*
             * We're not in the middle of a chunk so we must read ahead
             * until there is some chunk data available.
             */
            avail = readAhead(true);
            if (avail < 0) {
                return -1; /* EOF */
            }
        }
        int cnt = (avail < len) ? avail : len;
        System.arraycopy(chunkData, chunkPos, b, off, cnt);
        chunkPos += cnt;

        return cnt;
    }

    /**
     * Returns the number of bytes that can be read from this input
     * stream without blocking.
     *
     * @return the number of bytes that can be read from this input
     * stream without blocking.
     * @exception IOException if an I/O error occurs.
     * @see java.io.FilterInputStream#in
     */
    public synchronized int available() throws IOException {
        ensureOpen();

        int avail = chunkCount - chunkPos;
        if (avail > 0) {
            return avail;
        }

        // Nothing buffered: do a non-blocking read-ahead to see if more
        // chunk data can be decoded from what the socket already has.
        avail = readAhead(false);

        if (avail < 0) {
            return 0;
        } else {
            return avail;
        }
    }

    /**
     * Close the stream by either returning the connection to the
     * keep alive cache or closing the underlying stream.
     * <p>
     * If the chunked response hasn't been completely read we
     * try to "hurry" to the end of the response. If this is
     * possible (without blocking) then the connection can be
     * returned to the keep alive cache.
     *
     * @exception IOException if an I/O error occurs.
     */
    public synchronized void close() throws IOException {
        if (closed) {
            return;
        }
        closeUnderlying();
        closed = true;
    }

    /**
     * Hurry the input stream by reading everything from the underlying
     * stream. If the last chunk (and optional trailers) can be read without
     * blocking then the stream is considered hurried.
     * <p>
     * Note that if an error has occurred or we can't get to the last chunk
     * without blocking then this stream can't be hurried and should be
     * closed.
     *
     * @return <code>true</code> if the end of the chunked stream was
     *         reached without blocking and without error.
     */
    public synchronized boolean hurry() {
        if (in == null || error) {
            return false;
        }

        try {
            readAhead(false);
        } catch (Exception e) {
            // Best-effort: any failure simply means we cannot hurry and
            // the caller will close the connection instead.
            return false;
        }

        if (error) {
            return false;
        }

        return (state == STATE_DONE);
    }

}
|