001: /*
002: * SSHTools - Java SSH2 API
003: *
004: * Copyright (C) 2002-2003 Lee David Painter and Contributors.
005: *
006: * Contributions made by:
007: *
008: * Brett Smith
009: * Richard Pernavas
010: * Erwin Bolwidt
011: *
012: * This program is free software; you can redistribute it and/or
013: * modify it under the terms of the GNU General Public License
014: * as published by the Free Software Foundation; either version 2
015: * of the License, or (at your option) any later version.
016: *
017: * This program is distributed in the hope that it will be useful,
018: * but WITHOUT ANY WARRANTY; without even the implied warranty of
019: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
020: * GNU General Public License for more details.
021: *
022: * You should have received a copy of the GNU General Public License
023: * along with this program; if not, write to the Free Software
024: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
025: */
026: package com.sshtools.j2ssh.connection;
027:
028: import com.sshtools.j2ssh.transport.MessageNotAvailableException;
029: import com.sshtools.j2ssh.transport.MessageStoreEOFException;
030: import com.sshtools.j2ssh.transport.SshMessageStore;
031:
032: import org.apache.commons.logging.Log;
033: import org.apache.commons.logging.LogFactory;
034:
035: import java.io.IOException;
036: import java.io.InputStream;
037: import java.io.InterruptedIOException;
038:
039: /**
040: *
041: *
042: * @author $author$
043: * @version $Revision: 1.35 $
044: */
045: public class ChannelInputStream extends InputStream {
046: private static Log log = LogFactory
047: .getLog(ChannelInputStream.class);
048: int[] filter;
049: byte[] msgdata;
050: int currentPos = 0;
051: private SshMessageStore messageStore;
052: private Integer type = null;
053: private int interrupt = 5000;
054: private boolean isBlocking = false;
055: private Object lock = new Object();
056: private Thread blockingThread = null;
057:
058: /**
059: * Creates a new ChannelInputStream object.
060: *
061: * @param messageStore
062: * @param type
063: */
064: public ChannelInputStream(SshMessageStore messageStore, Integer type) {
065: this .messageStore = messageStore;
066: filter = new int[1];
067: this .type = type;
068:
069: if (type != null) {
070: filter[0] = SshMsgChannelExtendedData.SSH_MSG_CHANNEL_EXTENDED_DATA;
071: } else {
072: filter[0] = SshMsgChannelData.SSH_MSG_CHANNEL_DATA;
073: }
074: }
075:
076: /**
077: * Creates a new ChannelInputStream object.
078: *
079: * @param messageStore
080: */
081: public ChannelInputStream(SshMessageStore messageStore) {
082: this (messageStore, null);
083: }
084:
085: /**
086: *
087: *
088: * @return
089: */
090: public int available() {
091: int available = 0;
092:
093: if (msgdata != null) {
094: available = msgdata.length - currentPos;
095:
096: if (log.isDebugEnabled() && (available > 0)) {
097: log.debug(String.valueOf(available)
098: + " bytes of channel data available");
099: }
100:
101: available = (available >= 0) ? available : 0;
102: }
103:
104: if (available == 0) {
105: try {
106: if (type != null) {
107: SshMsgChannelExtendedData msg = (SshMsgChannelExtendedData) messageStore
108: .peekMessage(filter);
109: available = msg.getChannelData().length;
110: } else {
111: SshMsgChannelData msg = (SshMsgChannelData) messageStore
112: .peekMessage(filter);
113: available = msg.getChannelData().length;
114: }
115:
116: if (log.isDebugEnabled()) {
117: log.debug(String.valueOf(available)
118: + " bytes of channel data available");
119: }
120: } catch (MessageStoreEOFException mse) {
121: log
122: .debug("No bytes available since the MessageStore is EOF");
123: available = -1;
124: } catch (MessageNotAvailableException mna) {
125: available = 0;
126: } catch (InterruptedException ex) {
127: log
128: .info("peekMessage was interrupted, no data available!");
129: available = 0;
130: }
131: }
132:
133: return available;
134: }
135:
136: /**
137: *
138: *
139: * @throws IOException
140: */
141: public void close() throws IOException {
142: log.info("Closing ChannelInputStream");
143: messageStore.close();
144: }
145:
146: /**
147: *
148: *
149: * @return
150: */
151: public boolean isClosed() {
152: return messageStore.isClosed();
153: }
154:
155: /**
156: *
157: *
158: * @param interrupt
159: */
160: public void setBlockInterrupt(int interrupt) {
161: this .interrupt = (interrupt < 1000) ? 1000 : interrupt;
162: }
163:
164: /**
165: *
166: */
167: public void interrupt() {
168: messageStore.breakWaiting();
169: }
170:
171: /**
172: *
173: *
174: * @return
175: *
176: * @throws java.io.IOException
177: * @throws InterruptedIOException
178: */
179: public int read() throws java.io.IOException {
180: try {
181: block();
182:
183: return msgdata[currentPos++] & 0xFF;
184: } catch (MessageStoreEOFException mse) {
185: return -1;
186: } catch (InterruptedException ex) {
187: throw new InterruptedIOException(
188: "The thread was interrupted whilst waiting for channel data");
189: }
190: }
191:
192: /**
193: *
194: *
195: * @param b
196: * @param off
197: * @param len
198: *
199: * @return
200: *
201: * @throws IOException
202: * @throws IOException
203: */
204: public int read(byte[] b, int off, int len) throws IOException {
205: try {
206: block();
207:
208: int actual = available();
209:
210: if (actual > len) {
211: actual = len;
212: }
213:
214: if (actual > 0) {
215: System.arraycopy(msgdata, currentPos, b, off, actual);
216: currentPos += actual;
217: }
218:
219: return actual;
220: } catch (MessageStoreEOFException mse) {
221: return -1;
222: } catch (InterruptedException ex) {
223: throw new InterruptedIOException(
224: "The thread was interrupted whilst waiting for channel data");
225: }
226: }
227:
228: private void block() throws MessageStoreEOFException,
229: InterruptedException, IOException {
230: if (msgdata == null) {
231: collectNextMessage();
232: }
233:
234: if (currentPos >= msgdata.length) {
235: collectNextMessage();
236: }
237: }
238:
239: private void startBlockingOperation() throws IOException {
240: synchronized (lock) {
241: if (isBlocking) {
242: throw new IOException(
243: (("Cannot read from InputStream! " + blockingThread) == null) ? "**NULL THREAD**"
244: : (blockingThread.getName() + " is currently performing a blocking operation"));
245: }
246:
247: log.debug("Starting blocking operation");
248: blockingThread = Thread.currentThread();
249: isBlocking = true;
250: }
251: }
252:
253: private void stopBlockingOperation() throws IOException {
254: synchronized (lock) {
255: log.debug("Completed blocking operation");
256: blockingThread = null;
257: isBlocking = false;
258: }
259: }
260:
261: private void collectNextMessage() throws MessageStoreEOFException,
262: InterruptedException, IOException {
263: // Collect the next message
264: startBlockingOperation();
265:
266: try {
267: if (type != null) {
268: SshMsgChannelExtendedData msg = null;
269:
270: while ((msg == null) && !isClosed()) {
271: try {
272: log.debug("Waiting for extended channel data");
273: msg = (SshMsgChannelExtendedData) messageStore
274: .getMessage(filter, interrupt);
275: } catch (MessageNotAvailableException ex) {
276: // Ignore the timeout but this allows us to review the
277: // InputStreams state once in a while
278: }
279: }
280:
281: if (msg != null) {
282: msgdata = msg.getChannelData();
283: currentPos = 0;
284: } else {
285: throw new MessageStoreEOFException();
286: }
287: } else {
288: SshMsgChannelData msg = null;
289:
290: while ((msg == null) && !isClosed()) {
291: try {
292: log.debug("Waiting for channel data");
293: msg = (SshMsgChannelData) messageStore
294: .getMessage(filter, interrupt);
295: } catch (MessageNotAvailableException ex1) {
296: // Ignore the timeout but this allows us to review the
297: // InputStreams state once in a while
298: }
299: }
300:
301: if (msg != null) {
302: msgdata = msg.getChannelData();
303: currentPos = 0;
304: } else {
305: throw new MessageStoreEOFException();
306: }
307: }
308: } finally {
309: stopBlockingOperation();
310: }
311: }
312: }
|