001: // $Id: Link.java,v 1.7.10.1 2007/04/27 08:03:56 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.jgroups.util.TimedWriter;
006: import org.jgroups.util.Util;
007: import org.apache.commons.logging.Log;
008: import org.apache.commons.logging.LogFactory;
009:
010: import java.io.*;
011: import java.net.InetAddress;
012: import java.net.ServerSocket;
013: import java.net.Socket;
014:
015: /**
016: * Implements a physical link between 2 parties (point-to-point connection). For incoming traffic,
017: * a server socket is created (bound to a given local address and port). The receiver thread does the
018: * following: it accepts a new connection from the server socket and (on the same thread) reads messages
019: * until the connection breaks. Then it goes back to accept(). This is done in 2 nested while-loops.
020: * The outgoing connection is established when started. If this fails, the link is marked as not established.
* This means that there is no outgoing socket.<br>
022: * A heartbeat will be exchanged between the 2 peers periodically as long as the connection is established
023: * (outgoing socket is okay). When the connection breaks, the heartbeat will stop and a connection establisher
024: * thread will be started. It periodically tries to re-establish connection to the peer. When this happens
025: * it will stop and the heartbeat thread will resume.<br>
026: * For details see Link.txt
027: * @author Bela Ban, June 2000
028: */
029: public class Link implements Runnable {
030: String local_addr = null, remote_addr = null;
031: InetAddress local = null, remote = null;
032: int local_port = 0, remote_port = 0;
033: ServerSocket srv_sock = null;
034: Socket outgoing = null; // traffic to peer
035: Socket incoming = null; // traffic from peer
036: DataOutputStream outstream = null;
037: DataInputStream instream = null;
038: boolean established = false; // (incoming and outgoing) connections to peer are up and running
039: boolean stop = false;
040: boolean trace = false;
041: Thread receiver_thread = null;
042: final long receiver_thread_join_timeout = 2000;
043: Receiver receiver = null;
044: static final int HB_PACKET = -99;
045: Heartbeat hb = null;
046: long timeout = 10000; // if no heartbeat was received for timeout msecs, assume peer is dead
047: long hb_interval = 3000; // send a heartbeat every n msecs
048: final Object outgoing_mutex = new Object(); // sync on creation and closing of outgoing socket
049: TimedWriter writer = null;
050: Log log = LogFactory.getLog(getClass());
051:
/**
 * Callbacks through which a link reports received data and changes of its connection state.
 * All address/port arguments describe the link that raised the event.
 */
public interface Receiver {

    /** Delivers the payload of one message read from the peer. */
    void receive(byte[] msg);

    /** The outgoing connection to the peer has been closed. */
    void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port);

    /** An outgoing connection to the peer has been created. */
    void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port);

    /**
     * Nothing has been received from the peer for longer than one heartbeat interval.
     * @param num_hbs number of whole heartbeat intervals elapsed since the last sign of life
     */
    void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port,
            int num_hbs);

    /** Traffic from the peer was seen again after a missed heartbeat had been reported. */
    void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote,
            int remote_port);
}
067:
/**
 * Creates a link without a receiver, using the default heartbeat timeout and interval held in the
 * {@code timeout} and {@code hb_interval} fields. No sockets are opened until {@link #start()}.
 */
public Link(String local_addr, int local_port, String remote_addr,
        int remote_port) {
    // local endpoint
    this.local_addr = local_addr;
    this.local_port = local_port;
    // remote endpoint
    this.remote_addr = remote_addr;
    this.remote_port = remote_port;
    hb = new Heartbeat(timeout, hb_interval);
}
076:
/**
 * Creates a link with default heartbeat settings and registers {@code r} as its receiver.
 */
public Link(String local_addr, int local_port, String remote_addr,
        int remote_port, Receiver r) {
    this(local_addr, local_port, remote_addr, remote_port);
    setReceiver(r);
}
082:
/**
 * Creates a link with explicit heartbeat settings.
 *
 * @param timeout     msecs without any traffic from the peer after which the connections are closed
 * @param hb_interval msecs between two heartbeats (also used as connect timeout by the heartbeat)
 * @param r           receiver for callbacks, may be null
 */
