001: package net.sf.thingamablog;
002:
003: import java.io.FilterInputStream;
004: import java.io.IOException;
005: import java.io.InputStream;
006: import java.io.InterruptedIOException;
007:
008: /**
009: * Wraps an input stream that blocks indefinitely to simulate timeouts on read(), skip(), and close(). The resulting
010: * input stream is buffered and supports retrying operations that failed due to an InterruptedIOException. Supports
011: * resuming partially completed operations after an InterruptedIOException REGARDLESS of whether the underlying stream
012: * does unless the underlying stream itself generates InterruptedIOExceptions in which case it must also support
013: * resuming. Check the bytesTransferred field to determine how much of the operation completed; conversely, at what
014: * point to resume.
015: */
016:
017: public class TimeoutInputStream extends FilterInputStream {
018:
019: private final long readTimeout;
020: private final long closeTimeout;
021: private boolean closeRequested = false;
022: private Thread thread;
023: private byte[] iobuffer;
024: private int head = 0;
025: private int length = 0;
026: private IOException ioe = null;
027: private boolean waitingForClose = false;
028: private boolean growWhenFull = false;
029:
030: /**
031: * Creates a timeout wrapper for an input stream.
032: * @param in the underlying input stream
033: * @param bufferSize the buffer size in bytes; should be large enough to mitigate Thread synchronization and context
034: * switching overhead
035: * @param readTimeout the number of milliseconds to block for a read() or skip() before throwing an
036: * InterruptedIOException; blocks indefinitely
037: * @param closeTimeout the number of milliseconds to block for a close() before throwing an InterruptedIOException;
038: * blocks indefinitely, -1 closes the stream in the background
039: */
040:
041: public TimeoutInputStream(InputStream in, int bufferSize,
042: long readTimeout, long closeTimeout) {
043: super (in);
044: this .readTimeout = readTimeout;
045: this .closeTimeout = closeTimeout;
046: this .iobuffer = new byte[bufferSize];
047:
048: thread = new Thread(new Runnable() {
049: public void run() {
050: runThread();
051: }
052:
053: }, "TimeoutInputStream");
054:
055: thread.setDaemon(true);
056: thread.start();
057: }
058:
059: public TimeoutInputStream(InputStream in, int bufferSize,
060: long readTimeout, long closeTimeout, boolean growWhenFull) {
061: this (in, bufferSize, readTimeout, closeTimeout);
062: this .growWhenFull = growWhenFull;
063: }
064:
065: /**
066: * Wraps the underlying stream's method.
067: * It may be important to wait for a stream to actually be closed because
068: * it holds an implicit lock on a system resoure (such as a file) while it is open.
069: * Closing a stream may take time if the underlying stream is still servicing a previous request.
070: * @throws InterruptedIOException if the timeout expired
071: * @throws IOException if an i/o error occurs
072: */
073:
074: public void close() throws IOException {
075: Thread oldThread;
076:
077: synchronized (this ) {
078: if (thread == null)
079: return;
080:
081: oldThread = thread;
082: closeRequested = true;
083: thread.interrupt();
084: checkError();
085: }
086:
087: if (closeTimeout == -1)
088: return;
089:
090: try {
091: oldThread.join(closeTimeout);
092: } catch (InterruptedException e) {
093: Thread.currentThread().interrupt();
094: }
095:
096: synchronized (this ) {
097: checkError();
098:
099: if (thread != null)
100: throw new InterruptedIOException();
101: }
102: }
103:
104: /**
105: * Returns the number of unread bytes in the buffer.
106: * @throws IOException if an i/o error occurs
107: */
108:
109: public synchronized int available() throws IOException {
110: if (length == 0)
111: checkError();
112:
113: return length > 0 ? length : 0;
114: }
115:
116: /**
117: * Reads a byte from the stream.
118: * @throws InterruptedIOException if the timeout expired and no data was received,
119: * bytesTransferred will be zero
120: *
121: * @throws IOException if an i/o error occurs
122: */
123:
124: public synchronized int read() throws IOException {
125: if (!syncFill())
126: return -1;
127:
128: int b = iobuffer[head++] & 255;
129:
130: if (head == iobuffer.length)
131: head = 0;
132:
133: length--;
134: notify();
135: return b;
136: }
137:
138: /**
139: * Reads multiple bytes from the stream.
140: * @throws InterruptedIOException if the timeout expired and no data was received,
141: * bytesTransferred will be zero
142: * @throws IOException if an i/o error occurs
143: */
144:
145: public synchronized int read(byte[] buffer, int off, int len)
146: throws IOException {
147: if (!syncFill())
148: return -1;
149:
150: int pos = off;
151: if (len > length)
152: len = length;
153:
154: while (len-- > 0) {
155: buffer[pos++] = iobuffer[head++];
156: if (head == iobuffer.length)
157: head = 0;
158: length--;
159: }
160:
161: notify();
162: return pos - off;
163: }
164:
165: /**
166: * Skips multiple bytes in the stream.
167: * @throws InterruptedIOException if the timeout expired before all of the
168: * bytes specified have been skipped,
169: * bytesTransferred may be non-zero
170: * @throws IOException if an i/o error occurs
171: */
172:
173: public synchronized long skip(long count) throws IOException {
174: long amount = 0;
175: try {
176: do {
177: if (!syncFill())
178: break;
179:
180: int skip = (int) Math.min(count - amount, length);
181: head = (head + skip) % iobuffer.length;
182: length -= skip;
183: amount += skip;
184: } while (amount < count);
185: } catch (InterruptedIOException e) {
186: e.bytesTransferred = (int) amount;
187: throw e;
188: }
189:
190: notify();
191: return amount;
192: }
193:
194: /**
195: * Mark is not supported by the wrapper even if the underlying stream does, returns false.
196: */
197:
198: public boolean markSupported() {
199: return false;
200: }
201:
202: /**
203: * Waits for the buffer to fill if it is empty and the stream has not reached EOF.
204: * @return true if bytes are available, false if EOF has been reached
205: * @throws InterruptedIOException if EOF not reached but no bytes are available
206: */
207:
208: private boolean syncFill() throws IOException {
209: if (length != 0)
210: return true;
211:
212: checkError();
213:
214: if (waitingForClose)
215: return false;
216:
217: notify();
218:
219: try {
220: wait(readTimeout);
221: } catch (InterruptedException e) {
222: Thread.currentThread().interrupt();
223: }
224:
225: if (length != 0)
226: return true;
227:
228: checkError();
229:
230: if (waitingForClose)
231: return false;
232:
233: throw new InterruptedIOException();
234: }
235:
236: /**
237: * If an exception is pending, throw it.
238: */
239: private void checkError() throws IOException {
240: if (ioe != null) {
241: IOException e = ioe;
242: ioe = null;
243: throw e;
244: }
245: }
246:
247: /**
248: * Runs the thread in the background.
249: */
250:
251: private void runThread() {
252: try {
253: readUntilDone();
254: } catch (IOException e) {
255: synchronized (this ) {
256: ioe = e;
257: }
258: } finally {
259: waitUntilClosed();
260: try {
261: in.close();
262: } catch (IOException e) {
263: synchronized (this ) {
264: ioe = e;
265: }
266: } finally {
267: synchronized (this ) {
268: thread = null;
269: notify();
270: }
271: }
272: }
273: }
274:
275: /**
276: * Waits until we have been requested to close the stream.
277: */
278: private synchronized void waitUntilClosed() {
279: waitingForClose = true;
280: notify();
281:
282: while (!closeRequested) {
283: try {
284: wait();
285: } catch (InterruptedException e) {
286: closeRequested = true;
287: }
288: }
289: }
290:
291: /**
292: * Reads bytes into the buffer until EOF, closed, or error.
293: */
294:
295: private void readUntilDone() throws IOException {
296: for (;;) {
297: int off;
298: int len;
299:
300: synchronized (this ) {
301: while (isBufferFull()) {
302: if (closeRequested)
303: return;
304:
305: waitForRead();
306: }
307:
308: off = (head + length) % iobuffer.length;
309: len = ((head > off) ? head : iobuffer.length) - off;
310: }
311:
312: int count;
313: try {
314: count = in.read(iobuffer, off, len);
315: if (count == -1)
316: return;
317: } catch (InterruptedIOException e) {
318: count = e.bytesTransferred;
319: }
320:
321: synchronized (this ) {
322: length += count;
323: notify();
324: }
325: }
326: }
327:
328: private synchronized void waitForRead() {
329: try {
330: if (growWhenFull) {
331: wait(readTimeout);
332: } else {
333: wait();
334: }
335: } catch (InterruptedException e) {
336: closeRequested = true;
337: }
338:
339: if (growWhenFull && isBufferFull()) {
340: growBuffer();
341: }
342: }
343:
344: private synchronized void growBuffer() {
345: int newSize = 2 * iobuffer.length;
346: if (newSize > iobuffer.length) {
347: if (true) {
348: System.out.println("InputStream growing to " + newSize
349: + " bytes");
350: }
351:
352: byte[] newBuffer = new byte[newSize];
353:
354: int pos = 0;
355: int len = length;
356:
357: while (len-- > 0) {
358: newBuffer[pos++] = iobuffer[head++];
359: if (head == iobuffer.length)
360: head = 0;
361: }
362:
363: iobuffer = newBuffer;
364: head = 0;
365: }
366: }
367:
368: private boolean isBufferFull() {
369: return length == iobuffer.length;
370: }
371:
372: }
|