001: // $Id: AckReceiverWindow.java,v 1.20.10.3 2007/04/25 02:25:08 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.Message;
008:
009: import java.util.HashMap;
010: import java.util.TreeSet;
011:
012: /**
013: * Counterpart of AckSenderWindow. Simple FIFO buffer.
014: * Every message received is ACK'ed (even duplicates) and added to a hashmap
015: * keyed by seqno. The next seqno to be received is stored in <code>next_to_remove</code>. When a message with
016: * a seqno less than next_to_remove is received, it will be discarded. The <code>remove()</code> method removes
017: * and returns a message whose seqno is equal to next_to_remove, or null if not found.<br>
018: * Change May 28 2002 (bela): replaced TreeSet with HashMap. Keys do not need to be sorted, and adding a key to
019: * a sorted set incurs overhead.
020: *
021: * @author Bela Ban
022: */
023: public class AckReceiverWindow {
024: long next_to_remove = 0;
025: final HashMap msgs = new HashMap(); // keys: seqnos (Long), values: Messages
026: static final Log log = LogFactory.getLog(AckReceiverWindow.class);
027:
028: public AckReceiverWindow(long initial_seqno) {
029: this .next_to_remove = initial_seqno;
030: }
031:
032: /** Adds a new message. Message cannot be null */
033: public void add(long seqno, Message msg) {
034: if (msg == null)
035: throw new IllegalArgumentException("msg must be non-null");
036: synchronized (msgs) {
037: if (seqno < next_to_remove) {
038: if (log.isTraceEnabled())
039: log.trace("discarded msg with seqno=" + seqno
040: + " (next msg to receive is "
041: + next_to_remove + ')');
042: return;
043: }
044: Long seq = new Long(seqno);
045: if (!msgs.containsKey(seq)) { // todo: replace with atomic action once we have util.concurrent (JDK 5)
046: msgs.put(seq, msg);
047: } else {
048: if (log.isTraceEnabled())
049: log.trace("seqno " + seqno
050: + " already received - dropping it");
051: }
052: }
053: }
054:
055: /**
056: * Removes a message whose seqno is equal to <code>next_to_remove</code>, increments the latter.
057: * Returns message that was removed, or null, if no message can be removed. Messages are thus
058: * removed in order.
059: */
060: public Message remove() {
061: Message retval;
062:
063: synchronized (msgs) {
064: Long key = new Long(next_to_remove);
065: retval = (Message) msgs.remove(key);
066: if (retval != null) {
067: next_to_remove++;
068: if (log.isTraceEnabled())
069: log.trace("removed seqno=" + key);
070: }
071: }
072: return retval;
073: }
074:
075: public void reset() {
076: synchronized (msgs) {
077: msgs.clear();
078: }
079: }
080:
081: public int size() {
082: return msgs.size();
083: }
084:
085: public String toString() {
086: StringBuffer sb = new StringBuffer();
087: sb.append(msgs.size()).append(" msgs (").append("next=")
088: .append(next_to_remove).append(")");
089: TreeSet s = new TreeSet(msgs.keySet());
090: if (!s.isEmpty()) {
091: sb.append(" [").append(s.first()).append(" - ").append(
092: s.last()).append("]");
093: }
094: return sb.toString();
095: }
096:
097: public String printDetails() {
098: StringBuffer sb = new StringBuffer();
099: sb.append(msgs.size()).append(" msgs (").append("next=")
100: .append(next_to_remove).append(")").append(", msgs=")
101: .append(new TreeSet(msgs.keySet()));
102: return sb.toString();
103: }
104:
105: }
|