001: // $Id: FLOW_CONTROL.java,v 1.11.6.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Message;
008: import org.jgroups.blocks.GroupRequest;
009: import org.jgroups.stack.MessageProtocol;
010: import org.jgroups.util.ReusableThread;
011: import org.jgroups.util.RspList;
012: import org.jgroups.util.Util;
013:
014: import java.io.Serializable;
015: import java.util.HashMap;
016: import java.util.Properties;
017:
018: /**
019: * FLOW_CONTROL provides end-end congestion control and flow control.
020: * Attempts to maximize through put, by minimizing the
021: * possible block times(Forward flow control). Initially, sender starts with a smaller
022: * window size <code> W</code> and large expected RTT <code>grpRTT</code>. Sender also
023: * keeps a margin in the window size. When the margin is hit, insted of waiting for the
024: * window size to be exhausted, sender multicasts a FLOW_CONTROL info request message.
025: * If the window size is exhausted before the responses are received, send will be blocked.
026: * FCInfo(flow control info) from all the receivers is gathered at the sender, and current RTT
027: * is computed. If the current RTT is greater than estimated RTT window size and margin are reduced,
028: * otherwise they are increased.
029: * <p>
030: * Horizontal interaction is initiated by the sender with the other group members.
031: * <p>
032: * <em>Note: A reliable transport layer is required for this protocol to function properly.</em>
033: * With little effort this can be made completely independent.
034: * <p>
035: * todo Handle view changes (e.g., members {A,B,C}, blocked on C, and C crashes --> unblock).
036: * <br> Also block on down() instead of sending BLOCK_SEND. // done, bela April 28 2006
037: *
038: * @author Ananda Bollu
039: */
040:
041: public class FLOW_CONTROL extends MessageProtocol implements Runnable {
042: private int _numMSGsSentThisPeriod = 0;
043: private static final String FLOW_CONTROL = "FLOW_CONTROL";
044: private final HashMap _rcvdMSGCounter = new HashMap();
045:
046: private int _windowSize = 1000;
047: private int _fwdMarginSize = 200;
048: private int _estimatedRTT = 100000;
049: private boolean waitingForResponse = false;
050: private final ReusableThread _reusableThread;
051: private double RTT_WEIGHT = 0.125;
052: private int _msgsSentAfterFCreq = 0;
053: private final double TIME_OUT_FACTOR = 0.25;//if resp not received from more than n*TIME_OUT_INCREMENT_FACTOR
054: private final double TIME_OUT_INCR_MULT = 1.25;
055: private double WINDOW_SIZE_REDUCTION = 0.75;
056: private double WINDOW_SIZE_EXPANSION = 1.25;
057: private boolean isBlockState = false;
058:
059: private final Object block_sending = new Object();
060:
061: private int _windowsize_cap = 1000000; //initial window size can not be more than 10^6 messages.
062:
063: public FLOW_CONTROL() {
064: _reusableThread = new ReusableThread(FLOW_CONTROL);
065: }
066:
067: public String getName() {
068: return FLOW_CONTROL;
069: }
070:
071: /**
072: * If Event.MSG type is received count is incremented by one,
073: * and message is passed to the down_prot. At some point,
074: * based on the algorithm(FLOW_CONTROL protocol definition)
075: * data collection sequence is started. This is done by each
076: * member in SENDER role when _numMSGsSentThisPeriod hits the margin.
077: * Before rsp arrives only _fwdMarginSize number of messages can be sent,
078: * and then sender will be blocked.
079: */
080: public boolean handleDownEvent(Event evt) {
081: if (evt.getType() == Event.MSG) {
082: _numMSGsSentThisPeriod++;
083: if ((_numMSGsSentThisPeriod > (_windowSize - _fwdMarginSize))
084: && !waitingForResponse) {
085: waitingForResponse = true;
086: //wait for the previous request to return.before assigning a new task.
087: _reusableThread.waitUntilDone();
088: _reusableThread.assignTask(this );
089: }
090: if (waitingForResponse) {
091: _msgsSentAfterFCreq++;
092: if ((_msgsSentAfterFCreq >= _fwdMarginSize)
093: && !isBlockState) {
094:
095: if (log.isInfoEnabled())
096: log.info("ACTION BLOCK");
097: log.error("0;" + System.currentTimeMillis() + ';'
098: + _windowSize);
099: synchronized (block_sending) {
100: isBlockState = true;
101: while (isBlockState) {
102: try {
103: block_sending.wait();
104: } catch (InterruptedException e) {
105: }
106: }
107: }
108: }
109: }
110: }
111: return true;
112: }
113:
114: /**
115: * If Event.MSG type is received message, number of received
116: * messages from the sender is incremented. And the message is
117: * passed up the stack.
118: */
119: public boolean handleUpEvent(Event evt) {
120: if (evt.getType() == Event.MSG) {
121: Message msg = (Message) evt.getArg();
122: Address src = msg.getSrc();
123: FCInfo fcForSrc = (FCInfo) _rcvdMSGCounter.get(src);
124: if (fcForSrc == null) {
125: fcForSrc = new FCInfo();
126: _rcvdMSGCounter.put(src, fcForSrc);
127: }
128: fcForSrc.increment(1);
129:
130: if (log.isInfoEnabled())
131: log.info("message (" + fcForSrc.getRcvdMSGCount()
132: + ") received from " + src);
133: }
134: return true;
135: }
136:
137: /**
138: * Called when a request for this protocol layer is received.
139: * Processes and return value is sent back in the reply.
140: * FLOW_CONTROL protocol of all members gets this message(including sender?)
141: *
142: * @return Object containing FC information for sender with senderID.
143: * <b>Callback</b>. Called when a request for this protocol layer is received.
144: */
145: public Object handle(Message req) {
146: Address src = req.getSrc();
147: Long resp = new Long(((FCInfo) _rcvdMSGCounter.get(src))
148: .getRcvdMSGCount());
149:
150: if (log.isInfoEnabled())
151: log.info("Reqest came from " + src + " Prepared response "
152: + resp);
153: return resp;
154: }
155:
156: /**
157: * FCInfo request must be submitted in a different thread.
158: * handleDownEvent() can still be called to send messages
159: * while waiting for FCInfo from receivers. usually takes
160: * RTT.
161: */
162: public void run() {
163:
164: if (log.isInfoEnabled())
165: log.info("--- hit the _fwdMargin. Remaining size "
166: + _fwdMarginSize);
167: reqFCInfo();
168: }
169:
170: /**
171: * Following parameters can be optionally supplied:
172: * <ul>
173: * <li>window size cap - <code>int</code> Limits the window size to a reasonable value.
174: * <li>window size - <code>int</code> these many number of messages are sent before a block could happen
175: * <li>forward margin -<code>int</code> a request for flow control information is sent when remaining window size hits this margin
176: * <li>RTT weight -<code>double</code> Max RTT in the group is calculated during each Flow control request. lower number assigns
177: * higher weight to current RTT in estimating RTT.
178: * <li>window size reduction factor -<code>double</code> When current RTT is greater than estimated RTT current window size
179: * is reduced by this multiple.
180: * <li>window size expansion factor -<code>double</code> When current RTT is less than estimated RTT window is incremented
181: * by this multiple.
182: * </ul>
183: *
184: * @see org.jgroups.stack.Protocol#setProperties(Properties)
185: */
186: public boolean setProperties(Properties props) {
187: String str = null;
188: String winsizekey = "window_size";
189: String fwdmrgnkey = "fwd_mrgn";
190: String rttweightkey = "rttweight";
191: String sizereductionkey = "reduction";
192: String sizeexpansionkey = "expansion";
193: String windowsizeCapKey = "window_size_cap";
194:
195: super .setProperties(props);
196: str = props.getProperty(windowsizeCapKey);
197: if (str != null) {
198: _windowsize_cap = Integer.parseInt(str);
199: props.remove(windowsizeCapKey);
200: }
201: str = props.getProperty(winsizekey);
202: if (str != null) {
203: _windowSize = Integer.parseInt(str);
204: if (_windowSize > _windowsize_cap)
205: _windowSize = _windowsize_cap;
206: props.remove(winsizekey);
207: }
208:
209: str = props.getProperty(fwdmrgnkey);
210: if (str != null) {
211: _fwdMarginSize = Integer.parseInt(str);
212: props.remove(fwdmrgnkey);
213: }
214:
215: str = props.getProperty(rttweightkey);
216: if (str != null) {
217: RTT_WEIGHT = Double.parseDouble(str);
218: props.remove(rttweightkey);
219: }
220:
221: str = props.getProperty(sizereductionkey);
222: if (str != null) {
223: WINDOW_SIZE_REDUCTION = Double.parseDouble(str);
224: props.remove(sizereductionkey);
225: }
226:
227: str = props.getProperty(sizeexpansionkey);
228: if (str != null) {
229: WINDOW_SIZE_EXPANSION = Double.parseDouble(str);
230: props.remove(sizeexpansionkey);
231: }
232:
233: if (props.size() > 0) {
234: log
235: .error("FLOW_CONTROL.setProperties(): the following properties are not recognized: "
236: + props);
237:
238: return false;
239: }
240: return true;
241:
242: }
243:
244: /*-----------private stuff ------*/
245:
246: private RspList reqFCInfo() {
247: RspList rspList = null;
248: long reqSentTime = 0, rspRcvdTime = 0;
249: try {
250: reqSentTime = System.currentTimeMillis();
251: //alternatively use _estimatedRTT for timeout.(timeout is the right way, but need to
252: //check the use cases.
253: rspList = castMessage(null, new Message(null, null, Util
254: .objectToByteBuffer(FLOW_CONTROL)),
255: GroupRequest.GET_ALL, 0);
256: rspRcvdTime = System.currentTimeMillis();
257: } catch (Exception ex) {
258: ex.printStackTrace();
259: }
260:
261: /*If NAKACK layer is present, if n+1 th message is FLOW_CONTROL Request, if responses are received
262: that means all n messages sent earlier are received(?), ignore NAK_ACK.
263: */
264: //ANALYSE RESPONSES
265: long currentRTT = rspRcvdTime - reqSentTime;
266:
267: if (currentRTT > _estimatedRTT) {
268: _windowSize = (int) (_windowSize * WINDOW_SIZE_REDUCTION);
269: _fwdMarginSize = (int) (_fwdMarginSize * WINDOW_SIZE_REDUCTION);
270: } else {
271: _windowSize = (int) (_windowSize * WINDOW_SIZE_EXPANSION);
272: if (_windowSize > _windowsize_cap)
273: _windowSize = _windowsize_cap;
274: _fwdMarginSize = (int) (_fwdMarginSize * WINDOW_SIZE_EXPANSION);
275: }
276:
277: _estimatedRTT = (int) ((RTT_WEIGHT * currentRTT) + (1.0 - RTT_WEIGHT)
278: * _estimatedRTT);
279:
280: //reset for new FLOW_CONTROL request period.
281: _numMSGsSentThisPeriod = 0;
282: waitingForResponse = false;
283: _msgsSentAfterFCreq = 0;
284:
285: if (isBlockState) {
286: if (log.isWarnEnabled())
287: log.warn("ACTION UNBLOCK");
288: log.error("1;" + System.currentTimeMillis() + ';'
289: + _windowSize);
290: synchronized (block_sending) {
291: isBlockState = false;
292: block_sending.notifyAll();
293: }
294: }
295:
296: if (log.isWarnEnabled())
297: log.warn("estimatedTimeout = " + _estimatedRTT);
298: if (log.isWarnEnabled())
299: log.warn("window size = " + _windowSize
300: + " forward margin size = " + _fwdMarginSize);
301:
302: return rspList;
303: }
304:
305: /* use this instead of Integer. */
306: private static class FCInfo implements Serializable {
307: int _curValue;
308: private static final long serialVersionUID = -8365016426836017979L;
309:
310: FCInfo() {
311: }
312:
313: public void increment(int i) {
314: _curValue += i;
315: }
316:
317: public int getRcvdMSGCount() {
318: return _curValue;
319: }
320:
321: public String toString() {
322: return Integer.toString(_curValue);
323: }
324: }
325:
326: }
|