001: /*
002: * Copyright 1996-2005 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025: package sun.rmi.transport.tcp;
026:
027: import java.lang.ref.Reference;
028: import java.lang.ref.SoftReference;
029: import java.lang.ref.WeakReference;
030: import java.lang.reflect.InvocationTargetException;
031: import java.io.DataInputStream;
032: import java.io.DataOutputStream;
033: import java.io.IOException;
034: import java.io.InputStream;
035: import java.io.OutputStream;
036: import java.io.BufferedInputStream;
037: import java.io.BufferedOutputStream;
038: import java.net.InetAddress;
039: import java.net.ServerSocket;
040: import java.net.Socket;
041: import java.rmi.RemoteException;
042: import java.rmi.server.ExportException;
043: import java.rmi.server.LogStream;
044: import java.rmi.server.RMIFailureHandler;
045: import java.rmi.server.RMISocketFactory;
046: import java.rmi.server.RemoteCall;
047: import java.rmi.server.ServerNotActiveException;
048: import java.rmi.server.UID;
049: import java.security.AccessControlContext;
050: import java.security.AccessController;
051: import java.util.ArrayList;
052: import java.util.LinkedList;
053: import java.util.List;
054: import java.util.Map;
055: import java.util.WeakHashMap;
056: import java.util.logging.Level;
057: import java.util.concurrent.ExecutorService;
058: import java.util.concurrent.RejectedExecutionException;
059: import java.util.concurrent.SynchronousQueue;
060: import java.util.concurrent.ThreadFactory;
061: import java.util.concurrent.ThreadPoolExecutor;
062: import java.util.concurrent.TimeUnit;
063: import java.util.concurrent.atomic.AtomicInteger;
064: import sun.rmi.runtime.Log;
065: import sun.rmi.runtime.NewThreadAction;
066: import sun.rmi.transport.Channel;
067: import sun.rmi.transport.Connection;
068: import sun.rmi.transport.DGCAckHandler;
069: import sun.rmi.transport.Endpoint;
070: import sun.rmi.transport.StreamRemoteCall;
071: import sun.rmi.transport.Target;
072: import sun.rmi.transport.Transport;
073: import sun.rmi.transport.TransportConstants;
074: import sun.rmi.transport.proxy.HttpReceiveSocket;
075: import sun.security.action.GetIntegerAction;
076: import sun.security.action.GetLongAction;
077: import sun.security.action.GetPropertyAction;
078:
079: /**
080: * TCPTransport is the socket-based implementation of the RMI Transport
081: * abstraction.
082: *
083: * @author Ann Wollrath
084: * @author Peter Jones
085: */
086: public class TCPTransport extends Transport {
087:
088: /* tcp package log */
089: static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp",
090: "tcp", LogStream.parseLevel(AccessController
091: .doPrivileged(new GetPropertyAction(
092: "sun.rmi.transport.tcp.logLevel"))));
093:
094: /** maximum number of connection handler threads */
095: private static final int maxConnectionThreads = // default no limit
096: AccessController.doPrivileged(new GetIntegerAction(
097: "sun.rmi.transport.tcp.maxConnectionThreads",
098: Integer.MAX_VALUE));
099:
100: /** keep alive time for idle connection handler threads */
101: private static final long threadKeepAliveTime = // default 1 minute
102: AccessController.doPrivileged(new GetLongAction(
103: "sun.rmi.transport.tcp.threadKeepAliveTime", 60000));
104:
105: /** thread pool for connection handlers */
106: private static final ExecutorService connectionThreadPool = new ThreadPoolExecutor(
107: 0, maxConnectionThreads, threadKeepAliveTime,
108: TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
109: new ThreadFactory() {
110: public Thread newThread(Runnable runnable) {
111: return AccessController
112: .doPrivileged(new NewThreadAction(runnable,
113: "TCP Connection(idle)", true, true));
114: }
115: });
116:
117: /** total connections handled */
118: private static final AtomicInteger connectionCount = new AtomicInteger(
119: 0);
120:
121: /** client host for the current thread's connection */
122: private static final ThreadLocal<ConnectionHandler> threadConnectionHandler = new ThreadLocal<ConnectionHandler>();
123:
124: /** endpoints for this transport */
125: private final LinkedList<TCPEndpoint> epList;
126: /** number of objects exported on this transport */
127: private int exportCount = 0;
128: /** server socket for this transport */
129: private ServerSocket server = null;
130: /** table mapping endpoints to channels */
131: private final Map<TCPEndpoint, Reference<TCPChannel>> channelTable = new WeakHashMap<TCPEndpoint, Reference<TCPChannel>>();
132:
133: static final RMISocketFactory defaultSocketFactory = RMISocketFactory
134: .getDefaultSocketFactory();
135:
136: /** number of milliseconds in accepted-connection timeout.
137: * Warning: this should be greater than 15 seconds (the client-side
138: * timeout), and defaults to 2 hours.
139: * The maximum representable value is slightly more than 24 days
140: * and 20 hours.
141: */
142: private static final int connectionReadTimeout = // default 2 hours
143: AccessController.doPrivileged(new GetIntegerAction(
144: "sun.rmi.transport.tcp.readTimeout", 2 * 3600 * 1000));
145:
146: /**
147: * Constructs a TCPTransport.
148: */
149: TCPTransport(LinkedList<TCPEndpoint> epList) {
150: // assert ((epList.size() != null) && (epList.size() >= 1))
151: this .epList = epList;
152: if (tcpLog.isLoggable(Log.BRIEF)) {
153: tcpLog.log(Log.BRIEF, "Version = "
154: + TransportConstants.Version + ", ep = "
155: + getEndpoint());
156: }
157: }
158:
159: /**
160: * Closes all cached connections in every channel subordinated to this
161: * transport. Currently, this only closes outgoing connections.
162: */
163: public void shedConnectionCaches() {
164: List<TCPChannel> channels;
165: synchronized (channelTable) {
166: channels = new ArrayList<TCPChannel>(channelTable.values()
167: .size());
168: for (Reference<TCPChannel> ref : channelTable.values()) {
169: TCPChannel ch = ref.get();
170: if (ch != null) {
171: channels.add(ch);
172: }
173: }
174: }
175: for (TCPChannel channel : channels) {
176: channel.shedCache();
177: }
178: }
179:
180: /**
181: * Returns a <I>Channel</I> that generates connections to the
182: * endpoint <I>ep</I>. A Channel is an object that creates and
183: * manages connections of a particular type to some particular
184: * address space.
185: * @param ep the endpoint to which connections will be generated.
186: * @return the channel or null if the transport cannot
187: * generate connections to this endpoint
188: */
189: public TCPChannel getChannel(Endpoint ep) {
190: TCPChannel ch = null;
191: if (ep instanceof TCPEndpoint) {
192: synchronized (channelTable) {
193: Reference<TCPChannel> ref = channelTable.get(ep);
194: if (ref != null) {
195: ch = ref.get();
196: }
197: if (ch == null) {
198: TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
199: ch = new TCPChannel(this , tcpEndpoint);
200: channelTable.put(tcpEndpoint,
201: new WeakReference<TCPChannel>(ch));
202: }
203: }
204: }
205: return ch;
206: }
207:
208: /**
209: * Removes the <I>Channel</I> that generates connections to the
210: * endpoint <I>ep</I>.
211: */
212: public void free(Endpoint ep) {
213: if (ep instanceof TCPEndpoint) {
214: synchronized (channelTable) {
215: Reference<TCPChannel> ref = channelTable.remove(ep);
216: if (ref != null) {
217: TCPChannel channel = ref.get();
218: if (channel != null) {
219: channel.shedCache();
220: }
221: }
222: }
223: }
224: }
225:
226: /**
227: * Export the object so that it can accept incoming calls.
228: */
229: public void exportObject(Target target) throws RemoteException {
230: /*
231: * Ensure that a server socket is listening, and count this
232: * export while synchronized to prevent the server socket from
233: * being closed due to concurrent unexports.
234: */
235: synchronized (this ) {
236: listen();
237: exportCount++;
238: }
239:
240: /*
241: * Try to add the Target to the exported object table; keep
242: * counting this export (to keep server socket open) only if
243: * that succeeds.
244: */
245: boolean ok = false;
246: try {
247: super .exportObject(target);
248: ok = true;
249: } finally {
250: if (!ok) {
251: synchronized (this ) {
252: decrementExportCount();
253: }
254: }
255: }
256: }
257:
258: protected synchronized void targetUnexported() {
259: decrementExportCount();
260: }
261:
262: /**
263: * Decrements the count of exported objects, closing the current
264: * server socket if the count reaches zero.
265: **/
266: private void decrementExportCount() {
267: assert Thread.holdsLock(this );
268: exportCount--;
269: if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
270: ServerSocket ss = server;
271: server = null;
272: try {
273: ss.close();
274: } catch (IOException e) {
275: }
276: }
277: }
278:
279: /**
280: * Verify that the current access control context has permission to
281: * accept the connection being dispatched by the current thread.
282: */
283: protected void checkAcceptPermission(AccessControlContext acc) {
284: SecurityManager sm = System.getSecurityManager();
285: if (sm == null) {
286: return;
287: }
288: ConnectionHandler h = threadConnectionHandler.get();
289: if (h == null) {
290: throw new Error(
291: "checkAcceptPermission not in ConnectionHandler thread");
292: }
293: h.checkAcceptPermission(sm, acc);
294: }
295:
296: private TCPEndpoint getEndpoint() {
297: synchronized (epList) {
298: return epList.getLast();
299: }
300: }
301:
302: /**
303: * Listen on transport's endpoint.
304: */
305: private void listen() throws RemoteException {
306: assert Thread.holdsLock(this );
307: TCPEndpoint ep = getEndpoint();
308: int port = ep.getPort();
309:
310: if (server == null) {
311: if (tcpLog.isLoggable(Log.BRIEF)) {
312: tcpLog.log(Log.BRIEF, "(port " + port
313: + ") create server socket");
314: }
315:
316: try {
317: server = ep.newServerSocket();
318: /*
319: * Don't retry ServerSocket if creation fails since
320: * "port in use" will cause export to hang if an
321: * RMIFailureHandler is not installed.
322: */
323: Thread t = AccessController
324: .doPrivileged(new NewThreadAction(
325: new AcceptLoop(server), "TCP Accept-"
326: + port, true));
327: t.start();
328: } catch (java.net.BindException e) {
329: throw new ExportException("Port already in use: "
330: + port, e);
331: } catch (IOException e) {
332: throw new ExportException("Listen failed on port: "
333: + port, e);
334: }
335:
336: } else {
337: // otherwise verify security access to existing server socket
338: SecurityManager sm = System.getSecurityManager();
339: if (sm != null) {
340: sm.checkListen(port);
341: }
342: }
343: }
344:
345: /**
346: * Worker for accepting connections from a server socket.
347: **/
348: private class AcceptLoop implements Runnable {
349:
350: private final ServerSocket serverSocket;
351:
352: // state for throttling loop on exceptions (local to accept thread)
353: private long lastExceptionTime = 0L;
354: private int recentExceptionCount;
355:
356: AcceptLoop(ServerSocket serverSocket) {
357: this .serverSocket = serverSocket;
358: }
359:
360: public void run() {
361: try {
362: executeAcceptLoop();
363: } finally {
364: try {
365: /*
366: * Only one accept loop is started per server
367: * socket, so after no more connections will be
368: * accepted, ensure that the server socket is no
369: * longer listening.
370: */
371: serverSocket.close();
372: } catch (IOException e) {
373: }
374: }
375: }
376:
377: /**
378: * Accepts connections from the server socket and executes
379: * handlers for them in the thread pool.
380: **/
381: private void executeAcceptLoop() {
382: if (tcpLog.isLoggable(Log.BRIEF)) {
383: tcpLog.log(Log.BRIEF, "listening on port "
384: + getEndpoint().getPort());
385: }
386:
387: while (true) {
388: Socket socket = null;
389: try {
390: socket = serverSocket.accept();
391:
392: /*
393: * Find client host name (or "0.0.0.0" if unknown)
394: */
395: InetAddress clientAddr = socket.getInetAddress();
396: String clientHost = (clientAddr != null ? clientAddr
397: .getHostAddress()
398: : "0.0.0.0");
399:
400: /*
401: * Execute connection handler in the thread pool,
402: * which uses non-system threads.
403: */
404: try {
405: connectionThreadPool
406: .execute(new ConnectionHandler(socket,
407: clientHost));
408: } catch (RejectedExecutionException e) {
409: closeSocket(socket);
410: tcpLog.log(Log.BRIEF,
411: "rejected connection from "
412: + clientHost);
413: }
414:
415: } catch (Throwable t) {
416: try {
417: /*
418: * If the server socket has been closed, such
419: * as because there are no more exported
420: * objects, then we expect accept to throw an
421: * exception, so just terminate normally.
422: */
423: if (serverSocket.isClosed()) {
424: break;
425: }
426:
427: try {
428: if (tcpLog.isLoggable(Level.WARNING)) {
429: tcpLog.log(Level.WARNING,
430: "accept loop for "
431: + serverSocket
432: + " throws", t);
433: }
434: } catch (Throwable tt) {
435: }
436: } finally {
437: /*
438: * Always close the accepted socket (if any)
439: * if an exception occurs, but only after
440: * logging an unexpected exception.
441: */
442: if (socket != null) {
443: closeSocket(socket);
444: }
445: }
446:
447: /*
448: * In case we're running out of file descriptors,
449: * release resources held in caches.
450: */
451: if (!(t instanceof SecurityException)) {
452: try {
453: TCPEndpoint.shedConnectionCaches();
454: } catch (Throwable tt) {
455: }
456: }
457:
458: /*
459: * A NoClassDefFoundError can occur if no file
460: * descriptors are available, in which case this
461: * loop should not terminate.
462: */
463: if (t instanceof Exception
464: || t instanceof OutOfMemoryError
465: || t instanceof NoClassDefFoundError) {
466: if (!continueAfterAcceptFailure(t)) {
467: return;
468: }
469: // continue loop
470: } else {
471: throw (Error) t;
472: }
473: }
474: }
475: }
476:
477: /**
478: * Returns true if the accept loop should continue after the
479: * specified exception has been caught, or false if the accept
480: * loop should terminate (closing the server socket). If
481: * there is an RMIFailureHandler, this method returns the
482: * result of passing the specified exception to it; otherwise,
483: * this method always returns true, after sleeping to throttle
484: * the accept loop if necessary.
485: **/
486: private boolean continueAfterAcceptFailure(Throwable t) {
487: RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
488: if (fh != null) {
489: return fh
490: .failure(t instanceof Exception ? (Exception) t
491: : new InvocationTargetException(t));
492: } else {
493: throttleLoopOnException();
494: return true;
495: }
496: }
497:
498: /**
499: * Throttles the accept loop after an exception has been
500: * caught: if a burst of 10 exceptions in 5 seconds occurs,
501: * then wait for 10 seconds to curb busy CPU usage.
502: **/
503: private void throttleLoopOnException() {
504: long now = System.currentTimeMillis();
505: if (lastExceptionTime == 0L
506: || (now - lastExceptionTime) > 5000) {
507: // last exception was long ago (or this is the first)
508: lastExceptionTime = now;
509: recentExceptionCount = 0;
510: } else {
511: // exception burst window was started recently
512: if (++recentExceptionCount >= 10) {
513: try {
514: Thread.sleep(10000);
515: } catch (InterruptedException ignore) {
516: }
517: }
518: }
519: }
520: }
521:
522: /** close socket and eat exception */
523: private static void closeSocket(Socket sock) {
524: try {
525: sock.close();
526: } catch (IOException ex) {
527: // eat exception
528: }
529: }
530:
531: /**
532: * handleMessages decodes transport operations and handles messages
533: * appropriately. If an exception occurs during message handling,
534: * the socket is closed.
535: */
536: void handleMessages(Connection conn, boolean persistent) {
537: int port = getEndpoint().getPort();
538:
539: try {
540: DataInputStream in = new DataInputStream(conn
541: .getInputStream());
542: do {
543: int op = in.read(); // transport op
544: if (op == -1) {
545: if (tcpLog.isLoggable(Log.BRIEF)) {
546: tcpLog.log(Log.BRIEF, "(port " + port
547: + ") connection closed");
548: }
549: break;
550: }
551:
552: if (tcpLog.isLoggable(Log.BRIEF)) {
553: tcpLog.log(Log.BRIEF, "(port " + port + ") op = "
554: + op);
555: }
556:
557: switch (op) {
558: case TransportConstants.Call:
559: // service incoming RMI call
560: RemoteCall call = new StreamRemoteCall(conn);
561: if (serviceCall(call) == false)
562: return;
563: break;
564:
565: case TransportConstants.Ping:
566: // send ack for ping
567: DataOutputStream out = new DataOutputStream(conn
568: .getOutputStream());
569: out.writeByte(TransportConstants.PingAck);
570: conn.releaseOutputStream();
571: break;
572:
573: case TransportConstants.DGCAck:
574: DGCAckHandler.received(UID.read(in));
575: break;
576:
577: default:
578: throw new IOException("unknown transport op " + op);
579: }
580: } while (persistent);
581:
582: } catch (IOException e) {
583: // exception during processing causes connection to close (below)
584: if (tcpLog.isLoggable(Log.BRIEF)) {
585: tcpLog.log(Log.BRIEF,
586: "(port " + port + ") exception: ", e);
587: }
588: } finally {
589: try {
590: conn.close();
591: } catch (IOException ex) {
592: // eat exception
593: }
594: }
595: }
596:
597: /**
598: * Returns the client host for the current thread's connection. Throws
599: * ServerNotActiveException if no connection is active for this thread.
600: */
601: public static String getClientHost()
602: throws ServerNotActiveException {
603: ConnectionHandler h = threadConnectionHandler.get();
604: if (h != null) {
605: return h.getClientHost();
606: } else {
607: throw new ServerNotActiveException("not in a remote call");
608: }
609: }
610:
611: /**
612: * Services messages on accepted connection
613: */
614: private class ConnectionHandler implements Runnable {
615:
616: /** int value of "POST" in ASCII (Java's specified data formats
617: * make this once-reviled tactic again socially acceptable) */
618: private static final int POST = 0x504f5354;
619:
620: /** most recently accept-authorized AccessControlContext */
621: private AccessControlContext okContext;
622: /** cache of accept-authorized AccessControlContexts */
623: private Map<AccessControlContext, Reference<AccessControlContext>> authCache;
624: /** security manager which authorized contexts in authCache */
625: private SecurityManager cacheSecurityManager = null;
626:
627: private Socket socket;
628: private String remoteHost;
629:
630: ConnectionHandler(Socket socket, String remoteHost) {
631: this .socket = socket;
632: this .remoteHost = remoteHost;
633: }
634:
635: String getClientHost() {
636: return remoteHost;
637: }
638:
639: /**
640: * Verify that the given AccessControlContext has permission to
641: * accept this connection.
642: */
643: void checkAcceptPermission(SecurityManager sm,
644: AccessControlContext acc) {
645: /*
646: * Note: no need to synchronize on cache-related fields, since this
647: * method only gets called from the ConnectionHandler's thread.
648: */
649: if (sm != cacheSecurityManager) {
650: okContext = null;
651: authCache = new WeakHashMap<AccessControlContext, Reference<AccessControlContext>>();
652: cacheSecurityManager = sm;
653: }
654: if (acc.equals(okContext) || authCache.containsKey(acc)) {
655: return;
656: }
657: InetAddress addr = socket.getInetAddress();
658: String host = (addr != null) ? addr.getHostAddress() : "*";
659:
660: sm.checkAccept(host, socket.getPort());
661:
662: authCache.put(acc, new SoftReference<AccessControlContext>(
663: acc));
664: okContext = acc;
665: }
666:
667: public void run() {
668: Thread t = Thread.currentThread();
669: String name = t.getName();
670: try {
671: t.setName("RMI TCP Connection("
672: + connectionCount.incrementAndGet() + ")-"
673: + remoteHost);
674: run0();
675: } finally {
676: t.setName(name);
677: }
678: }
679:
680: private void run0() {
681: TCPEndpoint endpoint = getEndpoint();
682: int port = endpoint.getPort();
683:
684: threadConnectionHandler.set(this );
685:
686: // set socket to disable Nagle's algorithm (always send
687: // immediately)
688: // TBD: should this be left up to socket factory instead?
689: try {
690: socket.setTcpNoDelay(true);
691: } catch (Exception e) {
692: // if we fail to set this, ignore and proceed anyway
693: }
694: // set socket to timeout after excessive idle time
695: try {
696: if (connectionReadTimeout > 0)
697: socket.setSoTimeout(connectionReadTimeout);
698: } catch (Exception e) {
699: // too bad, continue anyway
700: }
701:
702: try {
703: InputStream sockIn = socket.getInputStream();
704: InputStream bufIn = sockIn.markSupported() ? sockIn
705: : new BufferedInputStream(sockIn);
706:
707: // Read magic (or HTTP wrapper)
708: bufIn.mark(4);
709: DataInputStream in = new DataInputStream(bufIn);
710: int magic = in.readInt();
711:
712: if (magic == POST) {
713: tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
714:
715: // It's really a HTTP-wrapped request. Repackage
716: // the socket in a HttpReceiveSocket, reinitialize
717: // sockIn and in, and reread magic.
718: bufIn.reset(); // unread "POST"
719:
720: try {
721: socket = new HttpReceiveSocket(socket, bufIn,
722: null);
723: remoteHost = "0.0.0.0";
724: sockIn = socket.getInputStream();
725: bufIn = new BufferedInputStream(sockIn);
726: in = new DataInputStream(bufIn);
727: magic = in.readInt();
728:
729: } catch (IOException e) {
730: throw new RemoteException(
731: "Error HTTP-unwrapping call", e);
732: }
733: }
734: // bufIn's mark will invalidate itself when it overflows
735: // so it doesn't have to be turned off
736:
737: // read and verify transport header
738: short version = in.readShort();
739: if (magic != TransportConstants.Magic
740: || version != TransportConstants.Version) {
741: // protocol mismatch detected...
742: // just close socket: this would recurse if we marshal an
743: // exception to the client and the protocol at other end
744: // doesn't match.
745: closeSocket(socket);
746: return;
747: }
748:
749: OutputStream sockOut = socket.getOutputStream();
750: BufferedOutputStream bufOut = new BufferedOutputStream(
751: sockOut);
752: DataOutputStream out = new DataOutputStream(bufOut);
753:
754: int remotePort = socket.getPort();
755:
756: if (tcpLog.isLoggable(Log.BRIEF)) {
757: tcpLog.log(Log.BRIEF, "accepted socket from ["
758: + remoteHost + ":" + remotePort + "]");
759: }
760:
761: TCPEndpoint ep;
762: TCPChannel ch;
763: TCPConnection conn;
764:
765: // send ack (or nack) for protocol
766: byte protocol = in.readByte();
767: switch (protocol) {
768: case TransportConstants.SingleOpProtocol:
769: // no ack for protocol
770:
771: // create dummy channel for receiving messages
772: ep = new TCPEndpoint(remoteHost, socket
773: .getLocalPort(), endpoint
774: .getClientSocketFactory(), endpoint
775: .getServerSocketFactory());
776: ch = new TCPChannel(TCPTransport.this , ep);
777: conn = new TCPConnection(ch, socket, bufIn, bufOut);
778:
779: // read input messages
780: handleMessages(conn, false);
781: break;
782:
783: case TransportConstants.StreamProtocol:
784: // send ack
785: out.writeByte(TransportConstants.ProtocolAck);
786:
787: // suggest endpoint (in case client doesn't know host name)
788: if (tcpLog.isLoggable(Log.VERBOSE)) {
789: tcpLog.log(Log.VERBOSE, "(port " + port + ") "
790: + "suggesting " + remoteHost + ":"
791: + remotePort);
792: }
793:
794: out.writeUTF(remoteHost);
795: out.writeInt(remotePort);
796: out.flush();
797:
798: // read and discard (possibly bogus) endpoint
799: // REMIND: would be faster to read 2 bytes then skip N+4
800: String clientHost = in.readUTF();
801: int clientPort = in.readInt();
802: if (tcpLog.isLoggable(Log.VERBOSE)) {
803: tcpLog.log(Log.VERBOSE, "(port " + port
804: + ") client using " + clientHost + ":"
805: + clientPort);
806: }
807:
808: // create dummy channel for receiving messages
809: // (why not use clientHost and clientPort?)
810: ep = new TCPEndpoint(remoteHost, socket
811: .getLocalPort(), endpoint
812: .getClientSocketFactory(), endpoint
813: .getServerSocketFactory());
814: ch = new TCPChannel(TCPTransport.this , ep);
815: conn = new TCPConnection(ch, socket, bufIn, bufOut);
816:
817: // read input messages
818: handleMessages(conn, true);
819: break;
820:
821: case TransportConstants.MultiplexProtocol:
822: if (tcpLog.isLoggable(Log.VERBOSE)) {
823: tcpLog.log(Log.VERBOSE, "(port " + port
824: + ") accepting multiplex protocol");
825: }
826:
827: // send ack
828: out.writeByte(TransportConstants.ProtocolAck);
829:
830: // suggest endpoint (in case client doesn't already have one)
831: if (tcpLog.isLoggable(Log.VERBOSE)) {
832: tcpLog.log(Log.VERBOSE, "(port " + port
833: + ") suggesting " + remoteHost + ":"
834: + remotePort);
835: }
836:
837: out.writeUTF(remoteHost);
838: out.writeInt(remotePort);
839: out.flush();
840:
841: // read endpoint client has decided to use
842: ep = new TCPEndpoint(in.readUTF(), in.readInt(),
843: endpoint.getClientSocketFactory(), endpoint
844: .getServerSocketFactory());
845: if (tcpLog.isLoggable(Log.VERBOSE)) {
846: tcpLog.log(Log.VERBOSE, "(port " + port
847: + ") client using " + ep.getHost()
848: + ":" + ep.getPort());
849: }
850:
851: ConnectionMultiplexer multiplexer;
852: synchronized (channelTable) {
853: // create or find channel for this endpoint
854: ch = getChannel(ep);
855: multiplexer = new ConnectionMultiplexer(ch,
856: bufIn, sockOut, false);
857: ch.useMultiplexer(multiplexer);
858: }
859: multiplexer.run();
860: break;
861:
862: default:
863: // protocol not understood, send nack and close socket
864: out.writeByte(TransportConstants.ProtocolNack);
865: out.flush();
866: break;
867: }
868:
869: } catch (IOException e) {
870: // socket in unknown state: destroy socket
871: tcpLog.log(Log.BRIEF, "terminated with exception:", e);
872: } finally {
873: closeSocket(socket);
874: }
875: }
876: }
877: }
|