001: package org.jgroups.protocols;
002:
003: import org.jgroups.*;
004: import org.jgroups.stack.Protocol;
005: import org.jgroups.stack.IpAddress;
006: import org.jgroups.util.Streamable;
007: import org.jgroups.util.Util;
008:
009: import java.io.*;
010: import java.util.*;
011: import java.net.InetAddress;
012: import java.net.UnknownHostException;
013: import java.net.NetworkInterface;
014: import java.lang.reflect.Method;
015:
016: /**
017: * Catches SUSPECT events traveling up the stack. Verifies that the suspected member is really dead. If yes,
018: * passes SUSPECT event up the stack, otherwise discards it. Has to be placed somewhere above the FD layer and
019: * below the GMS layer (receiver of the SUSPECT event). Note that SUSPECT events may be reordered by this protocol.
020: * @author Bela Ban
021: * @version $Id: VERIFY_SUSPECT.java,v 1.21.2.1 2007/04/27 08:03:52 belaban Exp $
022: */
023: public class VERIFY_SUSPECT extends Protocol implements Runnable {
024: private Address local_addr = null;
025: private long timeout = 2000; // number of millisecs to wait for an are-you-dead msg
026: private int num_msgs = 1; // number of are-you-alive msgs and i-am-not-dead responses (for redundancy)
027: final Hashtable suspects = new Hashtable(); // keys=Addresses, vals=time in mcses since added
028: private Thread timer = null;
029: private boolean use_icmp = false; // use InetAddress.isReachable() to double-check (rather than an are-you-alive msg)
030: private InetAddress bind_addr; // interface for ICMP pings
031: /** network interface to be used to send the ICMP packets */
032: private NetworkInterface intf = null;
033: private Method is_reacheable;
034: static final String name = "VERIFY_SUSPECT";
035:
036: public String getName() {
037: return name;
038: }
039:
040: public boolean setProperties(Properties props) {
041: super .setProperties(props);
042:
043: boolean ignore_systemprops = Util
044: .isBindAddressPropertyIgnored();
045: String str = Util.getProperty(new String[] { Global.BIND_ADDR,
046: Global.BIND_ADDR_OLD }, props, "bind_addr",
047: ignore_systemprops, null);
048: if (str != null) {
049: try {
050: bind_addr = InetAddress.getByName(str);
051: } catch (UnknownHostException unknown) {
052: if (log.isFatalEnabled())
053: log
054: .fatal("(bind_addr): host " + str
055: + " not known");
056: return false;
057: }
058: props.remove("bind_addr");
059: }
060:
061: str = props.getProperty("timeout");
062: if (str != null) {
063: timeout = Long.parseLong(str);
064: props.remove("timeout");
065: }
066:
067: str = props.getProperty("num_msgs");
068: if (str != null) {
069: num_msgs = Integer.parseInt(str);
070: if (num_msgs <= 0) {
071: if (log.isWarnEnabled())
072: log.warn("num_msgs is invalid (" + num_msgs
073: + "): setting it to 1");
074: num_msgs = 1;
075: }
076: props.remove("num_msgs");
077: }
078:
079: str = props.getProperty("use_icmp");
080: if (str != null) {
081: use_icmp = Boolean.valueOf(str).booleanValue();
082: props.remove("use_icmp");
083:
084: try { // only test for the (JDK 5 method) if use_icmp is true
085: is_reacheable = InetAddress.class.getMethod(
086: "isReachable", new Class[] {
087: NetworkInterface.class, int.class,
088: int.class });
089: } catch (NoSuchMethodException e) {
090: // log.error("didn't find InetAddress.isReachable() method - requires JDK 5 or higher");
091: Error error = new NoSuchMethodError(
092: "didn't find InetAddress.isReachable() method - requires JDK 5 or higher");
093: error.initCause(e);
094: throw error;
095: }
096: }
097:
098: if (props.size() > 0) {
099: log.error("the following properties are not recognized: "
100: + props);
101: return false;
102: }
103: return true;
104: }
105:
106: public void up(Event evt) {
107: Address suspected_mbr;
108: Message msg, rsp;
109: Object obj;
110: VerifyHeader hdr;
111:
112: switch (evt.getType()) {
113:
114: case Event.SET_LOCAL_ADDRESS:
115: local_addr = (Address) evt.getArg();
116: break;
117:
118: case Event.SUSPECT: // it all starts here ...
119: suspected_mbr = (Address) evt.getArg();
120: if (suspected_mbr == null) {
121: if (log.isErrorEnabled())
122: log.error("suspected member is null");
123: return;
124: }
125:
126: if (local_addr != null && local_addr.equals(suspected_mbr)) {
127: if (log.isTraceEnabled())
128: log
129: .trace("I was suspected; ignoring SUSPECT message");
130: return;
131: }
132:
133: if (!use_icmp)
134: verifySuspect(suspected_mbr);
135: else
136: verifySuspectWithICMP(suspected_mbr);
137: return; // don't pass up; we will decide later (after verification) whether to pass it up
138:
139: case Event.MSG:
140: msg = (Message) evt.getArg();
141: obj = msg.getHeader(name);
142: if (obj == null || !(obj instanceof VerifyHeader))
143: break;
144: hdr = (VerifyHeader) msg.removeHeader(name);
145: switch (hdr.type) {
146: case VerifyHeader.ARE_YOU_DEAD:
147: if (hdr.from == null) {
148: if (log.isErrorEnabled())
149: log.error("ARE_YOU_DEAD: hdr.from is null");
150: } else {
151: for (int i = 0; i < num_msgs; i++) {
152: rsp = new Message(hdr.from, null, null);
153: rsp
154: .putHeader(name, new VerifyHeader(
155: VerifyHeader.I_AM_NOT_DEAD,
156: local_addr));
157: passDown(new Event(Event.MSG, rsp));
158: }
159: }
160: return;
161: case VerifyHeader.I_AM_NOT_DEAD:
162: if (hdr.from == null) {
163: if (log.isErrorEnabled())
164: log.error("I_AM_NOT_DEAD: hdr.from is null");
165: return;
166: }
167: unsuspect(hdr.from);
168: return;
169: }
170: return;
171:
172: case Event.CONFIG:
173: if (bind_addr == null) {
174: Map config = (Map) evt.getArg();
175: bind_addr = (InetAddress) config.get("bind_addr");
176: }
177: }
178: passUp(evt);
179: }
180:
181: /**
182: * Will be started when a suspect is added to the suspects hashtable. Continually iterates over the
183: * entries and removes entries whose time have elapsed. For each removed entry, a SUSPECT event is passed
184: * up the stack (because elapsed time means verification of member's liveness failed). Computes the shortest
185: * time to wait (min of all timeouts) and waits(time) msecs. Will be woken up when entry is removed (in case
186: * of successful verification of that member's liveness). Terminates when no entry remains in the hashtable.
187: */
188: public void run() {
189: Address mbr;
190: long val, curr_time, diff;
191:
192: while (timer != null && Thread.currentThread().equals(timer)
193: && suspects.size() > 0) {
194: diff = 0;
195:
196: List tmp = null;
197: synchronized (suspects) {
198: for (Enumeration e = suspects.keys(); e
199: .hasMoreElements();) {
200: mbr = (Address) e.nextElement();
201: val = ((Long) suspects.get(mbr)).longValue();
202: curr_time = System.currentTimeMillis();
203: diff = curr_time - val;
204: if (diff >= timeout) { // haven't been unsuspected, pass up SUSPECT
205: if (log.isTraceEnabled())
206: log
207: .trace("diff="
208: + diff
209: + ", mbr "
210: + mbr
211: + " is dead (passing up SUSPECT event)");
212: if (tmp == null)
213: tmp = new LinkedList();
214: tmp.add(mbr);
215: suspects.remove(mbr);
216: continue;
217: }
218: diff = Math.max(diff, timeout - diff);
219: }
220: }
221: if (tmp != null && tmp.size() > 0) {
222: for (Iterator it = tmp.iterator(); it.hasNext();)
223: passUp(new Event(Event.SUSPECT, it.next()));
224: }
225:
226: if (diff > 0)
227: Util.sleep(diff);
228: }
229: timer = null;
230: }
231:
232: /* --------------------------------- Private Methods ----------------------------------- */
233:
234: /**
235: * Sends ARE_YOU_DEAD message to suspected_mbr, wait for return or timeout
236: */
237: void verifySuspect(Address mbr) {
238: Message msg;
239: if (mbr == null)
240: return;
241:
242: synchronized (suspects) {
243: if (suspects.containsKey(mbr))
244: return;
245: suspects.put(mbr, new Long(System.currentTimeMillis()));
246: }
247: // moved out of synchronized statement (bela): http://jira.jboss.com/jira/browse/JGRP-302
248: if (log.isTraceEnabled())
249: log.trace("verifying that " + mbr + " is dead");
250: for (int i = 0; i < num_msgs; i++) {
251: msg = new Message(mbr, null, null);
252: msg.putHeader(name, new VerifyHeader(
253: VerifyHeader.ARE_YOU_DEAD, local_addr));
254: passDown(new Event(Event.MSG, msg));
255: }
256: if (timer == null)
257: startTimer();
258: }
259:
260: void verifySuspectWithICMP(Address suspected_mbr) {
261: InetAddress host = suspected_mbr instanceof IpAddress ? ((IpAddress) suspected_mbr)
262: .getIpAddress()
263: : null;
264: if (host == null)
265: throw new IllegalArgumentException(
266: "suspected_mbr is not of type IpAddress - FD_ICMP only works with these");
267: try {
268: if (log.isTraceEnabled())
269: log.trace("pinging host " + suspected_mbr
270: + " using interface " + intf);
271: long start = System.currentTimeMillis(), stop;
272: Boolean rc = (Boolean) is_reacheable.invoke(host,
273: new Object[] { intf, new Integer(0), // 0 == use the default TTL
274: new Integer((int) timeout) });
275: stop = System.currentTimeMillis();
276: if (rc.booleanValue()) { // success
277: if (log.isTraceEnabled())
278: log.trace("successfully received response from "
279: + host + " (after " + (stop - start)
280: + "ms)");
281: } else { // failure
282: if (log.isTraceEnabled())
283: log.debug("could not ping " + suspected_mbr
284: + " after " + (stop - start) + "ms; "
285: + "passing up SUSPECT event");
286: suspects.remove(suspected_mbr);
287: passUp(new Event(Event.SUSPECT, suspected_mbr));
288: }
289: } catch (Exception ex) {
290: if (log.isErrorEnabled())
291: log.error("failed pinging " + suspected_mbr, ex);
292: }
293: }
294:
295: void unsuspect(Address mbr) {
296: if (mbr == null)
297: return;
298: boolean removed = false;
299: synchronized (suspects) {
300: if (suspects.containsKey(mbr)) {
301: if (log.isTraceEnabled())
302: log.trace("member " + mbr + " is not dead !");
303: suspects.remove(mbr);
304: removed = true;
305: }
306: }
307: if (removed) {
308: passDown(new Event(Event.UNSUSPECT, mbr));
309: passUp(new Event(Event.UNSUSPECT, mbr));
310: }
311: }
312:
313: void startTimer() {
314: if (timer == null || !timer.isAlive()) {
315: timer = new Thread(this , "VERIFY_SUSPECT.TimerThread");
316: timer.setDaemon(true);
317: timer.start();
318: }
319: }
320:
321: public void init() throws Exception {
322: super .init();
323: if (bind_addr != null)
324: intf = NetworkInterface.getByInetAddress(bind_addr);
325: }
326:
327: public void stop() {
328: Thread tmp;
329: if (timer != null && timer.isAlive()) {
330: tmp = timer;
331: timer = null;
332: tmp.interrupt();
333: tmp = null;
334: }
335: timer = null;
336: }
337:
338: /* ----------------------------- End of Private Methods -------------------------------- */
339:
340: public static class VerifyHeader extends Header implements
341: Streamable {
342: static final short ARE_YOU_DEAD = 1; // 'from' is sender of verify msg
343: static final short I_AM_NOT_DEAD = 2; // 'from' is suspected member
344:
345: short type = ARE_YOU_DEAD;
346: Address from = null; // member who wants to verify that suspected_mbr is dead
347:
348: public VerifyHeader() {
349: } // used for externalization
350:
351: VerifyHeader(short type) {
352: this .type = type;
353: }
354:
355: VerifyHeader(short type, Address from) {
356: this (type);
357: this .from = from;
358: }
359:
360: public String toString() {
361: switch (type) {
362: case ARE_YOU_DEAD:
363: return "[VERIFY_SUSPECT: ARE_YOU_DEAD]";
364: case I_AM_NOT_DEAD:
365: return "[VERIFY_SUSPECT: I_AM_NOT_DEAD]";
366: default:
367: return "[VERIFY_SUSPECT: unknown type (" + type + ")]";
368: }
369: }
370:
371: public void writeExternal(ObjectOutput out) throws IOException {
372: out.writeShort(type);
373: out.writeObject(from);
374: }
375:
376: public void readExternal(ObjectInput in) throws IOException,
377: ClassNotFoundException {
378: type = in.readShort();
379: from = (Address) in.readObject();
380: }
381:
382: public void writeTo(DataOutputStream out) throws IOException {
383: out.writeShort(type);
384: Util.writeAddress(from, out);
385: }
386:
387: public void readFrom(DataInputStream in) throws IOException,
388: IllegalAccessException, InstantiationException {
389: type = in.readShort();
390: from = Util.readAddress(in);
391: }
392:
393: }
394:
395: }
|