0001: // $Id: NAKACK.java,v 1.15.10.1 2007/04/27 08:03:52 belaban Exp $
0002:
0003: package org.jgroups.protocols;
0004:
0005: import org.jgroups.*;
0006: import org.jgroups.stack.*;
0007: import org.jgroups.util.List;
0008: import org.jgroups.util.TimeScheduler;
0009: import org.jgroups.util.Util;
0010:
0011: import java.util.Enumeration;
0012: import java.util.Hashtable;
0013: import java.util.Properties;
0014: import java.util.Vector;
0015:
0016: /**
0017: * Negative AcKnowledgement layer (NAKs), paired with positive ACKs. The default is to send a message
0018: * using NAKs: the sender sends messages with monotonically increasing seqnos, receiver requests
0019: * retransmissions of missing messages (gaps). When a SWITCH_NAK_ACK event is received, the mode
0020: * is switched to using NAK_ACKS: the sender still uses monotonically increasing seqnos, but the receiver
0021: * acknowledges every message. NAK and NAK_ACK seqnos are the same, when switching the mode, the current
0022: * seqno is reused. Both NAK and NAK_ACK messages use the current view ID in which the message is sent to
0023: * queue messages destined for an upcoming view, or discard messages sent in a previous view. Both modes
0024: * reset their seqnos to 0 when receiving a view change. The NAK_ACK scheme is used for broadcasting
0025: * view changes.
0026: * <p/>
 * The third mode is for out-of-band control messages (activated by SWITCH_OUT_OF_BAND): this mode
 * neither employs view IDs nor uses the same seqnos as NAK and NAK_ACK. It uses its own seqnos,
0029: * unrelated to the ones used by NAK and NAK_ACK, and never resets them. In combination with the sender's
0030: * address, this makes every out-of-band message unique. Out-of-band messages are used for example for
0031: * broadcasting FLUSH messages.<p>
0032: * Once a mode is set, it remains in effect until exactly 1 message has been sent, afterwards the default
0033: * mode NAK is used again.
0034: * <p/>
0035: * The following communication between 2 peers exists (left side is initiator,
0036: * right side receiver): <pre>
0037: * <p/>
0038: * <p/>
0039: * send_out_of_band
0040: * --------------> synchronous (1)
0041: * <-------------
0042: * ack
0043: * <p/>
0044: * <p/>
0045: * send_nak
0046: * --------------> asynchronous (2)
0047: * <p/>
0048: * <p/>
0049: * send_nak_ack
0050: * --------------> synchronous (3)
0051: * <--------------
0052: * ack
0053: * <p/>
0054: * <p/>
0055: * retransmit
0056: * <-------------- asynchronous (4)
0057: * <p/>
0058: * <p/>
0059: * </pre>
0060: * <p/>
0061: * When a message is sent, it will contain a header describing the type of the
0062: * message, and containing additional data, such as sequence number etc. When a
0063: * message is received, it is fed into either the OutOfBander or NAKer, depending on the
0064: * header's type.<p>
0065: * Note that in the synchronous modes, ACKs are sent for each request. If a reliable unicast protocol layer
0066: * exists somewhere underneath this layer, then even the ACKs are transmitted reliably, thus increasing
0067: * the number of messages exchanged. However, since it is envisaged that ACK/OUT_OF_BAND are not used
0068: * frequently, this problem is currently not addressed.
0069: *
0070: * @author Bela Ban
0071: */
0072: public class NAKACK extends Protocol {
// Retransmission timeouts: handed to every NakReceiverWindow created by NAKer.receive(); the
// first entry is also the interval of the last-message retransmitter. Can be overridden with the
// "retransmit_timeout" property. Presumably milliseconds - confirm against TimeScheduler.
long[] retransmit_timeout = { 2000, 3000, 5000, 8000 }; // time(s) to wait before requesting xmit
// Helper handling NAK / NAK_ACK traffic; created in init().
NAKer naker = null;
// Helper handling out-of-band traffic; created in init().
OutOfBander out_of_bander = null;
// View id of the most recently installed view (set on VIEW_CHANGE); null until then.
ViewId vid = null;
// Clone of the most recently installed view (set on VIEW_CHANGE).
View view = null;
// While false, received NAK messages are passed up unchecked and a copy is queued;
// becomes true on VIEW_CHANGE or BECOME_SERVER.
boolean is_server = false;
// Own address, recorded when SET_LOCAL_ADDRESS travels up.
Address local_addr = null;
final List queued_msgs = new List(); // msgs for next view (vid > current vid)
// Current membership (clone of the view's member list), replaced on TMP_VIEW and VIEW_CHANGE.
Vector members = null; // for OutOfBander: this is the destination set to
// send messages to
// One-shot switch set by SWITCH_OUT_OF_BAND; cleared after the next multicast was sent.
boolean send_next_msg_out_of_band = false;
// One-shot switch set by SWITCH_NAK_ACK; cleared after the next multicast was sent.
boolean send_next_msg_acking = false;
// Passed to NAKer.waitUntilAllAcksReceived() after a rebroadcast; set via the
// "rebroadcast_timeout" property.
long rebroadcast_timeout = 0; // until all outstanding ACKs recvd (rebcasting)
// The stack's timer, picked up in init(); may stay null when no stack is attached.
TimeScheduler timer = null;
// Header key under which the WRAPPED_MSG header of a rebroadcast message is stored.
static final String WRAPPED_MSG_KEY = "NAKACK.WRAPPED_HDR";
0088:
0089: /**
0090: * Do some initial tasks
0091: */
0092: public void init() throws Exception {
0093: timer = stack != null ? stack.timer : null;
0094: if (timer == null)
0095: if (log.isErrorEnabled())
0096: log.error("timer is null");
0097: naker = new NAKer();
0098: out_of_bander = new OutOfBander();
0099: }
0100:
0101: public void stop() {
0102: out_of_bander.stop();
0103: naker.stop();
0104: }
0105:
/**
 * Returns the protocol name. The same string is used throughout this class as the key under
 * which the NakAckHeader is stored on messages.
 */
public String getName() {
return "NAKACK";
}
0109:
0110: public Vector providedUpServices() {
0111: Vector retval = new Vector(3);
0112: retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
0113: retval.addElement(new Integer(Event.GET_MSG_DIGEST));
0114: retval.addElement(new Integer(Event.GET_MSGS));
0115: return retval;
0116: }
0117:
0118: public Vector providedDownServices() {
0119: Vector retval = new Vector(1);
0120: retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
0121: return retval;
0122: }
0123:
0124: /**
0125: * <b>Callback</b>. Called by superclass when event may be handled.<p>
0126: * <b>Do not use <code>passUp()</code> in this method as the event is passed up
0127: * by default by the superclass after this method returns !</b>
0128: */
0129: public void up(Event evt) {
0130: NakAckHeader hdr;
0131: Message msg, msg_copy;
0132: int rc;
0133:
0134: switch (evt.getType()) {
0135:
0136: case Event.SUSPECT:
0137:
0138: if (log.isInfoEnabled())
0139: log.info("received SUSPECT event (suspected member="
0140: + evt.getArg() + ')');
0141: naker.suspect((Address) evt.getArg());
0142: out_of_bander.suspect((Address) evt.getArg());
0143: break;
0144:
0145: case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
0146: naker.stable((long[]) evt.getArg());
0147: return; // don't pass up further (Bela Aug 7 2001)
0148:
0149: case Event.SET_LOCAL_ADDRESS:
0150: local_addr = (Address) evt.getArg();
0151: break;
0152:
0153: case Event.GET_MSGS_RECEIVED: // returns the highest seqnos delivered to the appl. (used by STABLE)
0154: long[] highest = naker.getHighestSeqnosDelivered();
0155: passDown(new Event(Event.GET_MSGS_RECEIVED_OK, highest));
0156: return; // don't pass up further (bela Aug 7 2001)
0157:
0158: case Event.MSG:
0159: synchronized (this ) {
0160: msg = (Message) evt.getArg();
0161:
0162: // check to see if this is a wrapped msg. If yes, send an ACK
0163: hdr = (NakAckHeader) msg.removeHeader(WRAPPED_MSG_KEY); // see whether it is a wrapped message
0164: if (hdr != null && hdr.type == NakAckHeader.WRAPPED_MSG) { // send back an ACK to hdr.sender
0165: Message ack_msg = new Message(hdr.sender, null,
0166: null);
0167: NakAckHeader h = new NakAckHeader(
0168: NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
0169: if (hdr.sender == null)
0170: if (log.isWarnEnabled())
0171: log
0172: .warn("WRAPPED: header's 'sender' field is null; "
0173: + "cannot send ACK !");
0174: ack_msg.putHeader(getName(), h);
0175: passDown(new Event(Event.MSG, ack_msg));
0176: }
0177:
0178: hdr = (NakAckHeader) msg.removeHeader(getName());
0179: if (hdr == null)
0180: break; // pass up
0181:
0182: switch (hdr.type) {
0183:
0184: case NakAckHeader.NAK_ACK_MSG:
0185: case NakAckHeader.NAK_MSG:
0186: if (hdr.type == NakAckHeader.NAK_ACK_MSG) { // first thing: send ACK back to sender
0187: Message ack_msg = new Message(msg.getSrc(),
0188: null, null);
0189: NakAckHeader h = new NakAckHeader(
0190: NakAckHeader.NAK_ACK_RSP, hdr.seqno,
0191: null);
0192: ack_msg.putHeader(getName(), h);
0193: passDown(new Event(Event.MSG, ack_msg));
0194: }
0195:
0196: // while still a client, we just pass up all messages, without checking for message
0197: // view IDs or seqnos: other layers further up will discard messages not destined
0198: // for us (e.g. based on view IDs).
0199: // Also: store msg in queue, when view change is received, replay messages with the same
0200: // vid as the new view
0201: if (!is_server) {
0202: msg_copy = msg.copy(); // msg without header
0203: msg_copy.putHeader(getName(), hdr); // put header back on as we removed it above
0204: queued_msgs.add(msg_copy); // need a copy since passUp() will modify msg
0205: passUp(new Event(Event.MSG, msg));
0206: return;
0207: }
0208:
0209: // check for VIDs: is the message's VID the same as ours ?
0210: if (vid != null && hdr.vid != null) { // only check if our vid and message's vid available
0211: Address my_addr = vid.getCoordAddress(), other_addr = hdr.vid
0212: .getCoordAddress();
0213:
0214: if (my_addr == null || other_addr == null) {
0215: if (log.isWarnEnabled())
0216: log
0217: .warn("my vid or message's vid does not contain "
0218: + "a coordinator; discarding message !");
0219: return;
0220: }
0221: if (!my_addr.equals(other_addr)) {
0222: if (log.isWarnEnabled())
0223: log.warn("creator of own vid ("
0224: + my_addr
0225: + ")is different from "
0226: + "creator of message's vid ("
0227: + other_addr
0228: + "); discarding message !");
0229: return;
0230: }
0231:
0232: rc = hdr.vid.compareTo(vid);
0233: if (rc > 0) { // message is sent in next view -> store !
0234:
0235: if (log.isInfoEnabled())
0236: log
0237: .info("message's vid ("
0238: + hdr.vid
0239: + '#'
0240: + hdr.seqno
0241: + ") is bigger than current vid: ("
0242: + vid
0243: + ") message is queued !");
0244: msg.putHeader(getName(), hdr); // put header back on as we removed it above
0245: queued_msgs.add(msg);
0246: return;
0247: }
0248: if (rc < 0) { // message sent in prev. view -> discard !
0249:
0250: if (log.isWarnEnabled())
0251: log.warn("message's vid (" + hdr.vid
0252: + ") is smaller than "
0253: + "current vid (" + vid
0254: + "): message <" + msg.getSrc()
0255: + ":#" + hdr.seqno
0256: + "> is discarded ! Hdr is "
0257: + hdr);
0258: return;
0259: }
0260: // If we made it down here, the vids are the same --> OK
0261: }
0262:
0263: msg.putHeader(getName(), hdr); // stored in received_msgs, re-sent later that's why hdr is added !
0264: naker.receive(hdr.seqno, msg, null);
0265: return; // naker passes message up for us !
0266:
0267: case NakAckHeader.RETRANSMIT_MSG:
0268: naker.retransmit(msg.getSrc(), hdr.seqno,
0269: hdr.last_seqno);
0270: return;
0271:
0272: case NakAckHeader.NAK_ACK_RSP:
0273: naker.receiveAck(hdr.seqno, msg.getSrc());
0274: return; // discard, no need to pass up
0275:
0276: case NakAckHeader.OUT_OF_BAND_MSG:
0277: out_of_bander.receive(hdr.seqno, msg,
0278: hdr.stable_msgs);
0279: return; // naker passes message up for us !
0280:
0281: case NakAckHeader.OUT_OF_BAND_RSP:
0282: out_of_bander.receiveAck(hdr.seqno, msg.getSrc());
0283: return;
0284:
0285: default:
0286: if (log.isErrorEnabled())
0287: log.error("NakAck header type " + hdr.type
0288: + " not known !");
0289: break;
0290: }
0291: } //end synchronized
0292:
0293: }
0294:
0295: passUp(evt);
0296: }
0297:
0298: /**
0299: * <b>Callback</b>. Called by superclass when event may be handled.<p>
0300: * <b>Do not use <code>passDown</code> in this method as the event is passed down
0301: * by default by the superclass after this method returns !</b>
0302: */
0303: public void down(Event evt) {
0304: Message msg;
0305:
0306: if (log.isTraceEnabled())
0307: log.trace("queued_msgs has " + queued_msgs.size()
0308: + " messages " + "\n\nnaker:\n"
0309: + naker.dumpContents() + "\n\nout_of_bander: "
0310: + out_of_bander.dumpContents()
0311: + "\n-----------------------------\n");
0312:
0313: switch (evt.getType()) {
0314:
0315: case Event.MSG:
0316: msg = (Message) evt.getArg();
0317:
0318: // unicast address: not null and not mcast, pass down unchanged
0319: if (vid == null
0320: || (msg.getDest() != null && !msg.getDest()
0321: .isMulticastAddress()))
0322: break;
0323:
0324: if (send_next_msg_out_of_band) {
0325: out_of_bander.send(msg);
0326: send_next_msg_out_of_band = false;
0327: } else if (send_next_msg_acking) {
0328: naker.setAcks(true); // require acks when sending a msg
0329: naker.send(msg);
0330: naker.setAcks(false); // don't require acks when sending a msg
0331: send_next_msg_acking = false;
0332: } else
0333: naker.send(msg);
0334:
0335: return; // don't pass down the stack, naker does this for us !
0336:
0337: case Event.GET_MSG_DIGEST:
0338: long[] highest_seqnos = (long[]) evt.getArg();
0339: Digest digest = naker.computeMessageDigest(highest_seqnos);
0340: passUp(new Event(Event.GET_MSG_DIGEST_OK, digest));
0341: return;
0342:
0343: case Event.GET_MSGS:
0344: List lower_seqnos = naker.getMessagesInRange((long[][]) evt
0345: .getArg());
0346: passUp(new Event(Event.GET_MSGS_OK, lower_seqnos));
0347: return;
0348:
0349: case Event.REBROADCAST_MSGS:
0350: rebroadcastMsgs((Vector) evt.getArg());
0351: break;
0352:
0353: case Event.TMP_VIEW:
0354: Vector mbrs = ((View) evt.getArg()).getMembers();
0355: members = mbrs != null ? (Vector) mbrs.clone()
0356: : new Vector(11);
0357: break;
0358:
0359: case Event.VIEW_CHANGE:
0360: synchronized (this ) {
0361: view = ((View) ((View) evt.getArg()).clone());
0362: vid = view.getVid();
0363:
0364: members = (Vector) view.getMembers().clone();
0365:
0366: naker.reset();
0367: out_of_bander.reset();
0368:
0369: is_server = true; // check vids from now on
0370:
0371: // deliver messages received previously for this view
0372: if (queued_msgs.size() > 0)
0373: deliverQueuedMessages();
0374: }
0375: break;
0376:
0377: case Event.BECOME_SERVER:
0378: is_server = true;
0379: break;
0380:
0381: case Event.SWITCH_NAK:
0382: naker.setAcks(false); // don't require acks when sending a msg
0383: return; // don't pass down any further
0384:
0385: case Event.SWITCH_NAK_ACK:
0386: send_next_msg_acking = true;
0387: return; // don't pass down any further
0388:
0389: case Event.SWITCH_OUT_OF_BAND:
0390: send_next_msg_out_of_band = true;
0391: return;
0392:
0393: case Event.GET_MSGS_RECEIVED: // return the highest seqnos delivered (=consumed by the application)
0394: long[] h = naker.getHighestSeqnosDelivered();
0395: passUp(new Event(Event.GET_MSGS_RECEIVED_OK, h));
0396: break;
0397: }
0398:
0399: passDown(evt);
0400: }
0401:
0402: boolean coordinator() {
0403: if (members == null || members.size() < 1 || local_addr == null)
0404: return false;
0405: return local_addr.equals(members.elementAt(0));
0406: }
0407:
0408: /**
0409: * Rebroadcasts the messages given as arguments
0410: */
0411: void rebroadcastMsgs(Vector v) {
0412: Vector final_v;
0413: Message m1, m2;
0414: NakAckHeader h1, h2;
0415:
0416: if (v == null)
0417: return;
0418: final_v = new Vector(v.size());
0419:
0420: // weed out duplicates
0421: //TODO Check!!!!!
0422: for (int i = 0; i < v.size(); i++) {
0423: boolean present = false;
0424: m1 = (Message) v.elementAt(i);
0425: h1 = m1 != null ? (NakAckHeader) m1.getHeader(getName())
0426: : null;
0427: if (m1 == null || h1 == null) { // +++ remove
0428: if (log.isErrorEnabled())
0429: log.error("message is null");
0430: continue;
0431: }
0432:
0433: for (int j = 0; j < final_v.size(); j++) {
0434: m2 = (Message) final_v.elementAt(j);
0435: h2 = m2 != null ? (NakAckHeader) m2
0436: .getHeader(getName()) : null;
0437: if (m2 == null || h2 == null) { // +++ remove
0438: if (log.isErrorEnabled())
0439: log.error("message m2 is null");
0440: continue;
0441: }
0442: if (h1.seqno == h2.seqno && m1.getSrc() != null
0443: && m2.getSrc() != null
0444: && m1.getSrc().equals(m2.getSrc())) {
0445: present = true;
0446: }
0447: }
0448: if (!present)
0449: final_v.addElement(m1);
0450: }
0451:
0452: if (log.isWarnEnabled())
0453: log.warn("rebroadcasting " + final_v.size() + " messages");
0454:
0455: /* Now re-broadcast messages using original NakAckHeader (same seqnos, same sender !) */
0456: for (int i = 0; i < final_v.size(); i++) {
0457: m1 = (Message) final_v.elementAt(i);
0458: naker.resend(m1);
0459: }
0460:
0461: // Wait until all members have acked reception of outstanding msgs. This will empty our
0462: // retransmission table (AckMcastSenderWindow)
0463: naker.waitUntilAllAcksReceived(rebroadcast_timeout);
0464: passUp(new Event(Event.REBROADCAST_MSGS_OK));
0465: }
0466:
0467: /**
0468: * Deliver all messages in the queue where <code>msg.vid == vid</code> holds. Messages were stored
0469: * in the queue because their vid was greater than the current view.
0470: */
0471: void deliverQueuedMessages() {
0472: NakAckHeader hdr;
0473: Message tmpmsg;
0474: int rc;
0475:
0476: while (queued_msgs.size() > 0) {
0477: tmpmsg = (Message) queued_msgs.removeFromHead();
0478: hdr = (NakAckHeader) tmpmsg.getHeader(getName());
0479: rc = hdr.vid.compareTo(vid);
0480: if (rc == 0) { // same vid -> OK
0481: up(new Event(Event.MSG, tmpmsg));
0482: } else if (rc > 0) {
0483: } else
0484: /** todo Maybe messages from previous vids are stored while client */
0485: ; // can't be the case; only messages for future views are stored !
0486: }
0487: }
0488:
0489: public boolean setProperties(Properties props) {
0490: String str;
0491: long[] tmp;
0492:
0493: super .setProperties(props);
0494: str = props.getProperty("retransmit_timeout");
0495: if (str != null) {
0496: tmp = Util.parseCommaDelimitedLongs(str);
0497: props.remove("retransmit_timeout");
0498: if (tmp != null && tmp.length > 0)
0499: retransmit_timeout = tmp;
0500: }
0501:
0502: str = props.getProperty("rebroadcast_timeout");
0503: if (str != null) {
0504: rebroadcast_timeout = Long.parseLong(str);
0505: props.remove("rebroadcast_timeout");
0506: }
0507:
0508: if (props.size() > 0) {
0509: log
0510: .error("NAKACK.setProperties(): these properties are not recognized: "
0511: + props);
0512:
0513: return false;
0514: }
0515: return true;
0516: }
0517:
0518: class NAKer implements Retransmitter.RetransmitCommand,
0519: AckMcastSenderWindow.RetransmitCommand {
// Next seqno to hand out; incremented by getNextSeqno().
long seqno = 0; // current message sequence number
final Hashtable received_msgs = new Hashtable(); // ordered by sender -> NakReceiverWindow
// Copies of the messages sent by this member, keyed by Long seqno; used to answer
// retransmission requests and trimmed by stable().
final Hashtable sent_msgs = new Hashtable(); // ordered by seqno (sent by me !) - Messages
// Tracks messages that still need ACKs (acking sends and rebroadcasts); calls back into
// retransmit(long, Message, Address).
final AckMcastSenderWindow sender_win = new AckMcastSenderWindow(
this , timer);
boolean acking = false; // require acks when sending msgs
// Highest own seqno removed from sent_msgs by stable().
long deleted_up_to = 0;

// Used to periodically retransmit the last message
final LastMessageRetransmitter last_msg_xmitter = new LastMessageRetransmitter();
0530:
0531: private class LastMessageRetransmitter implements
0532: TimeScheduler.Task {
0533: boolean stopped = false;
0534: int num_times = 2; // number of times a message is retransmitted
0535: long last_xmitted_seqno = 0;
0536:
0537: public void stop() {
0538: stopped = true;
0539: }
0540:
0541: public boolean cancelled() {
0542: return stopped;
0543: }
0544:
0545: public long nextInterval() {
0546: return retransmit_timeout[0];
0547: }
0548:
0549: /**
0550: * Periodically retransmits the last seqno to all members. If the seqno doesn't change (ie. there
0551: * were no new messages sent) then the retransmitter task doesn't retransmit after 'num_times' times.
0552: */
0553: public void run() {
0554: synchronized (sent_msgs) {
0555: long prevSeqno = seqno - 1;
0556:
0557: if (prevSeqno == last_xmitted_seqno) {
0558:
0559: if (log.isInfoEnabled())
0560: log.info("prevSeqno=" + prevSeqno
0561: + ", last_xmitted_seqno="
0562: + last_xmitted_seqno
0563: + ", num_times=" + num_times);
0564: if (--num_times <= 0)
0565: return;
0566: } else {
0567: num_times = 3;
0568: last_xmitted_seqno = prevSeqno;
0569: }
0570:
0571: if ((prevSeqno >= 0) && (prevSeqno > deleted_up_to)) {
0572:
0573: if (log.isInfoEnabled())
0574: log.info("retransmitting last message "
0575: + prevSeqno);
0576: retransmit(null, prevSeqno, prevSeqno);
0577: }
0578: }
0579: }
0580:
0581: }
0582:
/**
 * Registers the last-message retransmitter with the timer (fixed-rate scheduling). Without
 * a timer the task is not scheduled and an error is logged instead.
 */
NAKer() {
    if (timer == null) {
        if (log.isErrorEnabled())
            log.error("timer is null");
        return;
    }
    timer.add(last_msg_xmitter, true); // fixed-rate scheduling
}
0589:
/**
 * Returns the current sequence number and then increments it (post-increment), so the first
 * value handed out after a reset to 0 is 0.
 */
long getNextSeqno() {
return seqno++;
}
0593:
0594: long getHighestSeqnoSent() {
0595: long highest_sent = -1;
0596: for (Enumeration e = sent_msgs.keys(); e.hasMoreElements();)
0597: highest_sent = Math.max(highest_sent, ((Long) e
0598: .nextElement()).longValue());
0599: return highest_sent;
0600: }
0601:
0602: /**
0603: * Returns an array of the highest sequence numbers consumed by the application so far,
0604: * its order corresponding with <code>mbrs</code>. Used by coordinator as argument when
0605: * sending initial FLUSH request to members
0606: */
0607: long[] getHighestSeqnosDelivered() {
0608: long[] highest_deliv = members != null ? new long[members
0609: .size()] : null;
0610: Address mbr;
0611: NakReceiverWindow win;
0612:
0613: if (highest_deliv == null)
0614: return null;
0615:
0616: for (int i = 0; i < highest_deliv.length; i++)
0617: highest_deliv[i] = -1;
0618:
0619: synchronized (members) {
0620: for (int i = 0; i < members.size(); i++) {
0621: mbr = (Address) members.elementAt(i);
0622: win = (NakReceiverWindow) received_msgs.get(mbr);
0623: if (win != null)
0624: highest_deliv[i] = win.getHighestDelivered();
0625: }
0626: }
0627: return highest_deliv;
0628: }
0629:
0630: /**
0631: * Return all messages sent by us that are higher than <code>seqno</code>
0632: */
0633: List getSentMessagesHigherThan(long seqno) {
0634: List retval = new List();
0635: Long key;
0636:
0637: for (Enumeration e = sent_msgs.keys(); e.hasMoreElements();) {
0638: key = (Long) e.nextElement();
0639: if (key.longValue() > seqno)
0640: retval.add(sent_msgs.get(key));
0641: }
0642: return retval;
0643: }
0644:
0645: /**
0646: * Returns a message digest: for each member P in <code>highest_seqnos</code>, the highest seqno
0647: * received from P is added to the digest's array. If P == this, then the highest seqno
0648: * <em>sent</em> is added: this makes sure that messages sent but not yet received are also
0649: * re-broadcast (because they are also unstable).<p>If my highest seqno for a member P is
0650: * higher than the one in <code>highest_seqnos</code>, then all messages from P (received or sent)
0651: * whose seqno is higher are added to the digest's messages. The coordinator will use all digests
0652: * to compute a set of messages than need to be re-broadcast to the members before installing
0653: * a new view.
0654: */
0655: Digest computeMessageDigest(long[] highest_seqnos) {
0656: Digest digest = highest_seqnos != null ? new Digest(
0657: highest_seqnos.length) : null;
0658: Address sender;
0659: NakReceiverWindow win;
0660: List unstable_msgs;
0661: int own_index;
0662: long highest_seqno_sent = -1, highest_seqno_received = -1;
0663:
0664: if (digest == null) {
0665:
0666: if (log.isWarnEnabled())
0667: log
0668: .warn("highest_seqnos is null, cannot compute digest !");
0669: return null;
0670: }
0671:
0672: if (highest_seqnos.length != members.size()) {
0673:
0674: if (log.isWarnEnabled())
0675: log
0676: .warn("the mbrship size and the size "
0677: + "of the highest_seqnos array are not equal, cannot compute digest !");
0678: return null;
0679: }
0680:
0681: System.arraycopy(highest_seqnos, 0, digest.highest_seqnos,
0682: 0, digest.highest_seqnos.length);
0683:
0684: for (int i = 0; i < highest_seqnos.length; i++) {
0685: sender = (Address) members.elementAt(i);
0686: if (sender == null)
0687: continue;
0688: win = (NakReceiverWindow) received_msgs.get(sender);
0689: if (win == null)
0690: continue;
0691: digest.highest_seqnos[i] = win.getHighestReceived();
0692: unstable_msgs = win
0693: .getMessagesHigherThan(highest_seqnos[i]);
0694: for (Enumeration e = unstable_msgs.elements(); e
0695: .hasMoreElements();)
0696: digest.msgs.add(e.nextElement());
0697: }
0698:
0699: /** If our highest seqno <em>sent</em> is higher than the one <em>received</em>, we have to
0700: (a) set it in the digest and (b) add the corresponding messages **/
0701:
0702: own_index = members.indexOf(local_addr);
0703: if (own_index == -1) {
0704:
0705: if (log.isWarnEnabled())
0706: log.warn("no own address in highest_seqnos");
0707: return digest;
0708: }
0709: highest_seqno_received = digest.highest_seqnos[own_index];
0710: highest_seqno_sent = getHighestSeqnoSent();
0711:
0712: if (highest_seqno_sent > highest_seqno_received) {
0713: // (a) Set highest seqno sent in digest
0714: digest.highest_seqnos[own_index] = highest_seqno_sent;
0715:
0716: // (b) Add messages between highest_seqno_received and highest_seqno_sent
0717: unstable_msgs = getSentMessagesHigherThan(highest_seqno_received);
0718: for (Enumeration e = unstable_msgs.elements(); e
0719: .hasMoreElements();)
0720: digest.msgs.add(e.nextElement());
0721: }
0722:
0723: return digest;
0724: }
0725:
0726: /**
0727: * For each non-null member m in <code>range</code>, get messages with sequence numbers between
0728: * range[m][0] and range[m][1], excluding range[m][0] and including range[m][1].
0729: */
0730: List getMessagesInRange(long[][] range) {
0731: List retval = new List();
0732: List tmp;
0733: NakReceiverWindow win;
0734: Address sender;
0735:
0736: for (int i = 0; i < range.length; i++) {
0737: if (range[i] != null) {
0738: sender = (Address) members.elementAt(i);
0739: if (sender == null)
0740: continue;
0741: win = (NakReceiverWindow) received_msgs.get(sender);
0742: if (win == null)
0743: continue;
0744: tmp = win.getMessagesInRange(range[i][0],
0745: range[i][1]);
0746: if (tmp == null || tmp.size() < 1)
0747: continue;
0748: for (Enumeration e = tmp.elements(); e
0749: .hasMoreElements();)
0750: retval.add(e.nextElement());
0751: }
0752: }
0753: return retval;
0754: }
0755:
/**
 * Sets the acking flag. While it is true, send() tags messages as NAK_ACK_MSG and registers
 * them with the sender window; otherwise messages are sent as plain NAK_MSG.
 */
void setAcks(boolean f) {
acking = f;
}
0759:
0760: /**
0761: * Vector with messages (ordered by sender) that are stable and can be discarded.
0762: * This applies to NAK-based sender and receivers.
0763: */
0764: void stable(long[] seqnos) {
0765: int index;
0766: long seqno;
0767: NakReceiverWindow recv_win;
0768: Address sender;
0769:
0770: if (members == null || local_addr == null) {
0771: if (log.isWarnEnabled())
0772: log.warn("members or local_addr are null !");
0773: return;
0774: }
0775: index = members.indexOf(local_addr);
0776:
0777: if (index < 0) {
0778:
0779: if (log.isWarnEnabled())
0780: log.warn("member " + local_addr + " not found in "
0781: + members);
0782: return;
0783: }
0784: seqno = seqnos[index];
0785:
0786: if (log.isInfoEnabled())
0787: log.info("deleting stable messages [" + deleted_up_to
0788: + " - " + seqno + ']');
0789:
0790: // delete sent messages that are stable (kept for retransmission requests from receivers)
0791: synchronized (sent_msgs) {
0792: for (long i = deleted_up_to; i <= seqno; i++) {
0793: sent_msgs.remove(new Long(i));
0794: }
0795: deleted_up_to = seqno;
0796: }
0797: // delete received msgs that are stable
0798: for (int i = 0; i < members.size(); i++) {
0799: sender = (Address) members.elementAt(i);
0800: recv_win = (NakReceiverWindow) received_msgs
0801: .get(sender);
0802: if (recv_win != null)
0803: recv_win.stable(seqnos[i]); // delete all messages with seqnos <= seqnos[i]
0804: }
0805: }
0806:
0807: void send(Message msg) {
0808: long id = getNextSeqno();
0809: ViewId vid_copy;
0810:
0811: if (vid == null)
0812: return;
0813: vid_copy = (ViewId) vid.clone();
0814: /** todo No needs to copy vid */
0815:
0816: if (acking) {
0817: msg.putHeader(getName(), new NakAckHeader(
0818: NakAckHeader.NAK_ACK_MSG, id, vid_copy));
0819: sender_win
0820: .add(id, msg.copy(), (Vector) members.clone()); // msg must be copied !
0821: } else
0822: msg.putHeader(getName(), new NakAckHeader(
0823: NakAckHeader.NAK_MSG, id, vid_copy));
0824:
0825: if (log.isInfoEnabled())
0826: log.info("sending msg #" + id);
0827:
0828: sent_msgs.put(new Long(id), msg.copy());
0829: passDown(new Event(Event.MSG, msg));
0830: }
0831:
0832: /** Re-broadcast message. Message already contains NakAckHeader (therefore also seqno).
0833: Wrap message (including header) in a new message and bcasts using ACKS. Every receiver
0834: acks the message, unwraps it to get the original message and delivers the original message
0835: (if not yet delivered).
0836: // send msgs in Vector arg again (they already have a NakAckHeader !)
0837: // -> use the same seq numbers
0838: // -> destination has to be set to null (broadcast), e.g.:
0839: // dst=p (me !), src=q --> dst=null, src=q
0840:
0841: TODO:
0842: -----
0843: resend() has to wait until it received all ACKs from all recipients (for all msgs), or until
0844: members were suspected. Thus we can ensure that all members received outstanding msgs before
0845: we switch to a new view. Otherwise, because the switch to a new view resets NAK and ACK msg
0846: transmission, slow members might never receive all outstanding messages.
0847: */
0848:
0849: /**
0850: * 1. Set the destination address of the original msg to null
0851: * 2. Add a new header WRAPPED_MSG and send msg. The receiver will ACK the msg,
0852: * remove the header and deliver the msg
0853: */
0854: void resend(Message msg) {
0855: Message copy = msg.copy();
0856: NakAckHeader hdr = (NakAckHeader) copy.getHeader(getName());
0857: NakAckHeader wrapped_hdr;
0858: long id = hdr.seqno;
0859:
0860: if (vid == null)
0861: return;
0862: copy.setDest(null); // broadcast, e.g. dst(p), src(q) --> dst(null), src(q)
0863: wrapped_hdr = new NakAckHeader(NakAckHeader.WRAPPED_MSG,
0864: hdr.seqno, hdr.vid);
0865: wrapped_hdr.sender = local_addr;
0866: copy.putHeader(WRAPPED_MSG_KEY, wrapped_hdr);
0867: sender_win.add(id, copy.copy(), (Vector) members.clone());
0868: if (log.isInfoEnabled())
0869: log.info("resending " + copy.getHeader(getName()));
0870: passDown(new Event(Event.MSG, copy));
0871: }
0872:
/**
 * Delegates to the sender window's waitUntilAllAcksReceived() with the given timeout.
 * NOTE(review): presumably blocks until every outstanding message was acked or the timeout
 * elapsed - confirm against AckMcastSenderWindow.
 */
void waitUntilAllAcksReceived(long timeout) {
sender_win.waitUntilAllAcksReceived(timeout);
}
0876:
0877: void receive(long id, Message msg, Vector stable_msgs) {
0878: /** todo Vector stable_msgs is not used in NAKer.receive() */
0879: Address sender = msg.getSrc();
0880: NakReceiverWindow win = (NakReceiverWindow) received_msgs
0881: .get(sender);
0882: Message msg_to_deliver;
0883:
0884: if (win == null) {
0885: win = new NakReceiverWindow(sender, this , 0);
0886: win.setRetransmitTimeouts(retransmit_timeout);
0887: received_msgs.put(sender, win);
0888: }
0889:
0890: if (log.isInfoEnabled())
0891: log.info("received <" + sender + '#' + id + '>');
0892:
0893: win.add(id, msg); // add in order, then remove and pass up as many msgs as possible
0894: while (true) {
0895: msg_to_deliver = win.remove();
0896: if (msg_to_deliver == null)
0897: break;
0898:
0899: if (msg_to_deliver.getHeader(getName()) instanceof NakAckHeader)
0900: msg_to_deliver.removeHeader(getName());
0901: passUp(new Event(Event.MSG, msg_to_deliver));
0902: }
0903: }
0904:
0905: void receiveAck(long id, Address sender) {
0906:
0907: if (log.isInfoEnabled())
0908: log.info("received ack <-- ACK <" + sender + '#' + id
0909: + '>');
0910: sender_win.ack(id, sender);
0911: }
0912:
0913: /**
0914: * Implementation of interface AckMcastSenderWindow.RetransmitCommand.<p>
0915: * Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already
0916: * a copy, so does not need to be copied again.
0917: */
0918: public void retransmit(long seqno, Message msg, Address dest) {
0919:
0920: if (log.isInfoEnabled())
0921: log.info("retransmitting message " + seqno + " to "
0922: + dest + ", header is "
0923: + msg.getHeader(getName()));
0924:
0925: // check whether dest is member of group. If not, discard retransmission message and
0926: // also remove it from sender_win (AckMcastSenderWindow)
0927: if (members != null) {
0928: if (!members.contains(dest)) {
0929:
0930: if (log.isInfoEnabled())
0931: log
0932: .info("retransmitting "
0933: + seqno
0934: + ") to "
0935: + dest
0936: + ": "
0937: + dest
0938: + " is not a member; discarding retransmission and removing "
0939: + dest + " from sender_win");
0940: sender_win.remove(dest);
0941: return;
0942: }
0943: }
0944:
0945: msg.setDest(dest);
0946: passDown(new Event(Event.MSG, msg));
0947: }
0948:
0949: /**
0950: * Implementation of Retransmitter.RetransmitCommand.<p>
0951: * Called by retransmission thread when gap is detected. Sends retr. request
0952: * to originator of msg
0953: */
0954: public void retransmit(long first_seqno, long last_seqno,
0955: Address sender) {
0956:
0957: if (log.isInfoEnabled())
0958: log.info("retransmit([" + first_seqno + ", "
0959: + last_seqno + "]) to " + sender + ", vid="
0960: + vid);
0961:
0962: NakAckHeader hdr = new NakAckHeader(
0963: NakAckHeader.RETRANSMIT_MSG, first_seqno,
0964: (ViewId) vid.clone());
0965: /** todo Not necessary to clone vid */
0966: Message retransmit_msg = new Message(sender, null, null);
0967:
0968: hdr.last_seqno = last_seqno;
0969: retransmit_msg.putHeader(getName(), hdr);
0970: passDown(new Event(Event.MSG, retransmit_msg));
0971: }
0972:
0973: // Retransmit from sent-table, called when RETRANSMIT message is received
0974: void retransmit(Address dest, long first_seqno, long last_seqno) {
0975: Message m, retr_msg;
0976:
0977: for (long i = first_seqno; i <= last_seqno; i++) {
0978: m = (Message) sent_msgs.get(new Long(i));
0979: if (m == null) {
0980: if (log.isWarnEnabled())
0981: log.warn("(to " + dest + "): message with "
0982: + "seqno=" + i + " not found !");
0983: continue;
0984: }
0985:
0986: retr_msg = m.copy();
0987: retr_msg.setDest(dest);
0988:
0989: try {
0990: passDown(new Event(Event.MSG, retr_msg));
0991: } catch (Exception e) {
0992: if (log.isDebugEnabled())
0993: log.debug("exception is " + e);
0994: }
0995: }
0996: }
0997:
0998: void stop() {
0999: if (sender_win != null)
1000: sender_win.stop();
1001: }
1002:
1003: void reset() {
1004: NakReceiverWindow win;
1005:
1006: // Only reset if not coord: coord may have to retransmit the VIEW_CHANGE msg to slow members,
1007: // since VIEW_CHANGE results in retransmitter resetting, retransmission would be killed, and the
1008: // slow mbr would never receive a new view (see ./design/ViewChangeRetransmission.txt)
1009: if (!coordinator())
1010: sender_win.reset();
1011:
1012: sent_msgs.clear();
1013: for (Enumeration e = received_msgs.elements(); e
1014: .hasMoreElements();) {
1015: win = (NakReceiverWindow) e.nextElement();
1016: win.reset();
1017: }
1018: received_msgs.clear();
1019: seqno = 0;
1020: deleted_up_to = 0;
1021: }
1022:
1023: public void suspect(Address mbr) {
1024: NakReceiverWindow w;
1025:
1026: w = (NakReceiverWindow) received_msgs.get(mbr);
1027: if (w != null) {
1028: w.reset();
1029: received_msgs.remove(mbr);
1030: }
1031:
1032: sender_win.suspect(mbr); // don't keep retransmitting messages to mbr
1033: }
1034:
1035: String dumpContents() {
1036: StringBuffer ret = new StringBuffer();
1037:
1038: ret.append("\nsent_msgs: " + sent_msgs.size());
1039:
1040: ret.append("\nreceived_msgs: ");
1041: for (Enumeration e = received_msgs.keys(); e
1042: .hasMoreElements();) {
1043: Address key = (Address) e.nextElement();
1044: NakReceiverWindow w = (NakReceiverWindow) received_msgs
1045: .get(key);
1046: ret.append('\n' + w.toString());
1047: }
1048:
1049: ret.append("\nsender_win: " + sender_win.toString());
1050:
1051: return ret.toString();
1052: }
1053:
1054: }
1055:
1056: class OutOfBander implements AckMcastSenderWindow.RetransmitCommand {
1057: final AckMcastSenderWindow sender_win = new AckMcastSenderWindow(
1058: this , timer);
1059: final AckMcastReceiverWindow receiver_win = new AckMcastReceiverWindow();
1060: long seqno = 0;
1061:
1062: void send(Message msg) {
1063: long id = seqno++;
1064: Vector stable_msgs = sender_win.getStableMessages();
1065: NakAckHeader hdr;
1066:
1067: if (log.isInfoEnabled())
1068: log.info("sending msg #=" + id);
1069:
1070: hdr = new NakAckHeader(NakAckHeader.OUT_OF_BAND_MSG, id,
1071: null);
1072: hdr.stable_msgs = stable_msgs;
1073: msg.putHeader(getName(), hdr);
1074:
1075: // msg needs to be copied, otherwise it will be modified by the code below
1076: sender_win.add(id, msg.copy(), (Vector) members.clone());
1077:
1078: passDown(new Event(Event.MSG, msg));
1079: }
1080:
1081: void receive(long id, Message msg, Vector stable_msgs) {
1082: Address sender = msg.getSrc();
1083:
1084: // first thing: send ACK back to sender
1085: Message ack_msg = new Message(msg.getSrc(), null, null);
1086: NakAckHeader hdr = new NakAckHeader(
1087: NakAckHeader.OUT_OF_BAND_RSP, id, null);
1088: ack_msg.putHeader(getName(), hdr);
1089:
1090: if (log.isInfoEnabled())
1091: log.info("received <" + sender + '#' + id + ">\n");
1092:
1093: if (receiver_win.add(sender, id)) // not received previously
1094: passUp(new Event(Event.MSG, msg));
1095:
1096: passDown(new Event(Event.MSG, ack_msg)); // send ACK
1097: if (log.isInfoEnabled())
1098: log.info("sending ack <" + sender + '#' + id + ">\n");
1099:
1100: if (stable_msgs != null)
1101: receiver_win.remove(sender, stable_msgs);
1102: }
1103:
1104: void receiveAck(long id, Address sender) {
1105: if (log.isInfoEnabled())
1106: log.info("received ack <" + sender + '#' + id + '>');
1107: sender_win.ack(id, sender);
1108: }
1109:
1110: /**
1111: * Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already
1112: * a copy, so does not need to be copied again. All the necessary header are already present;
1113: * no header needs to be added ! The message is retransmitted as <em>unicast</em> !
1114: */
1115: public void retransmit(long seqno, Message msg, Address dest) {
1116: if (log.isInfoEnabled())
1117: log.info("dest=" + dest + ", msg #" + seqno);
1118: msg.setDest(dest);
1119: passDown(new Event(Event.MSG, msg));
1120: }
1121:
1122: void reset() {
1123: sender_win.reset(); // +++ ?
1124: receiver_win.reset(); // +++ ?
1125: }
1126:
1127: void suspect(Address mbr) {
1128: sender_win.suspect(mbr);
1129: receiver_win.suspect(mbr);
1130: }
1131:
1132: void start() {
1133: sender_win.start();
1134: }
1135:
1136: void stop() {
1137: if (sender_win != null)
1138: sender_win.stop();
1139: }
1140:
1141: String dumpContents() {
1142: StringBuffer ret = new StringBuffer();
1143: ret.append("\nsender_win:\n" + sender_win.toString()
1144: + "\nreceiver_win:\n" + receiver_win.toString());
1145: return ret.toString();
1146: }
1147:
1148: }
1149:
1150: }
|