001: //$Id: RingNodeFlowControl.java,v 1.4 2005/08/08 12:45:41 belaban Exp $
002:
003: package org.jgroups.protocols.ring;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007:
008: public class RingNodeFlowControl {
009: final int initialWindow;
010: final float windowReduceFactor;
011: final int belowThresholdAdvanceAmount;
012: final float aboveThresholdAdvanceAmount;
013: private int memberCount;
014: private int previousBacklog;
015: private int backlog;
016: protected final Log log = LogFactory.getLog(this .getClass());
017:
018: public RingNodeFlowControl(int initialWindow,
019: float windowReduceFactor, int belowThresholdAdvanceAmount,
020: float aboveThresholdAdvanceAmount) {
021: this .initialWindow = initialWindow;
022: this .windowReduceFactor = windowReduceFactor;
023: this .belowThresholdAdvanceAmount = belowThresholdAdvanceAmount;
024: this .aboveThresholdAdvanceAmount = aboveThresholdAdvanceAmount;
025: }
026:
027: public RingNodeFlowControl() {
028: this (20, 0.7F, 3, 1.0F);
029: }
030:
031: public void invalidate() {
032: previousBacklog = backlog = 0;
033: }
034:
035: public int getBacklog() {
036: return backlog;
037: }
038:
039: public void setBacklog(int backlog) {
040: if (backlog < 0)
041: throw new IllegalArgumentException(
042: "backlog value has to be positive");
043: this .backlog = backlog;
044: }
045:
046: public int getBacklogDifference() {
047: return backlog - previousBacklog;
048: }
049:
050: public int getPreviousBacklog() {
051: return previousBacklog;
052: }
053:
054: public void setPreviousBacklog() {
055: this .previousBacklog = backlog;
056: }
057:
058: public void viewChanged(int memberCount) {
059: this .memberCount = memberCount;
060: }
061:
062: public int getAllowedToBroadcast(RingToken token) {
063: int fairWindowShare = 0;
064: int windowSize = token.getWindowSize();
065: if (memberCount == 0)
066: memberCount = 1;
067: int maxMessages = (windowSize / memberCount);
068: if (maxMessages < 1)
069: maxMessages = 1;
070:
071: int backlogAverage = token.getBacklog() + backlog
072: - previousBacklog;
073: if (backlogAverage > 0) {
074: fairWindowShare = windowSize * backlog / backlogAverage;
075: }
076: fairWindowShare = (fairWindowShare < 1) ? 1 : fairWindowShare;
077:
078: int maxAllowed = windowSize
079: - token.getLastRoundBroadcastCount();
080: if (maxAllowed < 1)
081: maxAllowed = 0;
082:
083: if (log.isInfoEnabled())
084: log.info("fairWindowShare=" + fairWindowShare
085: + " maxMessages=" + maxMessages + " maxAllowed="
086: + maxAllowed);
087:
088: return (fairWindowShare < maxAllowed) ? Math.min(
089: fairWindowShare, maxMessages) : Math.min(maxAllowed,
090: maxMessages);
091: }
092:
093: public void updateWindow(RingToken token) {
094: int threshold = token.getWindowThreshold();
095: int window = token.getWindowSize();
096: if (window < initialWindow) {
097: window = initialWindow;
098: }
099:
100: boolean congested = (token.getRetransmissionRequests().size() > 0);
101:
102: if (congested) {
103: threshold = (int) (window * windowReduceFactor);
104: window = initialWindow;
105: } else {
106: if (window < threshold) {
107: window += belowThresholdAdvanceAmount;
108: } else {
109: window += aboveThresholdAdvanceAmount;
110: }
111: }
112: token.setWindowSize(window);
113: token.setWindowThreshold(threshold);
114: }
115:
116: }
|