001: // MuxInputStream.java
002: // $Id: MuxInputStream.java,v 1.13 2000/08/16 21:38:01 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.www.mux;
007:
008: import java.io.IOException;
009: import java.io.InputStream;
010: import java.io.PrintStream;
011:
012: public class MuxInputStream extends InputStream implements MUX {
013: /**
014: * Debug flags - debug the push method.
015: */
016: private static final boolean debugPush = false;
017:
018: /**
019: * The MuxSession instance this input stream is attached to.
020: */
021: protected MuxSession session = null;
022: /**
023: * A quick reference to that session writer.
024: */
025: protected MuxWriter writer = null;
026: /**
027: * This input stream associated buffer.
028: */
029: protected byte buffer[] = null;
030: /**
031: * The current buffer position.
032: */
033: protected int bufptr = -1;
034: /**
035: * The current mark within the input buffer, or <strong>-1</strong>.
036: */
037: protected int markptr = -1;
038: /**
039: * The current buffer length.
040: */
041: protected int buflen = -1;
042: /**
043: * Has this stream been closed ?
044: */
045: protected boolean closed = false;
046: /**
047: * Currently consumed credits.
048: */
049: protected int consumed_credit = 0;
050: /**
051: * Current available credit on that session.
052: */
053: protected int avail_credit = MUX.RECEIVER_DEFAULT_CREDIT;
054: /**
055: * Yet another push is pending from the reader thread.
056: */
057: protected boolean pushpending = false;
058: /**
059: * Error message in case of error.
060: */
061: protected String errmsg = null;
062:
063: private void addCredit(int consumed) throws IOException {
064: consumed_credit += consumed;
065: if (consumed_credit > (avail_credit >> 1)) {
066: // Send more credit:
067: writer.ctrlSendCredit(session.getIdentifier(),
068: consumed_credit);
069: writer.flush();
070: consumed_credit = 0;
071: }
072: notifyAll();
073: }
074:
075: /**
076: * Fill in that input stream with more data.
077: * This method can only be called from within that package, typically
078: * by the session reader, to fill in the buffer.
079: * @param data The data read from the socket.
080: * @param off Offset of available data within above buffer.
081: * @param len Length of available data within above buffer.
082: * @param noflush Set to <strong>true</strong> if there is already more
083: * data available for that session.
084: */
085:
086: protected synchronized void push(byte data[], int off, int len,
087: boolean noflush) throws IOException {
088: if (debugPush)
089: System.out.println("MuxReader[push]: " + len + " bytes"
090: + ", noflush=" + noflush);
091: // If that stream was closed in the mean time, discard data:
092: if (closed) {
093: // FIXME this would be the place to send a RESET:
094: return;
095: }
096: // Otherwise, do the job, until all data has been accepted:
097: int bufpos = -1;
098: int avail = -1;
099: while (len > 0) {
100: bufpos = bufptr + buflen;
101: avail = buffer.length - bufpos;
102: // Should we shift the buffer now ?
103: if ((avail < len) && (bufptr > 0)) {
104: System.arraycopy(buffer, bufptr, buffer, 0, buflen);
105: if (markptr >= 0)
106: markptr = (markptr >= bufptr) ? markptr - bufptr
107: : -1;
108: bufptr = 0;
109: bufpos = buflen;
110: avail = buffer.length - bufpos;
111: }
112: // Accept as many bytes as possible, until we are done
113: // Wait for some available space in buffer:
114: if (debugPush)
115: System.out.println("push: " + len + " bytes, avail="
116: + avail);
117: while (avail <= 0) {
118: if (buflen > 0)
119: notifyAll();
120: try {
121: wait();
122: } catch (InterruptedException ex) {
123: throw new IOException("Interrupted read");
124: }
125: if (buflen != 0) {
126: bufpos = bufptr + buflen;
127: avail = buffer.length - bufpos;
128: } else {
129: bufpos = 0;
130: bufptr = 0;
131: buflen = 0;
132: avail = buffer.length;
133: }
134: }
135: // Flush data into available buffer space, and notify any reader:
136: if (len < avail) {
137: System.arraycopy(data, off, buffer, bufpos, len);
138: buflen += len;
139: len = 0;
140: } else {
141: System.arraycopy(data, off, buffer, bufpos, avail);
142: off += avail;
143: len -= avail;
144: buflen += avail;
145: }
146: if (!(pushpending = noflush))
147: notifyAll();
148: if (debugPush)
149: System.out.println("push: " + buflen
150: + " total bytes pushed.");
151: }
152: }
153:
154: /**
155: * Set a mark in that input stream.
156: * @param readlimit The maximum limit of bytes allowed to be read
157: * before the mark becomes invalid.
158: */
159:
160: public synchronized void mark(int readlimit) {
161: markptr = bufptr;
162: }
163:
164: /**
165: * Reset buffer to last mark.
166: * @exception IOException If the mark has not been set, or if it is no
167: * longer valid.
168: */
169:
170: public synchronized void reset() throws IOException {
171: if (markptr <= bufptr) {
172: buflen += (bufptr - markptr);
173: bufptr = markptr;
174: } else {
175: throw new IOException("invalid mark.");
176: }
177: }
178:
179: /**
180: * Notify that stream of some error condition.
181: * When an error condition is detected, all read accesses to the stream
182: * will result in an IOException being thrown, with as a message, the
183: * message provided here.
184: * @param msg Error message to be provided in any future IOException.
185: */
186:
187: protected synchronized void error(String msg) {
188: errmsg = msg;
189: try {
190: close();
191: } catch (IOException ex) {
192: }
193: }
194:
195: /**
196: * Get the number of available bytes on that stream.
197: * @return Number of bytes available.
198: */
199:
200: public synchronized int available() throws IOException {
201: if (closed) {
202: if (errmsg != null)
203: throw new IOException(errmsg);
204: return -1;
205: }
206: return buflen;
207: }
208:
209: /**
210: * Close that input stream.
211: * @exception IOException If some IO error occured during close.
212: */
213:
214: public synchronized void close() throws IOException {
215: closed = true;
216: pushpending = false;
217: notifyAll();
218: }
219:
220: /**
221: * Read one byte of input from the stream.
222: * @return The byte read, or <strong>-1</strong> if end of stream.
223: * @exception IOException If an IO error has occured.
224: */
225:
226: public synchronized int read() throws IOException {
227: while (true) {
228: // Always send back available data:
229: if (buflen > 0) {
230: byte ch = buffer[bufptr++];
231: buflen--;
232: addCredit(1);
233: return ((int) ch) & 0xff;
234: }
235: // If closed, throw an IOException:
236: if (closed) {
237: if (errmsg != null)
238: throw new IOException(errmsg);
239: return -1;
240: }
241: // Wait for this session's buffer to be filled by the reader
242: try {
243: wait();
244: } catch (InterruptedException ex) {
245: throw new IOException("Interrupted read.");
246: }
247: }
248: }
249:
250: /**
251: * Reads into an array of bytes. This method will
252: * block until some input is available.
253: * @param b the buffer into which the data is read
254: * @param off the start offset of the data
255: * @param len the maximum number of bytes read
256: * @return the actual number of bytes read, -1 is
257: * returned when the end of the stream is reached.
258: * @exception IOException If an I/O error has occurred.
259: */
260:
261: public synchronized int read(byte b[], int off, int len)
262: throws IOException {
263: while (true) {
264: if ((buflen > 0) && !pushpending) {
265: // Send back that output straight
266: int size = Math.min(len, buflen);
267: System.arraycopy(buffer, bufptr, b, off, size);
268: bufptr += size;
269: buflen -= size;
270: // Notify the reader thread if it is waiting for buffer space
271: addCredit(size);
272: return size;
273: }
274: // If closed, throw an IOException
275: if (closed) {
276: if (errmsg != null)
277: throw new IOException(errmsg);
278: return -1;
279: }
280: // Wait for this session's buffer to be filled by the reader
281: try {
282: wait();
283: } catch (InterruptedException ex) {
284: throw new IOException("Interrupted read.");
285: }
286: }
287: }
288:
289: /**
290: * Create an input stream to read data from the given session.
291: * @param session The session to read data from.
292: */
293:
294: protected MuxInputStream(MuxSession session) {
295: this .session = session;
296: this .writer = session.getMuxStream().getMuxWriter();
297: this .buffer = new byte[session.getInputBufferSize()];
298: this .bufptr = 0;
299: this .buflen = 0;
300: }
301:
302: }
|