001: /*
002: * Copyright 2003-2004 The Apache Software Foundation
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.commons.net.telnet;
017:
018: import java.io.BufferedInputStream;
019: import java.io.IOException;
020: import java.io.InputStream;
021: import java.io.InterruptedIOException;
022:
023: /***
024: *
025: * <p>
026: *
027: * <p>
028: * <p>
029: * @author Daniel F. Savarese
030: * @author Bruno D'Avanzo
031: ***/
032:
033: final class TelnetInputStream extends BufferedInputStream implements
034: Runnable {
035: static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
036: _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
037: _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8,
038: _STATE_IAC_SB = 9;
039:
040: private boolean __hasReachedEOF, __isClosed;
041: private boolean __readIsWaiting;
042: private int __receiveState, __queueHead, __queueTail,
043: __bytesAvailable;
044: private int[] __queue;
045: private TelnetClient __client;
046: private Thread __thread;
047: private IOException __ioException;
048:
049: /* TERMINAL-TYPE option (start)*/
050: private int __suboption[] = new int[256];
051: private int __suboption_count = 0;
052: /* TERMINAL-TYPE option (end)*/
053:
054: private boolean __threaded;
055:
056: TelnetInputStream(InputStream input, TelnetClient client,
057: boolean readerThread) {
058: super (input);
059: __client = client;
060: __receiveState = _STATE_DATA;
061: __isClosed = true;
062: __hasReachedEOF = false;
063: // Make it 2049, because when full, one slot will go unused, and we
064: // want a 2048 byte buffer just to have a round number (base 2 that is)
065: __queue = new int[2049];
066: __queueHead = 0;
067: __queueTail = 0;
068: __bytesAvailable = 0;
069: __ioException = null;
070: __readIsWaiting = false;
071: __threaded = false;
072: if (readerThread)
073: __thread = new Thread(this );
074: else
075: __thread = null;
076: }
077:
078: TelnetInputStream(InputStream input, TelnetClient client) {
079: this (input, client, true);
080: }
081:
082: void _start() {
083: if (__thread == null)
084: return;
085:
086: int priority;
087: __isClosed = false;
088: // Need to set a higher priority in case JVM does not use pre-emptive
089: // threads. This should prevent scheduler induced deadlock (rather than
090: // deadlock caused by a bug in this code).
091: priority = Thread.currentThread().getPriority() + 1;
092: if (priority > Thread.MAX_PRIORITY)
093: priority = Thread.MAX_PRIORITY;
094: __thread.setPriority(priority);
095: __thread.setDaemon(true);
096: __thread.start();
097: __threaded = true;
098: }
099:
100: // synchronized(__client) critical sections are to protect against
101: // TelnetOutputStream writing through the telnet client at same time
102: // as a processDo/Will/etc. command invoked from TelnetInputStream
103: // tries to write.
104: private int __read() throws IOException {
105: int ch;
106:
107: _loop: while (true) {
108: // Exit only when we reach end of stream.
109: if ((ch = super .read()) < 0)
110: return -1;
111:
112: ch = (ch & 0xff);
113:
114: /* Code Section added for supporting AYT (start)*/
115: synchronized (__client) {
116: __client._processAYTResponse();
117: }
118: /* Code Section added for supporting AYT (end)*/
119:
120: /* Code Section added for supporting spystreams (start)*/
121: __client._spyRead(ch);
122: /* Code Section added for supporting spystreams (end)*/
123:
124: _mainSwitch: switch (__receiveState) {
125:
126: case _STATE_CR:
127: if (ch == '\0') {
128: // Strip null
129: continue;
130: }
131: // How do we handle newline after cr?
132: // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
133:
134: // Handle as normal data by falling through to _STATE_DATA case
135:
136: case _STATE_DATA:
137: if (ch == TelnetCommand.IAC) {
138: __receiveState = _STATE_IAC;
139: continue;
140: }
141:
142: if (ch == '\r') {
143: synchronized (__client) {
144: if (__client
145: ._requestedDont(TelnetOption.BINARY))
146: __receiveState = _STATE_CR;
147: else
148: __receiveState = _STATE_DATA;
149: }
150: } else
151: __receiveState = _STATE_DATA;
152: break;
153:
154: case _STATE_IAC:
155: switch (ch) {
156: case TelnetCommand.WILL:
157: __receiveState = _STATE_WILL;
158: continue;
159: case TelnetCommand.WONT:
160: __receiveState = _STATE_WONT;
161: continue;
162: case TelnetCommand.DO:
163: __receiveState = _STATE_DO;
164: continue;
165: case TelnetCommand.DONT:
166: __receiveState = _STATE_DONT;
167: continue;
168: /* TERMINAL-TYPE option (start)*/
169: case TelnetCommand.SB:
170: __suboption_count = 0;
171: __receiveState = _STATE_SB;
172: continue;
173: /* TERMINAL-TYPE option (end)*/
174: case TelnetCommand.IAC:
175: __receiveState = _STATE_DATA;
176: break;
177: default:
178: break;
179: }
180: __receiveState = _STATE_DATA;
181: continue;
182: case _STATE_WILL:
183: synchronized (__client) {
184: __client._processWill(ch);
185: __client._flushOutputStream();
186: }
187: __receiveState = _STATE_DATA;
188: continue;
189: case _STATE_WONT:
190: synchronized (__client) {
191: __client._processWont(ch);
192: __client._flushOutputStream();
193: }
194: __receiveState = _STATE_DATA;
195: continue;
196: case _STATE_DO:
197: synchronized (__client) {
198: __client._processDo(ch);
199: __client._flushOutputStream();
200: }
201: __receiveState = _STATE_DATA;
202: continue;
203: case _STATE_DONT:
204: synchronized (__client) {
205: __client._processDont(ch);
206: __client._flushOutputStream();
207: }
208: __receiveState = _STATE_DATA;
209: continue;
210: /* TERMINAL-TYPE option (start)*/
211: case _STATE_SB:
212: switch (ch) {
213: case TelnetCommand.IAC:
214: __receiveState = _STATE_IAC_SB;
215: continue;
216: default:
217: // store suboption char
218: __suboption[__suboption_count++] = ch;
219: break;
220: }
221: __receiveState = _STATE_SB;
222: continue;
223: case _STATE_IAC_SB:
224: switch (ch) {
225: case TelnetCommand.SE:
226: synchronized (__client) {
227: __client._processSuboption(__suboption,
228: __suboption_count);
229: __client._flushOutputStream();
230: }
231: __receiveState = _STATE_DATA;
232: continue;
233: default:
234: __receiveState = _STATE_SB;
235: break;
236: }
237: __receiveState = _STATE_DATA;
238: continue;
239: /* TERMINAL-TYPE option (end)*/
240: }
241:
242: break;
243: }
244:
245: return ch;
246: }
247:
248: // synchronized(__client) critical sections are to protect against
249: // TelnetOutputStream writing through the telnet client at same time
250: // as a processDo/Will/etc. command invoked from TelnetInputStream
251: // tries to write.
252: private void __processChar(int ch) throws InterruptedException {
253: // Critical section because we're altering __bytesAvailable,
254: // __queueTail, and the contents of _queue.
255: synchronized (__queue) {
256: while (__bytesAvailable >= __queue.length - 1) {
257: if (__threaded) {
258: __queue.notify();
259: try {
260: __queue.wait();
261: } catch (InterruptedException e) {
262: throw e;
263: }
264: }
265: }
266:
267: // Need to do this in case we're not full, but block on a read
268: if (__readIsWaiting && __threaded) {
269: __queue.notify();
270: }
271:
272: __queue[__queueTail] = ch;
273: ++__bytesAvailable;
274:
275: if (++__queueTail >= __queue.length)
276: __queueTail = 0;
277: }
278: }
279:
280: public int read() throws IOException {
281: // Critical section because we're altering __bytesAvailable,
282: // __queueHead, and the contents of _queue in addition to
283: // testing value of __hasReachedEOF.
284: synchronized (__queue) {
285:
286: while (true) {
287: if (__ioException != null) {
288: IOException e;
289: e = __ioException;
290: __ioException = null;
291: throw e;
292: }
293:
294: if (__bytesAvailable == 0) {
295: // Return -1 if at end of file
296: if (__hasReachedEOF)
297: return -1;
298:
299: // Otherwise, we have to wait for queue to get something
300: if (__threaded) {
301: __queue.notify();
302: try {
303: __readIsWaiting = true;
304: __queue.wait();
305: __readIsWaiting = false;
306: } catch (InterruptedException e) {
307: throw new IOException(
308: "Fatal thread interruption during read.");
309: }
310: } else {
311: //__alreadyread = false;
312: __readIsWaiting = true;
313: int ch;
314:
315: do {
316: try {
317: if ((ch = __read()) < 0)
318: if (ch != -2)
319: return (ch);
320: } catch (InterruptedIOException e) {
321: synchronized (__queue) {
322: __ioException = e;
323: __queue.notifyAll();
324: try {
325: __queue.wait(100);
326: } catch (InterruptedException interrupted) {
327: }
328: }
329: return (-1);
330: }
331:
332: try {
333: if (ch != -2) {
334: __processChar(ch);
335: }
336: } catch (InterruptedException e) {
337: if (__isClosed)
338: return (-1);
339: }
340: } while (super .available() > 0);
341:
342: __readIsWaiting = false;
343: }
344: continue;
345: } else {
346: int ch;
347:
348: ch = __queue[__queueHead];
349:
350: if (++__queueHead >= __queue.length)
351: __queueHead = 0;
352:
353: --__bytesAvailable;
354:
355: // Need to explicitly notify() so available() works properly
356: if (__bytesAvailable == 0 && __threaded) {
357: __queue.notify();
358: }
359:
360: return ch;
361: }
362: }
363: }
364: }
365:
366: /***
367: * Reads the next number of bytes from the stream into an array and
368: * returns the number of bytes read. Returns -1 if the end of the
369: * stream has been reached.
370: * <p>
371: * @param buffer The byte array in which to store the data.
372: * @return The number of bytes read. Returns -1 if the
373: * end of the message has been reached.
374: * @exception IOException If an error occurs in reading the underlying
375: * stream.
376: ***/
377: public int read(byte buffer[]) throws IOException {
378: return read(buffer, 0, buffer.length);
379: }
380:
381: /***
382: * Reads the next number of bytes from the stream into an array and returns
383: * the number of bytes read. Returns -1 if the end of the
384: * message has been reached. The characters are stored in the array
385: * starting from the given offset and up to the length specified.
386: * <p>
387: * @param buffer The byte array in which to store the data.
388: * @param offset The offset into the array at which to start storing data.
389: * @param length The number of bytes to read.
390: * @return The number of bytes read. Returns -1 if the
391: * end of the stream has been reached.
392: * @exception IOException If an error occurs while reading the underlying
393: * stream.
394: ***/
395: public int read(byte buffer[], int offset, int length)
396: throws IOException {
397: int ch, off;
398:
399: if (length < 1)
400: return 0;
401:
402: // Critical section because run() may change __bytesAvailable
403: synchronized (__queue) {
404: if (length > __bytesAvailable)
405: length = __bytesAvailable;
406: }
407:
408: if ((ch = read()) == -1)
409: return -1;
410:
411: off = offset;
412:
413: do {
414: buffer[offset++] = (byte) ch;
415: } while (--length > 0 && (ch = read()) != -1);
416:
417: //__client._spyRead(buffer, off, offset - off);
418: return (offset - off);
419: }
420:
421: /*** Returns false. Mark is not supported. ***/
422: public boolean markSupported() {
423: return false;
424: }
425:
426: public int available() throws IOException {
427: // Critical section because run() may change __bytesAvailable
428: synchronized (__queue) {
429: return __bytesAvailable;
430: }
431: }
432:
433: // Cannot be synchronized. Will cause deadlock if run() is blocked
434: // in read because BufferedInputStream read() is synchronized.
435: public void close() throws IOException {
436: // Completely disregard the fact thread may still be running.
437: // We can't afford to block on this close by waiting for
438: // thread to terminate because few if any JVM's will actually
439: // interrupt a system read() from the interrupt() method.
440: super .close();
441:
442: synchronized (__queue) {
443: __hasReachedEOF = true;
444: __isClosed = true;
445:
446: if (__thread != null && __thread.isAlive()) {
447: __thread.interrupt();
448: }
449:
450: __queue.notifyAll();
451: }
452:
453: __threaded = false;
454: }
455:
456: public void run() {
457: int ch;
458:
459: try {
460: _outerLoop: while (!__isClosed) {
461: try {
462: if ((ch = __read()) < 0)
463: break;
464: } catch (InterruptedIOException e) {
465: synchronized (__queue) {
466: __ioException = e;
467: __queue.notifyAll();
468: try {
469: __queue.wait(100);
470: } catch (InterruptedException interrupted) {
471: if (__isClosed)
472: break _outerLoop;
473: }
474: continue;
475: }
476: } catch (RuntimeException re) {
477: // We treat any runtime exceptions as though the
478: // stream has been closed. We close the
479: // underlying stream just to be sure.
480: super .close();
481: // Breaking the loop has the effect of setting
482: // the state to closed at the end of the method.
483: break _outerLoop;
484: }
485:
486: try {
487: __processChar(ch);
488: } catch (InterruptedException e) {
489: if (__isClosed)
490: break _outerLoop;
491: }
492: }
493: } catch (IOException ioe) {
494: synchronized (__queue) {
495: __ioException = ioe;
496: }
497: }
498:
499: synchronized (__queue) {
500: __isClosed = true; // Possibly redundant
501: __hasReachedEOF = true;
502: __queue.notify();
503: }
504:
505: __threaded = false;
506: }
507: }
508:
509: /* Emacs configuration
510: * Local variables: **
511: * mode: java **
512: * c-basic-offset: 4 **
513: * indent-tabs-mode: nil **
514: * End: **
515: */
|