001: // MuxOutputStream.java
002: // $Id: MuxOutputStream.java,v 1.9 2000/08/16 21:38:01 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.www.mux;
007:
008: import java.io.IOException;
009: import java.io.OutputStream;
010: import java.io.PrintStream;
011:
012: public class MuxOutputStream extends OutputStream {
013: protected static final boolean debug = false;
014:
015: /**
016: * The session this stream is attached to.
017: */
018: protected MuxSession session = null;
019: /**
020: * The identifier of above session (fast access).
021: */
022: protected int sessid = -1;
023: /**
024: * The writer instance for the multiplexed stream.
025: */
026: protected MuxWriter writer = null;
027: /**
028: * The current max allowed fragment size.
029: */
030: protected int fragsz = MUX.SENDER_DEFAULT_FRAGMENT_SIZE;
031: /**
032: * The currently available credit.
033: */
034: protected int avail_credit = MUX.SENDER_DEFAULT_CREDIT;
035: /**
036: * Has this stream been closed ?
037: */
038: protected boolean closed = false;
039:
040: /**
041: * Callback notifying that more credit is available for that stream.
042: * @param credit The credit we are getting from our peer.
043: */
044:
045: protected synchronized void notifyCredit(int credit) {
046: if (debug)
047: System.out.println("> notifyCredit[" + sessid + "]: "
048: + credit);
049: avail_credit += credit;
050: notifyAll();
051: }
052:
053: /**
054: * Callback notifying the the frgament size has changed.
055: * @param control The new fragment size.
056: */
057:
058: protected synchronized void notifyControl(int control) {
059: if (debug)
060: System.out.println("notifyControl: " + control);
061: fragsz = control;
062: }
063:
064: /**
065: * Emit the given data on current session.
066: * @param b The buffer containing the data to be emitted.
067: * @param off Offset of data within above buffer.
068: * @param len Length of data to be written,
069: */
070:
071: private synchronized void send(byte b[], int off, int len)
072: throws IOException {
073: // Otherwise perform:
074: while (len > 0) {
075: // Make sure we have some remaining credit:
076: while (avail_credit <= 0) {
077: // If closed, trigger an error:
078: if (closed)
079: throw new IOException("Broken pipe");
080: writer.flush();
081: try {
082: wait();
083: } catch (InterruptedException ex) {
084: throw new IOException("Interrupted IO !");
085: }
086: }
087: // Chunk (if needed) until all available credit has been consumed
088: while (avail_credit > 0) {
089: if (fragsz <= 0) {
090: int sz = Math.min(avail_credit, len);
091: writer.writeData(sessid, b, off, sz);
092: len -= sz;
093: off += sz;
094: avail_credit -= sz;
095: } else if (len < fragsz) {
096: // No fragmentation needed, we can sink all our data:
097: writer.writeData(sessid, b, off, len);
098: avail_credit -= len;
099: return;
100: } else {
101: // Emit only one single chunk:
102: writer.writeData(sessid, b, off, fragsz);
103: len -= fragsz;
104: off += fragsz;
105: avail_credit -= fragsz;
106: }
107: }
108: }
109: }
110:
111: /**
112: * Writes a byte. This method will block until the byte is actually
113: * written.
114: * It is <em>highly</em> recomended that you use a buffered output
115: * stream on top of that stream, or that you don't use that method.
116: * @param b the byte
117: * @exception IOException If an I/O error has occurred.
118: */
119:
120: public void write(int b) throws IOException {
121: byte bits[] = new byte[1];
122: bits[0] = (byte) (b & 0xff);
123: write(bits, 0, 1);
124: }
125:
126: /**
127: * Writes a sub array of bytes.
128: * @param b the data to be written
129: * @param off the start offset in the data
130: * @param len the number of bytes that are written
131: * @exception IOException If an I/O error has occurred.
132: */
133:
134: public void write(byte b[], int off, int len) throws IOException {
135: send(b, off, len);
136: }
137:
138: /**
139: * Flush that output stream, blocking all data has been sent.
140: * @exception IOException If some IO errors occur.
141: */
142:
143: public void flush() throws IOException {
144: writer.flush();
145: }
146:
147: /**
148: * Close that session output stream.
149: * @exception IOException If some IO errors occur.
150: */
151:
152: public synchronized void close() throws IOException {
153: if (closed)
154: return;
155: closed = true;
156: session.sendFIN();
157: notifyAll();
158: return;
159: }
160:
161: protected MuxOutputStream(MuxSession session) {
162: this.session = session;
163: this.sessid = session.getIdentifier();
164: this.writer = session.getMuxStream().getMuxWriter();
165: }
166:
167: }
|