001: // $Id: TCP.java,v 1.37.2.1 2007/04/27 08:03:50 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.blocks.ConnectionTable;
007: import org.jgroups.stack.IpAddress;
008:
009: import java.net.InetAddress;
010: import java.util.Collection;
011: import java.util.Properties;
012:
013: /**
014: * TCP based protocol. Creates a server socket, which gives us the local address of this group member. For
015: * each accept() on the server socket, a new thread is created that listens on the socket.
016: * For each outgoing message m, if m.dest is in the ougoing hashtable, the associated socket will be reused
017: * to send message, otherwise a new socket is created and put in the hashtable.
018: * When a socket connection breaks or a member is removed from the group, the corresponding items in the
019: * incoming and outgoing hashtables will be removed as well.<br>
020: * This functionality is in ConnectionTable, which isT used by TCP. TCP sends messages using ct.send() and
021: * registers with the connection table to receive all incoming messages.
022: * @author Bela Ban
023: */
024: public class TCP extends BasicTCP implements ConnectionTable.Receiver {
025: private ConnectionTable ct = null;
026:
027: public TCP() {
028: }
029:
030: public String getName() {
031: return "TCP";
032: }
033:
034: public int getOpenConnections() {
035: return ct.getNumConnections();
036: }
037:
038: public String printConnections() {
039: return ct.toString();
040: }
041:
042: /** Setup the Protocol instance acording to the configuration string */
043: public boolean setProperties(Properties props) {
044: super .setProperties(props);
045: if (props.size() > 0) {
046: log.error("the following properties are not recognized: "
047: + props);
048: return false;
049: }
050: return true;
051: }
052:
053: public void send(Address dest, byte[] data, int offset, int length)
054: throws Exception {
055: ct.send(dest, data, offset, length);
056: }
057:
058: public void retainAll(Collection members) {
059: ct.retainAll(members);
060: }
061:
062: public void start() throws Exception {
063: ct = getConnectionTable(reaper_interval, conn_expire_time,
064: bind_addr, external_addr, start_port, end_port);
065: ct.setUseSendQueues(use_send_queues);
066: // ct.addConnectionListener(this);
067: ct.setReceiveBufferSize(recv_buf_size);
068: ct.setSendBufferSize(send_buf_size);
069: ct.setSocketConnectionTimeout(sock_conn_timeout);
070: ct.setTcpNodelay(tcp_nodelay);
071: ct.setLinger(linger);
072: local_addr = ct.getLocalAddress();
073: if (additional_data != null && local_addr instanceof IpAddress)
074: ((IpAddress) local_addr).setAdditionalData(additional_data);
075: super .start();
076: }
077:
078: public void stop() {
079: ct.stop();
080: super .stop();
081: }
082:
083: /**
084: * @param reaperInterval
085: * @param connExpireTime
086: * @param bindAddress
087: * @param startPort
088: * @throws Exception
089: * @return ConnectionTable
090: * Sub classes overrides this method to initialize a different version of
091: * ConnectionTable.
092: */
093: protected ConnectionTable getConnectionTable(long reaperInterval,
094: long connExpireTime, InetAddress bindAddress,
095: InetAddress externalAddress, int startPort, int endPort)
096: throws Exception {
097: ConnectionTable cTable;
098: if (reaperInterval == 0 && connExpireTime == 0) {
099: cTable = new ConnectionTable(this , bindAddress,
100: externalAddress, startPort, endPort);
101: } else {
102: if (reaperInterval == 0) {
103: reaperInterval = 5000;
104: if (log.isWarnEnabled())
105: log.warn("reaper_interval was 0, set it to "
106: + reaperInterval);
107: }
108: if (connExpireTime == 0) {
109: connExpireTime = 1000 * 60 * 5;
110: if (log.isWarnEnabled())
111: log.warn("conn_expire_time was 0, set it to "
112: + connExpireTime);
113: }
114: cTable = new ConnectionTable(this, bindAddress,
115: externalAddress, startPort, endPort,
116: reaperInterval, connExpireTime);
117: }
118: return cTable;
119: }
120:
121: }
|