001: /*
002: * PollerInputStream.java November 2005
003: *
004: * Copyright (C) 2005, Niall Gallagher <niallg@users.sf.net>
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013: * GNU Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General
016: * Public License along with this library; if not, write to the
017: * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
018: * Boston, MA 02111-1307 USA
019: */
020:
021: package simple.http;
022:
023: import java.io.InterruptedIOException;
024: import java.io.InputStream;
025: import java.io.IOException;
026: import java.net.Socket;
027:
028: /**
029: * The <code>PollerInputStream</code> object is used to poll a HTTP
030: * pipeline for message headers. This is used so that a non-blocking
031: * mechanism for reading can be established. This acts much like the
032: * implementation of the <code>PushbackInputStream</code>, except
033: * that it is optimised to unread frequently.
034: * <p>
035: * The need to unread, or reset, the bytes read from the stream is
036: * so that data can be read in reasonable size chunks. This however
037: * can cause the read to over shoot the end of the HTTP request
038: * header. So, to ensure that the next request is not corrupted it
039: * needs to be pushed back into the stream for use in the next poll.
040: * <p>
041: * This is a specialised stream and provides a distinct behaviour.
042: * In essence it acts much like an <code>Iterator</code> as it
043: * requires the <code>available</code> method to be used in the
044: * same manner as an iterator would requires <code>hasMore</code>.
045: * The available method reads from the underlying input stream and
046: * fills a buffer, which is used in the next read invocation. If
047: * the <code>available</code> method is not used the read cannot
048: * be pushed back onto the stream via a <code>reset</code> call.
049: *
050: * @author Niall Gallagher
051: *
052: * @see simple.http.Poller
053: */
054: final class PollerInputStream extends InputStream {
055:
056: /**
057: * This streams provides the source for the HTTP message.
058: */
059: private InputStream data;
060:
061: /**
062: * This socket represents the connection to the client.
063: */
064: private Socket sock;
065:
066: /**
067: * This is used to accumulate data from the HTTP message.
068: */
069: private byte[] buf;
070:
071: /**
072: * This is the number of bytes currently accumulated.
073: */
074: private int size;
075:
076: /**
077: * This is the offset within the buffer to read from.
078: */
079: private int pos;
080:
081: /**
082: * Constructor for the <code>PollerInputStream</code> object. This
083: * creates a stream that can be used to acquire bytes from the
084: * provided pipeline in such a way that reads can be rolled back.
085: * This constructor imposes a maximum read size of 1024 bytes.
086: *
087: * @param pipe this is the pipeline that will be polled
088: */
089: public PollerInputStream(Pipeline pipe) throws IOException {
090: this (pipe, 1024);
091: }
092:
093: /**
094: * Constructor for the <code>PollerInputStream</code> object. This
095: * creates a stream that can be used to acquire bytes from the
096: * provided pipeline in such a way that reads can be rolled back.
097: * This constructor imposes the maximum read size specified.
098: *
099: * @param pipe this is the pipeline that will be polled
100: * @param size this is the maximum read size for this stream
101: */
102: public PollerInputStream(Pipeline pipe, int size)
103: throws IOException {
104: this .data = pipe.getInputStream();
105: this .buf = new byte[size * 2];
106: this .sock = pipe.sock;
107: }
108:
109: /**
110: * This method is used to read from the underlying stream. If the
111: * data requires resetting then the <code>available</code> method
112: * should be used before using this. If this is not done then the
113: * data read may not be resettable. This method may block if the
114: * buffer contains no data, this is up to the underlying stream.
115: *
116: * @return this returns the next byte read from the stream
117: */
118: public int read() throws IOException {
119: if (size > 0) {
120: size--;
121: return buf[pos++] & 0xff;
122: }
123: return data.read();
124: }
125:
126: /**
127: * This method is used to read from the underlying stream. If the
128: * data requires resetting then the <code>available</code> method
129: * should be used before using this. If this is not done then the
130: * data read may not be resettable. This method may block if the
131: * buffer contains no data, this is up to the underlying stream.
132: * <p>
133: * If the <code>available</code> method is not called before this
134: * method then this may have an empty buffer, in which case the
135: * underlying stream is used to acquire the requested bytes. This
136: * is ensures that bytes can always be read using this stream.
137: *
138: * @param b this is the bytes buffer to read the bytes into
139: * @param off this is the offset within the buffer to start
140: * @param len this is the maximum number of bytes to read
141: *
142: * @return this returns the next byte read from the stream
143: */
144: public int read(byte[] b, int off, int len) throws IOException {
145: int last = pos + size;
146:
147: if (pos < last) {
148: int min = Math.min(len, size);
149:
150: if (min > 0) {
151: System.arraycopy(buf, pos, b, off, min);
152: }
153: size -= min;
154: pos += min;
155:
156: return min;
157: }
158: return data.read(b, off, len);
159: }
160:
161: /**
162: * This provides the primary means of reading data from the
163: * underlying HTTP stream. This method ensures that regardless of
164: * the input stream implementation, the available method will
165: * always indicate whether there are bytes ready for reading.
166: * <p>
167: * This performs a <code>read</code> from the underlying stream,
168: * so that if there are bytes read the available method will
169: * indicate the number of bytes buffered. This generally will
170: * not block, however the <code>available(int)</code> method can
171: * be used to provide a timeout for a blocking socket read.
172: *
173: * @return this returns the number of bytes that can be read
174: */
175: public int available() throws IOException {
176: return available(1);
177: }
178:
179: /**
180: * This provides the primary means of reading data from the
181: * underlying HTTP stream. This method ensures that regardless of
182: * the input stream implementation, the available method will
183: * always indicate whether there are bytes ready for reading.
184: * <p>
185: * This performs a <code>read</code> from the underlying stream,
186: * so that if there are bytes read the available method will
187: * indicate the number of bytes buffered. This generally will
188: * not block, however a timeout can be specified which imposes
189: * a maximum length of time the read will block for.
190: *
191: * @param timeout this is the maximum blocking time imposed
192: *
193: * @return this returns the number of bytes that can be read
194: */
195: public int available(int timeout) throws IOException {
196: if (size > 0) {
197: return size;
198: }
199: return peek(timeout);
200: }
201:
202: /**
203: * This method is used to read and buffer data from the underlying
204: * input stream. This ensures that the <code>available</code>
205: * method can work correctly regardless of the implementation of
206: * the underlying stream. For instance take JSSE streams. In
207: * general these will return zero for the number of available
208: * bytes. This is not very useful when trying to perform polling.
209: *
210: * @return this returns the number of bytes that are buffered
211: */
212: private int peek() throws IOException {
213: try {
214: int free = buf.length - (pos + size);
215:
216: if (pos > buf.length / 4) {
217: free += compact();
218: }
219: int last = pos + size;
220: int read = data.read(buf, last, free);
221:
222: if (read > 0) {
223: size += read;
224: }
225: if (read < 0) {
226: close();
227: }
228: } catch (InterruptedIOException e) {
229: }
230: return size;
231: }
232:
233: /**
234: * This method is used to read and buffer data from the underlying
235: * input stream. This ensures that the <code>available</code>
236: * method can work correctly regardless of the implementation of
237: * the underlying stream. For instance take JSSE streams. In
238: * general these will return zero for the number of available
239: * bytes. This is not very useful when trying to perform polling.
240: * <p>
241: * A timeout can be specified to this method will allows a maximum
242: * blocking time to be imposed. This is useful when the underlying
243: * stream is suspected of blocking on a read operation.
244: *
245: * @param timeout this is the maximum blocking time imposed
246: *
247: * @return this returns the number of bytes that are buffered
248: */
249: private int peek(int timeout) throws IOException {
250: int wait = sock.getSoTimeout();
251:
252: if (timeout > 0) {
253: sock.setSoTimeout(timeout);
254: }
255: int size = peek();
256:
257: if (size != -1) {
258: sock.setSoTimeout(wait);
259: }
260: return size;
261: }
262:
263: /**
264: * When the buffer has used up most of the available space this
265: * method will ensure that existing unread data is moved closer to
266: * the start of the buffer. This ensures that there is more space
267: * with which to copy further data. This will return the number of
268: * extra bytes available after the compacting has completed.
269: *
270: * @return this returns the number of bytes that were freed
271: */
272: private int compact() {
273: int space = pos;
274:
275: if (pos > 0) {
276: System.arraycopy(buf, pos, buf, 0, size);
277: pos = 0;
278: }
279: return space;
280: }
281:
282: /**
283: * This will perform a rollback from the previous read. This is
284: * used to that if a read over shoots the end of the HTTP header
285: * the bytes that form the next header can be pushed back into
286: * the stream. This is used after a <code>read</code> invocation
287: * only. If two reads are performed this can rollback only the
288: * last, and only up to the number of bytes most recently read.
289: *
290: * @param count this is the number of bytes to rollback
291: *
292: * @return returns the number of bytes that were rolled back
293: */
294: public int reset(int count) throws IOException {
295: int mark = pos;
296:
297: if (pos - count > 0) {
298: size += count;
299: pos -= count;
300: } else {
301: size += pos;
302: pos = 0;
303: }
304: return mark - pos;
305: }
306:
307: /**
308: * This will close the underlying stream. This closes the stream
309: * in such a way that any previously buffered bytes can be read,
310: * however no further bytes can be buffered or read from the
311: * underlying input stream due to exceptions being thrown.
312: */
313: public void close() throws IOException {
314: data.close();
315: }
316:
317: }
|