001: // MuxSession.java
002: // $Id: MuxSession.java,v 1.9 2000/08/16 21:38:01 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.www.mux;
007:
008: import java.io.IOException;
009: import java.io.InputStream;
010: import java.io.OutputStream;
011: import java.io.PrintStream;
012:
013: import java.net.InetAddress;
014:
015: public class MuxSession /* extends Socket */{
016: /**
017: * The default input buffer size for all sessions.
018: */
019: public static final int INPUT_BUFFER_SIZE = 4096;
020:
021: /**
022: * The stream to which that session belongs.
023: */
024: protected MuxStream stream = null;
025: /**
026: * The session's input stream.
027: */
028: protected MuxInputStream in = null;
029: /**
030: * The session's output stream.
031: */
032: protected MuxOutputStream out = null;
033: /**
034: * This session's identifier.
035: */
036: protected int id = -1;
037: /**
038: * This session's protocol identifier.
039: */
040: protected int protid = -1;
041: /**
042: * Has this session been aborted ?
043: */
044: protected boolean aborted = false;
045: /**
046: * Has this session emitted a FIN (is it half-closed ?)
047: */
048: protected boolean finsent = false;
049:
050: /**
051: * Push some data into that session's input stream.
052: * @param data The buffer containing the data to be pushed.
053: * @param off Offset of the data within above buffer.
054: * @param len Length of data to be pushed.
055: * @param noflush Set to <strong>true</strong> if there is already more
056: * data available for that session.
057: * @exception IOException If IO was interrupted.
058: */
059:
060: protected final void pushInput(byte data[], int off, int len,
061: boolean noflush) throws IOException {
062: in.push(data, off, len, noflush);
063: }
064:
065: /**
066: * Send a FIN message on that session.
067: */
068:
069: protected final void sendFIN() throws IOException {
070: if (!finsent) {
071: stream.getMuxWriter().writeMessage(id, MUX.FIN, 0);
072: stream.getMuxWriter().flush();
073: finsent = true;
074: }
075: }
076:
077: /**
078: * We have received a FIN on that session's output stream.
079: * @exception IOException If some IO error occured.
080: */
081:
082: protected final void notifyFIN() throws IOException {
083: in.close();
084: if (finsent)
085: shutdown();
086: }
087:
088: /**
089: * The other end is telling us that something is going wrong. Cleanup.
090: */
091:
092: protected void notifyRST() throws IOException {
093: in.error("Broken pipe");
094: out.close();
095: shutdown();
096: }
097:
098: protected void notifyPUSH() {
099: System.out.println("MuxSession:notifyPUSH: not handled");
100: }
101:
102: protected final void notifyCredit(int credit) {
103: out.notifyCredit(credit);
104: }
105:
106: protected final void notifyControl(int fragsz) {
107: out.notifyControl(fragsz);
108: }
109:
110: protected void notifyOutputClose() throws IOException {
111: stream.getMuxWriter().writeMessage(id, MUX.FIN, 0);
112: }
113:
114: /**
115: * Abort that session.
116: * The MUX stream erred, the underlying transport streams are broken.
117: * Terminate that session, make sure any further action on it will trigger
118: * an IO error.
119: */
120:
121: protected synchronized void abort() {
122: aborted = true;
123: try {
124: shutdown();
125: } catch (Exception ex) {
126: }
127: }
128:
129: /**
130: * Shutdown that session gracefully.
131: */
132:
133: public void shutdown() throws IOException {
134: // Close both streams:
135: try {
136: in.close();
137: } catch (Exception ex) {
138: ex.printStackTrace();
139: }
140: try {
141: out.close();
142: } catch (Exception ex) {
143: ex.printStackTrace();
144: }
145: // Unregister the session
146: stream.unregisterSession(this );
147: }
148:
149: /**
150: * Get the Mux stream to which that session is attached.
151: * @return A MuxStream instance.
152: */
153:
154: protected final MuxStream getMuxStream() {
155: return stream;
156: }
157:
158: /**
159: * Get this session's input stream buffer size.
160: * @return The standard buffer size for that session input stream.
161: */
162:
163: protected int getInputBufferSize() {
164: return INPUT_BUFFER_SIZE;
165: }
166:
167: /**
168: * Get the other end's IP address.
169: * @return An InetAddress instance.
170: */
171:
172: public InetAddress getInetAddress() {
173: return getMuxStream().getInetAddress();
174: }
175:
176: /**
177: * Get this session identifier.
178: * @return An integer identifier for that session.
179: */
180:
181: public final int getIdentifier() {
182: return id;
183: }
184:
185: /**
186: * Get this session protocol identifier.
187: * @return An integer identifying the protocol runnin on that session.
188: */
189:
190: public final int getProtocolIdentifier() {
191: return protid;
192: }
193:
194: /**
195: * Get this session's input stream.
196: * @return An InputStream instance.
197: */
198:
199: public synchronized InputStream getInputStream() throws IOException {
200: if (aborted)
201: throw new IOException("Aborted mux session");
202: return in;
203: }
204:
205: /**
206: * Get this session's output stream.
207: * @return An OutputStream instance.
208: */
209:
210: public synchronized OutputStream getOutputStream()
211: throws IOException {
212: if (aborted)
213: throw new IOException("Aborted mux session");
214: return out;
215: }
216:
217: protected MuxSession(MuxStream stream, int id, int protid) {
218: // Initialize state:
219: this .stream = stream;
220: this .id = id;
221: this .protid = protid;
222: // Create input and output streams:
223: this .in = new MuxInputStream(this );
224: this .out = new MuxOutputStream(this);
225: }
226:
227: }
|