001: // $Id: FD_PID.java,v 1.8.10.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.IpAddress;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.util.Promise;
009: import org.jgroups.util.TimeScheduler;
010: import org.jgroups.util.Util;
011:
012: import java.io.IOException;
013: import java.io.ObjectInput;
014: import java.io.ObjectOutput;
015: import java.net.InetAddress;
016: import java.util.Enumeration;
017: import java.util.Hashtable;
018: import java.util.Properties;
019: import java.util.Vector;
020:
021: /**
022: * Process-ID based FD protocol. The existence of a process will be tested
023: * via the process ID instead of message based pinging. In order to probe a process' existence, the application (or
024: * some other protocol layer) has to send down a SET_PID event for the member. The addresses of all members will
025: * be associated with their respective PIDs. The PID will be used to probe for the existence of that process.<p>
026: * A cache of Addresses and PIDs is maintained in each member, which is adjusted upon reception of view changes.
027: * The population of the addr:pid cache is as follows:<br>
028: * When a new member joins, it requests the PID cache from the coordinator. Then it broadcasts its own addr:pid
029: * association, so all members can update their cache. When a member P is to be pinged by Q, and Q doesn't have
 * P's PID, Q will broadcast a WHO_HAS_PID message, to which all members who have that entry will respond. The
031: * latter case should actually never happen because all members should always have consistent caches. However,
032: * it is left in the code as a second line of defense.<p>
033: * Note that
034: * <em>1. The SET_PID has to be sent down after connecting to a channel !</em><p>
035: * <em>2. Note that if a process is shunned and subsequently reconnects, the SET_PID event has to be resent !</em><p>
036: * <em>3. This protocol only works for groups whose members are on the same host </em>. 'Host' actually means the
037: * same IP address (e.g. for multi-homed systems).
038: */
039: public class FD_PID extends Protocol {
040: Address ping_dest = null; // address of the member we monitor
041: int ping_pid = 0; // PID of the member we monitor
042: Address local_addr = null; // our own address
043: int local_pid = 0; // PID of this process
044: long timeout = 3000; // msecs to wait for an are-you-alive msg
045: long get_pids_timeout = 3000; // msecs to wait for the PID cache from the coordinator
046: final long get_pids_retry_timeout = 500; // msecs to wait until we retry fetching the cache from the coord
047: int num_tries = 3; // attempts the coord is solicited for PID cache until we give up
048: final Vector members = new Vector(); // list of group members (updated on VIEW_CHANGE)
049: final Hashtable pids = new Hashtable(); // keys=Addresses, vals=Integer (PIDs)
050: boolean own_pid_sent = false; // has own PID been broadcast yet ?
051: final Vector pingable_mbrs = new Vector(); // mbrs from which we select ping_dest. possible subset of 'members'
052: final Promise get_pids_promise = new Promise(); // used for rendezvous on GET_PIDS and GET_PIDS_RSP
053: boolean got_cache_from_coord = false; // was cache already fetched ?
054: TimeScheduler timer = null; // timer for recurring task of liveness pinging
055: Monitor monitor = null; // object that performs the actual monitoring
056:
057: public String getName() {
058: return "FD_PID";
059: }
060:
061: public boolean setProperties(Properties props) {
062: String str;
063:
064: super .setProperties(props);
065: str = props.getProperty("timeout");
066: if (str != null) {
067: timeout = Long.parseLong(str);
068: props.remove("timeout");
069: }
070:
071: str = props.getProperty("get_pids_timeout");
072: if (str != null) {
073: get_pids_timeout = Long.parseLong(str);
074: props.remove("get_pids_timeout");
075: }
076:
077: str = props.getProperty("num_tries");
078: if (str != null) {
079: num_tries = Integer.parseInt(str);
080: props.remove("num_tries");
081: }
082:
083: if (props.size() > 0) {
084: log
085: .error("FD_PID.setProperties(): the following properties are not recognized: "
086: + props);
087:
088: return false;
089: }
090: return true;
091: }
092:
093: public void start() throws Exception {
094: if (stack != null && stack.timer != null)
095: timer = stack.timer;
096: else {
097: if (log.isWarnEnabled())
098: log
099: .warn("TimeScheduler in protocol stack is null (or protocol stack is null)");
100: return;
101: }
102:
103: if (monitor != null && monitor.started == false) {
104: monitor = null;
105: }
106: if (monitor == null) {
107: monitor = new Monitor();
108: timer.add(monitor, true); // fixed-rate scheduling
109: }
110: }
111:
112: public void stop() {
113: if (monitor != null) {
114: monitor.stop();
115: monitor = null;
116: }
117: }
118:
119: public void up(Event evt) {
120: Message msg;
121: FdHeader hdr = null;
122: Object tmphdr;
123:
124: switch (evt.getType()) {
125:
126: case Event.SET_LOCAL_ADDRESS:
127: local_addr = (Address) evt.getArg();
128: break;
129:
130: case Event.MSG:
131: msg = (Message) evt.getArg();
132: tmphdr = msg.getHeader(getName());
133: if (tmphdr == null || !(tmphdr instanceof FdHeader))
134: break; // message did not originate from FD_PID layer, just pass up
135:
136: hdr = (FdHeader) msg.removeHeader(getName());
137:
138: switch (hdr.type) {
139:
140: case FdHeader.SUSPECT:
141: if (hdr.mbr != null) {
142:
143: if (log.isInfoEnabled())
144: log.info("[SUSPECT] hdr: " + hdr);
145: passUp(new Event(Event.SUSPECT, hdr.mbr));
146: passDown(new Event(Event.SUSPECT, hdr.mbr));
147: }
148: break;
149:
150: // If I have the PID for the address 'hdr.mbr', return it. Otherwise look it up in my cache and return it
151: case FdHeader.WHO_HAS_PID:
152: if (local_addr != null
153: && local_addr.equals(msg.getSrc()))
154: return; // don't reply to WHO_HAS bcasts sent by me !
155:
156: if (hdr.mbr == null) {
157: if (log.isErrorEnabled())
158: log.error("[WHO_HAS_PID] hdr.mbr is null");
159: return;
160: }
161:
162: // 1. Try my own address, maybe it's me whose PID is wanted
163: if (local_addr != null && local_addr.equals(hdr.mbr)
164: && local_pid > 0) {
165: sendIHavePidMessage(msg.getSrc(), hdr.mbr,
166: local_pid); // unicast message to msg.getSrc()
167: return;
168: }
169:
170: // 2. If I don't have it, maybe it is in the cache
171: if (pids.containsKey(hdr.mbr))
172: sendIHavePidMessage(msg.getSrc(), hdr.mbr,
173: ((Integer) pids.get(hdr.mbr)).intValue()); // ucast msg
174: break;
175:
176: // Update the cache with the add:pid entry (if on the same host)
177: case FdHeader.I_HAVE_PID:
178:
179: if (log.isInfoEnabled())
180: log.info("i-have pid: " + hdr.mbr + " --> "
181: + hdr.pid);
182:
183: if (hdr.mbr == null || hdr.pid <= 0) {
184: if (log.isErrorEnabled())
185: log
186: .error("[I_HAVE_PID] hdr.mbr is null or hdr.pid == 0");
187: return;
188: }
189:
190: if (!sameHost(local_addr, hdr.mbr)) {
191: if (log.isErrorEnabled())
192: log.error(hdr.mbr
193: + " is not on the same host as I ("
194: + local_addr
195: + ", discarding I_HAVE_PID event");
196: return;
197: }
198:
199: // if(!pids.containsKey(hdr.mbr))
200: pids.put(hdr.mbr, new Integer(hdr.pid)); // update the cache
201:
202: if (log.isInfoEnabled())
203: log.info("[" + local_addr + "]: the cache is "
204: + pids);
205:
206: if (ping_pid <= 0 && ping_dest != null
207: && pids.containsKey(ping_dest)) {
208: ping_pid = ((Integer) pids.get(ping_dest))
209: .intValue();
210: try {
211: start();
212: } catch (Exception ex) {
213: if (log.isWarnEnabled())
214: log.warn("exception when calling start(): "
215: + ex);
216: }
217: }
218: break;
219:
220: // Return the cache to the sender of this message
221: case FdHeader.GET_PIDS:
222: if (hdr.mbr == null) {
223:
224: if (log.isErrorEnabled())
225: log.error("[GET_PIDS]: hdr.mbr == null");
226: return;
227: }
228: hdr = new FdHeader(FdHeader.GET_PIDS_RSP);
229: hdr.pids = (Hashtable) pids.clone();
230: msg = new Message(hdr.mbr, null, null);
231: msg.putHeader(getName(), hdr);
232: passDown(new Event(Event.MSG, msg));
233: break;
234:
235: case FdHeader.GET_PIDS_RSP:
236: if (hdr.pids == null) {
237:
238: if (log.isErrorEnabled())
239: log.error("[GET_PIDS_RSP]: cache is null");
240: return;
241: }
242: get_pids_promise.setResult(hdr.pids);
243: break;
244: }
245: return;
246: }
247:
248: passUp(evt); // pass up to the layer above us
249: }
250:
251: public void down(Event evt) {
252: Integer pid;
253: Address mbr, tmp_ping_dest;
254: View v;
255:
256: switch (evt.getType()) {
257:
258: case Event.SET_PID:
259: // 1. Set the PID for local_addr
260: pid = (Integer) evt.getArg();
261: if (pid == null) {
262: if (log.isErrorEnabled())
263: log.error("SET_PID did not contain a pid !");
264: return;
265: }
266: local_pid = pid.intValue();
267:
268: if (log.isInfoEnabled())
269: log.info("local_pid=" + local_pid);
270: break;
271:
272: case Event.VIEW_CHANGE:
273: synchronized (this ) {
274: v = (View) evt.getArg();
275: members.removeAllElements();
276: members.addAll(v.getMembers());
277: pingable_mbrs.removeAllElements();
278: pingable_mbrs.addAll(members);
279: passDown(evt);
280:
281: // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
282: if (!got_cache_from_coord) {
283: getPidsFromCoordinator();
284: got_cache_from_coord = true;
285: }
286:
287: // 2. Broadcast my own addr:pid to all members so they can update their cache
288: if (!own_pid_sent) {
289: if (local_pid > 0) {
290: sendIHavePidMessage(null, // send to all members
291: local_addr, local_pid);
292: own_pid_sent = true;
293: } else if (log.isWarnEnabled())
294: log.warn("[VIEW_CHANGE]: local_pid == 0");
295: }
296:
297: // 3. Remove all entries in 'pids' which are not in the new membership
298: if (members != null) {
299: for (Enumeration e = pids.keys(); e
300: .hasMoreElements();) {
301: mbr = (Address) e.nextElement();
302: if (!members.contains(mbr))
303: pids.remove(mbr);
304: }
305: }
306: tmp_ping_dest = determinePingDest();
307: ping_pid = 0;
308: if (tmp_ping_dest == null) {
309: stop();
310: ping_dest = null;
311: } else {
312: ping_dest = tmp_ping_dest;
313: try {
314: start();
315: } catch (Exception ex) {
316: if (log.isWarnEnabled())
317: log.warn("exception when calling start(): "
318: + ex);
319: }
320: }
321: }
322: break;
323:
324: default:
325: passDown(evt);
326: break;
327: }
328: }
329:
330: /* ----------------------------------- Private Methods -------------------------------------- */
331:
332: /**
333: * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_PIDS message
334: * to coordinator and wait for GET_PIDS_RSP response. Loop until valid response has been received.
335: */
336: void getPidsFromCoordinator() {
337: Address coord;
338: int attempts = num_tries;
339: Message msg;
340: FdHeader hdr;
341: Hashtable result;
342:
343: get_pids_promise.reset();
344: while (attempts > 0) {
345: if ((coord = determineCoordinator()) != null) {
346: if (coord.equals(local_addr)) { // we are the first member --> empty cache
347:
348: if (log.isInfoEnabled())
349: log.info("first member; cache is empty");
350: return;
351: }
352: hdr = new FdHeader(FdHeader.GET_PIDS);
353: hdr.mbr = local_addr;
354: msg = new Message(coord, null, null);
355: msg.putHeader(getName(), hdr);
356: passDown(new Event(Event.MSG, msg));
357: result = (Hashtable) get_pids_promise
358: .getResult(get_pids_timeout);
359: if (result != null) {
360: pids.putAll(result); // replace all entries (there should be none !) in pids with the new values
361:
362: if (log.isInfoEnabled())
363: log.info("got cache from " + coord
364: + ": cache is " + pids);
365: return;
366: } else {
367:
368: if (log.isErrorEnabled())
369: log.error("received null cache; retrying");
370: }
371: }
372:
373: Util.sleep(get_pids_retry_timeout);
374: --attempts;
375: }
376: }
377:
378: void broadcastSuspectMessage(Address suspected_mbr) {
379: Message suspect_msg;
380: FdHeader hdr;
381:
382: if (log.isInfoEnabled())
383: log.info("suspecting " + suspected_mbr
384: + " (own address is " + local_addr + ')');
385:
386: hdr = new FdHeader(FdHeader.SUSPECT);
387: hdr.mbr = suspected_mbr;
388: suspect_msg = new Message(); // mcast SUSPECT to all members
389: suspect_msg.putHeader(getName(), hdr);
390: passDown(new Event(Event.MSG, suspect_msg));
391: }
392:
393: void broadcastWhoHasPidMessage(Address mbr) {
394: Message msg;
395: FdHeader hdr;
396:
397: if (local_addr != null && mbr != null)
398: if (log.isInfoEnabled())
399: log.info("[" + local_addr + "]: who-has " + mbr);
400:
401: msg = new Message(); // bcast msg
402: hdr = new FdHeader(FdHeader.WHO_HAS_PID);
403: hdr.mbr = mbr;
404: msg.putHeader(getName(), hdr);
405: passDown(new Event(Event.MSG, msg));
406: }
407:
408: /**
409: * Sends or broadcasts a I_HAVE_PID response. If 'dst' is null, the reponse will be broadcast, otherwise
410: * it will be unicast back to the requester
411: */
412: void sendIHavePidMessage(Address dst, Address mbr, int pid) {
413: Message msg = new Message(dst, null, null);
414: FdHeader hdr = new FdHeader(FdHeader.I_HAVE_PID);
415: hdr.mbr = mbr;
416: hdr.pid = pid;
417: msg.putHeader(getName(), hdr);
418: passDown(new Event(Event.MSG, msg));
419: }
420:
421: /**
422: * Set ping_dest and ping_pid. If ping_pid is not known, broadcast a WHO_HAS_PID message.
423: */
424: Address determinePingDest() {
425: Address tmp;
426:
427: if (pingable_mbrs == null || pingable_mbrs.size() < 2
428: || local_addr == null)
429: return null;
430: for (int i = 0; i < pingable_mbrs.size(); i++) {
431: tmp = (Address) pingable_mbrs.elementAt(i);
432: if (local_addr.equals(tmp)) {
433: if (i + 1 >= pingable_mbrs.size())
434: return (Address) pingable_mbrs.elementAt(0);
435: else
436: return (Address) pingable_mbrs.elementAt(i + 1);
437: }
438: }
439: return null;
440: }
441:
442: Address determineCoordinator() {
443: return members.size() > 0 ? (Address) members.elementAt(0)
444: : null;
445: }
446:
447: /**
448: * Checks whether 2 Addresses are on the same host
449: */
450: boolean sameHost(Address one, Address two) {
451: InetAddress a, b;
452: String host_a, host_b;
453:
454: if (one == null || two == null)
455: return false;
456: if (!(one instanceof IpAddress) || !(two instanceof IpAddress)) {
457: if (log.isErrorEnabled())
458: log
459: .error("addresses have to be of type IpAddress to be compared");
460: return false;
461: }
462:
463: a = ((IpAddress) one).getIpAddress();
464: b = ((IpAddress) two).getIpAddress();
465: if (a == null || b == null)
466: return false;
467: host_a = a.getHostAddress();
468: host_b = b.getHostAddress();
469: return host_a.equals(host_b);
470: }
471:
472: /* ------------------------------- End of Private Methods ------------------------------------ */
473:
474: public static class FdHeader extends Header {
475: static final int SUSPECT = 10;
476: static final int WHO_HAS_PID = 11;
477: static final int I_HAVE_PID = 12;
478: static final int GET_PIDS = 13; // sent by joining member to coordinator
479: static final int GET_PIDS_RSP = 14; // sent by coordinator to joining member in response to GET_PIDS
480:
481: int type = SUSPECT;
482: Address mbr = null; // set on SUSPECT (suspected mbr), WHO_HAS_PID (requested mbr), I_HAVE_PID
483: int pid = 0; // set on I_HAVE_PID
484: Hashtable pids = null; // set on GET_PIDS_RSP
485:
486: public FdHeader() {
487: } // used for externalization
488:
489: FdHeader(int type) {
490: this .type = type;
491: }
492:
493: public String toString() {
494: StringBuffer sb = new StringBuffer();
495: sb.append(type2String(type));
496: if (mbr != null)
497: sb.append(", mbr=" + mbr);
498: if (pid > 0)
499: sb.append(", pid=" + pid);
500: if (pids != null)
501: sb.append(", pids=" + pids);
502: return sb.toString();
503: }
504:
505: public static String type2String(int type) {
506: switch (type) {
507: case SUSPECT:
508: return "SUSPECT";
509: case WHO_HAS_PID:
510: return "WHO_HAS_PID";
511: case I_HAVE_PID:
512: return "I_HAVE_PID";
513: case GET_PIDS:
514: return "GET_PIDS";
515: case GET_PIDS_RSP:
516: return "GET_PIDS_RSP";
517: default:
518: return "unknown type (" + type + ')';
519: }
520: }
521:
522: public void writeExternal(ObjectOutput out) throws IOException {
523: out.writeInt(type);
524: out.writeObject(mbr);
525: out.writeInt(pid);
526: out.writeObject(pids);
527: }
528:
529: public void readExternal(ObjectInput in) throws IOException,
530: ClassNotFoundException {
531: type = in.readInt();
532: mbr = (Address) in.readObject();
533: pid = in.readInt();
534: pids = (Hashtable) in.readObject();
535: }
536:
537: }
538:
539: /**
540: * An instance of this class will be added to the TimeScheduler to be scheduled to be run every timeout
541: * msecs. When there is no ping_dest (e.g. only 1 member in the group), this task will be cancelled in
542: * TimeScheduler (and re-scheduled if ping_dest becomes available later).
543: */
544: private class Monitor implements TimeScheduler.Task {
545: boolean started = true;
546:
547: void stop() {
548: started = false;
549: }
550:
551: /* -------------------------------------- TimeScheduler.Task Interface -------------------------------- */
552:
553: public boolean cancelled() {
554: return !started;
555: }
556:
557: public long nextInterval() {
558: return timeout;
559: }
560:
561: /**
562: * Periodically probe for the destination process identified by ping_dest/ping_pid. Suspect the ping_dest
563: * member if /prop/<ping_pid> process does not exist.
564: */
565: public void run() {
566: if (ping_dest == null) {
567: if (log.isWarnEnabled())
568: log.warn("ping_dest is null, skipping ping");
569: return;
570: }
571:
572: if (log.isInfoEnabled())
573: log.info("ping_dest=" + ping_dest + ", ping_pid="
574: + ping_pid + ", cache=" + pids);
575:
576: // If the PID for ping_dest is not known, send a broadcast to solicit it
577: if (ping_pid <= 0) {
578: if (ping_dest != null && pids.containsKey(ping_dest)) {
579: ping_pid = ((Integer) pids.get(ping_dest))
580: .intValue();
581:
582: if (log.isInfoEnabled())
583: log.info("found PID for " + ping_dest
584: + " in cache (pid=" + ping_pid + ')');
585: } else {
586:
587: if (log.isErrorEnabled())
588: log.error("PID for " + ping_dest + " not known"
589: + ", cache is " + pids);
590: broadcastWhoHasPidMessage(ping_dest);
591: return;
592: }
593: }
594:
595: if (!Util.fileExists("/proc/" + ping_pid)) {
596:
597: if (log.isInfoEnabled())
598: log.info("process " + ping_pid + " does not exist");
599: broadcastSuspectMessage(ping_dest);
600: pingable_mbrs.removeElement(ping_dest);
601: ping_dest = determinePingDest();
602: if (ping_dest == null)
603: stop();
604: ping_pid = 0;
605: } else {
606:
607: if (log.isInfoEnabled())
608: log.info(ping_dest + " is alive");
609: }
610: }
611:
612: /* ---------------------------------- End of TimeScheduler.Task Interface ---------------------------- */
613:
614: }
615:
616: }
|