Source Code Cross Referenced for ConnectionTableNIO.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        // $Id: ConnectionTableNIO.java,v 1.24 2006/09/18 18:00:37 bstansberry Exp $
0002:
0003:        package org.jgroups.blocks;
0004:
0005:        import EDU.oswego.cs.dl.util.concurrent.*;
0006:        import org.apache.commons.logging.Log;
0007:        import org.apache.commons.logging.LogFactory;
0008:        import org.jgroups.Address;
0009:        import org.jgroups.stack.IpAddress;
0010:        import org.jgroups.util.Util;
0011:
0012:        import java.io.IOException;
0013:        import java.net.*;
0014:        import java.nio.ByteBuffer;
0015:        import java.nio.channels.*;
0016:        import java.nio.channels.spi.SelectorProvider;
0017:        import java.util.Iterator;
0018:        import java.util.LinkedList;
0019:        import java.util.Set;
0020:
0021:        /**
0022:         * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
0023:         * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
0024:         * connection.  For incoming messages, one server socket is created at startup. For each new incoming
0025:         * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
0026:         * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
0027:         * after some time.
0028:         * <p/>
0029:         * Incoming messages from any of the sockets can be received by setting the message listener.
0030:         *
0031:         * We currently require use_incoming_packet_handler=true (release 2.4 will support use_incoming_packet_handler=false
0032:         * due to threadless stack support).
0033:         *
0034:         * @author Bela Ban, Scott Marlow, Alex Fu
0035:         */
0036:        public class ConnectionTableNIO extends BasicConnectionTable implements 
0037:                Runnable {
0038:
0039:            private ServerSocketChannel m_serverSocketChannel;
0040:            private Selector m_acceptSelector;
0041:            protected final static Log LOG = LogFactory
0042:                    .getLog(ConnectionTableNIO.class);
0043:
0044:            private WriteHandler[] m_writeHandlers;
0045:            private int m_nextWriteHandler = 0;
0046:            private final Object m_lockNextWriteHandler = new Object();
0047:
0048:            private ReadHandler[] m_readHandlers;
0049:            private int m_nextReadHandler = 0;
0050:            private final Object m_lockNextReadHandler = new Object();
0051:
0052:            // thread pool for processing read requests
0053:            private Executor m_requestProcessors;
0054:            private volatile boolean serverStopping = false;
0055:
0056:            private final LinkedList m_backGroundThreads = new LinkedList(); // Collection of all created threads
0057:
0058:            private int m_reader_threads = 8;
0059:
0060:            private int m_writer_threads = 8;
0061:
0062:            private int m_processor_threads = 10; // PooledExecutor.createThreads()
0063:            private int m_processor_minThreads = 10; // PooledExecutor.setMinimumPoolSize()
0064:            private int m_processor_maxThreads = 10; // PooledExecutor.setMaxThreads()
0065:            private int m_processor_queueSize = 100; // Number of queued requests that can be pending waiting
0066:            // for a background thread to run the request.
0067:            private int m_processor_keepAliveTime = -1; // PooledExecutor.setKeepAliveTime( milliseconds);
0068:
0069:            // A negative value means to wait forever
0070:
0071:            /**
0072:             * @param srv_port
0073:             * @throws Exception
0074:             */
0075:            public ConnectionTableNIO(int srv_port) throws Exception {
0076:                this .srv_port = srv_port;
0077:                start();
0078:            }
0079:
0080:            /**
0081:             * @param srv_port
0082:             * @param reaper_interval
0083:             * @param conn_expire_time
0084:             * @throws Exception
0085:             */
0086:            public ConnectionTableNIO(int srv_port, long reaper_interval,
0087:                    long conn_expire_time) throws Exception {
0088:                this .srv_port = srv_port;
0089:                this .reaper_interval = reaper_interval;
0090:                this .conn_expire_time = conn_expire_time;
0091:                start();
0092:            }
0093:
0094:            /**
0095:             * @param r
0096:             * @param bind_addr
0097:             * @param external_addr
0098:             * @param srv_port
0099:             * @param max_port
0100:             * @throws Exception
0101:             */
0102:            public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0103:                    InetAddress external_addr, int srv_port, int max_port)
0104:                    throws Exception {
0105:                setReceiver(r);
0106:                this .external_addr = external_addr;
0107:                this .bind_addr = bind_addr;
0108:                this .srv_port = srv_port;
0109:                this .max_port = max_port;
0110:                use_reaper = true;
0111:                start();
0112:            }
0113:
0114:            public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0115:                    InetAddress external_addr, int srv_port, int max_port,
0116:                    boolean doStart) throws Exception {
0117:                setReceiver(r);
0118:                this .external_addr = external_addr;
0119:                this .bind_addr = bind_addr;
0120:                this .srv_port = srv_port;
0121:                this .max_port = max_port;
0122:                use_reaper = true;
0123:                if (doStart)
0124:                    start();
0125:            }
0126:
0127:            /**
0128:             * @param r
0129:             * @param bind_addr
0130:             * @param external_addr
0131:             * @param srv_port
0132:             * @param max_port
0133:             * @param reaper_interval
0134:             * @param conn_expire_time
0135:             * @throws Exception
0136:             */
0137:            public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0138:                    InetAddress external_addr, int srv_port, int max_port,
0139:                    long reaper_interval, long conn_expire_time)
0140:                    throws Exception {
0141:                setReceiver(r);
0142:                this .bind_addr = bind_addr;
0143:                this .external_addr = external_addr;
0144:                this .srv_port = srv_port;
0145:                this .max_port = max_port;
0146:                this .reaper_interval = reaper_interval;
0147:                this .conn_expire_time = conn_expire_time;
0148:                use_reaper = true;
0149:                start();
0150:            }
0151:
0152:            public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0153:                    InetAddress external_addr, int srv_port, int max_port,
0154:                    long reaper_interval, long conn_expire_time, boolean doStart)
0155:                    throws Exception {
0156:                setReceiver(r);
0157:                this .bind_addr = bind_addr;
0158:                this .external_addr = external_addr;
0159:                this .srv_port = srv_port;
0160:                this .max_port = max_port;
0161:                this .reaper_interval = reaper_interval;
0162:                this .conn_expire_time = conn_expire_time;
0163:                use_reaper = true;
0164:                if (doStart)
0165:                    start();
0166:            }
0167:
0168:            public int getReaderThreads() {
0169:                return m_reader_threads;
0170:            }
0171:
0172:            public void setReaderThreads(int m_reader_threads) {
0173:                this .m_reader_threads = m_reader_threads;
0174:            }
0175:
0176:            public int getWriterThreads() {
0177:                return m_writer_threads;
0178:            }
0179:
0180:            public void setWriterThreads(int m_writer_threads) {
0181:                this .m_writer_threads = m_writer_threads;
0182:            }
0183:
0184:            public int getProcessorThreads() {
0185:                return m_processor_threads;
0186:            }
0187:
0188:            public void setProcessorThreads(int m_processor_threads) {
0189:                this .m_processor_threads = m_processor_threads;
0190:            }
0191:
0192:            public int getProcessorMinThreads() {
0193:                return m_processor_minThreads;
0194:            }
0195:
0196:            public void setProcessorMinThreads(int m_processor_minThreads) {
0197:                this .m_processor_minThreads = m_processor_minThreads;
0198:            }
0199:
0200:            public int getProcessorMaxThreads() {
0201:                return m_processor_maxThreads;
0202:            }
0203:
0204:            public void setProcessorMaxThreads(int m_processor_maxThreads) {
0205:                this .m_processor_maxThreads = m_processor_maxThreads;
0206:            }
0207:
0208:            public int getProcessorQueueSize() {
0209:                return m_processor_queueSize;
0210:            }
0211:
0212:            public void setProcessorQueueSize(int m_processor_queueSize) {
0213:                this .m_processor_queueSize = m_processor_queueSize;
0214:            }
0215:
0216:            public int getProcessorKeepAliveTime() {
0217:                return m_processor_keepAliveTime;
0218:            }
0219:
0220:            public void setProcessorKeepAliveTime(int m_processor_keepAliveTime) {
0221:                this .m_processor_keepAliveTime = m_processor_keepAliveTime;
0222:            }
0223:
0224:            /**
0225:             * Try to obtain correct Connection (or create one if not yet existent)
0226:             */
0227:            ConnectionTable.Connection getConnection(Address dest)
0228:                    throws Exception {
0229:                Connection conn;
0230:                SocketChannel sock_ch;
0231:
0232:                synchronized (conns) {
0233:                    conn = (Connection) conns.get(dest);
0234:                    if (conn == null) {
0235:                        InetSocketAddress destAddress = new InetSocketAddress(
0236:                                ((IpAddress) dest).getIpAddress(),
0237:                                ((IpAddress) dest).getPort());
0238:                        sock_ch = SocketChannel.open(destAddress);
0239:                        sock_ch.socket().setTcpNoDelay(tcp_nodelay);
0240:                        conn = new Connection(sock_ch, dest);
0241:
0242:                        conn.sendLocalAddress(local_addr);
0243:                        // This outbound connection is ready
0244:
0245:                        sock_ch.configureBlocking(false);
0246:
0247:                        try {
0248:                            if (LOG.isTraceEnabled())
0249:                                LOG
0250:                                        .trace("About to change new connection send buff size from "
0251:                                                + sock_ch.socket()
0252:                                                        .getSendBufferSize()
0253:                                                + " bytes");
0254:                            sock_ch.socket().setSendBufferSize(send_buf_size);
0255:                            if (LOG.isTraceEnabled())
0256:                                LOG
0257:                                        .trace("Changed new connection send buff size to "
0258:                                                + sock_ch.socket()
0259:                                                        .getSendBufferSize()
0260:                                                + " bytes");
0261:                        } catch (IllegalArgumentException ex) {
0262:                            if (log.isErrorEnabled())
0263:                                log
0264:                                        .error("exception setting send buffer size to "
0265:                                                + send_buf_size
0266:                                                + " bytes: "
0267:                                                + ex);
0268:                        }
0269:                        try {
0270:                            if (LOG.isTraceEnabled())
0271:                                LOG
0272:                                        .trace("About to change new connection receive buff size from "
0273:                                                + sock_ch.socket()
0274:                                                        .getReceiveBufferSize()
0275:                                                + " bytes");
0276:                            sock_ch.socket()
0277:                                    .setReceiveBufferSize(recv_buf_size);
0278:                            if (LOG.isTraceEnabled())
0279:                                LOG
0280:                                        .trace("Changed new connection receive buff size to "
0281:                                                + sock_ch.socket()
0282:                                                        .getReceiveBufferSize()
0283:                                                + " bytes");
0284:                        } catch (IllegalArgumentException ex) {
0285:                            if (log.isErrorEnabled())
0286:                                log
0287:                                        .error("exception setting receive buffer size to "
0288:                                                + send_buf_size
0289:                                                + " bytes: "
0290:                                                + ex);
0291:                        }
0292:
0293:                        int idx;
0294:                        synchronized (m_lockNextWriteHandler) {
0295:                            idx = m_nextWriteHandler = (m_nextWriteHandler + 1)
0296:                                    % m_writeHandlers.length;
0297:                        }
0298:                        conn.setupWriteHandler(m_writeHandlers[idx]);
0299:
0300:                        // Put the new connection to the queue
0301:                        try {
0302:                            synchronized (m_lockNextReadHandler) {
0303:                                idx = m_nextReadHandler = (m_nextReadHandler + 1)
0304:                                        % m_readHandlers.length;
0305:                            }
0306:                            m_readHandlers[idx].add(conn);
0307:
0308:                        } catch (InterruptedException e) {
0309:                            if (LOG.isWarnEnabled())
0310:                                LOG
0311:                                        .warn(
0312:                                                "Thread ("
0313:                                                        + Thread
0314:                                                                .currentThread()
0315:                                                                .getName()
0316:                                                        + ") was interrupted, closing connection",
0317:                                                e);
0318:                            // What can we do? Remove it from table then.
0319:                            conn.destroy();
0320:                            throw e;
0321:                        }
0322:
0323:                        // Add connection to table
0324:                        addConnection(dest, conn);
0325:
0326:                        notifyConnectionOpened(dest);
0327:                        if (LOG.isInfoEnabled())
0328:                            LOG.info("created socket to " + dest);
0329:                    }
0330:                    return conn;
0331:                }
0332:            }
0333:
0334:            public final void start() throws Exception {
0335:                super .start();
0336:                //Roland Kurmann 4/7/2003, build new thread group
0337:                thread_group = new ThreadGroup(Util.getGlobalThreadGroup(),
0338:                        "ConnectionTableThreads");
0339:                init();
0340:                srv_sock = createServerSocket(srv_port, max_port);
0341:
0342:                if (external_addr != null)
0343:                    local_addr = new IpAddress(external_addr, srv_sock
0344:                            .getLocalPort());
0345:                else if (bind_addr != null)
0346:                    local_addr = new IpAddress(bind_addr, srv_sock
0347:                            .getLocalPort());
0348:                else
0349:                    local_addr = new IpAddress(srv_sock.getLocalPort());
0350:
0351:                if (log.isInfoEnabled())
0352:                    log.info("server socket created on " + local_addr);
0353:
0354:                //Roland Kurmann 4/7/2003, put in thread_group
0355:                acceptor = new Thread(thread_group, this ,
0356:                        "ConnectionTable.AcceptorThread");
0357:                acceptor.setDaemon(true);
0358:                acceptor.start();
0359:                m_backGroundThreads.add(acceptor);
0360:
0361:                // start the connection reaper - will periodically remove unused connections
0362:                if (use_reaper && reaper == null) {
0363:                    reaper = new Reaper();
0364:                    reaper.start();
0365:                }
0366:            }
0367:
0368:            protected void init() throws Exception {
0369:
0370:                // use directExector if max thread pool size is less than or equal to zero.
0371:                if (getProcessorMaxThreads() <= 0) {
0372:                    m_requestProcessors = new DirectExecutor();
0373:                } else {
0374:                    // Create worker thread pool for processing incoming buffers
0375:                    PooledExecutor requestProcessors = new PooledExecutor(
0376:                            new BoundedBuffer(getProcessorQueueSize()),
0377:                            getProcessorMaxThreads());
0378:                    requestProcessors.setThreadFactory(new ThreadFactory() {
0379:                        public Thread newThread(Runnable runnable) {
0380:                            Thread new_thread = new Thread(thread_group,
0381:                                    runnable);
0382:                            new_thread.setDaemon(true);
0383:                            new_thread.setName("ConnectionTableNIO.Thread");
0384:                            m_backGroundThreads.add(new_thread);
0385:                            return new_thread;
0386:                        }
0387:                    });
0388:                    requestProcessors
0389:                            .setMinimumPoolSize(getProcessorMinThreads());
0390:                    requestProcessors
0391:                            .setKeepAliveTime(getProcessorKeepAliveTime());
0392:                    requestProcessors.waitWhenBlocked();
0393:                    requestProcessors.createThreads(getProcessorThreads());
0394:                    m_requestProcessors = requestProcessors;
0395:                }
0396:
0397:                m_writeHandlers = WriteHandler.create(getWriterThreads(),
0398:                        thread_group, m_backGroundThreads);
0399:                m_readHandlers = ReadHandler.create(getReaderThreads(), this ,
0400:                        thread_group, m_backGroundThreads);
0401:            }
0402:
0403:            /**
0404:             * Closes all open sockets, the server socket and all threads waiting for incoming messages
0405:             */
0406:            public void stop() {
0407:                super .stop();
0408:                serverStopping = true;
0409:
0410:                if (reaper != null)
0411:                    reaper.stop();
0412:
0413:                // Stop the main selector
0414:                m_acceptSelector.wakeup();
0415:
0416:                // Stop selector threads
0417:                for (int i = 0; i < m_readHandlers.length; i++) {
0418:                    try {
0419:                        m_readHandlers[i].add(new Shutdown());
0420:                    } catch (InterruptedException e) {
0421:                        LOG
0422:                                .error(
0423:                                        "Thread ("
0424:                                                + Thread.currentThread()
0425:                                                        .getName()
0426:                                                + ") was interrupted, failed to shutdown selector",
0427:                                        e);
0428:                    }
0429:                }
0430:                for (int i = 0; i < m_writeHandlers.length; i++) {
0431:                    try {
0432:                        m_writeHandlers[i].QUEUE.put(new Shutdown());
0433:                        m_writeHandlers[i].SELECTOR.wakeup();
0434:                    } catch (InterruptedException e) {
0435:                        LOG
0436:                                .error(
0437:                                        "Thread ("
0438:                                                + Thread.currentThread()
0439:                                                        .getName()
0440:                                                + ") was interrupted, failed to shutdown selector",
0441:                                        e);
0442:                    }
0443:                }
0444:
0445:                // Stop the callback thread pool
0446:                if (m_requestProcessors instanceof  PooledExecutor)
0447:                    ((PooledExecutor) m_requestProcessors).shutdownNow();
0448:
0449:                if (m_requestProcessors instanceof  PooledExecutor) {
0450:                    try {
0451:                        ((PooledExecutor) m_requestProcessors)
0452:                                .awaitTerminationAfterShutdown(1000);
0453:                    } catch (InterruptedException e) {
0454:                    }
0455:                }
0456:
0457:                // then close the connections
0458:                synchronized (conns) {
0459:                    Iterator it = conns.values().iterator();
0460:                    while (it.hasNext()) {
0461:                        Connection conn = (Connection) it.next();
0462:                        conn.destroy();
0463:                    }
0464:                    conns.clear();
0465:                }
0466:
0467:                while (m_backGroundThreads.size() > 0) {
0468:                    Thread t = (Thread) m_backGroundThreads.removeFirst();
0469:                    try {
0470:                        t.join();
0471:                    } catch (InterruptedException e) {
0472:                        LOG.error("Thread (" + Thread.currentThread().getName()
0473:                                + ") was interrupted while waiting on thread "
0474:                                + t.getName() + " to finish.");
0475:                    }
0476:                }
0477:                m_backGroundThreads.clear();
0478:
0479:            }
0480:
0481:            /**
0482:             * Acceptor thread. Continuously accept new connections and assign readhandler/writehandler
0483:             * to them.
0484:             */
0485:            public void run() {
0486:                Connection conn;
0487:
0488:                while (m_serverSocketChannel.isOpen() && !serverStopping) {
0489:                    int num;
0490:                    try {
0491:                        num = m_acceptSelector.select();
0492:                    } catch (IOException e) {
0493:                        if (LOG.isWarnEnabled())
0494:                            LOG
0495:                                    .warn(
0496:                                            "Select operation on listening socket failed",
0497:                                            e);
0498:                        continue; // Give up this time
0499:                    }
0500:
0501:                    if (num > 0) {
0502:                        Set readyKeys = m_acceptSelector.selectedKeys();
0503:                        for (Iterator i = readyKeys.iterator(); i.hasNext();) {
0504:                            SelectionKey key = (SelectionKey) i.next();
0505:                            i.remove();
0506:                            // We only deal with new incoming connections
0507:
0508:                            ServerSocketChannel readyChannel = (ServerSocketChannel) key
0509:                                    .channel();
0510:                            SocketChannel client_sock_ch;
0511:                            try {
0512:                                client_sock_ch = readyChannel.accept();
0513:                            } catch (IOException e) {
0514:                                if (LOG.isWarnEnabled())
0515:                                    LOG
0516:                                            .warn(
0517:                                                    "Attempt to accept new connection from listening socket failed",
0518:                                                    e);
0519:                                // Give up this connection
0520:                                continue;
0521:                            }
0522:
0523:                            if (LOG.isInfoEnabled())
0524:                                LOG.info("accepted connection, client_sock="
0525:                                        + client_sock_ch.socket());
0526:
0527:                            try {
0528:
0529:                                if (LOG.isTraceEnabled())
0530:                                    LOG
0531:                                            .trace("About to change new connection send buff size from "
0532:                                                    + client_sock_ch
0533:                                                            .socket()
0534:                                                            .getSendBufferSize()
0535:                                                    + " bytes");
0536:                                client_sock_ch.socket().setSendBufferSize(
0537:                                        send_buf_size);
0538:                                if (LOG.isTraceEnabled())
0539:                                    LOG
0540:                                            .trace("Changed new connection send buff size to "
0541:                                                    + client_sock_ch
0542:                                                            .socket()
0543:                                                            .getSendBufferSize()
0544:                                                    + " bytes");
0545:                            } catch (IllegalArgumentException ex) {
0546:                                if (log.isErrorEnabled())
0547:                                    log.error(
0548:                                            "exception setting send buffer size to "
0549:                                                    + send_buf_size
0550:                                                    + " bytes: ", ex);
0551:                            } catch (SocketException e) {
0552:                                if (log.isErrorEnabled())
0553:                                    log.error(
0554:                                            "exception setting send buffer size to "
0555:                                                    + send_buf_size
0556:                                                    + " bytes: ", e);
0557:                            }
0558:
0559:                            try {
0560:                                if (LOG.isTraceEnabled())
0561:                                    LOG
0562:                                            .trace("About to change new connection receive buff size from "
0563:                                                    + client_sock_ch
0564:                                                            .socket()
0565:                                                            .getReceiveBufferSize()
0566:                                                    + " bytes");
0567:                                client_sock_ch.socket().setReceiveBufferSize(
0568:                                        recv_buf_size);
0569:                                if (LOG.isTraceEnabled())
0570:                                    LOG
0571:                                            .trace("Changed new connection receive buff size to "
0572:                                                    + client_sock_ch
0573:                                                            .socket()
0574:                                                            .getReceiveBufferSize()
0575:                                                    + " bytes");
0576:                            } catch (IllegalArgumentException ex) {
0577:                                if (log.isErrorEnabled())
0578:                                    log.error(
0579:                                            "exception setting receive buffer size to "
0580:                                                    + send_buf_size
0581:                                                    + " bytes: ", ex);
0582:                            } catch (SocketException e) {
0583:                                if (log.isErrorEnabled())
0584:                                    log.error(
0585:                                            "exception setting receive buffer size to "
0586:                                                    + recv_buf_size
0587:                                                    + " bytes: ", e);
0588:                            }
0589:
0590:                            conn = new Connection(client_sock_ch, null);
0591:                            try {
0592:                                conn.peer_addr = conn
0593:                                        .readPeerAddress(client_sock_ch
0594:                                                .socket());
0595:
0596:                                synchronized (conns) {
0597:                                    if (conns
0598:                                            .containsKey(conn.getPeerAddress())) {
0599:                                        if (conn.getPeerAddress().equals(
0600:                                                getLocalAddress())) {
0601:                                            if (LOG.isTraceEnabled())
0602:                                                LOG
0603:                                                        .trace(conn
0604:                                                                .getPeerAddress()
0605:                                                                + " is myself, not put it in table twice, but still read from it");
0606:                                        } else {
0607:                                            if (LOG.isWarnEnabled())
0608:                                                LOG
0609:                                                        .warn(conn
0610:                                                                .getPeerAddress()
0611:                                                                + " is already there, will terminate connection");
0612:                                            // keep existing connection, close this new one
0613:                                            conn.destroy();
0614:                                            continue;
0615:                                        }
0616:                                    } else {
0617:                                        addConnection(conn.getPeerAddress(),
0618:                                                conn);
0619:                                    }
0620:                                }
0621:                                notifyConnectionOpened(conn.getPeerAddress());
0622:                                client_sock_ch.configureBlocking(false);
0623:                            } catch (IOException e) {
0624:                                if (LOG.isWarnEnabled())
0625:                                    LOG
0626:                                            .warn(
0627:                                                    "Attempt to configure non-blocking mode failed",
0628:                                                    e);
0629:                                // Give up this connection
0630:                                conn.destroy();
0631:                                continue;
0632:                            } catch (Exception e) {
0633:                                if (LOG.isWarnEnabled())
0634:                                    LOG
0635:                                            .warn(
0636:                                                    "Attempt to handshake with other peer failed",
0637:                                                    e);
0638:                                // Give up this connection
0639:                                conn.destroy();
0640:                                continue;
0641:                            }
0642:
0643:                            int idx;
0644:                            synchronized (m_lockNextWriteHandler) {
0645:                                idx = m_nextWriteHandler = (m_nextWriteHandler + 1)
0646:                                        % m_writeHandlers.length;
0647:                            }
0648:                            conn.setupWriteHandler(m_writeHandlers[idx]);
0649:
0650:                            try {
0651:                                synchronized (m_lockNextReadHandler) {
0652:                                    idx = m_nextReadHandler = (m_nextReadHandler + 1)
0653:                                            % m_readHandlers.length;
0654:                                }
0655:                                m_readHandlers[idx].add(conn);
0656:
0657:                            } catch (InterruptedException e) {
0658:                                if (LOG.isWarnEnabled())
0659:                                    LOG
0660:                                            .warn(
0661:                                                    "Attempt to configure read handler for accepted connection failed",
0662:                                                    e);
0663:                                // close connection
0664:                                conn.destroy();
0665:                            }
0666:                        } // end of iteration
0667:                    } // end of selected key > 0
0668:                } // end of thread
0669:
0670:                if (m_serverSocketChannel.isOpen()) {
0671:                    try {
0672:                        m_serverSocketChannel.close();
0673:                    } catch (Exception e) {
0674:                        log.error("exception closing server listening socket",
0675:                                e);
0676:                    }
0677:                }
0678:                if (LOG.isTraceEnabled())
0679:                    LOG.trace("acceptor thread terminated");
0680:
0681:            }
0682:
0683:            /**
0684:             * Finds first available port starting at start_port and returns server socket. Sets srv_port
0685:             */
0686:            protected ServerSocket createServerSocket(int start_port,
0687:                    int end_port) throws Exception {
0688:                this .m_acceptSelector = Selector.open();
0689:                m_serverSocketChannel = ServerSocketChannel.open();
0690:                m_serverSocketChannel.configureBlocking(false);
0691:                while (true) {
0692:                    try {
0693:                        SocketAddress sockAddr;
0694:                        if (bind_addr == null) {
0695:                            sockAddr = new InetSocketAddress(start_port);
0696:                            m_serverSocketChannel.socket().bind(sockAddr);
0697:                        } else {
0698:                            sockAddr = new InetSocketAddress(bind_addr,
0699:                                    start_port);
0700:                            m_serverSocketChannel.socket().bind(sockAddr,
0701:                                    backlog);
0702:                        }
0703:                    } catch (BindException bind_ex) {
0704:                        if (start_port == end_port)
0705:                            throw (BindException) ((new BindException(
0706:                                    "No available port to bind to"))
0707:                                    .initCause(bind_ex));
0708:                        start_port++;
0709:                        continue;
0710:                    } catch (SocketException bind_ex) {
0711:                        if (start_port == end_port)
0712:                            throw (BindException) ((new BindException(
0713:                                    "No available port to bind to"))
0714:                                    .initCause(bind_ex));
0715:                        start_port++;
0716:                        continue;
0717:                    } catch (IOException io_ex) {
0718:                        if (LOG.isErrorEnabled())
0719:                            LOG.error(
0720:                                    "Attempt to bind serversocket failed, port="
0721:                                            + start_port + ", bind addr="
0722:                                            + bind_addr, io_ex);
0723:                        throw io_ex;
0724:                    }
0725:                    srv_port = start_port;
0726:                    break;
0727:                }
0728:                m_serverSocketChannel.register(this .m_acceptSelector,
0729:                        SelectionKey.OP_ACCEPT);
0730:                return m_serverSocketChannel.socket();
0731:            }
0732:
0733:            protected void runRequest(Address addr, ByteBuffer buf)
0734:                    throws InterruptedException {
0735:                m_requestProcessors.execute(new ExecuteTask(addr, buf));
0736:            }
0737:
0738:            // Represents shutdown
0739:            private static class Shutdown {
0740:            }
0741:
0742:            // ReadHandler has selector to deal with read, it runs in seperated thread
0743:            private static class ReadHandler implements  Runnable {
0744:                private final Selector SELECTOR = initHandler();
0745:                private final LinkedQueue QUEUE = new LinkedQueue();
0746:                private final ConnectionTableNIO connectTable;
0747:
0748:                ReadHandler(ConnectionTableNIO ct) {
0749:                    connectTable = ct;
0750:                }
0751:
0752:                public Selector initHandler() {
0753:                    // Open the selector
0754:                    try {
0755:                        return Selector.open();
0756:                    } catch (IOException e) {
0757:                        if (LOG.isErrorEnabled())
0758:                            LOG.error(e);
0759:                        throw new IllegalStateException(e.getMessage());
0760:                    }
0761:
0762:                }
0763:
0764:                /**
0765:                 * create instances of ReadHandler threads for receiving data.
0766:                 *
0767:                 * @param workerThreads is the number of threads to create.
0768:                 */
0769:                private static ReadHandler[] create(int workerThreads,
0770:                        ConnectionTableNIO ct, ThreadGroup tg,
0771:                        LinkedList backGroundThreads) {
0772:                    ReadHandler[] handlers = new ReadHandler[workerThreads];
0773:                    for (int looper = 0; looper < workerThreads; looper++) {
0774:                        handlers[looper] = new ReadHandler(ct);
0775:
0776:                        Thread thread = new Thread(tg, handlers[looper],
0777:                                "nioReadHandlerThread");
0778:                        thread.setDaemon(true);
0779:                        thread.start();
0780:                        backGroundThreads.add(thread);
0781:                    }
0782:                    return handlers;
0783:                }
0784:
0785:                private void add(Object conn) throws InterruptedException {
0786:                    QUEUE.put(conn);
0787:                    wakeup();
0788:                }
0789:
0790:                private void wakeup() {
0791:                    SELECTOR.wakeup();
0792:                }
0793:
0794:                public void run() {
0795:                    while (true) { // m_s can be closed by the management thread
0796:                        int events;
0797:                        try {
0798:                            events = SELECTOR.select();
0799:                        } catch (IOException e) {
0800:                            if (LOG.isWarnEnabled())
0801:                                LOG
0802:                                        .warn(
0803:                                                "Select operation on socket failed",
0804:                                                e);
0805:                            continue; // Give up this time
0806:                        } catch (ClosedSelectorException e) {
0807:                            if (LOG.isWarnEnabled())
0808:                                LOG
0809:                                        .warn(
0810:                                                "Select operation on socket failed",
0811:                                                e);
0812:                            return; // Selector gets closed, thread stops
0813:                        }
0814:
0815:                        if (events > 0) { // there are read-ready channels
0816:                            Set readyKeys = SELECTOR.selectedKeys();
0817:                            for (Iterator i = readyKeys.iterator(); i.hasNext();) {
0818:                                SelectionKey key = (SelectionKey) i.next();
0819:                                i.remove();
0820:                                // Do partial read and handle call back
0821:                                Connection conn = (Connection) key.attachment();
0822:                                try {
0823:                                    if (conn.getSocketChannel().isOpen())
0824:                                        readOnce(conn);
0825:                                    else { // socket connection is already closed, clean up connection state
0826:                                        conn.closed();
0827:                                    }
0828:                                } catch (IOException e) {
0829:                                    if (LOG.isTraceEnabled())
0830:                                        LOG
0831:                                                .trace(
0832:                                                        "Read operation on socket failed",
0833:                                                        e);
0834:                                    // The connection must be bad, cancel the key, close socket, then
0835:                                    // remove it from table!
0836:                                    key.cancel();
0837:                                    conn.destroy();
0838:                                    conn.closed();
0839:                                }
0840:                            }
0841:                        }
0842:
0843:                        // Now we look at the connection queue to get any new connections added
0844:                        Object o;
0845:                        try {
0846:                            o = QUEUE.poll(0); // get a connection
0847:                        } catch (InterruptedException e) {
0848:                            if (LOG.isInfoEnabled())
0849:                                LOG
0850:                                        .info(
0851:                                                "Thread ("
0852:                                                        + Thread
0853:                                                                .currentThread()
0854:                                                                .getName()
0855:                                                        + ") was interrupted while polling queue",
0856:                                                e);
0857:                            // We must give up
0858:                            continue;
0859:                        }
0860:                        if (null == o)
0861:                            continue;
0862:                        if (o instanceof  Shutdown) { // shutdown command?
0863:                            try {
0864:                                SELECTOR.close();
0865:                            } catch (IOException e) {
0866:                                if (LOG.isInfoEnabled())
0867:                                    LOG
0868:                                            .info(
0869:                                                    "Read selector close operation failed",
0870:                                                    e);
0871:                            }
0872:                            return; // stop reading
0873:                        }
0874:                        Connection conn = (Connection) o;// must be a new connection
0875:                        SocketChannel sc = conn.getSocketChannel();
0876:                        try {
0877:                            sc.register(SELECTOR, SelectionKey.OP_READ, conn);
0878:                        } catch (ClosedChannelException e) {
0879:                            if (LOG.isInfoEnabled())
0880:                                LOG
0881:                                        .info(
0882:                                                "Socket channel was closed while we were trying to register it to selector",
0883:                                                e);
0884:                            // Channel becomes bad. The connection must be bad,
0885:                            // close socket, then remove it from table!
0886:                            conn.destroy();
0887:                            conn.closed();
0888:                        }
0889:                    } // end of the while true loop
0890:                }
0891:
0892:                private void readOnce(Connection conn) throws IOException {
0893:                    ConnectionReadState readState = conn.getReadState();
0894:                    if (!readState.isHeadFinished()) { // a brand new message coming or header is not completed
0895:                        // Begin or continue to read header
0896:                        int size = readHeader(conn);
0897:                        if (0 == size) { // header is not completed
0898:                            return;
0899:                        }
0900:                    }
0901:                    // Begin or continue to read body
0902:                    if (readBody(conn) > 0) { // not finish yet
0903:                        return;
0904:                    }
0905:                    Address addr = conn.getPeerAddress();
0906:                    ByteBuffer buf = readState.getReadBodyBuffer();
0907:                    // Clear status
0908:                    readState.bodyFinished();
0909:                    // Assign worker thread to execute call back
0910:                    try {
0911:                        connectTable.runRequest(addr, buf);
0912:                    } catch (InterruptedException e) {
0913:                        // Cannot do call back, what can we do?
0914:                        // Give up handling the message then
0915:                        LOG
0916:                                .error(
0917:                                        "Thread ("
0918:                                                + Thread.currentThread()
0919:                                                        .getName()
0920:                                                + ") was interrupted while assigning executor to process read request",
0921:                                        e);
0922:                    }
0923:                }
0924:
0925:                /**
0926:                 * Read message header from channel. It doesn't try to complete. If there is nothing in
0927:                 * the channel, the method returns immediately.
0928:                 *
0929:                 * @param conn The connection
0930:                 * @return 0 if header hasn't been read completely, otherwise the size of message body
0931:                 * @throws IOException
0932:                 */
0933:                private int readHeader(Connection conn) throws IOException {
0934:                    ConnectionReadState readState = conn.getReadState();
0935:                    ByteBuffer headBuf = readState.getReadHeadBuffer();
0936:
0937:                    SocketChannel sc = conn.getSocketChannel();
0938:                    while (headBuf.remaining() > 0) {
0939:                        int num = sc.read(headBuf);
0940:                        if (-1 == num) {// EOS
0941:                            throw new IOException("Peer closed socket");
0942:                        }
0943:                        if (0 == num) // no more data
0944:                            return 0;
0945:                    }
0946:                    // OK, now we get the whole header, change the status and return message size
0947:                    return readState.headFinished();
0948:                }
0949:
0950:                /**
0951:                 * Read message body from channel. It doesn't try to complete. If there is nothing in
0952:                 * the channel, the method returns immediately.
0953:                 *
0954:                 * @param conn The connection
0955:                 * @return remaining bytes for the message
0956:                 * @throws IOException
0957:                 */
0958:                private int readBody(Connection conn) throws IOException {
0959:                    ByteBuffer bodyBuf = conn.getReadState()
0960:                            .getReadBodyBuffer();
0961:
0962:                    SocketChannel sc = conn.getSocketChannel();
0963:                    while (bodyBuf.remaining() > 0) {
0964:                        int num = sc.read(bodyBuf);
0965:                        if (-1 == num) // EOS
0966:                            throw new IOException(
0967:                                    "Couldn't read from socket as peer closed the socket");
0968:                        if (0 == num) // no more data
0969:                            return bodyBuf.remaining();
0970:                    }
0971:                    // OK, we finished reading the whole message! Flip it (not necessary though)
0972:                    bodyBuf.flip();
0973:                    return 0;
0974:                }
0975:            }
0976:
0977:            private class ExecuteTask implements  Runnable {
0978:                Address m_addr = null;
0979:                ByteBuffer m_buf = null;
0980:
0981:                public ExecuteTask(Address addr, ByteBuffer buf) {
0982:                    m_addr = addr;
0983:                    m_buf = buf;
0984:                }
0985:
0986:                public void run() {
0987:                    receive(m_addr, m_buf.array(), m_buf.arrayOffset(), m_buf
0988:                            .limit());
0989:                }
0990:            }
0991:
0992:            private class ConnectionReadState {
0993:                private final Connection m_conn;
0994:
0995:                // Status for receiving message
0996:                private boolean m_headFinished = false;
0997:                private ByteBuffer m_readBodyBuf = null;
0998:                private final ByteBuffer m_readHeadBuf = ByteBuffer
0999:                        .allocate(Connection.HEADER_SIZE);
1000:
1001:                public ConnectionReadState(Connection conn) {
1002:                    m_conn = conn;
1003:                }
1004:
1005:                ByteBuffer getReadBodyBuffer() {
1006:                    return m_readBodyBuf;
1007:                }
1008:
1009:                ByteBuffer getReadHeadBuffer() {
1010:                    return m_readHeadBuf;
1011:                }
1012:
1013:                void bodyFinished() {
1014:                    m_headFinished = false;
1015:                    m_readHeadBuf.clear();
1016:                    m_readBodyBuf = null;
1017:                    m_conn.updateLastAccessed();
1018:                }
1019:
1020:                /**
1021:                 * Status change for finishing reading the message header (data already in buffer)
1022:                 *
1023:                 * @return message size
1024:                 */
1025:                int headFinished() {
1026:                    m_headFinished = true;
1027:                    m_readHeadBuf.flip();
1028:                    int messageSize = m_readHeadBuf.getInt();
1029:                    m_readBodyBuf = ByteBuffer.allocate(messageSize);
1030:                    m_conn.updateLastAccessed();
1031:                    return messageSize;
1032:                }
1033:
1034:                boolean isHeadFinished() {
1035:                    return m_headFinished;
1036:                }
1037:            }
1038:
1039:            class Connection extends ConnectionTable.Connection {
1040:                private SocketChannel sock_ch = null;
1041:                private WriteHandler m_writeHandler;
1042:                private SelectorWriteHandler m_selectorWriteHandler;
1043:                private final ConnectionReadState m_readState;
1044:
1045:                private static final int HEADER_SIZE = 4;
1046:                final ByteBuffer headerBuffer = ByteBuffer
1047:                        .allocate(HEADER_SIZE);
1048:
1049:                Connection(SocketChannel s, Address peer_addr) {
1050:                    super (s.socket(), peer_addr);
1051:                    sock_ch = s;
1052:                    m_readState = new ConnectionReadState(this );
1053:                    is_running = true;
1054:                }
1055:
1056:                private ConnectionReadState getReadState() {
1057:                    return m_readState;
1058:                }
1059:
1060:                private void setupWriteHandler(WriteHandler hdlr) {
1061:                    m_writeHandler = hdlr;
1062:                    m_selectorWriteHandler = hdlr.add(sock_ch);
1063:                }
1064:
1065:                //      void destroy()
1066:                //      {
1067:                //         closeSocket();
1068:                //      }
1069:
1070:                void doSend(byte[] buffie, int offset, int length)
1071:                        throws Exception {
1072:                    FutureResult result = new FutureResult();
1073:                    m_writeHandler.write(sock_ch, ByteBuffer.wrap(buffie,
1074:                            offset, length), result, m_selectorWriteHandler);
1075:                    Exception ex = result.getException();
1076:                    if (ex != null) {
1077:                        if (LOG.isErrorEnabled())
1078:                            LOG.error("failed sending message", ex);
1079:                        if (ex.getCause() instanceof  IOException)
1080:                            throw (IOException) ex.getCause();
1081:                        throw ex;
1082:                    }
1083:                    result.get();
1084:                }
1085:
1086:                SocketChannel getSocketChannel() {
1087:                    return sock_ch;
1088:                }
1089:
1090:                void closeSocket() {
1091:
1092:                    if (sock_ch != null) {
1093:                        try {
1094:                            if (sock_ch.isConnected() && sock_ch.isOpen()) {
1095:                                sock_ch.close();
1096:                            }
1097:                        } catch (Exception e) {
1098:                            log.error("error closing socket connection", e);
1099:                        }
1100:                        sock_ch = null;
1101:                    }
1102:                }
1103:
1104:                void closed() {
1105:                    Address peerAddr = getPeerAddress();
1106:                    synchronized (conns) {
1107:                        conns.remove(peerAddr);
1108:                    }
1109:                    notifyConnectionClosed(peerAddr);
1110:                }
1111:            }
1112:
1113:            /**
1114:             * Handle writing to non-blocking NIO connection.
1115:             */
1116:            private static class WriteHandler implements  Runnable {
1117:                // Create a queue for write requests
1118:                private final LinkedQueue QUEUE = new LinkedQueue();
1119:
1120:                private final Selector SELECTOR = initSelector();
1121:                private int m_pendingChannels; // count of the number of channels that have pending writes
1122:                // note that this variable is only accessed by one thread.
1123:
1124:                // allocate and reuse the header for all buffer write operations
1125:                private ByteBuffer m_headerBuffer = ByteBuffer
1126:                        .allocate(Connection.HEADER_SIZE);
1127:
1128:                Selector initSelector() {
1129:                    try {
1130:                        return SelectorProvider.provider().openSelector();
1131:                    } catch (IOException e) {
1132:                        if (LOG.isErrorEnabled())
1133:                            LOG.error(e);
1134:                        throw new IllegalStateException(e.getMessage());
1135:                    }
1136:                }
1137:
1138:                /**
1139:                 * create instances of WriteHandler threads for sending data.
1140:                 *
1141:                 * @param workerThreads is the number of threads to create.
1142:                 */
1143:                private static WriteHandler[] create(int workerThreads,
1144:                        ThreadGroup tg, LinkedList backGroundThreads) {
1145:                    WriteHandler[] handlers = new WriteHandler[workerThreads];
1146:                    for (int looper = 0; looper < workerThreads; looper++) {
1147:                        handlers[looper] = new WriteHandler();
1148:
1149:                        Thread thread = new Thread(tg, handlers[looper],
1150:                                "nioWriteHandlerThread");
1151:                        thread.setDaemon(true);
1152:                        thread.start();
1153:                        backGroundThreads.add(thread);
1154:                    }
1155:                    return handlers;
1156:                }
1157:
1158:                /**
1159:                 * Add a new channel to be handled.
1160:                 *
1161:                 * @param channel
1162:                 */
1163:                private SelectorWriteHandler add(SocketChannel channel) {
1164:                    return new SelectorWriteHandler(channel, SELECTOR,
1165:                            m_headerBuffer);
1166:                }
1167:
1168:                /**
1169:                 * Writes buffer to the specified socket connection.  This is always performed asynchronously.  If you want
1170:                 * to perform a synchrounous write, call notification.`get() which will block until the write operation is complete.
1171:                 * Best practice is to call notification.getException() which may return any exceptions that occured during the write
1172:                 * operation.
1173:                 *
1174:                 * @param channel      is where the buffer is written to.
1175:                 * @param buffer       is what we write.
1176:                 * @param notification may be specified if you want to know how many bytes were written and know if an exception
1177:                 *                     occurred.
1178:                 */
1179:                private void write(SocketChannel channel, ByteBuffer buffer,
1180:                        FutureResult notification, SelectorWriteHandler hdlr)
1181:                        throws InterruptedException {
1182:                    QUEUE.put(new WriteRequest(channel, buffer, notification,
1183:                            hdlr));
1184:                }
1185:
1186:                private void close(SelectorWriteHandler entry) {
1187:                    entry.cancel();
1188:                }
1189:
1190:                private void handleChannelError(SelectorWriteHandler entry,
1191:                        Throwable error) {
1192:                    // notify callers of the exception and drain all of the send buffers for this channel.
1193:                    do {
1194:                        if (error != null)
1195:                            entry.notifyError(error);
1196:                    } while (entry.next());
1197:                    close(entry);
1198:                }
1199:
1200:                // process the write operation
1201:                private void processWrite(Selector selector) {
1202:                    Set keys = selector.selectedKeys();
1203:                    Object arr[] = keys.toArray();
1204:                    for (int looper = 0; looper < arr.length; looper++) {
1205:                        SelectionKey key = (SelectionKey) arr[looper];
1206:                        SelectorWriteHandler entry = (SelectorWriteHandler) key
1207:                                .attachment();
1208:                        boolean needToDecrementPendingChannels = false;
1209:                        try {
1210:                            if (0 == entry.write()) { // write the buffer and if the remaining bytes is zero,
1211:                                // notify the caller of number of bytes written.
1212:                                entry.notifyObject(new Integer(entry
1213:                                        .getBytesWritten()));
1214:                                // switch to next write buffer or clear interest bit on socket channel.
1215:                                if (!entry.next()) {
1216:                                    needToDecrementPendingChannels = true;
1217:                                }
1218:                            }
1219:
1220:                        } catch (IOException e) {
1221:                            needToDecrementPendingChannels = true;
1222:                            // connection must of closed
1223:                            handleChannelError(entry, e);
1224:                        } finally {
1225:                            if (needToDecrementPendingChannels)
1226:                                m_pendingChannels--;
1227:                        }
1228:                    }
1229:                    keys.clear();
1230:                }
1231:
1232:                public void run() {
1233:                    while (SELECTOR.isOpen()) {
1234:                        try {
1235:                            WriteRequest queueEntry;
1236:                            Object o;
1237:
1238:                            // When there are no more commands in the Queue, we will hit the blocking code after this loop.
1239:                            while (null != (o = QUEUE.poll(0))) {
1240:                                if (o instanceof  Shutdown) // Stop the thread
1241:                                {
1242:                                    try {
1243:                                        SELECTOR.close();
1244:                                    } catch (IOException e) {
1245:                                        if (LOG.isInfoEnabled())
1246:                                            LOG
1247:                                                    .info(
1248:                                                            "Write selector close operation failed",
1249:                                                            e);
1250:                                    }
1251:                                    return;
1252:                                }
1253:                                queueEntry = (WriteRequest) o;
1254:
1255:                                if (queueEntry.getHandler().add(queueEntry)) {
1256:                                    // If the add operation returns true, than means that a buffer is available to be written to the
1257:                                    // corresponding channel and channel's selection key has been modified to indicate interest in the
1258:                                    // 'write' operation.
1259:                                    // If the add operation threw an exception, we will not increment m_pendingChannels which
1260:                                    // seems correct as long as a new buffer wasn't added to be sent.
1261:                                    // Another way to view this is that we don't have to protect m_pendingChannels on the increment
1262:                                    // side, only need to protect on the decrement side (this logic of this run() will be incorrect
1263:                                    // if m_pendingChannels is set incorrectly).
1264:                                    m_pendingChannels++;
1265:                                }
1266:
1267:                                try {
1268:                                    // process any connections ready to be written to.
1269:                                    if (SELECTOR.selectNow() > 0) {
1270:                                        processWrite(SELECTOR);
1271:                                    }
1272:                                } catch (IOException e) { // need to understand what causes this error so we can handle it properly
1273:                                    if (LOG.isErrorEnabled())
1274:                                        LOG
1275:                                                .error(
1276:                                                        "SelectNow operation on write selector failed, didn't expect this to occur, please report this",
1277:                                                        e);
1278:                                    return; // if select fails, give up so we don't go into a busy loop.
1279:                                }
1280:                            }
1281:
1282:                            // if there isn't any pending work to do, block on queue to get next request.
1283:                            if (m_pendingChannels == 0) {
1284:                                o = QUEUE.take();
1285:                                if (o instanceof  Shutdown) { // Stop the thread
1286:                                    try {
1287:                                        SELECTOR.close();
1288:                                    } catch (IOException e) {
1289:                                        if (LOG.isInfoEnabled())
1290:                                            LOG
1291:                                                    .info(
1292:                                                            "Write selector close operation failed",
1293:                                                            e);
1294:                                    }
1295:                                    return;
1296:                                }
1297:                                queueEntry = (WriteRequest) o;
1298:                                if (queueEntry.getHandler().add(queueEntry))
1299:                                    m_pendingChannels++;
1300:                            }
1301:                            // otherwise do a blocking wait select operation.
1302:                            else {
1303:                                try {
1304:                                    if ((SELECTOR.select()) > 0) {
1305:                                        processWrite(SELECTOR);
1306:                                    }
1307:                                } catch (IOException e) { // need to understand what causes this error
1308:                                    if (LOG.isErrorEnabled())
1309:                                        LOG
1310:                                                .error(
1311:                                                        "Failure while writing to socket",
1312:                                                        e);
1313:                                }
1314:                            }
1315:                        } catch (InterruptedException e) {
1316:                            if (LOG.isErrorEnabled())
1317:                                LOG.error("Thread ("
1318:                                        + Thread.currentThread().getName()
1319:                                        + ") was interrupted", e);
1320:                        } catch (Throwable e) // Log throwable rather than terminating this thread.
1321:                        { // We are a daemon thread so we shouldn't prevent the process from terminating if
1322:                            // the controlling thread decides that should happen.
1323:                            if (LOG.isErrorEnabled())
1324:                                LOG.error("Thread ("
1325:                                        + Thread.currentThread().getName()
1326:                                        + ") caught Throwable", e);
1327:                        }
1328:                    }
1329:                }
1330:            }
1331:
1332:            // Wrapper class for passing Write requests.  There will be an instance of this class for each socketChannel
1333:            // mapped to a Selector.
1334:            public static class SelectorWriteHandler {
1335:
1336:                private final LinkedList m_writeRequests = new LinkedList(); // Collection of writeRequests
1337:                private boolean m_headerSent = false;
1338:                private SocketChannel m_channel;
1339:                private SelectionKey m_key;
1340:                private Selector m_selector;
1341:                private int m_bytesWritten = 0;
1342:                private boolean m_enabled = false;
1343:                private ByteBuffer m_headerBuffer;
1344:
1345:                SelectorWriteHandler(SocketChannel channel, Selector selector,
1346:                        ByteBuffer headerBuffer) {
1347:                    m_channel = channel;
1348:                    m_selector = selector;
1349:                    m_headerBuffer = headerBuffer;
1350:                }
1351:
1352:                private void register(Selector selector, SocketChannel channel)
1353:                        throws ClosedChannelException {
1354:                    // register the channel but don't enable OP_WRITE until we have a write request.
1355:                    m_key = channel.register(selector, 0, this );
1356:                }
1357:
1358:                // return true if selection key is enabled when it wasn't previous to call.
1359:                private boolean enable() {
1360:                    boolean rc = false;
1361:
1362:                    try {
1363:                        if (m_key == null) { // register the socket on first access,
1364:                            // we are the only thread using this variable, so no sync needed.
1365:                            register(m_selector, m_channel);
1366:                        }
1367:                    } catch (ClosedChannelException e) {
1368:                        return rc;
1369:                    }
1370:
1371:                    if (!m_enabled) {
1372:                        rc = true;
1373:                        try {
1374:                            m_key.interestOps(SelectionKey.OP_WRITE);
1375:                        } catch (CancelledKeyException e) { // channel must of closed
1376:                            return false;
1377:                        }
1378:                        m_enabled = true;
1379:                    }
1380:                    return rc;
1381:                }
1382:
1383:                private void disable() {
1384:                    if (m_enabled) {
1385:                        try {
1386:                            m_key.interestOps(0); // pass zero which means that we are not interested in being
1387:                            // notified of anything for this channel.
1388:                        } catch (CancelledKeyException eat) // If we finished writing and didn't get an exception, then
1389:                        { // we probably don't need to throw this exception (if they try to write
1390:                            // again, we will then throw an exception).
1391:                        }
1392:                        m_enabled = false;
1393:                    }
1394:                }
1395:
1396:                private void cancel() {
1397:                    m_key.cancel();
1398:                }
1399:
1400:                boolean add(WriteRequest entry) {
1401:                    m_writeRequests.add(entry);
1402:                    return enable();
1403:                }
1404:
1405:                WriteRequest getCurrentRequest() {
1406:                    return (WriteRequest) m_writeRequests.getFirst();
1407:                }
1408:
1409:                SocketChannel getChannel() {
1410:                    return m_channel;
1411:                }
1412:
1413:                ByteBuffer getBuffer() {
1414:                    return getCurrentRequest().getBuffer();
1415:                }
1416:
1417:                FutureResult getCallback() {
1418:                    return getCurrentRequest().getCallback();
1419:                }
1420:
1421:                int getBytesWritten() {
1422:                    return m_bytesWritten;
1423:                }
1424:
1425:                void notifyError(Throwable error) {
1426:                    if (getCallback() != null)
1427:                        getCallback().setException(error);
1428:                }
1429:
1430:                void notifyObject(Object result) {
1431:                    if (getCallback() != null)
1432:                        getCallback().set(result);
1433:                }
1434:
1435:                /**
1436:                 * switch to next request or disable write interest bit if there are no more buffers.
1437:                 *
1438:                 * @return true if another request was found to be processed.
1439:                 */
1440:                boolean next() {
1441:                    m_headerSent = false;
1442:                    m_bytesWritten = 0;
1443:
1444:                    m_writeRequests.removeFirst(); // remove current entry
1445:                    boolean rc = !m_writeRequests.isEmpty();
1446:                    if (!rc) // disable select for this channel if no more entries
1447:                        disable();
1448:                    return rc;
1449:                }
1450:
1451:                /**
1452:                 * @return bytes remaining to write.  This function will only throw IOException, unchecked exceptions are not
1453:                 *         expected to be thrown from here.  It is very important for the caller to know if an unchecked exception can
1454:                 *         be thrown in here.  Please correct the following throws list to include any other exceptions and update
1455:                 *         caller to handle them.
1456:                 * @throws IOException
1457:                 */
1458:                int write() throws IOException {
1459:                    // Send header first.  Note that while we are writing the shared header buffer,
1460:                    // no other threads can access the header buffer as we are the only thread that has access to it.
1461:                    if (!m_headerSent) {
1462:                        m_headerSent = true;
1463:                        m_headerBuffer.clear();
1464:                        m_headerBuffer.putInt(getBuffer().remaining());
1465:                        m_headerBuffer.flip();
1466:                        do {
1467:                            getChannel().write(m_headerBuffer);
1468:                        } // we should be able to handle writing the header in one action but just in case, just do a busy loop
1469:                        while (m_headerBuffer.remaining() > 0);
1470:
1471:                    }
1472:
1473:                    m_bytesWritten += (getChannel().write(getBuffer()));
1474:
1475:                    return getBuffer().remaining();
1476:                }
1477:
1478:            }
1479:
1480:            public static class WriteRequest {
1481:                private final SocketChannel m_channel;
1482:                private final ByteBuffer m_buffer;
1483:                private final FutureResult m_callback;
1484:                private final SelectorWriteHandler m_hdlr;
1485:
1486:                WriteRequest(SocketChannel channel, ByteBuffer buffer,
1487:                        FutureResult callback, SelectorWriteHandler hdlr) {
1488:                    m_channel = channel;
1489:                    m_buffer = buffer;
1490:                    m_callback = callback;
1491:                    m_hdlr = hdlr;
1492:                }
1493:
1494:                SelectorWriteHandler getHandler() {
1495:                    return m_hdlr;
1496:                }
1497:
1498:                SocketChannel getChannel() {
1499:                    return m_channel;
1500:                }
1501:
1502:                ByteBuffer getBuffer() {
1503:                    return m_buffer;
1504:                }
1505:
1506:                FutureResult getCallback() {
1507:                    return m_callback;
1508:                }
1509:
1510:            }
1511:
1512:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.