001: package ch.ethz.ssh2;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005:
006: /**
007: * A <code>StreamGobbler</code> is an InputStream that uses an internal worker
008: * thread to constantly consume input from another InputStream. It uses a buffer
009: * to store the consumed data. The buffer size is automatically adjusted, if needed.
010: * <p>
011: * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR
012: * InputStreams with instances of this class, then you don't have to bother about
013: * the shared window of STDOUT and STDERR in the low level SSH-2 protocol,
014: * since all arriving data will be immediately consumed by the worker threads.
015: * Also, as a side effect, the streams will be buffered (e.g., single byte
016: * read() operations are faster).
017: * <p>
018: * Other SSH for Java libraries include this functionality by default in
019: * their STDOUT and STDERR InputStream implementations, however, please be aware
020: * that this approach also has a downside:
021: * <p>
022: * If you do not call the StreamGobbler's <code>read()</code> method often enough
023: * and the peer is constantly sending huge amounts of data, then you will sooner or later
024: * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size).
025: * Joe Average will like this class anyway - a paranoid programmer would never use such an approach.
026: * <p>
027: * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't",
028: * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
029: *
030: * @author Christian Plattner, plattner@inf.ethz.ch
031: * @version $Id: StreamGobbler.java,v 1.4 2006/02/14 19:43:16 cplattne Exp $
032: */
033:
public class StreamGobbler extends InputStream {
    /**
     * Worker thread that keeps draining the wrapped stream into the shared
     * buffer until end-of-stream is reached or a read fails.
     */
    class GobblerThread extends Thread {
        public void run() {
            byte[] buff = new byte[8192];

            while (true) {
                try {
                    /* Blocking read happens outside the lock so readers are never stalled by it. */
                    int avail = is.read(buff);

                    synchronized (synchronizer) {
                        if (avail <= 0) {
                            isEOF = true;
                            synchronizer.notifyAll();
                            break;
                        }

                        appendToBuffer(buff, avail);
                        synchronizer.notifyAll();
                    }
                } catch (IOException e) {
                    synchronized (synchronizer) {
                        exception = e;
                        synchronizer.notifyAll();
                    }
                    break;
                } catch (RuntimeException e) {
                    /*
                     * Without this, an unchecked failure would end the worker
                     * silently and leave every reader blocked in wait() forever.
                     */
                    IOException wrapped = new IOException(
                            "StreamGobbler worker thread failed: " + e);
                    wrapped.initCause(e);

                    synchronized (synchronizer) {
                        exception = wrapped;
                        synchronizer.notifyAll();
                    }
                    break;
                }
            }
        }
    }

    private final InputStream is;
    private GobblerThread t;

    /* Guards every field below; readers wait on it, the worker notifies on it. */
    private final Object synchronizer = new Object();

    private boolean isEOF = false;
    private boolean isClosed = false;
    private IOException exception = null;

    /* Unread data lives in buffer[read_pos .. write_pos). */
    private byte[] buffer = new byte[2048];
    private int read_pos = 0;
    private int write_pos = 0;

    /**
     * Creates a gobbler and immediately starts a daemon worker thread that
     * consumes the given stream.
     *
     * @param is the stream to drain; must not be <code>null</code>
     * @throws NullPointerException if <code>is</code> is <code>null</code>
     */
    public StreamGobbler(InputStream is) {
        if (is == null)
            throw new NullPointerException("is");

        this.is = is;
        t = new GobblerThread();
        t.setDaemon(true);
        t.start();
    }

    /**
     * Appends <code>count</code> bytes from <code>chunk</code> to the buffer,
     * compacting or growing it first when the tail has too little room.
     * The caller must hold the <code>synchronizer</code> lock.
     */
    private void appendToBuffer(byte[] chunk, int count) {
        int space_available = buffer.length - write_pos;

        if (space_available < count) {
            /* compact/resize buffer */
            int unread_size = write_pos - read_pos;
            int need_space = unread_size + count;

            byte[] new_buffer = buffer;

            if (need_space > buffer.length) {
                /* Grow by a third of what is needed, clamped to [256, 8192] bytes. */
                int inc = need_space / 3;
                inc = (inc < 256) ? 256 : inc;
                inc = (inc > 8192) ? 8192 : inc;
                new_buffer = new byte[need_space + inc];
            }

            if (unread_size > 0)
                System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);

            buffer = new_buffer;
            read_pos = 0;
            write_pos = unread_size;
        }

