001: // $Id: FD_PROB.java,v 1.10.6.1 2007/04/27 08:03:52 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.util.Util;
008: import org.jgroups.util.Streamable;
009:
010: import java.io.*;
011: import java.util.Enumeration;
012: import java.util.Hashtable;
013: import java.util.Properties;
014: import java.util.Vector;
015:
/**
 * Probabilistic failure detection protocol based on "A Gossip-Style Failure Detection Service"
 * by van Renesse, Minsky and Hayden.<p>
 * Each member maintains a list of all other members: for each member P, two values are maintained, a
 * heartbeat counter and the time of the last increment of that counter. Each member periodically sends its
 * own heartbeat counter list to a randomly chosen member Q. Q updates its own heartbeat counter list and the
 * associated time (if the counter was incremented). Each member periodically increments its own counter. If,
 * when sending its heartbeat counter list, a member P detects that another member Q's heartbeat counter was
 * not incremented for <code>timeout</code> milliseconds, Q will be suspected.<p>
 * This protocol can be used both with a PBCAST *and* regular stacks.
 * @author Bela Ban 1999
 * @version $Revision: 1.10.6.1 $
 */
029: public class FD_PROB extends Protocol implements Runnable {
030: Address local_addr = null;
031: Thread hb = null;
032: long timeout = 3000; // before a member with a non updated timestamp is suspected
033: long gossip_interval = 1000;
034: Vector members = null;
035: final Hashtable counters = new Hashtable(); // keys=Addresses, vals=FdEntries
036: final Hashtable invalid_pingers = new Hashtable(); // keys=Address, vals=Integer (number of pings from suspected mbrs)
037: int max_tries = 2; // number of times to send a are-you-alive msg (tot time= max_tries*timeout)
038:
039: public String getName() {
040: return "FD_PROB";
041: }
042:
043: public boolean setProperties(Properties props) {
044: String str;
045:
046: super .setProperties(props);
047: str = props.getProperty("timeout");
048: if (str != null) {
049: timeout = Long.parseLong(str);
050: props.remove("timeout");
051: }
052:
053: str = props.getProperty("gossip_interval");
054: if (str != null) {
055: gossip_interval = Long.parseLong(str);
056: props.remove("gossip_interval");
057: }
058:
059: str = props.getProperty("max_tries");
060: if (str != null) {
061: max_tries = Integer.parseInt(str);
062: props.remove("max_tries");
063: }
064:
065: if (props.size() > 0) {
066: log
067: .error("FD_PROB.setProperties(): the following properties are not recognized: "
068: + props);
069:
070: return false;
071: }
072: return true;
073: }
074:
075: public void start() throws Exception {
076: if (hb == null) {
077: hb = new Thread(this , "FD_PROB.HeartbeatThread");
078: hb.setDaemon(true);
079: hb.start();
080: }
081: }
082:
083: public void stop() {
084: Thread tmp = null;
085: if (hb != null && hb.isAlive()) {
086: tmp = hb;
087: hb = null;
088: tmp.interrupt();
089: try {
090: tmp.join(timeout);
091: } catch (Exception ex) {
092: }
093: }
094: hb = null;
095: }
096:
097: public void up(Event evt) {
098: Message msg;
099: FdHeader hdr = null;
100: Object obj;
101:
102: switch (evt.getType()) {
103:
104: case Event.SET_LOCAL_ADDRESS:
105: local_addr = (Address) evt.getArg();
106: break;
107:
108: case Event.MSG:
109: msg = (Message) evt.getArg();
110: obj = msg.getHeader(getName());
111: if (obj == null || !(obj instanceof FdHeader)) {
112: updateCounter(msg.getSrc()); // got a msg from this guy, reset its time (we heard from it now)
113: break;
114: }
115:
116: hdr = (FdHeader) msg.removeHeader(getName());
117: switch (hdr.type) {
118: case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack
119: if (checkPingerValidity(msg.getSrc()) == false) // false == sender of heartbeat is not a member
120: return;
121:
122: // 2. Update my own array of counters
123:
124: if (log.isInfoEnabled())
125: log.info("<-- HEARTBEAT from " + msg.getSrc());
126: updateCounters(hdr);
127: return; // don't pass up !
128: case FdHeader.NOT_MEMBER:
129: if (log.isWarnEnabled())
130: log.warn("NOT_MEMBER: I'm being shunned; exiting");
131: passUp(new Event(Event.EXIT));
132: return;
133: default:
134: if (log.isWarnEnabled())
135: log
136: .warn("FdHeader type " + hdr.type
137: + " not known");
138: return;
139: }
140: }
141: passUp(evt); // pass up to the layer above us
142: }
143:
144: public void down(Event evt) {
145: int num_mbrs;
146: Vector excluded_mbrs;
147: FdEntry entry;
148: Address mbr;
149:
150: switch (evt.getType()) {
151:
152: // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2
153: case Event.VIEW_CHANGE:
154: passDown(evt);
155: synchronized (this ) {
156: View v = (View) evt.getArg();
157:
158: // mark excluded members
159: excluded_mbrs = computeExcludedMembers(members, v
160: .getMembers());
161: if (excluded_mbrs != null && excluded_mbrs.size() > 0) {
162: for (int i = 0; i < excluded_mbrs.size(); i++) {
163: mbr = (Address) excluded_mbrs.elementAt(i);
164: entry = (FdEntry) counters.get(mbr);
165: if (entry != null)
166: entry.setExcluded(true);
167: }
168: }
169:
170: members = v != null ? v.getMembers() : null;
171: if (members != null) {
172: num_mbrs = members.size();
173: if (num_mbrs >= 2) {
174: if (hb == null) {
175: try {
176: start();
177: } catch (Exception ex) {
178: if (log.isWarnEnabled())
179: log
180: .warn("exception when calling start(): "
181: + ex);
182: }
183: }
184: } else
185: stop();
186: }
187: }
188: break;
189:
190: default:
191: passDown(evt);
192: break;
193: }
194: }
195:
196: /**
197: Loop while more than 1 member available. Choose a member randomly (not myself !) and send a
198: heartbeat. Wait for ack. If ack not received withing timeout, mcast SUSPECT message.
199: */
200: public void run() {
201: Message hb_msg;
202: FdHeader hdr;
203: Address hb_dest, key;
204: FdEntry entry;
205: long curr_time, diff;
206:
207: if (log.isInfoEnabled())
208: log.info("heartbeat thread was started");
209:
210: while (hb != null && members.size() > 1) {
211:
212: // 1. Get a random member P (excluding ourself)
213: hb_dest = getHeartbeatDest();
214: if (hb_dest == null) {
215: if (log.isWarnEnabled())
216: log.warn("hb_dest is null");
217: Util.sleep(gossip_interval);
218: continue;
219: }
220:
221: // 2. Increment own counter
222: entry = (FdEntry) counters.get(local_addr);
223: if (entry == null) {
224: entry = new FdEntry();
225: counters.put(local_addr, entry);
226: }
227: entry.incrementCounter();
228:
229: // 3. Send heartbeat to P
230: hdr = createHeader();
231: if (hdr == null)
232: if (log.isWarnEnabled())
233: log
234: .warn("header could not be created. Heartbeat will not be sent");
235: else {
236: hb_msg = new Message(hb_dest, null, null);
237: hb_msg.putHeader(getName(), hdr);
238:
239: if (log.isInfoEnabled())
240: log.info("--> HEARTBEAT to " + hb_dest);
241: passDown(new Event(Event.MSG, hb_msg));
242: }
243:
244: if (log.isInfoEnabled())
245: log.info("own counters are " + printCounters());
246:
247: // 4. Suspect members from which we haven't heard for timeout msecs
248: for (Enumeration e = counters.keys(); e.hasMoreElements();) {
249: curr_time = System.currentTimeMillis();
250: key = (Address) e.nextElement();
251: entry = (FdEntry) counters.get(key);
252:
253: if (entry.getTimestamp() > 0
254: && (diff = curr_time - entry.getTimestamp()) >= timeout) {
255: if (entry.excluded()) {
256: if (diff >= 2 * timeout) { // remove members marked as 'excluded' after 2*timeout msecs
257: counters.remove(key);
258: if (log.isInfoEnabled())
259: log.info("removed " + key);
260: }
261: } else {
262: if (log.isInfoEnabled())
263: log.info("suspecting " + key);
264: passUp(new Event(Event.SUSPECT, key));
265: }
266: }
267: }
268: Util.sleep(gossip_interval);
269: } // end while
270:
271: if (log.isInfoEnabled())
272: log.info("heartbeat thread was stopped");
273: }
274:
275: /* -------------------------------- Private Methods ------------------------------- */
276:
277: Address getHeartbeatDest() {
278: Address retval = null;
279: int r, size;
280: Vector members_copy;
281:
282: if (members == null || members.size() < 2 || local_addr == null)
283: return null;
284: members_copy = (Vector) members.clone();
285: members_copy.removeElement(local_addr); // don't select myself as heartbeat destination
286: size = members_copy.size();
287: r = ((int) (Math.random() * (size + 1))) % size;
288: retval = (Address) members_copy.elementAt(r);
289: return retval;
290: }
291:
292: /** Create a header containing the counters for all members */
293: FdHeader createHeader() {
294: int num_mbrs = counters.size(), index = 0;
295: FdHeader ret = null;
296: Address key;
297: FdEntry entry;
298:
299: if (num_mbrs <= 0)
300: return null;
301: ret = new FdHeader(FdHeader.HEARTBEAT, num_mbrs);
302: for (Enumeration e = counters.keys(); e.hasMoreElements();) {
303: key = (Address) e.nextElement();
304: entry = (FdEntry) counters.get(key);
305: if (entry.excluded())
306: continue;
307: if (index >= ret.members.length) {
308: if (log.isWarnEnabled())
309: log.warn("index " + index + " is out of bounds ("
310: + ret.members.length + ')');
311: break;
312: }
313: ret.members[index] = key;
314: ret.counters[index] = entry.getCounter();
315: index++;
316: }
317: return ret;
318: }
319:
320: /** Set my own counters values to max(own-counter, counter) */
321: void updateCounters(FdHeader hdr) {
322: Address key;
323: FdEntry entry;
324:
325: if (hdr == null || hdr.members == null || hdr.counters == null) {
326: if (log.isWarnEnabled())
327: log.warn("hdr is null or contains no counters");
328: return;
329: }
330:
331: for (int i = 0; i < hdr.members.length; i++) {
332: key = hdr.members[i];
333: if (key == null)
334: continue;
335: entry = (FdEntry) counters.get(key);
336: if (entry == null) {
337: entry = new FdEntry(hdr.counters[i]);
338: counters.put(key, entry);
339: continue;
340: }
341:
342: if (entry.excluded())
343: continue;
344:
345: // only update counter (and adjust timestamp) if new counter is greater then old one
346: entry.setCounter(Math.max(entry.getCounter(),
347: hdr.counters[i]));
348: }
349: }
350:
351: /** Resets the counter for mbr */
352: void updateCounter(Address mbr) {
353: FdEntry entry;
354:
355: if (mbr == null)
356: return;
357: entry = (FdEntry) counters.get(mbr);
358: if (entry != null)
359: entry.setTimestamp();
360: }
361:
362: String printCounters() {
363: StringBuffer sb = new StringBuffer();
364: Address mbr;
365: FdEntry entry;
366:
367: for (Enumeration e = counters.keys(); e.hasMoreElements();) {
368: mbr = (Address) e.nextElement();
369: entry = (FdEntry) counters.get(mbr);
370: sb.append("\n" + mbr + ": " + entry._toString());
371: }
372: return sb.toString();
373: }
374:
375: static Vector computeExcludedMembers(Vector old_mbrship,
376: Vector new_mbrship) {
377: Vector ret = new Vector();
378: if (old_mbrship == null || new_mbrship == null)
379: return ret;
380: for (int i = 0; i < old_mbrship.size(); i++)
381: if (!new_mbrship.contains(old_mbrship.elementAt(i)))
382: ret.addElement(old_mbrship.elementAt(i));
383: return ret;
384: }
385:
386: /** If hb_sender is not a member, send a SUSPECT to sender (after n pings received) */
387: boolean checkPingerValidity(Object hb_sender) {
388: int num_pings = 0;
389: Message shun_msg;
390: Header hdr;
391:
392: if (hb_sender != null && members != null
393: && !members.contains(hb_sender)) {
394: if (invalid_pingers.containsKey(hb_sender)) {
395: num_pings = ((Integer) invalid_pingers.get(hb_sender))
396: .intValue();
397: if (num_pings >= max_tries) {
398: if (log.isErrorEnabled())
399: log.error("sender " + hb_sender
400: + " is not member in " + members
401: + " ! Telling it to leave group");
402: shun_msg = new Message((Address) hb_sender, null,
403: null);
404: hdr = new FdHeader(FdHeader.NOT_MEMBER);
405: shun_msg.putHeader(getName(), hdr);
406: passDown(new Event(Event.MSG, shun_msg));
407: invalid_pingers.remove(hb_sender);
408: } else {
409: num_pings++;
410: invalid_pingers.put(hb_sender, new Integer(
411: num_pings));
412: }
413: } else {
414: num_pings++;
415: invalid_pingers.put(hb_sender, new Integer(num_pings));
416: }
417: return false;
418: } else
419: return true;
420: }
421:
422: /* ----------------------------- End of Private Methods --------------------------- */
423:
424: public static class FdHeader extends Header implements Streamable {
425: static final byte HEARTBEAT = 1; // sent periodically to a random member
426: static final byte NOT_MEMBER = 2; // sent to the sender, when it is not a member anymore (shunned)
427:
428: byte type = HEARTBEAT;
429: Address[] members = null;
430: long[] counters = null; // correlates with 'members' (same indexes)
431:
432: public FdHeader() {
433: } // used for externalization
434:
435: FdHeader(byte type) {
436: this .type = type;
437: }
438:
439: FdHeader(byte type, int num_elements) {
440: this (type);
441: members = new Address[num_elements];
442: counters = new long[num_elements];
443: }
444:
445: public String toString() {
446: switch (type) {
447: case HEARTBEAT:
448: return "[FD_PROB: HEARTBEAT]";
449: case NOT_MEMBER:
450: return "[FD_PROB: NOT_MEMBER]";
451: default:
452: return "[FD_PROB: unknown type (" + type + ")]";
453: }
454: }
455:
456: public String printDetails() {
457: StringBuffer sb = new StringBuffer();
458: Address mbr;
459:
460: if (members != null && counters != null)
461: for (int i = 0; i < members.length; i++) {
462: mbr = members[i];
463: if (mbr == null)
464: sb.append("\n<null>");
465: else
466: sb.append("\n" + mbr);
467: sb.append(": " + counters[i]);
468: }
469: return sb.toString();
470: }
471:
472: public void writeExternal(ObjectOutput out) throws IOException {
473: out.writeByte(type);
474:
475: if (members != null) {
476: out.writeInt(members.length);
477: out.writeObject(members);
478: } else
479: out.writeInt(0);
480:
481: if (counters != null) {
482: out.writeInt(counters.length);
483: for (int i = 0; i < counters.length; i++)
484: out.writeLong(counters[i]);
485: } else
486: out.writeInt(0);
487: }
488:
489: public void readExternal(ObjectInput in) throws IOException,
490: ClassNotFoundException {
491: int num;
492: type = in.readByte();
493:
494: num = in.readInt();
495: if (num == 0)
496: members = null;
497: else {
498: members = (Address[]) in.readObject();
499: }
500:
501: num = in.readInt();
502: if (num == 0)
503: counters = null;
504: else {
505: counters = new long[num];
506: for (int i = 0; i < counters.length; i++)
507: counters[i] = in.readLong();
508: }
509: }
510:
511: public long size() {
512: long retval = Global.BYTE_SIZE;
513: retval += Global.SHORT_SIZE; // number of members
514: if (members != null && members.length > 0) {
515: for (int i = 0; i < members.length; i++) {
516: Address member = members[i];
517: retval += Util.size(member);
518: }
519: }
520:
521: retval += Global.SHORT_SIZE; // counters
522: if (counters != null && counters.length > 0) {
523: retval += counters.length * Global.LONG_SIZE;
524: }
525:
526: return retval;
527: }
528:
529: public void writeTo(DataOutputStream out) throws IOException {
530: out.writeByte(type);
531: if (members == null || members.length == 0)
532: out.writeShort(0);
533: else {
534: out.writeShort(members.length);
535: for (int i = 0; i < members.length; i++) {
536: Address member = members[i];
537: Util.writeAddress(member, out);
538: }
539: }
540:
541: if (counters == null || counters.length == 0) {
542: out.writeShort(0);
543: } else {
544: out.writeShort(counters.length);
545: for (int i = 0; i < counters.length; i++) {
546: long counter = counters[i];
547: out.writeLong(counter);
548: }
549: }
550: }
551:
552: public void readFrom(DataInputStream in) throws IOException,
553: IllegalAccessException, InstantiationException {
554: type = in.readByte();
555: short len = in.readShort();
556: if (len > 0) {
557: members = new Address[len];
558: for (int i = 0; i < len; i++) {
559: members[i] = Util.readAddress(in);
560: }
561: }
562:
563: len = in.readShort();
564: if (len > 0) {
565: counters = new long[len];
566: for (int i = 0; i < counters.length; i++) {
567: counters[i] = in.readLong();
568: }
569: }
570: }
571:
572: }
573:
574: private static class FdEntry {
575: private long counter = 0; // heartbeat counter
576: private long timestamp = 0; // last time the counter was incremented
577: private boolean excluded = false; // set to true if member was excluded from group
578:
579: FdEntry() {
580:
581: }
582:
583: FdEntry(long counter) {
584: this .counter = counter;
585: timestamp = System.currentTimeMillis();
586: }
587:
588: long getCounter() {
589: return counter;
590: }
591:
592: long getTimestamp() {
593: return timestamp;
594: }
595:
596: boolean excluded() {
597: return excluded;
598: }
599:
600: synchronized void setCounter(long new_counter) {
601: if (new_counter > counter) { // only set time if counter was incremented
602: timestamp = System.currentTimeMillis();
603: counter = new_counter;
604: }
605: }
606:
607: synchronized void incrementCounter() {
608: counter++;
609: timestamp = System.currentTimeMillis();
610: }
611:
612: synchronized void setTimestamp() {
613: timestamp = System.currentTimeMillis();
614: }
615:
616: synchronized void setExcluded(boolean flag) {
617: excluded = flag;
618: }
619:
620: public String toString() {
621: return "counter=" + counter + ", timestamp=" + timestamp
622: + ", excluded=" + excluded;
623: }
624:
625: public String _toString() {
626: return "counter=" + counter + ", age="
627: + (System.currentTimeMillis() - timestamp)
628: + ", excluded=" + excluded;
629: }
630: }
631:
632: }
|