0001: // $Id: PBCAST.java,v 1.16.6.1 2007/04/27 08:03:55 belaban Exp $
0002:
0003: package org.jgroups.protocols.pbcast;
0004:
0005: import org.jgroups.Address;
0006: import org.jgroups.Event;
0007: import org.jgroups.Message;
0008: import org.jgroups.View;
0009: import org.jgroups.stack.NakReceiverWindow;
0010: import org.jgroups.stack.Protocol;
0011: import org.jgroups.util.List;
0012: import org.jgroups.util.Queue;
0013: import org.jgroups.util.QueueClosedException;
0014: import org.jgroups.util.Util;
0015:
0016: import java.util.*;
0017:
0018: /**
0019: * Implementation of probabilistic broadcast. Sends group messages via unreliable multicast. Gossips regularly to
0020: * a random subset of group members to retransmit missing messages. Gossiping is used both for bringing all
0021: * members to the same state (having received the same messages) and to garbage-collect messages seen by all members
0022: * (gc is piggybacked in gossip messages). See DESIGN for more details.
0023: * @author Bela Ban
0024: */
0025: public class PBCAST extends Protocol implements Runnable {
0026: boolean operational = false;
0027: long seqno = 1; // seqno for messages. 1 for the first message
0028: long gossip_round = 1; // identifies the gossip (together with sender)
0029: Address local_addr = null;
0030: final Hashtable digest = new Hashtable(); // stores all messages from members (key: member, val: NakReceiverWindow)
0031: Thread gossip_thread = null;
0032: GossipHandler gossip_handler = null; // removes gossips and other requests from queue and handles them
0033: final Queue gossip_queue = new Queue(); // (bounded) queue for incoming gossip requests
0034: int max_queue = 100; // max elements in gossip_queue (bounded buffer)
0035: long gossip_interval = 5000; // gossip every 5 seconds
0036: double subset = 0.1; // send gossip messages to a subset consisting of 10% of the mbrship
0037: long desired_avg_gossip = 30000; // receive a gossip every 30 secs on average
0038: final Vector members = new Vector();
0039: final List gossip_list = new List(); // list of gossips received, we periodically purge it (FIFO)
0040: int max_gossip_cache = 100; // number of gossips to keep until gossip list is purged
0041: int gc_lag = 30; // how many seqnos should we lag behind (see DESIGN)
0042: final Hashtable invalid_gossipers = new Hashtable(); // keys=Address, val=Integer (number of gossips from suspected mbrs)
0043: final int max_invalid_gossips = 2; // max number of gossip from non-member before that member is shunned
0044: Vector seen_list = null;
0045: boolean shun = false; // whether invalid gossipers will be shunned or not
0046: boolean dynamic = true; // whether to use dynamic or static gosssip_interval (overrides gossip_interval)
0047: boolean skip_sleep = true;
0048: boolean mcast_gossip = true; // use multicast for gossips (subset will be ignored, send to all members)
0049:
0050: public String getName() {
0051: return "PBCAST";
0052: }
0053:
0054: public Vector providedUpServices() {
0055: Vector retval = new Vector();
0056: retval.addElement(new Integer(Event.GET_DIGEST));
0057: retval.addElement(new Integer(Event.SET_DIGEST));
0058: retval.addElement(new Integer(Event.GET_DIGEST_STATE));
0059: return retval;
0060: }
0061:
0062: public void stop() {
0063: stopGossipThread();
0064: stopGossipHandler();
0065: operational = false;
0066: }
0067:
0068: public void up(Event evt) {
0069: Message m;
0070: PbcastHeader hdr;
0071: Address sender = null;
0072:
0073: switch (evt.getType()) {
0074: case Event.MSG:
0075: m = (Message) evt.getArg();
0076: if (m.getDest() != null
0077: && !m.getDest().isMulticastAddress()) {
0078: if (!(m.getHeader(getName()) instanceof PbcastHeader))
0079: break; // unicast address: not null and not mcast, pass up unchanged
0080: }
0081:
0082: // discard all multicast messages until we become operational (transition from joiner to member)
0083: if (!operational) {
0084:
0085: if (log.isInfoEnabled())
0086: log
0087: .info("event was discarded as I'm not yet operational. Event: "
0088: + Util.printEvent(evt));
0089: return; // don't pass up
0090: }
0091:
0092: if (m.getHeader(getName()) instanceof PbcastHeader)
0093: hdr = (PbcastHeader) m.removeHeader(getName());
0094: else {
0095: sender = m.getSrc();
0096:
0097: if (log.isErrorEnabled())
0098: log
0099: .error("PbcastHeader expected, but received header of type "
0100: + m.getHeader(getName()).getClass()
0101: .getName()
0102: + " from "
0103: + sender
0104: + ". Passing event up unchanged");
0105: break;
0106: }
0107:
0108: switch (hdr.type) {
0109: case PbcastHeader.MCAST_MSG: // messages are handled directly (high priority)
0110: handleUpMessage(m, hdr);
0111: return;
0112:
0113: // all other requests are put in the bounded gossip queue (discarded if full). this helps to ensure
0114: // that no 'gossip storms' will occur (overflowing the buffers and the network)
0115: case PbcastHeader.GOSSIP:
0116: case PbcastHeader.XMIT_REQ:
0117: case PbcastHeader.XMIT_RSP:
0118: case PbcastHeader.NOT_MEMBER:
0119: try {
0120: if (gossip_queue.size() >= max_queue) {
0121:
0122: if (log.isWarnEnabled())
0123: log
0124: .warn("gossip request "
0125: + PbcastHeader
0126: .type2String(hdr.type)
0127: + " discarded because "
0128: + "gossip_queue is full (number of elements="
0129: + gossip_queue.size() + ')');
0130: return;
0131: }
0132: gossip_queue.add(new GossipEntry(hdr, m.getSrc(), m
0133: .getBuffer()));
0134: } catch (Exception ex) {
0135: if (log.isWarnEnabled())
0136: log
0137: .warn("exception adding request to gossip_queue, details="
0138: + ex);
0139: }
0140: return;
0141:
0142: default:
0143: if (log.isErrorEnabled())
0144: log.error("type (" + hdr.type
0145: + ") of PbcastHeader not known !");
0146: return;
0147: }
0148:
0149: case Event.SET_LOCAL_ADDRESS:
0150: local_addr = (Address) evt.getArg();
0151: break; // pass up
0152: }
0153:
0154: passUp(evt); // pass up by default
0155: }
0156:
0157: public void down(Event evt) {
0158: PbcastHeader hdr;
0159: Message m, copy;
0160: View v;
0161: Vector mbrs;
0162: Address key;
0163: NakReceiverWindow win;
0164:
0165: switch (evt.getType()) {
0166:
0167: case Event.MSG:
0168: m = (Message) evt.getArg();
0169: if (m.getDest() != null
0170: && !m.getDest().isMulticastAddress()) {
0171: break; // unicast address: not null and not mcast, pass down unchanged
0172: } else { // multicast address
0173: hdr = new PbcastHeader(PbcastHeader.MCAST_MSG, seqno);
0174: m.putHeader(getName(), hdr);
0175:
0176: // put message in NakReceiverWindow (to be on the safe side if we don't receive it ...)
0177: synchronized (digest) {
0178: win = (NakReceiverWindow) digest.get(local_addr);
0179: if (win == null) {
0180: if (log.isInfoEnabled())
0181: log
0182: .info("NakReceiverWindow for sender "
0183: + local_addr
0184: + " not found. Creating new NakReceiverWindow starting at seqno="
0185: + seqno);
0186: win = new NakReceiverWindow(local_addr, seqno);
0187: digest.put(local_addr, win);
0188: }
0189: copy = m.copy();
0190: copy.setSrc(local_addr);
0191: win.add(seqno, copy);
0192: }
0193: seqno++;
0194: break;
0195: }
0196:
0197: case Event.SET_DIGEST:
0198: setDigest((Digest) evt.getArg());
0199: return; // don't pass down
0200:
0201: case Event.GET_DIGEST: // don't pass down
0202: passUp(new Event(Event.GET_DIGEST_OK, getDigest()));
0203: return;
0204:
0205: case Event.GET_DIGEST_STATE: // don't pass down
0206: passUp(new Event(Event.GET_DIGEST_STATE_OK, getDigest()));
0207: return;
0208:
0209: case Event.VIEW_CHANGE:
0210: v = (View) evt.getArg();
0211: if (v == null) {
0212: if (log.isErrorEnabled())
0213: log.error("view is null !");
0214: break;
0215: }
0216: mbrs = v.getMembers();
0217:
0218: // update internal membership list
0219: synchronized (members) {
0220: members.removeAllElements();
0221: for (int i = 0; i < mbrs.size(); i++)
0222: members.addElement(mbrs.elementAt(i));
0223: }
0224:
0225: // delete all members in digest that are not in new membership list
0226: if (mbrs.size() > 0) {
0227: synchronized (digest) {
0228: for (Enumeration e = digest.keys(); e
0229: .hasMoreElements();) {
0230: key = (Address) e.nextElement();
0231: if (!mbrs.contains(key)) {
0232: win = (NakReceiverWindow) digest.get(key);
0233: win.reset();
0234: digest.remove(key);
0235: }
0236: }
0237: }
0238: }
0239:
0240: // add all members from new membership list that are not yet in digest
0241: for (int i = 0; i < mbrs.size(); i++) {
0242: key = (Address) mbrs.elementAt(i);
0243: if (!digest.containsKey(key)) {
0244: digest.put(key, new NakReceiverWindow(key, 1));
0245: }
0246: }
0247:
0248: if (dynamic) {
0249: gossip_interval = computeGossipInterval(members.size(),
0250: desired_avg_gossip);
0251:
0252: if (log.isInfoEnabled())
0253: log.info("VIEW_CHANGE: gossip_interval="
0254: + gossip_interval);
0255: if (gossip_thread != null) {
0256: skip_sleep = true;
0257: gossip_thread.interrupt(); // wake up and sleep according to the new gossip_interval
0258: }
0259: }
0260:
0261: startGossipThread(); // will only be started if not yet running
0262: startGossipHandler();
0263: break;
0264:
0265: case Event.BECOME_SERVER:
0266: operational = true;
0267: break;
0268: }
0269:
0270: passDown(evt);
0271: }
0272:
0273: /** Gossip thread. Sends gossips containing a message digest every <code>gossip_interval</code> msecs */
0274: public void run() {
0275: while (gossip_thread != null) { // stopGossipThread() sets gossip_thread to null
0276: if (dynamic) {
0277: gossip_interval = computeGossipInterval(members.size(),
0278: desired_avg_gossip);
0279:
0280: if (log.isInfoEnabled())
0281: log.info("gossip_interval=" + gossip_interval);
0282: }
0283:
0284: Util.sleep(gossip_interval);
0285: if (skip_sleep)
0286: skip_sleep = false;
0287: else
0288: sendGossip();
0289: }
0290: }
0291:
0292: /** Setup the Protocol instance acording to the configuration string */
0293: public boolean setProperties(Properties props) {
0294: super .setProperties(props);
0295: String str;
0296:
0297: str = props.getProperty("dynamic");
0298: if (str != null) {
0299: dynamic = Boolean.valueOf(str).booleanValue();
0300: props.remove("dynamic");
0301: }
0302:
0303: str = props.getProperty("shun");
0304: if (str != null) {
0305: shun = Boolean.valueOf(str).booleanValue();
0306: props.remove("shun");
0307: }
0308:
0309: str = props.getProperty("gossip_interval");
0310: if (str != null) {
0311: gossip_interval = Long.parseLong(str);
0312: props.remove("gossip_interval");
0313: }
0314:
0315: str = props.getProperty("mcast_gossip");
0316: if (str != null) {
0317: mcast_gossip = Boolean.valueOf(str).booleanValue();
0318: props.remove("mcast_gossip");
0319: }
0320:
0321: str = props.getProperty("subset");
0322: if (str != null) {
0323: subset = Double.parseDouble(str);
0324: props.remove("subset");
0325: }
0326:
0327: str = props.getProperty("desired_avg_gossip");
0328: if (str != null) {
0329: desired_avg_gossip = Long.parseLong(str);
0330: props.remove("desired_avg_gossip");
0331: }
0332:
0333: str = props.getProperty("max_queue");
0334: if (str != null) {
0335: max_queue = Integer.parseInt(str);
0336: props.remove("max_queue");
0337: }
0338:
0339: str = props.getProperty("max_gossip_cache");
0340: if (str != null) {
0341: max_gossip_cache = Integer.parseInt(str);
0342: props.remove("max_gossip_cache");
0343: }
0344:
0345: str = props.getProperty("gc_lag");
0346: if (str != null) {
0347: gc_lag = Integer.parseInt(str);
0348: props.remove("gc_lag");
0349: }
0350:
0351: if (props.size() > 0) {
0352: log
0353: .error("PBCAST.setProperties(): the following properties are not recognized: "
0354: + props);
0355:
0356: return false;
0357: }
0358: return true;
0359: }
0360:
0361: /* --------------------------------- Private Methods --------------------------------------------- */
0362:
0363: /**
0364: Ensures that FIFO is observed for all messages for a certain member. The NakReceiverWindow corresponding
0365: to a certain sender is looked up in a hashtable. Then, the message is added to the NakReceiverWindow.
0366: As many messages as possible are then removed from the table and passed up.
0367: */
0368: void handleUpMessage(Message m, PbcastHeader hdr) {
0369: Address sender = m.getSrc();
0370: NakReceiverWindow win = null;
0371: Message tmpmsg;
0372: long tmp_seqno = hdr.seqno;
0373:
0374: if (sender == null) {
0375: if (log.isErrorEnabled())
0376: log.error("sender is null");
0377: return;
0378: }
0379:
0380: synchronized (digest) {
0381: win = (NakReceiverWindow) digest.get(sender);
0382: if (win == null) {
0383: if (log.isWarnEnabled())
0384: log
0385: .warn("NakReceiverWindow for sender "
0386: + sender
0387: + " not found. Creating new NakReceiverWindow starting at seqno="
0388: + tmp_seqno);
0389: win = new NakReceiverWindow(sender, tmp_seqno);
0390: digest.put(sender, win);
0391: }
0392:
0393: // *************************************
0394: // The header was removed before, so we add it again for the NakReceiverWindow. When there is a
0395: // retransmission request, the header will already be attached to the message (both message and
0396: // header are *copied* into delivered_msgs when a message is removed from NakReceiverWindow).
0397: // *************************************
0398: m.putHeader(getName(), hdr);
0399: win.add(tmp_seqno, m);
0400:
0401: if (log.isInfoEnabled())
0402: log
0403: .info("receiver window for " + sender + " is "
0404: + win);
0405:
0406: // Try to remove as many message as possible and send them up the stack
0407: while ((tmpmsg = win.remove()) != null) {
0408: tmpmsg.removeHeader(getName()); // need to remove header again, so upper protocols don't get confused
0409: passUp(new Event(Event.MSG, tmpmsg));
0410: }
0411:
0412: // Garbage collect messages if singleton member (because then we won't receive any gossips, triggering
0413: // garbage collection)
0414: if (members.size() == 1) {
0415: tmp_seqno = Math.max(tmp_seqno - gc_lag, 0);
0416: if (tmp_seqno <= 0) {
0417: } else {
0418: if (log.isTraceEnabled())
0419: log.trace("deleting messages < " + tmp_seqno
0420: + " from " + sender);
0421: win.stable(tmp_seqno);
0422: }
0423: }
0424: }
0425: }
0426:
0427: /**
0428: * Returns for each sender the 'highest seen' seqno from the digest. Highest seen means the
0429: * highest seqno without any gaps, e.g. if for a sender P the messages 2 3 4 6 7 were received,
0430: * then only 2, 3 and 4 can be delivered, so 4 is the highest seen. 6 and 7 cannot because there
0431: * 5 is missing. If there are no message, the highest seen seqno is -1.
0432: */
0433: Digest getDigest() {
0434: Digest ret = new Digest(digest.size());
0435: long highest_seqno, lowest_seqno;
0436: Address key;
0437: NakReceiverWindow win;
0438:
0439: for (Enumeration e = digest.keys(); e.hasMoreElements();) {
0440: key = (Address) e.nextElement();
0441: win = (NakReceiverWindow) digest.get(key);
0442: lowest_seqno = win.getLowestSeen();
0443: highest_seqno = win.getHighestSeen();
0444: ret.add(key, lowest_seqno, highest_seqno);
0445: }
0446:
0447: if (log.isInfoEnabled())
0448: log.info("digest is " + ret);
0449:
0450: return ret;
0451: }
0452:
0453: /**
0454: * Sets (or resets) the contents of the 'digest' table. Its current messages will be deleted and the
0455: * NakReceiverTables reset.
0456: */
0457: void setDigest(Digest d) {
0458: NakReceiverWindow win;
0459:
0460: long tmp_seqno = 1;
0461:
0462: synchronized (digest) {
0463: for (Enumeration e = digest.elements(); e.hasMoreElements();) {
0464: win = (NakReceiverWindow) e.nextElement();
0465: win.reset();
0466: }
0467: digest.clear();
0468:
0469: Map.Entry entry;
0470: Address sender;
0471: org.jgroups.protocols.pbcast.Digest.Entry val;
0472: for (Iterator it = d.senders.entrySet().iterator(); it
0473: .hasNext();) {
0474: entry = (Map.Entry) it.next();
0475: sender = (Address) entry.getKey();
0476: if (sender == null) {
0477: if (log.isErrorEnabled())
0478: log
0479: .error("cannot set item because sender is null");
0480: continue;
0481: }
0482: val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
0483: .getValue();
0484: tmp_seqno = val.high_seqno;
0485: digest.put(sender, new NakReceiverWindow(sender,
0486: tmp_seqno + 1)); // next to expect, digest had *last* seen !
0487: }
0488: }
0489: }
0490:
0491: String printDigest() {
0492: long highest_seqno;
0493: Address key;
0494: NakReceiverWindow win;
0495: StringBuffer sb = new StringBuffer();
0496:
0497: for (Enumeration e = digest.keys(); e.hasMoreElements();) {
0498: key = (Address) e.nextElement();
0499: win = (NakReceiverWindow) digest.get(key);
0500: highest_seqno = win.getHighestSeen();
0501: sb.append(key + ": " + highest_seqno + '\n');
0502: }
0503: return sb.toString();
0504: }
0505:
0506: String printIncomingMessageQueue() {
0507: StringBuffer sb = new StringBuffer();
0508: NakReceiverWindow win;
0509:
0510: win = (NakReceiverWindow) digest.get(local_addr);
0511: sb.append(win);
0512: return sb.toString();
0513: }
0514:
0515: void startGossipThread() {
0516: if (gossip_thread == null) {
0517: gossip_thread = new Thread(this );
0518: gossip_thread.setDaemon(true);
0519: gossip_thread.start();
0520: }
0521: }
0522:
0523: void stopGossipThread() {
0524: Thread tmp;
0525:
0526: if (gossip_thread != null) {
0527: if (gossip_thread.isAlive()) {
0528: tmp = gossip_thread;
0529: gossip_thread = null;
0530: tmp.interrupt();
0531: tmp = null;
0532: }
0533: }
0534: gossip_thread = null;
0535: }
0536:
0537: void startGossipHandler() {
0538: if (gossip_handler == null) {
0539: gossip_handler = new GossipHandler(gossip_queue);
0540: gossip_handler.start();
0541: }
0542: }
0543:
0544: void stopGossipHandler() {
0545: if (gossip_handler != null) {
0546: gossip_handler.stop();
0547: gossip_handler = null;
0548: }
0549: }
0550:
0551: /**
0552: * Send a gossip message with a message digest of the highest seqnos seen per sender to a subset
0553: * of the current membership. Exclude self (I receive all mcasts sent by myself).
0554: */
0555: void sendGossip() {
0556: Vector current_mbrs = (Vector) members.clone();
0557: Vector subset_mbrs = null;
0558: Gossip gossip = null;
0559: Message msg;
0560: Address dest;
0561: PbcastHeader hdr;
0562:
0563: if (local_addr != null)
0564: current_mbrs.remove(local_addr); // don't pick myself
0565:
0566: if (mcast_gossip) { // send gossip to all members using a multicast
0567: gossip = new Gossip(local_addr, gossip_round, getDigest()
0568: .copy(), null); // not_seen list is null, prevents forwarding
0569: for (int i = 0; i < current_mbrs.size(); i++)
0570: // all members have seen this gossip. Used for garbage collection
0571: gossip.addToSeenList((Address) current_mbrs
0572: .elementAt(i));
0573: hdr = new PbcastHeader(gossip, PbcastHeader.GOSSIP);
0574: msg = new Message(); // null dest == multicast to all members
0575: msg.putHeader(getName(), hdr);
0576:
0577: if (log.isInfoEnabled())
0578: log.info("(from " + local_addr
0579: + ") multicasting gossip " + gossip.shortForm()
0580: + " to all members");
0581:
0582: passDown(new Event(Event.MSG, msg));
0583: } else {
0584: subset_mbrs = Util.pickSubset(current_mbrs, subset);
0585:
0586: for (int i = 0; i < subset_mbrs.size(); i++) {
0587: gossip = new Gossip(local_addr, gossip_round,
0588: getDigest().copy(), (Vector) current_mbrs
0589: .clone());
0590: gossip.addToSeenList(local_addr);
0591: hdr = new PbcastHeader(gossip, PbcastHeader.GOSSIP);
0592: dest = (Address) subset_mbrs.elementAt(i);
0593: msg = new Message(dest);
0594: msg.putHeader(getName(), hdr);
0595:
0596: if (log.isInfoEnabled())
0597: log.info("(from " + local_addr
0598: + ") sending gossip " + gossip.shortForm()
0599: + " to " + subset_mbrs);
0600:
0601: passDown(new Event(Event.MSG, msg));
0602: }
0603: }
0604:
0605: gossip_round++;
0606: }
0607:
0608: /**
0609: * MOST IMPORTANT METHOD IN THIS CLASS !! This guy really decides how a gossip reaches all members,
0610: * or whether it will flood the network !<p>
0611: * Scrutinize the gossip received and request retransmission of messages that we haven't received yet.
0612: * A gossip has a digest which carries for each sender the lowest and highest seqno seen. We check
0613: * this range against our own digest and request retransmission of missing messages if needed.<br>
0614: * <em>See DESIGN for a description of this method</em>
0615: */
0616: void handleGossip(Gossip gossip) {
0617: long my_low = 0, my_high = 0, their_low, their_high;
0618: Hashtable ht = null;
0619: Digest their_digest;
0620: NakReceiverWindow win;
0621: Message msg;
0622: Address dest;
0623: Vector new_dests;
0624: PbcastHeader hdr;
0625: List missing_msgs; // list of missing messages (for retransmission) (List of Longs)
0626:
0627: if (log.isTraceEnabled())
0628: log.trace("(from " + local_addr + ") received gossip "
0629: + gossip.shortForm() + " from " + gossip.sender);
0630:
0631: if (gossip == null || gossip.digest == null) {
0632: if (log.isWarnEnabled())
0633: log.warn("gossip is null or digest is null");
0634: return;
0635: }
0636:
0637: /* 1. If gossip sender is null, we cannot ask it for missing messages anyway, so discard gossip ! */
0638: if (gossip.sender == null) {
0639: if (log.isErrorEnabled())
0640: log
0641: .error("sender of gossip is null; "
0642: + "don't know where to send XMIT_REQ to. Discarding gossip");
0643: return;
0644: }
0645:
0646: /* 2. Don't process the gossip if the sender of the gossip is not a member anymore. If it is a newly
0647: joined member, discard it as well (we can't tell the difference). When the new member will be
0648: added to the membership, then its gossips will be processed */
0649: if (!members.contains(gossip.sender)) {
0650: if (log.isWarnEnabled())
0651: log
0652: .warn("sender "
0653: + gossip.sender
0654: + " is not a member. Gossip will not be processed");
0655: if (shun)
0656: shunInvalidGossiper(gossip.sender);
0657: return;
0658: }
0659:
0660: /* 3. If this gossip was received before, just discard it and return (don't process the
0661: same gossip twice). This prevents flooding of the gossip sender with retransmission reqs */
0662: while (gossip_list.size() >= max_gossip_cache)
0663: // first delete oldest gossips
0664: gossip_list.removeFromHead();
0665:
0666: if (gossip_list.contains(gossip)) // already received, don't re-broadcast
0667: return;
0668: else
0669: gossip_list.add(gossip.copy()); // add to list of received gossips
0670:
0671: /* 4. Send a HEARD_FROM event containing all members in the gossip-chain down to the FD layer.
0672: This ensures that we don't suspect them */
0673: seen_list = gossip.getSeenList();
0674: if (seen_list.size() > 0)
0675: passDown(new Event(Event.HEARD_FROM, seen_list.clone()));
0676:
0677: /* 5. Compare their digest against ours. Find out if some messages in the their digest are
0678: not in our digest. If yes, put them in the 'ht' hashtable for retransmission */
0679: their_digest = gossip.digest;
0680:
0681: Map.Entry entry;
0682: Address sender;
0683: org.jgroups.protocols.pbcast.Digest.Entry val;
0684: for (Iterator it = their_digest.senders.entrySet().iterator(); it
0685: .hasNext();) {
0686: entry = (Map.Entry) it.next();
0687: sender = (Address) entry.getKey();
0688: val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
0689: .getValue();
0690: their_low = val.low_seqno;
0691: their_high = val.high_seqno;
0692: if (their_low == 0 && their_high == 0)
0693: continue; // won't have any messages for this sender, don't even re-send
0694:
0695: win = (NakReceiverWindow) digest.get(sender);
0696: if (win == null) {
0697: // this specific sender in this digest is probably not a member anymore, new digests
0698: // won't contain it. for now, just ignore it. if it is a new member, it will be in the next
0699: // gossips
0700:
0701: if (log.isWarnEnabled())
0702: log.warn("sender " + sender
0703: + " not found, skipping...");
0704: continue;
0705: }
0706:
0707: my_low = win.getLowestSeen();
0708: my_high = win.getHighestSeen();
0709: if (my_high < their_high) {
0710: // changed by Bela (June 26 2003) - replaced my_high with my_low (not tested though !)
0711: if (my_low + 1 < their_low) {
0712: } else {
0713: missing_msgs = win.getMissingMessages(my_high,
0714: their_high);
0715: if (missing_msgs != null) {
0716: if (log.isInfoEnabled())
0717: log.info("asking " + gossip.sender
0718: + " for retransmission of "
0719: + sender + ", missing messages: "
0720: + missing_msgs + "\nwin for "
0721: + sender + ":\n" + win + '\n');
0722: if (ht == null)
0723: ht = new Hashtable();
0724: ht.put(sender, missing_msgs);
0725: }
0726: }
0727: }
0728: }
0729:
0730: /* 6. Send a XMIT_REQ to the sender of the gossip. The sender will then resend those messages as
0731: an XMIT_RSP unicast message (the messages are in its buffer, as a List) */
0732: if (ht == null || ht.size() == 0) {
0733: } else {
0734: hdr = new PbcastHeader(PbcastHeader.XMIT_REQ);
0735: hdr.xmit_reqs = ht;
0736:
0737: if (log.isInfoEnabled())
0738: log.info("sending XMIT_REQ to " + gossip.sender);
0739: msg = new Message(gossip.sender, null, null);
0740: msg.putHeader(getName(), hdr);
0741: passDown(new Event(Event.MSG, msg));
0742: }
0743:
0744: /* 7. Remove myself from 'not_seen' list. If not_seen list is empty, we can garbage-collect messages
0745: smaller than the digest. Since all the members have seen the gossip, it will not be re-sent */
0746: gossip.removeFromNotSeenList(local_addr);
0747: if (gossip.sizeOfNotSeenList() == 0) {
0748: garbageCollect(gossip.digest);
0749: return;
0750: }
0751:
0752: /* 8. If we make it to this point, re-send to subset of remaining members in 'not_seen' list */
0753: new_dests = Util.pickSubset(gossip.getNotSeenList(), subset);
0754:
0755: if (log.isInfoEnabled())
0756: log.info("(from " + local_addr + ") forwarding gossip "
0757: + gossip.shortForm() + " to " + new_dests);
0758: gossip.addToSeenList(local_addr);
0759: for (int i = 0; i < new_dests.size(); i++) {
0760: dest = (Address) new_dests.elementAt(i);
0761: msg = new Message(dest, null, null);
0762: hdr = new PbcastHeader(gossip.copy(), PbcastHeader.GOSSIP);
0763: msg.putHeader(getName(), hdr);
0764: passDown(new Event(Event.MSG, msg));
0765: }
0766: }
0767:
0768: /**
0769: * Find the messages indicated in <code>xmit_reqs</code> and re-send them to
0770: * <code>requester</code>
0771: */
0772: void handleXmitRequest(Address requester, Hashtable xmit_reqs) {
0773: NakReceiverWindow win;
0774: Address sender;
0775: List msgs, missing_msgs, xmit_msgs;
0776: Message msg;
0777:
0778: if (requester == null) {
0779: if (log.isErrorEnabled())
0780: log.error("requester is null");
0781: return;
0782: }
0783:
0784: if (log.isInfoEnabled())
0785: log.info("retransmission requests are "
0786: + printXmitReqs(xmit_reqs));
0787: for (Enumeration e = xmit_reqs.keys(); e.hasMoreElements();) {
0788: sender = (Address) e.nextElement();
0789: win = (NakReceiverWindow) digest.get(sender);
0790: if (win == null) {
0791: if (log.isWarnEnabled())
0792: log
0793: .warn("sender "
0794: + sender
0795: + " not found in my digest; skipping retransmit request !");
0796: continue;
0797: }
0798:
0799: missing_msgs = (List) xmit_reqs.get(sender);
0800: msgs = win.getMessagesInList(missing_msgs); // msgs to be sent back to requester
0801:
0802: // re-send the messages to requester. don't add a header since they already have headers
0803: // (when added to the NakReceiverWindow, the headers were not removed)
0804: xmit_msgs = new List();
0805: for (Enumeration en = msgs.elements(); en.hasMoreElements();) {
0806: msg = ((Message) en.nextElement()).copy();
0807: xmit_msgs.add(msg);
0808: }
0809:
0810: // create a msg with the List of xmit_msgs as contents, add header
0811: msg = new Message(requester, null, xmit_msgs);
0812: msg.putHeader(getName(), new PbcastHeader(
0813: PbcastHeader.XMIT_RSP));
0814: passDown(new Event(Event.MSG, msg));
0815: }
0816: }
0817:
0818: void handleXmitRsp(List xmit_msgs) {
0819: Message m;
0820: PbcastHeader hdr;
0821:
0822: for (Enumeration e = xmit_msgs.elements(); e.hasMoreElements();) {
0823: m = (Message) e.nextElement();
0824: hdr = (PbcastHeader) m.removeHeader(getName());
0825: if (hdr == null) {
0826: log.warn("header is null, ignoring message");
0827: } else {
0828: if (log.isInfoEnabled())
0829: log.info("received #" + hdr.seqno + ", type="
0830: + PbcastHeader.type2String(hdr.type)
0831: + ", msg=" + m);
0832: handleUpMessage(m, hdr);
0833: }
0834: }
0835: }
0836:
0837: String printXmitReqs(Hashtable xmit_reqs) {
0838: StringBuffer sb = new StringBuffer();
0839: Address key;
0840: boolean first = true;
0841:
0842: if (xmit_reqs == null)
0843: return "<null>";
0844:
0845: for (Enumeration e = xmit_reqs.keys(); e.hasMoreElements();) {
0846: key = (Address) e.nextElement();
0847: if (!first) {
0848: sb.append(", ");
0849: } else
0850: first = false;
0851: sb.append(key + ": " + xmit_reqs.get(key));
0852: }
0853: return sb.toString();
0854: }
0855:
0856: void garbageCollect(Digest d) {
0857: Address sender;
0858: long tmp_seqno;
0859: NakReceiverWindow win;
0860: Map.Entry entry;
0861: org.jgroups.protocols.pbcast.Digest.Entry val;
0862:
0863: for (Iterator it = d.senders.entrySet().iterator(); it
0864: .hasNext();) {
0865: entry = (Map.Entry) it.next();
0866: sender = (Address) entry.getKey();
0867: val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
0868: .getValue();
0869: win = (NakReceiverWindow) digest.get(sender);
0870: if (win == null) {
0871: if (log.isDebugEnabled())
0872: log
0873: .debug("sender "
0874: + sender
0875: + " not found in our message digest, skipping");
0876: continue;
0877: }
0878: tmp_seqno = val.high_seqno;
0879: tmp_seqno = Math.max(tmp_seqno - gc_lag, 0);
0880: if (tmp_seqno <= 0) {
0881: continue;
0882: }
0883:
0884: if (log.isTraceEnabled())
0885: log.trace("(from " + local_addr
0886: + ") GC: deleting messages < " + tmp_seqno
0887: + " from " + sender);
0888: win.stable(tmp_seqno);
0889: }
0890: }
0891:
0892: /**
0893: * If sender of gossip is not a member, send a NOT_MEMBER to sender (after n gossips received).
0894: * This will cause that member to leave the group and possibly re-join.
0895: */
0896: void shunInvalidGossiper(Address invalid_gossiper) {
0897: int num_pings = 0;
0898: Message shun_msg;
0899:
0900: if (invalid_gossipers.containsKey(invalid_gossiper)) {
0901: num_pings = ((Integer) invalid_gossipers
0902: .get(invalid_gossiper)).intValue();
0903: if (num_pings >= max_invalid_gossips) {
0904:
0905: if (log.isInfoEnabled())
0906: log.info("sender " + invalid_gossiper
0907: + " is not member of " + members
0908: + " ! Telling it to leave group");
0909: shun_msg = new Message(invalid_gossiper, null, null);
0910: shun_msg.putHeader(getName(), new PbcastHeader(
0911: PbcastHeader.NOT_MEMBER));
0912: passDown(new Event(Event.MSG, shun_msg));
0913: invalid_gossipers.remove(invalid_gossiper);
0914: } else {
0915: num_pings++;
0916: invalid_gossipers.put(invalid_gossiper, new Integer(
0917: num_pings));
0918: }
0919: } else {
0920: num_pings++;
0921: invalid_gossipers.put(invalid_gossiper, new Integer(
0922: num_pings));
0923: }
0924: }
0925:
0926: /** Computes the gossip_interval. See DESIGN for details */
0927: long computeGossipInterval(int num_mbrs, double desired_avg_gossip) {
0928: return getRandom((long) (num_mbrs * desired_avg_gossip * 2));
0929: }
0930:
0931: long getRandom(long range) {
0932: return (long) ((Math.random() * range) % range);
0933: }
0934:
0935: /* ------------------------------- End of Private Methods ---------------------------------------- */
0936:
0937: private static class GossipEntry {
0938: PbcastHeader hdr = null;
0939: Address sender = null;
0940: byte[] data = null;
0941:
0942: GossipEntry(PbcastHeader hdr, Address sender, byte[] data) {
0943: this .hdr = hdr;
0944: this .sender = sender;
0945: this .data = data;
0946: }
0947:
0948: public String toString() {
0949: return "hdr=" + hdr + ", sender=" + sender + ", data="
0950: + data;
0951: }
0952: }
0953:
0954: /**
0955: Handles gossip and retransmission requests. Removes requests from a (bounded) queue.
0956: */
0957: private class GossipHandler implements Runnable {
0958: Thread t = null;
0959: final Queue queue;
0960:
0961: GossipHandler(Queue q) {
0962: queue = q;
0963: }
0964:
0965: void start() {
0966: if (t == null) {
0967: t = new Thread(this , "PBCAST.GossipHandlerThread");
0968: t.setDaemon(true);
0969: t.start();
0970: }
0971: }
0972:
0973: void stop() {
0974: Thread tmp;
0975: if (t != null && t.isAlive()) {
0976: tmp = t;
0977: t = null;
0978: if (queue != null)
0979: queue.close(false); // don't flush elements
0980: tmp.interrupt();
0981: }
0982: t = null;
0983: }
0984:
0985: public void run() {
0986: GossipEntry entry;
0987: PbcastHeader hdr;
0988: List xmit_msgs;
0989: byte[] data;
0990:
0991: while (t != null && queue != null) {
0992: try {
0993: entry = (GossipEntry) queue.remove();
0994: hdr = entry.hdr;
0995: if (hdr == null) {
0996: if (log.isErrorEnabled())
0997: log
0998: .error("gossip entry has no PbcastHeader");
0999: continue;
1000: }
1001:
1002: switch (hdr.type) {
1003:
1004: case PbcastHeader.GOSSIP:
1005: handleGossip(hdr.gossip);
1006: break;
1007:
1008: case PbcastHeader.XMIT_REQ:
1009: if (hdr.xmit_reqs == null) {
1010: if (log.isWarnEnabled())
1011: log.warn("request is null !");
1012: break;
1013: }
1014: handleXmitRequest(entry.sender, hdr.xmit_reqs);
1015: break;
1016:
1017: case PbcastHeader.XMIT_RSP:
1018: data = entry.data;
1019: if (data == null) {
1020: if (log.isWarnEnabled())
1021: log
1022: .warn("buffer is null (no xmitted msgs)");
1023: break;
1024: }
1025: try {
1026: xmit_msgs = (List) Util
1027: .objectFromByteBuffer(data);
1028: } catch (Exception ex) {
1029: if (log.isErrorEnabled())
1030: log
1031: .error(
1032: "failed creating retransmitted messages from buffer",
1033: ex);
1034: break;
1035: }
1036: handleXmitRsp(xmit_msgs);
1037: break;
1038:
1039: case PbcastHeader.NOT_MEMBER: // we are shunned
1040: if (shun) {
1041: if (log.isInfoEnabled())
1042: log
1043: .info("I am being shunned. Will leave and re-join");
1044: passUp(new Event(Event.EXIT));
1045: }
1046: break;
1047:
1048: default:
1049: if (log.isErrorEnabled())
1050: log.error("type (" + hdr.type
1051: + ") of PbcastHeader not known !");
1052: return;
1053: }
1054: } catch (QueueClosedException closed) {
1055: break;
1056: }
1057: }
1058: }
1059: }
1060:
1061: }
|