001: /*
002: * Copyright 1996-2003 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025: package sun.rmi.transport.tcp;
026:
027: import java.io.*;
028: import java.util.*;
029: import java.rmi.server.LogStream;
030:
031: import sun.rmi.runtime.Log;
032:
033: /**
034: * ConnectionMultiplexer manages the transparent multiplexing of
035: * multiple virtual connections from one endpoint to another through
036: * one given real connection to that endpoint. The input and output
037: * streams for the the underlying real connection must be supplied.
038: * A callback object is also supplied to be informed of new virtual
039: * connections opened by the remote endpoint. After creation, the
040: * run() method must be called in a thread created for demultiplexing
041: * the connections. The openConnection() method is called to
042: * initiate a virtual connection from this endpoint.
043: *
044: * @author Peter Jones
045: */
046: final class ConnectionMultiplexer {
047:
048: /** "multiplex" log level */
049: static int logLevel = LogStream.parseLevel(getLogLevel());
050:
051: private static String getLogLevel() {
052: return (String) java.security.AccessController
053: .doPrivileged(new sun.security.action.GetPropertyAction(
054: "sun.rmi.transport.tcp.multiplex.logLevel"));
055: }
056:
057: /* multiplex system log */
058: static final Log multiplexLog = Log.getLog(
059: "sun.rmi.transport.tcp.multiplex", "multiplex",
060: ConnectionMultiplexer.logLevel);
061:
062: /** multiplexing protocol operation codes */
063: private final static int OPEN = 0xE1;
064: private final static int CLOSE = 0xE2;
065: private final static int CLOSEACK = 0xE3;
066: private final static int REQUEST = 0xE4;
067: private final static int TRANSMIT = 0xE5;
068:
069: /** object to notify for new connections from remote endpoint */
070: private TCPChannel channel;
071:
072: /** input stream for underlying single connection */
073: private InputStream in;
074:
075: /** output stream for underlying single connection */
076: private OutputStream out;
077:
078: /** true if underlying connection originated from this endpoint
079: (used for generating unique connection IDs) */
080: private boolean orig;
081:
082: /** layered stream for reading formatted data from underlying connection */
083: private DataInputStream dataIn;
084:
085: /** layered stream for writing formatted data to underlying connection */
086: private DataOutputStream dataOut;
087:
088: /** table holding currently open connection IDs and related info */
089: private Hashtable connectionTable = new Hashtable(7);
090:
091: /** number of currently open connections */
092: private int numConnections = 0;
093:
094: /** maximum allowed open connections */
095: private final static int maxConnections = 256;
096:
097: /** ID of last connection opened */
098: private int lastID = 0x1001;
099:
100: /** true if this mechanism is still alive */
101: private boolean alive = true;
102:
103: /**
104: * Create a new ConnectionMultiplexer using the given underlying
105: * input/output stream pair. The run method must be called
106: * (possibly on a new thread) to handle the demultiplexing.
107: * @param channel object to notify when new connection is received
108: * @param in input stream of underlying connection
109: * @param out output stream of underlying connection
110: * @param orig true if this endpoint intiated the underlying
111: * connection (needs to be set differently at both ends)
112: */
113: public ConnectionMultiplexer(TCPChannel channel, InputStream in,
114: OutputStream out, boolean orig) {
115: this .channel = channel;
116: this .in = in;
117: this .out = out;
118: this .orig = orig;
119:
120: dataIn = new DataInputStream(in);
121: dataOut = new DataOutputStream(out);
122: }
123:
124: /**
125: * Process multiplexing protocol received from underlying connection.
126: */
127: public void run() throws IOException {
128: try {
129: int op, id, length;
130: Integer idObj;
131: MultiplexConnectionInfo info;
132:
133: while (true) {
134:
135: // read next op code from remote endpoint
136: op = dataIn.readUnsignedByte();
137: switch (op) {
138:
139: // remote endpoint initiating new connection
140: case OPEN:
141: id = dataIn.readUnsignedShort();
142:
143: if (multiplexLog.isLoggable(Log.VERBOSE)) {
144: multiplexLog.log(Log.VERBOSE,
145: "operation OPEN " + id);
146: }
147:
148: idObj = new Integer(id);
149: info = (MultiplexConnectionInfo) connectionTable
150: .get(idObj);
151: if (info != null)
152: throw new IOException(
153: "OPEN: Connection ID already exists");
154: info = new MultiplexConnectionInfo(id);
155: info.in = new MultiplexInputStream(this , info, 2048);
156: info.out = new MultiplexOutputStream(this , info,
157: 2048);
158: synchronized (connectionTable) {
159: connectionTable.put(idObj, info);
160: ++numConnections;
161: }
162: sun.rmi.transport.Connection conn;
163: conn = new TCPConnection(channel, info.in, info.out);
164: channel.acceptMultiplexConnection(conn);
165: break;
166:
167: // remote endpoint closing connection
168: case CLOSE:
169: id = dataIn.readUnsignedShort();
170:
171: if (multiplexLog.isLoggable(Log.VERBOSE)) {
172: multiplexLog.log(Log.VERBOSE,
173: "operation CLOSE " + id);
174: }
175:
176: idObj = new Integer(id);
177: info = (MultiplexConnectionInfo) connectionTable
178: .get(idObj);
179: if (info == null)
180: throw new IOException(
181: "CLOSE: Invalid connection ID");
182: info.in.disconnect();
183: info.out.disconnect();
184: if (!info.closed)
185: sendCloseAck(info);
186: synchronized (connectionTable) {
187: connectionTable.remove(idObj);
188: --numConnections;
189: }
190: break;
191:
192: // remote endpoint acknowledging close of connection
193: case CLOSEACK:
194: id = dataIn.readUnsignedShort();
195:
196: if (multiplexLog.isLoggable(Log.VERBOSE)) {
197: multiplexLog.log(Log.VERBOSE,
198: "operation CLOSEACK " + id);
199: }
200:
201: idObj = new Integer(id);
202: info = (MultiplexConnectionInfo) connectionTable
203: .get(idObj);
204: if (info == null)
205: throw new IOException(
206: "CLOSEACK: Invalid connection ID");
207: if (!info.closed)
208: throw new IOException(
209: "CLOSEACK: Connection not closed");
210: info.in.disconnect();
211: info.out.disconnect();
212: synchronized (connectionTable) {
213: connectionTable.remove(idObj);
214: --numConnections;
215: }
216: break;
217:
218: // remote endpoint declaring additional bytes receivable
219: case REQUEST:
220: id = dataIn.readUnsignedShort();
221: idObj = new Integer(id);
222: info = (MultiplexConnectionInfo) connectionTable
223: .get(idObj);
224: if (info == null)
225: throw new IOException(
226: "REQUEST: Invalid connection ID");
227: length = dataIn.readInt();
228:
229: if (multiplexLog.isLoggable(Log.VERBOSE)) {
230: multiplexLog.log(Log.VERBOSE,
231: "operation REQUEST " + id + ": "
232: + length);
233: }
234:
235: info.out.request(length);
236: break;
237:
238: // remote endpoint transmitting data packet
239: case TRANSMIT:
240: id = dataIn.readUnsignedShort();
241: idObj = new Integer(id);
242: info = (MultiplexConnectionInfo) connectionTable
243: .get(idObj);
244: if (info == null)
245: throw new IOException(
246: "SEND: Invalid connection ID");
247: length = dataIn.readInt();
248:
249: if (multiplexLog.isLoggable(Log.VERBOSE)) {
250: multiplexLog.log(Log.VERBOSE,
251: "operation TRANSMIT " + id + ": "
252: + length);
253: }
254:
255: info.in.receive(length, dataIn);
256: break;
257:
258: default:
259: throw new IOException("Invalid operation: "
260: + Integer.toHexString(op));
261: }
262: }
263: } finally {
264: shutDown();
265: }
266: }
267:
268: /**
269: * Initiate a new multiplexed connection through the underlying
270: * connection.
271: */
272: public synchronized TCPConnection openConnection()
273: throws IOException {
274: // generate ID that should not be already used
275: // If all possible 32768 IDs are used,
276: // this method will block searching for a new ID forever.
277: int id;
278: Integer idObj;
279: do {
280: lastID = (++lastID) & 0x7FFF;
281: id = lastID;
282:
283: // The orig flag (copied to the high bit of the ID) is used
284: // to have two distinct ranges to choose IDs from for the
285: // two endpoints.
286: if (orig)
287: id |= 0x8000;
288: idObj = new Integer(id);
289: } while (connectionTable.get(idObj) != null);
290:
291: // create multiplexing streams and bookkeeping information
292: MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
293: info.in = new MultiplexInputStream(this , info, 2048);
294: info.out = new MultiplexOutputStream(this , info, 2048);
295:
296: // add to connection table if multiplexer has not died
297: synchronized (connectionTable) {
298: if (!alive)
299: throw new IOException("Multiplexer connection dead");
300: if (numConnections >= maxConnections)
301: throw new IOException("Cannot exceed " + maxConnections
302: + " simultaneous multiplexed connections");
303: connectionTable.put(idObj, info);
304: ++numConnections;
305: }
306:
307: // inform remote endpoint of new connection
308: synchronized (dataOut) {
309: try {
310: dataOut.writeByte(OPEN);
311: dataOut.writeShort(id);
312: dataOut.flush();
313: } catch (IOException e) {
314: multiplexLog.log(Log.BRIEF, "exception: ", e);
315:
316: shutDown();
317: throw e;
318: }
319: }
320:
321: return new TCPConnection(channel, info.in, info.out);
322: }
323:
324: /**
325: * Shut down all connections and clean up.
326: */
327: public void shutDown() {
328: // inform all associated streams
329: synchronized (connectionTable) {
330: // return if multiplexer already officially dead
331: if (!alive)
332: return;
333: alive = false;
334:
335: Enumeration enum_ = connectionTable.elements();
336: while (enum_.hasMoreElements()) {
337: MultiplexConnectionInfo info = (MultiplexConnectionInfo) enum_
338: .nextElement();
339: info.in.disconnect();
340: info.out.disconnect();
341: }
342: connectionTable.clear();
343: numConnections = 0;
344: }
345:
346: // close underlying connection, if possible (and not already done)
347: try {
348: in.close();
349: } catch (IOException e) {
350: }
351: try {
352: out.close();
353: } catch (IOException e) {
354: }
355: }
356:
357: /**
358: * Send request for more data on connection to remote endpoint.
359: * @param info connection information structure
360: * @param len number of more bytes that can be received
361: */
362: void sendRequest(MultiplexConnectionInfo info, int len)
363: throws IOException {
364: synchronized (dataOut) {
365: if (alive && !info.closed)
366: try {
367: dataOut.writeByte(REQUEST);
368: dataOut.writeShort(info.id);
369: dataOut.writeInt(len);
370: dataOut.flush();
371: } catch (IOException e) {
372: multiplexLog.log(Log.BRIEF, "exception: ", e);
373:
374: shutDown();
375: throw e;
376: }
377: }
378: }
379:
380: /**
381: * Send packet of requested data on connection to remote endpoint.
382: * @param info connection information structure
383: * @param buf array containg bytes to send
384: * @param off offset of first array index of packet
385: * @param len number of bytes in packet to send
386: */
387: void sendTransmit(MultiplexConnectionInfo info, byte buf[],
388: int off, int len) throws IOException {
389: synchronized (dataOut) {
390: if (alive && !info.closed)
391: try {
392: dataOut.writeByte(TRANSMIT);
393: dataOut.writeShort(info.id);
394: dataOut.writeInt(len);
395: dataOut.write(buf, off, len);
396: dataOut.flush();
397: } catch (IOException e) {
398: multiplexLog.log(Log.BRIEF, "exception: ", e);
399:
400: shutDown();
401: throw e;
402: }
403: }
404: }
405:
406: /**
407: * Inform remote endpoint that connection has been closed.
408: * @param info connection information structure
409: */
410: void sendClose(MultiplexConnectionInfo info) throws IOException {
411: info.out.disconnect();
412: synchronized (dataOut) {
413: if (alive && !info.closed)
414: try {
415: dataOut.writeByte(CLOSE);
416: dataOut.writeShort(info.id);
417: dataOut.flush();
418: info.closed = true;
419: } catch (IOException e) {
420: multiplexLog.log(Log.BRIEF, "exception: ", e);
421:
422: shutDown();
423: throw e;
424: }
425: }
426: }
427:
428: /**
429: * Acknowledge remote endpoint's closing of connection.
430: * @param info connection information structure
431: */
432: void sendCloseAck(MultiplexConnectionInfo info) throws IOException {
433: synchronized (dataOut) {
434: if (alive && !info.closed)
435: try {
436: dataOut.writeByte(CLOSEACK);
437: dataOut.writeShort(info.id);
438: dataOut.flush();
439: info.closed = true;
440: } catch (IOException e) {
441: multiplexLog.log(Log.BRIEF, "exception: ", e);
442:
443: shutDown();
444: throw e;
445: }
446: }
447: }
448:
449: /**
450: * Shut down connection upon finalization.
451: */
452: protected void finalize() throws Throwable {
453: super.finalize();
454: shutDown();
455: }
456: }
|