001: // $Id: FD.java,v 2007/04/27 08:03:51 belaban Exp $
003: package org.jgroups.protocols;
005: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
006: import org.jgroups.*;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.util.*;
010: import java.io.*;
011: import java.util.*;
012: import java.util.List;
014: /**
015: * Failure detection based on simple heartbeat protocol. Regularly polls members for
016: * liveness. Multicasts SUSPECT messages when a member is not reachable. The simple
017: * algorithm works as follows: the membership is known and ordered. Each HB protocol
018: * periodically sends an 'are-you-alive' message to its *neighbor*. A neighbor is the next in
019: * rank in the membership list, which is recomputed upon a view change. When a response hasn't
020: * been received for n milliseconds and m tries, the corresponding member is suspected (and
021: * eventually excluded if faulty).<p>
022: * FD starts when it detects (in a view change notification) that there are at least
023: * 2 members in the group. It stops running when the membership drops below 2.<p>
024: * When a message is received from the monitored neighbor member, it causes the pinger thread to
025: * 'skip' sending the next are-you-alive message. Thus, traffic is reduced.<p>
026: * When we receive a ping from a member that's not in the membership list, we shun it by sending it a
027: * NOT_MEMBER message. That member will then leave the group (and possibly rejoin). This is only done if
028: * <code>shun</code> is true.
029: * @author Bela Ban
030: * @version $Revision: $
031: */
032: public class FD extends Protocol {
033: Address ping_dest = null;
034: Address local_addr = null;
035: long timeout = 3000; // number of millisecs to wait for an are-you-alive msg
036: long last_ack = System.currentTimeMillis();
037: int num_tries = 0;
038: int max_tries = 2; // number of times to send a are-you-alive msg (tot time= max_tries*timeout)
039: final List members = new CopyOnWriteArrayList();
040: final Hashtable invalid_pingers = new Hashtable(7); // keys=Address, val=Integer (number of pings from suspected mbrs)
042: /** Members from which we select ping_dest. may be subset of {@link #members} */
043: final List pingable_mbrs = new CopyOnWriteArrayList();
045: boolean shun = true;
046: TimeScheduler timer = null;
047: private Monitor monitor = null; // task that performs the actual monitoring for failure detection
048: private final Object monitor_mutex = new Object();
049: protected int num_heartbeats = 0;
050: protected int num_suspect_events = 0;
052: /** Transmits SUSPECT message until view change or UNSUSPECT is received */
053: protected final Broadcaster bcast_task = new Broadcaster();
054: final static String name = "FD";
056: BoundedList suspect_history = new BoundedList(20);
058: public String getName() {
059: return name;
060: }
062: public String getLocalAddress() {
063: return local_addr != null ? local_addr.toString() : "null";
064: }
066: public String getMembers() {
067: return members != null ? members.toString() : "null";
068: }
070: public String getPingableMembers() {
071: return pingable_mbrs != null ? pingable_mbrs.toString()
072: : "null";
073: }
075: public String getPingDest() {
076: return ping_dest != null ? ping_dest.toString() : "null";
077: }
079: public int getNumberOfHeartbeatsSent() {
080: return num_heartbeats;
081: }
083: public int getNumSuspectEventsGenerated() {
084: return num_suspect_events;
085: }
087: public long getTimeout() {
088: return timeout;
089: }
091: public void setTimeout(long timeout) {
092: this .timeout = timeout;
093: }
095: public int getMaxTries() {
096: return max_tries;
097: }
099: public void setMaxTries(int max_tries) {
100: this .max_tries = max_tries;
101: }
103: public int getCurrentNumTries() {
104: return num_tries;
105: }
107: public boolean isShun() {
108: return shun;
109: }
111: public void setShun(boolean flag) {
112: this .shun = flag;
113: }
115: public String printSuspectHistory() {
116: StringBuffer sb = new StringBuffer();
117: for (Enumeration en = suspect_history.elements(); en
118: .hasMoreElements();) {
119: sb.append(new Date()).append(": ").append(en.nextElement())
120: .append("\n");
121: }
122: return sb.toString();
123: }
125: public boolean setProperties(Properties props) {
126: String str;
128: super .setProperties(props);
129: str = props.getProperty("timeout");
130: if (str != null) {
131: timeout = Long.parseLong(str);
132: props.remove("timeout");
133: }
135: str = props.getProperty("max_tries"); // before suspecting a member
136: if (str != null) {
137: max_tries = Integer.parseInt(str);
138: props.remove("max_tries");
139: }
141: str = props.getProperty("shun");
142: if (str != null) {
143: shun = Boolean.valueOf(str).booleanValue();
144: props.remove("shun");
145: }
147: if (!props.isEmpty()) {
148: log.error("the following properties are not recognized: "
149: + props);
150: return false;
151: }
152: return true;
153: }
155: public void resetStats() {
156: num_heartbeats = num_suspect_events = 0;
157: suspect_history.removeAll();
158: }
160: public void init() throws Exception {
161: if (stack != null && stack.timer != null)
162: timer = stack.timer;
163: else
164: throw new Exception(
165: "FD.init(): timer cannot be retrieved from protocol stack");
166: }
168: public void stop() {
169: stopMonitor();
170: }
172: private Object getPingDest(List mbrs) {
173: Object tmp, retval = null;
175: if (mbrs == null || mbrs.size() < 2 || local_addr == null)
176: return null;
177: for (int i = 0; i < mbrs.size(); i++) {
178: tmp = mbrs.get(i);
179: if (local_addr.equals(tmp)) {
180: if (i + 1 >= mbrs.size())
181: retval = mbrs.get(0);
182: else
183: retval = mbrs.get(i + 1);
184: break;
185: }
186: }
187: return retval;
188: }
190: private void startMonitor() {
191: synchronized (monitor_mutex) {
192: if (monitor != null && monitor.started == false) {
193: monitor = null;
194: }
195: if (monitor == null) {
196: monitor = createMonitor();
197: last_ack = System.currentTimeMillis(); // start from scratch
198: timer.add(monitor, true); // fixed-rate scheduling
199: num_tries = 0;
200: }
201: }
202: }
204: private void stopMonitor() {
205: synchronized (monitor_mutex) {
206: if (monitor != null) {
207: monitor.stop();
208: monitor = null;
209: }
210: }
211: }
213: protected Monitor createMonitor() {
214: return new Monitor();
215: }
217: public void up(Event evt) {
218: Message msg;
219: FdHeader hdr;
220: Object sender, tmphdr;
222: switch (evt.getType()) {
224: case Event.SET_LOCAL_ADDRESS:
225: local_addr = (Address) evt.getArg();
226: break;
228: case Event.MSG:
229: msg = (Message) evt.getArg();
230: tmphdr = msg.getHeader(name);
231: if (tmphdr == null || !(tmphdr instanceof FdHeader)) {
232: if (ping_dest != null
233: && (sender = msg.getSrc()) != null) {
234: if (ping_dest.equals(sender)) {
235: last_ack = System.currentTimeMillis();
236: if (log.isTraceEnabled())
237: log.trace("received msg from " + sender
238: + " (counts as ack)");
239: num_tries = 0;
240: }
241: }
242: break; // message did not originate from FD layer, just pass up
243: }
245: hdr = (FdHeader) msg.removeHeader(name);
246: switch (hdr.type) {
247: case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack
248: Address hb_sender = msg.getSrc();
249: if (log.isTraceEnabled())
250: log.trace("received are-you-alive from "
251: + hb_sender + ", sending response");
252: sendHeartbeatResponse(hb_sender);
254: // 2. Shun the sender of a HEARTBEAT message if that sender is not a member. This will cause
255: // the sender to leave the group (and possibly rejoin it later)
256: if (shun)
257: shunInvalidHeartbeatSender(hb_sender);
258: break; // don't pass up !
260: case FdHeader.HEARTBEAT_ACK: // heartbeat ack
261: if (ping_dest != null && ping_dest.equals(hdr.from)) {
262: last_ack = System.currentTimeMillis();
263: num_tries = 0;
264: if (log.isDebugEnabled())
265: log.debug("received ack from " + hdr.from);
266: } else {
267: /* modified by Luís Palma Nunes Mendes on 11 Aug 2006
268: * By not doing this check, if we keep receiving HEARTBEAT_ACK messages from
269: * other members than ping_dest, Monitor Thread would be restarted every time,
270: * taking down the timeouts with it. This inhibits ping_dest Failure Detection.
271: */
272: synchronized (this ) {
273: Address previewNextPingDest = (Address) getPingDest(pingable_mbrs);
274: /* We are only interested to stop or restart the monitor thread iff the current target ping_dest is going
275: change */
276: if (log.isDebugEnabled())
277: log
278: .debug("Recevied Ack. is invalid (was from: "
279: + hdr.from + "), ");
280: if ((previewNextPingDest != null
281: && ping_dest != null && !previewNextPingDest
282: .equals(ping_dest))
283: || (previewNextPingDest != null && ping_dest == null)
284: || (previewNextPingDest == null && ping_dest != null)) {
285: stop();
286: ping_dest = previewNextPingDest;
287: if (log.isDebugEnabled())
288: log
289: .debug("changing to a new destination: "
290: + ping_dest);
291: if (ping_dest != null) {
292: try {
293: startMonitor();
294: } catch (Exception ex) {
295: if (log.isWarnEnabled())
296: log
297: .warn("exception when calling startMonitor(): "
298: + ex);
299: }
300: }
301: } else if (ping_dest == null) {
302: if (log.isTraceEnabled())
303: log
304: .trace("and ping_dest is null, stop Monitor");
305: stop();
306: } else {
307: if (log.isTraceEnabled())
308: log
309: .trace("but we must keep pinging same destination");
310: }
311: }
312: }
313: break;
315: case FdHeader.SUSPECT:
316: if (hdr.mbrs != null) {
317: if (log.isTraceEnabled())
318: log.trace("[SUSPECT] suspect hdr is " + hdr);
319: for (int i = 0; i < hdr.mbrs.size(); i++) {
320: Address m = (Address) hdr.mbrs.elementAt(i);
321: if (local_addr != null && m.equals(local_addr)) {
322: if (log.isWarnEnabled())
323: log
324: .warn("I was suspected by "
325: + msg.getSrc()
326: + "; ignoring the SUSPECT "
327: + "message and sending back a HEARTBEAT_ACK");
328: sendHeartbeatResponse(msg.getSrc());
329: continue;
330: } else {
331: pingable_mbrs.remove(m);
332: ping_dest = (Address) getPingDest(pingable_mbrs);
333: }
334: passUp(new Event(Event.SUSPECT, m));
335: passDown(new Event(Event.SUSPECT, m));
336: }
337: }
338: break;
340: case FdHeader.NOT_MEMBER:
341: if (shun) {
342: if (log.isDebugEnabled())
343: log
344: .debug("[NOT_MEMBER] I'm being shunned; exiting");
345: passUp(new Event(Event.EXIT));
346: }
347: break;
348: }
349: return;
350: }
351: passUp(evt); // pass up to the layer above us
352: }
354: public void down(Event evt) {
355: View v;
357: switch (evt.getType()) {
358: case Event.VIEW_CHANGE:
359: passDown(evt);
360: stop();
361: synchronized (this ) {
362: v = (View) evt.getArg();
363: members.clear();
364: members.addAll(v.getMembers());
365: bcast_task.adjustSuspectedMembers(members);
366: pingable_mbrs.clear();
367: pingable_mbrs.addAll(members);
368: ping_dest = (Address) getPingDest(pingable_mbrs);
369: if (ping_dest != null) {
370: try {
371: startMonitor();
372: } catch (Exception ex) {
373: if (log.isWarnEnabled())
374: log
375: .warn("exception when calling startMonitor(): "
376: + ex);
377: }
378: }
379: }
380: break;
382: case Event.UNSUSPECT:
383: unsuspect((Address) evt.getArg());
384: passDown(evt);
385: break;
387: default:
388: passDown(evt);
389: break;
390: }
391: }
393: private void sendHeartbeatResponse(Address dest) {
394: Message hb_ack = new Message(dest, null, null);
395: FdHeader tmp_hdr = new FdHeader(FdHeader.HEARTBEAT_ACK);
396: tmp_hdr.from = local_addr;
397: hb_ack.putHeader(name, tmp_hdr);
398: passDown(new Event(Event.MSG, hb_ack));
399: }
401: private void unsuspect(Address mbr) {
402: bcast_task.removeSuspectedMember(mbr);
403: pingable_mbrs.clear();
404: pingable_mbrs.addAll(members);
405: pingable_mbrs.removeAll(bcast_task.getSuspectedMembers());
406: ping_dest = (Address) getPingDest(pingable_mbrs);
407: }
409: /**
410: * If sender is not a member, send a NOT_MEMBER to sender (after n pings received)
411: */
412: private void shunInvalidHeartbeatSender(Address hb_sender) {
413: int num_pings = 0;
414: Message shun_msg;
416: if (hb_sender != null && members != null
417: && !members.contains(hb_sender)) {
418: if (invalid_pingers.containsKey(hb_sender)) {
419: num_pings = ((Integer) invalid_pingers.get(hb_sender))
420: .intValue();
421: if (num_pings >= max_tries) {
422: if (log.isDebugEnabled())
423: log.debug(hb_sender + " is not in " + members
424: + " ! Shunning it");
425: shun_msg = new Message(hb_sender, null, null);
426: shun_msg.putHeader(name, new FdHeader(
427: FdHeader.NOT_MEMBER));
428: passDown(new Event(Event.MSG, shun_msg));
429: invalid_pingers.remove(hb_sender);
430: } else {
431: num_pings++;
432: invalid_pingers.put(hb_sender, new Integer(
433: num_pings));
434: }
435: } else {
436: num_pings++;
437: invalid_pingers.put(hb_sender, new Integer(num_pings));
438: }
439: }
440: }
442: public static class FdHeader extends Header implements Streamable {
443: public static final byte HEARTBEAT = 0;
444: public static final byte HEARTBEAT_ACK = 1;
445: public static final byte SUSPECT = 2;
446: public static final byte NOT_MEMBER = 3; // received as response by pinged mbr when we are not a member
448: byte type = HEARTBEAT;
449: Vector mbrs = null;
450: Address from = null; // member who detected that suspected_mbr has failed
452: public FdHeader() {
453: } // used for externalization
455: public FdHeader(byte type) {
456: this .type = type;
457: }
459: public FdHeader(byte type, Vector mbrs, Address from) {
460: this (type);
461: this .mbrs = mbrs;
462: this .from = from;
463: }
465: public String toString() {
466: switch (type) {
467: case HEARTBEAT:
468: return "[FD: heartbeat]";
469: case HEARTBEAT_ACK:
470: return "[FD: heartbeat ack]";
471: case SUSPECT:
472: return "[FD: SUSPECT (suspected_mbrs=" + mbrs
473: + ", from=" + from + ")]";
474: case NOT_MEMBER:
475: return "[FD: NOT_MEMBER]";
476: default:
477: return "[FD: unknown type (" + type + ")]";
478: }
479: }
481: public void writeExternal(ObjectOutput out) throws IOException {
482: out.writeByte(type);
483: if (mbrs == null)
484: out.writeBoolean(false);
485: else {
486: out.writeBoolean(true);
487: out.writeInt(mbrs.size());
488: for (Iterator it = mbrs.iterator(); it.hasNext();) {
489: Address addr = (Address) it.next();
490: Marshaller.write(addr, out);
491: }
492: }
493: Marshaller.write(from, out);
494: }
496: public void readExternal(ObjectInput in) throws IOException,
497: ClassNotFoundException {
498: type = in.readByte();
499: boolean mbrs_not_null = in.readBoolean();
500: if (mbrs_not_null) {
501: int len = in.readInt();
502: mbrs = new Vector(11);
503: for (int i = 0; i < len; i++) {
504: Address addr = (Address) Marshaller.read(in);
505: mbrs.add(addr);
506: }
507: }
508: from = (Address) Marshaller.read(in);
509: }
511: public long size() {
512: int retval = Global.BYTE_SIZE; // type
513: retval += Util.size(mbrs);
514: retval += Util.size(from);
515: return retval;
516: }
518: public void writeTo(DataOutputStream out) throws IOException {
519: out.writeByte(type);
520: Util.writeAddresses(mbrs, out);
521: Util.writeAddress(from, out);
522: }
524: public void readFrom(DataInputStream in) throws IOException,
525: IllegalAccessException, InstantiationException {
526: type = in.readByte();
527: mbrs = (Vector) Util.readAddresses(in, Vector.class);
528: from = Util.readAddress(in);
529: }
531: }
533: protected class Monitor implements TimeScheduler.Task {
534: boolean started = true;
536: public void stop() {
537: started = false;
538: }
540: public boolean cancelled() {
541: return !started;
542: }
544: public long nextInterval() {
545: return timeout;
546: }
548: public void run() {
549: Message hb_req;
550: long not_heard_from; // time in msecs we haven't heard from ping_dest
552: if (ping_dest == null) {
553: if (log.isWarnEnabled())
554: log.warn("ping_dest is null: members=" + members
555: + ", pingable_mbrs=" + pingable_mbrs
556: + ", local_addr=" + local_addr);
557: return;
558: }
560: // 1. send heartbeat request
561: hb_req = new Message(ping_dest, null, null);
562: hb_req.putHeader(name, new FdHeader(FdHeader.HEARTBEAT)); // send heartbeat request
563: if (log.isDebugEnabled())
564: log.debug("sending are-you-alive msg to " + ping_dest
565: + " (own address=" + local_addr + ')');
566: passDown(new Event(Event.MSG, hb_req));
567: num_heartbeats++;
569: // 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been
570: // received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install
571: // a new view
572: not_heard_from = System.currentTimeMillis() - last_ack;
573: // quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003)
574: if (not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs
575: if (num_tries >= max_tries) {
576: if (log.isDebugEnabled())
577: log.debug("[" + local_addr
578: + "]: received no heartbeat ack from "
579: + ping_dest + " for " + (num_tries + 1)
580: + " times ("
581: + ((num_tries + 1) * timeout)
582: + " milliseconds), suspecting it");
583: // broadcast a SUSPECT message to all members - loop until
584: // unsuspect or view change is received
585: bcast_task.addSuspectedMember(ping_dest);
586: num_tries = 0;
587: if (stats) {
588: num_suspect_events++;
589: suspect_history.add(ping_dest);
590: }
591: } else {
592: if (log.isDebugEnabled())
593: log.debug("heartbeat missing from " + ping_dest
594: + " (number=" + num_tries + ')');
595: num_tries++;
596: }
597: }
598: }
600: public String toString() {
601: return Boolean.toString(started);
602: }
604: }
606: /**
607: * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
608: * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
609: * sure they are retransmitted until a view has been received which doesn't contain the suspected members
610: * any longer. Then the task terminates.
611: */
612: protected final class Broadcaster {
613: final Vector suspected_mbrs = new Vector(7);
614: BroadcastTask task = null;
615: private final Object bcast_mutex = new Object();
617: Vector getSuspectedMembers() {
618: return suspected_mbrs;
619: }
621: /**
622: * Starts a new task, or - if already running - adds the argument to the running task.
623: * @param suspect
624: */
625: private void startBroadcastTask(Address suspect) {
626: synchronized (bcast_mutex) {
627: if (task == null || task.cancelled()) {
628: task = new BroadcastTask((Vector) suspected_mbrs
629: .clone());
630: task.addSuspectedMember(suspect);
631: task.run(); // run immediately the first time
632: timer.add(task); // then every timeout milliseconds, until cancelled
633: if (log.isTraceEnabled())
634: log.trace("BroadcastTask started");
635: } else {
636: task.addSuspectedMember(suspect);
637: }
638: }
639: }
641: private void stopBroadcastTask() {
642: synchronized (bcast_mutex) {
643: if (task != null) {
644: task.stop();
645: task = null;
646: }
647: }
648: }
650: /** Adds a suspected member. Starts the task if not yet running */
651: protected void addSuspectedMember(Address mbr) {
652: if (mbr == null)
653: return;
654: if (!members.contains(mbr))
655: return;
656: boolean added = false;
657: synchronized (suspected_mbrs) {
658: if (!suspected_mbrs.contains(mbr)) {
659: suspected_mbrs.addElement(mbr);
660: added = true;
661: }
662: }
663: if (added)
664: startBroadcastTask(mbr);
665: }
667: void removeSuspectedMember(Address suspected_mbr) {
668: if (suspected_mbr == null)
669: return;
670: if (log.isDebugEnabled())
671: log.debug("member is " + suspected_mbr);
672: synchronized (suspected_mbrs) {
673: suspected_mbrs.removeElement(suspected_mbr);
674: if (suspected_mbrs.isEmpty())
675: stopBroadcastTask();
676: }
677: }
679: void removeAll() {
680: synchronized (suspected_mbrs) {
681: suspected_mbrs.removeAllElements();
682: stopBroadcastTask();
683: }
684: }
686: /** Removes all elements from suspected_mbrs that are <em>not</em> in the new membership */
687: void adjustSuspectedMembers(List new_mbrship) {
688: if (new_mbrship == null || new_mbrship.isEmpty())
689: return;
690: StringBuffer sb = new StringBuffer();
691: synchronized (suspected_mbrs) {
692: sb.append("suspected_mbrs: ").append(suspected_mbrs);
693: suspected_mbrs.retainAll(new_mbrship);
694: if (suspected_mbrs.isEmpty())
695: stopBroadcastTask();
696: sb.append(", after adjustment: ")
697: .append(suspected_mbrs);
698: log.debug(sb.toString());
699: }
700: }
701: }
703: protected final class BroadcastTask implements TimeScheduler.Task {
704: boolean cancelled = false;
705: private final Vector suspected_members = new Vector();
707: BroadcastTask(Vector suspected_members) {
708: this .suspected_members.addAll(suspected_members);
709: }
711: public void stop() {
712: cancelled = true;
713: suspected_members.clear();
714: if (log.isTraceEnabled())
715: log.trace("BroadcastTask stopped");
716: }
718: public boolean cancelled() {
719: return cancelled;
720: }
722: public long nextInterval() {
723: return FD.this .timeout;
724: }
726: public void run() {
727: Message suspect_msg;
728: FD.FdHeader hdr;
730: synchronized (suspected_members) {
731: if (suspected_members.isEmpty()) {
732: stop();
733: if (log.isDebugEnabled())
734: log.debug("task done (no suspected members)");
735: return;
736: }
738: hdr = new FdHeader(FdHeader.SUSPECT);
739: hdr.mbrs = (Vector) suspected_members.clone();
740: hdr.from = local_addr;
741: }
742: suspect_msg = new Message(); // mcast SUSPECT to all members
743: suspect_msg.putHeader(name, hdr);
744: if (log.isDebugEnabled())
745: log
746: .debug("broadcasting SUSPECT message [suspected_mbrs="
747: + suspected_members + "] to group");
748: passDown(new Event(Event.MSG, suspect_msg));
749: if (log.isDebugEnabled())
750: log.debug("task done");
751: }
753: public void addSuspectedMember(Address suspect) {
754: if (suspect != null && !suspected_members.contains(suspect)) {
755: suspected_members.add(suspect);
756: }
757: }
758: }
760: }