001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.util;
028:
029: import java.io.IOException;
030: import java.io.InputStream;
031:
032: /**
033: * An efficient replacement for java.io.PipedInputStream.
034: **/
035: public class PipedInputStream extends InputStream {
036: private static final int DEFAULT_BUFSIZE = 10000;
037:
038: private byte[] buffer;
039: private int in = 0;
040: private int out = 0;
041: private int space;
042: private boolean eof = false;
043: private boolean closed = false;
044:
045: /**
046: * Creates an unconnected PipedInputStream with a default buffer size.
047: **/
048: public PipedInputStream() {
049: this (DEFAULT_BUFSIZE);
050: }
051:
052: /**
053: * Creates an unconnected PipedInputStream with a specific buffer
054: * size.
055: * @param bufSize the size of the ring buffer (defaults to DEFAULT_BUFSIZE).
056: **/
057: public PipedInputStream(int bufSize) {
058: buffer = new byte[bufSize];
059: space = bufSize;
060: }
061:
062: /**
063: * Creates a PipedInputStream with a default buffer size connected
064: * to the given PipedOutputStream.
065: * @param pos the PipedOutputStream to connect to this PipedInputStream
066: **/
067: public PipedInputStream(PipedOutputStream pos) {
068: this (DEFAULT_BUFSIZE);
069: pos.connect(this );
070: }
071:
072: /**
073: * Creates a PipedInputStream with a specific buffer size connected
074: * to the given PipedOutputStream.
075: * @param pos the PipedOutputStream to connect to this PipedInputStream
076: * @param bufSize the size of the ring buffer (defaults to DEFAULT_BUFSIZE).
077: **/
078: public PipedInputStream(PipedOutputStream pos, int bufSize) {
079: this (bufSize);
080: pos.connect(this );
081: }
082:
083: /**
084: * Used by PipedOutputStream to put a byte into this stream.
085: * @param b the byte to put.;
086: **/
087: synchronized void put(int b) throws IOException {
088: if (closed)
089: throw new IOException("PipedInputStream Closed");
090: while (space == 0) {
091: if (closed)
092: throw new IOException("PipedInputStream Closed");
093: try {
094: notify(); // Get the read thread going if necessary
095: wait();
096: } catch (InterruptedException ie) {
097: }
098: }
099: buffer[in++] = (byte) (b & 0xff);
100: if (in == buffer.length)
101: in = 0;
102: space--;
103: }
104:
105: /**
106: * Put bytes into this stream. Used by PipedOutputStream.
107: * @param b an array of bytes to be inserted
108: * @param off the offset in the array of the first byte to put
109: * @param len the number of bytes to put
110: **/
111: synchronized void put(byte[] b, int off, int len)
112: throws IOException {
113: while (len > 0) {
114: if (closed)
115: throw new IOException("PipedInputStream Closed");
116: while (space == 0) {
117: if (closed)
118: throw new IOException("PipedInputStream Closed");
119: try {
120: notify(); // Get the read thread going if necessary
121: wait();
122: } catch (InterruptedException ie) {
123: }
124: }
125: int tail = buffer.length - in;
126: int nb = Math.min(len, Math.min(tail, space));
127: System.arraycopy(b, off, buffer, in, nb);
128: in += nb;
129: if (in == buffer.length)
130: in = 0;
131: off += nb;
132: space -= nb;
133: len -= nb;
134: }
135: }
136:
137: /**
138: * Insure that everything that has been put into the stream is
139: * available. Wakes the reader if the buffer is not empty.
140: **/
141: synchronized void flush() throws IOException {
142: if (closed)
143: throw new IOException("PipedInputStream Closed");
144: if (buffer.length > space)
145: notify();
146: }
147:
148: synchronized void setEOF() throws IOException {
149: if (closed)
150: throw new IOException("PipedInputStream Closed");
151: flush();
152: eof = true;
153: }
154:
155: /**
156: * Read a single byte. Waits for the byte to be available if necessary.
157: * @return the next byte in the buffer as an int.
158: **/
159: public synchronized int read() throws IOException {
160: if (closed)
161: throw new IOException("Closed");
162: while (space == buffer.length) {
163: try {
164: if (eof)
165: return -1;
166: wait();
167: } catch (InterruptedException ie) {
168: }
169: }
170: int result = buffer[out++] & 0xff;
171: if (out == buffer.length)
172: out = 0;
173: space++;
174: if (space == buffer.length)
175: notify();
176: return result;
177: }
178:
179: /**
180: * Read an array full of bytes. Always reads at least one byte. May
181: * return with the array not completely full.
182: * @param b the byte array to fill.
183: * @return the number of bytes read or -1 if eof has been reached.
184: **/
185: public synchronized int read(byte[] b) throws IOException {
186: return read(b, 0, b.length);
187: }
188:
189: /**
190: * Read some bytes into an array. Allows only a portion of an array to be filled.
191: * @param b the array to fill
192: * @param off the offset into the array of the first byte to be read
193: * @param len the maximum number of bytes to read.
194: * @return the number of bytes read. If eof has been reached, -1 is
195: * returned. if len is zero, then zero is returned.
196: **/
197: public synchronized int read(byte[] b, int off, int len)
198: throws IOException {
199: if (closed)
200: throw new IOException("Closed");
201: int result = 0;
202: while (len > 0) {
203: while (buffer.length == space) {
204: if (eof) {
205: if (result == 0)
206: return -1;
207: return result;
208: }
209: try {
210: wait();
211: } catch (InterruptedException ie) {
212: }
213: }
214: int tail = buffer.length - out;
215: int avail = buffer.length - space;
216: int nb = Math.min(len, Math.min(tail, avail));
217: System.arraycopy(buffer, out, b, off, nb);
218: out += nb;
219: if (out == buffer.length) {
220: out = 0;
221: }
222: space += nb;
223: notify();
224: off += nb;
225: len -= nb;
226: result += nb;
227: if (space == buffer.length)
228: break;
229: }
230: return result;
231: }
232: }
|