001: package org.jgroups.blocks;
003: import org.jgroups.Address;
004: import org.jgroups.Version;
005: import org.jgroups.stack.IpAddress;
006: import org.jgroups.util.Queue;
007: import org.jgroups.util.QueueClosedException;
008: import org.jgroups.util.Util;
009: import org.apache.commons.logging.Log;
010: import org.apache.commons.logging.LogFactory;
012: import java.net.Socket;
013: import java.net.InetAddress;
014: import java.net.SocketException;
015: import java.net.ServerSocket;
016: import java.io.DataOutputStream;
017: import java.io.DataInputStream;
018: import java.io.BufferedOutputStream;
019: import java.io.BufferedInputStream;
020: import java.io.IOException;
021: import java.io.EOFException;
022: import java.util.*;
024: /**
025: * Shared class for TCP connection tables.
026: * @author Scott Marlow
027: */
028: public abstract class BasicConnectionTable {
029: final HashMap conns = new HashMap(); // keys: Addresses (peer address), values: Connection
030: Receiver receiver = null;
031: boolean use_send_queues = true;
032: InetAddress bind_addr = null;
033: Address local_addr = null; // bind_addr + port of srv_sock
034: int srv_port = 7800;
035: int recv_buf_size = 120000;
036: int send_buf_size = 60000;
037: final Vector conn_listeners = new Vector(); // listeners to be notified when a conn is established/torn down
038: final Object recv_mutex = new Object(); // to serialize simultaneous access to receive() from multiple Connections
039: Reaper reaper = null; // closes conns that have been idle for more than n secs
040: long reaper_interval = 60000; // reap unused conns once a minute
041: long conn_expire_time = 300000; // connections can be idle for 5 minutes before they are reaped
042: int sock_conn_timeout = 1000; // max time in millis to wait for Socket.connect() to return
043: ThreadGroup thread_group = null;
044: protected final Log log = LogFactory.getLog(getClass());
045: final byte[] cookie = { 'b', 'e', 'l', 'a' };
046: boolean use_reaper = false; // by default we don't reap idle conns
047: static final int backlog = 20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
048: ServerSocket srv_sock = null;
049: boolean reuse_addr = false;
050: boolean tcp_nodelay = false;
051: int linger = -1;
053: /**
054: * The address which will be broadcast to the group (the externally visible address which this host should
055: * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
056: */
057: InetAddress external_addr = null;
058: int max_port = 0; // maximum port to bind to (if < srv_port, no limit)
059: Thread acceptor = null; // continuously calls srv_sock.accept()
060: boolean running = false;
062: final static long MAX_JOIN_TIMEOUT = 10000;
064: public final void setReceiver(Receiver r) {
065: receiver = r;
066: }
068: public void addConnectionListener(ConnectionListener l) {
069: if (l != null && !conn_listeners.contains(l))
070: conn_listeners.addElement(l);
071: }
073: public void removeConnectionListener(ConnectionListener l) {
074: if (l != null)
075: conn_listeners.removeElement(l);
076: }
078: public Address getLocalAddress() {
079: if (local_addr == null)
080: local_addr = bind_addr != null ? new IpAddress(bind_addr,
081: srv_port) : null;
082: return local_addr;
083: }
085: public int getSendBufferSize() {
086: return send_buf_size;
087: }
089: public void setSendBufferSize(int send_buf_size) {
090: this .send_buf_size = send_buf_size;
091: }
093: public int getReceiveBufferSize() {
094: return recv_buf_size;
095: }
097: public void setReceiveBufferSize(int recv_buf_size) {
098: this .recv_buf_size = recv_buf_size;
099: }
101: public int getSocketConnectionTimeout() {
102: return sock_conn_timeout;
103: }
105: public void setSocketConnectionTimeout(int sock_conn_timeout) {
106: this .sock_conn_timeout = sock_conn_timeout;
107: }
109: public int getNumConnections() {
110: return conns.size();
111: }
113: public boolean getTcpNodelay() {
114: return tcp_nodelay;
115: }
117: public void setTcpNodelay(boolean tcp_nodelay) {
118: this .tcp_nodelay = tcp_nodelay;
119: }
121: public int getLinger() {
122: return linger;
123: }
125: public void setLinger(int linger) {
126: this .linger = linger;
127: }
129: public boolean getUseSendQueues() {
130: return use_send_queues;
131: }
133: public void setUseSendQueues(boolean flag) {
134: this .use_send_queues = flag;
135: }
137: public void start() throws Exception {
138: running = true;
139: }
141: public void stop() {
142: running = false;
143: }
145: /**
146: Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected.
147: */
148: public void remove(Address addr) {
149: Connection conn;
151: synchronized (conns) {
152: conn = (Connection) conns.remove(addr);
153: }
155: if (conn != null) {
156: try {
157: conn.destroy(); // won't do anything if already destroyed
158: } catch (Exception e) {
159: }
160: }
161: if (log.isTraceEnabled())
162: log.trace("removed " + addr + ", connections are "
163: + toString());
164: }
166: /**
167: * Calls the receiver callback. We serialize access to this method because it may be called concurrently
168: * by several Connection handler threads. Therefore the receiver doesn't need to synchronize.
169: */
170: public void receive(Address sender, byte[] data, int offset,
171: int length) {
172: if (receiver != null) {
173: synchronized (recv_mutex) {
174: receiver.receive(sender, data, offset, length);
175: }
176: } else if (log.isErrorEnabled())
177: log.error("receiver is null (not set) !");
178: }
180: public String toString() {
181: StringBuffer ret = new StringBuffer();
182: Address key;
183: Connection val;
184: Map.Entry entry;
185: HashMap copy;
187: synchronized (conns) {
188: copy = new HashMap(conns);
189: }
190: ret.append("connections (" + copy.size() + "):\n");
191: for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
192: entry = (Map.Entry) it.next();
193: key = (Address) entry.getKey();
194: val = (Connection) entry.getValue();
195: ret.append("key: " + key + ": " + val + '\n');
196: }
197: ret.append('\n');
198: return ret.toString();
199: }
201: void notifyConnectionOpened(Address peer) {
202: if (peer == null)
203: return;
204: for (int i = 0; i < conn_listeners.size(); i++)
205: ((ConnectionListener) conn_listeners.elementAt(i))
206: .connectionOpened(peer);
207: }
209: void notifyConnectionClosed(Address peer) {
210: if (peer == null)
211: return;
212: for (int i = 0; i < conn_listeners.size(); i++)
213: ((ConnectionListener) conn_listeners.elementAt(i))
214: .connectionClosed(peer);
215: }
217: void addConnection(Address peer, Connection c) {
218: conns.put(peer, c);
219: if (reaper != null && !reaper.isRunning())
220: reaper.start();
221: }
223: public void send(Address dest, byte[] data, int offset, int length)
224: throws Exception {
225: Connection conn;
226: if (dest == null) {
227: if (log.isErrorEnabled())
228: log.error("destination is null");
229: return;
230: }
232: if (data == null) {
233: log.warn("data is null; discarding packet");
234: return;
235: }
237: if (!running) {
238: if (log.isWarnEnabled())
239: log
240: .warn("connection table is not running, discarding message to "
241: + dest);
242: return;
243: }
245: // 1. Try to obtain correct Connection (or create one if not yet existent)
246: try {
247: conn = getConnection(dest);
248: if (conn == null)
249: return;
250: } catch (Throwable ex) {
251: throw new Exception("connection to " + dest
252: + " could not be established", ex);
253: }
255: // 2. Send the message using that connection
256: try {
257: conn.send(data, offset, length);
258: } catch (Throwable ex) {
259: if (log.isTraceEnabled())
260: log.trace("sending msg to " + dest + " failed ("
261: + ex.getClass().getName()
262: + "); removing from connection table", ex);
263: remove(dest);
264: }
265: }
267: abstract Connection getConnection(Address dest) throws Exception;
269: /**
270: * Removes all connections from ConnectionTable which are not in c
271: * @param c
272: */
273: //public void retainAll(Collection c) {
274: // conns.keySet().retainAll(c);
275: //}
277: /**
278: * Removes all connections from ConnectionTable which are not in current_mbrs
279: * @param current_mbrs
280: */
281: public void retainAll(Collection current_mbrs) {
282: if (current_mbrs == null)
283: return;
284: HashMap copy;
285: synchronized (conns) {
286: copy = new HashMap(conns);
287: conns.keySet().retainAll(current_mbrs);
288: }
290: // All of the connections that were not retained must be destroyed
291: // so that their resources are cleaned up.
292: Map.Entry entry;
293: for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
294: entry = (Map.Entry) it.next();
295: Object oKey = entry.getKey();
296: if (!current_mbrs.contains(oKey)) { // This connection NOT in the resultant connection set
297: Connection conn = (Connection) entry.getValue();
298: if (null != conn) { // Destroy this connection
299: if (log.isTraceEnabled())
300: log.trace("Destroy this orphaned connection: "
301: + conn);
302: conn.destroy();
303: }
304: }
305: }
306: copy.clear();
307: }
309: /** Used for message reception. */
310: public interface Receiver {
311: void receive(Address sender, byte[] data, int offset, int length);
312: }
314: /** Used to be notified about connection establishment and teardown. */
315: public interface ConnectionListener {
316: void connectionOpened(Address peer_addr);
318: void connectionClosed(Address peer_addr);
319: }
321: class Connection implements Runnable {
322: Socket sock = null; // socket to/from peer (result of srv_sock.accept() or new Socket())
323: String sock_addr = null; // used for Thread.getName()
324: DataOutputStream out = null; // for sending messages
325: DataInputStream in = null; // for receiving messages
326: Thread receiverThread = null; // thread for receiving messages
327: Address peer_addr = null; // address of the 'other end' of the connection
328: final Object send_mutex = new Object(); // serialize sends
329: long last_access = System.currentTimeMillis(); // last time a message was sent or received
331: /** Queue<byte[]> of data to be sent to the peer of this connection */
332: Queue send_queue = new Queue();
333: Sender sender = new ConnectionTable.Connection.Sender();
334: boolean is_running = false;
336: private String getSockAddress() {
337: if (sock_addr != null)
338: return sock_addr;
339: if (sock != null) {
340: StringBuffer sb = new StringBuffer();
341: sb.append(sock.getLocalAddress().getHostAddress())
342: .append(':').append(sock.getLocalPort());
343: sb.append(" - ").append(
344: sock.getInetAddress().getHostAddress()).append(
345: ':').append(sock.getPort());
346: sock_addr = sb.toString();
347: }
348: return sock_addr;
349: }
351: Connection(Socket s, Address peer_addr) {
352: sock = s;
353: this .peer_addr = peer_addr;
354: try {
355: // out=new DataOutputStream(sock.getOutputStream());
356: // in=new DataInputStream(sock.getInputStream());
358: // The change to buffered input and output stream yielded a 400% performance gain !
359: // bela Sept 7 2006
360: out = new DataOutputStream(new BufferedOutputStream(
361: sock.getOutputStream()));
362: in = new DataInputStream(new BufferedInputStream(sock
363: .getInputStream()));
364: } catch (Exception ex) {
365: if (log.isErrorEnabled())
366: log.error("exception is " + ex);
367: }
368: }
370: boolean established() {
371: return receiverThread != null;
372: }
374: void setPeerAddress(Address peer_addr) {
375: this .peer_addr = peer_addr;
376: }
378: Address getPeerAddress() {
379: return peer_addr;
380: }
382: void updateLastAccessed() {
383: last_access = System.currentTimeMillis();
384: }
386: void init() {
387: is_running = true;
388: if (receiverThread == null || !receiverThread.isAlive()) {
389: // Roland Kurmann 4/7/2003, put in thread_group
390: receiverThread = new Thread(thread_group, this ,
391: "ConnectionTable.Connection.Receiver ["
392: + getSockAddress() + "]");
393: receiverThread.setDaemon(true);
394: receiverThread.start();
395: if (log.isTraceEnabled())
396: log
397: .trace("ConnectionTable.Connection.Receiver started");
398: }
400: }
402: void destroy() {
403: is_running = false;
404: closeSocket(); // should terminate handler as well
405: sender.stop();
406: Thread tmp = receiverThread;
407: receiverThread = null;
408: if (tmp != null) {
409: try {
410: tmp.interrupt();
411: tmp.join(MAX_JOIN_TIMEOUT);
412: } catch (InterruptedException e) {
413: }
414: if (tmp.isAlive()) {
415: if (log.isWarnEnabled())
416: log
417: .warn("stopped receiver thread, but thread ("
418: + tmp + ") is still alive !");
419: }
420: }
421: }
423: /**
424: *
425: * @param data Guaranteed to be non null
426: * @param offset
427: * @param length
428: */
429: void send(byte[] data, int offset, int length) {
430: if (!is_running) {
431: if (log.isWarnEnabled())
432: log
433: .warn("Connection is not running, discarding message");
434: return;
435: }
436: if (use_send_queues) {
437: try {
438: // we need to copy the byte[] buffer here because the original buffer might get changed meanwhile
439: byte[] tmp = new byte[length];
440: System.arraycopy(data, offset, tmp, 0, length);
441: send_queue.add(tmp);
442: if (!sender.isRunning())
443: sender.start();
444: } catch (QueueClosedException e) {
445: log.error("failed adding message to send_queue", e);
446: }
447: } else
448: _send(data, offset, length);
449: }
451: private void _send(byte[] data, int offset, int length) {
452: synchronized (send_mutex) {
453: try {
454: doSend(data, offset, length);
455: updateLastAccessed();
456: } catch (IOException io_ex) {
457: if (log.isWarnEnabled())
458: log
459: .warn("peer closed connection, trying to re-send msg");
460: try {
461: doSend(data, offset, length);
462: updateLastAccessed();
463: } catch (IOException io_ex2) {
464: if (log.isErrorEnabled())
465: log
466: .error("2nd attempt to send data failed too");
467: } catch (Exception ex2) {
468: if (log.isErrorEnabled())
469: log.error("exception is " + ex2);
470: }
471: } catch (InterruptedException iex) {
472: } catch (Throwable ex) {
473: if (log.isErrorEnabled())
474: log.error("exception is " + ex);
475: }
476: }
477: }
479: void doSend(byte[] data, int offset, int length)
480: throws Exception {
481: try {
482: // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would
483: // ensure that, if the peer closed the connection while we were idle, we would get an exception.
484: // this won't happen if we use a single write (see Stevens, ch. 5.13).
485: if (out != null) {
486: out.writeInt(length); // write the length of the data buffer first
487: Util.doubleWrite(data, offset, length, out);
488: out.flush(); // may not be very efficient (but safe)
489: }
490: } catch (Exception ex) {
491: remove(peer_addr);
492: throw ex;
493: }
494: }
496: /**
497: * Reads the peer's address. First a cookie has to be sent which has to match my own cookie, otherwise
498: * the connection will be refused
499: */
500: Address readPeerAddress(Socket client_sock) throws Exception {
501: Address client_peer_addr = null;
502: byte[] input_cookie = new byte[cookie.length];
503: int client_port = client_sock != null ? client_sock
504: .getPort() : 0;
505: short version;
506: InetAddress client_addr = client_sock != null ? client_sock
507: .getInetAddress() : null;
509: if (in != null) {
510: initCookie(input_cookie);
512: // read the cookie first
513: in.read(input_cookie, 0, input_cookie.length);
514: if (!matchCookie(input_cookie))
515: throw new SocketException(
516: "ConnectionTable.Connection.readPeerAddress(): cookie sent by "
517: + client_peer_addr
518: + " does not match own cookie; terminating connection");
519: // then read the version
520: version = in.readShort();
522: if (Version.isBinaryCompatible(version) == false) {
523: if (log.isWarnEnabled())
524: log.warn(new StringBuffer("packet from ")
525: .append(client_addr).append(':')
526: .append(client_port).append(
527: " has different version (")
528: .append(version)
529: .append(") from ours (").append(
530: Version.version).append(
531: "). This may cause problems"));
532: }
533: client_peer_addr = new IpAddress();
534: client_peer_addr.readFrom(in);
536: updateLastAccessed();
537: }
538: return client_peer_addr;
539: }
541: /**
542: * Send the cookie first, then the our port number. If the cookie doesn't match the receiver's cookie,
543: * the receiver will reject the connection and close it.
544: */
545: void sendLocalAddress(Address local_addr) {
546: if (local_addr == null) {
547: if (log.isWarnEnabled())
548: log.warn("local_addr is null");
549: return;
550: }
551: if (out != null) {
552: try {
553: // write the cookie
554: out.write(cookie, 0, cookie.length);
556: // write the version
557: out.writeShort(Version.version);
558: local_addr.writeTo(out);
559: out.flush(); // needed ?
560: updateLastAccessed();
561: } catch (Throwable t) {
562: if (log.isErrorEnabled())
563: log.error("exception is " + t);
564: }
565: }
566: }
568: void initCookie(byte[] c) {
569: if (c != null)
570: for (int i = 0; i < c.length; i++)
571: c[i] = 0;
572: }
574: boolean matchCookie(byte[] input) {
575: if (input == null || input.length < cookie.length)
576: return false;
577: for (int i = 0; i < cookie.length; i++)
578: if (cookie[i] != input[i])
579: return false;
580: return true;
581: }
583: String printCookie(byte[] c) {
584: if (c == null)
585: return "";
586: return new String(c);
587: }
589: public void run() {
590: byte[] buf = new byte[256]; // start with 256, increase as we go
591: int len = 0;
593: while (receiverThread != null
594: && receiverThread.equals(Thread.currentThread())
595: && is_running) {
596: try {
597: if (in == null) {
598: if (log.isErrorEnabled())
599: log.error("input stream is null !");
600: break;
601: }
602: len = in.readInt();
603: if (len > buf.length)
604: buf = new byte[len];
605: in.readFully(buf, 0, len);
606: updateLastAccessed();
607: receive(peer_addr, buf, 0, len); // calls receiver.receive(msg)
608: } catch (OutOfMemoryError mem_ex) {
609: if (log.isWarnEnabled())
610: log
611: .warn("dropped invalid message, closing connection");
612: break; // continue;
613: } catch (EOFException eof_ex) { // peer closed connection
614: if (log.isTraceEnabled())
615: log.trace("exception is " + eof_ex);
616: notifyConnectionClosed(peer_addr);
617: break;
618: } catch (IOException io_ex) {
619: if (log.isTraceEnabled())
620: log.trace("exception is " + io_ex);
621: notifyConnectionClosed(peer_addr);
622: break;
623: } catch (Throwable e) {
624: if (log.isWarnEnabled())
625: log.warn("exception is " + e);
626: }
627: }
628: if (log.isTraceEnabled())
629: log
630: .trace("ConnectionTable.Connection.Receiver terminated");
631: receiverThread = null;
632: closeSocket();
633: // remove(peer_addr);
634: }
636: public String toString() {
637: StringBuffer ret = new StringBuffer();
638: InetAddress local = null, remote = null;
639: String local_str, remote_str;
641: if (sock == null)
642: ret.append("<null socket>");
643: else {
644: //since the sock variable gets set to null we want to make
645: //make sure we make it through here without a nullpointer exception
646: Socket tmp_sock = sock;
647: local = tmp_sock.getLocalAddress();
648: remote = tmp_sock.getInetAddress();
649: local_str = local != null ? Util.shortName(local)
650: : "<null>";
651: remote_str = remote != null ? Util.shortName(remote)
652: : "<null>";
653: ret
654: .append('<'
655: + local_str
656: + ':'
657: + tmp_sock.getLocalPort()
658: + " --> "
659: + remote_str
660: + ':'
661: + tmp_sock.getPort()
662: + "> ("
663: + ((System.currentTimeMillis() - last_access) / 1000)
664: + " secs old)");
665: tmp_sock = null;
666: }
668: return ret.toString();
669: }
671: void closeSocket() {
672: Util.close(sock); // should actually close in/out (so we don't need to close them explicitly)
673: sock = null;
674: Util.close(out); // flushes data
675: // removed 4/22/2003 (request by Roland Kurmann)
676: // out=null;
677: Util.close(in);
678: }
680: class Sender implements Runnable {
681: Thread senderThread;
682: private boolean is_it_running = false;
684: void start() {
685: if (senderThread == null || !senderThread.isAlive()) {
686: senderThread = new Thread(thread_group, this ,
687: "ConnectionTable.Connection.Sender ["
688: + getSockAddress() + "]");
689: senderThread.setDaemon(true);
690: senderThread.start();
691: is_it_running = true;
692: if (log.isTraceEnabled())
693: log
694: .trace("ConnectionTable.Connection.Sender thread started");
695: }
696: }
698: void stop() {
699: is_it_running = false;
700: if (send_queue != null)
701: send_queue.close(false);
702: if (senderThread != null) {
703: Thread tmp = senderThread;
704: senderThread = null;
705: tmp.interrupt();
706: try {
707: tmp.join(MAX_JOIN_TIMEOUT);
708: } catch (InterruptedException e) {
709: }
710: if (tmp.isAlive()) {
711: if (log.isWarnEnabled())
712: log
713: .warn("sender thread was interrupted, but is still alive: "
714: + tmp);
715: }
716: }
717: }
719: boolean isRunning() {
720: return is_it_running && senderThread != null;
721: }
723: public void run() {
724: byte[] data;
725: while (senderThread != null
726: && senderThread.equals(Thread.currentThread())
727: && is_it_running) {
728: try {
729: data = (byte[]) send_queue.remove();
730: if (data == null)
731: continue;
732: _send(data, 0, data.length);
733: } catch (QueueClosedException e) {
734: break;
735: }
736: }
737: is_it_running = false;
738: if (log.isTraceEnabled())
739: log
740: .trace("ConnectionTable.Connection.Sender thread terminated");
741: }
742: }
744: }
746: class Reaper implements Runnable {
747: Thread t = null;
749: Reaper() {
750: ;
751: }
753: public void start() {
754: if (conns.size() == 0)
755: return;
756: if (t != null && !t.isAlive())
757: t = null;
758: if (t == null) {
759: //RKU 7.4.2003, put in threadgroup
760: t = new Thread(thread_group, this ,
761: "ConnectionTable.ReaperThread");
762: t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
763: t.start();
764: }
765: }
767: public void stop() {
768: Thread tmp = t;
769: if (t != null)
770: t = null;
771: if (tmp != null) {
772: tmp.interrupt(); // interrupts the sleep()
773: try {
774: tmp.join(MAX_JOIN_TIMEOUT);
775: } catch (InterruptedException e) {
776: }
777: if (tmp.isAlive()) {
778: if (log.isWarnEnabled())
779: log
780: .warn("reaper thread was interrupted, but is still alive: "
781: + tmp);
782: }
783: }
784: }
786: public boolean isRunning() {
787: return t != null;
788: }
790: public void run() {
791: Connection value;
792: Map.Entry entry;
793: long curr_time;
795: if (log.isInfoEnabled())
796: log
797: .info("connection reaper thread was started. Number of connections="
798: + conns.size()
799: + ", reaper_interval="
800: + reaper_interval
801: + ", conn_expire_time="
802: + conn_expire_time);
804: while (conns.size() > 0 && t != null
805: && t.equals(Thread.currentThread())) {
806: Util.sleep(reaper_interval);
807: if (t == null || !Thread.currentThread().equals(t))
808: break;
809: synchronized (conns) {
810: curr_time = System.currentTimeMillis();
811: for (Iterator it = conns.entrySet().iterator(); it
812: .hasNext();) {
813: entry = (Map.Entry) it.next();
814: value = (Connection) entry.getValue();
815: if (log.isInfoEnabled())
816: log
817: .info("connection is "
818: + ((curr_time - value.last_access) / 1000)
819: + " seconds old (curr-time="
820: + curr_time
821: + ", last_access="
822: + value.last_access + ')');
823: if (value.last_access + conn_expire_time < curr_time) {
824: if (log.isInfoEnabled())
825: log
826: .info("connection "
827: + value
828: + " has been idle for too long (conn_expire_time="
829: + conn_expire_time
830: + "), will be removed");
831: value.destroy();
832: it.remove();
833: }
834: }
835: }
836: }
837: if (log.isInfoEnabled())
838: log.info("reaper terminated");
839: t = null;
840: }
841: }
842: }