01: package org.jgroups.protocols;
02:
03: import org.jgroups.Address;
04: import org.jgroups.Event;
05: import org.jgroups.Message;
06: import org.jgroups.stack.Protocol;
07:
08: import java.util.Vector;
09:
10: /**
11: * Title: Flow control layer
12: * Description: This layer limits the number of sent messages without a receive of an own message to MAXSENTMSGS,
13: * just put this layer above GMS and you will get a more
14: * Copyright: Copyright (c) 2000
15: * Company: Computer Network Laboratory
16: * @author Gianluca Collot
17: * @version 1.0
18: */
19: public class FLOWCONTROL extends Protocol {
20:
21: final Vector queuedMsgs = new Vector();
22: int sentMsgs = 0;
23: static final int MAXSENTMSGS = 1;
24: Address myAddr;
25:
26: public FLOWCONTROL() {
27: }
28:
29: public String getName() {
30: return "FLOWCONTROL";
31: }
32:
33: /**
34: * Checs if up messages are from myaddr and in the case sends down queued messages or
35: * decremnts sentMsgs if there are no queued messages
36: */
37: public void up(Event evt) {
38: Message msg;
39: switch (evt.getType()) {
40: case Event.SET_LOCAL_ADDRESS:
41: myAddr = (Address) evt.getArg();
42: break;
43:
44: case Event.MSG:
45: msg = (Message) evt.getArg();
46: if (log.isDebugEnabled())
47: log.debug("Message received");
48: if (msg.getSrc().equals(myAddr)) {
49: if (queuedMsgs.size() > 0) {
50: if (log.isDebugEnabled())
51: log
52: .debug("Message from me received - Queue size was "
53: + queuedMsgs.size());
54: passDown((Event) queuedMsgs.remove(0));
55: } else {
56: if (log.isDebugEnabled())
57: log
58: .debug("Message from me received - No messages in queue");
59: sentMsgs--;
60: }
61: }
62: }
63: passUp(evt);
64: }
65:
66: /**
67: * Checs if it can send the message, else puts the message in the queue.
68: */
69: public void down(Event evt) {
70: Message msg;
71: if (evt.getType() == Event.MSG) {
72: msg = (Message) evt.getArg();
73: if ((msg.getDest() == null)
74: || (msg.getDest().equals(myAddr))) {
75: if (sentMsgs < MAXSENTMSGS) {
76: sentMsgs++;
77: if (log.isDebugEnabled())
78: log.debug("Message " + sentMsgs + " sent");
79: } else {
80: queuedMsgs.add(evt); //queues message (we add the event to avoid creating a new event to send the message)
81: return;
82: }
83: }
84: }
85: passDown(evt);
86: }
87:
88: }
|