0001: // $Id: TOTAL_OLD.java,v 1.12 2006/01/19 09:53:37 belaban Exp $
0002:
0003: package org.jgroups.protocols;
0004:
0005: import org.apache.commons.logging.Log;
0006: import org.apache.commons.logging.LogFactory;
0007: import org.jgroups.*;
0008: import org.jgroups.util.Util;
0009: import org.jgroups.stack.Protocol;
0010:
0011: import java.io.IOException;
0012: import java.io.ObjectInput;
0013: import java.io.ObjectOutput;
0014: import java.util.Vector;
0015:
0016: /**
0017: * class SavedMessages
0018: * <p/>
0019: * Stores a set of messages along with their sequence id (assigned by the sequencer).
0020: */
0021: class SavedMessages {
0022:
0023: final Log log = LogFactory.getLog(SavedMessages.class);
0024:
0025: /**
0026: * class Entry (inner class)
0027: * <p/>
0028: * object type to store in the messages Vector (need to store sequence id in addition to message)
0029: */
0030: class Entry {
0031: private final Message msg;
0032: private final long seq;
0033:
0034: Entry(Message msg, long seq) {
0035: this .msg = msg;
0036: this .seq = seq;
0037: }
0038:
0039: public Message getMsg() {
0040: return msg;
0041: }
0042:
0043: public long getSeq() {
0044: return seq;
0045: }
0046: } // class Entry
0047:
0048: private final Vector messages; // vector of "Entry"s to store "Message"s, sorted by sequence id
0049:
0050: /**
0051: * Constructor - creates an empty space to store messages
0052: */
0053: SavedMessages() {
0054: messages = new Vector();
0055: }
0056:
0057: /**
0058: * inserts the specified message and sequence id into the "list" of stored messages
0059: * if the sequence id given is already stored, then nothing is stored
0060: */
0061: public void insertMessage(Message msg, long seq) {
0062: synchronized (messages) {
0063: int size = messages.size();
0064: int index = 0;
0065: long this _seq = -1; // used to prevent duplicate messages being stored
0066:
0067: // find the index where this message should be inserted
0068: try {
0069: while ((index < size)
0070: && ((this _seq = ((Entry) (messages
0071: .elementAt(index))).getSeq()) < seq)) {
0072: index++;
0073: }
0074: } catch (java.lang.ClassCastException e) {
0075: log
0076: .error("Error: (TOTAL_OLD) SavedMessages.insertMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index "
0077: + index + ')');
0078: return;
0079: }
0080:
0081: // check that the sequences aren't the same (don't want duplicates)
0082: if (this _seq == seq) {
0083: log
0084: .error("SavedMessages.insertMessage() - sequence "
0085: + seq
0086: + " already exists in saved messages. Message NOT saved.");
0087: return;
0088: }
0089:
0090: messages.insertElementAt(new Entry(msg, seq), index);
0091: } // synchronized( messages )
0092: }
0093:
0094: /**
0095: * returns a copy of the stored message with the given sequence id
0096: * if delete_msg is true, then the message is removed from the
0097: * the list of stored messages, otherwise the message is not
0098: * removed from the list
0099: * if no message is stored with this sequence id, null is returned
0100: */
0101: private Message getMessage(long seq, boolean delete_msg) {
0102: synchronized (messages) {
0103: int size = messages.size();
0104: int index = 0;
0105: long this _seq = -1;
0106: try {
0107: while ((index < size)
0108: && ((this _seq = (((Entry) (messages
0109: .elementAt(index))).getSeq())) < seq)) {
0110: index++;
0111: }
0112: } catch (java.lang.ClassCastException e) {
0113: log
0114: .error("Error: (TOTAL_OLD) SavedMessages.getMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index "
0115: + index + ')');
0116: return null;
0117: }
0118: // determine if we found the specified sequence
0119: if (this _seq == seq) {
0120: // we found the message at index
0121: Object temp_obj = messages.elementAt(index);
0122: if (temp_obj instanceof Entry) {
0123: Message ret_val = ((Entry) temp_obj).getMsg()
0124: .copy();
0125:
0126: // should we delete
0127: if (delete_msg) {
0128: messages.removeElementAt(index);
0129: }
0130:
0131: return ret_val;
0132: } else {
0133: log
0134: .error("Error: (TOTAL_OLD) SavedMessages.getMessage() - could not cast element of \"messages\" to an Entry (index "
0135: + index + ')');
0136: return null;
0137: } // if ( temp_obj instanceof Entry )
0138: } else {
0139: // we didn't find this sequence number in the messages
0140: return null;
0141: }
0142: } // synchronized( messages )
0143: }
0144:
0145: /**
0146: * returns a stored message with the given sequence id
0147: * the message is then removed from the list of stored messages
0148: * if no message is stored with this sequence id, null is returned
0149: */
0150: public Message getMessage(long seq) {
0151: return getMessage(seq, true);
0152: }
0153:
0154: /**
0155: * similar to GetMessage, except a copy of the message is returned
0156: * and the message is not removed from the list
0157: */
0158: public Message peekMessage(long seq) {
0159: return getMessage(seq, false);
0160: }
0161:
0162: /**
0163: * returns a copy of the stored message with the lowest sequence id
0164: * if delete_msg is true, then the message is removed from the
0165: * the list of stored messages, otherwise the message is not
0166: * removed from the list
0167: * if their are no messages stored, null is returned
0168: */
0169: private Message getFirstMessage(boolean delete_msg) {
0170: synchronized (messages) {
0171: if (isEmpty()) {
0172: return null;
0173: } else {
0174: Object temp_obj = messages.firstElement();
0175: if (temp_obj instanceof Entry) {
0176: Message ret_val = ((Entry) temp_obj).getMsg()
0177: .copy();
0178: messages.removeElementAt(0);
0179: return ret_val;
0180: } else {
0181: log
0182: .error("Error: (TOTAL_OLD) SavedMessages.getFirstMessage() - could not cast element of \"messages\" to an Entry");
0183: return null;
0184: } // if ( temp_obj instanceof Entry )
0185: }
0186: } // synchronized( messages )
0187: }
0188:
0189: /**
0190: * returns the stored message with the lowest sequence id;
0191: * the message is then removed from the list of stored messages
0192: * if their are no messages stored, null is returned
0193: */
0194: public synchronized Message getFirstMessage() {
0195: return getFirstMessage(true);
0196: }
0197:
0198: /**
0199: * similar to GetFirstMessage, except a copy of the message is returned
0200: * and the message is not removed from the list
0201: */
0202: public Message peekFirstMessage() {
0203: return getFirstMessage(false);
0204: }
0205:
0206: /**
0207: * returns the lowest sequence id of the messages stored
0208: * if no messages are stored, -1 is returned
0209: */
0210: public long getFirstSeq() {
0211: synchronized (messages) {
0212: if (isEmpty()) {
0213: return -1;
0214: } else {
0215: Object temp_obj = messages.firstElement();
0216: if (temp_obj instanceof Entry) {
0217: return ((Entry) temp_obj).getSeq();
0218: } else {
0219: log
0220: .error("Error: (TOTAL_OLD) SavedMessages.getFirstSeq() - could not cast element of \"messages\" to an Entry ");
0221: return -1;
0222: }
0223: }
0224: } // synchronized( messages )
0225: }
0226:
0227: /**
0228: * returns true if there are messages stored
0229: * returns false if there are no messages stored
0230: */
0231: public boolean isEmpty() {
0232: return messages.isEmpty();
0233: }
0234:
0235: /**
0236: * returns the number of messages stored
0237: */
0238: public int getSize() {
0239: return messages.size();
0240: }
0241:
0242: /**
0243: * clears all of the stored messages
0244: */
0245: public void clearMessages() {
0246: synchronized (messages) {
0247: messages.removeAllElements();
0248: }
0249: }
0250: } // class SavedMessages
0251:
0252: /**
0253: * class MessageAcks
0254: * <p/>
0255: * Used by sequencer to store cumulative acknowledgements of broadcast messages
0256: * sent to the group in this view
0257: */
0258: class MessageAcks {
0259:
0260: final Log log = LogFactory.getLog(MessageAcks.class);
0261:
0262: // TODO: may also want to store some sort of timestamp in each Entry (maybe)
0263: /**
0264: * class Entry (inner class)
0265: * <p/>
0266: * object type to store cumulative acknowledgements using a member's Address
0267: * and the sequence id of a message
0268: */
0269: class Entry {
0270: public final Address addr;
0271: public long seq;
0272:
0273: Entry(Address addr, long seq) {
0274: this .addr = addr;
0275: this .seq = seq;
0276: }
0277:
0278: Entry(Address addr) {
0279: this .addr = addr;
0280: this .seq = -1; // means that no acknowledgements have been made yet
0281: }
0282: } // class Entry
0283:
0284: // Vector of "Entry"s representing cumulative acknowledgements for each member of the group
0285: private final Vector acks;
0286:
0287: private final SavedMessages message_history; // history of broadcast messages sent
0288:
0289: /**
0290: * Constructor - creates a Vector of "Entry"s given a Vector of "Address"es for the members
0291: */
0292: MessageAcks(Vector members) {
0293: acks = new Vector();
0294:
0295: // initialize the message history to contain no messages
0296: message_history = new SavedMessages();
0297:
0298: // insert slots for each member in the acknowledgement Vector
0299: reset(members);
0300: }
0301:
0302: /**
0303: * resets acknowledgement Vector with "Entry"s using the given Vector of "Address"es
0304: * also clears the message history
0305: */
0306: public synchronized void reset(Vector members) {
0307: clear();
0308:
0309: // initialize Vector of acknowledgements (no acks for any member)
0310: int num_members = members.size();
0311: for (int i = 0; i < num_members; i++) {
0312: Object temp_obj = members.elementAt(i);
0313: if (temp_obj instanceof Address) {
0314: acks.addElement(new Entry((Address) temp_obj));
0315: } else {
0316: log
0317: .error("Error: (TOTAL_OLD) MessageAcks.reset() - could not cast element of \"members\" to an Address object");
0318: return;
0319: }
0320: }
0321: }
0322:
0323: /**
0324: * clear all acknowledgements and the message history
0325: */
0326: private void clear() {
0327: acks.removeAllElements();
0328: message_history.clearMessages();
0329: }
0330:
0331: /**
0332: * returns the Entry from the acknowledgement Vector with the given Address
0333: * returns null if an Entry with the given Address is not found
0334: */
0335: private Entry getEntry(Address addr) {
0336: synchronized (acks) {
0337: // look for this addreess in the acknowledgement Vector
0338: int size = acks.size();
0339: for (int i = 0; i < size; i++) {
0340: Object temp_obj = acks.elementAt(i);
0341: if (temp_obj instanceof Entry) {
0342: Entry this _entry = (Entry) temp_obj;
0343: if ((this _entry.addr).equals(addr)) {
0344: // the given Address matches this entry
0345: return this _entry;
0346: }
0347: } else {
0348: log
0349: .error("Error: (TOTAL_OLD) MessageAcks.getEntry() - could not cast element of \"acks\" to an Entry");
0350: } // if ( temp_obj instanceof Entry )
0351: }
0352:
0353: // if we get here, we didn't find this Address
0354: return null;
0355: }
0356: }
0357:
0358: /**
0359: * sets the sequence id for the given Address to the given value
0360: * note: if the current sequence value for this host is greater than
0361: * the given value, the sequence for this member is NOT changed
0362: * (i.e. it will only set it to a larger value)
0363: * if the given Address is not found in the member list,
0364: * nothing is changed
0365: */
0366: public void setSeq(Address addr, long seq) {
0367: Entry this _entry = getEntry(addr);
0368: if ((this _entry != null) && (this _entry.seq < seq)) {
0369: this _entry.seq = seq;
0370:
0371: // try to remove any messages that we don't need anymore
0372: truncateHistory();
0373: }
0374: }
0375:
0376: /**
0377: * returns the sequence id of the "latest" cumulative acknowledgement
0378: * for the specified Address
0379: * if the Address is not found in the member list, a negative value
0380: * is returned
0381: * note: the value returned may also be negative if their have been
0382: * no acknowledgements from the given address
0383: */
0384: public long getSeq(Address addr) {
0385: Entry this _entry = getEntry(addr);
0386: if (this _entry == null) {
0387: return -2; // TODO: change this to something else (e.g. constant) later (maybe)
0388: } else {
0389: return this _entry.seq;
0390: }
0391: }
0392:
0393: /**
0394: * returns the message in the history that matches the given sequence id
0395: * returns null if no message exists in the history with this sequence id
0396: */
0397: public Message getMessage(long seq) {
0398: return message_history.peekMessage(seq);
0399: }
0400:
0401: /**
0402: * adds the given message (with the specified sequence id) to the
0403: * message history
0404: * if the given sequence id already exists in the message history,
0405: * the message is NOT added
0406: */
0407: public void addMessage(Message msg, long seq) {
0408: message_history.insertMessage(msg, seq);
0409: }
0410:
0411: /**
0412: * returns the minimum cumulative acknowledged sequence id from all the members
0413: * (i.e. the greatest sequence id cumulatively acknowledged by all members)
0414: */
0415: private long getLowestSeqAck() {
0416: synchronized (acks) {
0417: long ret_val = -10; // start with a negative value
0418:
0419: int size = acks.size();
0420: for (int i = 0; i < size; i++) {
0421: Object temp_obj = acks.elementAt(i);
0422: if (temp_obj instanceof Entry) {
0423: long this _seq = ((Entry) temp_obj).seq;
0424: if (this _seq < ret_val) {
0425: ret_val = this _seq;
0426: }
0427: } else {
0428: log
0429: .error("Error: (TOTAL_OLD) MessageAcks.getLowestSeqAck() - could not cast element of \"acks\" to an Entry (index="
0430: + i + ')');
0431: return -1;
0432: }
0433: }
0434:
0435: return ret_val;
0436: }
0437: }
0438:
0439: /**
0440: * removes messages from the history that have been acknowledged
0441: * by all the members of the group
0442: */
0443: private synchronized void truncateHistory() {
0444: long lowest_ack_seq = getLowestSeqAck();
0445: if (lowest_ack_seq < 0) {
0446: // either no members, or someone has not received any messages yet
0447: // either way, do nothing
0448: return;
0449: }
0450:
0451: // don't want message_history being altered during this operation
0452: synchronized (message_history) {
0453: long lowest_stored_seq;
0454: // keep deleting the oldest stored message for as long as we can
0455: while (((lowest_stored_seq = message_history.getFirstSeq()) >= 0)
0456: && (lowest_stored_seq > lowest_ack_seq)) {
0457: // we can delete the oldest stored message
0458: message_history.getFirstMessage();
0459: }
0460: } // synchronized( message_history )
0461: }
0462: } // class MessageAcks
0463:
0464: /**
0465: * **************************************************************************
0466: * class TOTAL_OLD extends Protocol
0467: * <p/>
0468: * TODO: (more comments)
0469: * Sequencer based total ordering protocol layer
0470: * - requires the following layers "below" it in the stack
0471: * (or layers with equivalent functionality):
0472: * GMS, FD, PING, UDP, ...
0473: *
0474: * @author Manish Sambhu mms21@cornell.edu Spring 1999
0475: * **************************************************************************
0476: */
0477: public class TOTAL_OLD extends Protocol {
0478: // the unique name of the protocol
0479: private final static String PROTOCOL_NAME = "TOTAL_OLD";
0480:
0481: private Address local_addr = null;
0482: private Vector members = new Vector(); // note: members should never be null
0483: // (because of synchronized blocks)
0484:
0485: /**
0486: * next_seq_id
0487: * the sequence id of the next message we expect to receive
0488: * note: this value is only meaningful when non-negative
0489: */
0490: private long next_seq_id = -1;
0491:
0492: /**
0493: * next_seq_id_to_assign
0494: * used only by the sequencer to assign sequence ids to requests
0495: * and resend them to the group
0496: * note: this value is only meaningful when non-negative
0497: */
0498: private long next_seq_id_to_assign = -1;
0499:
0500: private final static long INIT_SEQ_ID = 10; // this value is pretty much arbitrary (should be positive though)
0501:
0502: /**
0503: * queued_messages
0504: * broadcast messages that we received that we are storing so that we can
0505: * deterministically order the messages based on their sequence ids
0506: */
0507: private final SavedMessages queued_messages = new SavedMessages();
0508:
0509: /**
0510: * ack_history
0511: * used only by the sequencer
0512: * stores the cumulative acks for each member of the group
0513: * also stores messages that may be needed for resend requests
0514: * (i.e. messages that have not been acked by all group members)
0515: */
0516: private MessageAcks ack_history = null;
0517:
0518: /**
0519: * retrans_thread
0520: * thread that handles sending requests to the sequencer for messages
0521: * that may not have been received but were expected to arrive
0522: */
0523: private final TotalRetransmissionThread retrans_thread = new TotalRetransmissionThread(
0524: this );
0525:
0526: final Log log = LogFactory.getLog(TOTAL_OLD.class);
0527:
0528: /**
0529: * returns the unique name of this protocol
0530: */
0531: public String getName() {
0532: return PROTOCOL_NAME;
0533: }
0534:
0535: public void start() throws Exception {
0536: // Start work
0537: retrans_thread.start();
0538: }
0539:
0540: public void stop() {
0541: // stop the retransmission thread
0542: retrans_thread.stopResendRequests();
0543: }
0544:
0545: /**
0546: * Just remove if you don't need to reset any state
0547: */
0548: public void reset() {
0549: // TODO: find out when this would be called, maybe do more here
0550:
0551: // don't accept any messages until we receive a TOTAL_NEW_VIEW message from the sequencer
0552: next_seq_id = -1;
0553: // clear (i.e. delete) any messages that did not get propagated up
0554: queued_messages.clearMessages();
0555:
0556: // reset the retransmission thread state
0557: retrans_thread.reset();
0558: }
0559:
0560: /**
0561: * @return the next sequence id expected to be received in this view
0562: */
0563: protected long getNextSeqID() {
0564: return next_seq_id;
0565: }
0566:
0567: /**
0568: * Returns the sequence id of the "first" queued message
0569: * (i.e., the lowest seq id queued).
0570: * @return the sequence id of the queued message, or -1 if no messages are queued.
0571: */
0572: protected long getFirstQueuedSeqID() {
0573: return queued_messages.getFirstSeq();
0574: }
0575:
0576: /**
0577: * handles an Event coming up the Protocol Stack
0578: */
0579: public void up(Event evt) {
0580: Message msg;
0581:
0582: //System.out.println("UP: " + evt);
0583:
0584: Object temp_obj; // used for type checking before performing casts
0585: switch (evt.getType()) {
0586:
0587: case Event.SET_LOCAL_ADDRESS:
0588: temp_obj = evt.getArg();
0589: if (temp_obj instanceof Address) {
0590: local_addr = (Address) temp_obj;
0591: } else {
0592: log
0593: .error("Error: Total.up() - could not cast local address to an Address object");
0594: }
0595: break;
0596:
0597: case Event.MSG:
0598: // get the message and the header for the TOTAL_OLD layer
0599: temp_obj = evt.getArg();
0600: if (temp_obj instanceof Message) {
0601: msg = (Message) temp_obj;
0602: temp_obj = msg.removeHeader(getName());
0603: if (temp_obj instanceof TotalHeader) {
0604: TotalHeader hdr = (TotalHeader) temp_obj;
0605:
0606: // switch on the "command" defined by the header
0607: switch (hdr.total_header_type) {
0608:
0609: case TotalHeader.TOTAL_UNICAST:
0610: // don't process this message, just pass it up (TotalHeader header already removed)
0611: passUp(evt);
0612: return;
0613:
0614: case TotalHeader.TOTAL_BCAST:
0615: handleBCastMessage(msg, hdr.seq_id);
0616: break;
0617:
0618: case TotalHeader.TOTAL_REQUEST:
0619: // if we are the sequencer, respond to this request
0620: if (isSequencer()) {
0621: handleRequestMessage(msg);
0622: }
0623: break;
0624:
0625: case TotalHeader.TOTAL_NEW_VIEW:
0626: // store the sequence id that we should expect next
0627: next_seq_id = hdr.seq_id;
0628:
0629: // TODO: need to send some sort of ACK or something to the sequencer (maybe)
0630: break;
0631:
0632: case TotalHeader.TOTAL_CUM_SEQ_ACK:
0633: // if we are the sequencer, update state
0634: if (isSequencer()) {
0635: temp_obj = msg.getSrc();
0636: if (temp_obj instanceof Address) {
0637: ack_history.setSeq((Address) temp_obj,
0638: hdr.seq_id);
0639: } else {
0640: log
0641: .error("Error: TOTAL_OLD.Up() - could not cast source of message to an Address object (case TotalHeader.TOTAL_CUM_SEQ_ACK)");
0642: }
0643: }
0644: break;
0645:
0646: case TotalHeader.TOTAL_RESEND:
0647: // if we are the sequencer, respond to this request
0648: if (isSequencer()) {
0649: handleResendRequest(msg, hdr.seq_id);
0650: }
0651: break;
0652:
0653: default:
0654: // unrecognized header type - discard message
0655: log
0656: .error("Error: TOTAL_OLD.up() - unrecognized TotalHeader in message - "
0657: + hdr.toString());
0658: return; // don't let it call passUp()
0659: } // switch( hdr.total_header_type )
0660: } else {
0661: log
0662: .error("Error: TOTAL_OLD.up() - could not cast message header to TotalHeader (case Event.MSG)");
0663: } // if ( temp_obj instanceof TotalHeader )
0664: } else {
0665: log
0666: .error("Error: TOTAL_OLD.up() - could not cast argument of Event to a Message (case Event.MSG)");
0667: } // if ( temp_obj instanceof Address )
0668:
0669: //System.out.println("The message is " + msg);
0670: return; // don't blindly pass up messages immediately (if at all)
0671:
0672: // begin mms21
0673: /*
0674: case Event.BECOME_SERVER:
0675: System.out.println( "Become Server event passed up to TOTAL_OLD (debug - mms21)" );
0676: break;
0677: */
0678:
0679: case Event.TMP_VIEW: // TODO: this may be temporary
0680: case Event.VIEW_CHANGE:
0681: System.out
0682: .println("View Change event passed up to TOTAL_OLD (debug - mms21)");
0683: View new_view = (View) evt.getArg();
0684: members = new_view.getMembers();
0685: // print the members of this new view
0686: System.out
0687: .println("New view members (printed in TOTAL_OLD):");
0688: int view_size = members.size();
0689: for (int i = 0; i < view_size; i++) {
0690: System.out.println(" "
0691: + members.elementAt(i).toString());
0692: }
0693:
0694: // reset the state for total ordering for this new view
0695: reset();
0696:
0697: // if we are the sequencer in this new view, send a new
0698: // TOTAL_NEW_VIEW message to the group
0699: if (isSequencer()) {
0700: // we are the sequencer in this new view
0701: log
0702: .error("TOTAL_OLD.up() - I am the sequencer of this new view");
0703:
0704: // we need to keep track of acknowledgements messages
0705: ack_history = new MessageAcks(members);
0706:
0707: // start assigning messages with this sequence id
0708: next_seq_id_to_assign = INIT_SEQ_ID;
0709:
0710: // send a message to the group with the initial sequence id to expect
0711: Message new_view_msg = new Message(null, local_addr,
0712: null);
0713: new_view_msg.putHeader(getName(), new TotalHeader(
0714: TotalHeader.TOTAL_NEW_VIEW,
0715: next_seq_id_to_assign));
0716: passDown(new Event(Event.MSG, new_view_msg));
0717: }
0718:
0719: break;
0720: // end mms21
0721:
0722: default:
0723: break;
0724: } // switch( evt.getType() )
0725:
0726: passUp(evt); // Pass up to the layer above us
0727: }
0728:
0729: /**
0730: * passes up (calling passUp()) any stored messages eligible according to
0731: * the total ordering property
0732: */
0733: private synchronized int passUpMessages() {
0734: if (next_seq_id < 0) {
0735: // don't know what to pass up so don't pass up anything
0736: return 0;
0737: }
0738:
0739: long lowest_seq_stored = queued_messages.getFirstSeq();
0740: if (lowest_seq_stored < 0) {
0741: // there are no messages stored
0742: return 0;
0743: }
0744: if (lowest_seq_stored < next_seq_id) {
0745: // it is bad to have messages stored that have a lower sequence id than what
0746: // we are expecting
0747: log
0748: .error("Error: TOTAL_OLD.passUpMessages() - next expected sequence id ("
0749: + next_seq_id
0750: + ") is greater than the sequence id of a stored message ("
0751: + lowest_seq_stored + ')');
0752: return 0;
0753: } else if (next_seq_id == lowest_seq_stored) {
0754: // we can pass this first message up the Protocol Stack
0755: Message msg = queued_messages.getFirstMessage();
0756: if (msg == null) {
0757: log
0758: .error("Error: TOTAL_OLD.passUpMessages() - unexpected null Message retrieved from stored messages");
0759: return 0;
0760: }
0761: passUp(new Event(Event.MSG, msg));
0762:
0763: // increment the next expected sequence id
0764: next_seq_id++;
0765:
0766: return (1 + passUpMessages());
0767: } else {
0768: /* don't drop messages, it should be requesting resends
0769: // all messages stored have sequence ids greater than expected
0770: if ( queued_messages.getSize() > 10 ) {
0771: {
0772: log.error( "WARNING: TOTAL_OLD.passUpMessages() - more than 10 messages saved" );
0773: log.error( "Dropping sequence id: " + next_seq_id );
0774: }
0775: next_seq_id++;
0776: return passUpMessages();
0777: }
0778: */
0779: return 0;
0780: }
0781: }
0782:
0783: private final long last_request_time = -1;
0784:
0785: /**
0786: * stores the message in the list of messages. also passes up any messages
0787: * if it can (i.e. if it satisfies total ordering).
0788: * if the sequence for the next expected message is unknown, the message is
0789: * discarded without being stored
0790: */
0791: private synchronized void handleBCastMessage(Message msg, long seq) {
0792: /* store the message anyway, hopefully we'll get a TOTAL_NEW_VIEW message later
0793: if ( next_seq < 0 ) {
0794: // don't know what sequence id to expect
0795: log.error( "TOTAL_OLD.handleBCastMessage() - received broadcast message but don't know what sequence id to expect" );
0796: return;
0797: }
0798: */
0799:
0800: if (seq < next_seq_id) {
0801: // we're expecting a message with a greater sequence id
0802: // hopefully, we've already seen this message so just ignore it
0803: return;
0804: }
0805:
0806: // save this message in the list of received broadcast messages
0807: queued_messages.insertMessage(msg, seq);
0808:
0809: // try to pass up any messages
0810: int num_passed = passUpMessages();
0811: // TODO: this if is temporary (debug)
0812: if (num_passed > 1)
0813: log.error("TOTAL_OLD.handleBCastMessage() - " + num_passed
0814: + " message(s) passed up the Protocol Stack");
0815:
0816: /* this is handles by the retransmission thread now
0817: // see if we may need to issue any resend requests
0818: if ( queued_messages.getSize() > 1 ) { // TODO: magical constant N?
0819: Address sequencer = getSequencer();
0820: //Object sequencer = msg.makeReply().getSrc(); // test (debug)
0821: if ( sequencer == null ) {
0822: // couldn't get the sequencer of the group
0823: log.error( "TOTAL_OLD.handleBCastMessage() - couldn't determine sequencer to send a TOTAL_RESEND request" );
0824: return;
0825: }
0826:
0827: if ( local_addr == null ) {
0828: // don't know local address, can't set source of message
0829: log.error( "TOTAL_OLD.handleBCastMessage() - do not know local address so cannot send resend request for message " + seq );
0830: return;
0831: }
0832:
0833: long time_now = System.currentTimeMillis();
0834: if ( (last_request_time >= 0) && ((time_now - last_request_time) < 1000) ) {
0835: return;
0836: } else {
0837: last_request_time = time_now;
0838: }
0839: // request a resend request for all missing sequence ids
0840: // from the next one expected up to the "earliest" queued one
0841: // TODO: (works a little different now)
0842: long first_queued_seq = queued_messages.getFirstSeq();
0843: long max_resend_seq = ((next_seq_id + 10) > first_queued_seq) ? first_queued_seq : (next_seq_id + 10);
0844: for( long resend_seq=next_seq_id; resend_seq<=max_resend_seq ; resend_seq++ ) {
0845: Message resend_msg = new Message( sequencer, local_addr, null );
0846: resend_msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_RESEND, resend_seq ) );
0847: passDown( new Event( Event.MSG, resend_msg ) );
0848: log.error( "TOTAL_OLD.handleBCastMessage() - resend requested for message " + resend_seq );
0849: }
0850: }
0851: */
0852: }
0853:
0854: /**
0855: * respond to a request message by broadcasting a copy of the message to the group
0856: * with the next sequence id assigned to it
0857: * if we do not know what the next sequence id is to assign, discard the message
0858: */
0859: private synchronized void handleRequestMessage(Message msg) {
0860: if (next_seq_id_to_assign < 0) {
0861: // we cannot assign a valid sequence id
0862: log
0863: .error("Error: TOTAL_OLD.handleRequestMessage() - cannot handle request... do not know what sequence id to assign");
0864: return;
0865: }
0866:
0867: // make the message a broadcast message to the group
0868: msg.setDest(null);
0869:
0870: // set the source of the message to be me
0871: msg.setSrc(local_addr);
0872:
0873: // add the sequence id to the message
0874: msg.putHeader(getName(), new TotalHeader(
0875: TotalHeader.TOTAL_BCAST, next_seq_id_to_assign));
0876:
0877: // store a copy of this message is the history
0878: Message msg_copy = msg.copy();
0879: ack_history.addMessage(msg_copy, next_seq_id_to_assign);
0880:
0881: // begin debug
0882: Object header = msg_copy.getHeader(getName());
0883: if (!(header instanceof TotalHeader)) {
0884: log
0885: .error("Error: TOTAL_OLD.handleRequestMessage() - BAD: stored message that did not contain a TotalHeader - "
0886: + next_seq_id_to_assign);
0887: }
0888: // end debug
0889:
0890: // increment the next sequence id to use
0891: next_seq_id_to_assign++;
0892:
0893: // pass this new Message (wrapped in an Event) down the Protocol Stack
0894: passDown(new Event(Event.MSG, msg));
0895: }
0896:
0897: /**
0898: * respond to a request to resend a message with the specified sequence id
0899: */
0900: private synchronized void handleResendRequest(Message msg, long seq) {
0901: log
0902: .error("TOTAL_OLD.handleRequestMessage() - received resend request for message "
0903: + seq);
0904:
0905: /* just rebroadcast for now because i can't get the source - this is bad (TODO: fix this)
0906: Object requester = msg.makeReply().getSrc(); // Address? of requester - test (debug)
0907: /*
0908: Object temp_obj = msg.getSrc();
0909: if ( temp_obj instanceof Address ) {
0910: Address requester = (Address) temp_obj;
0911: } else {
0912: log.error( "Error: TOTAL_OLD.handleResendRequest() - could not cast source of message to an Address" );
0913: return;
0914: }
0915: * /
0916: if ( requester == null ) {
0917: // don't know who to send this back to
0918: log.error( "TOTAL_OLD.handleResendRequest() - do not know who requested this resend request for sequence " + seq );
0919: return;
0920: }
0921: */
0922: Address requester = null;
0923: // log.error( "TOTAL_OLD: got here - 1" );
0924: Message resend_msg = ack_history.getMessage(seq);
0925: // log.error( "TOTAL_OLD: got here - 2" );
0926: if (resend_msg == null) {
0927: // couldn't find this message in the history
0928: log
0929: .error("TOTAL_OLD.handleResendRequest() - could not find the message "
0930: + seq + " in the history to resend");
0931: return;
0932: }
0933: resend_msg.setDest(requester);
0934:
0935: // note: do not need to add a TotalHeader because it should already be a
0936: // TOTAL_BCAST message
0937: // begin debug
0938: Object header = resend_msg.getHeader(getName());
0939: if (header instanceof TotalHeader) {
0940: //log.error( "TOTAL_OLD: resend msg GOOD (header is TotalHeader) - " + seq );
0941: } else {
0942: log
0943: .error("TOTAL_OLD: resend msg BAD (header is NOT a TotalHeader) - "
0944: + seq);
0945: }
0946: // end debug
0947:
0948: passDown(new Event(Event.MSG, resend_msg));
0949: log
0950: .error("TOTAL_OLD.handleResendRequest() - responded to resend request for message "
0951: + seq);
0952: }
0953:
0954: /**
0955: * handles an Event coming down the Protocol Stack
0956: */
0957: public void down(Event evt) {
0958: Message msg;
0959:
0960: //System.out.println("DOWN: " + evt);
0961:
0962: switch (evt.getType()) {
0963:
0964: case Event.VIEW_CHANGE:
0965: // this will probably never happen
0966: log.error("NOTE: VIEW_CHANGE Event going down through "
0967: + PROTOCOL_NAME);
0968:
0969: Vector new_members = ((View) evt.getArg()).getMembers();
0970: synchronized (members) {
0971: members.removeAllElements();
0972: if (new_members != null && new_members.size() > 0)
0973: for (int i = 0; i < new_members.size(); i++)
0974: members.addElement(new_members.elementAt(i));
0975: }
0976: break;
0977:
0978: case Event.MSG:
0979: Object temp_obj = evt.getArg();
0980: if (temp_obj instanceof Message) {
0981: msg = (Message) temp_obj;
0982:
0983: // note: a TotalHeader is added to every message (Event.MSG)
0984: // that is sent
0985:
0986: // check if this is a broadcast message
0987: if (msg.getDest() == null) {
0988: // yes, this is a broadcast message
0989:
0990: // send out a request for a message to be broadcast
0991: // (the sequencer will handle this)
0992: Address sequencer = getSequencer();
0993: if (sequencer != null) {
0994: // we only need to send the request to the sequencer (who will broadcast it)
0995: msg.setDest(sequencer);
0996: } else {
0997: // couldn't find sequencer of the group
0998: // for now, just send it to the original destination
0999: // (don't need to do anything here)
1000: }
1001:
1002: //msg.putHeader(getName(), TotalHeader.getRequestHeader() );
1003: msg.putHeader(getName(), new TotalHeader(
1004: TotalHeader.TOTAL_REQUEST, -1));
1005:
1006: } else {
1007: // this is a point to point unicast message so just send it to its original destination
1008: msg.putHeader(getName(), new TotalHeader(
1009: TotalHeader.TOTAL_UNICAST, -1)); // sequence id in header is irrelevant
1010: }
1011: } else {
1012: log
1013: .error("Error: TOTAL_OLD.down() - could not cast argument of Event to a Message (case Event.MSG)");
1014: } // if ( temp_obj instanceof Message )
1015: break;
1016:
1017: default:
1018: break;
1019: } // switch( evt.getType() )
1020:
1021: passDown(evt); // Pass on to the layer below us
1022:
1023: }
1024:
1025: /**
1026: * returns true if we are currently the sequencer of the group;
1027: * returns false otherwise
1028: * note: returns false if our local address is unknown, or the list of members is
1029: * empty
1030: */
1031: private boolean isSequencer() {
1032: if (local_addr == null) {
1033: // don't know my own local address
1034: log
1035: .error("TOTAL_OLD.isSequencer() - local address unknown!");
1036: return false;
1037: }
1038:
1039: synchronized (members) {
1040: if (members.size() == 0) {
1041: // there are no members listed for the group (not even myself)
1042: log.error("TOTAL_OLD.isSequencer() - no members!");
1043: return false;
1044: }
1045:
1046: Object temp_obj = members.elementAt(0);
1047: if (temp_obj instanceof Address) {
1048: Address seq_addr = (Address) temp_obj;
1049: return local_addr.equals(seq_addr);
1050: } else {
1051: log
1052: .error("Error: TOTAL_OLD.isSequencer() - could not cast element of \"members\" to an Address");
1053: return false;
1054: } // if ( temp_obj instanceof Address )
1055: }
1056: }
1057:
1058: /**
1059: * returns the Address of the local machine
1060: * returns null if it is not known yet
1061: */
1062: protected Address getLocalAddr() {
1063: return local_addr;
1064: }
1065:
1066: /**
1067: * returns the address of the current sequencer of the group
1068: * returns null if the list of members is empty
1069: */
1070: protected Address getSequencer() {
1071: synchronized (members) {
1072: if (members.size() == 0) {
1073: log.error("TOTAL_OLD.getSequencer() - no members");
1074: return null;
1075: } else {
1076: Object temp_obj = members.elementAt(0);
1077: if (temp_obj instanceof Address) {
1078: return (Address) temp_obj;
1079: } else {
1080: log
1081: .error("Error: TOTAL_OLD.getSequencer() - could not cast first element of \"members\" to an Address");
1082: return null;
1083: }
1084: }
1085: }
1086: }
1087:
1088: /**
1089: * class TotalHeader
1090: * <p/>
1091: * The header that is prepended to every message passed down through the TOTAL_OLD layer
1092: * and removed (and processed) from every message passed up through the TOTAL_OLD layer
1093: */
1094: public static class TotalHeader extends Header {
1095: // Total message types
1096: public final static int TOTAL_UNICAST = 0; // a point to point unicast message that should not be processed by TOTAL_OLD
1097: public final static int TOTAL_BCAST = 1; // message broadcast by the sequencer
1098: public final static int TOTAL_REQUEST = 2; // request for a message to be broadcast
1099: public final static int TOTAL_NEW_VIEW = 3; // reset with a view change, sequence number also reset
1100: public final static int TOTAL_NEW_VIEW_ACK = 4; // acknowledgement of new view and sequence id
1101: public final static int TOTAL_CUM_SEQ_ACK = 5; // cumulatively acknowledge the reception of messages up to a sequence id
1102: public final static int TOTAL_SEQ_ACK = 6; // acknowledge the reception of a message with a certain sequence id (probably won't be used)
1103: public final static int TOTAL_RESEND = 7; // request the message with a certain sequence id
1104:
1105: public int total_header_type;
1106:
1107: final Log log = LogFactory.getLog(TotalHeader.class);
1108:
1109: // TODO: finish commenting meaning of seq_id for different header types
1110: /**
1111: * For TOTAL_BCAST messages, seq_id is used to determine the order of messages
1112: * in the view. The seq_id is expected to increment by one for each new message
1113: * sent in the current view. this sequence id is reset with each new view.
1114: * the GMS layer should make sure that messages sent in one view are not
1115: * received in another view.
1116: * For TOTAL_REQUEST messages, seq_id is not used.
1117: * For TOTAL_NEW_VIEW, seq_id is the sequence id that the sequencer of this
1118: * view will use for the first message broadcast to the group
1119: * (i.e. the expected sequence id is "reset" to this value).
1120: * For TOTAL_NEW_VIEW_ACK, ..
1121: * For TOTAL_CUM_SEQ_ACK messages, the seq_id is the cumulative sequence id
1122: * that the sender has received.
1123: * For TOTAL_SEQ_ACK messages, seq_id is the sequence id that is being acknowledged.
1124: * For TOTAL_RESEND, seq_id is the sequence id to be sent again.
1125: */
1126: public long seq_id; // see use above (varies between types of headers)
1127:
1128: public TotalHeader() {
1129: } // used for externalization
1130:
1131: public TotalHeader(int type, long seq) {
1132: switch (type) {
1133: case TOTAL_UNICAST:
1134: case TOTAL_BCAST:
1135: case TOTAL_REQUEST:
1136: case TOTAL_NEW_VIEW:
1137: case TOTAL_NEW_VIEW_ACK:
1138: case TOTAL_CUM_SEQ_ACK:
1139: case TOTAL_SEQ_ACK:
1140: case TOTAL_RESEND:
1141: // the given type is a known one
1142: total_header_type = type;
1143: break;
1144:
1145: default:
1146: // this type is unknown
1147: log
1148: .error("Error: TotalHeader.TotalHeader() - unknown TotalHeader type given: "
1149: + type);
1150: total_header_type = -1;
1151: break;
1152: }
1153:
1154: seq_id = seq;
1155: }
1156:
1157: //static TotalHeader getRequestHeader() {
1158: //return new TotalHeader( TOTAL_REQUEST, -1 ); // sequence id is irrelevant
1159: //}
1160:
1161: public String toString() {
1162: String type = "";
1163: switch (total_header_type) {
1164: case TOTAL_UNICAST:
1165: type = "TOTAL_UNICAST";
1166: break;
1167:
1168: case TOTAL_BCAST:
1169: type = "TOTAL_BCAST";
1170: break;
1171:
1172: case TOTAL_REQUEST:
1173: type = "TOTAL_REQUEST";
1174: break;
1175:
1176: case TOTAL_NEW_VIEW:
1177: type = "NEW_VIEW";
1178: break;
1179:
1180: case TOTAL_NEW_VIEW_ACK:
1181: type = "NEW_VIEW_ACK";
1182: break;
1183:
1184: case TOTAL_CUM_SEQ_ACK:
1185: type = "TOTAL_CUM_SEQ_ACK";
1186: break;
1187:
1188: case TOTAL_SEQ_ACK:
1189: type = "TOTAL_SEQ_ACK";
1190: break;
1191:
1192: case TOTAL_RESEND:
1193: type = "TOTAL_RESEND";
1194: break;
1195:
1196: default:
1197: type = "UNKNOWN TYPE (" + total_header_type + ')';
1198: break;
1199: }
1200:
1201: return "[ TOTAL_OLD: type=" + type + ", seq=" + seq_id
1202: + " ]";
1203: }
1204:
1205: public void writeExternal(ObjectOutput out) throws IOException {
1206: out.writeInt(total_header_type);
1207: out.writeLong(seq_id);
1208: }
1209:
1210: public void readExternal(ObjectInput in) throws IOException,
1211: ClassNotFoundException {
1212: total_header_type = in.readInt();
1213: seq_id = in.readLong();
1214: }
1215:
1216: } // class TotalHeader
1217:
1218: } // class TOTAL_OLD
1219:
1220: /**
1221: * **************************************************************************
1222: * class TotalRetransmissionThread
1223: * <p/>
1224: * thread that handles retransmission for the TOTAL_OLD protocol
1225: * **************************************************************************
1226: */
1227: class TotalRetransmissionThread extends Thread {
1228: // state variables to determine when and what to request
1229: private long last_retrans_request_time; // last time (in milliseconds) that we sent a resend request
1230: private long last_requested_seq; // latest sequence id that we have requested
1231:
1232: // retransmission constants
1233: final private static long polling_delay = 1000; // how long (in milliseconds) to sleep before rechecking for resend
1234: final private static long resend_timeout = 2000; // amount of time (in milliseconds) to wait on a resend request before resending another request
1235: final private static int max_request = 10; // maximum number of resend request to send out in one iteration
1236:
1237: // reference to the parent TOTAL_OLD protocol instance
1238: private TOTAL_OLD prot_ptr;
1239:
1240: // flag to specify if the thread should continue running
1241: private boolean is_running;
1242:
1243: final Log log = LogFactory.getLog(TotalRetransmissionThread.class);
1244:
1245: /**
1246: * constructor
1247: * <p/>
1248: * creates and initializes a retransmission thread for the
1249: * specified instance of a TOTAL_OLD protocol
1250: */
1251: TotalRetransmissionThread(TOTAL_OLD parent_prot) {
1252: super (Util.getGlobalThreadGroup(), "retransmission thread");
1253: if (parent_prot != null) {
1254: prot_ptr = parent_prot;
1255: } else {
1256: // parent thread not specified
1257: log
1258: .fatal("given parent protocol reference is null\n (FATAL ERROR - TOTAL_OLD protocol will not function properly)");
1259:
1260: // prevent the run method from doing any work
1261: is_running = false;
1262: }
1263:
1264: // initialize the state variables
1265: reset();
1266:
1267: // let the thread make resend requests
1268: is_running = true;
1269: }
1270:
1271: /**
1272: * resets the state of the thread as if it was just started
1273: * the thread will assume that there were no resend requests make
1274: */
1275: public void reset() {
1276: // we have not made any resend requests for any messages
1277: last_retrans_request_time = -1;
1278: last_requested_seq = -1;
1279: }
1280:
1281: /**
1282: * send a resend request to the given sequencer (from the given local_addr)
1283: * for the given sequence id
1284: */
1285: private void sendResendRequest(Address sequencer,
1286: Address local_addr, long seq_id) {
1287: Message resend_msg = new Message(sequencer, local_addr, null);
1288: resend_msg.putHeader(getName(), new TOTAL_OLD.TotalHeader(
1289: TOTAL_OLD.TotalHeader.TOTAL_RESEND, seq_id));
1290: prot_ptr.passDown(new Event(Event.MSG, resend_msg));
1291:
1292: // debug
1293: log
1294: .error("TotalRetransmissionThread.resend() - resend requested for message "
1295: + seq_id);
1296: }
1297:
1298: /**
1299: * checks if a resend request should be made to the sequencer. if a request needs
1300: * to be made, it makes the appropriate requests with the parameters specified
1301: * by the constants in this class
1302: */
1303: private void checkForResend() {
1304: long first_seq_id = prot_ptr.getFirstQueuedSeqID(); // sequence id of first queued message
1305: /*
1306: // begin debug
1307: System.out.println( "DEBUG (TotalRetransmissionThread) - first_seq_id = " + first_seq_id );
1308: // end debug
1309: */
1310: if (first_seq_id >= 0) {
1311: // there is at least one message in the queue
1312:
1313: long next_seq_id = prot_ptr.getNextSeqID(); // next sequence id expected from the group
1314: if ((next_seq_id < first_seq_id)) { // TODO: handle case to resend TOTAL_NEW_VIEW message
1315: // there are messages that we received out of order
1316: //log.error( "DEBUG (TotalRetransmissionThread) - there are messages queued" ); // debug
1317:
1318: // see if it is time to send a request
1319: long time_now = System.currentTimeMillis();
1320: if ((next_seq_id > last_requested_seq)
1321: || (time_now > (last_retrans_request_time + resend_timeout))
1322: || (last_retrans_request_time < 0)) {
1323: // send a resend request to the sequencer
1324: //log.error( "DEBUG (TotalRetransmissionThread) - sending resend requests" ); // debug
1325: Address sequencer = prot_ptr.getSequencer();
1326: if (sequencer == null) {
1327: System.out
1328: .println("Error: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - could not determine sequencer to send a TOTAL_RESEND request");
1329:
1330: return;
1331: }
1332:
1333: Address local_addr = prot_ptr.getLocalAddr();
1334: if (local_addr == null) {
1335: System.out
1336: .println("Warning: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - local address not specified in TOTAL_RESEND request... attempting to send requests anyway");
1337: }
1338:
1339: long temp_long = (next_seq_id + max_request); // potential max seq id to request (exclusive)
1340: long last_resend_seq_id = (temp_long > first_seq_id) ? first_seq_id
1341: : temp_long;
1342: for (long resend_seq = next_seq_id; resend_seq < last_resend_seq_id; resend_seq++) {
1343: sendResendRequest(sequencer, local_addr,
1344: resend_seq);
1345: }
1346: // update state for this set of resend requests
1347: last_retrans_request_time = time_now;
1348: last_requested_seq = last_resend_seq_id;
1349: }
1350: } // if ( (next_seq_id < first_seq_id) )
1351: } // if ( first_seq_id >= 0 )
1352: // else there are no messages to request
1353: }
1354:
1355: /**
1356: * overloaded from Thread
1357: * method that executes when the thread is started
1358: */
1359: public void run() {
1360: while (is_running) {
1361: // resend any requests if necessary
1362: //log.error( "DEBUG (TotalRetransmissionThread) - heartbeat" ); // debug
1363: checkForResend();
1364:
1365: // wait before check again
1366: try {
1367: sleep(polling_delay);
1368: } catch (InterruptedException e) {
1369: } // do nothing if interrupted
1370: }
1371: }
1372:
1373: /**
1374: * stops the thread from making any further resend requests
1375: * note: the thread may not die immediately
1376: */
1377: public void stopResendRequests() {
1378: is_running = false;
1379: }
1380: } // class TotalRetransmissionThread
|