001: /*
002: * @(#)PipedReader.java 1.20 06/10/10
003: *
004: * Copyright 1990-2006 Sun Microsystems, Inc. All Rights Reserved.
005: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License version
009: * 2 only, as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful, but
012: * WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * General Public License version 2 for more details (a copy is
015: * included at /legal/license.txt).
016: *
017: * You should have received a copy of the GNU General Public License
018: * version 2 along with this work; if not, write to the Free Software
019: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA
021: *
022: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
023: * Clara, CA 95054 or visit www.sun.com if you need additional
024: * information or have any questions.
025: *
026: */
027:
028: package java.io;
029:
030: /**
031: * Piped character-input streams.
032: *
033: * @version 1.12, 00/02/02
034: * @author Mark Reinhold
035: * @since JDK1.1
036: */
037:
038: public class PipedReader extends Reader {
039: boolean closedByWriter = false;
040: boolean closedByReader = false;
041: boolean connected = false;
042:
043: /* Identification of the read and write sides needs to be
044: more sophisticated. Either using thread groups (but what about
045: pipes within a thread?) or using finalization (but it may be a
046: long time until the next GC). */
047: Thread readSide;
048: Thread writeSide;
049:
050: /**
051: * The size of the pipe's circular input buffer.
052: */
053: static final int PIPE_SIZE = 1024;
054:
055: /**
056: * The circular buffer into which incoming data is placed.
057: */
058: char buffer[] = new char[PIPE_SIZE];
059:
060: /**
061: * The index of the position in the circular buffer at which the
062: * next character of data will be stored when received from the connected
063: * piped writer. <code>in<0</code> implies the buffer is empty,
064: * <code>in==out</code> implies the buffer is full
065: */
066: int in = -1;
067:
068: /**
069: * The index of the position in the circular buffer at which the next
070: * character of data will be read by this piped reader.
071: */
072: int out = 0;
073:
074: /**
075: * Creates a <code>PipedReader</code> so
076: * that it is connected to the piped writer
077: * <code>src</code>. Data written to <code>src</code>
078: * will then be available as input from this stream.
079: *
080: * @param src the stream to connect to.
081: * @exception IOException if an I/O error occurs.
082: */
083: public PipedReader(PipedWriter src) throws IOException {
084: connect(src);
085: }
086:
087: /**
088: * Creates a <code>PipedReader</code> so
089: * that it is not yet connected. It must be
090: * connected to a <code>PipedWriter</code>
091: * before being used.
092: *
093: * @see java.io.PipedReader#connect(java.io.PipedWriter)
094: * @see java.io.PipedWriter#connect(java.io.PipedReader)
095: */
096: public PipedReader() {
097: }
098:
099: /**
100: * Causes this piped reader to be connected
101: * to the piped writer <code>src</code>.
102: * If this object is already connected to some
103: * other piped writer, an <code>IOException</code>
104: * is thrown.
105: * <p>
106: * If <code>src</code> is an
107: * unconnected piped writer and <code>snk</code>
108: * is an unconnected piped reader, they
109: * may be connected by either the call:
110: * <p>
111: * <pre><code>snk.connect(src)</code> </pre>
112: * <p>
113: * or the call:
114: * <p>
115: * <pre><code>src.connect(snk)</code> </pre>
116: * <p>
117: * The two
118: * calls have the same effect.
119: *
120: * @param src The piped writer to connect to.
121: * @exception IOException if an I/O error occurs.
122: */
123: public void connect(PipedWriter src) throws IOException {
124: src.connect(this );
125: }
126:
127: /**
128: * Receives a char of data. This method will block if no input is
129: * available.
130: */
131: synchronized void receive(int c) throws IOException {
132: if (!connected) {
133: throw new IOException("Pipe not connected");
134: } else if (closedByWriter || closedByReader) {
135: throw new IOException("Pipe closed");
136: } else if (readSide != null && !readSide.isAlive()) {
137: throw new IOException("Read end dead");
138: }
139:
140: writeSide = Thread.currentThread();
141: while (in == out) {
142: if ((readSide != null) && !readSide.isAlive()) {
143: throw new IOException("Pipe broken");
144: }
145: /* full: kick any waiting readers */
146: notifyAll();
147: try {
148: wait(1000);
149: } catch (InterruptedException ex) {
150: throw new java.io.InterruptedIOException();
151: }
152: }
153: if (in < 0) {
154: in = 0;
155: out = 0;
156: }
157: buffer[in++] = (char) c;
158: if (in >= buffer.length) {
159: in = 0;
160: }
161: }
162:
163: /**
164: * Receives data into an array of characters. This method will
165: * block until some input is available.
166: */
167: synchronized void receive(char c[], int off, int len)
168: throws IOException {
169: while (--len >= 0) {
170: receive(c[off++]);
171: }
172: }
173:
174: /**
175: * Notifies all waiting threads that the last character of data has been
176: * received.
177: */
178: synchronized void receivedLast() {
179: closedByWriter = true;
180: notifyAll();
181: }
182:
183: /**
184: * Reads the next character of data from this piped stream.
185: * If no character is available because the end of the stream
186: * has been reached, the value <code>-1</code> is returned.
187: * This method blocks until input data is available, the end of
188: * the stream is detected, or an exception is thrown.
189: *
190: * If a thread was providing data characters
191: * to the connected piped writer, but
192: * the thread is no longer alive, then an
193: * <code>IOException</code> is thrown.
194: *
195: * @return the next character of data, or <code>-1</code> if the end of the
196: * stream is reached.
197: * @exception IOException if the pipe is broken.
198: */
199: public synchronized int read() throws IOException {
200: if (!connected) {
201: throw new IOException("Pipe not connected");
202: } else if (closedByReader) {
203: throw new IOException("Pipe closed");
204: } else if (writeSide != null && !writeSide.isAlive()
205: && !closedByWriter && (in < 0)) {
206: throw new IOException("Write end dead");
207: }
208:
209: readSide = Thread.currentThread();
210: int trials = 2;
211: while (in < 0) {
212: if (closedByWriter) {
213: /* closed by writer, return EOF */
214: return -1;
215: }
216: if ((writeSide != null) && (!writeSide.isAlive())
217: && (--trials < 0)) {
218: throw new IOException("Pipe broken");
219: }
220: /* might be a writer waiting */
221: notifyAll();
222: try {
223: wait(1000);
224: } catch (InterruptedException ex) {
225: throw new java.io.InterruptedIOException();
226: }
227: }
228: int ret = buffer[out++];
229: if (out >= buffer.length) {
230: out = 0;
231: }
232: if (in == out) {
233: /* now empty */
234: in = -1;
235: }
236: return ret;
237: }
238:
239: /**
240: * Reads up to <code>len</code> characters of data from this piped
241: * stream into an array of characters. Less than <code>len</code> characters
242: * will be read if the end of the data stream is reached. This method
243: * blocks until at least one character of input is available.
244: * If a thread was providing data characters to the connected piped output,
245: * but the thread is no longer alive, then an <code>IOException</code>
246: * is thrown.
247: *
248: * @param cbuf the buffer into which the data is read.
249: * @param off the start offset of the data.
250: * @param len the maximum number of characters read.
251: * @return the total number of characters read into the buffer, or
252: * <code>-1</code> if there is no more data because the end of
253: * the stream has been reached.
254: * @exception IOException if an I/O error occurs.
255: */
256: public synchronized int read(char cbuf[], int off, int len)
257: throws IOException {
258: if (!connected) {
259: throw new IOException("Pipe not connected");
260: } else if (closedByReader) {
261: throw new IOException("Pipe closed");
262: } else if (writeSide != null && !writeSide.isAlive()
263: && !closedByWriter && (in < 0)) {
264: throw new IOException("Write end dead");
265: }
266:
267: if ((off < 0) || (off > cbuf.length) || (len < 0)
268: || ((off + len) > cbuf.length) || ((off + len) < 0)) {
269: throw new IndexOutOfBoundsException();
270: } else if (len == 0) {
271: return 0;
272: }
273:
274: /* possibly wait on the first character */
275: int c = read();
276: if (c < 0) {
277: return -1;
278: }
279: cbuf[off] = (char) c;
280: int rlen = 1;
281: while ((in >= 0) && (--len > 0)) {
282: cbuf[off + rlen] = buffer[out++];
283: rlen++;
284: if (out >= buffer.length) {
285: out = 0;
286: }
287: if (in == out) {
288: /* now empty */
289: in = -1;
290: }
291: }
292: return rlen;
293: }
294:
295: /**
296: * Tell whether this stream is ready to be read. A piped character
297: * stream is ready if the circular buffer is not empty.
298: *
299: * @exception IOException If an I/O error occurs
300: */
301: public synchronized boolean ready() throws IOException {
302: if (!connected) {
303: throw new IOException("Pipe not connected");
304: } else if (closedByReader) {
305: throw new IOException("Pipe closed");
306: } else if (writeSide != null && !writeSide.isAlive()
307: && !closedByWriter && (in < 0)) {
308: throw new IOException("Write end dead");
309: }
310: if (in < 0) {
311: return false;
312: } else {
313: return true;
314: }
315: }
316:
317: /**
318: * Closes this piped stream and releases any system resources
319: * associated with the stream.
320: *
321: * @exception IOException if an I/O error occurs.
322: */
323: public void close() throws IOException {
324: in = -1;
325: closedByReader = true;
326: }
327: }
|