0001: //$Id: TOTAL_TOKEN.java,v 1.14 2006/04/28 15:25:00 belaban Exp $
0002:
0003: package org.jgroups.protocols;
0004:
0005: import org.jgroups.*;
0006: import org.jgroups.blocks.GroupRequest;
0007: import org.jgroups.protocols.pbcast.Digest;
0008: import org.jgroups.protocols.ring.RingNodeFlowControl;
0009: import org.jgroups.protocols.ring.RingToken;
0010: import org.jgroups.protocols.ring.TokenLostException;
0011: import org.jgroups.protocols.ring.UdpRingNode;
0012: import org.jgroups.stack.IpAddress;
0013: import org.jgroups.stack.RpcProtocol;
0014: import org.jgroups.util.RspList;
0015: import org.jgroups.util.Util;
0016:
0017: import java.io.IOException;
0018: import java.io.ObjectInput;
0019: import java.io.ObjectOutput;
0020: import java.util.*;
0021:
0022: /**
0023: * <p>
0024: * Total order implementation based on <a href="http://citeseer.nj.nec.com/amir95totem.html">
0025: * The Totem Single-Ring Ordering and Membership Protocol</a>.
0026: * <p>
0027: *
0028: * <p>
0029: * However, this is an adaption of algorithm mentioned in the research paper above since we reuse
0030: * our own membership protocol and failure detectors. Somewhat different flow control mechanism is
0031: * also implemented.
0032: *
0033: * <p>
0034: * Token passing is done through reliable point-to-point udp channels provided by UNICAST layer.
0035: * Process groups nodes members are organized in a logical ring.
0036: * </p>
0037: *
0038: * <p>
0039: * Total token layer doesn't need NAKACK nor STABLE layer beneath it since it implements it's own
0040: * retransmission and tracks stability of the messages from the information piggybacked on the
0041: * token itself.
0042: * </p>
0043: *
0044: * <p>
0045: * For the typical protocol stack configuration used, see org.jgroups.demos.TotalTokenDemo and
0046: * total-token.xml configuration file provided with this distribution of JGroups.
0047: * </p>
0048: *
0049: *
0050: *
0051: *@author Vladimir Blagojevic vladimir@cs.yorku.ca
0052: *@version $Revision: 1.14 $
0053: *
0054: *@see org.jgroups.protocols.ring.RingNodeFlowControl
0055: *@see org.jgroups.protocols.ring.RingNode
0056: *@see org.jgroups.protocols.ring.TcpRingNode
0057: *@see org.jgroups.protocols.ring.UdpRingNode
0058: *
0059: **/
0060:
0061: public class TOTAL_TOKEN extends RpcProtocol {
0062: private static final Object[] NULL_OBJ = new Object[] {};
0063: private static final Class[] NULL_TYPES = new Class[] {};
0064:
0065: public static class TotalTokenHeader extends Header {
0066:
0067: /**
0068: * sequence number of the message
0069: */
0070: private long seq;
0071:
0072: /**
0073: *used for externalization
0074: */
0075: public TotalTokenHeader() {
0076: }
0077:
0078: public TotalTokenHeader(long seq) {
0079: this .seq = seq;
0080: }
0081:
0082: public TotalTokenHeader(Long seq) {
0083: this .seq = seq.longValue();
0084: }
0085:
0086: /**
0087: *Returns sequence number of the message that owns this header
0088: *@return sequence number
0089: */
0090: public long getSeq() {
0091: return seq;
0092: }
0093:
0094: /**
0095: *Returns size of the header
0096: * @return headersize in bytes
0097: */
0098: public long size() {
0099: //calculated using Util.SizeOf(Object)
0100: return 121;
0101: }
0102:
0103: /**
0104: * Manual serialization
0105: *
0106: *
0107: */
0108: public void writeExternal(ObjectOutput out) throws IOException {
0109: out.writeLong(seq);
0110: }
0111:
0112: /**
0113: * Manual deserialization
0114: *
0115: */
0116: public void readExternal(ObjectInput in) throws IOException,
0117: ClassNotFoundException {
0118: seq = in.readLong();
0119: }
0120:
0121: public String toString() {
0122: return "[TotalTokenHeader=" + seq + ']';
0123: }
0124: }
0125:
0126: public static class RingTokenHeader extends Header {
0127: public RingTokenHeader() {
0128: }
0129:
0130: public void writeExternal(ObjectOutput out) throws IOException {
0131: }
0132:
0133: public void readExternal(ObjectInput in) throws IOException,
0134: ClassNotFoundException {
0135: }
0136:
0137: public long size() {
0138: //calculated using Util.SizeOf(Object)
0139: return 110;
0140: }
0141: }
0142:
0143: private static final int OPERATIONAL_STATE = 0;
0144: private static final int RECOVERY_STATE = 1;
0145:
0146: UdpRingNode node;
0147: RingNodeFlowControl flowControl;
0148: Address localAddress;
0149: private final TokenTransmitter tokenRetransmitter = new TokenTransmitter();
0150: final List newMessagesQueue = Collections
0151: .synchronizedList(new ArrayList());
0152: SortedSet liveMembersInRecovery, suspects;
0153:
0154: final Object mutex = new Object();
0155: TreeMap receivedMessagesQueue;
0156: long myAru = 0;
0157:
0158: final Object threadCoordinationMutex = new Object();
0159: final boolean tokenInStack = false;
0160: final boolean threadDeliveringMessage = false;
0161: boolean tokenSeen = false;
0162:
0163: volatile boolean isRecoveryLeader = false;
0164: volatile int state;
0165: volatile int sleepTime = 10;
0166:
0167: long highestSeenSeq = 0;
0168: long lastRoundTokensAru = 0;
0169: int lastRoundTransmitCount, lastRoundRebroadcastCount = 0;
0170: int blockSendingBacklogThreshold = Integer.MAX_VALUE;
0171: int unblockSendingBacklogThreshold = Integer.MIN_VALUE;
0172: boolean tokenCirculating = false;
0173: boolean senderBlocked = false;
0174: final Object block_sending = new Object();
0175: public static final String prot_name = "TOTAL_TOKEN";
0176:
0177: public String getName() {
0178: return prot_name;
0179: }
0180:
0181: private String getState() {
0182: if (state == OPERATIONAL_STATE) {
0183: return "OPERATIONAL";
0184: } else
0185: return "RECOVERY";
0186: }
0187:
0188: public void start() throws Exception {
0189: super .start();
0190: receivedMessagesQueue = new TreeMap();
0191: tokenRetransmitter.start();
0192: }
0193:
0194: /**
0195: * Overrides @org.jgroups.stack.MessageProtocol#stop().
0196: */
0197: public void stop() {
0198: super .stop();
0199: tokenRetransmitter.shutDown();
0200: }
0201:
0202: /**
0203: * Setup the Protocol instance acording to the configuration string
0204: *
0205: */
0206: public boolean setProperties(Properties props) {
0207: String str;
0208:
0209: super .setProperties(props);
0210: str = props.getProperty("block_sending");
0211: if (str != null) {
0212: blockSendingBacklogThreshold = Integer.parseInt(str);
0213: props.remove("block_sending");
0214: }
0215:
0216: str = props.getProperty("unblock_sending");
0217: if (str != null) {
0218: unblockSendingBacklogThreshold = Integer.parseInt(str);
0219: props.remove("unblock_sending");
0220: }
0221:
0222: if (props.size() > 0) {
0223: log
0224: .error("UDP.setProperties(): the following properties are not recognized: "
0225: + props);
0226:
0227: return false;
0228: }
0229: return true;
0230: }
0231:
0232: public IpAddress getTokenReceiverAddress() {
0233: return node != null ? node.getTokenReceiverAddress() : null;
0234: }
0235:
0236: public Vector providedUpServices() {
0237: Vector retval = new Vector();
0238: retval.addElement(new Integer(Event.GET_DIGEST));
0239: retval.addElement(new Integer(Event.GET_DIGEST_STATE));
0240: retval.addElement(new Integer(Event.SET_DIGEST));
0241: return retval;
0242: }
0243:
0244: public boolean handleUpEvent(Event evt) {
0245: Message msg;
0246: Header h;
0247: switch (evt.getType()) {
0248:
0249: case Event.SET_LOCAL_ADDRESS:
0250: localAddress = (Address) evt.getArg();
0251: node = new UdpRingNode(this , localAddress);
0252: flowControl = new RingNodeFlowControl();
0253: break;
0254:
0255: case Event.SUSPECT:
0256: Address suspect = (Address) evt.getArg();
0257: onSuspectMessage(suspect);
0258: break;
0259:
0260: case Event.MSG:
0261: msg = (Message) evt.getArg();
0262: h = msg.getHeader(getName());
0263: if (h instanceof TotalTokenHeader) {
0264: messageArrived(msg);
0265: return false;
0266: } else if (h instanceof RingTokenHeader) {
0267: if (node != null) {
0268: Object tmp = msg.getObject();
0269: node.tokenArrived(tmp);
0270: }
0271: return false;
0272: }
0273: }
0274: return true;
0275: }
0276:
0277: public boolean handleDownEvent(Event evt) {
0278: switch (evt.getType()) {
0279: case Event.GET_DIGEST:
0280: case Event.GET_DIGEST_STATE:
0281:
0282: Digest d = new Digest(members.size());
0283: Address sender = null;
0284: //all members have same digest :)
0285: for (int j = 0; j < members.size(); j++) {
0286: sender = (Address) members.elementAt(j);
0287: d.add(sender, highestSeenSeq, highestSeenSeq);
0288: }
0289: passUp(new Event(Event.GET_DIGEST_OK, d));
0290: return false;
0291: case Event.SET_DIGEST:
0292: Digest receivedDigest = (Digest) evt.getArg();
0293:
0294: // changed by bela July 12 2005, not sure if this is correct, don't know what the original author
0295: // intended to do here
0296: // myAru = receivedDigest.highSeqnoAt(0);
0297: myAru = receivedDigest.highSeqnoAt(localAddress);
0298: return false;
0299:
0300: case Event.VIEW_CHANGE:
0301: onViewChange();
0302: return true;
0303:
0304: /*
0305: case Event.CLEANUP:
0306: // do not pass cleanup event
0307: //further down. This is a hack to enable
0308: // sucessfull leave from group when using pbcast.GMS.
0309: // It just buys us 5 seconds to imminent STOP
0310: // event following CLEANUP. We hope that the moment
0311: // this node disconnect up until new view is installed
0312: // at other members is less than 5 seconds.
0313:
0314: //The proper way would be to:
0315: //trap DISCONNECT event on the way down, do not pass it further.
0316: //wait for the new view to be installed (effectively excluding this node out of
0317: //ring) , wait for one token roundtrip time, and then send that trapped
0318: //DISCONNECT event down furhter to generate DISCONNECT_OK on the way up.
0319: // CLEANUP and STOP are generated after DISCONNECT.
0320:
0321: //However, as the things stand right now pbcast.GMS stops working immediately
0322: //when it receives DISCONNECT thus the new view is never generated in node that is
0323: //leaving the group.
0324:
0325: //pbcsat.GMS should still generate new view and stop working when
0326: //it receives STOP event.
0327:
0328: //In timeline DISCONNECT < CLEANUP < STOP
0329: return false;
0330: */
0331:
0332: case Event.MSG:
0333: Message msg = (Message) evt.getArg();
0334: //handle only multicasts
0335: if (msg == null)
0336: return false;
0337: if (msg.getDest() == null
0338: || msg.getDest().isMulticastAddress()) {
0339: newMessagesQueue.add(msg);
0340: return false;
0341: }
0342: }
0343: return true;
0344: }
0345:
0346: private void onViewChange() {
0347: isRecoveryLeader = false;
0348:
0349: if (suspects != null) {
0350: suspects.clear();
0351: suspects = null;
0352: }
0353: if (liveMembersInRecovery != null) {
0354: liveMembersInRecovery.clear();
0355: liveMembersInRecovery = null;
0356: }
0357: }
0358:
0359: private void onSuspectMessage(Address suspect) {
0360: state = RECOVERY_STATE;
0361: if (suspects == null || suspects.size() == 0) {
0362: suspects = Collections.synchronizedSortedSet(new TreeSet());
0363: liveMembersInRecovery = Collections
0364: .synchronizedSortedSet(new TreeSet(members));
0365: }
0366: suspects.add(suspect);
0367: liveMembersInRecovery.removeAll(suspects);
0368: isRecoveryLeader = isRecoveryLeader(liveMembersInRecovery);
0369: }
0370:
0371: /**
0372: * Given a set of surviving members in the transitioanl view
0373: * returns true if this stack is elected to be recovery leader.
0374: *
0375: */
0376: private boolean isRecoveryLeader(SortedSet liveMembers) {
0377: boolean recoveryLeader = false;
0378: if (liveMembers.size() > 0) {
0379: recoveryLeader = localAddress.equals(liveMembers.first());
0380: }
0381:
0382: if (log.isInfoEnabled())
0383: log.info("live memebers are " + liveMembers);
0384: if (log.isInfoEnabled())
0385: log.info("I am recovery leader?" + recoveryLeader);
0386: return recoveryLeader;
0387:
0388: }
0389:
0390: public long getAllReceivedUpTo() {
0391: return myAru;
0392: }
0393:
0394: public void installTransitionalView(Vector members) {
0395: if (node != null)
0396: node.reconfigure(members);
0397: }
0398:
0399: /**
0400: * Total Token recovery protocol (TTRP)
0401: *
0402: *
0403: *
0404: * Upon transition to recovery state, coordinator sends multiple reliable
0405: * unicasts message requesting each ring member to report it's allReceivedUpto
0406: * value. When all replies are received, a response list of allReceivedUpto
0407: * values is sorted and transformed into a set while dropping the lowest
0408: * allReceivedUpto value. For example , received response list [4,4,5,6,7,7,8]
0409: * is transformed into [5,6,7,8] thus not including the lowest value 4.
0410: *
0411: * The objective of the recovery protocol is to have each member receive all
0412: * messages up to maximum sequence value M from the response list, thus
0413: * satisfying virtual synchrony properties.
0414: *
0415: * Note that if all members report the same allReceivedUpto values, then
0416: * virtual synchrony is satisfied (since all surviving members have seen
0417: * the same set of messages) and we can immediately inject operational
0418: * token which will enable installment of the new view.
0419: *
0420: * Otherwise, a constructed set S of all allReceivedUpto values represent sequence ids
0421: * of messages that have to be received by all mebers prior to installing new
0422: * view thus satisfying virtual synchrony properties.
0423: *
0424: * A transitional view, visible only to TOTAL_TOKEN layer is then installed
0425: * on the ring by a coordinator. Again a multiple unicast are used. A
0426: * transitional view is deduced from current view by excluding suspected members.
0427: * Coordinator creates a recovery token by appending the set S of sequence ids to
0428: * token retransmission request list. Recovery token is then inserted into
0429: * transitional ring view.
0430: *
0431: * Upon reception of recovery token, ring members are not allowed to transmit
0432: * any additional new messages but only to retransmit messages from the
0433: * specified token retransmission request list.
0434: *
0435: * When all member detect that they have received all messages upto sequence
0436: * value M , the next member that first receives token converts it to operatioanl
0437: * token and normal operational state is restored in all nodes.
0438: *
0439: * If token is lost during recovery stage, recovery protocol is restarted.
0440: *
0441: * */
0442: private void recover() {
0443:
0444: if (isRecoveryLeader && state == RECOVERY_STATE) {
0445:
0446: if (log.isInfoEnabled())
0447: log.info("I am starting recovery now");
0448:
0449: Vector m = new Vector(liveMembersInRecovery);
0450:
0451: RspList list = callRemoteMethods(m, "getAllReceivedUpTo",
0452: NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0);
0453: //RspList list = callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
0454: Vector myAllReceivedUpTos = list.getResults();
0455:
0456: callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ,
0457: NULL_TYPES, GroupRequest.GET_ALL, 0);
0458: //callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
0459: Vector myAllReceivedUpTosConfirm = list.getResults();
0460:
0461: while (!myAllReceivedUpTos
0462: .equals(myAllReceivedUpTosConfirm)) {
0463: myAllReceivedUpTos = myAllReceivedUpTosConfirm;
0464: callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ,
0465: NULL_TYPES, GroupRequest.GET_ALL, 0);
0466: // callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
0467: myAllReceivedUpTosConfirm = list.getResults();
0468:
0469: if (log.isInfoEnabled())
0470: log.info("myAllReceivedUpto values are"
0471: + myAllReceivedUpTos);
0472: if (log.isInfoEnabled())
0473: log.info("myAllReceivedUpto confirm values are "
0474: + myAllReceivedUpTosConfirm);
0475: }
0476:
0477: if (log.isInfoEnabled())
0478: log.info("myAllReceivedUpto stabilized values are"
0479: + myAllReceivedUpTos);
0480: if (log.isInfoEnabled())
0481: log
0482: .info("installing transitional view to repair the ring...");
0483:
0484: callRemoteMethods(m, "installTransitionalView",
0485: new Object[] { m }, new String[] { Vector.class
0486: .getName() }, GroupRequest.GET_ALL, 0);
0487: //callRemoteMethods(m, "installTransitionalView", m, GroupRequest.GET_ALL, 0);
0488:
0489: Vector xmits = prepareRecoveryRetransmissionList(myAllReceivedUpTos);
0490: RingToken injectToken = null;
0491: if (xmits.size() > 1) {
0492:
0493: if (log.isInfoEnabled())
0494: log
0495: .info("VS not satisfied, injecting recovery token...");
0496: long aru = ((Long) xmits.firstElement()).longValue();
0497: long highest = ((Long) xmits.lastElement()).longValue();
0498:
0499: injectToken = new RingToken(RingToken.RECOVERY);
0500: injectToken.setHighestSequence(highest);
0501: injectToken.setAllReceivedUpto(aru);
0502:
0503: Collection rtr = injectToken
0504: .getRetransmissionRequests();
0505: rtr.addAll(xmits);
0506: } else {
0507:
0508: if (log.isInfoEnabled())
0509: log
0510: .info("VS satisfied, injecting operational token...");
0511: injectToken = new RingToken();
0512: long sequence = ((Long) xmits.firstElement())
0513: .longValue();
0514: injectToken.setHighestSequence(sequence);
0515: injectToken.setAllReceivedUpto(sequence);
0516: }
0517: if (node != null)
0518: node.passToken(injectToken);
0519: tokenRetransmitter.resetTimeout();
0520: }
0521: }
0522:
0523: /**
0524: * Prepares a retransmissions list for recovery protocol
0525: * given a collection of all myReceivedUpTo values as
0526: * reported by polled surviving members.
0527: *
0528: *
0529: *
0530: */
0531: private Vector prepareRecoveryRetransmissionList(Vector sequences) {
0532: Collections.sort(sequences);
0533: Long first = (Long) sequences.firstElement();
0534: Long last = (Long) sequences.lastElement();
0535:
0536: Vector retransmissions = new Vector();
0537: if (first.equals(last)) {
0538: retransmissions.add(new Long(first.longValue()));
0539: } else {
0540: for (long j = first.longValue() + 1; j <= last.longValue(); j++) {
0541: retransmissions.add(new Long(j));
0542: }
0543: }
0544: return retransmissions;
0545: }
0546:
0547: protected void updateView(View newMembers) {
0548: super .updateView(newMembers);
0549: Vector newViewMembers = newMembers.getMembers();
0550: flowControl.viewChanged(newViewMembers.size());
0551: if (node != null)
0552: node.reconfigure(newViewMembers);
0553: boolean isCoordinator = localAddress.equals(newViewMembers
0554: .firstElement());
0555: int memberSize = newViewMembers.size();
0556:
0557: if (memberSize == 1 && isCoordinator && !tokenCirculating) {
0558: //create token for the first time , lets roll
0559: tokenCirculating = true;
0560: RingToken token = new RingToken();
0561: if (node != null)
0562: node.passToken(token);
0563: tokenRetransmitter.resetTimeout();
0564: }
0565: sleepTime = (20 / memberSize);
0566: }
0567:
0568: /**
0569: * TOTAL_TOKEN's up-handler thread invokes this method after multicast
0570: * message originating from some other TOTAL_TOKEN stack layer arrives at
0571: * this stack layer.
0572: *
0573: * Up-handler thread coordinates it's access to a shared variables
0574: * with TokenTransmitter thread.
0575: *
0576: * See tokenReceived() for details.
0577: *
0578: */
0579: private void messageArrived(Message m) {
0580: TotalTokenHeader h = (TotalTokenHeader) m.getHeader(getName());
0581: long seq = h.getSeq();
0582:
0583: synchronized (mutex) {
0584: if ((myAru + 1) <= seq) {
0585: if (seq > highestSeenSeq) {
0586: highestSeenSeq = seq;
0587: }
0588:
0589: receivedMessagesQueue.put(new Long(seq), m);
0590: if ((myAru + 1) == seq) {
0591: myAru = seq;
0592: passUp(new Event(Event.MSG, m));
0593: }
0594: if (isReceiveQueueHolePlugged()) {
0595: myAru = deliverMissingMessages();
0596: }
0597: }
0598: }
0599: }
0600:
0601: /**
0602: * Returns true if there is a hole in receive queue and at
0603: * least one messages with sequence id consecutive to myAru.
0604: *
0605: *
0606: */
0607: private boolean isReceiveQueueHolePlugged() {
0608: return ((myAru < highestSeenSeq) && receivedMessagesQueue
0609: .containsKey(new Long(myAru + 1)));
0610: }
0611:
0612: /**
0613: * Delivers as much as possible messages from receive
0614: * message queue as long as they are consecutive with
0615: * respect to their sequence ids.
0616: *
0617: */
0618: private long deliverMissingMessages() {
0619: Map.Entry entry = null;
0620: boolean inOrder = true;
0621: long lastDelivered = myAru;
0622: Set deliverySet = receivedMessagesQueue.tailMap(
0623: new Long(myAru + 1)).entrySet();
0624:
0625: if (log.isInfoEnabled())
0626: log.info("hole getting plugged, prior muAru " + myAru);
0627:
0628: for (Iterator iterator = deliverySet.iterator(); inOrder
0629: && iterator.hasNext();) {
0630: entry = (Map.Entry) iterator.next();
0631: long nextInQueue = ((Long) entry.getKey()).longValue();
0632: if (lastDelivered + 1 == nextInQueue) {
0633: Message m = (Message) entry.getValue();
0634: passUp(new Event(Event.MSG, m));
0635: lastDelivered++;
0636: } else {
0637: inOrder = false;
0638: }
0639: }
0640:
0641: if (log.isInfoEnabled())
0642: log.info("hole getting plugged, post muAru "
0643: + lastDelivered);
0644: return lastDelivered;
0645: }
0646:
0647: /**
0648: * Checks if the receivedMessageQueue has any missing sequence
0649: * numbers in it, and if it does it finds holes in sequences from
0650: * this stack's receivedMessageQueue and adds them to token retransmission
0651: * list, thus informing other group members about messages missing
0652: * from this stack.
0653: *
0654: *
0655: */
0656: private void updateTokenRtR(RingToken token) {
0657: long holeLowerBound = 0;
0658: long holeUpperBound = 0;
0659: Long missingSequence = null;
0660: Collection retransmissionList = null;
0661:
0662: //any holes?
0663: if (myAru < token.getHighestSequence()) {
0664: retransmissionList = token.getRetransmissionRequests();
0665: Set received = receivedMessagesQueue.tailMap(
0666: new Long(myAru + 1)).keySet();
0667: Iterator nonMissing = received.iterator();
0668: holeLowerBound = myAru;
0669:
0670: if (log.isDebugEnabled())
0671: log.debug("retransmission request prior"
0672: + retransmissionList);
0673:
0674: while (nonMissing.hasNext()) {
0675: Long seq = (Long) nonMissing.next();
0676: holeUpperBound = seq.longValue();
0677:
0678: while (holeLowerBound < holeUpperBound) {
0679: missingSequence = new Long(++holeLowerBound);
0680: retransmissionList.add(missingSequence);
0681: }
0682: holeLowerBound = holeUpperBound;
0683: }
0684:
0685: holeUpperBound = token.getHighestSequence();
0686: while (holeLowerBound < holeUpperBound) {
0687: missingSequence = new Long(++holeLowerBound);
0688: retransmissionList.add(missingSequence);
0689: }
0690:
0691: if (log.isDebugEnabled())
0692: log.debug("retransmission request after"
0693: + retransmissionList);
0694: }
0695: }
0696:
0697: /**
0698: * Sends messages in this stacks's outgoing queue and
0699: * saves a copy of each outgoing message in case they got lost.
0700: * If messages get lost it is thus guaranteed that each stack
0701: * that sent any message has a copy of it ready for retransmitting.
0702: *
0703: * Each sent message is stamped by monotonically increasing
0704: * sequence number starting from the highest sequence "seen"
0705: * on the ring.
0706: *
0707: * Returns number of messages actually sent. The number of
0708: * sent messages is bounded above by the flow control
0709: * algorithm (allowedCount) and bounded below by the number
0710: * of pending messages in newMessagesQueue.
0711: */
0712: private int broadcastMessages(int allowedCount, RingToken token) {
0713: List sendList = null;
0714: synchronized (newMessagesQueue) {
0715: int queueSize = newMessagesQueue.size();
0716:
0717: if (queueSize <= 0) {
0718: return 0;
0719: }
0720:
0721: else if (queueSize > allowedCount) {
0722: sendList = new ArrayList(newMessagesQueue.subList(0,
0723: allowedCount));
0724: newMessagesQueue.removeAll(sendList);
0725: }
0726:
0727: else {
0728: sendList = new ArrayList();
0729: sendList.addAll(newMessagesQueue);
0730: newMessagesQueue.clear();
0731: }
0732: }
0733:
0734: long tokenSeq = token.getHighestSequence();
0735:
0736: for (Iterator iterator = sendList.iterator(); iterator
0737: .hasNext();) {
0738: Message m = (Message) iterator.next();
0739: m.setSrc(localAddress);
0740: m.setDest(null); // mcast address
0741: m.putHeader(getName(), new TotalTokenHeader(++tokenSeq));
0742: receivedMessagesQueue.put(new Long(tokenSeq), m);
0743: passDown(new Event(Event.MSG, m));
0744: }
0745:
0746: if (token.getHighestSequence() == token.getAllReceivedUpto()) {
0747: token.setAllReceivedUpto(tokenSeq);
0748: }
0749: token.setHighestSequence(tokenSeq);
0750: return sendList.size();
0751: }
0752:
0753: /**
0754: * TokenTransmitter thread runs this method after receiving token.
0755: * Thread is possibly blocked if up-handler thread is currently running
0756: * through this stack i.e delivering an Event. Up-hanlder thread will
0757: * notify blocked TokenTransmitter thread when it has delivered current
0758: * Event so TokenTransmitter can proceed.
0759: * TokenTransmitter thread in turn notifies possibly blocked up-handler thread
0760: * when token has left the stack. Thus TokenTransmitter and up-hadler thread
0761: * coordinate their access to shared variables(receivedMessageQueue and myAru).
0762: *
0763: * tokenReceived method and subsequent methods called from tokenReceived represent
0764: * in some parts the totaly ordered algorithm presented in Amir's paper (see class
0765: * header for link)
0766: *
0767: *
0768: *
0769: */
0770: private void tokenReceived(RingToken token) {
0771:
0772: if (log.isInfoEnabled())
0773: log.info(token.toString());
0774: if (log.isDebugEnabled())
0775: log.debug(getState());
0776:
0777: flowControl.setBacklog(newMessagesQueue.size());
0778: flowControl.updateWindow(token);
0779:
0780: blockSenderIfRequired();
0781: unBlockSenderIfAcceptable();
0782:
0783: long tokensAru = 0;
0784: int broadcastCount = 0;
0785: int rebroadcastCount = 0;
0786: synchronized (mutex) {
0787: if (!tokenSeen) {
0788: long lastRoundAru = token.getHighestSequence()
0789: - token.getLastRoundBroadcastCount();
0790: if (myAru < token.getAllReceivedUpto()) {
0791: myAru = lastRoundAru;
0792: }
0793: //if(log.isInfoEnabled()) log.info("TOTAL_TOKEN.tokenReceived()", "tokenSeen " + myAru);
0794: tokenSeen = true;
0795: }
0796:
0797: if (token.getType() == RingToken.RECOVERY) {
0798: highestSeenSeq = token.getHighestSequence();
0799: if (highestSeenSeq == myAru) {
0800: if (log.isInfoEnabled())
0801: log.info("member node recovered");
0802: token.addRecoveredMember(localAddress);
0803: }
0804: }
0805:
0806: updateTokenRtR(token);
0807:
0808: int allowedToBroadcast = flowControl
0809: .getAllowedToBroadcast(token);
0810: rebroadcastCount = rebroadcastMessages(token);
0811: allowedToBroadcast -= rebroadcastCount;
0812:
0813: if (log.isInfoEnabled())
0814: log.info("myAllReceivedUpto" + myAru);
0815: if (log.isInfoEnabled())
0816: log.info("allowedToBroadcast" + allowedToBroadcast);
0817: if (log.isInfoEnabled())
0818: log.info("newMessagesQueue.size()"
0819: + newMessagesQueue.size());
0820:
0821: tokensAru = token.getAllReceivedUpto();
0822:
0823: if (myAru < tokensAru
0824: || localAddress.equals(token.getAruId())
0825: || token.getAruId() == null) {
0826: token.setAllReceivedUpto(myAru);
0827: if (token.getAllReceivedUpto() == token
0828: .getHighestSequence()) {
0829: token.setAruId(null);
0830: } else {
0831: token.setAruId(localAddress);
0832: }
0833: }
0834: if (allowedToBroadcast > 0
0835: && token.getType() == RingToken.OPERATIONAL) {
0836: broadcastCount = broadcastMessages(allowedToBroadcast,
0837: token);
0838: }
0839:
0840: if (tokensAru > lastRoundTokensAru) {
0841: removeStableMessages(receivedMessagesQueue,
0842: lastRoundTokensAru);
0843: }
0844:
0845: } //end synchronized
0846:
0847: //give CPU some breath
0848: Util.sleep(sleepTime);
0849:
0850: token.incrementTokenSequence();
0851: token.addLastRoundBroadcastCount(broadcastCount
0852: - lastRoundTransmitCount);
0853: token.addBacklog(flowControl.getBacklogDifference());
0854: flowControl.setPreviousBacklog();
0855: lastRoundTransmitCount = broadcastCount;
0856: lastRoundRebroadcastCount = rebroadcastCount;
0857: lastRoundTokensAru = tokensAru;
0858: }
0859:
0860: /**
0861: *
0862: * Rebroadcasts messages specified in token's retransmission
0863: * request list if those messages are available in this stack.
0864: * Returns number of rebroadcasted messages.
0865: *
0866: */
0867: private int rebroadcastMessages(RingToken token) {
0868: int rebroadCastCount = 0;
0869: Collection rexmitRequests = token.getRetransmissionRequests();
0870: if (rexmitRequests.size() > 0) {
0871: Collection rbl = getRebroadcastList(rexmitRequests);
0872: rebroadCastCount = rbl.size();
0873: if (rebroadCastCount > 0) {
0874:
0875: if (log.isInfoEnabled())
0876: log.info("rebroadcasting " + rbl);
0877:
0878: Long s = null;
0879: for (Iterator iterator = rbl.iterator(); iterator
0880: .hasNext();) {
0881: s = (Long) iterator.next();
0882: Message m = (Message) receivedMessagesQueue.get(s);
0883: passDown(new Event(Event.MSG, m));
0884: }
0885: }
0886: }
0887: return rebroadCastCount;
0888: }
0889:
0890: private void invalidateOnTokenloss() {
0891: lastRoundTransmitCount = 0;
0892: flowControl.invalidate();
0893: }
0894:
0895: /**
0896: * Checks if the down pending queue's (newMessagesQueue) size is
0897: * greater than blockSendingBacklogThreshold specified in the properties.
0898: * If it is, client's sending thread is effectively blocked until
0899: * down pending queue's size drops below unblockSendingBacklogThreshold.
0900: *
0901: *
0902: */
0903: private void blockSenderIfRequired() {
0904: if (!senderBlocked
0905: && flowControl.getBacklog() > blockSendingBacklogThreshold) {
0906: synchronized (block_sending) {
0907: senderBlocked = true;
0908: while (senderBlocked) {
0909: try {
0910: block_sending.wait();
0911: } catch (InterruptedException e) {
0912: }
0913: }
0914: }
0915: }
0916: }
0917:
0918: /**
0919: * Checks if the down pending queue's (newMessagesQueue) size is
0920: * smaller than unblockSendingBacklogThreshold specified in the properties.
0921: * If it is, client's sending thread is effectively unblocked enabling
0922: * new messages to be queued for transmission.
0923: *
0924: *
0925: */
0926: private void unBlockSenderIfAcceptable() {
0927: if (senderBlocked
0928: && flowControl.getBacklog() < unblockSendingBacklogThreshold) {
0929: synchronized (block_sending) {
0930: senderBlocked = false;
0931: block_sending.notifyAll();
0932: }
0933: }
0934:
0935: }
0936:
0937: /**
0938: * Removes messages determined to be stable(i.e seen by all members)
0939: * from the specified queue. If the client also clears all reference
0940: * to these messages (in application space) they become eligible for garabge
0941: * collection.
0942: *
0943: *
0944: */
0945:
0946: private void removeStableMessages(TreeMap m, long upToSeq) {
0947:
0948: if (m.size() > 0) {
0949: long first = ((Long) m.firstKey()).longValue();
0950: if (first > upToSeq) {
0951: upToSeq = first;
0952: }
0953:
0954: if (log.isDebugEnabled())
0955: log.debug("cutting queue first key " + m.firstKey()
0956: + " cut at " + upToSeq + " last key "
0957: + m.lastKey());
0958: SortedMap stable = m.headMap(new Long(upToSeq));
0959: stable.clear();
0960: }
0961: }
0962:
0963: /**
0964: * Determines a subset of message sequence numbers
0965: * available for retransmission from this stack.
0966: *
0967: */
0968: private Collection getRebroadcastList(Collection rtr) {
0969: ArrayList rebroadcastList = new ArrayList(rtr);
0970: rebroadcastList.retainAll(receivedMessagesQueue.keySet());
0971: rtr.removeAll(rebroadcastList);
0972: Collections.sort(rebroadcastList);
0973: return rebroadcastList;
0974: }
0975:
0976: /**
0977: * TokenTransimitter thread transmits the token to the next member
0978: * in the logical ring as well as it receives token from the previous
0979: * member in the ring. Smoothed ring roundtrip time is computed
0980: * in order to detect token loss. If the timeout expires AND this
0981: * stack has received SUSPECT message, recovery protocol is invoked.
0982: * See recover method for details.
0983: *
0984: */
0985: private class TokenTransmitter extends Thread {
0986: long rtt = 0;
0987: long timer;
0988: double srtt = 1000; //1 second to start
0989: final double a = 0.09;
0990: final int timeoutFactor = 10;
0991: volatile boolean running = false;
0992:
0993: private TokenTransmitter() {
0994: super (Util.getGlobalThreadGroup(), "TokenTransmitter");
0995: resetTimeout();
0996: running = true;
0997: }
0998:
0999: private void shutDown() {
1000: running = false;
1001: }
1002:
1003: private void recalculateTimeout() {
1004: long now = System.currentTimeMillis();
1005: if (timer > 0) {
1006: rtt = now - timer;
1007: srtt = (1 - a) * srtt + a * rtt;
1008: }
1009: }
1010:
1011: private double getTimeout() {
1012: return srtt * timeoutFactor;
1013: }
1014:
1015: private void resetTimeout() {
1016: timer = System.currentTimeMillis();
1017: }
1018:
1019: private boolean isRecoveryCompleted(RingToken token) {
1020: return liveMembersInRecovery.equals(token
1021: .getRecoveredMembers());
1022: }
1023:
1024: public void run() {
1025: while (running) {
1026: RingToken token = null;
1027: int timeout = 0;
1028:
1029: if (node == null) {
1030: // sleep some time, then retry
1031: Util.sleep(500);
1032: continue;
1033: }
1034:
1035: try {
1036: timeout = (int) getTimeout();
1037:
1038: if (log.isInfoEnabled())
1039: log.info("timeout(ms)=" + timeout);
1040:
1041: token = (RingToken) node.receiveToken(timeout);
1042:
1043: if (token.getType() == RingToken.OPERATIONAL
1044: && state == RECOVERY_STATE) {
1045: state = OPERATIONAL_STATE;
1046: }
1047:
1048: tokenReceived(token);
1049: recalculateTimeout();
1050:
1051: if (token.getType() == RingToken.RECOVERY
1052: && isRecoveryCompleted(token)) {
1053:
1054: if (log.isInfoEnabled())
1055: log
1056: .info("all members recovered, injecting operational token");
1057: token.setType(RingToken.OPERATIONAL);
1058: }
1059: node.passToken(token);
1060: resetTimeout();
1061: } catch (TokenLostException tle) {
1062: invalidateOnTokenloss();
1063: state = RECOVERY_STATE;
1064: recover();
1065: }
1066: }
1067: }
1068: }
1069: }
|