001: /*
002: * @(#)PipedInputStream.java 1.38 06/10/10
003: *
004: * Copyright 1990-2006 Sun Microsystems, Inc. All Rights Reserved.
005: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License version
009: * 2 only, as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful, but
012: * WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * General Public License version 2 for more details (a copy is
015: * included at /legal/license.txt).
016: *
017: * You should have received a copy of the GNU General Public License
018: * version 2 along with this work; if not, write to the Free Software
019: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA
021: *
022: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
023: * Clara, CA 95054 or visit www.sun.com if you need additional
024: * information or have any questions.
025: *
026: */
027:
028: package java.io;
029:
030: /**
031: * A piped input stream should be connected
032: * to a piped output stream; the piped input
033: * stream then provides whatever data bytes
034: * are written to the piped output stream.
035: * Typically, data is read from a <code>PipedInputStream</code>
036: * object by one thread and data is written
037: * to the corresponding <code>PipedOutputStream</code>
038: * by some other thread. Attempting to use
039: * both objects from a single thread is not
040: * recommended, as it may deadlock the thread.
041: * The piped input stream contains a buffer,
042: * decoupling read operations from write operations,
043: * within limits.
044: *
045: * @author James Gosling
046: * @version 1.30, 02/02/00
047: * @see java.io.PipedOutputStream
048: * @since JDK1.0
049: */
050: public class PipedInputStream extends InputStream {
051: boolean closedByWriter = false;
052: boolean closedByReader = false;
053: boolean connected = false;
054:
055: /* NOTE: identification of the read and write sides needs to be
056: more sophisticated. Either using thread groups (but what about
057: pipes within a thread?) or using finalization (but it may be a
058: long time until the next GC). */
059: Thread readSide;
060: Thread writeSide;
061:
062: /**
063: * The size of the pipe's circular input buffer.
064: * @since JDK1.1
065: */
066: protected static final int PIPE_SIZE = 1024;
067:
068: /**
069: * The circular buffer into which incoming data is placed.
070: * @since JDK1.1
071: */
072: protected byte buffer[] = new byte[PIPE_SIZE];
073:
074: /**
075: * The index of the position in the circular buffer at which the
076: * next byte of data will be stored when received from the connected
077: * piped output stream. <code>in<0</code> implies the buffer is empty,
078: * <code>in==out</code> implies the buffer is full
079: * @since JDK1.1
080: */
081: protected int in = -1;
082:
083: /**
084: * The index of the position in the circular buffer at which the next
085: * byte of data will be read by this piped input stream.
086: * @since JDK1.1
087: */
088: protected int out = 0;
089:
090: /**
091: * Creates a <code>PipedInputStream</code> so
092: * that it is connected to the piped output
093: * stream <code>src</code>. Data bytes written
094: * to <code>src</code> will then be available
095: * as input from this stream.
096: *
097: * @param src the stream to connect to.
098: * @exception IOException if an I/O error occurs.
099: */
100: public PipedInputStream(PipedOutputStream src) throws IOException {
101: connect(src);
102: }
103:
104: /**
105: * Creates a <code>PipedInputStream</code> so
106: * that it is not yet connected. It must be
107: * connected to a <code>PipedOutputStream</code>
108: * before being used.
109: *
110: * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream)
111: * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream)
112: */
113: public PipedInputStream() {
114: }
115:
116: /**
117: * Causes this piped input stream to be connected
118: * to the piped output stream <code>src</code>.
119: * If this object is already connected to some
120: * other piped output stream, an <code>IOException</code>
121: * is thrown.
122: * <p>
123: * If <code>src</code> is an
124: * unconnected piped output stream and <code>snk</code>
125: * is an unconnected piped input stream, they
126: * may be connected by either the call:
127: * <p>
128: * <pre><code>snk.connect(src)</code> </pre>
129: * <p>
130: * or the call:
131: * <p>
132: * <pre><code>src.connect(snk)</code> </pre>
133: * <p>
134: * The two
135: * calls have the same effect.
136: *
137: * @param src The piped output stream to connect to.
138: * @exception IOException if an I/O error occurs.
139: */
140: public void connect(PipedOutputStream src) throws IOException {
141: src.connect(this );
142: }
143:
144: /**
145: * Receives a byte of data. This method will block if no input is
146: * available.
147: * @param b the byte being received
148: * @exception IOException If the pipe is broken.
149: * @since JDK1.1
150: */
151: protected synchronized void receive(int b) throws IOException {
152: if (!connected) {
153: throw new IOException("Pipe not connected");
154: } else if (closedByWriter || closedByReader) {
155: throw new IOException("Pipe closed");
156: } else if (readSide != null && !readSide.isAlive()) {
157: throw new IOException("Read end dead");
158: }
159:
160: writeSide = Thread.currentThread();
161: while (in == out) {
162: if ((readSide != null) && !readSide.isAlive()) {
163: throw new IOException("Pipe broken");
164: }
165: /* full: kick any waiting readers */
166: notifyAll();
167: try {
168: wait(1000);
169: } catch (InterruptedException ex) {
170: throw new java.io.InterruptedIOException();
171: }
172: }
173: if (in < 0) {
174: in = 0;
175: out = 0;
176: }
177: buffer[in++] = (byte) (b & 0xFF);
178: if (in >= buffer.length) {
179: in = 0;
180: }
181: }
182:
183: /**
184: * Receives data into an array of bytes. This method will
185: * block until some input is available.
186: * @param b the buffer into which the data is received
187: * @param off the start offset of the data
188: * @param len the maximum number of bytes received
189: * @return the actual number of bytes received, -1 is
190: * returned when the end of the stream is reached.
191: * @exception IOException If an I/O error has occurred.
192: */
193: synchronized void receive(byte b[], int off, int len)
194: throws IOException {
195: while (--len >= 0) {
196: receive(b[off++]);
197: }
198: }
199:
200: /**
201: * Notifies all waiting threads that the last byte of data has been
202: * received.
203: */
204: synchronized void receivedLast() {
205: closedByWriter = true;
206: notifyAll();
207: }
208:
209: /**
210: * Reads the next byte of data from this piped input stream. The
211: * value byte is returned as an <code>int</code> in the range
212: * <code>0</code> to <code>255</code>. If no byte is available
213: * because the end of the stream has been reached, the value
214: * <code>-1</code> is returned. This method blocks until input data
215: * is available, the end of the stream is detected, or an exception
216: * is thrown.
217: * If a thread was providing data bytes
218: * to the connected piped output stream, but
219: * the thread is no longer alive, then an
220: * <code>IOException</code> is thrown.
221: *
222: * @return the next byte of data, or <code>-1</code> if the end of the
223: * stream is reached.
224: * @exception IOException if the pipe is broken.
225: */
226: public synchronized int read() throws IOException {
227: if (!connected) {
228: throw new IOException("Pipe not connected");
229: } else if (closedByReader) {
230: throw new IOException("Pipe closed");
231: } else if (writeSide != null && !writeSide.isAlive()
232: && !closedByWriter && (in < 0)) {
233: throw new IOException("Write end dead");
234: }
235:
236: readSide = Thread.currentThread();
237: int trials = 2;
238: while (in < 0) {
239: if (closedByWriter) {
240: /* closed by writer, return EOF */
241: return -1;
242: }
243: if ((writeSide != null) && (!writeSide.isAlive())
244: && (--trials < 0)) {
245: throw new IOException("Pipe broken");
246: }
247: /* might be a writer waiting */
248: notifyAll();
249: try {
250: wait(1000);
251: } catch (InterruptedException ex) {
252: throw new java.io.InterruptedIOException();
253: }
254: }
255: int ret = buffer[out++] & 0xFF;
256: if (out >= buffer.length) {
257: out = 0;
258: }
259: if (in == out) {
260: /* now empty */
261: in = -1;
262: }
263: return ret;
264: }
265:
266: /**
267: * Reads up to <code>len</code> bytes of data from this piped input
268: * stream into an array of bytes. Less than <code>len</code> bytes
269: * will be read if the end of the data stream is reached. This method
270: * blocks until at least one byte of input is available.
271: * If a thread was providing data bytes
272: * to the connected piped output stream, but
273: * the thread is no longer alive, then an
274: * <code>IOException</code> is thrown.
275: *
276: * @param b the buffer into which the data is read.
277: * @param off the start offset of the data.
278: * @param len the maximum number of bytes read.
279: * @return the total number of bytes read into the buffer, or
280: * <code>-1</code> if there is no more data because the end of
281: * the stream has been reached.
282: * @exception IOException if an I/O error occurs.
283: */
284: public synchronized int read(byte b[], int off, int len)
285: throws IOException {
286: if (b == null) {
287: throw new NullPointerException();
288: } else if ((off < 0) || (off > b.length) || (len < 0)
289: || ((off + len) > b.length) || ((off + len) < 0)) {
290: throw new IndexOutOfBoundsException();
291: } else if (len == 0) {
292: return 0;
293: }
294:
295: /* possibly wait on the first character */
296: int c = read();
297: if (c < 0) {
298: return -1;
299: }
300: b[off] = (byte) c;
301: int rlen = 1;
302: while ((in >= 0) && (--len > 0)) {
303: b[off + rlen] = buffer[out++];
304: rlen++;
305: if (out >= buffer.length) {
306: out = 0;
307: }
308: if (in == out) {
309: /* now empty */
310: in = -1;
311: }
312: }
313: return rlen;
314: }
315:
316: /**
317: * Returns the number of bytes that can be read from this input
318: * stream without blocking. This method overrides the <code>available</code>
319: * method of the parent class.
320: *
321: * @return the number of bytes that can be read from this input stream
322: * without blocking.
323: * @exception IOException if an I/O error occurs.
324: * @since JDK1.0.2
325: */
326: public synchronized int available() throws IOException {
327: if (in < 0)
328: return 0;
329: else if (in == out)
330: return buffer.length;
331: else if (in > out)
332: return in - out;
333: else
334: return in + buffer.length - out;
335: }
336:
337: /**
338: * Closes this piped input stream and releases any system resources
339: * associated with the stream.
340: *
341: * @exception IOException if an I/O error occurs.
342: */
343: public void close() throws IOException {
344: in = -1;
345: closedByReader = true;
346: }
347: }
|