001: // MuxReader.java
002: // $Id: MuxReader.java,v 1.8 2000/08/16 21:38:01 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.www.mux;
007:
008: import java.io.EOFException;
009: import java.io.IOException;
010: import java.io.InputStream;
011: import java.io.PrintStream;
012:
013: class MuxMessage {
014: int flags = -1; // set by parseMuxWord0
015: int sessid = -1; // set by parseMuxWord0
016: int len = -1; // set by parseMuxWord0
017: int llen = -1; // set by parseMuxWord1
018: int pad = -1; // set by setup{Long|Control}Message
019: int bytes = -1; // set by setup{Long|Control}Message
020: int hsize = -1; // set by setup{Long|Control}Message
021: boolean isctrl = false; // set by setupControlMessage
022: int ctrlop = -1; // set by setupControlMessage
023:
024: MuxMessage() {
025: }
026: }
027:
028: class MuxReader extends Thread {
029: private static final boolean debug = true;
030:
031: /**
032: * The MuxStream we are reading data for.
033: */
034: MuxStream stream = null;
035: /**
036: * Quick access to our MuxStream input stream.
037: */
038: InputStream in = null;
039:
040: /**
041: * Parsed message.
042: */
043: MuxMessage msg = null;
044: /**
045: * Lookeahead message (when nextAvailable set to <strong>true</strong>.
046: */
047: MuxMessage nmsg = null;
048: /**
049: * Current message - flags.
050: */
051: protected int msgflags = -1;
052: /**
053: * Current message - session id.
054: */
055: protected int msgsessid = -1;
056: /**
057: * Current message - Message length.
058: */
059: protected int msglen = -1;
060: /**
061: * Current message - Message long length.
062: */
063: protected int msgllen = -1;
064: /**
065: * Current message - padding bytes to skip.
066: */
067: protected int msgpad = -1;
068: /**
069: * Current message - Message content size.
070: */
071: protected int msgbytes = -1;
072: /**
073: * Current message - Is that a control message.
074: * If this is a control message, then msgctrlop is set properly.
075: */
076: protected boolean msgisctrl = false;
077: /**
078: * Current message - If is control, control op code.
079: */
080: protected int msgctrlop = -1;
081: /**
082: * Were we able to lookahed on next message ?
083: * When this variable is set to <strong>true</strong>, all <em>nmsg</em>
084: * variables contains the parsed next message.
085: */
086: protected boolean nextAvailable = false;
087: /**
088: * Current message - MuxSession to dispatch to.
089: */
090: protected MuxSession msgsess = null;
091: /**
092: * Input buffer.
093: */
094: protected byte buffer[] = null;
095: /**
096: * Half the buffer size (precomputed once and for all).
097: */
098: protected int midbuflength = -1;
099: /**
100: * input buffer pointer.
101: */
102: int bufptr = 0;
103: /**
104: * Input buffer length.
105: */
106: int buflen = 0;
107: /**
108: * Are we still alive ?
109: */
110: protected boolean alive = true;
111:
112: /**
113: * Combine the four bytes into a word, and conform to little endian.
114: * @return An integer value for the given four bytes.
115: */
116:
117: private final int computeWord(byte w0, byte w1, byte w2, byte w3) {
118: return (((((int) w3) & 0xff) << 24)
119: | ((((int) w2) & 0xff) << 16)
120: | ((((int) w1) & 0xff) << 8) | ((((int) w0) & 0xff)));
121: }
122:
123: /**
124: * Parse the first the given integer as the first 32 bits of a MUX message.
125: * This method will set all variables appropriately.
126: * @see #readMessage.
127: * @return A boolean, <strong>true</strong> if next integer of input
128: * is to be read as a long length, <strong>false</strong> otherwise.
129: */
130:
131: private final boolean parseMuxWord0(byte w0, byte w1, byte w2,
132: byte w3, MuxMessage into) {
133: into.flags = computeWord(w0, w1, w2, w3);
134: into.sessid = (into.flags & 0x03fc0000) >> 18;
135: into.len = (into.flags & 0x3ffff);
136: return (into.flags & MUX.LONG_LENGTH) != 0;
137: }
138:
139: /**
140: * Parse the second byte of a mux header.
141: * This method will set all variables appropriately.
142: * @see #readMessage
143: */
144:
145: private final void parseMuxWord1(byte w0, byte w1, byte w2,
146: byte w3, MuxMessage into) {
147: into.llen = computeWord(w0, w1, w2, w3);
148: }
149:
150: private final boolean setupControlMessage(MuxMessage m) {
151: if (m.isctrl = ((m.flags & MUX.CONTROL) == MUX.CONTROL)) {
152: int a = -1;
153: switch (m.ctrlop = ((m.flags & MUX.CTRL_CODE) >> 26)) {
154: case MUX.CTRL_DEFINE_STRING:
155: // Convert the byte data into a String:
156: m.bytes = m.llen;
157: m.pad = ((a = (m.llen & 0x7)) != 0) ? 8 - a : 0;
158: m.hsize = 8;
159: break;
160: case MUX.CTRL_DEFINE_STACK:
161: m.bytes = m.llen;
162: m.pad = ((a = (m.llen & 0x7)) != 0) ? 8 - a : 0;
163: m.hsize = 8;
164: break;
165: case MUX.CTRL_MUX_CONTROL:
166: m.bytes = 0;
167: m.pad = 0;
168: m.hsize = 8;
169: break;
170: case MUX.CTRL_SEND_CREDIT:
171: m.bytes = 0;
172: m.pad = 0;
173: m.hsize = 8;
174: break;
175: }
176: return true;
177: } else {
178: return false;
179: }
180: }
181:
182: private final void setupLongMessage(MuxMessage m) {
183: if (setupControlMessage(m))
184: return;
185: int a = -1;
186: m.bytes = m.llen;
187: m.pad = ((a = (m.llen & 0x7)) != 0) ? 8 - a : 0;
188: m.hsize = 8;
189: }
190:
191: private final void setupMessage(MuxMessage m) {
192: if (setupControlMessage(m))
193: return;
194: if ((m.flags & MUX.SYN) != 0) {
195: m.bytes = 0;
196: m.pad = 0;
197: } else {
198: int a = -1;
199: m.bytes = m.len;
200: m.pad = ((a = (m.len & 0x3)) != 0) ? 4 - a : 0;
201: }
202: m.hsize = 4;
203: }
204:
205: /**
206: * Parse a full mux header into the given message repository.
207: * @return Number of bytes consumed from buffer.
208: */
209:
210: private final void parseMuxHeader(MuxMessage into)
211: throws IOException {
212: while (buflen < 4)
213: fillBuffer();
214: boolean isLong = parseMuxWord0(buffer[bufptr],
215: buffer[bufptr + 1], buffer[bufptr + 2],
216: buffer[bufptr + 3], into);
217: if (isLong) {
218: while (buflen < 4)
219: fillBuffer();
220: parseMuxWord1(buffer[bufptr + 4], buffer[bufptr + 5],
221: buffer[bufptr + 6], buffer[bufptr + 7], into);
222: setupLongMessage(into);
223: } else {
224: setupMessage(into);
225: }
226: }
227:
228: private final boolean parseMuxHeaderAhead(int ptr, int avail,
229: MuxMessage into) {
230: int a = -1;
231:
232: if (avail < 4)
233: return false;
234: boolean isLong = parseMuxWord0(buffer[ptr], buffer[ptr + 1],
235: buffer[ptr + 2], buffer[ptr + 3], into);
236: if (isLong) {
237: if (avail < 8)
238: return false;
239: parseMuxWord1(buffer[ptr + 4], buffer[ptr + 5],
240: buffer[ptr + 6], buffer[ptr + 7], into);
241: setupLongMessage(into);
242: } else {
243: setupMessage(into);
244: }
245: return true;
246: }
247:
248: private final void setCurrent(MuxMessage m) {
249: msgflags = m.flags;
250: msgsessid = m.sessid;
251: msglen = m.len;
252: msgllen = m.llen;
253: msgpad = m.pad;
254: msgbytes = m.bytes;
255: msgisctrl = m.isctrl;
256: msgctrlop = m.ctrlop;
257: }
258:
259: /**
260: * Fill in the read buffer.
261: */
262:
263: private final void fillBuffer() throws IOException {
264: // Rotate the buffer if needed:
265: if (buflen == 0) {
266: bufptr = 0;
267: } else if (bufptr > midbuflength) {
268: System.arraycopy(buffer, bufptr, buffer, 0, buflen);
269: bufptr = 0;
270: }
271: // No more data available, reading the stream is required
272: int ptr = bufptr + buflen;
273: int got = in.read(buffer, ptr, buffer.length - ptr);
274: if (got > 0) {
275: buflen += got;
276: if (debug)
277: System.out.println("MuxReader.fillBuffer: " + buflen
278: + " bytes.");
279: } else if (got < 0) {
280: // The socket has been closed, notify the session to shutdown:
281: stream.error(this , "Gracefull close.");
282: // Fake exception to get back to reader's main loop:
283: throw new EOFException("Gracefull close.");
284: }
285: if (debug)
286: System.out.println("MuxReader: got " + got + " bytes.");
287: }
288:
289: /**
290: * Read next available message from the stream input stream.
291: * This method fills in the following variable:
292: * <dl>
293: * <dt>msgflags<dd>An integer describing the MUX flags for current message.
294: * <dt>msgsessid<dd>The session that is to receive the message.
295: * <dt>msglen<dd>The message length (or protocol id).
296: * <dt>msgllen<dd>The long message length, when the flags requires
297: * it.
298: * <dt>msgbytes<dd>The real length of the message body (using either the
299: * long header format or the short one).
300: * <dt>msgpad<dd>Number of padding bytes to skip by the end of that
301: * message.
302: * </dl>
303: * It is up to the caller to read the reminaing bytes of the message
304: * before calling this method again.
305: */
306:
307: protected void readMessage() throws IOException {
308: int a = -1;
309:
310: // Read in current message:
311: if (nextAvailable) {
312: // Next message was read ahead, use previous result:
313: setCurrent(nmsg);
314: bufptr += nmsg.hsize;
315: buflen -= nmsg.hsize;
316: nextAvailable = false;
317: } else {
318: // Parse current message:
319: parseMuxHeader(msg);
320: bufptr += msg.hsize;
321: buflen -= msg.hsize;
322: setCurrent(msg);
323: }
324: // Try to read ahead next message:
325: int nbufptr = bufptr + msgbytes + msgpad;
326: if (nextAvailable = (nbufptr < bufptr + buflen)) {
327: nextAvailable = parseMuxHeaderAhead(nbufptr, bufptr
328: + buflen - nbufptr, nmsg);
329: }
330: if (debug)
331: System.out.println("[readMessage] bufptr=" + bufptr
332: + ", f=" + Integer.toString(msgflags, 16) + ", i="
333: + msgsessid + ", l=" + msglen + ", s=" + msgbytes);
334: }
335:
336: /**
337: * Read message body.
338: * It is up to the caller of that routine to consume exactly the number
339: * of returned bytes from this reader's input buffer.
340: * @return A boolean, <strong>true</strong> if more bytes are available for
341: * that message, <strong>false</string>otherwise.
342: */
343:
344: private final int readMessageBody() throws IOException {
345: if (debug)
346: System.out.println("readMessageBody: " + msgbytes
347: + " bytes avail.");
348: if (msgbytes > 0) {
349: if (buflen <= 0)
350: fillBuffer();
351: if (msgbytes > buflen) {
352: msgbytes -= buflen;
353: return buflen;
354: } else {
355: int ret = msgbytes;
356: msgbytes = 0;
357: return ret;
358: }
359: } else if (msgpad > 0) {
360: // Skip padding bytes:
361: while (buflen <= msgpad)
362: fillBuffer();
363: bufptr += msgpad;
364: buflen -= msgpad;
365: }
366: return 0;
367: }
368:
369: /**
370: * Decode the current message body as a String.
371: * @return A String instance.
372: */
373:
374: protected String msgToString() throws IOException {
375: if (buffer.length < msglen)
376: throw new RuntimeException(
377: "String doesn't hold in buffer !");
378: while (buflen < msglen)
379: fillBuffer();
380: String s = new String(buffer, 0, bufptr, msglen);
381: bufptr += msglen;
382: buflen -= msglen;
383: return s;
384: }
385:
386: /**
387: * Decode the current message body as a shot array.
388: * @return A short array instance.
389: */
390:
391: protected int[] msgShortArrayToIntArray() throws IOException {
392: if (buffer.length < msglen)
393: throw new RuntimeException(
394: "ShortArray doesn't hold in buffer !");
395: while (buflen < msglen)
396: fillBuffer();
397: int a[] = new int[msglen >> 1];
398: for (int i = 0; i < a.length; i++) {
399: a[i] = (buffer[bufptr] | (buffer[bufptr + 1] << 8)) & 0xffff;
400: bufptr += 2;
401: }
402: buflen -= msglen;
403: return a;
404: }
405:
406: /**
407: * Handle (decode and dispatch) control messages.
408: * This method gets called by the dispatcher whenever a MUX header
409: * with the control bit set is the current message to dispatch.
410: */
411:
412: protected void handleControlMessage() throws IOException {
413: switch (msgctrlop) {
414: case MUX.CTRL_DEFINE_STRING:
415: // Convert the byte data into a String:
416: String str = msgToString();
417: stream.ctrlDefineString(msglen, str);
418: break;
419: case MUX.CTRL_DEFINE_STACK:
420: int ids[] = msgShortArrayToIntArray();
421: stream.ctrlDefineStack(msgsessid, ids);
422: break;
423: case MUX.CTRL_MUX_CONTROL:
424: stream.ctrlMuxControl(msgsessid, msglen);
425: break;
426: case MUX.CTRL_SEND_CREDIT:
427: stream.ctrlSendCredit(msgsessid, msgllen);
428: break;
429: }
430: }
431:
432: /**
433: * Dispatch the current message to the appropriate handler.
434: */
435:
436: protected void dispatchMessage() throws IOException {
437: msgsess = stream.lookupSession(msgflags, msgsessid, msglen,
438: msgllen);
439: if (msgsess != null) {
440: if (msgisctrl) {
441: // Control message requires special actions:
442: handleControlMessage();
443: } else {
444: // Dispatch that message body to the given session:
445: boolean noflush = (nextAvailable && (nmsg.sessid == msgsessid));
446: int got = 0;
447: while ((got = readMessageBody()) > 0) {
448: msgsess.pushInput(buffer, bufptr, got, noflush);
449: bufptr += got;
450: buflen -= got;
451: }
452: // Notify the session of any fancy flags:
453: if ((msgflags & MUX.FIN) == MUX.FIN)
454: msgsess.notifyFIN();
455: if ((msgflags & MUX.RST) == MUX.RST)
456: msgsess.notifyRST();
457: if ((msgflags & MUX.PUSH) == MUX.PUSH)
458: msgsess.notifyPUSH();
459: }
460: } else {
461: // Discard that message's data:
462: int got = -1;
463: while ((got = readMessageBody()) > 0) {
464: bufptr += got;
465: buflen -= got;
466: }
467: }
468: }
469:
470: /**
471: * Shutdown the reader for this stream.
472: */
473:
474: protected synchronized void shutdown() {
475: alive = false;
476: buffer = null;
477: stop();
478: }
479:
480: /**
481: * Runfor ever, reading available input.
482: * Unfortunatelly the Java IO models <em>requires</em> that you consume
483: * a full thread, just to read the input stream.
484: */
485:
486: public void run() {
487: try {
488: while (alive) {
489: readMessage();
490: dispatchMessage();
491: // Clear up current message descriptor:
492: msgflags = 0;
493: msgsessid = 0;
494: msglen = 0;
495: msgllen = 0;
496: msgpad = 0;
497: msgisctrl = false;
498: msgctrlop = -1;
499: }
500: } catch (EOFException ex) {
501: // Already handled, the stream *has* been notified.
502: } catch (IOException ex) {
503: stream.error(this , ex);
504: }
505: }
506:
507: MuxReader(MuxStream stream, InputStream in) throws IOException {
508: this .stream = stream;
509: this .in = in;
510: this .buffer = new byte[MUX.READER_BUFFER_SIZE];
511: this .bufptr = 0;
512: this .buflen = 0;
513: this .midbuflength = (MUX.READER_BUFFER_SIZE >> 1);
514: this .msg = new MuxMessage();
515: this .nmsg = new MuxMessage();
516: setName("MuxReader");
517: }
518:
519: }
|