        System.arraycopy(chunk, 0, buffer, write_pos, count);
        write_pos += count;
    }

    /**
     * Blocks until at least one unread byte is buffered.
     * The caller must hold the <code>synchronizer</code> lock.
     *
     * @return <code>true</code> if data is available, <code>false</code> if the
     *         buffer is drained and end-of-stream was reached
     * @throws IOException if this gobbler is closed, if the worker recorded a
     *         failure, or (as <code>InterruptedIOException</code>) if the
     *         calling thread is interrupted while waiting
     */
    private boolean awaitData() throws IOException {
        if (isClosed)
            throw new IOException("This StreamGobbler is closed.");

        while (read_pos == write_pos) {
            if (exception != null)
                throw exception;

            if (isEOF)
                return false;

            try {
                synchronizer.wait();
            } catch (InterruptedException e) {
                /* Keep the interrupt visible to the caller instead of swallowing it. */
                Thread.currentThread().interrupt();
                throw new java.io.InterruptedIOException(
                        "Interrupted while waiting for data.");
            }
        }

        return true;
    }

    /**
     * Reads a single byte, blocking until one is available.
     *
     * @return the byte as a value in the range 0..255, or -1 at end-of-stream
     * @throws IOException if this gobbler is closed, the underlying stream
     *         failed, or the calling thread was interrupted while waiting
     */
    public int read() throws IOException {
        synchronized (synchronizer) {
            if (!awaitData())
                return -1;

            return buffer[read_pos++] & 0xff;
        }
    }

    /**
     * Returns the number of buffered bytes that can be read without blocking.
     *
     * @throws IOException if this gobbler is closed
     */
    public int available() throws IOException {
        synchronized (synchronizer) {
            if (isClosed)
                throw new IOException("This StreamGobbler is closed.");

            return write_pos - read_pos;
        }
    }

    /**
     * Reads up to <code>b.length</code> bytes into <code>b</code>; equivalent to
     * <code>read(b, 0, b.length)</code>.
     */
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Marks this gobbler as closed, wakes up any waiting readers and closes
     * the underlying stream. Calling it again has no effect.
     *
     * @throws IOException if closing the underlying stream fails
     */
    public void close() throws IOException {
        synchronized (synchronizer) {
            if (isClosed)
                return;
            isClosed = true;
            isEOF = true;
            synchronizer.notifyAll();
            is.close();
        }
    }

    /**
     * Reads up to <code>len</code> bytes into <code>b</code> starting at
     * <code>off</code>, blocking until at least one byte is available.
     *
     * @return the number of bytes copied, 0 if <code>len</code> is 0, or -1 at
     *         end-of-stream
     * @throws NullPointerException if <code>b</code> is <code>null</code>
     * @throws IndexOutOfBoundsException if <code>off</code>/<code>len</code> do
     *         not describe a valid range of <code>b</code>
     * @throws IOException if this gobbler is closed, the underlying stream
     *         failed, or the calling thread was interrupted while waiting
     */
    public int read(byte[] b, int off, int len) throws IOException {
        if (b == null)
            throw new NullPointerException();

        /* (off + len) < 0 catches int overflow of the sum. */
        if ((off < 0) || (len < 0) || ((off + len) > b.length)
                || ((off + len) < 0) || (off > b.length))
            throw new IndexOutOfBoundsException();

        if (len == 0)
            return 0;

        synchronized (synchronizer) {
            if (!awaitData())
                return -1;

            int avail = write_pos - read_pos;
            avail = (avail > len) ? len : avail;

            System.arraycopy(buffer, read_pos, b, off, avail);
            read_pos += avail;

            return avail;
        }
    }
}
|