001: // $Id: STABLE.java,v 1.46.6.1 2007/04/27 08:03:55 belaban Exp $
002:
003: package org.jgroups.protocols.pbcast;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.util.Streamable;
008: import org.jgroups.util.TimeScheduler;
009: import org.jgroups.util.Util;
010:
011: import java.io.*;
012: import java.util.Iterator;
013: import java.util.Map;
014: import java.util.Properties;
015: import java.util.Vector;
016:
017: /**
018: * Computes the broadcast messages that are stable; i.e., have been received by all members. Sends
019: * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
020: * have been seen by all members.<p>
021: * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
022: * A stability vector, which maintains the highest seqno for each member and initially contains no data,
023: * is updated when such a message is received. The entry for a member P is set to
024: * min(entry[P], digest[P]). When messages from all members have been received, a stability
025: * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
026: * in the NAKACK layer).<p>
027: * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
028: * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
029: * STABLE messages in the face of no activity.<br/>
030: * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
031: * a STABLE task will be started (unless it is already running).
032: * @author Bela Ban
033: */
034: public class STABLE extends Protocol {
035: Address local_addr = null;
036: final Vector mbrs = new Vector();
037: final Digest digest = new Digest(10); // keeps track of the highest seqnos from all members
038: final Digest latest_local_digest = new Digest(10); // keeps track of the latest digests received from NAKACK
039: final Vector heard_from = new Vector(); // keeps track of who we already heard from (STABLE_GOSSIP msgs)
040:
041: /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
042: long desired_avg_gossip = 20000;
043:
044: /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
045: * small number (> 0 !) if <code>max_bytes</code> is used */
046: long stability_delay = 6000;
047: private StabilitySendTask stability_task = null;
048: final Object stability_mutex = new Object(); // to synchronize on stability_task
049: private volatile StableTask stable_task = null; // bcasts periodic STABLE message (added to timer below)
050: final Object stable_task_mutex = new Object(); // to sync on stable_task
051: TimeScheduler timer = null; // to send periodic STABLE msgs (and STABILITY messages)
052: static final String name = "STABLE";
053:
054: /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
055: * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
056: * <code>stability_delay</code> should be set to a low number as well */
057: long max_bytes = 0;
058:
059: /** The total number of bytes received from unicast and multicast messages */
060: long num_bytes_received = 0;
061:
062: /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
063: * handle STABILITY messages */
064: boolean suspended = false;
065:
066: boolean initialized = false;
067:
068: private ResumeTask resume_task = null;
069: final Object resume_task_mutex = new Object();
070:
071: /** Number of gossip messages */
072: int num_gossips = 0;
073:
074: private static final long MAX_SUSPEND_TIME = 200000;
075:
076: public String getName() {
077: return name;
078: }
079:
080: public long getDesiredAverageGossip() {
081: return desired_avg_gossip;
082: }
083:
084: public void setDesiredAverageGossip(long gossip_interval) {
085: desired_avg_gossip = gossip_interval;
086: }
087:
088: public long getMaxBytes() {
089: return max_bytes;
090: }
091:
092: public void setMaxBytes(long max_bytes) {
093: this .max_bytes = max_bytes;
094: }
095:
096: public int getNumberOfGossipMessages() {
097: return num_gossips;
098: }
099:
100: public void resetStats() {
101: super .resetStats();
102: num_gossips = 0;
103: }
104:
105: public Vector requiredDownServices() {
106: Vector retval = new Vector();
107: retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); // NAKACK layer
108: return retval;
109: }
110:
111: public boolean setProperties(Properties props) {
112: String str;
113:
114: super .setProperties(props);
115: str = props.getProperty("digest_timeout");
116: if (str != null) {
117: props.remove("digest_timeout");
118: log
119: .error("digest_timeout has been deprecated; it will be ignored");
120: }
121:
122: str = props.getProperty("desired_avg_gossip");
123: if (str != null) {
124: desired_avg_gossip = Long.parseLong(str);
125: props.remove("desired_avg_gossip");
126: }
127:
128: str = props.getProperty("stability_delay");
129: if (str != null) {
130: stability_delay = Long.parseLong(str);
131: props.remove("stability_delay");
132: }
133:
134: str = props.getProperty("max_gossip_runs");
135: if (str != null) {
136: props.remove("max_gossip_runs");
137: log
138: .error("max_gossip_runs has been deprecated and will be ignored");
139: }
140:
141: str = props.getProperty("max_bytes");
142: if (str != null) {
143: max_bytes = Long.parseLong(str);
144: props.remove("max_bytes");
145: }
146:
147: str = props.getProperty("max_suspend_time");
148: if (str != null) {
149: log
150: .error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
151: props.remove("max_suspend_time");
152: }
153:
154: if (props.size() > 0) {
155: log.error("these properties are not recognized: " + props);
156:
157: return false;
158: }
159: return true;
160: }
161:
162: private void suspend(long timeout) {
163: if (!suspended) {
164: suspended = true;
165: if (log.isDebugEnabled())
166: log.debug("suspending message garbage collection");
167: }
168: startResumeTask(timeout); // will not start task if already running
169: }
170:
171: private void resume() {
172: resetDigest(mbrs); // start from scratch
173: suspended = false;
174: if (log.isDebugEnabled())
175: log.debug("resuming message garbage collection");
176: stopResumeTask();
177: }
178:
179: public void start() throws Exception {
180: if (stack != null && stack.timer != null)
181: timer = stack.timer;
182: else
183: throw new Exception(
184: "timer cannot be retrieved from protocol stack");
185: if (desired_avg_gossip > 0)
186: startStableTask();
187: }
188:
189: public void stop() {
190: stopStableTask();
191: clearDigest();
192: }
193:
194: public void up(Event evt) {
195: Message msg;
196: StableHeader hdr;
197: int type = evt.getType();
198:
199: switch (type) {
200:
201: case Event.MSG:
202: msg = (Message) evt.getArg();
203:
204: // only if message counting is enabled, and only for multicast messages
205: // fixes http://jira.jboss.com/jira/browse/JGRP-233
206: if (max_bytes > 0) {
207: Address dest = msg.getDest();
208: if (dest == null || dest.isMulticastAddress()) {
209: num_bytes_received += (long) Math.max(msg
210: .getLength(), 24);
211: if (num_bytes_received >= max_bytes) {
212: if (log.isTraceEnabled()) {
213: log.trace(new StringBuffer(
214: "max_bytes has been reached (")
215: .append(max_bytes).append(
216: ", bytes received=")
217: .append(num_bytes_received).append(
218: "): triggers stable msg"));
219: }
220: num_bytes_received = 0;
221: // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
222: passDown(new Event(Event.GET_DIGEST_STABLE));
223: }
224: }
225: }
226:
227: hdr = (StableHeader) msg.removeHeader(name);
228: if (hdr == null)
229: break;
230: switch (hdr.type) {
231: case StableHeader.STABLE_GOSSIP:
232: handleStableMessage(msg.getSrc(), hdr.stableDigest);
233: break;
234: case StableHeader.STABILITY:
235: handleStabilityMessage(hdr.stableDigest, msg.getSrc());
236: break;
237: default:
238: if (log.isErrorEnabled())
239: log.error("StableHeader type " + hdr.type
240: + " not known");
241: }
242: return; // don't pass STABLE or STABILITY messages up the stack
243:
244: case Event.GET_DIGEST_STABLE_OK:
245: Digest d = (Digest) evt.getArg();
246: synchronized (latest_local_digest) {
247: latest_local_digest.replace(d);
248: }
249: if (log.isTraceEnabled())
250: log.trace("setting latest_local_digest from NAKACK: "
251: + d.printHighSeqnos());
252: sendStableMessage(d);
253: break;
254:
255: case Event.VIEW_CHANGE:
256: View view = (View) evt.getArg();
257: handleViewChange(view);
258: break;
259:
260: case Event.SET_LOCAL_ADDRESS:
261: local_addr = (Address) evt.getArg();
262: break;
263: }
264: passUp(evt);
265: }
266:
267: public void down(Event evt) {
268: switch (evt.getType()) {
269: case Event.VIEW_CHANGE:
270: View v = (View) evt.getArg();
271: handleViewChange(v);
272: break;
273:
274: case Event.SUSPEND_STABLE:
275: long timeout = 0;
276: Object t = evt.getArg();
277: if (t != null && t instanceof Long)
278: timeout = ((Long) t).longValue();
279: suspend(timeout);
280: break;
281:
282: case Event.RESUME_STABLE:
283: resume();
284: break;
285: }
286: passDown(evt);
287: }
288:
289: public void runMessageGarbageCollection() {
290: Digest copy;
291: synchronized (digest) {
292: copy = digest.copy();
293: }
294: sendStableMessage(copy);
295: }
296:
297: /* --------------------------------------- Private Methods ---------------------------------------- */
298:
299: private void handleViewChange(View v) {
300: Vector tmp = v.getMembers();
301: mbrs.clear();
302: mbrs.addAll(tmp);
303: adjustSenders(digest, tmp);
304: adjustSenders(latest_local_digest, tmp);
305: resetDigest(tmp);
306: if (!initialized)
307: initialized = true;
308: }
309:
310: /** Digest and members are guaranteed to be non-null */
311: private static void adjustSenders(Digest d, Vector members) {
312: synchronized (d) {
313: // 1. remove all members from digest who are not in the view
314: Iterator it = d.senders.keySet().iterator();
315: Address mbr;
316: while (it.hasNext()) {
317: mbr = (Address) it.next();
318: if (!members.contains(mbr))
319: it.remove();
320: }
321: // 2. add members to digest which are in the new view but not in the digest
322: for (int i = 0; i < members.size(); i++) {
323: mbr = (Address) members.get(i);
324: if (!d.contains(mbr))
325: d.add(mbr, -1, -1);
326: }
327: }
328: }
329:
330: private void clearDigest() {
331: synchronized (digest) {
332: digest.clear();
333: }
334: }
335:
336: /** Update my own digest from a digest received by somebody else. Returns whether the update was successful.
337: * Needs to be called with a lock on digest */
338: private boolean updateLocalDigest(Digest d, Address sender) {
339: if (d == null || d.size() == 0)
340: return false;
341:
342: if (!initialized) {
343: if (log.isTraceEnabled())
344: log
345: .trace("STABLE message will not be handled as I'm not yet initialized");
346: return false;
347: }
348:
349: if (!digest.sameSenders(d)) {
350: if (log.isTraceEnabled())
351: log
352: .trace(new StringBuffer("received a digest ")
353: .append(d.printHighSeqnos())
354: .append(" from ")
355: .append(sender)
356: .append(
357: " which has different members than mine (")
358: .append(digest.printHighSeqnos())
359: .append(
360: "), discarding it and resetting heard_from list"));
361: // to avoid sending incorrect stability/stable msgs, we simply reset our heard_from list, see DESIGN
362: resetDigest(mbrs);
363: return false;
364: }
365:
366: StringBuffer sb = null;
367: if (log.isTraceEnabled())
368: sb = new StringBuffer("my [").append(local_addr).append(
369: "] digest before: ").append(digest).append(
370: "\ndigest from ").append(sender).append(": ")
371: .append(d);
372: Address mbr;
373: long highest_seqno, my_highest_seqno, new_highest_seqno;
374: long highest_seen_seqno, my_highest_seen_seqno, new_highest_seen_seqno;
375: Map.Entry entry;
376: org.jgroups.protocols.pbcast.Digest.Entry val;
377: for (Iterator it = d.senders.entrySet().iterator(); it
378: .hasNext();) {
379: entry = (Map.Entry) it.next();
380: mbr = (Address) entry.getKey();
381: val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
382: .getValue();
383: highest_seqno = val.high_seqno;
384: highest_seen_seqno = val.high_seqno_seen;
385:
386: // compute the minimum of the highest seqnos deliverable (for garbage collection)
387: my_highest_seqno = digest.highSeqnoAt(mbr);
388: // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
389: my_highest_seen_seqno = digest.highSeqnoSeenAt(mbr);
390:
391: new_highest_seqno = Math.min(my_highest_seqno,
392: highest_seqno);
393: new_highest_seen_seqno = Math.max(my_highest_seen_seqno,
394: highest_seen_seqno);
395: digest.setHighestDeliveredAndSeenSeqnos(mbr,
396: new_highest_seqno, new_highest_seen_seqno);
397: }
398: if (log.isTraceEnabled()) {
399: sb.append("\nmy [").append(local_addr).append(
400: "] digest after: ").append(digest).append("\n");
401: log.trace(sb);
402: }
403: return true;
404: }
405:
406: private void resetDigest(Vector new_members) {
407: if (new_members == null || new_members.size() == 0)
408: return;
409: synchronized (heard_from) {
410: heard_from.clear();
411: heard_from.addAll(new_members);
412: }
413:
414: Digest copy_of_latest;
415: synchronized (latest_local_digest) {
416: copy_of_latest = latest_local_digest.copy();
417: }
418: synchronized (digest) {
419: digest.replace(copy_of_latest);
420: if (log.isTraceEnabled())
421: log.trace("resetting digest from NAKACK: "
422: + copy_of_latest.printHighSeqnos());
423: }
424: }
425:
426: /**
427: * Removes mbr from heard_from and returns true if this was the last member, otherwise false.
428: * Resets the heard_from list (populates with membership)
429: * @param mbr
430: */
431: private boolean removeFromHeardFromList(Address mbr) {
432: synchronized (heard_from) {
433: heard_from.remove(mbr);
434: if (heard_from.size() == 0) {
435: resetDigest(this .mbrs);
436: return true;
437: }
438: }
439: return false;
440: }
441:
442: void startStableTask() {
443: // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
444: // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
445: // 1 cycle: on the next message or view, we will start the task
446: if (stable_task != null)
447: return;
448: synchronized (stable_task_mutex) {
449: if (stable_task != null && stable_task.running()) {
450: return; // already running
451: }
452: stable_task = new StableTask();
453: timer.add(stable_task, true); // fixed-rate scheduling
454: }
455: if (log.isTraceEnabled())
456: log.trace("stable task started");
457: }
458:
459: void stopStableTask() {
460: // contrary to startStableTask(), we don't need double-checked locking here because this method is not
461: // called frequently
462: synchronized (stable_task_mutex) {
463: if (stable_task != null) {
464: stable_task.stop();
465: stable_task = null;
466: }
467: }
468: }
469:
470: void startResumeTask(long max_suspend_time) {
471: max_suspend_time = (long) (max_suspend_time * 1.1); // little slack
472: if (max_suspend_time <= 0)
473: max_suspend_time = MAX_SUSPEND_TIME;
474:
475: synchronized (resume_task_mutex) {
476: if (resume_task != null && resume_task.running()) {
477: return; // already running
478: } else {
479: resume_task = new ResumeTask(max_suspend_time);
480: timer.add(resume_task, true); // fixed-rate scheduling
481: }
482: }
483: if (log.isDebugEnabled())
484: log.debug("resume task started, max_suspend_time="
485: + max_suspend_time);
486: }
487:
488: void stopResumeTask() {
489: synchronized (resume_task_mutex) {
490: if (resume_task != null) {
491: resume_task.stop();
492: resume_task = null;
493: }
494: }
495: }
496:
497: void startStabilityTask(Digest d, long delay) {
498: synchronized (stability_mutex) {
499: if (stability_task != null && stability_task.running()) {
500: } else {
501: stability_task = new StabilitySendTask(d, delay); // runs only once
502: timer.add(stability_task, true);
503: }
504: }
505: }
506:
507: void stopStabilityTask() {
508: synchronized (stability_mutex) {
509: if (stability_task != null) {
510: stability_task.stop();
511: stability_task = null;
512: }
513: }
514: }
515:
516: /**
517: Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
518: <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
519: seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
520: message, which results in garbage collection of messages lower than the ones in the stability vector. The
521: maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
522: for details).
523: */
524: private void handleStableMessage(Address sender, Digest d) {
525: if (d == null || sender == null) {
526: if (log.isErrorEnabled())
527: log.error("digest or sender is null");
528: return;
529: }
530:
531: if (!initialized) {
532: if (log.isTraceEnabled())
533: log
534: .trace("STABLE message will not be handled as I'm not yet initialized");
535: return;
536: }
537:
538: if (suspended) {
539: if (log.isTraceEnabled())
540: log
541: .trace("STABLE message will not be handled as I'm suspended");
542: return;
543: }
544:
545: if (log.isTraceEnabled())
546: log.trace(new StringBuffer("received stable msg from ")
547: .append(sender).append(": ").append(
548: d.printHighSeqnos()));
549: if (!heard_from.contains(sender)) { // already received gossip from sender; discard it
550: if (log.isTraceEnabled())
551: log.trace("already received stable msg from " + sender);
552: return;
553: }
554:
555: Digest copy;
556: synchronized (digest) {
557: boolean success = updateLocalDigest(d, sender);
558: if (!success) // we can only remove the sender from heard_from if *all* elements of my digest were updated
559: return;
560: copy = digest.copy();
561: }
562:
563: boolean was_last = removeFromHeardFromList(sender);
564: if (was_last) {
565: sendStabilityMessage(copy);
566: }
567: }
568:
569: /**
570: * Bcasts a STABLE message of the current digest to all members. Message contains highest seqnos of all members
571: * seen by this member. Highest seqnos are retrieved from the NAKACK layer below.
572: * @param d A <em>copy</em> of this.digest
573: */
574: private void sendStableMessage(Digest d) {
575: if (suspended) {
576: if (log.isTraceEnabled())
577: log
578: .trace("will not send STABLE message as I'm suspended");
579: return;
580: }
581:
582: if (d != null && d.size() > 0) {
583: if (log.isTraceEnabled())
584: log.trace("sending stable msg " + d.printHighSeqnos());
585: Message msg = new Message(); // mcast message
586: StableHeader hdr = new StableHeader(
587: StableHeader.STABLE_GOSSIP, d);
588: msg.putHeader(name, hdr);
589: num_gossips++;
590: passDown(new Event(Event.MSG, msg));
591: }
592: }
593:
594: /**
595: Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
596: The reason for waiting a random amount of time is that, in the worst case, all members receive a
597: STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
598: STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
599: elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
600: waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
601: discard S2.
602: @param tmp A copy of te stability digest, so we don't need to copy it again
603: */
604: void sendStabilityMessage(Digest tmp) {
605: long delay;
606:
607: if (suspended) {
608: if (log.isTraceEnabled())
609: log
610: .trace("STABILITY message will not be sent as I'm suspended");
611: return;
612: }
613:
614: // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
615: // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
616: // STABILITY msg at the same time
617: delay = Util.random(stability_delay);
618: startStabilityTask(tmp, delay);
619: }
620:
621: void handleStabilityMessage(Digest d, Address sender) {
622: if (d == null) {
623: if (log.isErrorEnabled())
624: log.error("stability digest is null");
625: return;
626: }
627:
628: if (!initialized) {
629: if (log.isTraceEnabled())
630: log
631: .trace("STABLE message will not be handled as I'm not yet initialized");
632: return;
633: }
634:
635: if (suspended) {
636: if (log.isDebugEnabled()) {
637: log
638: .debug("stability message will not be handled as I'm suspended");
639: }
640: return;
641: }
642:
643: if (log.isTraceEnabled())
644: log.trace(new StringBuffer("received stability msg from ")
645: .append(sender).append(": ").append(
646: d.printHighSeqnos()));
647: stopStabilityTask();
648:
649: // we won't handle the gossip d, if d's members don't match the membership in my own digest,
650: // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
651: if (!this .digest.sameSenders(d)) {
652: if (log.isDebugEnabled()) {
653: log
654: .debug("received digest (digest="
655: + d
656: + ") which does not match my own digest ("
657: + this .digest
658: + "): ignoring digest and re-initializing own digest");
659: }
660: return;
661: }
662:
663: resetDigest(mbrs);
664:
665: // pass STABLE event down the stack, so NAKACK can garbage collect old messages
666: passDown(new Event(Event.STABLE, d));
667: }
668:
669: /* ------------------------------------End of Private Methods ------------------------------------- */
670:
671: public static class StableHeader extends Header implements
672: Streamable {
673: public static final int STABLE_GOSSIP = 1;
674: public static final int STABILITY = 2;
675:
676: int type = 0;
677: // Digest digest=new Digest(); // used for both STABLE_GOSSIP and STABILITY message
678: Digest stableDigest = null; // changed by Bela April 4 2004
679:
680: public StableHeader() {
681: } // used for externalizable
682:
683: public StableHeader(int type, Digest digest) {
684: this .type = type;
685: this .stableDigest = digest;
686: }
687:
688: static String type2String(int t) {
689: switch (t) {
690: case STABLE_GOSSIP:
691: return "STABLE_GOSSIP";
692: case STABILITY:
693: return "STABILITY";
694: default:
695: return "<unknown>";
696: }
697: }
698:
699: public String toString() {
700: StringBuffer sb = new StringBuffer();
701: sb.append('[');
702: sb.append(type2String(type));
703: sb.append("]: digest is ");
704: sb.append(stableDigest);
705: return sb.toString();
706: }
707:
708: public void writeExternal(ObjectOutput out) throws IOException {
709: out.writeInt(type);
710: if (stableDigest == null) {
711: out.writeBoolean(false);
712: return;
713: }
714: out.writeBoolean(true);
715: stableDigest.writeExternal(out);
716: }
717:
718: public void readExternal(ObjectInput in) throws IOException,
719: ClassNotFoundException {
720: type = in.readInt();
721: boolean digest_not_null = in.readBoolean();
722: if (digest_not_null) {
723: stableDigest = new Digest();
724: stableDigest.readExternal(in);
725: }
726: }
727:
728: public long size() {
729: long retval = Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
730: if (stableDigest != null)
731: retval += stableDigest.serializedSize();
732: return retval;
733: }
734:
735: public void writeTo(DataOutputStream out) throws IOException {
736: out.writeInt(type);
737: Util.writeStreamable(stableDigest, out);
738: }
739:
740: public void readFrom(DataInputStream in) throws IOException,
741: IllegalAccessException, InstantiationException {
742: type = in.readInt();
743: stableDigest = (Digest) Util.readStreamable(Digest.class,
744: in);
745: }
746:
747: }
748:
749: /**
750: Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
751: However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
752: stable_send task terminates only after a period of time within which no messages were either sent
753: or received
754: */
755: private class StableTask implements TimeScheduler.Task {
756: boolean stopped = false;
757:
758: public void stop() {
759: stopped = true;
760: }
761:
762: public boolean running() { // syntactic sugar
763: return !stopped;
764: }
765:
766: public boolean cancelled() {
767: return stopped;
768: }
769:
770: public long nextInterval() {
771: long interval = computeSleepTime();
772: if (interval <= 0)
773: return 10000;
774: else
775: return interval;
776: }
777:
778: public void run() {
779: if (suspended) {
780: if (log.isTraceEnabled())
781: log.trace("stable task will not run as suspended="
782: + suspended);
783: return;
784: }
785:
786: // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
787: passDown(new Event(Event.GET_DIGEST_STABLE));
788: }
789:
790: long computeSleepTime() {
791: return getRandom((mbrs.size() * desired_avg_gossip * 2));
792: }
793:
794: long getRandom(long range) {
795: return (long) ((Math.random() * range) % range);
796: }
797: }
798:
799: /**
800: * Multicasts a STABILITY message.
801: */
802: private class StabilitySendTask implements TimeScheduler.Task {
803: Digest d = null;
804: boolean stopped = false;
805: long delay = 2000;
806:
807: StabilitySendTask(Digest d, long delay) {
808: this .d = d;
809: this .delay = delay;
810: }
811:
812: public boolean running() {
813: return !stopped;
814: }
815:
816: public void stop() {
817: stopped = true;
818: }
819:
820: public boolean cancelled() {
821: return stopped;
822: }
823:
824: /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
825: public long nextInterval() {
826: return delay;
827: }
828:
829: public void run() {
830: Message msg;
831: StableHeader hdr;
832:
833: if (suspended) {
834: if (log.isDebugEnabled()) {
835: log
836: .debug("STABILITY message will not be sent as suspended="
837: + suspended);
838: }
839: stopped = true;
840: return;
841: }
842:
843: if (d != null && !stopped) {
844: msg = new Message();
845: hdr = new StableHeader(StableHeader.STABILITY, d);
846: msg.putHeader(STABLE.name, hdr);
847: if (log.isTraceEnabled())
848: log.trace("sending stability msg "
849: + d.printHighSeqnos());
850: passDown(new Event(Event.MSG, msg));
851: d = null;
852: }
853: stopped = true; // run only once
854: }
855: }
856:
857: private class ResumeTask implements TimeScheduler.Task {
858: boolean running = true;
859: long max_suspend_time = 0;
860:
861: ResumeTask(long max_suspend_time) {
862: this .max_suspend_time = max_suspend_time;
863: }
864:
865: void stop() {
866: running = false;
867: }
868:
869: public boolean running() {
870: return running;
871: }
872:
873: public boolean cancelled() {
874: return running == false;
875: }
876:
877: public long nextInterval() {
878: return max_suspend_time;
879: }
880:
881: public void run() {
882: if (suspended)
883: log
884: .warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; "
885: + "check why this event was not received (or increase max_suspend_time for large state transfers)");
886: resume();
887: }
888: }
889:
890: }
|