001: // $Id: STABLE.java,v 2007/04/27 08:03:52 belaban Exp $
003: package org.jgroups.protocols;
005: import org.jgroups.*;
006: import org.jgroups.blocks.GroupRequest;
007: import org.jgroups.blocks.MethodCall;
008: import org.jgroups.stack.RpcProtocol;
009: import org.jgroups.util.TimeScheduler;
010: import org.jgroups.util.Util;
012: import java.util.Properties;
013: import java.util.Vector;
015: /**
016: * Computes the broadcast messages that are stable; i.e., that have been received
017: * by all members. Sends STABLE events up the stack when this is the case.
018: * Uses a probabilistic scheme to do so, as described in:<br>
019: * GSGC: An Efficient Gossip-Style Garbage Collection Scheme for Scalable
020: * Reliable Multicast, K. Guo et al., 1997.
021: * <p>
022: * The only difference is that instead of using counters for an estimation of
023: * messages received from each member, we retrieve this actual information
024: * from the NAKACK layer (which must be present for the STABLE protocol to
025: * work).
026: * <p>
* Note: the <tt>Event.MSG</tt> call path must be as lightweight as
* possible. It should not request any lock for which there is high
* contention and/or a long delay.
030: * <p>
031: * <pre>
032: * Changes(igeorg - 2.VI.2001):
033: * i. Thread-safety (in RPC calls most notably on the lines of Gianluca
034: * Collot's bugfix)
035: * ii. All slow calls (RPCs, seqnos requests, etc.) placed outside locks
036: * iii. Removed redundant initialization in adaptation to a higher round
* iv. heard_from[this member] is always set to true on every new round
* (i.e. on every stability bcast).
039: * v. Replaced gossip thread with <tt>TimeScheduler.Task</tt>
040: * </pre>
041: * <p>
042: * [[[ TODO(igeorg - 2.VI.2001)
043: * i. Faster stability convergence by better selection of gossip subsets
044: * (replace Util.pickSubset()).
* ii. Special mutex on the <tt>Event.MSG</tt> call path. I.e. replace
* <tt>synchronized(this)</tt> with e.g. <tt>synchronized(msg_mutex)</tt>.
047: * ]] TODO
048: */
049: public class STABLE extends RpcProtocol {
/** The protocol name */
private static final String PROT_NAME = "STABLE";

/** Default subgroup size for gossiping, expressed as a percentage over the group's size */
private static final double SUBSET_SIZE = 0.1;

/** Default max number of msgs to wait for before sending gossip */
private static final int GOSSIP_MSG_INTERVAL = 100;

/** Default max time to wait before sending gossip (ms) */
private static final int GOSSIP_INTERVAL = 10000;

/** This member's own address; assigned when a SET_LOCAL_ADDRESS event is handled */
private Address local_addr = null;

/** ID of the current view; assigned on every VIEW_CHANGE */
private ViewId vid = null;

/** Current membership; replaced on every VIEW_CHANGE */
private final Vector mbrs = new Vector(11);

/** gossip round */
private long round = 1;

/** highest seqno received for each member (corresponds to membership) */
private long[] seqnos = new long[0];

/** Array of members from which we have received a gossip in the current round */
private boolean[] heard_from = new boolean[0];

/** Percentage of members to which gossip is sent (parameterizable by user) */
private double subset = SUBSET_SIZE;

/** The gossiping task scheduler */
private TimeScheduler sched = null;

/** The currently scheduled gossip task; null when no task is scheduled */
private Task gossip_task;

/** wait for n messages until sending gossip ... */
private int max_msgs = GOSSIP_MSG_INTERVAL;

/** ... or until max_wait_time has elapsed, whichever comes first */
private long max_wait_time = GOSSIP_INTERVAL;

/**
 * Current number of msgs left to be received before sending a gossip.
 * Counted down on every multicast message and reset to max_msgs once a
 * gossip is triggered.
 */
private long num_msgs = max_msgs;

/** mutex for interacting with NAKACK layer (GET_MSGS_RECEIVED) */
private final Object highest_seqnos_mutex = new Object();

/** Time (ms) to wait for a reply from NAKACK layer (GET_MSGS_RECEIVED) */
private long highest_seqnos_timeout = 4000;
098: /**
099: * @return this protocol name
100: */
101: public String getName() {
102: return (PROT_NAME);
103: }
105: /**
* The events expected to be handled by some layer above:
* <ul>
* <li>
* GET_MSGS_RECEIVED
* </li>
* </ul>
* @return a list of events expected to be handled by some layer
* above
114: */
115: public Vector requiredUpServices() {
116: Vector retval = new Vector(1);
117: retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
118: return retval;
119: }
121: /**
122: * Set the parameters for this layer.
123: *
124: * <ul>
125: * <li>
* <i>subset</i>: the percentage of the group's size to which the
* msgs_seen_so_far gossip is sent periodically.</li>
128: * <li>
129: * <i>max_msgs</i>: the max number of msgs to wait for between two
130: * consecutive gossipings.</li>
131: * <li>
132: * <i>max_wait_time</i>: the max time to wait for between two consecutive
133: * gossipings.</li>
134: * <li>
* <i>highest_seqnos_timeout</i>: time to wait to receive from NAKACK
* the array of highest deliverable seqnos
137: * </li>
138: * </ul>
139: *
140: * @param props the list of parameters
141: */
142: public boolean setProperties(Properties props) {
143: String str;
145: super .setProperties(props);
146: str = props.getProperty("subset");
147: if (str != null) {
148: subset = Float.parseFloat(str);
149: props.remove("subset");
150: }
152: str = props.getProperty("max_msgs");
153: if (str != null) {
154: num_msgs = max_msgs = Integer.parseInt(str);
155: if (max_msgs <= 1) {
156: if (log.isFatalEnabled())
157: log
158: .fatal("value for 'max_msgs' must be greater than 1 !");
159: return false;
160: }
161: props.remove("max_msgs");
162: }
164: str = props.getProperty("max_wait_time");
165: if (str != null) {
166: max_wait_time = Long.parseLong(str);
167: props.remove("max_wait_time");
168: }
170: str = props.getProperty("highest_seqnos_timeout");
171: if (str != null) {
172: highest_seqnos_timeout = Long.parseLong(str);
173: props.remove("highest_seqnos_timeout");
174: }
176: if (props.size() > 0) {
177: log
178: .error("STABLE.setProperties(): these properties are not recognized: "
179: + props);
181: return false;
182: }
183: return true;
184: }
186: /**
187: * Start the layer:
188: * i. Set the gossip task scheduler
189: * ii. Reset the layer's state.
190: * iii. Start the gossiping task
191: */
192: public void start() throws Exception {
193: TimeScheduler timer;
195: super .start();
196: timer = stack != null ? stack.timer : null;
197: if (timer == null)
198: throw new Exception("STABLE.start(): timer is null");
200: sched = timer;
202: // we use only asynchronous method invocations...
203: if (_corr != null)
204: _corr.setDeadlockDetection(false);
205: initialize();
206: startGossip();
207: }
209: /**
210: * Stop scheduling the gossip task
211: */
212: public void stop() {
213: super .stop();
214: synchronized (this ) {
215: if (gossip_task != null)
216: gossip_task.cancel();
217: gossip_task = null;
218: }
219: }
221: /* ------------------------- Request handler methods ------------------ */
223: /**
224: * Contains the highest sequence numbers as seen by <code>sender</code>
225: *
226: * @param view_id The view ID in which the gossip was sent. Must be the
227: * same as ours, otherwise it is discarded
228: *
229: * @param gossip_round The round in which the gossip was sent
230: *
231: * @param gossip_seqnos A vector with the highest sequence numbers as
232: * seen by <code>sender</code>
233: *
234: * @param heard The sender's <code>heard_from</code> array. This allows
235: * us to minimize the gossip msgs for a given round as a member does not
236: * have to receive gossip msgs from each member, but members pass gossips
237: * they've received from others on in their own gossips. E.g. when a
238: * member P (of group {P,Q,R}) receives a gossip from R, its own gossip
239: * to Q might be {R,P}. Q, who hasn't received a gossip from R, will not
240: * need to receive it anymore as it is already sent by P. This simple
241: * scheme reduces the number of gossip messages needed.
242: *
243: * @param sender The sender of the gossip message (obviously :-))
244: */
245: public void gossip(ViewId view_id, long gossip_round,
246: long[] gossip_seqnos, boolean[] heard, Object sender) {
247: Object[] params;
248: MethodCall call;
250: synchronized (this ) {
252: if (log.isInfoEnabled())
253: log.info("sender=" + sender + ", round=" + gossip_round
254: + ", seqnos="
255: + Util.array2String(gossip_seqnos) + ", heard="
256: + Util.array2String(heard));
257: if (vid == null || view_id == null || !vid.equals(view_id)) {
259: if (log.isInfoEnabled())
260: log
261: .info("view ID s are different (" + vid
262: + " != " + view_id
263: + "). Discarding gossip received");
264: return;
265: }
266: if (gossip_round < this .round) {
268: if (log.isInfoEnabled())
269: log
270: .info("received a gossip from a previous round ("
271: + gossip_round
272: + "); my round is "
273: + round + ". Discarding gossip");
274: return;
275: }
276: if (gossip_seqnos == null || seqnos == null
277: || seqnos.length != gossip_seqnos.length) {
279: if (log.isWarnEnabled())
280: log
281: .warn("size of seqnos and gossip_seqnos are not equal ! "
282: + "Discarding gossip");
283: return;
284: }
286: // (1) If round greater than local round:
287: // i. Adjust the local to the received round
288: //
289: // (2)
290: // i. local_seqnos = arrayMin(local_seqnos, gossip_seqnos)
291: // ii. local_heard = arrayMax(local_heard, gossip_heard)
292: // iii. If heard from all, bcast our seqnos (stability vector)
293: if (round == gossip_round) {
294: update(sender, gossip_seqnos, heard);
295: } else if (round < gossip_round) {
297: if (log.isInfoEnabled())
298: log.info("received a gossip from a higher round ("
299: + gossip_round + "); adopting my round ("
300: + round + ") to " + gossip_round);
301: round = gossip_round;
302: set(sender, gossip_seqnos, heard_from);
303: }
305: if (log.isInfoEnabled())
306: log.info("heard_from=" + Util.array2String(heard_from));
307: if (!heardFromAll())
308: return;
310: params = new Object[] { vid.clone(),
311: new Long(gossip_round), seqnos.clone(), local_addr };
312: } // synchronized(this)
314: call = new MethodCall("stability", params, new String[] {
315: ViewId.class.getName(), long.class.getName(),
316: long[].class.getName(), Object.class.getName() });
317: callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
318: }
320: /**
321: * Contains the highest message sequence numbers (for each member) that
322: * can safely be deleted (because they have been seen by all members).
323: */
324: public void stability(ViewId view_id, long gossip_round,
325: long[] stability_vector, Object sender) {
326: // i. Proceed to the next round; init the heard from list
327: // ii. Send up the stability vector
328: // iii. get a fresh copy of the highest deliverable seqnos
329: synchronized (this ) {
331: if (log.isInfoEnabled())
332: log.info("sender=" + sender + ", round=" + gossip_round
333: + ", vector="
334: + Util.array2String(stability_vector) + ')');
335: if (vid == null || view_id == null || !vid.equals(view_id)) {
337: if (log.isInfoEnabled())
338: log
339: .info("view ID s are different (" + vid
340: + " != " + view_id
341: + "). Discarding gossip received");
342: return;
343: }
345: if (round > gossip_round)
346: return;
347: round = gossip_round + 1;
348: for (int i = 0; i < heard_from.length; i++)
349: heard_from[i] = false;
350: }
351: heard_from[mbrs.indexOf(local_addr)] = true;
353: passUp(new Event(Event.STABLE, stability_vector));
354: getHighestSeqnos();
355: }
357: /* --------------------- End of Request handler methods --------------- */
359: /**
360: * <b>Callback</b>. Called by superclass when event may be handled.
361: * <p>
362: * <b>Do not use <code>PassUp</code> in this method as the event is passed
363: * up by default by the superclass after this method returns !</b>
364: *
365: * @return boolean Defaults to true. If false, event will not be passed
366: * up the stack.
367: */
368: public boolean handleUpEvent(Event evt) {
369: switch (evt.getType()) {
370: case Event.MSG:
371: if (!upMsg(evt))
372: return (false);
373: break;
374: case Event.SET_LOCAL_ADDRESS:
375: local_addr = (Address) evt.getArg();
376: break;
377: }
379: return true;
380: }
382: /**
383: * <b>Callback</b>. Called by superclass when event may be handled.
384: * <p>
385: * <b>Do not use <code>PassDown</code> in this method as the event is
386: * passed down by default by the superclass after this method returns !</b>
387: *
388: * @return boolean Defaults to true. If false, event will not be passed
389: * down the stack.
390: */
391: public boolean handleDownEvent(Event evt) {
392: switch (evt.getType()) {
393: case Event.VIEW_CHANGE:
394: if (!downViewChange(evt))
395: return (false);
396: break;
397: // does anyone else below needs this msg except STABLE?
398: case Event.GET_MSGS_RECEIVED_OK:
399: if (!downGetMsgsReceived(evt))
400: return (false);
401: break;
402: }
404: return (true);
405: }
407: /**
408: * The gossip task that runs periodically
409: */
410: private void gossipRun() {
411: num_msgs = max_msgs;
412: sendGossip();
413: }
415: /**
416: * <pre>
417: * Reset the state of msg garbage-collection:
418: * i. Reset the table of highest seqnos seen by each member
419: * ii. Reset the tbl of mbrs from which highest seqnos have been recorded
420: * </pre>
421: */
422: private void initialize() {
423: synchronized (this ) {
424: seqnos = new long[mbrs.size()];
425: for (int i = 0; i < seqnos.length; i++)
426: seqnos[i] = -1;
428: heard_from = new boolean[mbrs.size()];
429: for (int i = 0; i < heard_from.length; i++)
430: heard_from[i] = false;
431: }
432: }
434: /**
435: * (1)<br>
* Merge this member's table of highest seqnos seen by each member
437: * with the one received from a gossip by another member. The result is
438: * the element-wise minimum of the input arrays. For each entry:<br>
439: *
440: * <tt>seqno[mbr_i] = min(seqno[mbr_i], gossip_seqno[mbr_i])</tt>
441: * <p>
442: *
443: * (2)<br>
444: * Merge the <tt>heard from</tt> tables of this member and the sender of
445: * the gossip. The resulting table is:<br>
446: *
447: * <tt>heard_from[mbr_i] = heard_from[mbr_i] | sender_heard[mbr_i]</tt>
448: *
449: * @param sender the sender of the gossip
450: * @param gossip_seqnos the highest deliverable seqnos of the sender
451: * @param gossip_heard_from the table of members sender has heard from
452: *
453: */
454: private void update(Object sender, long[] gossip_seqnos,
455: boolean[] gossip_heard_from) {
456: int index;
458: synchronized (this ) {
459: index = mbrs.indexOf(sender);
460: if (index < 0) {
461: if (log.isWarnEnabled())
462: log.warn("sender " + sender
463: + " not found in mbrs !");
464: return;
465: }
467: for (int i = 0; i < gossip_seqnos.length; i++)
468: seqnos[i] = Math.min(seqnos[i], gossip_seqnos[i]);
470: heard_from[index] = true;
471: for (int i = 0; i < heard_from.length; i++)
472: heard_from[i] = heard_from[i] | gossip_heard_from[i];
473: }
474: }
476: /**
477: * Set the seqnos and heard_from arrays to those of the sender. The
478: * method is called when the sender seems to know more than this member.
479: * The situation occurs if either:
480: * <ul>
481: * <li>
482: * sender.heard_from > this.heard_from, i.e. the sender has heard
483: * from more members than we have</li>
484: * <li>
485: * sender.round > this.round, i.e. the sender is in a more recent round
486: * than we are</li>
487: * </ul>
488: *
489: * In both cases, this member is assigned the state of the sender
490: */
491: private void set(Object sender, long[] gossip_seqnos,
492: boolean[] gossip_heard_from) {
493: int index;
495: synchronized (this ) {
496: index = mbrs.indexOf(sender);
497: if (index < 0) {
498: if (log.isWarnEnabled())
499: log.warn("sender " + sender
500: + " not found in mbrs !");
501: return;
502: }
504: seqnos = gossip_seqnos;
505: heard_from = gossip_heard_from;
506: }
507: }
509: /**
510: * @return true, if we have received the highest deliverable seqnos
511: * directly or indirectly from all members
512: */
513: private boolean heardFromAll() {
514: synchronized (this ) {
515: if (heard_from == null)
516: return false;
517: for (int i = 0; i < heard_from.length; i++)
518: if (!heard_from[i])
519: return false;
520: }
522: return true;
523: }
525: /**
526: * Send our <code>seqnos</code> array to a subset of the membership
527: */
528: private void sendGossip() {
529: Vector gossip_subset;
530: Object[] params;
531: MethodCall call;
533: synchronized (this ) {
534: gossip_subset = Util.pickSubset(mbrs, subset);
535: if (gossip_subset == null || gossip_subset.size() < 1) {
536: if (log.isWarnEnabled())
537: log.warn("picked empty subset !");
538: return;
539: }
541: if (log.isInfoEnabled())
542: log.info("subset=" + gossip_subset + ", round=" + round
543: + ", seqnos=" + Util.array2String(seqnos));
545: params = new Object[] { vid.clone(), new Long(round),
546: seqnos.clone(), heard_from.clone(), local_addr };
547: }
549: call = new MethodCall("gossip", params, new String[] {
550: ViewId.class.getName(), long.class.getName(),
551: long[].class.getName(), boolean[].class.getName(),
552: Object.class.getName() });
553: for (int i = 0; i < gossip_subset.size(); i++) {
554: try {
555: callRemoteMethod((Address) gossip_subset.get(i), call,
556: GroupRequest.GET_NONE, 0);
557: } catch (Exception e) {
558: if (log.isDebugEnabled())
559: log.debug("exception=" + e);
560: }
561: }
562: }
564: /**
* Sends GET_MSGS_RECEIVED to the NAKACK layer (above us !) and waits up to
* <code>highest_seqnos_timeout</code> ms for the reply. The reply
* (GET_MSGS_RECEIVED_OK) is copied into <code>seqnos</code> by
* <code>downGetMsgsReceived()</code>.
568: */
569: private void getHighestSeqnos() {
570: synchronized (highest_seqnos_mutex) {
571: passUp(new Event(Event.GET_MSGS_RECEIVED));
573: try {
574: highest_seqnos_mutex.wait(highest_seqnos_timeout);
575: } catch (InterruptedException e) {
576: if (log.isErrorEnabled())
577: log
578: .error("Interrupted while waiting for highest seqnos from NAKACK");
579: }
580: }
581: }
583: /**
584: * Start scheduling the gossip task
585: */
586: private void startGossip() {
587: synchronized (this ) {
588: if (gossip_task != null)
589: gossip_task.cancel();
590: gossip_task = new Task(new Times(
591: new long[] { GOSSIP_INTERVAL }));
592: sched.add(gossip_task);
593: }
594: }
596: /**
597: * Received a <tt>MSG</tt> event from a layer below
598: *
599: * A msg received:
600: * If unicast ignore; if multicast and time for gossiping has been
601: * reached, send out a gossip to a subset of the mbrs
602: *
603: * @return true if the event should be forwarded to the layer above
604: */
605: private boolean upMsg(Event e) {
606: Message msg = (Message) e.getArg();
608: if (msg.getDest() != null
609: && (!msg.getDest().isMulticastAddress()))
610: return (true);
612: synchronized (this ) {
613: --num_msgs;
614: if (num_msgs > 0)
615: return (true);
616: num_msgs = max_msgs;
618: gossip_task.cancel();
619: gossip_task = new Task(new Times(new long[] { 0,
621: sched.add(gossip_task);
622: }
624: return (true);
625: }
627: /**
628: * Received a <tt>VIEW_CHANGE</tt> event from a layer above
629: *
630: * A new view:
631: * i. Set the new mbrs list and the new view ID.
632: * ii. Reset the highest deliverable seqnos seen
633: *
634: * @return true if the event should be forwarded to the layer below
635: */
636: private boolean downViewChange(Event e) {
637: View v = (View) e.getArg();
638: Vector new_mbrs = v.getMembers();
640: /*
641: // Could this ever happen? GMS is always sending non-null value
642: if(new_mbrs == null) {
643: / Trace.println(
644: "STABLE.handleDownEvent()", Trace.ERROR,
645: "Received VIEW_CHANGE event with null mbrs list");
646: break;
647: }
648: */
650: synchronized (this ) {
651: vid = v.getVid();
652: mbrs.clear();
653: mbrs.addAll(new_mbrs);
654: initialize();
655: }
657: return (true);
658: }
660: /**
* Received a <tt>GET_MSGS_RECEIVED_OK</tt> event from a layer above
662: *
663: * Updated list of highest deliverable seqnos:
664: * i. Update the local copy of highest deliverable seqnos
665: *
666: * @return true if the event should be forwarded to the layer below
667: */
668: private boolean downGetMsgsReceived(Event e) {
669: long[] new_seqnos = (long[]) e.getArg();
671: try {
672: synchronized (this ) {
673: if (new_seqnos == null)
674: return (true);
675: if (new_seqnos.length != seqnos.length) {
677: if (log.isInfoEnabled())
678: log
679: .info("GET_MSGS_RECEIVED: array of highest "
680: + "seqnos seen so far (received from NAKACK layer) "
681: + "has a different length ("
682: + new_seqnos.length
683: + ") from 'seqnos' array ("
684: + seqnos.length + ')');
685: return (true);
686: }
687: System.arraycopy(new_seqnos, 0, seqnos, 0,
688: seqnos.length);
689: }
691: } finally {
692: synchronized (highest_seqnos_mutex) {
693: highest_seqnos_mutex.notifyAll();
694: }
695: }
697: return (true);
698: }
700: /**
701: * Select next interval from list. Once the end of the list is reached,
702: * keep returning the last value. It would be sensible that list of
703: * times is in increasing order
704: */
705: private static class Times {
706: private int next = 0;
707: private long[] times;
709: Times(long[] times) {
710: if (times.length == 0)
711: throw new IllegalArgumentException("times");
712: this .times = times;
713: }
715: public synchronized long next() {
716: if (next >= times.length)
717: return (times[times.length - 1]);
718: else
719: return (times[next++]);
720: }
722: public long[] times() {
723: return (times);
724: }
726: public synchronized void reset() {
727: next = 0;
728: }
729: }
731: /**
732: * The gossiping task
733: */
734: private class Task implements TimeScheduler.Task {
735: private final Times intervals;
736: private boolean cancelled = false;
738: Task(Times intervals) {
739: this .intervals = intervals;
740: }
742: public long nextInterval() {
743: return (intervals.next());
744: }
746: public boolean cancelled() {
747: return (cancelled);
748: }
750: public void cancel() {
751: cancelled = true;
752: }
754: public void run() {
755: gossipRun();
756: }
757: }
758: }