public Link(String local_addr, int local_port, String remote_addr,
        int remote_port, long timeout, long hb_interval, Receiver r) {
    // endpoints
    this.local_addr = local_addr;
    this.local_port = local_port;
    this.remote_addr = remote_addr;
    this.remote_port = remote_port;

    // heartbeat configuration, shared with the Heartbeat instance created below
    this.timeout = timeout;
    this.hb_interval = hb_interval;
    hb = new Heartbeat(timeout, hb_interval);

    setReceiver(r);
}
094:
095: public void setTrace(boolean t) {
096: trace = t;
097: }
098:
099: public void setReceiver(Receiver r) {
100: receiver = r;
101: }
102:
103: public boolean established() {
104: return established;
105: }
106:
107: public InetAddress getLocalAddress() {
108: return local;
109: }
110:
111: public InetAddress getRemoteAddress() {
112: return remote;
113: }
114:
115: public int getLocalPort() {
116: return local_port;
117: }
118:
119: public int getRemotePort() {
120: return remote_port;
121: }
122:
123: public void start() throws Exception {
124: local = InetAddress.getByName(local_addr);
125: remote = InetAddress.getByName(remote_addr);
126: srv_sock = new ServerSocket(local_port, 1, local);
127: createOutgoingConnection(hb_interval); // connection to peer established, sets established=true
128: startReceiverThread(); // start reading from incoming socket
129: hb.start(); // starts heartbeat (conn establisher is not yet started)
130: }
131:
132: public void stop() {
133: stopReceiverThread();
134: hb.stop();
135: try {
136: srv_sock.close();
137: } catch (Exception e) {
138: }
139: established = false;
140: }
141:
142: /** Tries to send buffer across out socket. Tries to establish connection if not yet connected. */
143: public boolean send(byte[] buf) {
144: if (buf == null || buf.length == 0) {
145: if (log.isTraceEnabled())
146: System.err
147: .println("Link.send(): buffer is null or does not contain any data !");
148: return false;
149: }
150: if (!established) { // will be set by ConnectionEstablisher when connection has been set up
151: if (log.isTraceEnabled())
152: log
153: .error("Link.send(): connection not established, discarding message");
154: return false;
155: }
156:
157: try {
158: outstream.writeInt(buf.length); // synchronized anyway
159: outstream.write(buf); // synchronized anyway, we don't need to sync on outstream
160: return true;
161: } catch (Exception ex) { // either IOException or EOFException (subclass if IOException)
162: if (log.isTraceEnabled())
163: log.error("Link.send1(): sending failed; retrying");
164: return retry(buf);
165: }
166: }
167:
168: boolean retry(byte[] buf) {
169: closeOutgoingConnection(); // there something wrong, close connection
170: if (!createOutgoingConnection()) { // ... and re-open. if this fails,
171: closeOutgoingConnection(); // just abort and return failure to caller
172: return false;
173: } else {
174: try {
175: outstream.writeInt(buf.length);
176: outstream.write(buf);
177: return true;
178: } catch (Exception e) {
179: if (log.isTraceEnabled())
180: System.out
181: .println("Link.send2(): failed, closing connection");
182: closeOutgoingConnection();
183: return false;
184: }
185: }
186: }
187:
188: /** Receiver thread main loop. Accept a connection and then read on it until the connection
189: breaks. Only then is the next connection handled. The reason is that there is only supposed
190: to be 1 connection to this server socket at the same time.
191: */
192: public void run() {
193: int num_bytes;
194: byte[] buf;
195: InetAddress peer = null;
196: int peer_port = 0;
197:
198: while (!stop) {
199: try {
200: if (log.isTraceEnabled())
201: System.out.println("-- WAITING for ACCEPT");
202: incoming = srv_sock.accept();
203: instream = new DataInputStream(incoming
204: .getInputStream());
205: peer = incoming.getInetAddress();
206: peer_port = incoming.getPort();
207:
208: if (log.isTraceEnabled())
209: System.out.println("-- ACCEPT: incoming is "
210: + printSocket(incoming));
211:
212: /** This piece of code would only accept connections from the peer address defined above. */
213: if (remote.equals(incoming.getInetAddress())) {
214: if (log.isTraceEnabled())
215: System.out
216: .println("Link.run(): accepted connection from "
217: + peer + ':' + peer_port);
218: } else {
219: if (log.isTraceEnabled())
220: log
221: .error("Link.run(): rejected connection request from "
222: + peer
223: + ':'
224: + peer_port
225: + ". Address not specified as peer in link !");
226: closeIncomingConnection(); // only close incoming connection
227: continue;
228: }
229:
230: // now try to create outgoing connection
231: if (!established) {
232: createOutgoingConnection();
233: }
234:
235: while (!stop) {
236: try {
237: num_bytes = instream.readInt();
238: if (num_bytes == HB_PACKET) {
239: hb.receivedHeartbeat();
240: continue;
241: }
242:
243: buf = new byte[num_bytes];
244: instream.readFully(buf, 0, buf.length);
245: hb.receivedMessage(); // equivalent to heartbeat response (HB_PACKET)
246: if (receiver != null)
247: receiver.receive(buf);
248: } catch (Exception ex) { // IOException, EOFException, SocketException
249: closeIncomingConnection(); // close incoming when read() fails
250: break;
251: }
252: }
253: } catch (IOException io_ex) {
254: receiver_thread = null;
255: break;
256: } catch (Exception e) {
257: }
258: }
259: }
260:
261: public String toString() {
262: StringBuffer ret = new StringBuffer();
263: ret.append("Link <" + local_addr + ':' + local_port + " --> "
264: + remote_addr + ':' + remote_port + '>');
265: ret.append(established ? " (established)"
266: : " (not established)");
267: return ret.toString();
268: }
269:
270: public boolean equals(Object other) {
271: Link o;
272:
273: if (other == null)
274: return false;
275: if (!(other instanceof Link))
276: return false;
277: o = (Link) other;
278: if (local_addr.equals(o.local_addr)
279: && remote_addr.equals(o.remote_addr)
280: && local_port == o.local_port
281: && remote_port == o.remote_port)
282: return true;
283: else
284: return false;
285: }
286:
287: public int hashCode() {
288: return local_addr.hashCode() + remote_addr.hashCode()
289: + local_port + remote_port;
290: }
291:
292: void startReceiverThread() {
293: stopReceiverThread();
294: receiver_thread = new Thread(this , "Link.ReceiverThreadThread");
295: receiver_thread.setDaemon(true);
296: receiver_thread.start();
297: }
298:
299: void stopReceiverThread() {
300: if (receiver_thread != null && receiver_thread.isAlive()) {
301: stop = true;
302: closeIncomingConnection();
303: try {
304: receiver_thread.join(receiver_thread_join_timeout);
305: } catch (Exception e) {
306: }
307: stop = false;
308: }
309: receiver_thread = null;
310: }
311:
312: /** Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
313: stop the connection establisher ! The reason is that this method is going to be called by the
314: connection establisher as well, therefore it would kill itself ! */
315: boolean createOutgoingConnection() {
316: synchronized (outgoing_mutex) { // serialize access with ConnectionEstablisher
317: if (established) {
318: return true;
319: }
320: try {
321: // create a socket to remote:remote_port, bind to local address (choose any local port);
322: outgoing = new Socket(remote, remote_port, local, 0); // 0 means choose any local port
323: outgoing.setSoLinger(true, 1); // 1 second // +++ ? needed ? it is off by default !
324: outstream = new DataOutputStream(outgoing
325: .getOutputStream());
326: if (receiver != null)
327: receiver.linkUp(local, local_port, remote,
328: remote_port);
329: established = true;
330:
331: if (log.isTraceEnabled())
332: System.out.println("-- CREATE: outgoing is "
333: + printSocket(outgoing));
334:
335: return true;
336: } catch (Exception e) {
337: established = false;
338: return false;
339: }
340: }
341: }
342:
343: /**
344: Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
345: stop the connection establisher ! The reason is that this method is going to be called by the
346: connection establisher as well, therefore it would kill itself !
347: */
348: boolean createOutgoingConnection(long timeout) {
349: synchronized (outgoing_mutex) { // serialize access with ConnectionEstablisher
350: if (established) {
351: return true;
352: }
353: try {
354: if (writer == null)
355: writer = new TimedWriter();
356:
357: // create a socket to remote:remote_port, bind to local address (choose any local port);
358: // outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port
359: outgoing = writer.createSocket(local, remote,
360: remote_port, timeout);
361: outgoing.setSoLinger(true, 1); // 1 second // +++ ? needed ? it is off by default !
362: outstream = new DataOutputStream(outgoing
363: .getOutputStream());
364: if (receiver != null)
365: receiver.linkUp(local, local_port, remote,
366: remote_port);
367: established = true;
368: if (log.isTraceEnabled())
369: System.out.println("-- CREATE: outgoing is "
370: + printSocket(outgoing));
371: return true;
372: } catch (Exception e) {
373: established = false;
374: return false;
375: }
376: }
377: }
378:
379: /** Closes the outgoing connection */
380: void closeOutgoingConnection() {
381: synchronized (outgoing_mutex) {
382: if (!established) {
383: return;
384: }
385: if (outstream != null) {
386:
387: if (log.isTraceEnabled())
388: System.out.println("-- CLOSE: outgoing is "
389: + printSocket(outgoing));
390:
391: try {
392: outstream.close(); // flush data before socket is closed
393: } catch (Exception e) {
394: }
395: outstream = null;
396: }
397: if (outgoing != null) {
398: try {
399: outgoing.close();
400: } catch (Exception e) {
401: }
402: outgoing = null;
403: }
404: established = false;
405: if (receiver != null)
406: receiver.linkDown(local, local_port, remote,
407: remote_port);
408: }
409: }
410:
411: /** When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
412: then it closes the outgoing *and* incoming socket. The latter needs to be done,
413: so that we can return to accept() and await a new client connection request. */
414: synchronized void closeIncomingConnection() {
415: if (instream != null) {
416:
417: if (log.isTraceEnabled())
418: System.out.println("-- CLOSE: incoming is "
419: + printSocket(incoming));
420:
421: try {
422: instream.close();
423: } catch (Exception e) {
424: }
425: instream = null;
426: }
427: if (incoming != null) {
428: try {
429: incoming.close();
430: } catch (Exception e) {
431: }
432: incoming = null;
433: }
434: }
435:
436: /** Close outgoing and incoming sockets. */
437: synchronized void closeConnections() {
438:
439: // 1. Closes the outgoing connection. Then the connection establisher is started. The heartbeat
440: // thread cannot be stopped in here, because this method is called by it !
441: closeOutgoingConnection();
442:
443: // 2. When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
444: // then it closes the outgoing *and* incoming socket. The latter needs to be done,
445: // so that we can return to accept() and await a new client connection request.
446: closeIncomingConnection();
447: }
448:
449: String printSocket(Socket s) {
450: if (s == null)
451: return "<null>";
452: StringBuffer ret = new StringBuffer();
453: ret.append(s.getLocalAddress().getHostName());
454: ret.append(':');
455: ret.append(s.getLocalPort());
456: ret.append(" --> ");
457: ret.append(s.getInetAddress().getHostName());
458: ret.append(':');
459: ret.append(s.getPort());
460: return ret.toString();
461: }
462:
463: /**
464: Sends heartbeats across the link as long as we are connected (established=true). Uses a TimedWriter
465: for both sending and responding to heartbeats. The reason is that a write() might hang if the
466: peer has not closed its end, but the connection hangs (e.g. network partition, peer was stop-a'ed,
467: ctrl-z of peer or peer's NIC was unplumbed) and the writer buffer is filled to capacity. This way,
468: we don't hang sending timeouts.
469: */
470: class Heartbeat implements Runnable {
471: Thread thread = null;
472: long hb_timeout = 10000; // time to wait for heartbeats from peer, if not received -> boom !
473: long interval = 3000; // {send a heartbeat | try to create connection} every 3 secs
474: boolean stop_hb = false;
475: long last_hb = System.currentTimeMillis();
476: boolean missed_hb = false;
477: final TimedWriter timed_writer = new TimedWriter();
478:
479: public Heartbeat(long timeout, long hb_interval) {
480: this .hb_timeout = timeout;
481: this .interval = hb_interval;
482: }
483:
484: public synchronized void start() {
485: stop();
486: stop_hb = false;
487: missed_hb = false;
488: last_hb = System.currentTimeMillis();
489: thread = new Thread(this , "HeartbeatThread");
490: thread.setDaemon(true);
491: thread.start();
492: }
493:
494: public synchronized void interrupt() {
495: thread.interrupt();
496: }
497:
498: public synchronized void stop() {
499: if (thread != null && thread.isAlive()) {
500: stop_hb = true;
501: missed_hb = false;
502: thread.interrupt();
503: try {
504: thread.join(hb_timeout + 1000);
505: } catch (Exception e) {
506: }
507: thread = null;
508: }
509: }
510:
511: /**
512: When we receive a message from the peer, this means the peer is alive. Therefore we
513: update the time of the last heartbeat.
514: */
515: public void receivedMessage() {
516: last_hb = System.currentTimeMillis();
517: if (missed_hb) {
518: if (receiver != null)
519: receiver.receivedHeartbeatAgain(local, local_port,
520: remote, remote_port);
521: missed_hb = false;
522: }
523: }
524:
525: /** Callback, called by the Link whenever it encounters a heartbeat (HB_PACKET) */
526: public void receivedHeartbeat() {
527: last_hb = System.currentTimeMillis();
528: if (missed_hb) {
529: if (receiver != null)
530: receiver.receivedHeartbeatAgain(local, local_port,
531: remote, remote_port);
532: missed_hb = false;
533: }
534: }
535:
536: /**
537: Sends heartbeats when connection is established. Tries to establish connection when not established.
538: Switches between 'established' and 'not established' roles.
539: */
540: public void run() {
541: long diff = 0, curr_time = 0, num_missed_hbs = 0;
542:
543: if (log.isTraceEnabled())
544: System.out.println("heartbeat to " + remote + ':'
545: + remote_port + " started");
546: while (!stop_hb) {
547:
548: if (established) { // send heartbeats
549:
550: // 1. Send heartbeat (use timed write)
551: if (outstream != null) {
552: try {
553: timed_writer.write(outstream, HB_PACKET,
554: 1500);
555: Thread.sleep(interval);
556: } catch (Exception io_ex) { // IOException and TimedWriter.Timeout
557: closeOutgoingConnection(); // sets established to false
558: continue;
559: }
560: } else {
561: established = false;
562: continue;
563: }
564:
565: // 2. If time of last HB received > timeout --> close connection
566: curr_time = System.currentTimeMillis();
567: diff = curr_time - last_hb;
568:
569: if (curr_time - last_hb > interval) {
570: num_missed_hbs = (curr_time - last_hb)
571: / interval;
572: if (receiver != null)
573: receiver.missedHeartbeat(local, local_port,
574: remote, remote_port,
575: (int) num_missed_hbs);
576: missed_hb = true;
577: }
578:
579: if (diff >= hb_timeout) {
580: if (log.isTraceEnabled())
581: System.out
582: .println("###### Link.Heartbeat.run(): no heartbeat receveived for "
583: + diff
584: + " msecs. Closing connections. #####");
585: closeConnections(); // close both incoming *and* outgoing connections
586: }
587: } else { // try to establish connection
588: synchronized (outgoing_mutex) { // serialize access with createOutgoingConnection()
589: if (established) {
590: continue;
591: }
592: try {
593: outgoing = timed_writer.createSocket(local,
594: remote, remote_port, interval);
595: outstream = new DataOutputStream(outgoing
596: .getOutputStream());
597: if (receiver != null)
598: receiver.linkUp(local, local_port,
599: remote, remote_port);
600: established = true;
601: if (log.isTraceEnabled())
602: System.out.println("-- CREATE (CE): "
603: + printSocket(outgoing));
604: continue;
605: } catch (InterruptedException interrupted_ex) {
606: continue;
607: } catch (Exception ex) { // IOException, TimedWriter.Timeout
608: Util.sleep(interval); // returns when done or interrupted
609: }
610: }
611: }
612: }
613: if (log.isTraceEnabled())
614: System.out.println("heartbeat to " + remote + ':'
615: + remote_port + " stopped");
616: thread = null;
617: }
618: }
619:
620: private static class MyReceiver implements Link.Receiver {
621:
622: public void receive(byte[] msg) {
623: System.out.println("<-- " + new String(msg));
624: }
625:
626: public void linkDown(InetAddress l, int lp, InetAddress r,
627: int rp) {
628: System.out.println("** linkDown(): " + r + ':' + rp);
629: }
630:
631: public void linkUp(InetAddress l, int lp, InetAddress r, int rp) {
632: System.out.println("** linkUp(): " + r + ':' + rp);
633: }
634:
635: public void missedHeartbeat(InetAddress l, int lp,
636: InetAddress r, int rp, int num) {
637: System.out.println("** missedHeartbeat(): " + r + ':' + rp);
638: }
639:
640: public void receivedHeartbeatAgain(InetAddress l, int lp,
641: InetAddress r, int rp) {
642: System.out.println("** receivedHeartbeatAgain(): " + r
643: + ':' + rp);
644: }
645: }
646:
647: public static void main(String[] args) {
648: String local, remote;
649: int local_port, remote_port;
650:
651: if (args.length != 4) {
652: System.err
653: .println("\nLink <local host> <local port> <remote host> <remote port>\n");
654: return;
655: }
656: local = args[0];
657: remote = args[2];
658: local_port = Integer.parseInt(args[1]);
659: remote_port = Integer.parseInt(args[3]);
660:
661: Link l = new Link(local, local_port, remote, remote_port,
662: new MyReceiver());
663:
664: try {
665: l.start();
666: System.out.println(l);
667:
668: BufferedReader in = new BufferedReader(
669: new InputStreamReader(System.in));
670: while (true) {
671: System.out.print("> ");
672: System.out.flush();
673: String line = in.readLine();
674: l.send(line.getBytes());
675: }
676: } catch (Exception e) {
677: System.err.println(e);
678: }
679: }
680: }
|