001: /*
002: * Copyright 1996-1997 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025: package sun.rmi.transport.tcp;
026:
027: import java.io.*;
028:
029: /**
030: * MultiplexInputStream manages receiving data over a connection managed
031: * by a ConnectionMultiplexer object. This object is responsible for
032: * requesting more bytes of data as space in its internal buffer becomes
033: * available.
034: *
035: * @author Peter Jones
036: */
037: final class MultiplexInputStream extends InputStream {
038:
039: /** object managing multiplexed connection */
040: private ConnectionMultiplexer manager;
041:
042: /** information about the connection this is the input stream for */
043: private MultiplexConnectionInfo info;
044:
045: /** input buffer */
046: private byte buffer[];
047:
048: /** number of real data bytes present in buffer */
049: private int present = 0;
050:
051: /** current position to read from in input buffer */
052: private int pos = 0;
053:
054: /** pending number of bytes this stream has requested */
055: private int requested = 0;
056:
057: /** true if this connection has been disconnected */
058: private boolean disconnected = false;
059:
060: /**
061: * lock acquired to access shared variables:
062: * buffer, present, pos, requested, & disconnected
063: * WARNING: Any of the methods manager.send*() should not be
064: * invoked while this lock is held, since they could potentially
065: * block if the underlying connection's transport buffers are
066: * full, and the manager may need to acquire this lock to process
067: * and consume data coming over the underlying connection.
068: */
069: private Object lock = new Object();
070:
071: /** level at which more data is requested when read past */
072: private int waterMark;
073:
074: /** data structure for holding reads of one byte */
075: private byte temp[] = new byte[1];
076:
077: /**
078: * Create a new MultiplexInputStream for the given manager.
079: * @param manager object that manages this connection
080: * @param info structure for connection this stream reads from
081: * @param bufferLength length of input buffer
082: */
083: MultiplexInputStream(ConnectionMultiplexer manager,
084: MultiplexConnectionInfo info, int bufferLength) {
085: this .manager = manager;
086: this .info = info;
087:
088: buffer = new byte[bufferLength];
089: waterMark = bufferLength / 2;
090: }
091:
092: /**
093: * Read a byte from the connection.
094: */
095: public synchronized int read() throws IOException {
096: int n = read(temp, 0, 1);
097: if (n != 1)
098: return -1;
099: return temp[0] & 0xFF;
100: }
101:
102: /**
103: * Read a subarray of bytes from connection. This method blocks for
104: * at least one byte, and it returns the number of bytes actually read,
105: * or -1 if the end of the stream was detected.
106: * @param b array to read bytes into
107: * @param off offset of beginning of bytes to read into
108: * @param len number of bytes to read
109: */
110: public synchronized int read(byte b[], int off, int len)
111: throws IOException {
112: if (len <= 0)
113: return 0;
114:
115: int moreSpace;
116: synchronized (lock) {
117: if (pos >= present)
118: pos = present = 0;
119: else if (pos >= waterMark) {
120: System.arraycopy(buffer, pos, buffer, 0, present - pos);
121: present -= pos;
122: pos = 0;
123: }
124: int freeSpace = buffer.length - present;
125: moreSpace = Math.max(freeSpace - requested, 0);
126: }
127: if (moreSpace > 0)
128: manager.sendRequest(info, moreSpace);
129: synchronized (lock) {
130: requested += moreSpace;
131: while ((pos >= present) && !disconnected) {
132: try {
133: lock.wait();
134: } catch (InterruptedException e) {
135: }
136: }
137: if (disconnected && pos >= present)
138: return -1;
139:
140: int available = present - pos;
141: if (len < available) {
142: System.arraycopy(buffer, pos, b, off, len);
143: pos += len;
144: return len;
145: } else {
146: System.arraycopy(buffer, pos, b, off, available);
147: pos = present = 0;
148: // could send another request here, if len > available??
149: return available;
150: }
151: }
152: }
153:
154: /**
155: * Return the number of bytes immediately available for reading.
156: */
157: public int available() throws IOException {
158: synchronized (lock) {
159: return present - pos;
160: }
161: }
162:
163: /**
164: * Close this connection.
165: */
166: public void close() throws IOException {
167: manager.sendClose(info);
168: }
169:
170: /**
171: * Receive bytes transmitted from connection at remote endpoint.
172: * @param length number of bytes transmitted
173: * @param in input stream with those bytes ready to be read
174: */
175: void receive(int length, DataInputStream in) throws IOException {
176: /* TO DO: Optimize so that data received from stream can be loaded
177: * directly into user's buffer if there is a pending read().
178: */
179: synchronized (lock) {
180: if ((pos > 0) && ((buffer.length - present) < length)) {
181: System.arraycopy(buffer, pos, buffer, 0, present - pos);
182: present -= pos;
183: pos = 0;
184: }
185: if ((buffer.length - present) < length)
186: throw new IOException("Receive buffer overflow");
187: in.readFully(buffer, present, length);
188: present += length;
189: requested -= length;
190: lock.notifyAll();
191: }
192: }
193:
194: /**
195: * Disconnect this stream from all connection activity.
196: */
197: void disconnect() {
198: synchronized (lock) {
199: disconnected = true;
200: lock.notifyAll();
201: }
202: }
203: }
|