Source Code Cross Referenced for TCPTransport.java in » 6.0-JDK-Modules-sun » rmi » sun » rmi » transport » tcp » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » 6.0 JDK Modules sun » rmi » sun.rmi.transport.tcp 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright 1996-2005 Sun Microsystems, Inc.  All Rights Reserved.
003:         * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004:         *
005:         * This code is free software; you can redistribute it and/or modify it
006:         * under the terms of the GNU General Public License version 2 only, as
007:         * published by the Free Software Foundation.  Sun designates this
008:         * particular file as subject to the "Classpath" exception as provided
009:         * by Sun in the LICENSE file that accompanied this code.
010:         *
011:         * This code is distributed in the hope that it will be useful, but WITHOUT
012:         * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013:         * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
014:         * version 2 for more details (a copy is included in the LICENSE file that
015:         * accompanied this code).
016:         *
017:         * You should have received a copy of the GNU General Public License version
018:         * 2 along with this work; if not, write to the Free Software Foundation,
019:         * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020:         *
021:         * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022:         * CA 95054 USA or visit www.sun.com if you need additional information or
023:         * have any questions.
024:         */
025:        package sun.rmi.transport.tcp;
026:
027:        import java.lang.ref.Reference;
028:        import java.lang.ref.SoftReference;
029:        import java.lang.ref.WeakReference;
030:        import java.lang.reflect.InvocationTargetException;
031:        import java.io.DataInputStream;
032:        import java.io.DataOutputStream;
033:        import java.io.IOException;
034:        import java.io.InputStream;
035:        import java.io.OutputStream;
036:        import java.io.BufferedInputStream;
037:        import java.io.BufferedOutputStream;
038:        import java.net.InetAddress;
039:        import java.net.ServerSocket;
040:        import java.net.Socket;
041:        import java.rmi.RemoteException;
042:        import java.rmi.server.ExportException;
043:        import java.rmi.server.LogStream;
044:        import java.rmi.server.RMIFailureHandler;
045:        import java.rmi.server.RMISocketFactory;
046:        import java.rmi.server.RemoteCall;
047:        import java.rmi.server.ServerNotActiveException;
048:        import java.rmi.server.UID;
049:        import java.security.AccessControlContext;
050:        import java.security.AccessController;
051:        import java.util.ArrayList;
052:        import java.util.LinkedList;
053:        import java.util.List;
054:        import java.util.Map;
055:        import java.util.WeakHashMap;
056:        import java.util.logging.Level;
057:        import java.util.concurrent.ExecutorService;
058:        import java.util.concurrent.RejectedExecutionException;
059:        import java.util.concurrent.SynchronousQueue;
060:        import java.util.concurrent.ThreadFactory;
061:        import java.util.concurrent.ThreadPoolExecutor;
062:        import java.util.concurrent.TimeUnit;
063:        import java.util.concurrent.atomic.AtomicInteger;
064:        import sun.rmi.runtime.Log;
065:        import sun.rmi.runtime.NewThreadAction;
066:        import sun.rmi.transport.Channel;
067:        import sun.rmi.transport.Connection;
068:        import sun.rmi.transport.DGCAckHandler;
069:        import sun.rmi.transport.Endpoint;
070:        import sun.rmi.transport.StreamRemoteCall;
071:        import sun.rmi.transport.Target;
072:        import sun.rmi.transport.Transport;
073:        import sun.rmi.transport.TransportConstants;
074:        import sun.rmi.transport.proxy.HttpReceiveSocket;
075:        import sun.security.action.GetIntegerAction;
076:        import sun.security.action.GetLongAction;
077:        import sun.security.action.GetPropertyAction;
078:
079:        /**
080:         * TCPTransport is the socket-based implementation of the RMI Transport
081:         * abstraction.
082:         *
083:         * @author Ann Wollrath
084:         * @author Peter Jones
085:         */
086:        public class TCPTransport extends Transport {
087:
088:            /* tcp package log */
089:            static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp",
090:                    "tcp", LogStream.parseLevel(AccessController
091:                            .doPrivileged(new GetPropertyAction(
092:                                    "sun.rmi.transport.tcp.logLevel"))));
093:
094:            /** maximum number of connection handler threads */
095:            private static final int maxConnectionThreads = // default no limit
096:            AccessController.doPrivileged(new GetIntegerAction(
097:                    "sun.rmi.transport.tcp.maxConnectionThreads",
098:                    Integer.MAX_VALUE));
099:
100:            /** keep alive time for idle connection handler threads */
101:            private static final long threadKeepAliveTime = // default 1 minute
102:            AccessController.doPrivileged(new GetLongAction(
103:                    "sun.rmi.transport.tcp.threadKeepAliveTime", 60000));
104:
105:            /** thread pool for connection handlers */
106:            private static final ExecutorService connectionThreadPool = new ThreadPoolExecutor(
107:                    0, maxConnectionThreads, threadKeepAliveTime,
108:                    TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
109:                    new ThreadFactory() {
110:                        public Thread newThread(Runnable runnable) {
111:                            return AccessController
112:                                    .doPrivileged(new NewThreadAction(runnable,
113:                                            "TCP Connection(idle)", true, true));
114:                        }
115:                    });
116:
117:            /** total connections handled */
118:            private static final AtomicInteger connectionCount = new AtomicInteger(
119:                    0);
120:
121:            /** client host for the current thread's connection */
122:            private static final ThreadLocal<ConnectionHandler> threadConnectionHandler = new ThreadLocal<ConnectionHandler>();
123:
124:            /** endpoints for this transport */
125:            private final LinkedList<TCPEndpoint> epList;
126:            /** number of objects exported on this transport */
127:            private int exportCount = 0;
128:            /** server socket for this transport */
129:            private ServerSocket server = null;
130:            /** table mapping endpoints to channels */
131:            private final Map<TCPEndpoint, Reference<TCPChannel>> channelTable = new WeakHashMap<TCPEndpoint, Reference<TCPChannel>>();
132:
133:            static final RMISocketFactory defaultSocketFactory = RMISocketFactory
134:                    .getDefaultSocketFactory();
135:
136:            /** number of milliseconds in accepted-connection timeout.
137:             * Warning: this should be greater than 15 seconds (the client-side
138:             * timeout), and defaults to 2 hours.
139:             * The maximum representable value is slightly more than 24 days
140:             * and 20 hours.
141:             */
142:            private static final int connectionReadTimeout = // default 2 hours
143:            AccessController.doPrivileged(new GetIntegerAction(
144:                    "sun.rmi.transport.tcp.readTimeout", 2 * 3600 * 1000));
145:
146:            /**
147:             * Constructs a TCPTransport.
148:             */
149:            TCPTransport(LinkedList<TCPEndpoint> epList) {
150:                // assert ((epList.size() != null) && (epList.size() >= 1))
151:                this .epList = epList;
152:                if (tcpLog.isLoggable(Log.BRIEF)) {
153:                    tcpLog.log(Log.BRIEF, "Version = "
154:                            + TransportConstants.Version + ", ep = "
155:                            + getEndpoint());
156:                }
157:            }
158:
159:            /**
160:             * Closes all cached connections in every channel subordinated to this
161:             * transport.  Currently, this only closes outgoing connections.
162:             */
163:            public void shedConnectionCaches() {
164:                List<TCPChannel> channels;
165:                synchronized (channelTable) {
166:                    channels = new ArrayList<TCPChannel>(channelTable.values()
167:                            .size());
168:                    for (Reference<TCPChannel> ref : channelTable.values()) {
169:                        TCPChannel ch = ref.get();
170:                        if (ch != null) {
171:                            channels.add(ch);
172:                        }
173:                    }
174:                }
175:                for (TCPChannel channel : channels) {
176:                    channel.shedCache();
177:                }
178:            }
179:
180:            /**
181:             * Returns a <I>Channel</I> that generates connections to the
182:             * endpoint <I>ep</I>. A Channel is an object that creates and
183:             * manages connections of a particular type to some particular
184:             * address space.
185:             * @param ep the endpoint to which connections will be generated.
186:             * @return the channel or null if the transport cannot
187:             * generate connections to this endpoint
188:             */
189:            public TCPChannel getChannel(Endpoint ep) {
190:                TCPChannel ch = null;
191:                if (ep instanceof  TCPEndpoint) {
192:                    synchronized (channelTable) {
193:                        Reference<TCPChannel> ref = channelTable.get(ep);
194:                        if (ref != null) {
195:                            ch = ref.get();
196:                        }
197:                        if (ch == null) {
198:                            TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
199:                            ch = new TCPChannel(this , tcpEndpoint);
200:                            channelTable.put(tcpEndpoint,
201:                                    new WeakReference<TCPChannel>(ch));
202:                        }
203:                    }
204:                }
205:                return ch;
206:            }
207:
208:            /**
209:             * Removes the <I>Channel</I> that generates connections to the
210:             * endpoint <I>ep</I>.
211:             */
212:            public void free(Endpoint ep) {
213:                if (ep instanceof  TCPEndpoint) {
214:                    synchronized (channelTable) {
215:                        Reference<TCPChannel> ref = channelTable.remove(ep);
216:                        if (ref != null) {
217:                            TCPChannel channel = ref.get();
218:                            if (channel != null) {
219:                                channel.shedCache();
220:                            }
221:                        }
222:                    }
223:                }
224:            }
225:
226:            /**
227:             * Export the object so that it can accept incoming calls.
228:             */
229:            public void exportObject(Target target) throws RemoteException {
230:                /*
231:                 * Ensure that a server socket is listening, and count this
232:                 * export while synchronized to prevent the server socket from
233:                 * being closed due to concurrent unexports.
234:                 */
235:                synchronized (this ) {
236:                    listen();
237:                    exportCount++;
238:                }
239:
240:                /*
241:                 * Try to add the Target to the exported object table; keep
242:                 * counting this export (to keep server socket open) only if
243:                 * that succeeds.
244:                 */
245:                boolean ok = false;
246:                try {
247:                    super .exportObject(target);
248:                    ok = true;
249:                } finally {
250:                    if (!ok) {
251:                        synchronized (this ) {
252:                            decrementExportCount();
253:                        }
254:                    }
255:                }
256:            }
257:
258:            protected synchronized void targetUnexported() {
259:                decrementExportCount();
260:            }
261:
262:            /**
263:             * Decrements the count of exported objects, closing the current
264:             * server socket if the count reaches zero.
265:             **/
266:            private void decrementExportCount() {
267:                assert Thread.holdsLock(this );
268:                exportCount--;
269:                if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
270:                    ServerSocket ss = server;
271:                    server = null;
272:                    try {
273:                        ss.close();
274:                    } catch (IOException e) {
275:                    }
276:                }
277:            }
278:
279:            /**
280:             * Verify that the current access control context has permission to
281:             * accept the connection being dispatched by the current thread.
282:             */
283:            protected void checkAcceptPermission(AccessControlContext acc) {
284:                SecurityManager sm = System.getSecurityManager();
285:                if (sm == null) {
286:                    return;
287:                }
288:                ConnectionHandler h = threadConnectionHandler.get();
289:                if (h == null) {
290:                    throw new Error(
291:                            "checkAcceptPermission not in ConnectionHandler thread");
292:                }
293:                h.checkAcceptPermission(sm, acc);
294:            }
295:
296:            private TCPEndpoint getEndpoint() {
297:                synchronized (epList) {
298:                    return epList.getLast();
299:                }
300:            }
301:
302:            /**
303:             * Listen on transport's endpoint.
304:             */
305:            private void listen() throws RemoteException {
306:                assert Thread.holdsLock(this );
307:                TCPEndpoint ep = getEndpoint();
308:                int port = ep.getPort();
309:
310:                if (server == null) {
311:                    if (tcpLog.isLoggable(Log.BRIEF)) {
312:                        tcpLog.log(Log.BRIEF, "(port " + port
313:                                + ") create server socket");
314:                    }
315:
316:                    try {
317:                        server = ep.newServerSocket();
318:                        /*
319:                         * Don't retry ServerSocket if creation fails since
320:                         * "port in use" will cause export to hang if an
321:                         * RMIFailureHandler is not installed.
322:                         */
323:                        Thread t = AccessController
324:                                .doPrivileged(new NewThreadAction(
325:                                        new AcceptLoop(server), "TCP Accept-"
326:                                                + port, true));
327:                        t.start();
328:                    } catch (java.net.BindException e) {
329:                        throw new ExportException("Port already in use: "
330:                                + port, e);
331:                    } catch (IOException e) {
332:                        throw new ExportException("Listen failed on port: "
333:                                + port, e);
334:                    }
335:
336:                } else {
337:                    // otherwise verify security access to existing server socket
338:                    SecurityManager sm = System.getSecurityManager();
339:                    if (sm != null) {
340:                        sm.checkListen(port);
341:                    }
342:                }
343:            }
344:
345:            /**
346:             * Worker for accepting connections from a server socket.
347:             **/
348:            private class AcceptLoop implements  Runnable {
349:
350:                private final ServerSocket serverSocket;
351:
352:                // state for throttling loop on exceptions (local to accept thread)
353:                private long lastExceptionTime = 0L;
354:                private int recentExceptionCount;
355:
356:                AcceptLoop(ServerSocket serverSocket) {
357:                    this .serverSocket = serverSocket;
358:                }
359:
360:                public void run() {
361:                    try {
362:                        executeAcceptLoop();
363:                    } finally {
364:                        try {
365:                            /*
366:                             * Only one accept loop is started per server
367:                             * socket, so after no more connections will be
368:                             * accepted, ensure that the server socket is no
369:                             * longer listening.
370:                             */
371:                            serverSocket.close();
372:                        } catch (IOException e) {
373:                        }
374:                    }
375:                }
376:
377:                /**
378:                 * Accepts connections from the server socket and executes
379:                 * handlers for them in the thread pool.
380:                 **/
381:                private void executeAcceptLoop() {
382:                    if (tcpLog.isLoggable(Log.BRIEF)) {
383:                        tcpLog.log(Log.BRIEF, "listening on port "
384:                                + getEndpoint().getPort());
385:                    }
386:
387:                    while (true) {
388:                        Socket socket = null;
389:                        try {
390:                            socket = serverSocket.accept();
391:
392:                            /*
393:                             * Find client host name (or "0.0.0.0" if unknown)
394:                             */
395:                            InetAddress clientAddr = socket.getInetAddress();
396:                            String clientHost = (clientAddr != null ? clientAddr
397:                                    .getHostAddress()
398:                                    : "0.0.0.0");
399:
400:                            /*
401:                             * Execute connection handler in the thread pool,
402:                             * which uses non-system threads.
403:                             */
404:                            try {
405:                                connectionThreadPool
406:                                        .execute(new ConnectionHandler(socket,
407:                                                clientHost));
408:                            } catch (RejectedExecutionException e) {
409:                                closeSocket(socket);
410:                                tcpLog.log(Log.BRIEF,
411:                                        "rejected connection from "
412:                                                + clientHost);
413:                            }
414:
415:                        } catch (Throwable t) {
416:                            try {
417:                                /*
418:                                 * If the server socket has been closed, such
419:                                 * as because there are no more exported
420:                                 * objects, then we expect accept to throw an
421:                                 * exception, so just terminate normally.
422:                                 */
423:                                if (serverSocket.isClosed()) {
424:                                    break;
425:                                }
426:
427:                                try {
428:                                    if (tcpLog.isLoggable(Level.WARNING)) {
429:                                        tcpLog.log(Level.WARNING,
430:                                                "accept loop for "
431:                                                        + serverSocket
432:                                                        + " throws", t);
433:                                    }
434:                                } catch (Throwable tt) {
435:                                }
436:                            } finally {
437:                                /*
438:                                 * Always close the accepted socket (if any)
439:                                 * if an exception occurs, but only after
440:                                 * logging an unexpected exception.
441:                                 */
442:                                if (socket != null) {
443:                                    closeSocket(socket);
444:                                }
445:                            }
446:
447:                            /*
448:                             * In case we're running out of file descriptors,
449:                             * release resources held in caches.
450:                             */
451:                            if (!(t instanceof  SecurityException)) {
452:                                try {
453:                                    TCPEndpoint.shedConnectionCaches();
454:                                } catch (Throwable tt) {
455:                                }
456:                            }
457:
458:                            /*
459:                             * A NoClassDefFoundError can occur if no file
460:                             * descriptors are available, in which case this
461:                             * loop should not terminate.
462:                             */
463:                            if (t instanceof  Exception
464:                                    || t instanceof  OutOfMemoryError
465:                                    || t instanceof  NoClassDefFoundError) {
466:                                if (!continueAfterAcceptFailure(t)) {
467:                                    return;
468:                                }
469:                                // continue loop
470:                            } else {
471:                                throw (Error) t;
472:                            }
473:                        }
474:                    }
475:                }
476:
477:                /**
478:                 * Returns true if the accept loop should continue after the
479:                 * specified exception has been caught, or false if the accept
480:                 * loop should terminate (closing the server socket).  If
481:                 * there is an RMIFailureHandler, this method returns the
482:                 * result of passing the specified exception to it; otherwise,
483:                 * this method always returns true, after sleeping to throttle
484:                 * the accept loop if necessary.
485:                 **/
486:                private boolean continueAfterAcceptFailure(Throwable t) {
487:                    RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
488:                    if (fh != null) {
489:                        return fh
490:                                .failure(t instanceof  Exception ? (Exception) t
491:                                        : new InvocationTargetException(t));
492:                    } else {
493:                        throttleLoopOnException();
494:                        return true;
495:                    }
496:                }
497:
498:                /**
499:                 * Throttles the accept loop after an exception has been
500:                 * caught: if a burst of 10 exceptions in 5 seconds occurs,
501:                 * then wait for 10 seconds to curb busy CPU usage.
502:                 **/
503:                private void throttleLoopOnException() {
504:                    long now = System.currentTimeMillis();
505:                    if (lastExceptionTime == 0L
506:                            || (now - lastExceptionTime) > 5000) {
507:                        // last exception was long ago (or this is the first)
508:                        lastExceptionTime = now;
509:                        recentExceptionCount = 0;
510:                    } else {
511:                        // exception burst window was started recently
512:                        if (++recentExceptionCount >= 10) {
513:                            try {
514:                                Thread.sleep(10000);
515:                            } catch (InterruptedException ignore) {
516:                            }
517:                        }
518:                    }
519:                }
520:            }
521:
522:            /** close socket and eat exception */
523:            private static void closeSocket(Socket sock) {
524:                try {
525:                    sock.close();
526:                } catch (IOException ex) {
527:                    // eat exception
528:                }
529:            }
530:
531:            /**
532:             * handleMessages decodes transport operations and handles messages
533:             * appropriately.  If an exception occurs during message handling,
534:             * the socket is closed.
535:             */
536:            void handleMessages(Connection conn, boolean persistent) {
537:                int port = getEndpoint().getPort();
538:
539:                try {
540:                    DataInputStream in = new DataInputStream(conn
541:                            .getInputStream());
542:                    do {
543:                        int op = in.read(); // transport op
544:                        if (op == -1) {
545:                            if (tcpLog.isLoggable(Log.BRIEF)) {
546:                                tcpLog.log(Log.BRIEF, "(port " + port
547:                                        + ") connection closed");
548:                            }
549:                            break;
550:                        }
551:
552:                        if (tcpLog.isLoggable(Log.BRIEF)) {
553:                            tcpLog.log(Log.BRIEF, "(port " + port + ") op = "
554:                                    + op);
555:                        }
556:
557:                        switch (op) {
558:                        case TransportConstants.Call:
559:                            // service incoming RMI call
560:                            RemoteCall call = new StreamRemoteCall(conn);
561:                            if (serviceCall(call) == false)
562:                                return;
563:                            break;
564:
565:                        case TransportConstants.Ping:
566:                            // send ack for ping
567:                            DataOutputStream out = new DataOutputStream(conn
568:                                    .getOutputStream());
569:                            out.writeByte(TransportConstants.PingAck);
570:                            conn.releaseOutputStream();
571:                            break;
572:
573:                        case TransportConstants.DGCAck:
574:                            DGCAckHandler.received(UID.read(in));
575:                            break;
576:
577:                        default:
578:                            throw new IOException("unknown transport op " + op);
579:                        }
580:                    } while (persistent);
581:
582:                } catch (IOException e) {
583:                    // exception during processing causes connection to close (below)
584:                    if (tcpLog.isLoggable(Log.BRIEF)) {
585:                        tcpLog.log(Log.BRIEF,
586:                                "(port " + port + ") exception: ", e);
587:                    }
588:                } finally {
589:                    try {
590:                        conn.close();
591:                    } catch (IOException ex) {
592:                        // eat exception
593:                    }
594:                }
595:            }
596:
597:            /**
598:             * Returns the client host for the current thread's connection.  Throws
599:             * ServerNotActiveException if no connection is active for this thread.
600:             */
601:            public static String getClientHost()
602:                    throws ServerNotActiveException {
603:                ConnectionHandler h = threadConnectionHandler.get();
604:                if (h != null) {
605:                    return h.getClientHost();
606:                } else {
607:                    throw new ServerNotActiveException("not in a remote call");
608:                }
609:            }
610:
611:            /**
612:             * Services messages on accepted connection
613:             */
614:            private class ConnectionHandler implements  Runnable {
615:
616:                /** int value of "POST" in ASCII (Java's specified data formats
617:                 *  make this once-reviled tactic again socially acceptable) */
618:                private static final int POST = 0x504f5354;
619:
620:                /** most recently accept-authorized AccessControlContext */
621:                private AccessControlContext okContext;
622:                /** cache of accept-authorized AccessControlContexts */
623:                private Map<AccessControlContext, Reference<AccessControlContext>> authCache;
624:                /** security manager which authorized contexts in authCache */
625:                private SecurityManager cacheSecurityManager = null;
626:
627:                private Socket socket;
628:                private String remoteHost;
629:
630:                ConnectionHandler(Socket socket, String remoteHost) {
631:                    this .socket = socket;
632:                    this .remoteHost = remoteHost;
633:                }
634:
635:                String getClientHost() {
636:                    return remoteHost;
637:                }
638:
639:                /**
640:                 * Verify that the given AccessControlContext has permission to
641:                 * accept this connection.
642:                 */
643:                void checkAcceptPermission(SecurityManager sm,
644:                        AccessControlContext acc) {
645:                    /*
646:                     * Note: no need to synchronize on cache-related fields, since this
647:                     * method only gets called from the ConnectionHandler's thread.
648:                     */
649:                    if (sm != cacheSecurityManager) {
650:                        okContext = null;
651:                        authCache = new WeakHashMap<AccessControlContext, Reference<AccessControlContext>>();
652:                        cacheSecurityManager = sm;
653:                    }
654:                    if (acc.equals(okContext) || authCache.containsKey(acc)) {
655:                        return;
656:                    }
657:                    InetAddress addr = socket.getInetAddress();
658:                    String host = (addr != null) ? addr.getHostAddress() : "*";
659:
660:                    sm.checkAccept(host, socket.getPort());
661:
662:                    authCache.put(acc, new SoftReference<AccessControlContext>(
663:                            acc));
664:                    okContext = acc;
665:                }
666:
667:                public void run() {
668:                    Thread t = Thread.currentThread();
669:                    String name = t.getName();
670:                    try {
671:                        t.setName("RMI TCP Connection("
672:                                + connectionCount.incrementAndGet() + ")-"
673:                                + remoteHost);
674:                        run0();
675:                    } finally {
676:                        t.setName(name);
677:                    }
678:                }
679:
680:                private void run0() {
681:                    TCPEndpoint endpoint = getEndpoint();
682:                    int port = endpoint.getPort();
683:
684:                    threadConnectionHandler.set(this );
685:
686:                    // set socket to disable Nagle's algorithm (always send
687:                    // immediately)
688:                    // TBD: should this be left up to socket factory instead?
689:                    try {
690:                        socket.setTcpNoDelay(true);
691:                    } catch (Exception e) {
692:                        // if we fail to set this, ignore and proceed anyway
693:                    }
694:                    // set socket to timeout after excessive idle time
695:                    try {
696:                        if (connectionReadTimeout > 0)
697:                            socket.setSoTimeout(connectionReadTimeout);
698:                    } catch (Exception e) {
699:                        // too bad, continue anyway
700:                    }
701:
702:                    try {
703:                        InputStream sockIn = socket.getInputStream();
704:                        InputStream bufIn = sockIn.markSupported() ? sockIn
705:                                : new BufferedInputStream(sockIn);
706:
707:                        // Read magic (or HTTP wrapper)
708:                        bufIn.mark(4);
709:                        DataInputStream in = new DataInputStream(bufIn);
710:                        int magic = in.readInt();
711:
712:                        if (magic == POST) {
713:                            tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
714:
715:                            // It's really a HTTP-wrapped request.  Repackage
716:                            // the socket in a HttpReceiveSocket, reinitialize
717:                            // sockIn and in, and reread magic.
718:                            bufIn.reset(); // unread "POST"
719:
720:                            try {
721:                                socket = new HttpReceiveSocket(socket, bufIn,
722:                                        null);
723:                                remoteHost = "0.0.0.0";
724:                                sockIn = socket.getInputStream();
725:                                bufIn = new BufferedInputStream(sockIn);
726:                                in = new DataInputStream(bufIn);
727:                                magic = in.readInt();
728:
729:                            } catch (IOException e) {
730:                                throw new RemoteException(
731:                                        "Error HTTP-unwrapping call", e);
732:                            }
733:                        }
734:                        // bufIn's mark will invalidate itself when it overflows
735:                        // so it doesn't have to be turned off
736:
737:                        // read and verify transport header
738:                        short version = in.readShort();
739:                        if (magic != TransportConstants.Magic
740:                                || version != TransportConstants.Version) {
741:                            // protocol mismatch detected...
742:                            // just close socket: this would recurse if we marshal an
743:                            // exception to the client and the protocol at other end
744:                            // doesn't match.
745:                            closeSocket(socket);
746:                            return;
747:                        }
748:
749:                        OutputStream sockOut = socket.getOutputStream();
750:                        BufferedOutputStream bufOut = new BufferedOutputStream(
751:                                sockOut);
752:                        DataOutputStream out = new DataOutputStream(bufOut);
753:
754:                        int remotePort = socket.getPort();
755:
756:                        if (tcpLog.isLoggable(Log.BRIEF)) {
757:                            tcpLog.log(Log.BRIEF, "accepted socket from ["
758:                                    + remoteHost + ":" + remotePort + "]");
759:                        }
760:
761:                        TCPEndpoint ep;
762:                        TCPChannel ch;
763:                        TCPConnection conn;
764:
765:                        // send ack (or nack) for protocol
766:                        byte protocol = in.readByte();
767:                        switch (protocol) {
768:                        case TransportConstants.SingleOpProtocol:
769:                            // no ack for protocol
770:
771:                            // create dummy channel for receiving messages
772:                            ep = new TCPEndpoint(remoteHost, socket
773:                                    .getLocalPort(), endpoint
774:                                    .getClientSocketFactory(), endpoint
775:                                    .getServerSocketFactory());
776:                            ch = new TCPChannel(TCPTransport.this , ep);
777:                            conn = new TCPConnection(ch, socket, bufIn, bufOut);
778:
779:                            // read input messages
780:                            handleMessages(conn, false);
781:                            break;
782:
783:                        case TransportConstants.StreamProtocol:
784:                            // send ack
785:                            out.writeByte(TransportConstants.ProtocolAck);
786:
787:                            // suggest endpoint (in case client doesn't know host name)
788:                            if (tcpLog.isLoggable(Log.VERBOSE)) {
789:                                tcpLog.log(Log.VERBOSE, "(port " + port + ") "
790:                                        + "suggesting " + remoteHost + ":"
791:                                        + remotePort);
792:                            }
793:
794:                            out.writeUTF(remoteHost);
795:                            out.writeInt(remotePort);
796:                            out.flush();
797:
798:                            // read and discard (possibly bogus) endpoint
799:                            // REMIND: would be faster to read 2 bytes then skip N+4
800:                            String clientHost = in.readUTF();
801:                            int clientPort = in.readInt();
802:                            if (tcpLog.isLoggable(Log.VERBOSE)) {
803:                                tcpLog.log(Log.VERBOSE, "(port " + port
804:                                        + ") client using " + clientHost + ":"
805:                                        + clientPort);
806:                            }
807:
808:                            // create dummy channel for receiving messages
809:                            // (why not use clientHost and clientPort?)
810:                            ep = new TCPEndpoint(remoteHost, socket
811:                                    .getLocalPort(), endpoint
812:                                    .getClientSocketFactory(), endpoint
813:                                    .getServerSocketFactory());
814:                            ch = new TCPChannel(TCPTransport.this , ep);
815:                            conn = new TCPConnection(ch, socket, bufIn, bufOut);
816:
817:                            // read input messages
818:                            handleMessages(conn, true);
819:                            break;
820:
821:                        case TransportConstants.MultiplexProtocol:
822:                            if (tcpLog.isLoggable(Log.VERBOSE)) {
823:                                tcpLog.log(Log.VERBOSE, "(port " + port
824:                                        + ") accepting multiplex protocol");
825:                            }
826:
827:                            // send ack
828:                            out.writeByte(TransportConstants.ProtocolAck);
829:
830:                            // suggest endpoint (in case client doesn't already have one)
831:                            if (tcpLog.isLoggable(Log.VERBOSE)) {
832:                                tcpLog.log(Log.VERBOSE, "(port " + port
833:                                        + ") suggesting " + remoteHost + ":"
834:                                        + remotePort);
835:                            }
836:
837:                            out.writeUTF(remoteHost);
838:                            out.writeInt(remotePort);
839:                            out.flush();
840:
841:                            // read endpoint client has decided to use
842:                            ep = new TCPEndpoint(in.readUTF(), in.readInt(),
843:                                    endpoint.getClientSocketFactory(), endpoint
844:                                            .getServerSocketFactory());
845:                            if (tcpLog.isLoggable(Log.VERBOSE)) {
846:                                tcpLog.log(Log.VERBOSE, "(port " + port
847:                                        + ") client using " + ep.getHost()
848:                                        + ":" + ep.getPort());
849:                            }
850:
851:                            ConnectionMultiplexer multiplexer;
852:                            synchronized (channelTable) {
853:                                // create or find channel for this endpoint
854:                                ch = getChannel(ep);
855:                                multiplexer = new ConnectionMultiplexer(ch,
856:                                        bufIn, sockOut, false);
857:                                ch.useMultiplexer(multiplexer);
858:                            }
859:                            multiplexer.run();
860:                            break;
861:
862:                        default:
863:                            // protocol not understood, send nack and close socket
864:                            out.writeByte(TransportConstants.ProtocolNack);
865:                            out.flush();
866:                            break;
867:                        }
868:
869:                    } catch (IOException e) {
870:                        // socket in unknown state: destroy socket
871:                        tcpLog.log(Log.BRIEF, "terminated with exception:", e);
872:                    } finally {
873:                        closeSocket(socket);
874:                    }
875:                }
876:            }
877:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.