001: // MuxWriter.java
002: // $Id: MuxWriter.java,v 1.9 2000/08/16 21:38:01 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.www.mux;
007:
008: import java.io.IOException;
009: import java.io.OutputStream;
010: import java.io.PrintStream;
011:
012: /**
013: * This class is dumb. It does no control flow, and nothing clever, just
014: * emit appropriate MUX headers before sending some data.
015: * <p>The flow control is handled on a per session basis, by both the
016: * MuxSession class, and the MuxOutputStream class.
017: * @see MuxSession
018: * @see MuxOutputStream
019: */
020:
021: class MuxWriter implements MUX {
022: private static final byte padbytes[] = { (byte) 0, (byte) 0,
023: (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
024: private static final boolean debug = true;
025: /**
026: * The MUX Stream this writer is working for.
027: */
028: protected MuxStream stream = null;
029: /**
030: * The output buffer.
031: */
032: protected byte buffer[] = new byte[MUX.WRITER_BUFFER_SIZE];
033: /**
034: * The current buffer size.
035: */
036: protected int buflen = 0;
037: /**
038: * The current buffer pointer.
039: */
040: protected int bufptr = 0;
041: /**
042: * The output stream to write data to.
043: */
044: protected OutputStream out = null;
045:
046: /**
047: * Can we get this capacity from our buffer.
048: * <p>The caller is responsible to synchronize access to that method.
049: * Make best effort (ie flush) to try getting the requested capacity. If
050: * success, than return <strong>true</strong> otherwise, return
051: * <strong>false</strong>.
052: * @param capacity Requested capacity.
053: * @return A boolean <strong>true</strong if requested capacity is
054: * available.
055: * @exception IOException If flushing the buffer trigered some IO errors.
056: */
057:
058: private boolean ensureCapacity(int capacity) throws IOException {
059: if (bufptr + buflen + capacity < buffer.length) {
060: return true;
061: } else if (buffer.length < capacity) {
062: flush();
063: return true;
064: } else {
065: return false;
066: }
067: }
068:
069: /**
070: * Encode a word (little endian)
071: * <p>The caller is responsible to synchronize access to that method.
072: * <p>The caller is assumed to make sure the required capacity is
073: * available.
074: * @param word The word to encode.
075: */
076:
077: private final void encodeWord(int word) {
078: int pos = bufptr + buflen;
079: buffer[pos++] = (byte) (word & 0x000000ff);
080: buffer[pos++] = (byte) ((word & 0x0000ff00) >> 8);
081: buffer[pos++] = (byte) ((word & 0x00ff0000) >> 16);
082: buffer[pos++] = (byte) ((word & 0xff000000) >> 24);
083: buflen += 4;
084: if (debug)
085: System.out.println("[encodeWord] 0x"
086: + Integer.toString(buffer[bufptr], 16)
087: + Integer.toString(buffer[bufptr + 1], 16)
088: + Integer.toString(buffer[bufptr + 2], 16)
089: + Integer.toString(buffer[bufptr + 3], 16));
090:
091: }
092:
093: /**
094: * Encode a short (little endian)
095: * <p>The caller is responsible to synchronize access to that method.
096: * <p>The caller is assumed to make sure the required capacity is
097: * available.
098: * @param s The short to encode.
099: */
100:
101: private final void encodeShort(short s) {
102: int pos = bufptr + buflen;
103: buffer[pos++] = (byte) (s & 0x00ff);
104: buffer[pos++] = (byte) ((s & 0xff00) >> 8);
105: buflen += 2;
106: }
107:
108: /**
109: * Encode a small message.
110: * <p>The caller is responsible to synchronize access to that method.
111: * @param flags The header flags.
112: * @param session The session.
113: * @param len The message length.
114: * @param into Target buffer.
115: * @param dst Target buffer position.
116: */
117:
118: private final void encodeMessage(int flags, int sessid, int length)
119: throws IOException {
120: ensureCapacity(4);
121: int word = (flags | ((sessid & 0xff) << 18) | length);
122: if (debug)
123: System.out.println("sending h="
124: + Integer.toString(word, 16));
125: encodeWord(word);
126: }
127:
128: /**
129: * Encode a big message.
130: * <p>The caller is responsible to synchronize access to that method.
131: * @param flags The header flags.
132: * @param session The session identifier.
133: * @param protocol The protocol identifier.
134: * @param len The message length.
135: */
136:
137: private final void encodeLongMessage(int flags, int sessid,
138: int protocol, int length) throws IOException {
139: ensureCapacity(8);
140: int word = (flags | ((sessid & 0xff) << 18) | protocol);
141: if (debug)
142: System.out.println("sending h="
143: + Integer.toString(word, 16) + ", l="
144: + Integer.toString(length, 16));
145: encodeWord(word);
146: encodeWord(length);
147: }
148:
149: /**
150: * Emit the given chunk of data.
151: * <p>The caller is responsible to synchronize access to that method.
152: * <p>The caller is reponsible for having emitted the right header
153: * before actually emitting that data.
154: */
155:
156: private final void emitData(byte data[], int off, int len)
157: throws IOException {
158: if (len <= 0)
159: return;
160: if (ensureCapacity(len)) {
161: // Just add to buffer:
162: System.arraycopy(data, off, buffer, bufptr + buflen, len);
163: buflen += len;
164: } else {
165: // Write through:
166: flush();
167: out.write(data, off, len);
168: }
169: }
170:
171: /**
172: * Emit the given String.
173: * <p>The caller is responsible to synchronize access to that method.
174: * <p>The caller is reponsible for having emitted the right header
175: * @param str The String to be emitted.
176: * @param len Length of the String (or <strong>-1</strong> if not computed
177: * yet).
178: */
179:
180: private final void emitData(String str, int len) throws IOException {
181: if (len < 0)
182: len = str.length();
183: if (!ensureCapacity(len))
184: // FIXME
185: throw new RuntimeException(
186: "String to big to hold in buffer !");
187: str.getBytes(0, len, buffer, bufptr + buflen);
188: buflen += len;
189: }
190:
191: /**
192: * Emit the given integer array as a short array (little endian).
193: * <p>The caller is responsible to synchronize access to that method.
194: * <p>The caller is reponsible for having emitted the right header
195: * @param a The array of int to be encoded as an array of shorts.
196: */
197:
198: private final void emitShortArray(int a[]) throws IOException {
199: if (!ensureCapacity(a.length << 1))
200: // FIXME
201: throw new RuntimeException(
202: "Array to bug to hold in buffer.");
203: for (int i = 0; i < a.length; i++)
204: encodeShort((short) (a[i] & 0xffff));
205: }
206:
207: /**
208: * Shutdown the writer.
209: */
210:
211: protected synchronized void shutdown() {
212: buffer = null;
213: }
214:
215: /**
216: * Flush current output buffer.
217: */
218:
219: protected synchronized void flush() throws IOException {
220: if (buflen > 0) {
221: out.write(buffer, bufptr, buflen);
222: bufptr = 0;
223: buflen = 0;
224: }
225: }
226:
227: /**
228: * Write one message of output.
229: * @param sessid The session identifier.
230: * @param flags The flags of that message.
231: * @param protid The protocol identifier.
232: * @param b The buffer containing the data to write.
233: * @param o Offset of data within above buffer.
234: * @param l Length of data to be written.
235: */
236:
237: protected synchronized void writeMessage(int sessid, int flags,
238: int protocol, byte b[], int o, int l) throws IOException {
239: encodeLongMessage(flags, sessid, protocol, l);
240: emitData(b, o, l);
241: }
242:
243: /**
244: * Write one message of output.
245: * @param sessid The session identifier.
246: * @param flags The flags of that message.
247: * @param protid The protocol identifier.
248: */
249:
250: protected synchronized void writeMessage(int sessid, int flags,
251: int protocol) throws IOException {
252: encodeMessage(flags, sessid, protocol);
253: }
254:
255: /**
256: * Short cut to write data on a given session.
257: * @param sessid The session to write data to.
258: * @param b The buffer containing the data to be written.
259: * @param o Offset of data within above buffer.
260: * @param l Length of data to be written.
261: */
262:
263: protected synchronized void writeData(int sessid, byte b[], int o,
264: int l) throws IOException {
265: encodeMessage(0, sessid, l);
266: if (l > 0) {
267: // Emit raw data first:
268: emitData(b, o, l);
269: // Emit padding bytes as needed:
270: int padlen = ((l & 0x3) != 0) ? (4 - (l & 0x3)) : 0;
271: if (padlen != 0)
272: emitData(padbytes, 0, padlen);
273: }
274: }
275:
276: protected void ctrlDefineString(int stackid, String id)
277: throws IOException {
278: int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags
279: | (MUX.CTRL_DEFINE_STRING << 26) // opcode
280: | (stackid & MUX.LENGTH)); // stack identifier
281: int len = id.length();
282: synchronized (this ) {
283: encodeWord(word);
284: encodeWord(len);
285: emitData(id, len);
286: }
287: }
288:
289: protected void ctrlDefineStack(int id, int stack[])
290: throws IOException {
291: int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags
292: | (MUX.CTRL_DEFINE_STACK << 26) // opcode
293: | (id & MUX.LENGTH));
294: int len = (stack.length << 1);
295: synchronized (this ) {
296: encodeWord(word);
297: encodeWord(len);
298: emitShortArray(stack);
299: }
300: }
301:
302: protected void ctrlMuxControl(int sessid, int fragsz)
303: throws IOException {
304: int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags
305: | (MUX.CTRL_MUX_CONTROL << 26) // opcode
306: | (sessid << 18) // session id
307: | (fragsz & MUX.LENGTH)); // frag size
308: synchronized (this ) {
309: encodeWord(word);
310: encodeWord(0);
311: }
312: }
313:
314: protected void ctrlSendCredit(int sessid, int credit)
315: throws IOException {
316: int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags
317: | (MUX.CTRL_SEND_CREDIT << 26) // opcode
318: | (sessid << 18)); // session id
319: synchronized (this ) {
320: encodeWord(word);
321: encodeWord(credit);
322: }
323: }
324:
325: protected synchronized boolean needsFlush() {
326: return buflen > 0;
327: }
328:
329: MuxWriter(MuxStream stream, OutputStream out) throws IOException {
330: this .stream = stream;
331: this .out = out;
332: this .buffer = new byte[MUX.WRITER_BUFFER_SIZE];
333: this .bufptr = 0;
334: this .buflen = 0;
335: }
336:
337: }
|