001: // ActiveStream.java
002: // $Id: ActiveStream.java,v 1.29 2004/11/04 16:28:33 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.www.protocol.http.cache;
007:
008: import java.io.IOException;
009: import java.io.InputStream;
010: import java.io.OutputStream;
011:
012: import org.w3c.util.ThreadCache;
013:
014: import org.w3c.www.protocol.http.Request;
015: import org.w3c.www.protocol.http.Reply;
016:
/**
 * In-memory pipe between one producer thread (calling {@link #receive})
 * and one consumer thread (calling the {@code read} methods).
 * <p>Data lives in a fixed-size byte buffer; {@code off} is the index of the
 * next byte to hand to the reader and {@code len} is the index just past the
 * last byte written by the producer. All state changes happen under the
 * monitor of this object and are signalled with {@code notifyAll()}.
 */
class ActiveInputStream extends InputStream {
    private static final int buflen = 2048;
    private static final int halflen = (buflen / 2);

    // Pipe storage; valid data is buffer[off .. len-1].
    byte buffer[] = new byte[buflen];
    int off = 0;
    int len = 0;

    // Set by close() and interrupt(); once set, receive() refuses new data.
    boolean closed = false;
    // Set by interrupt(); once set, read() fails with an IOException.
    boolean interrupted = false;

    /**
     * Block until there is something to read, the stream is closed, or the
     * pipe has been broken through {@link #interrupt()}.
     * <p>A thread interruption received while waiting does not abort the
     * wait; the thread's interrupt status is restored before returning.
     * @exception IOException If the pipe was broken by {@link #interrupt()}.
     */
    private synchronized void waitForInput() throws IOException {
        boolean wasInterrupted = false;
        try {
            while (true) {
                // Interruption of some sort ?
                if (interrupted)
                    throw new IOException("Broken active pipe.");
                int avail = len - off;
                if (closed || (avail > 0))
                    return;
                // Wait for something to happen:
                try {
                    wait();
                } catch (InterruptedException ex) {
                    // Keep waiting, but remember the request so that the
                    // caller can still observe it afterwards.
                    wasInterrupted = true;
                }
            }
        } finally {
            if (wasInterrupted)
                Thread.currentThread().interrupt();
        }
    }

    /**
     * Push bytes into the pipe, blocking until all of them are buffered.
     * <p>Unless everything that remains fits at once, we wait for more than
     * half of the buffer to be free before pushing data. This is to prevent
     * a silly window syndrome problem, where the pusher and the puller would
     * exchange single bytes of data.
     * @param buf The array holding the bytes to push.
     * @param boff Index in <code>buf</code> of the first byte to push.
     * @param blen Number of bytes to push.
     * @exception IOException If the stream is (or gets) closed before all
     * the bytes have been pushed.
     */
    public synchronized void receive(byte buf[], int boff, int blen)
        throws IOException {
        // blen is a byte count, so the end index has to account for boff:
        final int end = boff + blen;
        boolean wasInterrupted = false;
        try {
            // Push all data:
            while (boff < end) {
                // Has this stream been closed ?
                if (closed)
                    throw new IOException("Write to closed stream.");
                // Push data:
                int remaining = end - boff;
                int space = buffer.length - len;
                if ((space >= remaining) || (space > halflen)) {
                    int push = Math.min(remaining, space);
                    System.arraycopy(buf, boff, buffer, len, push);
                    len += push;
                    boff += push;
                    notifyAll();
                } else {
                    // Not enough room yet, wait for the reader to drain:
                    try {
                        wait();
                    } catch (InterruptedException ex) {
                        // Keep pushing, restore the status when done.
                        wasInterrupted = true;
                    }
                }
            }
        } finally {
            if (wasInterrupted)
                Thread.currentThread().interrupt();
        }
    }

    /**
     * Mark the stream as closed and wake up any waiting thread.
     * Bytes already buffered remain readable; further calls to
     * {@link #receive} fail.
     */
    public synchronized void close() {
        closed = true;
        notifyAll();
    }

    /**
     * Break the pipe: the stream is marked both interrupted and closed, and
     * waiting threads are woken up. Subsequent reads throw an IOException.
     */
    public synchronized void interrupt() {
        interrupted = true;
        closed = true;
        notifyAll();
    }

    /**
     * Read a single byte, blocking until one is available.
     * @return The byte read, as an integer in the range 0..255, or
     * <strong>-1</strong> if the stream is closed and fully drained.
     * @exception IOException If the pipe was broken by {@link #interrupt()}.
     */
    public synchronized int read() throws IOException {
        waitForInput();
        // Have we reached end of stream ?
        if (closed && ((len - off) == 0))
            return -1;
        // Read as quickly as possible:
        int b = (buffer[off++] & 0xff);
        if (off >= len) {
            // Buffer drained: rewind it and let the pusher know.
            off = 0;
            len = 0;
            notifyAll();
        }
        return b;
    }

