001: /* Copyright 2004 Inderjeet Singh. All rights reserved. You may not modify,
002: * use, reproduce or distribute this software except in compliance with the
003: * terms of the license at http://tcpmon.dev.java.net/
004: * $Id: StreamThread.java,v 1.4 2004/11/14 19:28:55 inder Exp $ */
005:
006: package tcpmon;
007:
008: import java.net.*;
009: import java.io.*;
010:
011: /** This class downloads data from a stream in a separate thread.
012: * @author Inderjeet Singh
013: */
014: final class StreamThread extends Thread {
015: /** configuration parameter: the time to sleep (in Millis) if the
016: * data is not present on the input stream yet. */
017: private static final int DATA_ARRIVAL_WAIT_TIME = 200;
018:
019: /** configuration parameter: the size of the data buffer. */
020: private static final int BUF_SIZE = 8072;
021:
022: private static final int MAX_NUM_RETRIES = 15;
023:
024: public StreamThread(InputStream src, OutputStream d) {
025: super ("stream-" + Debug.getUniqueId());
026: assert src != null;
027: assert d != null;
028: this .src = src;
029: this .dst = new OutputStream[1];
030: this .dst[0] = d;
031: }
032:
033: public StreamThread(InputStream src, OutputStream[] dst) {
034: super ("stream-" + Debug.getUniqueId());
035: assert src != null;
036: for (int i = 0; i < dst.length; ++i)
037: assert dst[i] != null;
038: this .src = src;
039: this .dst = dst;
040: }
041:
042: public void closeConnections() {
043: try {
044: src.close();
045: } catch (Exception e) {
046: Debug.print(e);
047: }
048: for (int i = 0; i < dst.length; ++i) {
049: try {
050: dst[i].close();
051: } catch (Exception e) {
052: Debug.print(e);
053: }
054: }
055: }
056:
057: public void run() {
058: try {
059: if (Debug.level >= Debug.MIN_DEBUG)
060: Debug.println("Starting stream thread ...");
061: int count = copyStream();
062: if (Debug.level >= Debug.FULL_DEBUG)
063: Debug.println("transferred " + count + " bytes.");
064: } catch (EOFException eofe) {
065: // Normal behavior. Ignore it.
066: } catch (NoRouteToHostException nrthe) {
067: System.err
068: .println("No route to the other end of the tunnel!");
069: } catch (IOException ioe) {
070: Debug.print(ioe);
071: } finally {
072: closeConnections();
073: if (Debug.level >= Debug.MIN_DEBUG)
074: Debug.println("Ending stream thread ...");
075: }
076: }
077:
078: private int copyStreamByteByByte() throws IOException {
079: int bytesRead = 0;
080: int tmp = 0;
081: while ((tmp = src.read()) != -1) {
082: ++bytesRead;
083: for (int i = 0; i < dst.length; ++i) {
084: dst[i].write((char) tmp);
085: }
086: }
087: return bytesRead;
088: }
089:
090: /** copy all the data present in the src to the dst. */
091: private int copyStream() throws IOException {
092:
093: byte buf[] = new byte[BUF_SIZE];
094: int bytesRead = 0;
095: int total = 0;
096: int numRetries = 0;
097: do {
098: if (src.available() == 0) {
099: if (numRetries >= MAX_NUM_RETRIES)
100: throw new IOException(
101: "StreamThread: data not available "
102: + "on the connection");
103: try {
104: Thread.currentThread().sleep(
105: DATA_ARRIVAL_WAIT_TIME, 0);
106: } catch (InterruptedException ie) {
107: Debug.print(ie);
108: }
109: ++numRetries;
110: if (Debug.level >= Debug.FULL_DEBUG)
111: Debug.println("NumRetries: " + numRetries);
112: }
113: bytesRead = src.read(buf);
114: if (bytesRead > 0) {
115: numRetries = 0;
116: for (int i = 0; i < dst.length; ++i) {
117: dst[i].write(buf, 0, bytesRead);
118: }
119: total += bytesRead;
120: }
121: } while (bytesRead != -1);
122: return total;
123: }
124:
125: private InputStream src;
126: private OutputStream[] dst;
127: }
|