001: // MuxStream.java
002: // $Id: MuxStream.java,v 1.8 2000/08/16 21:38:01 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.www.mux;
007:
008: import java.io.IOException;
009: import java.io.InputStream;
010: import java.io.OutputStream;
011: import java.io.PrintStream;
012:
013: import java.net.InetAddress;
014: import java.net.Socket;
015:
016: public class MuxStream {
017: /**
018: * That stream accept handler.
019: */
020: protected MuxStreamHandler handler = null;
021: /**
022: * This stream reader.
023: */
024: protected MuxReader reader = null;
025: /**
026: * This stream writer.
027: */
028: protected MuxWriter writer = null;
029: /**
030: * Currently defined sessions.
031: */
032: protected MuxSession sessions[] = null;
033: /**
034: * Is this the server side of the MUX channel ?
035: */
036: protected boolean server = false;
037: /**
038: * Inet address of the other end's connection (maybe <strong>null</strong>)
039: */
040: protected InetAddress inetaddr = null;
041: /**
042: * The raw input stream.
043: */
044: protected InputStream in = null;
045: /**
046: * The raw output stream.
047: */
048: protected OutputStream out = null;
049: /**
050: * Is this muxed stream still alive ?
051: */
052: protected boolean alive = true;
053:
054: // Assumes sessions array is of correct size, and checks have been done
055:
056: private synchronized MuxSession createSession(int sessid, int protid)
057: throws IOException {
058: MuxSession session = sessions[sessid];
059: if (session == null) {
060: session = new MuxSession(this , sessid, protid);
061: sessions[sessid] = session;
062: } else {
063: System.out
064: .println("MuxStream:createSession: already existing !");
065: }
066: return session;
067: }
068:
069: // Are we willing to accept that new session ?
070: // Because we need to accept it internally, we always return something
071: // It is up to the caller to make sure flags has SYN set
072: // NOTE that the calls to the handler don't lock that object (a feature)
073:
074: private MuxSession acceptSession(int flags, int sessid, int protid)
075: throws IOException {
076: if (server & ((sessid & 1) == 0))
077: throw new IOException("MUX: Invalid even session id "
078: + sessid);
079: // Query the session handler about that new session:
080: MuxSession session = null;
081: if ((handler != null)
082: && handler.acceptSession(this , sessid, protid)) {
083: // Session accepted, setup handler:
084: session = createSession(sessid, protid);
085: handler.notifySession(session);
086: } else {
087: // Session rejected, emit a RST:
088: session = null;
089: System.out.println(this + ": RST (accept) session "
090: + sessid);
091: writer.writeMessage(sessid, MUX.RST, 0);
092: writer.flush();
093: }
094: return session;
095: }
096:
097: private final synchronized MuxSession allocateSession(int protid)
098: throws IOException {
099: // Available sessions ?
100: int i = (server ? 2 : 3);
101: for (; i < sessions.length; i += 2) {
102: if (sessions[i] == null) {
103: sessions[i] = new MuxSession(this , i, protid);
104: return sessions[i];
105: }
106: }
107: // Create a new session:
108: MuxSession session = checkSession(i);
109: if (session == null)
110: session = new MuxSession(this , i, protid);
111: sessions[i] = session;
112: return session;
113: }
114:
115: private final synchronized MuxSession checkSession(int sessid)
116: throws IOException {
117: // Check protocol validity:
118: if (sessid >= MUX.MAX_SESSION)
119: throw new IOException("MUX: Invalid session id " + sessid);
120: // Get or create the appropriate session:
121: if (sessid >= sessions.length) {
122: MuxSession ns[] = new MuxSession[sessid + MUX.SESSIONS_INCR];
123: System.arraycopy(sessions, 0, ns, 0, sessions.length);
124: sessions = ns;
125: }
126: return sessions[sessid];
127: }
128:
129: /**
130: * This stream is dying, clean it up.
131: * It is up to the caller to make sure all existing sessions have been
132: * terminated (gracefully or not).
133: * <p>This will shutdown all realted threads, and close the transport
134: * streams.
135: */
136:
137: private synchronized void cleanup() {
138: alive = false;
139: // Cleanup the reader and writer objects:
140: reader.shutdown();
141: writer.shutdown();
142: reader = null;
143: writer = null;
144: // Close streams:
145: try {
146: in.close();
147: out.close();
148: } catch (IOException ex) {
149: }
150: in = null;
151: out = null;
152: }
153:
154: /**
155: * Get this stream MuxWriter object.
156: * @return A MuxWriter instance.
157: */
158:
159: protected final MuxWriter getMuxWriter() {
160: return writer;
161: }
162:
163: /**
164: * A severe (fatal for that connection) errror has occured. Cleanup.
165: * @param obj The object that has generated the error.
166: * @param ex The exception that triggered the error (or <strong>null
167: * </strong> null if this was a logical error).
168: */
169:
170: protected void error(Object obj, Exception ex) {
171: System.out.println("*** Fatal error on " + this );
172: ex.printStackTrace();
173: System.out.println("No recovery !");
174: System.exit(1);
175: }
176:
177: /**
178: * A soft error has occured (eg socket close), Cleanup.
179: * @param obj The object that has detected the soft error.
180: * @param msg An associated String message.
181: */
182:
183: protected synchronized void error(Object obj, String msg) {
184: // Is there any pending session ?
185: boolean problems = false;
186: synchronized (this ) {
187: for (int i = 0; i < sessions.length; i++) {
188: if (sessions[i] != null)
189: sessions[i].abort();
190: }
191: }
192: // If no problems, close socket, we're done:
193: cleanup();
194: }
195:
196: /**
197: * Handle the given DefineString control message.
198: * @param strid The identifier for that String in the futur.
199: * @param str This String being defined.
200: */
201:
202: protected void ctrlDefineString(int strid, String str) {
203: }
204:
205: /**
206: * Handle the given DefineStack control message.
207: * @param id The identifier for that stack in the future.
208: * @param stack The stack description (as an array of shorts).
209: */
210:
211: protected void ctrlDefineStack(int id, int stack[])
212: throws IOException {
213: }
214:
215: /**
216: * Handle the given MuxControl control message.
217: * @param sessid The session to which that message applies.
218: * @param fragsz The max allowed fragment size on that session.
219: */
220:
221: protected void ctrlMuxControl(int sessid, int fragsz)
222: throws IOException {
223: MuxSession session = lookupSession(sessid, true);
224: session.notifyControl(fragsz);
225: }
226:
227: /**
228: * Handle the given SendCredit control message.
229: * @param sessid The session to which that message applies.
230: * @param credit The allowed credits.
231: */
232:
233: protected void ctrlSendCredit(int sessid, int credit)
234: throws IOException {
235: MuxSession session = lookupSession(sessid, true);
236: session.notifyCredit(credit);
237: }
238:
239: /**
240: * Handle that new incomming message.
241: * This method is called by the reader of that session, to dispatch
242: * the message currently being read.
243: * @return A MuxSession instance to dispatch that message to, or
244: * <strong>null</strong> otherwise (ie a new session was rejected, etc).
245: * In that last case, it is up to the reader of that session to discard
246: * any pending data.
247: */
248:
249: protected MuxSession lookupSession(int flags, int sessid,
250: int length, int llength) throws IOException {
251: MuxSession session = checkSession(sessid);
252: if (session == null) {
253: if ((flags & MUX.SYN) != 0) {
254: // Length really means protid in that case:
255: session = acceptSession(flags, sessid, length);
256: } else if ((flags & MUX.FIN) != MUX.FIN) {
257: // We don't know about that session, emit some reset:
258: System.out.println(this + ": RST (lookup) session "
259: + sessid);
260: if ((flags & MUX.RST) != MUX.RST) {
261: // Above test breaks a nasty loop !
262: writer.writeMessage(sessid, MUX.RST, 0);
263: }
264: }
265: }
266: return session;
267: }
268:
269: /**
270: * Lookup for an already existing session having the given identifier.
271: * @param sessid The identifier of the session to look for.
272: * @param check Is <strong>null</strong> a valid answer, if set and
273: * the requested session doesn't exist, a runtime exception is thrown.
274: * @return A MuxSession instance, or <strong>null</strong> if check is
275: * <strong>false</strong> and no session was found.
276: */
277:
278: protected synchronized MuxSession lookupSession(int sessid,
279: boolean check) {
280: if (sessid < sessions.length) {
281: MuxSession session = sessions[sessid];
282: if (session != null)
283: return session;
284: }
285: if (check) {
286: throw new RuntimeException("MuxStream:lookupSession: "
287: + " invalid session id " + sessid + ".");
288: }
289: return null;
290: }
291:
292: /**
293: * Unregiter the given session, it has been closed.
294: * @param session The session to unregister.
295: */
296:
297: protected synchronized void unregisterSession(MuxSession session) {
298: sessions[session.getIdentifier()] = null;
299: }
300:
301: /**
302: * Create a new MUX session, by connecting to the other end.
303: * @param protid The protocol that is going to be spoken on that new
304: * session.
305: * @return A connected MuxSession.
306: * @exception IOException If the connection couldn't be set up properly.
307: */
308:
309: public MuxSession connect(int protid) throws IOException {
310: // Is this stream still alive ?
311: synchronized (this ) {
312: if (!alive)
313: throw new IOException("Broken mux stream");
314: }
315: // Allocate a new session identifier:
316: MuxSession session = allocateSession(protid);
317: // If SYN with long-length not set accepted, uncomment following:
318: // writer.writeMessage(session.getIdentifier(), MUX.SYN, protid);
319: writer.writeMessage(session.getIdentifier(), MUX.SYN, protid,
320: null, 0, 0);
321: return session;
322: }
323:
324: /**
325: * Get the InetAddress associated with that MUX stream, if any.
326: * MUX streams can run on any kind of Input/Output streams. This method
327: * will only return a non-null instance when possible.
328: * @return An InetAddress instance, or <strong>null</strong> if not
329: * available.
330: */
331:
332: public InetAddress getInetAddress() {
333: return inetaddr;
334: }
335:
336: /**
337: * Shutdown this stream, and associated sessions gracefully.
338: * @param force If <strong>true</strong> abort all existing sessions, and
339: * close the muxed streams physically. Otherwise, shutdown the muxed stream
340: * gracefully only if no more sessions are running.
341: * @return A boolean, <strong>true</strong> if shutdown was performed,
342: * <strong>false</strong> if it was not performed because <em>force</em>
343: * was <strong>false</strong> and some sessions were still running.
344: * @exception IOException If some IO error occured.
345: */
346:
347: public synchronized boolean shutdown(boolean force)
348: throws IOException {
349: // Has this stream already been killed ?
350: if (!alive)
351: return true;
352: boolean terminate = true;
353: // Check sessions status:
354: if (force) {
355: for (int i = 0; i < sessions.length; i++) {
356: MuxSession s = sessions[i];
357: if (s != null)
358: s.abort();
359: }
360: } else {
361: for (int i = 0; i < sessions.length; i++) {
362: if (sessions[i] != null) {
363: terminate = false;
364: break;
365: }
366: }
367: }
368: if (terminate)
369: cleanup();
370: return terminate;
371: }
372:
373: public MuxStream(boolean server, MuxStreamHandler handler,
374: InputStream in, OutputStream out) throws IOException {
375: this .server = server;
376: this .handler = handler;
377: this .in = in;
378: this .out = out;
379: this .reader = new MuxReader(this , in);
380: this .writer = new MuxWriter(this , out);
381: this .sessions = new MuxSession[8];
382: this .reader.start();
383: }
384:
385: public MuxStream(boolean server, MuxStreamHandler handler,
386: Socket socket) throws IOException {
387: this(server, handler, socket.getInputStream(), socket
388: .getOutputStream());
389: this.inetaddr = socket.getInetAddress();
390: }
391:
392: }
|