001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package java.io;
019:
020: import org.apache.harmony.luni.util.Msg;
021:
022: /**
023: * PipedReader is a class which receives information on a communications pipe.
024: * When two threads want to pass data back and forth, one creates a piped writer
025: * and the other creates a piped reader.
026: */
027: public class PipedReader extends Reader {
028:
029: private Thread lastReader;
030:
031: private Thread lastWriter;
032:
033: private boolean isClosed;
034:
035: /**
036: * The circular buffer through which data is passed.
037: */
038: private char data[];
039:
040: /**
041: * The index in <code>buffer</code> where the next character will be
042: * written.
043: */
044: private int in = -1;
045:
046: /**
047: * The index in <code>buffer</code> where the next character will be read.
048: */
049: private int out;
050:
051: /**
052: * The size of the default pipe in characters
053: */
054: private static final int PIPE_SIZE = 1024;
055:
056: /**
057: * Indicates if this pipe is connected
058: */
059: private boolean isConnected;
060:
061: /**
062: * Constructs a new unconnected PipedReader. The resulting Reader must be
063: * connected to a PipedWriter before data may be read from it.
064: */
065: public PipedReader() {
066: data = new char[PIPE_SIZE];
067: }
068:
069: /**
070: * Constructs a new PipedReader connected to the PipedWriter
071: * <code>out</code>. Any data written to the writer can be read from the
072: * this reader.
073: *
074: * @param out
075: * the PipedWriter to connect to.
076: *
077: * @throws IOException
078: * if this or <code>out</code> are already connected.
079: */
080: public PipedReader(PipedWriter out) throws IOException {
081: this ();
082: connect(out);
083: }
084:
085: /**
086: * Close this PipedReader. This implementation releases the buffer used for
087: * the pipe and notifies all threads waiting to read or write.
088: *
089: * @throws IOException
090: * If an error occurs attempting to close this reader.
091: */
092: @Override
093: public void close() throws IOException {
094: synchronized (lock) {
095: /* No exception thrown if already closed */
096: if (data != null) {
097: /* Release buffer to indicate closed. */
098: data = null;
099: }
100: }
101: }
102:
103: /**
104: * Connects this PipedReader to a PipedWriter. Any data written to the
105: * Writer becomes available in this Reader.
106: *
107: * @param src
108: * the source PipedWriter.
109: *
110: * @throws IOException
111: * If either Writer or Reader is already connected.
112: */
113: public void connect(PipedWriter src) throws IOException {
114: synchronized (lock) {
115: src.connect(this );
116: }
117: }
118:
119: /**
120: * Establish the connection to the PipedWriter.
121: *
122: * @throws IOException
123: * If this Reader is already connected.
124: */
125: void establishConnection() throws IOException {
126: synchronized (lock) {
127: if (data == null) {
128: throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
129: }
130: if (isConnected) {
131: throw new IOException(Msg.getString("K007a")); //$NON-NLS-1$
132: }
133: isConnected = true;
134: }
135: }
136:
137: /**
138: * Reads the next character from this Reader. Answer the character actually
139: * read or -1 if no character was read and end of reader was encountered.
140: * Separate threads should be used for the reader of the PipedReader and the
141: * PipedWriter. There may be undesirable results if more than one Thread
142: * interacts a reader or writer pipe.
143: *
144: * @return int the character read -1 if end of reader.
145: *
146: * @throws IOException
147: * If the reader is already closed or another IOException
148: * occurs.
149: */
150: @Override
151: public int read() throws IOException {
152: char[] carray = new char[1];
153: int result = read(carray, 0, 1);
154: return result != -1 ? carray[0] : result;
155: }
156:
157: /**
158: * Reads at most <code>count</code> character from this PipedReader and
159: * stores them in char array <code>buffer</code> starting at
160: * <code>offset</code>. Answer the number of characters actually read or
161: * -1 if no characters were read and end of stream was encountered. Separate
162: * threads should be used for the reader of the PipedReader and the
163: * PipedWriter. There may be undesirable results if more than one Thread
164: * interacts a reader or writer pipe.
165: *
166: * @param buffer
167: * the character array in which to store the read characters.
168: * @param offset
169: * the offset in <code>buffer</code> to store the read
170: * characters.
171: * @param count
172: * the maximum number of characters to store in
173: * <code>buffer</code>.
174: * @return int the number of characters actually read or -1 if end of
175: * reader.
176: *
177: * @throws IOException
178: * If the reader is already closed or another IOException
179: * occurs.
180: */
181: @Override
182: public int read(char[] buffer, int offset, int count)
183: throws IOException {
184: synchronized (lock) {
185: if (!isConnected) {
186: throw new IOException(Msg.getString("K007b")); //$NON-NLS-1$
187: }
188: if (data == null) {
189: throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
190: }
191: // avoid int overflow
192: if (offset < 0 || count > buffer.length - offset
193: || count < 0) {
194: throw new IndexOutOfBoundsException();
195: }
196: if (count == 0) {
197: return 0;
198: }
199: /**
200: * Set the last thread to be reading on this PipedReader. If
201: * lastReader dies while someone is waiting to write an IOException
202: * of "Pipe broken" will be thrown in receive()
203: */
204: lastReader = Thread.currentThread();
205: try {
206: boolean first = true;
207: while (in == -1) {
208: // Are we at end of stream?
209: if (isClosed) {
210: return -1;
211: }
212: if (!first && lastWriter != null
213: && !lastWriter.isAlive()) {
214: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
215: }
216: first = false;
217: // Notify callers of receive()
218: lock.notifyAll();
219: lock.wait(1000);
220: }
221: } catch (InterruptedException e) {
222: throw new InterruptedIOException();
223: }
224:
225: int copyLength = 0;
226: /* Copy chars from out to end of buffer first */
227: if (out >= in) {
228: copyLength = count > data.length - out ? data.length
229: - out : count;
230: System.arraycopy(data, out, buffer, offset, copyLength);
231: out += copyLength;
232: if (out == data.length) {
233: out = 0;
234: }
235: if (out == in) {
236: // empty buffer
237: in = -1;
238: out = 0;
239: }
240: }
241:
242: /*
243: * Did the read fully succeed in the previous copy or is the buffer
244: * empty?
245: */
246: if (copyLength == count || in == -1) {
247: return copyLength;
248: }
249:
250: int charsCopied = copyLength;
251: /* Copy bytes from 0 to the number of available bytes */
252: copyLength = in - out > count - copyLength ? count
253: - copyLength : in - out;
254: System.arraycopy(data, out, buffer, offset + charsCopied,
255: copyLength);
256: out += copyLength;
257: if (out == in) {
258: // empty buffer
259: in = -1;
260: out = 0;
261: }
262: return charsCopied + copyLength;
263: }
264: }
265:
266: /**
267: * Answer a boolean indicating whether or not this Reader is ready to be
268: * read. Answers true if the buffer contains characters to be read.
269: *
270: * @return boolean <code>true</code> if there are characters ready,
271: * <code>false</code> otherwise.
272: *
273: * @throws IOException
274: * If the reader is already closed or another IOException
275: * occurs.
276: */
277: @Override
278: public boolean ready() throws IOException {
279: synchronized (lock) {
280: if (!isConnected) {
281: throw new IOException(Msg.getString("K007b")); //$NON-NLS-1$
282: }
283: if (data == null) {
284: throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
285: }
286: return in != -1;
287: }
288: }
289:
290: /**
291: * Receives a char and stores it into the PipedReader. This called by
292: * PipedWriter.write() when writes occur.
293: * <P>
294: * If the buffer is full and the thread sending #receive is interrupted, the
295: * InterruptedIOException will be thrown.
296: *
297: * @param oneChar
298: * the char to store into the pipe.
299: *
300: * @throws IOException
301: * If the stream is already closed or another IOException
302: * occurs.
303: */
304: void receive(char oneChar) throws IOException {
305: synchronized (lock) {
306: if (data == null) {
307: throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
308: }
309: if (lastReader != null && !lastReader.isAlive()) {
310: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
311: }
312: /*
313: * Set the last thread to be writing on this PipedWriter. If
314: * lastWriter dies while someone is waiting to read an IOException
315: * of "Pipe broken" will be thrown in read()
316: */
317: lastWriter = Thread.currentThread();
318: try {
319: while (data != null && out == in) {
320: lock.notifyAll();
321: wait(1000);
322: if (lastReader != null && !lastReader.isAlive()) {
323: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
324: }
325: }
326: } catch (InterruptedException e) {
327: throw new InterruptedIOException();
328: }
329: if (data != null) {
330: if (in == -1) {
331: in = 0;
332: }
333: data[in++] = oneChar;
334: if (in == data.length) {
335: in = 0;
336: }
337: return;
338: }
339: }
340: }
341:
342: /**
343: * Receives a char array and stores it into the PipedReader. This called by
344: * PipedWriter.write() when writes occur.
345: * <P>
346: * If the buffer is full and the thread sending #receive is interrupted, the
347: * InterruptedIOException will be thrown.
348: *
349: * @param chars
350: * the char array to store into the pipe.
351: * @param offset
352: * offset to start reading from
353: * @param count
354: * total characters to read
355: *
356: * @throws IOException
357: * If the stream is already closed or another IOException
358: * occurs.
359: */
360: void receive(char[] chars, int offset, int count)
361: throws IOException {
362: synchronized (lock) {
363: if (data == null) {
364: throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
365: }
366: if (lastReader != null && !lastReader.isAlive()) {
367: throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
368: }
369: /**
370: * Set the last thread to be writing on this PipedWriter. If
371: * lastWriter dies while someone is waiting to read an IOException
372: * of "Pipe broken" will be thrown in read()
373: */
374: lastWriter = Thread.currentThread();
375: while (count > 0) {
376: try {
377: while (data != null && out == in) {
378: lock.notifyAll();
379: wait(1000);
380: if (lastReader != null && !lastReader.isAlive()) {
381: throw new IOException(Msg
382: .getString("K0076")); //$NON-NLS-1$
383: }
384: }
385: } catch (InterruptedException e) {
386: throw new InterruptedIOException();
387: }
388: if (data == null) {
389: break;
390: }
391: if (in == -1) {
392: in = 0;
393: }
394: if (in >= out) {
395: int length = data.length - in;
396: if (count < length) {
397: length = count;
398: }
399: System.arraycopy(chars, offset, data, in, length);
400: offset += length;
401: count -= length;
402: in += length;
403: if (in == data.length) {
404: in = 0;
405: }
406: }
407: if (count > 0 && in != out) {
408: int length = out - in;
409: if (count < length) {
410: length = count;
411: }
412: System.arraycopy(chars, offset, data, in, length);
413: offset += length;
414: count -= length;
415: in += length;
416: }
417: }
418: if (count == 0) {
419: return;
420: }
421: }
422: }
423:
424: void done() {
425: synchronized (lock) {
426: isClosed = true;
427: lock.notifyAll();
428: }
429: }
430:
431: void flush() {
432: synchronized (lock) {
433: lock.notifyAll();
434: }
435: }
436: }
|