001: // Copyright (C) 2003,2004,2005 by Object Mentor, Inc. All rights reserved.
002: // Released under the terms of the GNU General Public License version 2 or later.
003: package fitnesse.components;
004:
005: import java.io.*;
006: import java.net.Socket;
007: import fit.Counts;
008: import fitnesse.util.StreamReader;
009:
010: public class FitClient {
011:
012: protected FitClientListener listener;
013:
014: protected Socket fitSocket;
015:
016: private OutputStream fitInput;
017:
018: private StreamReader fitOutput;
019:
020: private volatile int sent = 0;
021:
022: private volatile int received = 0;
023:
024: private volatile boolean isDoneSending = false;
025:
026: protected volatile boolean killed = false;
027:
028: protected Thread fitListeningThread;
029:
030: public FitClient(FitClientListener listener) throws Exception {
031: this .listener = listener;
032: }
033:
034: public void acceptSocket(Socket socket) throws Exception {
035: checkForPulse();
036: fitSocket = socket;
037: fitInput = fitSocket.getOutputStream();
038: FitProtocol.writeData("", fitInput);
039: fitOutput = new StreamReader(fitSocket.getInputStream());
040:
041: fitListeningThread = new Thread(new FitListeningRunnable(),
042: "FitClient fitOutput");
043: fitListeningThread.start();
044: }
045:
046: public void send(String data) throws Exception {
047: checkForPulse();
048: FitProtocol.writeData(data, fitInput);
049: sent++;
050: }
051:
052: public void done() throws Exception {
053: checkForPulse();
054: FitProtocol.writeSize(0, fitInput);
055: isDoneSending = true;
056: }
057:
058: public void join() throws Exception {
059: if (fitListeningThread != null)
060: fitListeningThread.join();
061: }
062:
063: public void kill() throws Exception {
064: killed = true;
065: if (fitListeningThread != null)
066: fitListeningThread.interrupt();
067: }
068:
069: public void exceptionOccurred(Exception e) {
070: listener.exceptionOccurred(e);
071: }
072:
073: protected void checkForPulse() throws InterruptedException {
074: if (killed)
075: throw new InterruptedException("FitClient was killed");
076: }
077:
078: void listenToFit() {
079: try {
080: while (!finishedReading()) {
081: int size;
082: size = FitProtocol.readSize(fitOutput);
083: if (size != 0) {
084: String readValue = fitOutput.read(size);
085: if (fitOutput.byteCount() < size)
086: throw new Exception("I was expecting " + size
087: + " bytes but I only got "
088: + fitOutput.byteCount());
089: listener.acceptOutput(readValue);
090: } else {
091: Counts counts = FitProtocol.readCounts(fitOutput);
092: listener.acceptResults(counts);
093: received++;
094: }
095: }
096: } catch (Exception e) {
097: exceptionOccurred(e);
098: }
099: }
100:
101: private boolean finishedReading() {
102: while (stateIndeterminate())
103: shortSleep();
104: return isDoneSending && received == sent;
105: }
106:
107: /**
108: * @return true if the current state of the transission is indeterminate.
109: *
110: * When the number of pages sent and recieved is the same, we may be done
111: * with the whole job, or we may just be waiting for FitNesse to send the
112: * next page. There's no way to know until FitNesse either calls send, or
113: * done.
114: */
115: private boolean stateIndeterminate() {
116: return (received == sent) && !isDoneSending;
117: }
118:
119: private void shortSleep() {
120: try {
121: Thread.sleep(10);
122: } catch (InterruptedException e) {
123: e.printStackTrace();
124: }
125: }
126:
127: class FitListeningRunnable implements Runnable {
128: public void run() {
129: listenToFit();
130: }
131: }
132: }
|