0001: // $Id: ConnectionTableNIO.java,v 1.24 2006/09/18 18:00:37 bstansberry Exp $
0002:
0003: package org.jgroups.blocks;
0004:
0005: import EDU.oswego.cs.dl.util.concurrent.*;
0006: import org.apache.commons.logging.Log;
0007: import org.apache.commons.logging.LogFactory;
0008: import org.jgroups.Address;
0009: import org.jgroups.stack.IpAddress;
0010: import org.jgroups.util.Util;
0011:
0012: import java.io.IOException;
0013: import java.net.*;
0014: import java.nio.ByteBuffer;
0015: import java.nio.channels.*;
0016: import java.nio.channels.spi.SelectorProvider;
0017: import java.util.Iterator;
0018: import java.util.LinkedList;
0019: import java.util.Set;
0020:
0021: /**
0022: * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
0023: * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
0024: * connection. For incoming messages, one server socket is created at startup. For each new incoming
0025: * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
0026: * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
0027: * after some time.
0028: * <p/>
0029: * Incoming messages from any of the sockets can be received by setting the message listener.
0030: *
0031: * We currently require use_incoming_packet_handler=true (release 2.4 will support use_incoming_packet_handler=false
0032: * due to threadless stack support).
0033: *
0034: * @author Bela Ban, Scott Marlow, Alex Fu
0035: */
0036: public class ConnectionTableNIO extends BasicConnectionTable implements
0037: Runnable {
0038:
0039: private ServerSocketChannel m_serverSocketChannel;
0040: private Selector m_acceptSelector;
0041: protected final static Log LOG = LogFactory
0042: .getLog(ConnectionTableNIO.class);
0043:
0044: private WriteHandler[] m_writeHandlers;
0045: private int m_nextWriteHandler = 0;
0046: private final Object m_lockNextWriteHandler = new Object();
0047:
0048: private ReadHandler[] m_readHandlers;
0049: private int m_nextReadHandler = 0;
0050: private final Object m_lockNextReadHandler = new Object();
0051:
0052: // thread pool for processing read requests
0053: private Executor m_requestProcessors;
0054: private volatile boolean serverStopping = false;
0055:
0056: private final LinkedList m_backGroundThreads = new LinkedList(); // Collection of all created threads
0057:
0058: private int m_reader_threads = 8;
0059:
0060: private int m_writer_threads = 8;
0061:
0062: private int m_processor_threads = 10; // PooledExecutor.createThreads()
0063: private int m_processor_minThreads = 10; // PooledExecutor.setMinimumPoolSize()
0064: private int m_processor_maxThreads = 10; // PooledExecutor.setMaxThreads()
0065: private int m_processor_queueSize = 100; // Number of queued requests that can be pending waiting
0066: // for a background thread to run the request.
0067: private int m_processor_keepAliveTime = -1; // PooledExecutor.setKeepAliveTime( milliseconds);
0068:
0069: // A negative value means to wait forever
0070:
0071: /**
0072: * @param srv_port
0073: * @throws Exception
0074: */
0075: public ConnectionTableNIO(int srv_port) throws Exception {
0076: this .srv_port = srv_port;
0077: start();
0078: }
0079:
0080: /**
0081: * @param srv_port
0082: * @param reaper_interval
0083: * @param conn_expire_time
0084: * @throws Exception
0085: */
0086: public ConnectionTableNIO(int srv_port, long reaper_interval,
0087: long conn_expire_time) throws Exception {
0088: this .srv_port = srv_port;
0089: this .reaper_interval = reaper_interval;
0090: this .conn_expire_time = conn_expire_time;
0091: start();
0092: }
0093:
0094: /**
0095: * @param r
0096: * @param bind_addr
0097: * @param external_addr
0098: * @param srv_port
0099: * @param max_port
0100: * @throws Exception
0101: */
0102: public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0103: InetAddress external_addr, int srv_port, int max_port)
0104: throws Exception {
0105: setReceiver(r);
0106: this .external_addr = external_addr;
0107: this .bind_addr = bind_addr;
0108: this .srv_port = srv_port;
0109: this .max_port = max_port;
0110: use_reaper = true;
0111: start();
0112: }
0113:
0114: public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0115: InetAddress external_addr, int srv_port, int max_port,
0116: boolean doStart) throws Exception {
0117: setReceiver(r);
0118: this .external_addr = external_addr;
0119: this .bind_addr = bind_addr;
0120: this .srv_port = srv_port;
0121: this .max_port = max_port;
0122: use_reaper = true;
0123: if (doStart)
0124: start();
0125: }
0126:
0127: /**
0128: * @param r
0129: * @param bind_addr
0130: * @param external_addr
0131: * @param srv_port
0132: * @param max_port
0133: * @param reaper_interval
0134: * @param conn_expire_time
0135: * @throws Exception
0136: */
0137: public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0138: InetAddress external_addr, int srv_port, int max_port,
0139: long reaper_interval, long conn_expire_time)
0140: throws Exception {
0141: setReceiver(r);
0142: this .bind_addr = bind_addr;
0143: this .external_addr = external_addr;
0144: this .srv_port = srv_port;
0145: this .max_port = max_port;
0146: this .reaper_interval = reaper_interval;
0147: this .conn_expire_time = conn_expire_time;
0148: use_reaper = true;
0149: start();
0150: }
0151:
0152: public ConnectionTableNIO(Receiver r, InetAddress bind_addr,
0153: InetAddress external_addr, int srv_port, int max_port,
0154: long reaper_interval, long conn_expire_time, boolean doStart)
0155: throws Exception {
0156: setReceiver(r);
0157: this .bind_addr = bind_addr;
0158: this .external_addr = external_addr;
0159: this .srv_port = srv_port;
0160: this .max_port = max_port;
0161: this .reaper_interval = reaper_interval;
0162: this .conn_expire_time = conn_expire_time;
0163: use_reaper = true;
0164: if (doStart)
0165: start();
0166: }
0167:
0168: public int getReaderThreads() {
0169: return m_reader_threads;
0170: }
0171:
0172: public void setReaderThreads(int m_reader_threads) {
0173: this .m_reader_threads = m_reader_threads;
0174: }
0175:
0176: public int getWriterThreads() {
0177: return m_writer_threads;
0178: }
0179:
0180: public void setWriterThreads(int m_writer_threads) {
0181: this .m_writer_threads = m_writer_threads;
0182: }
0183:
0184: public int getProcessorThreads() {
0185: return m_processor_threads;
0186: }
0187:
0188: public void setProcessorThreads(int m_processor_threads) {
0189: this .m_processor_threads = m_processor_threads;
0190: }
0191:
0192: public int getProcessorMinThreads() {
0193: return m_processor_minThreads;
0194: }
0195:
0196: public void setProcessorMinThreads(int m_processor_minThreads) {
0197: this .m_processor_minThreads = m_processor_minThreads;
0198: }
0199:
0200: public int getProcessorMaxThreads() {
0201: return m_processor_maxThreads;
0202: }
0203:
0204: public void setProcessorMaxThreads(int m_processor_maxThreads) {
0205: this .m_processor_maxThreads = m_processor_maxThreads;
0206: }
0207:
0208: public int getProcessorQueueSize() {
0209: return m_processor_queueSize;
0210: }
0211:
0212: public void setProcessorQueueSize(int m_processor_queueSize) {
0213: this .m_processor_queueSize = m_processor_queueSize;
0214: }
0215:
0216: public int getProcessorKeepAliveTime() {
0217: return m_processor_keepAliveTime;
0218: }
0219:
0220: public void setProcessorKeepAliveTime(int m_processor_keepAliveTime) {
0221: this .m_processor_keepAliveTime = m_processor_keepAliveTime;
0222: }
0223:
0224: /**
0225: * Try to obtain correct Connection (or create one if not yet existent)
0226: */
0227: ConnectionTable.Connection getConnection(Address dest)
0228: throws Exception {
0229: Connection conn;
0230: SocketChannel sock_ch;
0231:
0232: synchronized (conns) {
0233: conn = (Connection) conns.get(dest);
0234: if (conn == null) {
0235: InetSocketAddress destAddress = new InetSocketAddress(
0236: ((IpAddress) dest).getIpAddress(),
0237: ((IpAddress) dest).getPort());
0238: sock_ch = SocketChannel.open(destAddress);
0239: sock_ch.socket().setTcpNoDelay(tcp_nodelay);
0240: conn = new Connection(sock_ch, dest);
0241:
0242: conn.sendLocalAddress(local_addr);
0243: // This outbound connection is ready
0244:
0245: sock_ch.configureBlocking(false);
0246:
0247: try {
0248: if (LOG.isTraceEnabled())
0249: LOG
0250: .trace("About to change new connection send buff size from "
0251: + sock_ch.socket()
0252: .getSendBufferSize()
0253: + " bytes");
0254: sock_ch.socket().setSendBufferSize(send_buf_size);
0255: if (LOG.isTraceEnabled())
0256: LOG
0257: .trace("Changed new connection send buff size to "
0258: + sock_ch.socket()
0259: .getSendBufferSize()
0260: + " bytes");
0261: } catch (IllegalArgumentException ex) {
0262: if (log.isErrorEnabled())
0263: log
0264: .error("exception setting send buffer size to "
0265: + send_buf_size
0266: + " bytes: "
0267: + ex);
0268: }
0269: try {
0270: if (LOG.isTraceEnabled())
0271: LOG
0272: .trace("About to change new connection receive buff size from "
0273: + sock_ch.socket()
0274: .getReceiveBufferSize()
0275: + " bytes");
0276: sock_ch.socket()
0277: .setReceiveBufferSize(recv_buf_size);
0278: if (LOG.isTraceEnabled())
0279: LOG
0280: .trace("Changed new connection receive buff size to "
0281: + sock_ch.socket()
0282: .getReceiveBufferSize()
0283: + " bytes");
0284: } catch (IllegalArgumentException ex) {
0285: if (log.isErrorEnabled())
0286: log
0287: .error("exception setting receive buffer size to "
0288: + send_buf_size
0289: + " bytes: "
0290: + ex);
0291: }
0292:
0293: int idx;
0294: synchronized (m_lockNextWriteHandler) {
0295: idx = m_nextWriteHandler = (m_nextWriteHandler + 1)
0296: % m_writeHandlers.length;
0297: }
0298: conn.setupWriteHandler(m_writeHandlers[idx]);
0299:
0300: // Put the new connection to the queue
0301: try {
0302: synchronized (m_lockNextReadHandler) {
0303: idx = m_nextReadHandler = (m_nextReadHandler + 1)
0304: % m_readHandlers.length;
0305: }
0306: m_readHandlers[idx].add(conn);
0307:
0308: } catch (InterruptedException e) {
0309: if (LOG.isWarnEnabled())
0310: LOG
0311: .warn(
0312: "Thread ("
0313: + Thread
0314: .currentThread()
0315: .getName()
0316: + ") was interrupted, closing connection",
0317: e);
0318: // What can we do? Remove it from table then.
0319: conn.destroy();
0320: throw e;
0321: }
0322:
0323: // Add connection to table
0324: addConnection(dest, conn);
0325:
0326: notifyConnectionOpened(dest);
0327: if (LOG.isInfoEnabled())
0328: LOG.info("created socket to " + dest);
0329: }
0330: return conn;
0331: }
0332: }
0333:
0334: public final void start() throws Exception {
0335: super .start();
0336: //Roland Kurmann 4/7/2003, build new thread group
0337: thread_group = new ThreadGroup(Util.getGlobalThreadGroup(),
0338: "ConnectionTableThreads");
0339: init();
0340: srv_sock = createServerSocket(srv_port, max_port);
0341:
0342: if (external_addr != null)
0343: local_addr = new IpAddress(external_addr, srv_sock
0344: .getLocalPort());
0345: else if (bind_addr != null)
0346: local_addr = new IpAddress(bind_addr, srv_sock
0347: .getLocalPort());
0348: else
0349: local_addr = new IpAddress(srv_sock.getLocalPort());
0350:
0351: if (log.isInfoEnabled())
0352: log.info("server socket created on " + local_addr);
0353:
0354: //Roland Kurmann 4/7/2003, put in thread_group
0355: acceptor = new Thread(thread_group, this ,
0356: "ConnectionTable.AcceptorThread");
0357: acceptor.setDaemon(true);
0358: acceptor.start();
0359: m_backGroundThreads.add(acceptor);
0360:
0361: // start the connection reaper - will periodically remove unused connections
0362: if (use_reaper && reaper == null) {
0363: reaper = new Reaper();
0364: reaper.start();
0365: }
0366: }
0367:
0368: protected void init() throws Exception {
0369:
0370: // use directExector if max thread pool size is less than or equal to zero.
0371: if (getProcessorMaxThreads() <= 0) {
0372: m_requestProcessors = new DirectExecutor();
0373: } else {
0374: // Create worker thread pool for processing incoming buffers
0375: PooledExecutor requestProcessors = new PooledExecutor(
0376: new BoundedBuffer(getProcessorQueueSize()),
0377: getProcessorMaxThreads());
0378: requestProcessors.setThreadFactory(new ThreadFactory() {
0379: public Thread newThread(Runnable runnable) {
0380: Thread new_thread = new Thread(thread_group,
0381: runnable);
0382: new_thread.setDaemon(true);
0383: new_thread.setName("ConnectionTableNIO.Thread");
0384: m_backGroundThreads.add(new_thread);
0385: return new_thread;
0386: }
0387: });
0388: requestProcessors
0389: .setMinimumPoolSize(getProcessorMinThreads());
0390: requestProcessors
0391: .setKeepAliveTime(getProcessorKeepAliveTime());
0392: requestProcessors.waitWhenBlocked();
0393: requestProcessors.createThreads(getProcessorThreads());
0394: m_requestProcessors = requestProcessors;
0395: }
0396:
0397: m_writeHandlers = WriteHandler.create(getWriterThreads(),
0398: thread_group, m_backGroundThreads);
0399: m_readHandlers = ReadHandler.create(getReaderThreads(), this ,
0400: thread_group, m_backGroundThreads);
0401: }
0402:
0403: /**
0404: * Closes all open sockets, the server socket and all threads waiting for incoming messages
0405: */
0406: public void stop() {
0407: super .stop();
0408: serverStopping = true;
0409:
0410: if (reaper != null)
0411: reaper.stop();
0412:
0413: // Stop the main selector
0414: m_acceptSelector.wakeup();
0415:
0416: // Stop selector threads
0417: for (int i = 0; i < m_readHandlers.length; i++) {
0418: try {
0419: m_readHandlers[i].add(new Shutdown());
0420: } catch (InterruptedException e) {
0421: LOG
0422: .error(
0423: "Thread ("
0424: + Thread.currentThread()
0425: .getName()
0426: + ") was interrupted, failed to shutdown selector",
0427: e);
0428: }
0429: }
0430: for (int i = 0; i < m_writeHandlers.length; i++) {
0431: try {
0432: m_writeHandlers[i].QUEUE.put(new Shutdown());
0433: m_writeHandlers[i].SELECTOR.wakeup();
0434: } catch (InterruptedException e) {
0435: LOG
0436: .error(
0437: "Thread ("
0438: + Thread.currentThread()
0439: .getName()
0440: + ") was interrupted, failed to shutdown selector",
0441: e);
0442: }
0443: }
0444:
0445: // Stop the callback thread pool
0446: if (m_requestProcessors instanceof PooledExecutor)
0447: ((PooledExecutor) m_requestProcessors).shutdownNow();
0448:
0449: if (m_requestProcessors instanceof PooledExecutor) {
0450: try {
0451: ((PooledExecutor) m_requestProcessors)
0452: .awaitTerminationAfterShutdown(1000);
0453: } catch (InterruptedException e) {
0454: }
0455: }
0456:
0457: // then close the connections
0458: synchronized (conns) {
0459: Iterator it = conns.values().iterator();
0460: while (it.hasNext()) {
0461: Connection conn = (Connection) it.next();
0462: conn.destroy();
0463: }
0464: conns.clear();
0465: }
0466:
0467: while (m_backGroundThreads.size() > 0) {
0468: Thread t = (Thread) m_backGroundThreads.removeFirst();
0469: try {
0470: t.join();
0471: } catch (InterruptedException e) {
0472: LOG.error("Thread (" + Thread.currentThread().getName()
0473: + ") was interrupted while waiting on thread "
0474: + t.getName() + " to finish.");
0475: }
0476: }
0477: m_backGroundThreads.clear();
0478:
0479: }
0480:
0481: /**
0482: * Acceptor thread. Continuously accept new connections and assign readhandler/writehandler
0483: * to them.
0484: */
0485: public void run() {
0486: Connection conn;
0487:
0488: while (m_serverSocketChannel.isOpen() && !serverStopping) {
0489: int num;
0490: try {
0491: num = m_acceptSelector.select();
0492: } catch (IOException e) {
0493: if (LOG.isWarnEnabled())
0494: LOG
0495: .warn(
0496: "Select operation on listening socket failed",
0497: e);
0498: continue; // Give up this time
0499: }
0500:
0501: if (num > 0) {
0502: Set readyKeys = m_acceptSelector.selectedKeys();
0503: for (Iterator i = readyKeys.iterator(); i.hasNext();) {
0504: SelectionKey key = (SelectionKey) i.next();
0505: i.remove();
0506: // We only deal with new incoming connections
0507:
0508: ServerSocketChannel readyChannel = (ServerSocketChannel) key
0509: .channel();
0510: SocketChannel client_sock_ch;
0511: try {
0512: client_sock_ch = readyChannel.accept();
0513: } catch (IOException e) {
0514: if (LOG.isWarnEnabled())
0515: LOG
0516: .warn(
0517: "Attempt to accept new connection from listening socket failed",
0518: e);
0519: // Give up this connection
0520: continue;
0521: }
0522:
0523: if (LOG.isInfoEnabled())
0524: LOG.info("accepted connection, client_sock="
0525: + client_sock_ch.socket());
0526:
0527: try {
0528:
0529: if (LOG.isTraceEnabled())
0530: LOG
0531: .trace("About to change new connection send buff size from "
0532: + client_sock_ch
0533: .socket()
0534: .getSendBufferSize()
0535: + " bytes");
0536: client_sock_ch.socket().setSendBufferSize(
0537: send_buf_size);
0538: if (LOG.isTraceEnabled())
0539: LOG
0540: .trace("Changed new connection send buff size to "
0541: + client_sock_ch
0542: .socket()
0543: .getSendBufferSize()
0544: + " bytes");
0545: } catch (IllegalArgumentException ex) {
0546: if (log.isErrorEnabled())
0547: log.error(
0548: "exception setting send buffer size to "
0549: + send_buf_size
0550: + " bytes: ", ex);
0551: } catch (SocketException e) {
0552: if (log.isErrorEnabled())
0553: log.error(
0554: "exception setting send buffer size to "
0555: + send_buf_size
0556: + " bytes: ", e);
0557: }
0558:
0559: try {
0560: if (LOG.isTraceEnabled())
0561: LOG
0562: .trace("About to change new connection receive buff size from "
0563: + client_sock_ch
0564: .socket()
0565: .getReceiveBufferSize()
0566: + " bytes");
0567: client_sock_ch.socket().setReceiveBufferSize(
0568: recv_buf_size);
0569: if (LOG.isTraceEnabled())
0570: LOG
0571: .trace("Changed new connection receive buff size to "
0572: + client_sock_ch
0573: .socket()
0574: .getReceiveBufferSize()
0575: + " bytes");
0576: } catch (IllegalArgumentException ex) {
0577: if (log.isErrorEnabled())
0578: log.error(
0579: "exception setting receive buffer size to "
0580: + send_buf_size
0581: + " bytes: ", ex);
0582: } catch (SocketException e) {
0583: if (log.isErrorEnabled())
0584: log.error(
0585: "exception setting receive buffer size to "
0586: + recv_buf_size
0587: + " bytes: ", e);
0588: }
0589:
0590: conn = new Connection(client_sock_ch, null);
0591: try {
0592: conn.peer_addr = conn
0593: .readPeerAddress(client_sock_ch
0594: .socket());
0595:
0596: synchronized (conns) {
0597: if (conns
0598: .containsKey(conn.getPeerAddress())) {
0599: if (conn.getPeerAddress().equals(
0600: getLocalAddress())) {
0601: if (LOG.isTraceEnabled())
0602: LOG
0603: .trace(conn
0604: .getPeerAddress()
0605: + " is myself, not put it in table twice, but still read from it");
0606: } else {
0607: if (LOG.isWarnEnabled())
0608: LOG
0609: .warn(conn
0610: .getPeerAddress()
0611: + " is already there, will terminate connection");
0612: // keep existing connection, close this new one
0613: conn.destroy();
0614: continue;
0615: }
0616: } else {
0617: addConnection(conn.getPeerAddress(),
0618: conn);
0619: }
0620: }
0621: notifyConnectionOpened(conn.getPeerAddress());
0622: client_sock_ch.configureBlocking(false);
0623: } catch (IOException e) {
0624: if (LOG.isWarnEnabled())
0625: LOG
0626: .warn(
0627: "Attempt to configure non-blocking mode failed",
0628: e);
0629: // Give up this connection
0630: conn.destroy();
0631: continue;
0632: } catch (Exception e) {
0633: if (LOG.isWarnEnabled())
0634: LOG
0635: .warn(
0636: "Attempt to handshake with other peer failed",
0637: e);
0638: // Give up this connection
0639: conn.destroy();
0640: continue;
0641: }
0642:
0643: int idx;
0644: synchronized (m_lockNextWriteHandler) {
0645: idx = m_nextWriteHandler = (m_nextWriteHandler + 1)
0646: % m_writeHandlers.length;
0647: }
0648: conn.setupWriteHandler(m_writeHandlers[idx]);
0649:
0650: try {
0651: synchronized (m_lockNextReadHandler) {
0652: idx = m_nextReadHandler = (m_nextReadHandler + 1)
0653: % m_readHandlers.length;
0654: }
0655: m_readHandlers[idx].add(conn);
0656:
0657: } catch (InterruptedException e) {
0658: if (LOG.isWarnEnabled())
0659: LOG
0660: .warn(
0661: "Attempt to configure read handler for accepted connection failed",
0662: e);
0663: // close connection
0664: conn.destroy();
0665: }
0666: } // end of iteration
0667: } // end of selected key > 0
0668: } // end of thread
0669:
0670: if (m_serverSocketChannel.isOpen()) {
0671: try {
0672: m_serverSocketChannel.close();
0673: } catch (Exception e) {
0674: log.error("exception closing server listening socket",
0675: e);
0676: }
0677: }
0678: if (LOG.isTraceEnabled())
0679: LOG.trace("acceptor thread terminated");
0680:
0681: }
0682:
0683: /**
0684: * Finds first available port starting at start_port and returns server socket. Sets srv_port
0685: */
0686: protected ServerSocket createServerSocket(int start_port,
0687: int end_port) throws Exception {
0688: this .m_acceptSelector = Selector.open();
0689: m_serverSocketChannel = ServerSocketChannel.open();
0690: m_serverSocketChannel.configureBlocking(false);
0691: while (true) {
0692: try {
0693: SocketAddress sockAddr;
0694: if (bind_addr == null) {
0695: sockAddr = new InetSocketAddress(start_port);
0696: m_serverSocketChannel.socket().bind(sockAddr);
0697: } else {
0698: sockAddr = new InetSocketAddress(bind_addr,
0699: start_port);
0700: m_serverSocketChannel.socket().bind(sockAddr,
0701: backlog);
0702: }
0703: } catch (BindException bind_ex) {
0704: if (start_port == end_port)
0705: throw (BindException) ((new BindException(
0706: "No available port to bind to"))
0707: .initCause(bind_ex));
0708: start_port++;
0709: continue;
0710: } catch (SocketException bind_ex) {
0711: if (start_port == end_port)
0712: throw (BindException) ((new BindException(
0713: "No available port to bind to"))
0714: .initCause(bind_ex));
0715: start_port++;
0716: continue;
0717: } catch (IOException io_ex) {
0718: if (LOG.isErrorEnabled())
0719: LOG.error(
0720: "Attempt to bind serversocket failed, port="
0721: + start_port + ", bind addr="
0722: + bind_addr, io_ex);
0723: throw io_ex;
0724: }
0725: srv_port = start_port;
0726: break;
0727: }
0728: m_serverSocketChannel.register(this .m_acceptSelector,
0729: SelectionKey.OP_ACCEPT);
0730: return m_serverSocketChannel.socket();
0731: }
0732:
0733: protected void runRequest(Address addr, ByteBuffer buf)
0734: throws InterruptedException {
0735: m_requestProcessors.execute(new ExecuteTask(addr, buf));
0736: }
0737:
0738: // Represents shutdown
0739: private static class Shutdown {
0740: }
0741:
0742: // ReadHandler has selector to deal with read, it runs in seperated thread
0743: private static class ReadHandler implements Runnable {
0744: private final Selector SELECTOR = initHandler();
0745: private final LinkedQueue QUEUE = new LinkedQueue();
0746: private final ConnectionTableNIO connectTable;
0747:
0748: ReadHandler(ConnectionTableNIO ct) {
0749: connectTable = ct;
0750: }
0751:
0752: public Selector initHandler() {
0753: // Open the selector
0754: try {
0755: return Selector.open();
0756: } catch (IOException e) {
0757: if (LOG.isErrorEnabled())
0758: LOG.error(e);
0759: throw new IllegalStateException(e.getMessage());
0760: }
0761:
0762: }
0763:
0764: /**
0765: * create instances of ReadHandler threads for receiving data.
0766: *
0767: * @param workerThreads is the number of threads to create.
0768: */
0769: private static ReadHandler[] create(int workerThreads,
0770: ConnectionTableNIO ct, ThreadGroup tg,
0771: LinkedList backGroundThreads) {
0772: ReadHandler[] handlers = new ReadHandler[workerThreads];
0773: for (int looper = 0; looper < workerThreads; looper++) {
0774: handlers[looper] = new ReadHandler(ct);
0775:
0776: Thread thread = new Thread(tg, handlers[looper],
0777: "nioReadHandlerThread");
0778: thread.setDaemon(true);
0779: thread.start();
0780: backGroundThreads.add(thread);
0781: }
0782: return handlers;
0783: }
0784:
0785: private void add(Object conn) throws InterruptedException {
0786: QUEUE.put(conn);
0787: wakeup();
0788: }
0789:
0790: private void wakeup() {
0791: SELECTOR.wakeup();
0792: }
0793:
0794: public void run() {
0795: while (true) { // m_s can be closed by the management thread
0796: int events;
0797: try {
0798: events = SELECTOR.select();
0799: } catch (IOException e) {
0800: if (LOG.isWarnEnabled())
0801: LOG
0802: .warn(
0803: "Select operation on socket failed",
0804: e);
0805: continue; // Give up this time
0806: } catch (ClosedSelectorException e) {
0807: if (LOG.isWarnEnabled())
0808: LOG
0809: .warn(
0810: "Select operation on socket failed",
0811: e);
0812: return; // Selector gets closed, thread stops
0813: }
0814:
0815: if (events > 0) { // there are read-ready channels
0816: Set readyKeys = SELECTOR.selectedKeys();
0817: for (Iterator i = readyKeys.iterator(); i.hasNext();) {
0818: SelectionKey key = (SelectionKey) i.next();
0819: i.remove();
0820: // Do partial read and handle call back
0821: Connection conn = (Connection) key.attachment();
0822: try {
0823: if (conn.getSocketChannel().isOpen())
0824: readOnce(conn);
0825: else { // socket connection is already closed, clean up connection state
0826: conn.closed();
0827: }
0828: } catch (IOException e) {
0829: if (LOG.isTraceEnabled())
0830: LOG
0831: .trace(
0832: "Read operation on socket failed",
0833: e);
0834: // The connection must be bad, cancel the key, close socket, then
0835: // remove it from table!
0836: key.cancel();
0837: conn.destroy();
0838: conn.closed();
0839: }
0840: }
0841: }
0842:
0843: // Now we look at the connection queue to get any new connections added
0844: Object o;
0845: try {
0846: o = QUEUE.poll(0); // get a connection
0847: } catch (InterruptedException e) {
0848: if (LOG.isInfoEnabled())
0849: LOG
0850: .info(
0851: "Thread ("
0852: + Thread
0853: .currentThread()
0854: .getName()
0855: + ") was interrupted while polling queue",
0856: e);
0857: // We must give up
0858: continue;
0859: }
0860: if (null == o)
0861: continue;
0862: if (o instanceof Shutdown) { // shutdown command?
0863: try {
0864: SELECTOR.close();
0865: } catch (IOException e) {
0866: if (LOG.isInfoEnabled())
0867: LOG
0868: .info(
0869: "Read selector close operation failed",
0870: e);
0871: }
0872: return; // stop reading
0873: }
0874: Connection conn = (Connection) o;// must be a new connection
0875: SocketChannel sc = conn.getSocketChannel();
0876: try {
0877: sc.register(SELECTOR, SelectionKey.OP_READ, conn);
0878: } catch (ClosedChannelException e) {
0879: if (LOG.isInfoEnabled())
0880: LOG
0881: .info(
0882: "Socket channel was closed while we were trying to register it to selector",
0883: e);
0884: // Channel becomes bad. The connection must be bad,
0885: // close socket, then remove it from table!
0886: conn.destroy();
0887: conn.closed();
0888: }
0889: } // end of the while true loop
0890: }
0891:
0892: private void readOnce(Connection conn) throws IOException {
0893: ConnectionReadState readState = conn.getReadState();
0894: if (!readState.isHeadFinished()) { // a brand new message coming or header is not completed
0895: // Begin or continue to read header
0896: int size = readHeader(conn);
0897: if (0 == size) { // header is not completed
0898: return;
0899: }
0900: }
0901: // Begin or continue to read body
0902: if (readBody(conn) > 0) { // not finish yet
0903: return;
0904: }
0905: Address addr = conn.getPeerAddress();
0906: ByteBuffer buf = readState.getReadBodyBuffer();
0907: // Clear status
0908: readState.bodyFinished();
0909: // Assign worker thread to execute call back
0910: try {
0911: connectTable.runRequest(addr, buf);
0912: } catch (InterruptedException e) {
0913: // Cannot do call back, what can we do?
0914: // Give up handling the message then
0915: LOG
0916: .error(
0917: "Thread ("
0918: + Thread.currentThread()
0919: .getName()
0920: + ") was interrupted while assigning executor to process read request",
0921: e);
0922: }
0923: }
0924:
0925: /**
0926: * Read message header from channel. It doesn't try to complete. If there is nothing in
0927: * the channel, the method returns immediately.
0928: *
0929: * @param conn The connection
0930: * @return 0 if header hasn't been read completely, otherwise the size of message body
0931: * @throws IOException
0932: */
0933: private int readHeader(Connection conn) throws IOException {
0934: ConnectionReadState readState = conn.getReadState();
0935: ByteBuffer headBuf = readState.getReadHeadBuffer();
0936:
0937: SocketChannel sc = conn.getSocketChannel();
0938: while (headBuf.remaining() > 0) {
0939: int num = sc.read(headBuf);
0940: if (-1 == num) {// EOS
0941: throw new IOException("Peer closed socket");
0942: }
0943: if (0 == num) // no more data
0944: return 0;
0945: }
0946: // OK, now we get the whole header, change the status and return message size
0947: return readState.headFinished();
0948: }
0949:
0950: /**
0951: * Read message body from channel. It doesn't try to complete. If there is nothing in
0952: * the channel, the method returns immediately.
0953: *
0954: * @param conn The connection
0955: * @return remaining bytes for the message
0956: * @throws IOException
0957: */
0958: private int readBody(Connection conn) throws IOException {
0959: ByteBuffer bodyBuf = conn.getReadState()
0960: .getReadBodyBuffer();
0961:
0962: SocketChannel sc = conn.getSocketChannel();
0963: while (bodyBuf.remaining() > 0) {
0964: int num = sc.read(bodyBuf);
0965: if (-1 == num) // EOS
0966: throw new IOException(
0967: "Couldn't read from socket as peer closed the socket");
0968: if (0 == num) // no more data
0969: return bodyBuf.remaining();
0970: }
0971: // OK, we finished reading the whole message! Flip it (not necessary though)
0972: bodyBuf.flip();
0973: return 0;
0974: }
0975: }
0976:
0977: private class ExecuteTask implements Runnable {
0978: Address m_addr = null;
0979: ByteBuffer m_buf = null;
0980:
0981: public ExecuteTask(Address addr, ByteBuffer buf) {
0982: m_addr = addr;
0983: m_buf = buf;
0984: }
0985:
0986: public void run() {
0987: receive(m_addr, m_buf.array(), m_buf.arrayOffset(), m_buf
0988: .limit());
0989: }
0990: }
0991:
0992: private class ConnectionReadState {
0993: private final Connection m_conn;
0994:
0995: // Status for receiving message
0996: private boolean m_headFinished = false;
0997: private ByteBuffer m_readBodyBuf = null;
0998: private final ByteBuffer m_readHeadBuf = ByteBuffer
0999: .allocate(Connection.HEADER_SIZE);
1000:
1001: public ConnectionReadState(Connection conn) {
1002: m_conn = conn;
1003: }
1004:
1005: ByteBuffer getReadBodyBuffer() {
1006: return m_readBodyBuf;
1007: }
1008:
1009: ByteBuffer getReadHeadBuffer() {
1010: return m_readHeadBuf;
1011: }
1012:
1013: void bodyFinished() {
1014: m_headFinished = false;
1015: m_readHeadBuf.clear();
1016: m_readBodyBuf = null;
1017: m_conn.updateLastAccessed();
1018: }
1019:
1020: /**
1021: * Status change for finishing reading the message header (data already in buffer)
1022: *
1023: * @return message size
1024: */
1025: int headFinished() {
1026: m_headFinished = true;
1027: m_readHeadBuf.flip();
1028: int messageSize = m_readHeadBuf.getInt();
1029: m_readBodyBuf = ByteBuffer.allocate(messageSize);
1030: m_conn.updateLastAccessed();
1031: return messageSize;
1032: }
1033:
1034: boolean isHeadFinished() {
1035: return m_headFinished;
1036: }
1037: }
1038:
1039: class Connection extends ConnectionTable.Connection {
1040: private SocketChannel sock_ch = null;
1041: private WriteHandler m_writeHandler;
1042: private SelectorWriteHandler m_selectorWriteHandler;
1043: private final ConnectionReadState m_readState;
1044:
1045: private static final int HEADER_SIZE = 4;
1046: final ByteBuffer headerBuffer = ByteBuffer
1047: .allocate(HEADER_SIZE);
1048:
1049: Connection(SocketChannel s, Address peer_addr) {
1050: super (s.socket(), peer_addr);
1051: sock_ch = s;
1052: m_readState = new ConnectionReadState(this );
1053: is_running = true;
1054: }
1055:
1056: private ConnectionReadState getReadState() {
1057: return m_readState;
1058: }
1059:
1060: private void setupWriteHandler(WriteHandler hdlr) {
1061: m_writeHandler = hdlr;
1062: m_selectorWriteHandler = hdlr.add(sock_ch);
1063: }
1064:
1065: // void destroy()
1066: // {
1067: // closeSocket();
1068: // }
1069:
1070: void doSend(byte[] buffie, int offset, int length)
1071: throws Exception {
1072: FutureResult result = new FutureResult();
1073: m_writeHandler.write(sock_ch, ByteBuffer.wrap(buffie,
1074: offset, length), result, m_selectorWriteHandler);
1075: Exception ex = result.getException();
1076: if (ex != null) {
1077: if (LOG.isErrorEnabled())
1078: LOG.error("failed sending message", ex);
1079: if (ex.getCause() instanceof IOException)
1080: throw (IOException) ex.getCause();
1081: throw ex;
1082: }
1083: result.get();
1084: }
1085:
1086: SocketChannel getSocketChannel() {
1087: return sock_ch;
1088: }
1089:
1090: void closeSocket() {
1091:
1092: if (sock_ch != null) {
1093: try {
1094: if (sock_ch.isConnected() && sock_ch.isOpen()) {
1095: sock_ch.close();
1096: }
1097: } catch (Exception e) {
1098: log.error("error closing socket connection", e);
1099: }
1100: sock_ch = null;
1101: }
1102: }
1103:
1104: void closed() {
1105: Address peerAddr = getPeerAddress();
1106: synchronized (conns) {
1107: conns.remove(peerAddr);
1108: }
1109: notifyConnectionClosed(peerAddr);
1110: }
1111: }
1112:
1113: /**
1114: * Handle writing to non-blocking NIO connection.
1115: */
1116: private static class WriteHandler implements Runnable {
1117: // Create a queue for write requests
1118: private final LinkedQueue QUEUE = new LinkedQueue();
1119:
1120: private final Selector SELECTOR = initSelector();
1121: private int m_pendingChannels; // count of the number of channels that have pending writes
1122: // note that this variable is only accessed by one thread.
1123:
1124: // allocate and reuse the header for all buffer write operations
1125: private ByteBuffer m_headerBuffer = ByteBuffer
1126: .allocate(Connection.HEADER_SIZE);
1127:
1128: Selector initSelector() {
1129: try {
1130: return SelectorProvider.provider().openSelector();
1131: } catch (IOException e) {
1132: if (LOG.isErrorEnabled())
1133: LOG.error(e);
1134: throw new IllegalStateException(e.getMessage());
1135: }
1136: }
1137:
1138: /**
1139: * create instances of WriteHandler threads for sending data.
1140: *
1141: * @param workerThreads is the number of threads to create.
1142: */
1143: private static WriteHandler[] create(int workerThreads,
1144: ThreadGroup tg, LinkedList backGroundThreads) {
1145: WriteHandler[] handlers = new WriteHandler[workerThreads];
1146: for (int looper = 0; looper < workerThreads; looper++) {
1147: handlers[looper] = new WriteHandler();
1148:
1149: Thread thread = new Thread(tg, handlers[looper],
1150: "nioWriteHandlerThread");
1151: thread.setDaemon(true);
1152: thread.start();
1153: backGroundThreads.add(thread);
1154: }
1155: return handlers;
1156: }
1157:
1158: /**
1159: * Add a new channel to be handled.
1160: *
1161: * @param channel
1162: */
1163: private SelectorWriteHandler add(SocketChannel channel) {
1164: return new SelectorWriteHandler(channel, SELECTOR,
1165: m_headerBuffer);
1166: }
1167:
1168: /**
1169: * Writes buffer to the specified socket connection. This is always performed asynchronously. If you want
1170: * to perform a synchrounous write, call notification.`get() which will block until the write operation is complete.
1171: * Best practice is to call notification.getException() which may return any exceptions that occured during the write
1172: * operation.
1173: *
1174: * @param channel is where the buffer is written to.
1175: * @param buffer is what we write.
1176: * @param notification may be specified if you want to know how many bytes were written and know if an exception
1177: * occurred.
1178: */
1179: private void write(SocketChannel channel, ByteBuffer buffer,
1180: FutureResult notification, SelectorWriteHandler hdlr)
1181: throws InterruptedException {
1182: QUEUE.put(new WriteRequest(channel, buffer, notification,
1183: hdlr));
1184: }
1185:
1186: private void close(SelectorWriteHandler entry) {
1187: entry.cancel();
1188: }
1189:
1190: private void handleChannelError(SelectorWriteHandler entry,
1191: Throwable error) {
1192: // notify callers of the exception and drain all of the send buffers for this channel.
1193: do {
1194: if (error != null)
1195: entry.notifyError(error);
1196: } while (entry.next());
1197: close(entry);
1198: }
1199:
1200: // process the write operation
1201: private void processWrite(Selector selector) {
1202: Set keys = selector.selectedKeys();
1203: Object arr[] = keys.toArray();
1204: for (int looper = 0; looper < arr.length; looper++) {
1205: SelectionKey key = (SelectionKey) arr[looper];
1206: SelectorWriteHandler entry = (SelectorWriteHandler) key
1207: .attachment();
1208: boolean needToDecrementPendingChannels = false;
1209: try {
1210: if (0 == entry.write()) { // write the buffer and if the remaining bytes is zero,
1211: // notify the caller of number of bytes written.
1212: entry.notifyObject(new Integer(entry
1213: .getBytesWritten()));
1214: // switch to next write buffer or clear interest bit on socket channel.
1215: if (!entry.next()) {
1216: needToDecrementPendingChannels = true;
1217: }
1218: }
1219:
1220: } catch (IOException e) {
1221: needToDecrementPendingChannels = true;
1222: // connection must of closed
1223: handleChannelError(entry, e);
1224: } finally {
1225: if (needToDecrementPendingChannels)
1226: m_pendingChannels--;
1227: }
1228: }
1229: keys.clear();
1230: }
1231:
1232: public void run() {
1233: while (SELECTOR.isOpen()) {
1234: try {
1235: WriteRequest queueEntry;
1236: Object o;
1237:
1238: // When there are no more commands in the Queue, we will hit the blocking code after this loop.
1239: while (null != (o = QUEUE.poll(0))) {
1240: if (o instanceof Shutdown) // Stop the thread
1241: {
1242: try {
1243: SELECTOR.close();
1244: } catch (IOException e) {
1245: if (LOG.isInfoEnabled())
1246: LOG
1247: .info(
1248: "Write selector close operation failed",
1249: e);
1250: }
1251: return;
1252: }
1253: queueEntry = (WriteRequest) o;
1254:
1255: if (queueEntry.getHandler().add(queueEntry)) {
1256: // If the add operation returns true, than means that a buffer is available to be written to the
1257: // corresponding channel and channel's selection key has been modified to indicate interest in the
1258: // 'write' operation.
1259: // If the add operation threw an exception, we will not increment m_pendingChannels which
1260: // seems correct as long as a new buffer wasn't added to be sent.
1261: // Another way to view this is that we don't have to protect m_pendingChannels on the increment
1262: // side, only need to protect on the decrement side (this logic of this run() will be incorrect
1263: // if m_pendingChannels is set incorrectly).
1264: m_pendingChannels++;
1265: }
1266:
1267: try {
1268: // process any connections ready to be written to.
1269: if (SELECTOR.selectNow() > 0) {
1270: processWrite(SELECTOR);
1271: }
1272: } catch (IOException e) { // need to understand what causes this error so we can handle it properly
1273: if (LOG.isErrorEnabled())
1274: LOG
1275: .error(
1276: "SelectNow operation on write selector failed, didn't expect this to occur, please report this",
1277: e);
1278: return; // if select fails, give up so we don't go into a busy loop.
1279: }
1280: }
1281:
1282: // if there isn't any pending work to do, block on queue to get next request.
1283: if (m_pendingChannels == 0) {
1284: o = QUEUE.take();
1285: if (o instanceof Shutdown) { // Stop the thread
1286: try {
1287: SELECTOR.close();
1288: } catch (IOException e) {
1289: if (LOG.isInfoEnabled())
1290: LOG
1291: .info(
1292: "Write selector close operation failed",
1293: e);
1294: }
1295: return;
1296: }
1297: queueEntry = (WriteRequest) o;
1298: if (queueEntry.getHandler().add(queueEntry))
1299: m_pendingChannels++;
1300: }
1301: // otherwise do a blocking wait select operation.
1302: else {
1303: try {
1304: if ((SELECTOR.select()) > 0) {
1305: processWrite(SELECTOR);
1306: }
1307: } catch (IOException e) { // need to understand what causes this error
1308: if (LOG.isErrorEnabled())
1309: LOG
1310: .error(
1311: "Failure while writing to socket",
1312: e);
1313: }
1314: }
1315: } catch (InterruptedException e) {
1316: if (LOG.isErrorEnabled())
1317: LOG.error("Thread ("
1318: + Thread.currentThread().getName()
1319: + ") was interrupted", e);
1320: } catch (Throwable e) // Log throwable rather than terminating this thread.
1321: { // We are a daemon thread so we shouldn't prevent the process from terminating if
1322: // the controlling thread decides that should happen.
1323: if (LOG.isErrorEnabled())
1324: LOG.error("Thread ("
1325: + Thread.currentThread().getName()
1326: + ") caught Throwable", e);
1327: }
1328: }
1329: }
1330: }
1331:
1332: // Wrapper class for passing Write requests. There will be an instance of this class for each socketChannel
1333: // mapped to a Selector.
1334: public static class SelectorWriteHandler {
1335:
1336: private final LinkedList m_writeRequests = new LinkedList(); // Collection of writeRequests
1337: private boolean m_headerSent = false;
1338: private SocketChannel m_channel;
1339: private SelectionKey m_key;
1340: private Selector m_selector;
1341: private int m_bytesWritten = 0;
1342: private boolean m_enabled = false;
1343: private ByteBuffer m_headerBuffer;
1344:
1345: SelectorWriteHandler(SocketChannel channel, Selector selector,
1346: ByteBuffer headerBuffer) {
1347: m_channel = channel;
1348: m_selector = selector;
1349: m_headerBuffer = headerBuffer;
1350: }
1351:
1352: private void register(Selector selector, SocketChannel channel)
1353: throws ClosedChannelException {
1354: // register the channel but don't enable OP_WRITE until we have a write request.
1355: m_key = channel.register(selector, 0, this );
1356: }
1357:
1358: // return true if selection key is enabled when it wasn't previous to call.
1359: private boolean enable() {
1360: boolean rc = false;
1361:
1362: try {
1363: if (m_key == null) { // register the socket on first access,
1364: // we are the only thread using this variable, so no sync needed.
1365: register(m_selector, m_channel);
1366: }
1367: } catch (ClosedChannelException e) {
1368: return rc;
1369: }
1370:
1371: if (!m_enabled) {
1372: rc = true;
1373: try {
1374: m_key.interestOps(SelectionKey.OP_WRITE);
1375: } catch (CancelledKeyException e) { // channel must of closed
1376: return false;
1377: }
1378: m_enabled = true;
1379: }
1380: return rc;
1381: }
1382:
1383: private void disable() {
1384: if (m_enabled) {
1385: try {
1386: m_key.interestOps(0); // pass zero which means that we are not interested in being
1387: // notified of anything for this channel.
1388: } catch (CancelledKeyException eat) // If we finished writing and didn't get an exception, then
1389: { // we probably don't need to throw this exception (if they try to write
1390: // again, we will then throw an exception).
1391: }
1392: m_enabled = false;
1393: }
1394: }
1395:
1396: private void cancel() {
1397: m_key.cancel();
1398: }
1399:
1400: boolean add(WriteRequest entry) {
1401: m_writeRequests.add(entry);
1402: return enable();
1403: }
1404:
1405: WriteRequest getCurrentRequest() {
1406: return (WriteRequest) m_writeRequests.getFirst();
1407: }
1408:
1409: SocketChannel getChannel() {
1410: return m_channel;
1411: }
1412:
1413: ByteBuffer getBuffer() {
1414: return getCurrentRequest().getBuffer();
1415: }
1416:
1417: FutureResult getCallback() {
1418: return getCurrentRequest().getCallback();
1419: }
1420:
1421: int getBytesWritten() {
1422: return m_bytesWritten;
1423: }
1424:
1425: void notifyError(Throwable error) {
1426: if (getCallback() != null)
1427: getCallback().setException(error);
1428: }
1429:
1430: void notifyObject(Object result) {
1431: if (getCallback() != null)
1432: getCallback().set(result);
1433: }
1434:
1435: /**
1436: * switch to next request or disable write interest bit if there are no more buffers.
1437: *
1438: * @return true if another request was found to be processed.
1439: */
1440: boolean next() {
1441: m_headerSent = false;
1442: m_bytesWritten = 0;
1443:
1444: m_writeRequests.removeFirst(); // remove current entry
1445: boolean rc = !m_writeRequests.isEmpty();
1446: if (!rc) // disable select for this channel if no more entries
1447: disable();
1448: return rc;
1449: }
1450:
1451: /**
1452: * @return bytes remaining to write. This function will only throw IOException, unchecked exceptions are not
1453: * expected to be thrown from here. It is very important for the caller to know if an unchecked exception can
1454: * be thrown in here. Please correct the following throws list to include any other exceptions and update
1455: * caller to handle them.
1456: * @throws IOException
1457: */
1458: int write() throws IOException {
1459: // Send header first. Note that while we are writing the shared header buffer,
1460: // no other threads can access the header buffer as we are the only thread that has access to it.
1461: if (!m_headerSent) {
1462: m_headerSent = true;
1463: m_headerBuffer.clear();
1464: m_headerBuffer.putInt(getBuffer().remaining());
1465: m_headerBuffer.flip();
1466: do {
1467: getChannel().write(m_headerBuffer);
1468: } // we should be able to handle writing the header in one action but just in case, just do a busy loop
1469: while (m_headerBuffer.remaining() > 0);
1470:
1471: }
1472:
1473: m_bytesWritten += (getChannel().write(getBuffer()));
1474:
1475: return getBuffer().remaining();
1476: }
1477:
1478: }
1479:
1480: public static class WriteRequest {
1481: private final SocketChannel m_channel;
1482: private final ByteBuffer m_buffer;
1483: private final FutureResult m_callback;
1484: private final SelectorWriteHandler m_hdlr;
1485:
1486: WriteRequest(SocketChannel channel, ByteBuffer buffer,
1487: FutureResult callback, SelectorWriteHandler hdlr) {
1488: m_channel = channel;
1489: m_buffer = buffer;
1490: m_callback = callback;
1491: m_hdlr = hdlr;
1492: }
1493:
1494: SelectorWriteHandler getHandler() {
1495: return m_hdlr;
1496: }
1497:
1498: SocketChannel getChannel() {
1499: return m_channel;
1500: }
1501:
1502: ByteBuffer getBuffer() {
1503: return m_buffer;
1504: }
1505:
1506: FutureResult getCallback() {
1507: return m_callback;
1508: }
1509:
1510: }
1511:
1512: }
|