0001: // $Id: NAKACK.java,v 1.81.2.1 2007/04/27 08:03:55 belaban Exp $
0002:
0003: package org.jgroups.protocols.pbcast;
0004:
0005: import org.jgroups.*;
0006: import org.jgroups.stack.NakReceiverWindow;
0007: import org.jgroups.stack.Protocol;
0008: import org.jgroups.stack.Retransmitter;
0009: import org.jgroups.util.*;
0010:
0011: import java.io.IOException;
0012: import java.util.*;
0013:
0014: /**
0015: * Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno).
0016: * Receivers deliver messages ordered according to seqno and request retransmission of missing messages. Retransmitted
0017: * messages are bundled into bigger ones, e.g. when getting an xmit request for messages 1-10, instead of sending 10
0018: * unicast messages, we bundle all 10 messages into 1 and send it. However, since this protocol typically sits below
0019: * FRAG, we cannot count on FRAG to fragement/defragment the (possibly) large message into smaller ones. Therefore we
0020: * only bundle messages up to max_xmit_size bytes to prevent too large messages. For example, if the bundled message
0021: * size was a total of 34000 bytes, and max_xmit_size=16000, we'd send 3 messages: 2 16K and a 2K message. <em>Note that
0022: * max_xmit_size should be the same value as FRAG.frag_size (or smaller).</em><br/> Retransmit requests are always sent
0023: * to the sender. If the sender dies, and not everyone has received its messages, they will be lost. In the future, this
0024: * may be changed to have receivers store all messages, so that retransmit requests can be answered by any member.
0025: * Trivial to implement, but not done yet. For most apps, the default retransmit properties are sufficient, if not use
0026: * vsync.
0027: *
0028: * @author Bela Ban
0029: */
0030: public class NAKACK extends Protocol implements
0031: Retransmitter.RetransmitCommand, NakReceiverWindow.Listener {
0032: private long[] retransmit_timeout = { 600, 1200, 2400, 4800 }; // time(s) to wait before requesting retransmission
0033: private boolean is_server = false;
0034: private Address local_addr = null;
0035: private final Vector members = new Vector(11);
0036: private View view;
0037: private long seqno = -1; // current message sequence number (starts with 0)
0038: private long max_xmit_size = 8192; // max size of a retransmit message (otherwise send multiple)
0039: private int gc_lag = 20; // number of msgs garbage collection lags behind
0040:
0041: /**
0042: * Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a
0043: * message, the sender only retransmits once.
0044: */
0045: private boolean use_mcast_xmit = true;
0046:
0047: /**
0048: * Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be
0049: * set to false
0050: */
0051: private boolean xmit_from_random_member = false;
0052:
0053: /**
0054: * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
0055: * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
0056: * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
0057: * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
0058: * around, and don't need to wait for garbage collection to remove them.
0059: */
0060: private boolean discard_delivered_msgs = false;
0061:
0062: /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
0063: * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
0064: */
0065: private int max_xmit_buf_size = 0;
0066:
0067: /**
0068: * Hashtable<Address,NakReceiverWindow>. Stores received messages (keyed by sender). Note that this is no long term
0069: * storage; messages are just stored until they can be delivered (ie., until the correct FIFO order is established)
0070: */
0071: private final HashMap received_msgs = new HashMap(11);
0072:
0073: /** TreeMap<Long,Message>. Map of messages sent by me (keyed and sorted on sequence number) */
0074: private final TreeMap sent_msgs = new TreeMap();
0075:
0076: private boolean leaving = false;
0077: private boolean started = false;
0078: private TimeScheduler timer = null;
0079: private static final String name = "NAKACK";
0080:
0081: private long xmit_reqs_received;
0082: private long xmit_reqs_sent;
0083: private long xmit_rsps_received;
0084: private long xmit_rsps_sent;
0085: private long missing_msgs_received;
0086:
0087: /** Captures stats on XMIT_REQS, XMIT_RSPS per sender */
0088: private HashMap sent = new HashMap();
0089:
0090: /** Captures stats on XMIT_REQS, XMIT_RSPS per receiver */
0091: private HashMap received = new HashMap();
0092:
0093: private int stats_list_size = 20;
0094:
0095: /** BoundedList<XmitRequest>. Keeps track of the last stats_list_size XMIT requests */
0096: private BoundedList receive_history;
0097:
0098: /** BoundedList<MissingMessage>. Keeps track of the last stats_list_size missing messages received */
0099: private BoundedList send_history;
0100:
0101: public NAKACK() {
0102: }
0103:
0104: public String getName() {
0105: return name;
0106: }
0107:
0108: public long getXmitRequestsReceived() {
0109: return xmit_reqs_received;
0110: }
0111:
0112: public long getXmitRequestsSent() {
0113: return xmit_reqs_sent;
0114: }
0115:
0116: public long getXmitResponsesReceived() {
0117: return xmit_rsps_received;
0118: }
0119:
0120: public long getXmitResponsesSent() {
0121: return xmit_rsps_sent;
0122: }
0123:
0124: public long getMissingMessagesReceived() {
0125: return missing_msgs_received;
0126: }
0127:
0128: public int getPendingRetransmissionRequests() {
0129: int num = 0;
0130: NakReceiverWindow win;
0131: synchronized (received_msgs) {
0132: for (Iterator it = received_msgs.values().iterator(); it
0133: .hasNext();) {
0134: win = (NakReceiverWindow) it.next();
0135: num += win.size();
0136: }
0137: }
0138: return num;
0139: }
0140:
0141: public int getSentTableSize() {
0142: int size;
0143: synchronized (sent_msgs) {
0144: size = sent_msgs.size();
0145: }
0146: return size;
0147: }
0148:
0149: public int getReceivedTableSize() {
0150: int ret = 0;
0151: NakReceiverWindow win;
0152: Set s = new LinkedHashSet(received_msgs.values());
0153: for (Iterator it = s.iterator(); it.hasNext();) {
0154: win = (NakReceiverWindow) it.next();
0155: ret += win.size();
0156: }
0157: return ret;
0158: }
0159:
0160: public void resetStats() {
0161: xmit_reqs_received = xmit_reqs_sent = xmit_rsps_received = xmit_rsps_sent = missing_msgs_received = 0;
0162: sent.clear();
0163: received.clear();
0164: if (receive_history != null)
0165: receive_history.removeAll();
0166: if (send_history != null)
0167: send_history.removeAll();
0168: }
0169:
0170: public void init() throws Exception {
0171: if (stats) {
0172: send_history = new BoundedList(stats_list_size);
0173: receive_history = new BoundedList(stats_list_size);
0174: }
0175: }
0176:
0177: public int getGcLag() {
0178: return gc_lag;
0179: }
0180:
0181: public void setGcLag(int gc_lag) {
0182: this .gc_lag = gc_lag;
0183: }
0184:
0185: public boolean isUseMcastXmit() {
0186: return use_mcast_xmit;
0187: }
0188:
0189: public void setUseMcastXmit(boolean use_mcast_xmit) {
0190: this .use_mcast_xmit = use_mcast_xmit;
0191: }
0192:
0193: public boolean isXmitFromRandomMember() {
0194: return xmit_from_random_member;
0195: }
0196:
0197: public void setXmitFromRandomMember(boolean xmit_from_random_member) {
0198: this .xmit_from_random_member = xmit_from_random_member;
0199: }
0200:
0201: public boolean isDiscardDeliveredMsgs() {
0202: return discard_delivered_msgs;
0203: }
0204:
0205: public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {
0206: this .discard_delivered_msgs = discard_delivered_msgs;
0207: }
0208:
0209: public int getMaxXmitBufSize() {
0210: return max_xmit_buf_size;
0211: }
0212:
0213: public void setMaxXmitBufSize(int max_xmit_buf_size) {
0214: this .max_xmit_buf_size = max_xmit_buf_size;
0215: }
0216:
0217: public long getMaxXmitSize() {
0218: return max_xmit_size;
0219: }
0220:
0221: public void setMaxXmitSize(long max_xmit_size) {
0222: this .max_xmit_size = max_xmit_size;
0223: }
0224:
0225: public boolean setProperties(Properties props) {
0226: String str;
0227: long[] tmp;
0228:
0229: super .setProperties(props);
0230: str = props.getProperty("retransmit_timeout");
0231: if (str != null) {
0232: tmp = Util.parseCommaDelimitedLongs(str);
0233: props.remove("retransmit_timeout");
0234: if (tmp != null && tmp.length > 0) {
0235: retransmit_timeout = tmp;
0236: }
0237: }
0238:
0239: str = props.getProperty("gc_lag");
0240: if (str != null) {
0241: gc_lag = Integer.parseInt(str);
0242: if (gc_lag < 0) {
0243: log
0244: .error("NAKACK.setProperties(): gc_lag cannot be negative, setting it to 0");
0245: }
0246: props.remove("gc_lag");
0247: }
0248:
0249: str = props.getProperty("max_xmit_size");
0250: if (str != null) {
0251: max_xmit_size = Long.parseLong(str);
0252: props.remove("max_xmit_size");
0253: }
0254:
0255: str = props.getProperty("use_mcast_xmit");
0256: if (str != null) {
0257: use_mcast_xmit = Boolean.valueOf(str).booleanValue();
0258: props.remove("use_mcast_xmit");
0259: }
0260:
0261: str = props.getProperty("discard_delivered_msgs");
0262: if (str != null) {
0263: discard_delivered_msgs = Boolean.valueOf(str)
0264: .booleanValue();
0265: props.remove("discard_delivered_msgs");
0266: }
0267:
0268: str = props.getProperty("xmit_from_random_member");
0269: if (str != null) {
0270: xmit_from_random_member = Boolean.valueOf(str)
0271: .booleanValue();
0272: props.remove("xmit_from_random_member");
0273: }
0274:
0275: str = props.getProperty("max_xmit_buf_size");
0276: if (str != null) {
0277: max_xmit_buf_size = Integer.parseInt(str);
0278: props.remove("max_xmit_buf_size");
0279: }
0280:
0281: str = props.getProperty("stats_list_size");
0282: if (str != null) {
0283: stats_list_size = Integer.parseInt(str);
0284: props.remove("stats_list_size");
0285: }
0286:
0287: if (xmit_from_random_member) {
0288: if (discard_delivered_msgs) {
0289: discard_delivered_msgs = false;
0290: log
0291: .warn("xmit_from_random_member set to true: changed discard_delivered_msgs to false");
0292: }
0293: }
0294:
0295: if (props.size() > 0) {
0296: log
0297: .error("NAKACK.setProperties(): these properties are not recognized: "
0298: + props);
0299:
0300: return false;
0301: }
0302: return true;
0303: }
0304:
0305: public Map dumpStats() {
0306: Map retval = super .dumpStats();
0307: if (retval == null)
0308: retval = new HashMap();
0309:
0310: retval.put("xmit_reqs_received", new Long(xmit_reqs_received));
0311: retval.put("xmit_reqs_sent", new Long(xmit_reqs_sent));
0312: retval.put("xmit_rsps_received", new Long(xmit_rsps_received));
0313: retval.put("xmit_rsps_sent", new Long(xmit_rsps_sent));
0314: retval.put("missing_msgs_received", new Long(
0315: missing_msgs_received));
0316:
0317: retval.put("sent_msgs", printSentMsgs());
0318:
0319: StringBuffer sb = new StringBuffer();
0320: Map.Entry entry;
0321: Address addr;
0322: Object w;
0323: synchronized (received_msgs) {
0324: for (Iterator it = received_msgs.entrySet().iterator(); it
0325: .hasNext();) {
0326: entry = (Map.Entry) it.next();
0327: addr = (Address) entry.getKey();
0328: w = entry.getValue();
0329: sb.append(addr).append(": ").append(w.toString())
0330: .append('\n');
0331: }
0332: }
0333:
0334: retval.put("received_msgs", sb.toString());
0335: return retval;
0336: }
0337:
0338: public String printStats() {
0339: Map.Entry entry;
0340: Object key, val;
0341: StringBuffer sb = new StringBuffer();
0342: sb.append("sent:\n");
0343: for (Iterator it = sent.entrySet().iterator(); it.hasNext();) {
0344: entry = (Map.Entry) it.next();
0345: key = entry.getKey();
0346: if (key == null)
0347: key = "<mcast dest>";
0348: val = entry.getValue();
0349: sb.append(key).append(": ").append(val).append("\n");
0350: }
0351: sb.append("\nreceived:\n");
0352: for (Iterator it = received.entrySet().iterator(); it.hasNext();) {
0353: entry = (Map.Entry) it.next();
0354: key = entry.getKey();
0355: val = entry.getValue();
0356: sb.append(key).append(": ").append(val).append("\n");
0357: }
0358:
0359: sb.append("\nXMIT_REQS sent:\n");
0360: XmitRequest tmp;
0361: for (Enumeration en = send_history.elements(); en
0362: .hasMoreElements();) {
0363: tmp = (XmitRequest) en.nextElement();
0364: sb.append(tmp).append("\n");
0365: }
0366:
0367: sb.append("\nMissing messages received\n");
0368: MissingMessage missing;
0369: for (Enumeration en = receive_history.elements(); en
0370: .hasMoreElements();) {
0371: missing = (MissingMessage) en.nextElement();
0372: sb.append(missing).append("\n");
0373: }
0374:
0375: return sb.toString();
0376: }
0377:
0378: public Vector providedUpServices() {
0379: Vector retval = new Vector(5);
0380: retval.addElement(new Integer(Event.GET_DIGEST));
0381: retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
0382: retval.addElement(new Integer(Event.GET_DIGEST_STATE));
0383: retval.addElement(new Integer(Event.SET_DIGEST));
0384: retval.addElement(new Integer(Event.MERGE_DIGEST));
0385: return retval;
0386: }
0387:
0388: public Vector providedDownServices() {
0389: Vector retval = new Vector(2);
0390: retval.addElement(new Integer(Event.GET_DIGEST));
0391: retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
0392: return retval;
0393: }
0394:
0395: public void start() throws Exception {
0396: timer = stack != null ? stack.timer : null;
0397: if (timer == null)
0398: throw new Exception("timer is null");
0399: started = true;
0400: }
0401:
0402: public void stop() {
0403: started = false;
0404: reset(); // clears sent_msgs and destroys all NakReceiverWindows
0405: }
0406:
0407: /**
0408: * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>passDown()</code> in this
0409: * method as the event is passed down by default by the superclass after this method returns !</b>
0410: */
0411: public void down(Event evt) {
0412: Digest digest;
0413: Vector mbrs;
0414:
0415: switch (evt.getType()) {
0416:
0417: case Event.MSG:
0418: Message msg = (Message) evt.getArg();
0419: Address dest = msg.getDest();
0420: if (dest != null && !dest.isMulticastAddress()) {
0421: break; // unicast address: not null and not mcast, pass down unchanged
0422: }
0423: send(evt, msg);
0424: return; // don't pass down the stack
0425:
0426: case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
0427: stable((Digest) evt.getArg());
0428: return; // do not pass down further (Bela Aug 7 2001)
0429:
0430: case Event.GET_DIGEST:
0431: digest = getDigest();
0432: passUp(new Event(Event.GET_DIGEST_OK,
0433: digest != null ? digest.copy() : null));
0434: return;
0435:
0436: case Event.GET_DIGEST_STABLE:
0437: digest = getDigestHighestDeliveredMsgs();
0438: passUp(new Event(Event.GET_DIGEST_STABLE_OK,
0439: digest != null ? digest.copy() : null));
0440: return;
0441:
0442: case Event.GET_DIGEST_STATE:
0443: digest = getDigest();
0444: passUp(new Event(Event.GET_DIGEST_STATE_OK,
0445: digest != null ? digest.copy() : null));
0446: return;
0447:
0448: case Event.SET_DIGEST:
0449: setDigest((Digest) evt.getArg());
0450: return;
0451:
0452: case Event.MERGE_DIGEST:
0453: mergeDigest((Digest) evt.getArg());
0454: return;
0455:
0456: case Event.CONFIG:
0457: passDown(evt);
0458: if (log.isDebugEnabled()) {
0459: log.debug("received CONFIG event: " + evt.getArg());
0460: }
0461: handleConfigEvent((HashMap) evt.getArg());
0462: return;
0463:
0464: case Event.TMP_VIEW:
0465: View tmp_view = (View) evt.getArg();
0466: mbrs = tmp_view.getMembers();
0467: members.clear();
0468: members.addAll(mbrs);
0469: adjustReceivers(false);
0470: break;
0471:
0472: case Event.VIEW_CHANGE:
0473: tmp_view = (View) evt.getArg();
0474: mbrs = tmp_view.getMembers();
0475: members.clear();
0476: members.addAll(mbrs);
0477: adjustReceivers(true);
0478: is_server = true; // check vids from now on
0479:
0480: Set tmp = new LinkedHashSet(members);
0481: tmp.add(null); // for null destination (= mcast)
0482: sent.keySet().retainAll(tmp);
0483: received.keySet().retainAll(tmp);
0484: view = tmp_view;
0485: break;
0486:
0487: case Event.BECOME_SERVER:
0488: is_server = true;
0489: break;
0490:
0491: case Event.DISCONNECT:
0492: leaving = true;
0493: reset();
0494: break;
0495: }
0496:
0497: passDown(evt);
0498: }
0499:
0500: /**
0501: * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this
0502: * method as the event is passed up by default by the superclass after this method returns !</b>
0503: */
0504: public void up(Event evt) {
0505: NakAckHeader hdr;
0506: Message msg;
0507: Digest digest;
0508:
0509: switch (evt.getType()) {
0510:
0511: case Event.MSG:
0512: msg = (Message) evt.getArg();
0513: hdr = (NakAckHeader) msg.getHeader(name);
0514: if (hdr == null)
0515: break; // pass up (e.g. unicast msg)
0516:
0517: // discard messages while not yet server (i.e., until JOIN has returned)
0518: if (!is_server) {
0519: if (log.isTraceEnabled())
0520: log.trace("message was discarded (not yet server)");
0521: return;
0522: }
0523:
0524: // Changed by bela Jan 29 2003: we must not remove the header, otherwise
0525: // further xmit requests will fail !
0526: //hdr=(NakAckHeader)msg.removeHeader(getName());
0527:
0528: switch (hdr.type) {
0529:
0530: case NakAckHeader.MSG:
0531: handleMessage(msg, hdr);
0532: return; // transmitter passes message up for us !
0533:
0534: case NakAckHeader.XMIT_REQ:
0535: if (hdr.range == null) {
0536: if (log.isErrorEnabled()) {
0537: log
0538: .error("XMIT_REQ: range of xmit msg is null; discarding request from "
0539: + msg.getSrc());
0540: }
0541: return;
0542: }
0543: handleXmitReq(msg.getSrc(), hdr.range.low,
0544: hdr.range.high, hdr.sender);
0545: return;
0546:
0547: case NakAckHeader.XMIT_RSP:
0548: if (log.isTraceEnabled())
0549: log.trace("received missing messages " + hdr.range);
0550: handleXmitRsp(msg);
0551: return;
0552:
0553: default:
0554: if (log.isErrorEnabled()) {
0555: log.error("NakAck header type " + hdr.type
0556: + " not known !");
0557: }
0558: return;
0559: }
0560:
0561: case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
0562: stable((Digest) evt.getArg());
0563: return; // do not pass up further (Bela Aug 7 2001)
0564:
0565: case Event.GET_DIGEST:
0566: digest = getDigestHighestDeliveredMsgs();
0567: passDown(new Event(Event.GET_DIGEST_OK, digest));
0568: return;
0569:
0570: case Event.GET_DIGEST_STABLE:
0571: digest = getDigestHighestDeliveredMsgs();
0572: passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest));
0573: return;
0574:
0575: case Event.SET_LOCAL_ADDRESS:
0576: local_addr = (Address) evt.getArg();
0577: break;
0578:
0579: case Event.CONFIG:
0580: passUp(evt);
0581: if (log.isDebugEnabled()) {
0582: log.debug("received CONFIG event: " + evt.getArg());
0583: }
0584: handleConfigEvent((HashMap) evt.getArg());
0585: return;
0586: }
0587: passUp(evt);
0588: }
0589:
0590: /* --------------------------------- Private Methods --------------------------------------- */
0591:
0592: /**
0593: * Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't
0594: * store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a
0595: * message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details.
0596: * Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to
0597: * sent_msgs fails e.g. due to an OOM (see http://jira.jboss.com/jira/browse/JGRP-179). bela Jan 13 2006
0598: */
0599: private void send(Event evt, Message msg) {
0600: if (msg == null)
0601: throw new NullPointerException("msg is null; event is "
0602: + evt);
0603:
0604: if (!started) {
0605: if (log.isWarnEnabled())
0606: log
0607: .warn("["
0608: + local_addr
0609: + "] discarded message as start() has not been called, message: "
0610: + msg);
0611: return;
0612: }
0613:
0614: long msg_id;
0615: synchronized (sent_msgs) {
0616: try { // incrementing seqno and adding the msg to sent_msgs needs to be atomic
0617: msg_id = seqno + 1;
0618: msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG,
0619: msg_id));
0620: if (Global.copy) {
0621: sent_msgs.put(new Long(msg_id), msg.copy());
0622: } else {
0623: sent_msgs.put(new Long(msg_id), msg);
0624: }
0625: seqno = msg_id;
0626: } catch (Throwable t) {
0627: if (t instanceof Error)
0628: throw (Error) t;
0629: if (t instanceof RuntimeException)
0630: throw (RuntimeException) t;
0631: else {
0632: throw new RuntimeException("failure adding msg "
0633: + msg + " to the retransmit table", t);
0634: }
0635: }
0636: }
0637:
0638: try { // moved passDown() out of synchronized clause (bela Sept 7 2006) http://jira.jboss.com/jira/browse/JGRP-300
0639: if (log.isTraceEnabled())
0640: log.trace("sending " + local_addr + "#" + msg_id);
0641: passDown(evt); // if this fails, since msg is in sent_msgs, it can be retransmitted
0642: } catch (Throwable t) { // eat the exception, don't pass it up the stack
0643: if (log.isWarnEnabled()) {
0644: log.warn("failure passing message down", t);
0645: }
0646: }
0647: }
0648:
0649: /**
0650: * Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many
0651: * messages as possible from the NRW and passes them up the stack. Discards messages from non-members.
0652: */
0653: private void handleMessage(Message msg, NakAckHeader hdr) {
0654: NakReceiverWindow win;
0655: Message msg_to_deliver;
0656: Address sender = msg.getSrc();
0657:
0658: if (sender == null) {
0659: if (log.isErrorEnabled())
0660: log.error("sender of message is null");
0661: return;
0662: }
0663:
0664: if (log.isTraceEnabled()) {
0665: StringBuffer sb = new StringBuffer('[');
0666: sb.append(local_addr).append(": received ").append(sender)
0667: .append('#').append(hdr.seqno);
0668: log.trace(sb.toString());
0669: }
0670:
0671: // msg is potentially re-sent later as result of XMIT_REQ reception; that's why hdr is added !
0672:
0673: // Changed by bela Jan 29 2003: we currently don't resend from received msgs, just from sent_msgs !
0674: // msg.putHeader(getName(), hdr);
0675:
0676: synchronized (received_msgs) {
0677: win = (NakReceiverWindow) received_msgs.get(sender);
0678: }
0679: if (win == null) { // discard message if there is no entry for sender
0680: if (leaving)
0681: return;
0682: if (log.isWarnEnabled()) {
0683: StringBuffer sb = new StringBuffer('[');
0684: sb.append(local_addr).append(
0685: "] discarded message from non-member ").append(
0686: sender).append(", my view is ").append(
0687: this .view);
0688: log.warn(sb);
0689: }
0690: return;
0691: }
0692: win.add(hdr.seqno, msg); // add in order, then remove and pass up as many msgs as possible
0693:
0694: // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
0695: // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
0696: // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time
0697: // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
0698: // delivery of P1, Q1, Q2, P2: FIFO (implemented by NAKACK) says messages need to be delivered only in the
0699: // order in which they were sent by the sender
0700: synchronized (win) {
0701: while ((msg_to_deliver = win.remove()) != null) {
0702:
0703: // Changed by bela Jan 29 2003: not needed (see above)
0704: //msg_to_deliver.removeHeader(getName());
0705: passUp(new Event(Event.MSG, msg_to_deliver));
0706: }
0707: }
0708: }
0709:
0710: /**
0711: * Retransmit from sent-table, called when XMIT_REQ is received. Bundles all messages to be xmitted into one large
0712: * message and sends them back with an XMIT_RSP header. Note that since we cannot count on a fragmentation layer
0713: * below us, we have to make sure the message doesn't exceed max_xmit_size bytes. If this is the case, we split the
0714: * message into multiple, smaller-chunked messages. But in most cases this still yields fewer messages than if each
0715: * requested message was retransmitted separately.
0716: *
0717: * @param xmit_requester The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
0718: * @param first_seqno The first sequence number to be retransmitted (<= last_seqno)
0719: * @param last_seqno The last sequence number to be retransmitted (>= first_seqno)
0720: * @param original_sender The member who originally sent the messsage. Guaranteed to be non-null
0721: */
0722: private void handleXmitReq(Address xmit_requester,
0723: long first_seqno, long last_seqno, Address original_sender) {
0724: Message m, tmp;
0725: LinkedList list;
0726: long size = 0, marker = first_seqno, len;
0727: NakReceiverWindow win = null;
0728: boolean amISender; // am I the original sender ?
0729:
0730: if (log.isTraceEnabled()) {
0731: StringBuffer sb = new StringBuffer();
0732: sb.append(local_addr).append(
0733: ": received xmit request from ").append(
0734: xmit_requester).append(" for ");
0735: sb.append(original_sender).append(" [").append(first_seqno)
0736: .append(" - ").append(last_seqno).append("]");
0737: log.trace(sb.toString());
0738: }
0739:
0740: if (first_seqno > last_seqno) {
0741: if (log.isErrorEnabled())
0742: log.error("first_seqno (" + first_seqno
0743: + ") > last_seqno (" + last_seqno
0744: + "): not able to retransmit");
0745: return;
0746: }
0747:
0748: if (stats) {
0749: xmit_reqs_received += last_seqno - first_seqno + 1;
0750: updateStats(received, xmit_requester, 1, 0, 0);
0751: }
0752:
0753: amISender = local_addr.equals(original_sender);
0754: if (!amISender)
0755: win = (NakReceiverWindow) received_msgs
0756: .get(original_sender);
0757:
0758: list = new LinkedList();
0759: for (long i = first_seqno; i <= last_seqno; i++) {
0760: if (amISender) {
0761: m = (Message) sent_msgs.get(new Long(i)); // no need to synchronize
0762: } else {
0763: m = win != null ? win.get(i) : null;
0764: }
0765: if (m == null) {
0766: if (log.isErrorEnabled()) {
0767: StringBuffer sb = new StringBuffer();
0768: sb.append("(requester=").append(xmit_requester)
0769: .append(", local_addr=").append(
0770: this .local_addr);
0771: sb.append(") message ").append(original_sender)
0772: .append("::").append(i);
0773: sb.append(" not found in ").append(
0774: (amISender ? "sent" : "received")).append(
0775: " msgs. ");
0776: if (win != null) {
0777: sb.append("Received messages from ").append(
0778: original_sender).append(": ").append(
0779: win.toString());
0780: } else {
0781: sb.append("\nSent messages: ").append(
0782: printSentMsgs());
0783: }
0784: log.error(sb);
0785: }
0786: continue;
0787: }
0788: len = m.size();
0789: size += len;
0790: if (size > max_xmit_size && list.size() > 0) { // changed from >= to > (yaron-r, bug #943709)
0791: // yaronr: added &&listSize()>0 since protocols between FRAG and NAKACK add headers, and message exceeds size.
0792:
0793: // size has reached max_xmit_size. go ahead and send message (excluding the current message)
0794: if (log.isTraceEnabled())
0795: log.trace("xmitting msgs [" + marker + '-'
0796: + (i - 1) + "] to " + xmit_requester);
0797: sendXmitRsp(xmit_requester, (LinkedList) list.clone(),
0798: marker, i - 1);
0799: marker = i;
0800: list.clear();
0801: // fixed Dec 15 2003 (bela, patch from Joel Dice (dicej)), see explanantion under
0802: // bug report #854887
0803: size = len;
0804: }
0805: if (Global.copy) {
0806: tmp = m.copy();
0807: } else {
0808: tmp = m;
0809: }
0810: // tmp.setDest(xmit_requester);
0811: // tmp.setSrc(local_addr);
0812: if (tmp.getSrc() == null)
0813: tmp.setSrc(local_addr);
0814: list.add(tmp);
0815: }
0816:
0817: if (list.size() > 0) {
0818: if (log.isTraceEnabled())
0819: log.trace("xmitting msgs [" + marker + '-' + last_seqno
0820: + "] to " + xmit_requester);
0821: sendXmitRsp(xmit_requester, (LinkedList) list.clone(),
0822: marker, last_seqno);
0823: list.clear();
0824: }
0825: }
0826:
0827: private static void updateStats(HashMap map, Address key, int req,
0828: int rsp, int missing) {
0829: Entry entry = (Entry) map.get(key);
0830: if (entry == null) {
0831: entry = new Entry();
0832: map.put(key, entry);
0833: }
0834: entry.xmit_reqs += req;
0835: entry.xmit_rsps += rsp;
0836: entry.missing_msgs_rcvd += missing;
0837: }
0838:
0839: private void sendXmitRsp(Address dest, LinkedList xmit_list,
0840: long first_seqno, long last_seqno) {
0841: Buffer buf;
0842: if (xmit_list == null || xmit_list.size() == 0) {
0843: if (log.isErrorEnabled())
0844: log.error("xmit_list is empty");
0845: return;
0846: }
0847: if (use_mcast_xmit)
0848: dest = null;
0849:
0850: if (stats) {
0851: xmit_rsps_sent += xmit_list.size();
0852: updateStats(sent, dest, 0, 1, 0);
0853: }
0854:
0855: try {
0856: buf = Util.msgListToByteBuffer(xmit_list);
0857: Message msg = new Message(dest, null, buf.getBuf(), buf
0858: .getOffset(), buf.getLength());
0859: msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP,
0860: first_seqno, last_seqno));
0861: passDown(new Event(Event.MSG, msg));
0862: } catch (IOException ex) {
0863: log.error("failed marshalling xmit list", ex);
0864: }
0865: }
0866:
0867: private void handleXmitRsp(Message msg) {
0868: LinkedList list;
0869: Message m;
0870:
0871: if (msg == null) {
0872: if (log.isWarnEnabled())
0873: log.warn("message is null");
0874: return;
0875: }
0876: try {
0877: list = Util.byteBufferToMessageList(msg.getRawBuffer(), msg
0878: .getOffset(), msg.getLength());
0879: if (list != null) {
0880: if (stats) {
0881: xmit_rsps_received += list.size();
0882: updateStats(received, msg.getSrc(), 0, 1, 0);
0883: }
0884: for (Iterator it = list.iterator(); it.hasNext();) {
0885: m = (Message) it.next();
0886: up(new Event(Event.MSG, m));
0887: }
0888: list.clear();
0889: }
0890: } catch (Exception ex) {
0891: if (log.isErrorEnabled()) {
0892: log
0893: .error(
0894: "failed reading list of retransmitted messages",
0895: ex);
0896: }
0897: }
0898: }
0899:
0900: /**
0901: * Remove old members from NakReceiverWindows and add new members (starting seqno=0). Essentially removes all
0902: * entries from received_msgs that are not in <code>members</code>
0903: */
0904: private void adjustReceivers(boolean remove) {
0905: Address sender;
0906: NakReceiverWindow win;
0907:
0908: synchronized (received_msgs) {
0909: if (remove) {
0910: // 1. Remove all senders in received_msgs that are not members anymore
0911: for (Iterator it = received_msgs.keySet().iterator(); it
0912: .hasNext();) {
0913: sender = (Address) it.next();
0914: if (!members.contains(sender)) {
0915: win = (NakReceiverWindow) received_msgs
0916: .get(sender);
0917: win.reset();
0918: if (log.isDebugEnabled()) {
0919: log
0920: .debug("removing "
0921: + sender
0922: + " from received_msgs (not member anymore)");
0923: }
0924: it.remove();
0925: }
0926: }
0927: }
0928:
0929: // 2. Add newly joined members to received_msgs (starting seqno=0)
0930: for (int i = 0; i < members.size(); i++) {
0931: sender = (Address) members.elementAt(i);
0932: if (!received_msgs.containsKey(sender)) {
0933: win = createNakReceiverWindow(sender, 0);
0934: received_msgs.put(sender, win);
0935: }
0936: }
0937: }
0938: }
0939:
0940: /**
0941: * Returns a message digest: for each member P the highest seqno received from P is added to the digest.
0942: */
0943: private Digest getDigest() {
0944: Digest digest;
0945: Address sender;
0946: Range range;
0947:
0948: digest = new Digest(members.size());
0949: for (int i = 0; i < members.size(); i++) {
0950: sender = (Address) members.elementAt(i);
0951: range = getLowestAndHighestSeqno(sender, false); // get the highest received seqno
0952: if (range == null) {
0953: if (log.isErrorEnabled()) {
0954: log.error("range is null");
0955: }
0956: continue;
0957: }
0958: digest.add(sender, range.low, range.high); // add another entry to the digest
0959: }
0960: return digest;
0961: }
0962:
0963: /**
0964: * Returns a message digest: for each member P the highest seqno received from P <em>without a gap</em> is added to
0965: * the digest. E.g. if the seqnos received from P are [+3 +4 +5 -6 +7 +8], then 5 will be returned. Also, the
0966: * highest seqno <em>seen</em> is added. The max of all highest seqnos seen will be used (in STABLE) to determine
0967: * whether the last seqno from a sender was received (see "Last Message Dropped" topic in DESIGN).
0968: */
0969: private Digest getDigestHighestDeliveredMsgs() {
0970: Digest digest;
0971: Address sender;
0972: Range range;
0973: long high_seqno_seen;
0974:
0975: digest = new Digest(members.size());
0976: for (int i = 0; i < members.size(); i++) {
0977: sender = (Address) members.elementAt(i);
0978: range = getLowestAndHighestSeqno(sender, true); // get the highest deliverable seqno
0979: if (range == null) {
0980: if (log.isErrorEnabled()) {
0981: log.error("range is null");
0982: }
0983: continue;
0984: }
0985: high_seqno_seen = getHighSeqnoSeen(sender);
0986: digest.add(sender, range.low, range.high, high_seqno_seen); // add another entry to the digest
0987: }
0988: return digest;
0989: }
0990:
0991: /**
0992: * Creates a NakReceiverWindow for each sender in the digest according to the sender's seqno. If NRW already exists,
0993: * reset it.
0994: */
0995: private void setDigest(Digest d) {
0996: if (d == null || d.senders == null) {
0997: if (log.isErrorEnabled()) {
0998: log.error("digest or digest.senders is null");
0999: }
1000: return;
1001: }
1002:
1003: clear();
1004:
1005: Map.Entry entry;
1006: Address sender;
1007: org.jgroups.protocols.pbcast.Digest.Entry val;
1008: long initial_seqno;
1009: NakReceiverWindow win;
1010:
1011: for (Iterator it = d.senders.entrySet().iterator(); it
1012: .hasNext();) {
1013: entry = (Map.Entry) it.next();
1014: sender = (Address) entry.getKey();
1015: val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
1016: .getValue();
1017:
1018: if (sender == null || val == null) {
1019: if (log.isWarnEnabled()) {
1020: log.warn("sender or value is null");
1021: }
1022: continue;
1023: }
1024: initial_seqno = val.high_seqno;
1025: win = createNakReceiverWindow(sender, initial_seqno);
1026: synchronized (received_msgs) {
1027: received_msgs.put(sender, win);
1028: }
1029: }
1030: }
1031:
1032: /**
1033: * For all members of the digest, adjust the NakReceiverWindows in the received_msgs hashtable. If the member
1034: * already exists, sets its seqno to be the max of the seqno and the seqno of the member in the digest. If no entry
1035: * exists, create one with the initial seqno set to the seqno of the member in the digest.
1036: */
1037: private void mergeDigest(Digest d) {
1038: if (d == null || d.senders == null) {
1039: if (log.isErrorEnabled()) {
1040: log.error("digest or digest.senders is null");
1041: }
1042: return;
1043: }
1044:
1045: Map.Entry entry;
1046: Address sender;
1047: org.jgroups.protocols.pbcast.Digest.Entry val;
1048: NakReceiverWindow win;
1049: long initial_seqno;
1050:
1051: for (Iterator it = d.senders.entrySet().iterator(); it
1052: .hasNext();) {
1053: entry = (Map.Entry) it.next();
1054: sender = (Address) entry.getKey();
1055: val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
1056: .getValue();
1057:
1058: if (sender == null || val == null) {
1059: if (log.isWarnEnabled()) {
1060: log.warn("sender or value is null");
1061: }
1062: continue;
1063: }
1064: initial_seqno = val.high_seqno;
1065: synchronized (received_msgs) {
1066: win = (NakReceiverWindow) received_msgs.get(sender);
1067: if (win == null) {
1068: win = createNakReceiverWindow(sender, initial_seqno);
1069: received_msgs.put(sender, win);
1070: } else {
1071: if (win.getHighestReceived() < initial_seqno) {
1072: win.reset();
1073: received_msgs.remove(sender);
1074: win = createNakReceiverWindow(sender,
1075: initial_seqno);
1076: received_msgs.put(sender, win);
1077: }
1078: }
1079: }
1080: }
1081: }
1082:
1083: private NakReceiverWindow createNakReceiverWindow(Address sender,
1084: long initial_seqno) {
1085: NakReceiverWindow win = new NakReceiverWindow(sender, this ,
1086: initial_seqno, timer);
1087: win.setRetransmitTimeouts(retransmit_timeout);
1088: win.setDiscardDeliveredMessages(discard_delivered_msgs);
1089: win.setMaxXmitBufSize(this .max_xmit_buf_size);
1090: if (stats)
1091: win.setListener(this );
1092: return win;
1093: }
1094:
1095: /**
1096: * Returns the lowest seqno still in cache (so it can be retransmitted) and the highest seqno received so far.
1097: *
1098: * @param sender The address for which the highest and lowest seqnos are to be retrieved
1099: * @param stop_at_gaps If true, the highest seqno *deliverable* will be returned. If false, the highest seqno
1100: * *received* will be returned. E.g. for [+3 +4 +5 -6 +7 +8], the highest_seqno_received is 8,
1101: * whereas the higheset_seqno_seen (deliverable) is 5.
1102: */
1103: private Range getLowestAndHighestSeqno(Address sender,
1104: boolean stop_at_gaps) {
1105: Range r = null;
1106: NakReceiverWindow win;
1107:
1108: if (sender == null) {
1109: if (log.isErrorEnabled()) {
1110: log.error("sender is null");
1111: }
1112: return r;
1113: }
1114: synchronized (received_msgs) {
1115: win = (NakReceiverWindow) received_msgs.get(sender);
1116: }
1117: if (win == null) {
1118: if (log.isErrorEnabled()) {
1119: log.error("sender " + sender
1120: + " not found in received_msgs");
1121: }
1122: return r;
1123: }
1124: if (stop_at_gaps) {
1125: r = new Range(win.getLowestSeen(), win.getHighestSeen()); // deliverable messages (no gaps)
1126: } else {
1127: r = new Range(win.getLowestSeen(),
1128: win.getHighestReceived() + 1); // received messages
1129: }
1130: return r;
1131: }
1132:
1133: /**
1134: * Returns the highest seqno seen from sender. E.g. if we received 1, 2, 4, 5 from P, then 5 will be returned
1135: * (doesn't take gaps into account). If we are the sender, we will return the highest seqno <em>sent</em> rather
1136: * then <em>received</em>
1137: */
1138: private long getHighSeqnoSeen(Address sender) {
1139: NakReceiverWindow win;
1140: long ret = 0;
1141:
1142: if (sender == null) {
1143: if (log.isErrorEnabled()) {
1144: log.error("sender is null");
1145: }
1146: return ret;
1147: }
1148: if (sender.equals(local_addr)) {
1149: return seqno - 1;
1150: }
1151:
1152: synchronized (received_msgs) {
1153: win = (NakReceiverWindow) received_msgs.get(sender);
1154: }
1155: if (win == null) {
1156: if (log.isErrorEnabled()) {
1157: log.error("sender " + sender
1158: + " not found in received_msgs");
1159: }
1160: return ret;
1161: }
1162: ret = win.getHighestReceived();
1163: return ret;
1164: }
1165:
1166: /**
1167: * Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest
1168: * which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update received_msgs:
1169: * for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the
1170: * NakReceiverWindow corresponding to P which are <= seqno at digest[P].
1171: */
1172: private void stable(Digest d) {
1173: NakReceiverWindow recv_win;
1174: long my_highest_rcvd; // highest seqno received in my digest for a sender P
1175: long stability_highest_rcvd; // highest seqno received in the stability vector for a sender P
1176:
1177: if (members == null || local_addr == null || d == null) {
1178: if (log.isWarnEnabled())
1179: log.warn("members, local_addr or digest are null !");
1180: return;
1181: }
1182:
1183: if (log.isTraceEnabled()) {
1184: log.trace("received stable digest " + d);
1185: }
1186:
1187: Map.Entry entry;
1188: Address sender;
1189: org.jgroups.protocols.pbcast.Digest.Entry val;
1190: long high_seqno_delivered, high_seqno_received;
1191:
1192: for (Iterator it = d.senders.entrySet().iterator(); it
1193: .hasNext();) {
1194: entry = (Map.Entry) it.next();
1195: sender = (Address) entry.getKey();
1196: if (sender == null)
1197: continue;
1198: val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
1199: .getValue();
1200: high_seqno_delivered = val.high_seqno;
1201: high_seqno_received = val.high_seqno_seen;
1202:
1203: // check whether the last seqno received for a sender P in the stability vector is > last seqno
1204: // received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic
1205: // in DESIGN)
1206: synchronized (received_msgs) {
1207: recv_win = (NakReceiverWindow) received_msgs
1208: .get(sender);
1209: }
1210: if (recv_win != null) {
1211: my_highest_rcvd = recv_win.getHighestReceived();
1212: stability_highest_rcvd = high_seqno_received;
1213:
1214: if (stability_highest_rcvd >= 0
1215: && stability_highest_rcvd > my_highest_rcvd) {
1216: if (log.isTraceEnabled()) {
1217: log
1218: .trace("my_highest_rcvd ("
1219: + my_highest_rcvd
1220: + ") < stability_highest_rcvd ("
1221: + stability_highest_rcvd
1222: + "): requesting retransmission of "
1223: + sender + '#'
1224: + stability_highest_rcvd);
1225: }
1226: retransmit(stability_highest_rcvd,
1227: stability_highest_rcvd, sender);
1228: }
1229: }
1230:
1231: high_seqno_delivered -= gc_lag;
1232: if (high_seqno_delivered < 0) {
1233: continue;
1234: }
1235:
1236: if (log.isTraceEnabled())
1237: log.trace("deleting msgs <= " + high_seqno_delivered
1238: + " from " + sender);
1239:
1240: // garbage collect from sent_msgs if sender was myself
1241: if (sender.equals(local_addr)) {
1242: synchronized (sent_msgs) {
1243: // gets us a subset from [lowest seqno - seqno]
1244: SortedMap stable_keys = sent_msgs.headMap(new Long(
1245: high_seqno_delivered));
1246: if (stable_keys != null) {
1247: stable_keys.clear(); // this will modify sent_msgs directly
1248: }
1249: }
1250: }
1251:
1252: // delete *delivered* msgs that are stable
1253: // recv_win=(NakReceiverWindow)received_msgs.get(sender);
1254: if (recv_win != null)
1255: recv_win.stable(high_seqno_delivered); // delete all messages with seqnos <= seqno
1256: }
1257: }
1258:
1259: /* ---------------------- Interface Retransmitter.RetransmitCommand ---------------------- */
1260:
1261: /**
1262: * Implementation of Retransmitter.RetransmitCommand. Called by retransmission thread when gap is detected.
1263: */
1264: public void retransmit(long first_seqno, long last_seqno,
1265: Address sender) {
1266: NakAckHeader hdr;
1267: Message retransmit_msg;
1268: Address dest = sender; // to whom do we send the XMIT request ?
1269:
1270: if (xmit_from_random_member && !local_addr.equals(sender)) {
1271: Address random_member = (Address) Util
1272: .pickRandomElement(members);
1273: if (random_member != null
1274: && !local_addr.equals(random_member)) {
1275: dest = random_member;
1276: if (log.isTraceEnabled())
1277: log.trace("picked random member " + dest
1278: + " to send XMIT request to");
1279: }
1280: }
1281:
1282: hdr = new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno,
1283: last_seqno, sender);
1284: retransmit_msg = new Message(dest, null, null);
1285: if (log.isTraceEnabled())
1286: log
1287: .trace(local_addr + ": sending XMIT_REQ (["
1288: + first_seqno + ", " + last_seqno
1289: + "]) to " + dest);
1290: retransmit_msg.putHeader(name, hdr);
1291: passDown(new Event(Event.MSG, retransmit_msg));
1292: if (stats) {
1293: xmit_reqs_sent += last_seqno - first_seqno + 1;
1294: updateStats(sent, dest, 1, 0, 0);
1295: for (long i = first_seqno; i <= last_seqno; i++) {
1296: XmitRequest req = new XmitRequest(sender, i, dest);
1297: send_history.add(req);
1298: }
1299: }
1300: }
1301:
1302: /* ------------------- End of Interface Retransmitter.RetransmitCommand -------------------- */
1303:
1304: /* ----------------------- Interface NakReceiverWindow.Listener ---------------------- */
1305: public void missingMessageReceived(long seqno, Message msg) {
1306: if (stats) {
1307: missing_msgs_received++;
1308: updateStats(received, msg.getSrc(), 0, 0, 1);
1309: MissingMessage missing = new MissingMessage(msg.getSrc(),
1310: seqno);
1311: receive_history.add(missing);
1312: }
1313: }
1314:
1315: /* ------------------- End of Interface NakReceiverWindow.Listener ------------------- */
1316:
1317: private void clear() {
1318: NakReceiverWindow win;
1319:
1320: // changed April 21 2004 (bela): SourceForge bug# 938584. We cannot delete our own messages sent between
1321: // a join() and a getState(). Otherwise retransmission requests from members who missed those msgs might
1322: // fail. Not to worry though: those msgs will be cleared by STABLE (message garbage collection)
1323:
1324: // sent_msgs.clear();
1325:
1326: synchronized (received_msgs) {
1327: for (Iterator it = received_msgs.values().iterator(); it
1328: .hasNext();) {
1329: win = (NakReceiverWindow) it.next();
1330: win.reset();
1331: }
1332: received_msgs.clear();
1333: }
1334: }
1335:
1336: private void reset() {
1337: NakReceiverWindow win;
1338:
1339: synchronized (sent_msgs) {
1340: sent_msgs.clear();
1341: seqno = -1;
1342: }
1343:
1344: synchronized (received_msgs) {
1345: for (Iterator it = received_msgs.values().iterator(); it
1346: .hasNext();) {
1347: win = (NakReceiverWindow) it.next();
1348: win.destroy();
1349: }
1350: received_msgs.clear();
1351: }
1352: }
1353:
1354: public String printMessages() {
1355: StringBuffer ret = new StringBuffer();
1356: Map.Entry entry;
1357: Address addr;
1358: Object w;
1359:
1360: ret.append("\nsent_msgs: ").append(printSentMsgs());
1361: ret.append("\nreceived_msgs:\n");
1362: synchronized (received_msgs) {
1363: for (Iterator it = received_msgs.entrySet().iterator(); it
1364: .hasNext();) {
1365: entry = (Map.Entry) it.next();
1366: addr = (Address) entry.getKey();
1367: w = entry.getValue();
1368: ret.append(addr).append(": ").append(w.toString())
1369: .append('\n');
1370: }
1371: }
1372: return ret.toString();
1373: }
1374:
1375: public String printSentMsgs() {
1376: StringBuffer sb = new StringBuffer();
1377: Long min_seqno, max_seqno;
1378: synchronized (sent_msgs) {
1379: min_seqno = sent_msgs.size() > 0 ? (Long) sent_msgs
1380: .firstKey() : new Long(0);
1381: max_seqno = sent_msgs.size() > 0 ? (Long) sent_msgs
1382: .lastKey() : new Long(0);
1383: }
1384: sb.append('[').append(min_seqno).append(" - ")
1385: .append(max_seqno).append("] (").append(
1386: sent_msgs.size()).append(")");
1387: return sb.toString();
1388: }
1389:
1390: private void handleConfigEvent(HashMap map) {
1391: if (map == null) {
1392: return;
1393: }
1394: if (map.containsKey("frag_size")) {
1395: max_xmit_size = ((Integer) map.get("frag_size")).intValue();
1396: if (log.isInfoEnabled()) {
1397: log.info("max_xmit_size=" + max_xmit_size);
1398: }
1399: }
1400: }
1401:
1402: static class Entry {
1403: long xmit_reqs, xmit_rsps, missing_msgs_rcvd;
1404:
1405: public String toString() {
1406: StringBuffer sb = new StringBuffer();
1407: sb.append(xmit_reqs).append(" xmit_reqs").append(", ")
1408: .append(xmit_rsps).append(" xmit_rsps");
1409: sb.append(", ").append(missing_msgs_rcvd).append(
1410: " missing msgs");
1411: return sb.toString();
1412: }
1413: }
1414:
1415: static class XmitRequest {
1416: Address original_sender; // original sender of message
1417: long seq, timestamp = System.currentTimeMillis();
1418: Address xmit_dest; // destination to which XMIT_REQ is sent, usually the original sender
1419:
1420: XmitRequest(Address original_sender, long seqno,
1421: Address xmit_dest) {
1422: this .original_sender = original_sender;
1423: this .xmit_dest = xmit_dest;
1424: this .seq = seqno;
1425: }
1426:
1427: public String toString() {
1428: StringBuffer sb = new StringBuffer();
1429: sb.append(new Date(timestamp)).append(": ").append(
1430: original_sender).append(" #").append(seq);
1431: sb.append(" (XMIT_REQ sent to ").append(xmit_dest).append(
1432: ")");
1433: return sb.toString();
1434: }
1435: }
1436:
1437: static class MissingMessage {
1438: Address original_sender;
1439: long seq, timestamp = System.currentTimeMillis();
1440:
1441: MissingMessage(Address original_sender, long seqno) {
1442: this .original_sender = original_sender;
1443: this .seq = seqno;
1444: }
1445:
1446: public String toString() {
1447: StringBuffer sb = new StringBuffer();
1448: sb.append(new Date(timestamp)).append(": ").append(
1449: original_sender).append(" #").append(seq);
1450: return sb.toString();
1451: }
1452: }
1453:
1454: /* ----------------------------- End of Private Methods ------------------------------------ */
1455:
1456: }
|