    /**
     * Read up to <code>tlen</code> bytes, blocking until at least one byte
     * is available or the stream is closed.
     * @param to The array to copy the bytes into.
     * @param toff Index in <code>to</code> of the first byte to store.
     * @param tlen Maximum number of bytes to read.
     * @return The number of bytes copied, or <strong>-1</strong> if the
     * stream is closed and fully drained.
     * @exception IOException If the pipe was broken by {@link #interrupt()}.
     */
    public synchronized int read(byte to[], int toff, int tlen)
        throws IOException {
        waitForInput();
        // Check for exhausted stream:
        int avail = len - off;
        if (closed && (avail == 0)) {
            return -1;
        }
        // Send the appropriate stuff:
        if (tlen >= avail) {
            // The reader takes everything: rewind the buffer.
            System.arraycopy(buffer, off, to, toff, avail);
            off = 0;
            len = 0;
            notifyAll();
            return avail;
        } else {
            System.arraycopy(buffer, off, to, toff, tlen);
            if ((off += tlen) > halflen) {
                // Shift buffer, to make room at its end for the pusher:
                System.arraycopy(buffer, off, buffer, 0, len - off);
                len -= off;
                off = 0;
                notifyAll();
            }
            return tlen;
        }
    }

    /**
     * Get the number of buffered bytes.
     * <p>NOTE: once the stream is closed or interrupted this returns
     * <strong>-1</strong>, even if buffered bytes remain. This differs from
     * the usual InputStream convention and is kept as is for existing
     * callers.
     * @return The number of bytes readable without blocking, or -1.
     */
    public synchronized int available() {
        return (closed || interrupted) ? -1 : len - off;
    }

}
134:
135: /**
136: * ActiveStream is used to tee a stream to the client, while caching it.
137: * This class basically mimics the piped streams provided in the java library
138: * in a more efficient manner (well, sort of).
139: * <p>If any error occurs while writing data back to the client, then the
140: * active thread finishes it works, but only streaming data into the sink,
141: */
142:
143: public class ActiveStream implements Runnable {
144: private static ThreadCache threadcache = null;
145:
146: ActiveInputStream pout = null;
147: boolean poutClosed = false;
148: InputStream src = null;
149: boolean srcClosed = false;
150: OutputStream dst = null;
151: boolean dstClosed = false;
152: TeeMonitor monitor = null;
153:
154: public void run() {
155:
156: byte buffer[] = new byte[2048];
157: int chunksz = 256;
158: boolean notified = false;
159: int total = 0;
160:
161: try {
162: int count = 0;
163: while ((count = src.read(buffer, 0, chunksz)) >= 0) {
164: // Try to write to the pipe, if still valid:
165: if (!poutClosed) {
166: try {
167: pout.receive(buffer, 0, count);
168: } catch (IOException ex) {
169: try {
170: pout.close();
171: } catch (Exception e) {
172: }
173: poutClosed = true;
174: }
175: }
176: // Always write to destination:
177: dst.write(buffer, 0, count);
178: total += count;
179: // Increment the chunk size, for improved performance:
180: chunksz = Math.min(buffer.length, (chunksz * 2));
181: }
182: src.close();
183: srcClosed = true;
184: dst.close();
185: dstClosed = true;
186: if (!poutClosed) {
187: pout.close();
188: poutClosed = true;
189: }
190: monitor.notifyTeeSuccess(total);
191: notified = true;
192: } catch (IOException ex) {
193: ex.printStackTrace();
194: try {
195: monitor.notifyTeeFailure(total);
196: } catch (Exception nex) {
197: // a duplicate notifyTeeFailure
198: }
199: notified = true;
200: } finally {
201: if (!srcClosed) {
202: try {
203: src.close();
204: } catch (Exception ex) {
205: }
206: srcClosed = true;
207: }
208: if (!dstClosed) {
209: try {
210: dst.close();
211: } catch (Exception ex) {
212: }
213: dstClosed = true;
214: }
215: if (!poutClosed) {
216: try {
217: pout.interrupt();
218: } catch (Exception ex) {
219: }
220: poutClosed = true;
221: }
222: if (!notified)
223: monitor.notifyTeeFailure(total);
224: }
225: }
226:
227: public static InputStream createTee(TeeMonitor monitor,
228: InputStream src, OutputStream dst) {
229: // Allocate a new tee stream:
230: ActiveStream tee = new ActiveStream();
231: tee.monitor = monitor;
232: tee.pout = new ActiveInputStream();
233: tee.src = src;
234: tee.dst = dst;
235: // Allocate a thread for this tee stream:
236: if (!threadcache.getThread(tee, false)) {
237: return null;
238: } else {
239: return tee.pout;
240: }
241: }
242:
243: public static synchronized void initialize() {
244: if (threadcache == null) {
245: threadcache = new ThreadCache("active-streams");
246: threadcache.setCachesize(10);
247: threadcache.initialize();
248: }
249: }
250:
251: ActiveStream() {
252: }
253: }
|