001: // $Id: FD_SIMPLE.java,v 1.11.6.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.util.Promise;
008: import org.jgroups.util.TimeScheduler;
009: import org.jgroups.util.Streamable;
010:
011: import java.io.*;
012: import java.util.HashMap;
013: import java.util.Iterator;
014: import java.util.Properties;
015: import java.util.Vector;
016:
/**
 * Simple failure detection protocol. Periodically sends an are-you-alive message to a randomly chosen member
 * (excluding itself) and waits for a response. If a response has not been received within timeout msecs, a counter
 * associated with that member will be incremented. If the counter reaches max_missed_hbs, that member will be
 * suspected. When a message or a heartbeat is received, the counter is reset to 0.
 *
 * @author Bela Ban Aug 2002
 * @version $Revision: 1.11.6.1 $
 */
026: public class FD_SIMPLE extends Protocol {
027: Address local_addr = null;
028: TimeScheduler timer = null;
029: HeartbeatTask task = null;
030: long interval = 3000; // interval in msecs between are-you-alive messages
031: long timeout = 3000; // time (in msecs) to wait for a response to are-you-alive
032: final Vector members = new Vector();
033: final HashMap counters = new HashMap(); // keys=Addresses, vals=Integer (count)
034: int max_missed_hbs = 5; // max number of missed responses until a member is suspected
035: static final String name = "FD_SIMPLE";
036:
037: public String getName() {
038: return "FD_SIMPLE";
039: }
040:
041: public void init() throws Exception {
042: timer = stack.timer;
043: }
044:
045: public boolean setProperties(Properties props) {
046: String str;
047:
048: super .setProperties(props);
049: str = props.getProperty("timeout");
050: if (str != null) {
051: timeout = Long.parseLong(str);
052: props.remove("timeout");
053: }
054:
055: str = props.getProperty("interval");
056: if (str != null) {
057: interval = Long.parseLong(str);
058: props.remove("interval");
059: }
060:
061: str = props.getProperty("max_missed_hbs");
062: if (str != null) {
063: max_missed_hbs = Integer.parseInt(str);
064: props.remove("max_missed_hbs");
065: }
066:
067: if (props.size() > 0) {
068: log
069: .error("FD_SIMPLE.setProperties(): the following properties are not recognized: "
070: + props);
071:
072: return false;
073: }
074: return true;
075: }
076:
077: public void stop() {
078: if (task != null) {
079: task.stop();
080: task = null;
081: }
082: }
083:
084: public void up(Event evt) {
085: Message msg, rsp;
086: Address sender;
087: FdHeader hdr = null;
088: boolean counter_reset = false;
089:
090: switch (evt.getType()) {
091:
092: case Event.SET_LOCAL_ADDRESS:
093: local_addr = (Address) evt.getArg();
094: break;
095:
096: case Event.MSG:
097: msg = (Message) evt.getArg();
098: sender = msg.getSrc();
099: resetCounter(sender);
100: counter_reset = true;
101:
102: hdr = (FdHeader) msg.removeHeader(name);
103: if (hdr == null)
104: break;
105:
106: switch (hdr.type) {
107: case FdHeader.ARE_YOU_ALIVE: // are-you-alive request, send i-am-alive response
108: rsp = new Message(sender);
109: rsp.putHeader(name, new FdHeader(FdHeader.I_AM_ALIVE));
110: passDown(new Event(Event.MSG, rsp));
111: return; // don't pass up further
112:
113: case FdHeader.I_AM_ALIVE:
114: if (log.isInfoEnabled())
115: log.info("received I_AM_ALIVE response from "
116: + sender);
117: if (task != null)
118: task.receivedHeartbeatResponse(sender);
119: if (!counter_reset)
120: resetCounter(sender);
121: return;
122:
123: default:
124: if (log.isWarnEnabled())
125: log
126: .warn("FdHeader type " + hdr.type
127: + " not known");
128: return;
129: }
130: }
131:
132: passUp(evt); // pass up to the layer above us
133: }
134:
135: public void down(Event evt) {
136: View new_view;
137: Address key;
138:
139: switch (evt.getType()) {
140:
141: // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2
142: case Event.VIEW_CHANGE:
143: new_view = (View) evt.getArg();
144: members.clear();
145: members.addAll(new_view.getMembers());
146: if (new_view.size() > 1) {
147: if (task == null) {
148: task = new HeartbeatTask();
149: if (log.isInfoEnabled())
150: log.info("starting heartbeat task");
151: timer.add(task, true);
152: }
153: } else {
154: if (task != null) {
155: if (log.isInfoEnabled())
156: log.info("stopping heartbeat task");
157: task.stop(); // will be removed from TimeScheduler
158: task = null;
159: }
160: }
161:
162: // remove all keys from 'counters' which are not in this new view
163: for (Iterator it = counters.keySet().iterator(); it
164: .hasNext();) {
165: key = (Address) it.next();
166: if (!members.contains(key)) {
167: if (log.isInfoEnabled())
168: log.info("removing " + key + " from counters");
169: it.remove();
170: }
171: }
172: }
173:
174: passDown(evt);
175: }
176:
177: /* -------------------------------- Private Methods ------------------------------- */
178:
179: Address getHeartbeatDest() {
180: Address retval = null;
181: int r, size;
182: Vector members_copy;
183:
184: if (members == null || members.size() < 2 || local_addr == null)
185: return null;
186: members_copy = (Vector) members.clone();
187: members_copy.removeElement(local_addr); // don't select myself as heartbeat destination
188: size = members_copy.size();
189: r = ((int) (Math.random() * (size + 1))) % size;
190: retval = (Address) members_copy.elementAt(r);
191: return retval;
192: }
193:
194: int incrementCounter(Address mbr) {
195: Integer cnt;
196: int ret = 0;
197:
198: if (mbr == null)
199: return ret;
200: synchronized (counters) {
201: cnt = (Integer) counters.get(mbr);
202: if (cnt == null) {
203: cnt = new Integer(0);
204: counters.put(mbr, cnt);
205: } else {
206: ret = cnt.intValue() + 1;
207: counters.put(mbr, new Integer(ret));
208: }
209: return ret;
210: }
211: }
212:
213: void resetCounter(Address mbr) {
214: if (mbr == null)
215: return;
216:
217: synchronized (counters) {
218: counters.put(mbr, new Integer(0));
219: }
220: }
221:
222: String printCounters() {
223: StringBuffer sb = new StringBuffer();
224: Address key;
225:
226: for (Iterator it = counters.keySet().iterator(); it.hasNext();) {
227: key = (Address) it.next();
228: sb.append(key).append(": ").append(counters.get(key))
229: .append('\n');
230: }
231: return sb.toString();
232: }
233:
234: /* ----------------------------- End of Private Methods --------------------------- */
235:
236: public static class FdHeader extends Header implements Streamable {
237: static final byte ARE_YOU_ALIVE = 1; // sent periodically to a random member
238: static final byte I_AM_ALIVE = 2; // response to above message
239:
240: byte type = ARE_YOU_ALIVE;
241:
242: public FdHeader() {
243: } // used for externalization
244:
245: FdHeader(byte type) {
246: this .type = type;
247: }
248:
249: public String toString() {
250: switch (type) {
251: case ARE_YOU_ALIVE:
252: return "[FD_SIMPLE: ARE_YOU_ALIVE]";
253: case I_AM_ALIVE:
254: return "[FD_SIMPLE: I_AM_ALIVE]";
255: default:
256: return "[FD_SIMPLE: unknown type (" + type + ")]";
257: }
258: }
259:
260: public void writeExternal(ObjectOutput out) throws IOException {
261: out.writeByte(type);
262: }
263:
264: public void readExternal(ObjectInput in) throws IOException,
265: ClassNotFoundException {
266: type = in.readByte();
267: }
268:
269: public long size() {
270: return Global.BYTE_SIZE;
271: }
272:
273: public void writeTo(DataOutputStream out) throws IOException {
274: out.writeByte(type);
275: }
276:
277: public void readFrom(DataInputStream in) throws IOException,
278: IllegalAccessException, InstantiationException {
279: type = in.readByte();
280: }
281:
282: }
283:
284: class HeartbeatTask implements TimeScheduler.Task {
285: boolean stopped = false;
286: final Promise promise = new Promise();
287: Address dest = null;
288:
289: void stop() {
290: stopped = true;
291: }
292:
293: public boolean cancelled() {
294: return stopped;
295: }
296:
297: public long nextInterval() {
298: return interval;
299: }
300:
301: public void receivedHeartbeatResponse(Address from) {
302: if (from != null && dest != null && from.equals(dest))
303: promise.setResult(from);
304: }
305:
306: public void run() {
307: Message msg;
308: int num_missed_hbs = 0;
309:
310: dest = getHeartbeatDest();
311: if (dest == null) {
312: if (log.isWarnEnabled())
313: log
314: .warn("heartbeat destination was null, will not send ARE_YOU_ALIVE message");
315: return;
316: }
317:
318: if (log.isInfoEnabled())
319: log.info("sending ARE_YOU_ALIVE message to " + dest
320: + ", counters are\n" + printCounters());
321:
322: promise.reset();
323: msg = new Message(dest);
324: msg.putHeader(name, new FdHeader(FdHeader.ARE_YOU_ALIVE));
325: passDown(new Event(Event.MSG, msg));
326:
327: promise.getResult(timeout);
328: num_missed_hbs = incrementCounter(dest);
329: if (num_missed_hbs >= max_missed_hbs) {
330:
331: if (log.isInfoEnabled())
332: log.info("missed " + num_missed_hbs + " from "
333: + dest + ", suspecting member");
334: passUp(new Event(Event.SUSPECT, dest));
335: }
336: }
337: }
338:
339: }
|