001: // $Id: ConnectionTable.java,v 1.49 2006/09/30 16:17:30 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.jgroups.Address;
006: import org.jgroups.stack.IpAddress;
007: import org.jgroups.util.Util;
008:
009: import java.io.*;
010: import java.net.*;
011: import java.util.*;
012:
013: /**
014: * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
015: * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
016: * connection. For incoming messages, one server socket is created at startup. For each new incoming
017: * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
018: * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
019: * after some time.
020: * <p>
021: * Incoming messages from any of the sockets can be received by setting the message listener.
022: * @author Bela Ban
023: */
024: public class ConnectionTable extends BasicConnectionTable implements
025: Runnable {
026:
027: /**
028: * Regular ConnectionTable without expiration of idle connections
029: * @param srv_port The port on which the server will listen. If this port is reserved, the next
030: * free port will be taken (incrementing srv_port).
031: */
032: public ConnectionTable(int srv_port) throws Exception {
033: this .srv_port = srv_port;
034: start();
035: }
036:
037: public ConnectionTable(InetAddress bind_addr, int srv_port)
038: throws Exception {
039: this .srv_port = srv_port;
040: this .bind_addr = bind_addr;
041: start();
042: }
043:
044: /**
045: * ConnectionTable including a connection reaper. Connections that have been idle for more than conn_expire_time
046: * milliseconds will be closed and removed from the connection table. On next access they will be re-created.
047: * @param srv_port The port on which the server will listen
048: * @param reaper_interval Number of milliseconds to wait for reaper between attepts to reap idle connections
049: * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received until
050: * it will be reaped
051: */
052: public ConnectionTable(int srv_port, long reaper_interval,
053: long conn_expire_time) throws Exception {
054: this .srv_port = srv_port;
055: this .reaper_interval = reaper_interval;
056: this .conn_expire_time = conn_expire_time;
057: use_reaper = true;
058: start();
059: }
060:
061: /**
062: * Create a ConnectionTable
063: * @param r A reference to a receiver of all messages received by this class. Method <code>receive()</code>
064: * will be called.
065: * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
066: * This is interesting only in multi-homed systems. If bind_addr is null, the
067: * server socket will bind to the first available interface (e.g. /dev/hme0 on
068: * Solaris or /dev/eth0 on Linux systems).
069: * @param external_addr The address which will be broadcast to the group (the externally visible address
070: * which this host should be contacted on). If external_addr is null, it will default to
071: * the same address that the server socket is bound to.
072: * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
073: * free port will be taken (incrementing srv_port).
074: * @param max_port The largest port number that the server socket will be bound to. If max_port < srv_port
075: * then there is no limit.
076: */
077: public ConnectionTable(Receiver r, InetAddress bind_addr,
078: InetAddress external_addr, int srv_port, int max_port)
079: throws Exception {
080: setReceiver(r);
081: this .bind_addr = bind_addr;
082: this .external_addr = external_addr;
083: this .srv_port = srv_port;
084: this .max_port = max_port;
085: start();
086: }
087:
088: /**
089: * ConnectionTable including a connection reaper. Connections that have been idle for more than conn_expire_time
090: * milliseconds will be closed and removed from the connection table. On next access they will be re-created.
091: *
092: * @param r The Receiver
093: * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
094: * This is interesting only in multi-homed systems. If bind_addr is null, the
095: * server socket will bind to the first available interface (e.g. /dev/hme0 on
096: * Solaris or /dev/eth0 on Linux systems).
097: * @param external_addr The address which will be broadcast to the group (the externally visible address
098: * which this host should be contacted on). If external_addr is null, it will default to
099: * the same address that the server socket is bound to.
100: * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
101: * free port will be taken (incrementing srv_port).
102: * @param max_port The largest port number that the server socket will be bound to. If max_port < srv_port
103: * then there is no limit.
104: * @param reaper_interval Number of milliseconds to wait for reaper between attepts to reap idle connections
105: * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received until
106: * it will be reaped
107: */
108: public ConnectionTable(Receiver r, InetAddress bind_addr,
109: InetAddress external_addr, int srv_port, int max_port,
110: long reaper_interval, long conn_expire_time)
111: throws Exception {
112: setReceiver(r);
113: this .bind_addr = bind_addr;
114: this .external_addr = external_addr;
115: this .srv_port = srv_port;
116: this .max_port = max_port;
117: this .reaper_interval = reaper_interval;
118: this .conn_expire_time = conn_expire_time;
119: use_reaper = true;
120: start();
121: }
122:
123: /** Try to obtain correct Connection (or create one if not yet existent) */
124: Connection getConnection(Address dest) throws Exception {
125: Connection conn = null;
126: Socket sock;
127:
128: synchronized (conns) {
129: conn = (Connection) conns.get(dest);
130: if (conn == null) {
131: // changed by bela Jan 18 2004: use the bind address for the client sockets as well
132: SocketAddress tmpBindAddr = new InetSocketAddress(
133: bind_addr, 0);
134: InetAddress tmpDest = ((IpAddress) dest).getIpAddress();
135: SocketAddress destAddr = new InetSocketAddress(tmpDest,
136: ((IpAddress) dest).getPort());
137: sock = new Socket();
138: sock.bind(tmpBindAddr);
139: sock.setKeepAlive(true);
140: sock.setTcpNoDelay(tcp_nodelay);
141: if (linger > 0)
142: sock.setSoLinger(true, linger);
143: else
144: sock.setSoLinger(false, -1);
145: sock.connect(destAddr, sock_conn_timeout);
146:
147: try {
148: sock.setSendBufferSize(send_buf_size);
149: } catch (IllegalArgumentException ex) {
150: if (log.isErrorEnabled())
151: log.error(
152: "exception setting send buffer size to "
153: + send_buf_size + " bytes", ex);
154: }
155: try {
156: sock.setReceiveBufferSize(recv_buf_size);
157: } catch (IllegalArgumentException ex) {
158: if (log.isErrorEnabled())
159: log.error(
160: "exception setting receive buffer size to "
161: + send_buf_size + " bytes", ex);
162: }
163: conn = new Connection(sock, dest);
164: conn.sendLocalAddress(local_addr);
165: notifyConnectionOpened(dest);
166: // conns.put(dest, conn);
167: addConnection(dest, conn);
168: conn.init();
169: if (log.isInfoEnabled())
170: log.info("created socket to " + dest);
171: }
172: return conn;
173: }
174: }
175:
176: public final void start() throws Exception {
177: init();
178: srv_sock = createServerSocket(srv_port, max_port);
179:
180: if (external_addr != null)
181: local_addr = new IpAddress(external_addr, srv_sock
182: .getLocalPort());
183: else if (bind_addr != null)
184: local_addr = new IpAddress(bind_addr, srv_sock
185: .getLocalPort());
186: else
187: local_addr = new IpAddress(srv_sock.getLocalPort());
188:
189: if (log.isInfoEnabled())
190: log.info("server socket created on " + local_addr);
191:
192: //Roland Kurmann 4/7/2003, build new thread group
193: thread_group = new ThreadGroup(Util.getGlobalThreadGroup(),
194: "ConnectionTableGroup");
195: //Roland Kurmann 4/7/2003, put in thread_group
196: acceptor = new Thread(thread_group, this ,
197: "ConnectionTable.AcceptorThread");
198: acceptor.setDaemon(true);
199: acceptor.start();
200:
201: // start the connection reaper - will periodically remove unused connections
202: if (use_reaper && reaper == null) {
203: reaper = new Reaper();
204: reaper.start();
205: }
206: super .start();
207: }
208:
209: protected void init() throws Exception {
210: }
211:
212: /** Closes all open sockets, the server socket and all threads waiting for incoming messages */
213: public void stop() {
214: super .stop();
215:
216: // 1. Stop the reaper
217: if (reaper != null)
218: reaper.stop();
219:
220: // 2. close the server socket (this also stops the acceptor thread)
221: if (srv_sock != null) {
222: try {
223: ServerSocket tmp = srv_sock;
224: srv_sock = null;
225: tmp.close();
226: } catch (Exception e) {
227: }
228: }
229:
230: // 3. then close the connections
231: Connection conn;
232: Collection tmp = null;
233: synchronized (conns) {
234: tmp = new LinkedList(conns.values());
235: conns.clear();
236: }
237: if (tmp != null) {
238: for (Iterator it = tmp.iterator(); it.hasNext();) {
239: conn = (Connection) it.next();
240: conn.destroy();
241: }
242: tmp.clear();
243: }
244: local_addr = null;
245: }
246:
247: /**
248: * Acceptor thread. Continuously accept new connections. Create a new thread for each new
249: * connection and put it in conns. When the thread should stop, it is
250: * interrupted by the thread creator.
251: */
252: public void run() {
253: Socket client_sock = null;
254: Connection conn = null;
255: Address peer_addr;
256:
257: while (srv_sock != null) {
258: try {
259: conn = null;
260: client_sock = srv_sock.accept();
261: if (!running) {
262: if (log.isWarnEnabled())
263: log.warn("cannot accept connection from "
264: + client_sock.getRemoteSocketAddress()
265: + " as I'm closed");
266: break;
267: }
268: if (log.isTraceEnabled())
269: log.trace("accepted connection from "
270: + client_sock.getInetAddress() + ":"
271: + client_sock.getPort());
272: try {
273: client_sock.setSendBufferSize(send_buf_size);
274: } catch (IllegalArgumentException ex) {
275: if (log.isErrorEnabled())
276: log.error(
277: "exception setting send buffer size to "
278: + send_buf_size + " bytes", ex);
279: }
280: try {
281: client_sock.setReceiveBufferSize(recv_buf_size);
282: } catch (IllegalArgumentException ex) {
283: if (log.isErrorEnabled())
284: log.error(
285: "exception setting receive buffer size to "
286: + send_buf_size + " bytes", ex);
287: }
288:
289: client_sock.setKeepAlive(true);
290: client_sock.setTcpNoDelay(tcp_nodelay);
291: if (linger > 0)
292: client_sock.setSoLinger(true, linger);
293: else
294: client_sock.setSoLinger(false, -1);
295:
296: // create new thread and add to conn table
297: conn = new Connection(client_sock, null); // will call receive(msg)
298: // get peer's address
299: peer_addr = conn.readPeerAddress(client_sock);
300:
301: // client_addr=new IpAddress(client_sock.getInetAddress(), client_port);
302: conn.setPeerAddress(peer_addr);
303:
304: synchronized (conns) {
305: if (conns.containsKey(peer_addr)) {
306: if (log.isTraceEnabled())
307: log
308: .trace(peer_addr
309: + " is already there, will reuse connection");
310: //conn.destroy();
311: //continue; // return; // we cannot terminate the thread (bela Sept 2 2004)
312: } else {
313: // conns.put(peer_addr, conn);
314: addConnection(peer_addr, conn);
315: notifyConnectionOpened(peer_addr);
316: }
317: }
318:
319: conn.init(); // starts handler thread on this socket
320: } catch (SocketException sock_ex) {
321: if (log.isInfoEnabled())
322: log.info("exception is " + sock_ex);
323: if (conn != null)
324: conn.destroy();
325: if (srv_sock == null)
326: break; // socket was closed, therefore stop
327: } catch (Throwable ex) {
328: if (log.isWarnEnabled())
329: log.warn("exception is " + ex);
330: if (srv_sock == null)
331: break; // socket was closed, therefore stop
332: }
333: }
334: if (client_sock != null)
335: try {
336: client_sock.close();
337: } catch (IOException e) {
338: }
339: if (log.isTraceEnabled())
340: log.trace(Thread.currentThread().getName() + " terminated");
341: }
342:
343: /** Finds first available port starting at start_port and returns server socket.
344: * Will not bind to port >end_port. Sets srv_port */
345: protected ServerSocket createServerSocket(int start_port,
346: int end_port) throws Exception {
347: ServerSocket ret = null;
348:
349: while (true) {
350: try {
351: if (bind_addr == null)
352: ret = new ServerSocket(start_port);
353: else {
354:
355: ret = new ServerSocket(start_port, backlog,
356: bind_addr);
357: }
358: } catch (BindException bind_ex) {
359: if (start_port == end_port)
360: throw new BindException(
361: "No available port to bind to");
362: if (bind_addr != null) {
363: NetworkInterface nic = NetworkInterface
364: .getByInetAddress(bind_addr);
365: if (nic == null)
366: throw new BindException("bind_addr "
367: + bind_addr
368: + " is not a valid interface");
369: }
370: start_port++;
371: continue;
372: } catch (IOException io_ex) {
373: if (log.isErrorEnabled())
374: log.error("exception is " + io_ex);
375: }
376: srv_port = start_port;
377: break;
378: }
379: return ret;
380: }
381:
382: }
|