001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package java.io;
019:
020: import org.apache.harmony.luni.util.Msg;
021:
022: /**
023: * PipedInputStream is a class which receives information on a communications
024: * pipe. When two threads want to pass data back and forth, one creates a piped
025: * output stream and the other creates a piped input stream.
026: *
027: * @see PipedOutputStream
028: */
029: public class PipedInputStream extends InputStream {
030:
031: private Thread lastReader, lastWriter;
032:
033: private boolean isClosed = false;
034:
035: /**
036: * The circular buffer through which data is passed.
037: */
038: protected byte buffer[];
039:
040: /**
041: * The index in <code>buffer</code> where the next byte will be written.
042: */
043: protected int in = -1;
044:
045: /**
046: * The index in <code>buffer</code> where the next byte will be read.
047: */
048: protected int out = 0;
049:
050: /**
051: * The size of the default pipe in bytes
052: */
053: protected static final int PIPE_SIZE = 1024;
054:
055: /**
056: * Indicates if this pipe is connected
057: */
058: boolean isConnected = false;
059:
060: /**
061: * Constructs a new unconnected PipedInputStream. The resulting Stream must
062: * be connected to a PipedOutputStream before data may be read from it.
063: *
064: */
065: public PipedInputStream() {
066: /* empty */
067: }
068:
069: /**
070: * Constructs a new PipedInputStream connected to the PipedOutputStream
071: * <code>out</code>. Any data written to the output stream can be read
072: * from the this input stream.
073: *
074: * @param out
075: * the PipedOutputStream to connect to.
076: *
077: * @throws IOException
078: * if this or <code>out</code> are already connected.
079: */
080: public PipedInputStream(PipedOutputStream out) throws IOException {
081: connect(out);
082: }
083:
084: /**
085: * Answers a int representing the number of bytes that are available before
086: * this PipedInputStream will block. This method returns the number of bytes
087: * written to the pipe but not read yet up to the size of the pipe.
088: *
089: * @return int the number of bytes available before blocking.
090: *
091: * @throws IOException
092: * If an error occurs in this stream.
093: */
094: @Override
095: public synchronized int available() throws IOException {
096: if (buffer == null || in == -1) {
097: return 0;
098: }
099: return in <= out ? buffer.length - out + in : in - out;
100: }
101:
102: /**
103: * Close this PipedInputStream. This implementation releases the buffer used
104: * for the pipe and notifies all threads waiting to read or write.
105: *
106: * @throws IOException
107: * If an error occurs attempting to close this stream.
108: */
109: @Override
110: public void close() throws IOException {
111: synchronized (this ) {
112: /* No exception thrown if already closed */
113: if (buffer != null) {
114: /* Release buffer to indicate closed. */
115: buffer = null;
116: }
117: }
118: }
119:
120: /**
121: * Connects this PipedInputStream to a PipedOutputStream. Any data written
122: * to the OutputStream becomes readable in this InputStream.
123: *
124: * @param src
125: * the source PipedOutputStream.
126: *
127: * @throws IOException
128: * If either stream is already connected.
129: */
130: public void connect(PipedOutputStream src) throws IOException {
131: src.connect(this );
132: }
133:
134: /**
135: * Reads a single byte from this PipedInputStream and returns the result as
136: * an int. The low-order byte is returned or -1 of the end of stream was
137: * encountered. If there is no data in the pipe, this method blocks until
138: * there is data available. Separate threads should be used for the reader
139: * of the PipedInputStream and the PipedOutputStream. There may be
140: * undesirable results if more than one Thread interacts a input or output
141: * pipe.
142: *
143: * @return int The byte read or -1 if end of stream.
144: *
145: * @throws IOException
146: * If the stream is already closed or another IOException
147: * occurs.
148: */
149: @Override
150: public synchronized int read() throws IOException {
151: if (!isConnected) {
152: throw new IOException(Msg.getString("K0074")); //$NON-NLS-1$
153: }
154: if (buffer == null) {
155: throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$
156: }
157: /**
158: * Set the last thread to be reading on this PipedInputStream. If
159: * lastReader dies while someone is waiting to write an IOException of
160: * "Pipe broken" will be thrown in receive()
161: */
162: lastReader = Thread.currentThread();
163: try {
164: int attempts = 3;
165: while (in == -1) {
166: // Are we at end of stream?
167: if (isClosed) {
168: return -1;
169: }
170: if ((attempts-- <= 0) && lastWriter != null
171: && !lastWriter.isAlive()) {
172: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
173: }
174: // Notify callers of receive()
175: notifyAll();
176: wait(1000);
177: }
178: } catch (InterruptedException e) {
179: throw new InterruptedIOException();
180: }
181:
182: byte result = buffer[out++];
183: if (out == buffer.length) {
184: out = 0;
185: }
186: if (out == in) {
187: // empty buffer
188: in = -1;
189: out = 0;
190: }
191: return result & 0xff;
192: }
193:
194: /**
195: * Reads at most <code>count</code> bytes from this PipedInputStream and
196: * stores them in byte array <code>buffer</code> starting at
197: * <code>offset</code>. Answer the number of bytes actually read or -1 if
198: * no bytes were read and end of stream was encountered. Separate threads
199: * should be used for the reader of the PipedInputStream and the
200: * PipedOutputStream. There may be undesirable results if more than one
201: * Thread interacts a input or output pipe.
202: *
203: * @param bytes
204: * the byte array in which to store the read bytes.
205: * @param offset
206: * the offset in <code>buffer</code> to store the read bytes.
207: * @param count
208: * the maximum number of bytes to store in <code>buffer</code>.
209: * @return the number of bytes actually read or -1 if end of stream.
210: *
211: * @throws IOException
212: * If the stream is already closed or another IOException
213: * occurs.
214: */
215: @Override
216: public synchronized int read(byte[] bytes, int offset, int count)
217: throws IOException {
218: if (bytes == null) {
219: throw new NullPointerException();
220: }
221:
222: if (offset < 0 || offset > bytes.length || count < 0
223: || count > bytes.length - offset) {
224: throw new IndexOutOfBoundsException();
225: }
226:
227: if (count == 0) {
228: return 0;
229: }
230:
231: if (!isConnected) {
232: throw new IOException(Msg.getString("K0074")); //$NON-NLS-1$
233: }
234:
235: if (buffer == null) {
236: throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$
237: }
238:
239: /**
240: * Set the last thread to be reading on this PipedInputStream. If
241: * lastReader dies while someone is waiting to write an IOException of
242: * "Pipe broken" will be thrown in receive()
243: */
244: lastReader = Thread.currentThread();
245: try {
246: int attempts = 3;
247: while (in == -1) {
248: // Are we at end of stream?
249: if (isClosed) {
250: return -1;
251: }
252: if ((attempts-- <= 0) && lastWriter != null
253: && !lastWriter.isAlive()) {
254: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
255: }
256: // Notify callers of receive()
257: notifyAll();
258: wait(1000);
259: }
260: } catch (InterruptedException e) {
261: throw new InterruptedIOException();
262: }
263:
264: int copyLength = 0;
265: /* Copy bytes from out to end of buffer first */
266: if (out >= in) {
267: copyLength = count > (buffer.length - out) ? buffer.length
268: - out : count;
269: System.arraycopy(buffer, out, bytes, offset, copyLength);
270: out += copyLength;
271: if (out == buffer.length) {
272: out = 0;
273: }
274: if (out == in) {
275: // empty buffer
276: in = -1;
277: out = 0;
278: }
279: }
280:
281: /*
282: * Did the read fully succeed in the previous copy or is the buffer
283: * empty?
284: */
285: if (copyLength == count || in == -1) {
286: return copyLength;
287: }
288:
289: int bytesCopied = copyLength;
290: /* Copy bytes from 0 to the number of available bytes */
291: copyLength = in - out > (count - bytesCopied) ? count
292: - bytesCopied : in - out;
293: System.arraycopy(buffer, out, bytes, offset + bytesCopied,
294: copyLength);
295: out += copyLength;
296: if (out == in) {
297: // empty buffer
298: in = -1;
299: out = 0;
300: }
301: return bytesCopied + copyLength;
302: }
303:
304: /**
305: * Receives a byte and stores it into the PipedInputStream. This called by
306: * PipedOutputStream.write() when writes occur. The lowest-order byte is
307: * stored at index <code>in</code> in the <code>buffer</code>.
308: * <P>
309: * If the buffer is full and the thread sending #receive is interrupted, the
310: * InterruptedIOException will be thrown.
311: *
312: * @param oneByte
313: * the byte to store into the pipe.
314: *
315: * @throws IOException
316: * If the stream is already closed or another IOException
317: * occurs.
318: */
319: protected synchronized void receive(int oneByte) throws IOException {
320: if (buffer == null || isClosed) {
321: throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
322: }
323: if (lastReader != null && !lastReader.isAlive()) {
324: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
325: }
326: /**
327: * Set the last thread to be writing on this PipedInputStream. If
328: * lastWriter dies while someone is waiting to read an IOException of
329: * "Pipe broken" will be thrown in read()
330: */
331: lastWriter = Thread.currentThread();
332: try {
333: while (buffer != null && out == in) {
334: notifyAll();
335: wait(1000);
336: if (lastReader != null && !lastReader.isAlive()) {
337: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
338: }
339: }
340: } catch (InterruptedException e) {
341: throw new InterruptedIOException();
342: }
343: if (buffer != null) {
344: if (in == -1) {
345: in = 0;
346: }
347: buffer[in++] = (byte) oneByte;
348: if (in == buffer.length) {
349: in = 0;
350: }
351: return;
352: }
353: }
354:
355: synchronized void done() {
356: isClosed = true;
357: notifyAll();
358: }
359: }
|