001: // $Id: FD.java,v 1.9.10.1 2007/04/27 08:03:55 belaban Exp $
002:
003: package org.jgroups.protocols.pbcast;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Message;
008: import org.jgroups.View;
009: import org.jgroups.stack.Protocol;
010: import org.jgroups.util.Util;
011:
012: import java.util.Enumeration;
013: import java.util.Hashtable;
014: import java.util.Properties;
015: import java.util.Vector;
016:
/**
 * Passive failure detection protocol. It assumes a pbcast protocol, which uses rounds of gossiping for
 * reliable message delivery. Gossip protocols typically involve all the members sending gossips at regular
 * intervals. This protocol therefore works as follows: it allocates a timestamp for each member and updates
 * the timestamp whenever it receives a message from a sender P. Any type of message is accepted from P. For
 * example, PBCAST regularly sends the following messages:
 * <ul>
 * <li>regular mcast message from P
 * <li>regular ucast message from P
 * <li>gossip from P
 * <li>retransmit request from P
 * <li>retransmit response from P
 * </ul>
 * A checker thread periodically compares each timestamp against the configured <code>timeout</code> and
 * passes up a SUSPECT event for every member that has not been heard from within that time.
 *
 * @author Bela Ban
 */
033: public class FD extends Protocol implements Runnable {
034: Address local_addr = null;
035: Thread checker = null; // checks timestamps for timeout, generates SUSPECT event
036: final Object checker_lock = new Object();
037: long timeout = 6000; // number of millisecs to wait for a member to be suspected
038: // (should be higher than the gossip_interval value in PBCAST
039: final Hashtable members = new Hashtable(11); // keys=Addresses (members), vals=Entries (timestamp)
040: final Vector suspected_mbrs = new Vector(11); // currently suspected members (dynamically updated)
041:
042: static class Entry {
043: long timestamp;
044:
045: Entry(long timestamp) {
046: this .timestamp = timestamp;
047: }
048:
049: public String toString() {
050: return Long.toString(timestamp);
051: }
052: }
053:
054: public String getName() {
055: return "FD";
056: }
057:
058: public boolean setProperties(Properties props) {
059: String str;
060:
061: super .setProperties(props);
062: str = props.getProperty("timeout");
063: if (str != null) {
064: timeout = Long.parseLong(str);
065: props.remove("timeout");
066: }
067:
068: if (props.size() > 0) {
069: log
070: .error("FD.setProperties(): the following properties are not recognized: "
071: + props);
072:
073: return false;
074: }
075: return true;
076: }
077:
078: public void stop() {
079: stopChecker();
080: }
081:
082: public void up(Event evt) {
083: Message msg;
084: Address sender;
085:
086: switch (evt.getType()) {
087:
088: case Event.SET_LOCAL_ADDRESS:
089: local_addr = (Address) evt.getArg();
090: break;
091:
092: case Event.MSG:
093: msg = (Message) evt.getArg();
094: sender = msg.getSrc();
095: updateSender(sender);
096: break;
097: }
098:
099: passUp(evt); // pass up to the layer above us
100: }
101:
102: public void down(Event evt) {
103: View v;
104: Vector mbrs;
105: Address mbr;
106:
107: switch (evt.getType()) {
108:
109: case Event.VIEW_CHANGE:
110: v = (View) evt.getArg();
111: mbrs = v.getMembers();
112: passDown(evt);
113: for (Enumeration e = members.keys(); e.hasMoreElements();) {
114: mbr = (Address) e.nextElement();
115: if (!mbrs.contains(mbr)) {
116: members.remove(mbr);
117: }
118: }
119: members.remove(local_addr);
120: if (members.size() > 0 && checker == null)
121: startChecker();
122: return;
123:
124: // generated by PBCAST, contains list of members a gossip has visited. we can safely reset their counter
125: case Event.HEARD_FROM:
126: updateSenders((Vector) evt.getArg());
127: return; // don't pass down
128: }
129:
130: passDown(evt);
131: }
132:
133: public void run() {
134: Address mbr;
135: long timestamp, diff;
136:
137: while (checker != null
138: && Thread.currentThread().equals(checker)
139: && members.size() > 0) {
140: for (Enumeration e = members.keys(); e.hasMoreElements();) {
141: mbr = (Address) e.nextElement();
142: timestamp = ((Entry) members.get(mbr)).timestamp;
143: diff = System.currentTimeMillis() - timestamp;
144: if (diff >= timeout) {
145: if (log.isInfoEnabled())
146: log.info("suspecting " + mbr);
147: passUp(new Event(Event.SUSPECT, mbr));
148: if (!suspected_mbrs.contains(mbr))
149: suspected_mbrs.addElement(mbr);
150: }
151: }
152: Util.sleep(timeout);
153: }
154: checker = null;
155: }
156:
157: void startChecker() {
158: synchronized (checker_lock) {
159: if (checker == null) {
160: checker = new Thread(this , "FD.CheckerThread");
161: checker.setDaemon(true);
162: checker.start();
163: }
164: }
165: }
166:
167: void stopChecker() {
168: Thread tmp;
169: synchronized (checker_lock) {
170: if (checker != null && checker.isAlive()) {
171: tmp = checker;
172: checker = null;
173: tmp.interrupt();
174: try {
175: tmp.join(timeout);
176: } catch (Exception ex) {
177: }
178: if (tmp.isAlive())
179: if (log.isWarnEnabled())
180: log
181: .warn("interrupted checker thread is still alive !");
182: }
183: checker = null;
184: }
185: }
186:
187: void updateSender(Address mbr) {
188: Entry entry;
189: long curr_time;
190:
191: if (mbr == null) {
192: if (log.isDebugEnabled())
193: log.debug("member " + mbr + " not found");
194: return;
195: }
196:
197: if (suspected_mbrs.size() > 0 && suspected_mbrs.contains(mbr)) {
198: passUp(new Event(Event.UNSUSPECT, mbr));
199: suspected_mbrs.remove(mbr);
200: }
201:
202: if (mbr.equals(local_addr))
203: return;
204: entry = (Entry) members.get(mbr);
205: curr_time = System.currentTimeMillis();
206: if (entry != null)
207: entry.timestamp = curr_time;
208: else
209: members.put(mbr, new Entry(curr_time));
210: }
211:
212: void updateSenders(Vector v) {
213: Address mbr;
214: if (v == null)
215: return;
216: for (int i = 0; i < v.size(); i++) {
217: mbr = (Address) v.elementAt(i);
218: updateSender(mbr);
219: }
220: }
221:
222: String printTimestamps() {
223: StringBuffer sb = new StringBuffer();
224: Address mbr;
225:
226: synchronized (members) {
227: for (Enumeration e = members.keys(); e.hasMoreElements();) {
228: mbr = (Address) e.nextElement();
229: sb
230: .append("\n"
231: + mbr
232: + ": "
233: + (System.currentTimeMillis() - ((Entry) members
234: .get(mbr)).timestamp));
235: }
236: }
237: return sb.toString();
238: }
239:
240: }
|