001: package org.jgroups.protocols;
002:
003: import org.jgroups.blocks.ConnectionTableNIO;
004: import org.jgroups.blocks.BasicConnectionTable;
005: import org.jgroups.Address;
006: import org.jgroups.stack.IpAddress;
007:
008: import java.net.InetAddress;
009: import java.util.Properties;
010: import java.util.Collection;
011:
012: /**
013: * Transport using NIO
014: * @author Scott Marlow
015: * @author Alex Fu
016: * @author Bela Ban
017: * @version $Id: TCP_NIO.java,v 1.11.2.1 2007/04/27 08:03:51 belaban Exp $
018: */
019: public class TCP_NIO extends BasicTCP implements
020: BasicConnectionTable.Receiver {
021:
022: /*
023: * (non-Javadoc)
024: *
025: * @see org.jgroups.protocols.TCP#getConnectionTable(long, long)
026: */
027: protected ConnectionTableNIO getConnectionTable(long ri, long cet,
028: InetAddress b_addr, InetAddress bc_addr, int s_port,
029: int e_port) throws Exception {
030: ConnectionTableNIO retval = null;
031: if (ri == 0 && cet == 0) {
032: retval = new ConnectionTableNIO(this , b_addr, bc_addr,
033: s_port, e_port, false);
034: } else {
035: if (ri == 0) {
036: ri = 5000;
037: if (log.isWarnEnabled())
038: log.warn("reaper_interval was 0, set it to " + ri);
039: }
040: if (cet == 0) {
041: cet = 1000 * 60 * 5;
042: if (log.isWarnEnabled())
043: log
044: .warn("conn_expire_time was 0, set it to "
045: + cet);
046: }
047: retval = new ConnectionTableNIO(this , b_addr, bc_addr,
048: s_port, e_port, ri, cet, false);
049: }
050:
051: retval.setProcessorMaxThreads(getProcessorMaxThreads());
052: retval.setProcessorQueueSize(getProcessorQueueSize());
053: retval.setProcessorMinThreads(getProcessorMinThreads());
054: retval.setProcessorKeepAliveTime(getProcessorKeepAliveTime());
055: retval.setProcessorThreads(getProcessorThreads());
056: retval.start();
057: return retval;
058: }
059:
060: public String printConnections() {
061: return ct.toString();
062: }
063:
064: public void send(Address dest, byte[] data, int offset, int length)
065: throws Exception {
066: ct.send(dest, data, offset, length);
067: }
068:
069: public void start() throws Exception {
070: ct = getConnectionTable(reaper_interval, conn_expire_time,
071: bind_addr, external_addr, start_port, end_port);
072: ct.setUseSendQueues(use_send_queues);
073: // ct.addConnectionListener(this);
074: ct.setReceiveBufferSize(recv_buf_size);
075: ct.setSendBufferSize(send_buf_size);
076: ct.setSocketConnectionTimeout(sock_conn_timeout);
077: ct.setTcpNodelay(tcp_nodelay);
078: ct.setLinger(linger);
079: local_addr = ct.getLocalAddress();
080: if (additional_data != null && local_addr instanceof IpAddress)
081: ((IpAddress) local_addr).setAdditionalData(additional_data);
082: super .start();
083: }
084:
085: public void retainAll(Collection members) {
086: ct.retainAll(members);
087: }
088:
089: public void stop() {
090: ct.stop();
091: super .stop();
092: }
093:
094: public String getName() {
095: return "TCP_NIO";
096: }
097:
098: public int getReaderThreads() {
099: return m_reader_threads;
100: }
101:
102: public int getWriterThreads() {
103: return m_writer_threads;
104: }
105:
106: public int getProcessorThreads() {
107: return m_processor_threads;
108: }
109:
110: public int getProcessorMinThreads() {
111: return m_processor_minThreads;
112: }
113:
114: public int getProcessorMaxThreads() {
115: return m_processor_maxThreads;
116: }
117:
118: public int getProcessorQueueSize() {
119: return m_processor_queueSize;
120: }
121:
122: public int getProcessorKeepAliveTime() {
123: return m_processor_keepAliveTime;
124: }
125:
126: public int getOpenConnections() {
127: return ct.getNumConnections();
128: }
129:
130: /** Setup the Protocol instance acording to the configuration string */
131: public boolean setProperties(Properties props) {
132: String str;
133:
134: str = props.getProperty("reader_threads");
135: if (str != null) {
136: m_reader_threads = Integer.parseInt(str);
137: props.remove("reader_threads");
138: }
139:
140: str = props.getProperty("writer_threads");
141: if (str != null) {
142: m_writer_threads = Integer.parseInt(str);
143: props.remove("writer_threads");
144: }
145:
146: str = props.getProperty("processor_threads");
147: if (str != null) {
148: m_processor_threads = Integer.parseInt(str);
149: props.remove("processor_threads");
150: }
151:
152: str = props.getProperty("processor_minThreads");
153: if (str != null) {
154: m_processor_minThreads = Integer.parseInt(str);
155: props.remove("processor_minThreads");
156: }
157:
158: str = props.getProperty("processor_maxThreads");
159: if (str != null) {
160: m_processor_maxThreads = Integer.parseInt(str);
161: props.remove("processor_maxThreads");
162: }
163:
164: str = props.getProperty("processor_queueSize");
165: if (str != null) {
166: m_processor_queueSize = Integer.parseInt(str);
167: props.remove("processor_queueSize");
168: }
169:
170: str = props.getProperty("processor_keepAliveTime");
171: if (str != null) {
172: m_processor_keepAliveTime = Integer.parseInt(str);
173: props.remove("processor_keepAliveTime");
174: }
175:
176: return super .setProperties(props);
177: }
178:
179: private int m_reader_threads = 8;
180:
181: private int m_writer_threads = 8;
182:
183: private int m_processor_threads = 10; // PooledExecutor.createThreads()
184: private int m_processor_minThreads = 10; // PooledExecutor.setMinimumPoolSize()
185: private int m_processor_maxThreads = 10; // PooledExecutor.setMaxThreads()
186: private int m_processor_queueSize = 100; // Number of queued requests that can be pending waiting
187: // for a background thread to run the request.
188: private int m_processor_keepAliveTime = -1; // PooledExecutor.setKeepAliveTime( milliseconds);
189: // A negative value means to wait forever
190: private ConnectionTableNIO ct;
191: }
|