001: /*
002: * @(#)RespInputStream.java 0.3-2 18/06/1999
003: *
004: * This file is part of the HTTPClient package
005: * Copyright (C) 1996-1999 Ronald Tschalär
006: *
007: * This library is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU Lesser General Public
009: * License as published by the Free Software Foundation; either
010: * version 2 of the License, or (at your option) any later version.
011: *
012: * This library is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this library; if not, write to the Free
019: * Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
020: * MA 02111-1307, USA
021: *
022: * For questions, suggestions, bug-reports, enhancement-requests etc.
023: * I may be contacted at:
024: *
025: * ronald@innovation.ch
026: *
027: */
028:
029: package HTTPClient;
030:
031: import java.io.InputStream;
032: import java.io.IOException;
033: import java.io.InterruptedIOException;
034:
035: /**
036: * This is the InputStream that gets returned to the user. The extensions
037: * consist of the capability to have the data pushed into a buffer if the
038: * stream demux needs to.
039: *
040: * @version 0.3-2 18/06/1999
041: * @author Ronald Tschalär
042: * @since V0.2
043: */
044: final class RespInputStream extends InputStream implements
045: GlobalConstants {
046: /** the stream demultiplexor */
047: private StreamDemultiplexor demux = null;
048:
049: /** our response handler */
050: private ResponseHandler resph;
051:
052: /** signals that the user has closed the stream and will therefore
053: not read any further data */
054: boolean closed = false;
055:
056: /** signals that the connection may not be closed prematurely */
057: private boolean dont_truncate = false;
058:
059: /** this buffer is used to buffer data that the demux has to get rid of */
060: private byte[] buffer = null;
061:
062: /** signals that we were interrupted and that the buffer is not complete */
063: private boolean interrupted = false;
064:
065: /** the offset at which the unread data starts in the buffer */
066: private int offset = 0;
067:
068: /** the end of the data in the buffer */
069: private int end = 0;
070:
071: /** the total number of bytes of entity data read from the demux so far */
072: int count = 0;
073:
074: // Constructors
075:
076: RespInputStream(StreamDemultiplexor demux, ResponseHandler resph) {
077: this .demux = demux;
078: this .resph = resph;
079: }
080:
081: // public Methods
082:
083: private byte[] ch = new byte[1];
084:
085: /**
086: * Reads a single byte.
087: *
088: * @return the byte read, or -1 if EOF.
089: * @exception IOException if any exception occured on the connection.
090: */
091: public synchronized int read() throws IOException {
092: int rcvd = read(ch, 0, 1);
093: if (rcvd == 1)
094: return ch[0] & 0xff;
095: else
096: return -1;
097: }
098:
099: /**
100: * Reads <var>len</var> bytes into <var>b</var>, starting at offset
101: * <var>off</var>.
102: *
103: * @return the number of bytes actually read, or -1 if EOF.
104: * @exception IOException if any exception occured on the connection.
105: */
106: public synchronized int read(byte[] b, int off, int len)
107: throws IOException {
108: if (closed)
109: return -1;
110:
111: int left = end - offset;
112: if (buffer != null && !(left == 0 && interrupted)) {
113: if (left == 0)
114: return -1;
115:
116: len = (len > left ? left : len);
117: System.arraycopy(buffer, offset, b, off, len);
118: offset += len;
119:
120: return len;
121: } else {
122: if (DebugDemux) {
123: if (resph.resp.cd_type != CD_HDRS)
124: System.err.println("RspIS: Reading stream "
125: + this .hashCode() + " ("
126: + Thread.currentThread() + ")");
127: }
128:
129: int rcvd;
130: if (resph.resp.cd_type == CD_HDRS)
131: rcvd = demux.read(b, off, len, resph,
132: resph.resp.timeout);
133: else
134: rcvd = demux.read(b, off, len, resph, 0);
135: if (rcvd != -1 && resph.resp.got_headers)
136: count += rcvd;
137:
138: return rcvd;
139: }
140: }
141:
142: /**
143: * skips <var>num</var> bytes.
144: *
145: * @return the number of bytes actually skipped.
146: * @exception IOException if any exception occured on the connection.
147: */
148: public synchronized long skip(long num) throws IOException {
149: if (closed)
150: return 0;
151:
152: int left = end - offset;
153: if (buffer != null && !(left == 0 && interrupted)) {
154: num = (num > left ? left : num);
155: offset += num;
156: return num;
157: } else {
158: long skpd = demux.skip(num, resph);
159: if (resph.resp.got_headers)
160: count += skpd;
161: return skpd;
162: }
163: }
164:
165: /**
166: * gets the number of bytes available for reading without blocking.
167: *
168: * @return the number of bytes available.
169: * @exception IOException if any exception occured on the connection.
170: */
171: public synchronized int available() throws IOException {
172: if (closed)
173: return 0;
174:
175: if (buffer != null && !(end - offset == 0 && interrupted))
176: return end - offset;
177: else
178: return demux.available(resph);
179: }
180:
181: /**
182: * closes the stream.
183: *
184: * @exception if any exception occured on the connection before or
185: * during close.
186: */
187: public synchronized void close() throws IOException {
188: if (!closed) {
189: closed = true;
190:
191: if (dont_truncate && (buffer == null || interrupted))
192: readAll(resph.resp.timeout);
193:
194: if (DebugDemux)
195: System.err.println("RspIS: User closed stream "
196: + hashCode() + " (" + Thread.currentThread()
197: + ")");
198:
199: demux.closeSocketIfAllStreamsClosed();
200:
201: if (dont_truncate) {
202: try {
203: resph.resp.http_resp.invokeTrailerHandlers(false);
204: } catch (ModuleException me) {
205: throw new IOException(me.toString());
206: }
207: }
208: }
209: }
210:
211: /**
212: * A safety net to clean up.
213: */
214: protected void finalize() throws Throwable {
215: try {
216: close();
217: } finally {
218: super .finalize();
219: }
220: }
221:
222: // local Methods
223:
224: /**
225: * Reads all remainings data into buffer. This is used to force a read
226: * of upstream responses.
227: *
228: * <P>This is probably the most tricky and buggy method around. It's the
229: * only one that really violates the strict top-down method invocation
230: * from the Response through the ResponseStream to the StreamDemultiplexor.
231: * This means we need to be awfully careful about what is synchronized
232: * and what parameters are passed to whom.
233: *
234: * @param timeout the timeout to use for reading from the demux
235: * @exception IOException If any exception occurs while reading stream.
236: */
237: void readAll(int timeout) throws IOException {
238: if (DebugDemux)
239: System.err.println("RspIS: Read-all on stream "
240: + this .hashCode() + " (" + Thread.currentThread()
241: + ")");
242:
243: synchronized (resph.resp) {
244: if (!resph.resp.got_headers) // force headers to be read
245: {
246: int sav_to = resph.resp.timeout;
247: resph.resp.timeout = timeout;
248: resph.resp.getStatusCode();
249: resph.resp.timeout = sav_to;
250: }
251: }
252:
253: synchronized (this ) {
254: if (buffer != null && !interrupted)
255: return;
256:
257: int rcvd = 0;
258: try {
259: if (closed) // throw away
260: {
261: buffer = new byte[10000];
262: do {
263: count += rcvd;
264: rcvd = demux.read(buffer, 0, buffer.length,
265: resph, timeout);
266: } while (rcvd != -1);
267: buffer = null;
268: } else {
269: if (buffer == null) {
270: buffer = new byte[10000];
271: offset = 0;
272: end = 0;
273: }
274:
275: do {
276: rcvd = demux.read(buffer, end, buffer.length
277: - end, resph, timeout);
278: if (rcvd < 0)
279: break;
280:
281: count += rcvd;
282: end += rcvd;
283: buffer = Util.resizeArray(buffer, end + 10000);
284: } while (true);
285: }
286: } catch (InterruptedIOException iioe) {
287: interrupted = true;
288: throw iioe;
289: } catch (IOException ioe) {
290: buffer = null; // force a read on demux for exception
291: }
292:
293: interrupted = false;
294: }
295: }
296:
297: /**
298: * Sometime the full response body must be read, i.e. the connection may
299: * not be closed prematurely (by us). Currently this is needed when the
300: * chunked encoding with trailers is used in a response.
301: */
302: synchronized void dontTruncate() {
303: dont_truncate = true;
304: }
305: }
|