001: // $Id: NakReceiverWindow.java,v 1.28 2006/01/14 14:00:42 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
006: import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
007: import org.apache.commons.logging.Log;
008: import org.apache.commons.logging.LogFactory;
009: import org.jgroups.Address;
010: import org.jgroups.Message;
011: import org.jgroups.util.List;
012: import org.jgroups.util.TimeScheduler;
013:
014: import java.util.*;
015:
016: /**
017: * Keeps track of messages according to their sequence numbers. Allows
018: * messages to be added out of order, and with gaps between sequence numbers.
 * Method <code>remove()</code> removes and returns the message with the
 * lowest sequence number, i.e. <code>head</code> (this variable is then
 * incremented), or it returns null if no message is present, or if the
 * message with that sequence number has not been received yet.
023: * <p>
024: * When there is a gap upon adding a message, its seqno will be added to the
025: * Retransmitter, which (using a timer) requests retransmissions of missing
026: * messages and keeps on trying until the message has been received, or the
027: * member who sent the message is suspected.
028: * <p>
029: * Started out as a copy of SlidingWindow. Main diff: RetransmitCommand is
030: * different, and retransmission thread is only created upon detection of a
031: * gap.
032: * <p>
033: * Change Nov 24 2000 (bela): for PBCAST, which has its own retransmission
034: * (via gossip), the retransmitter thread can be turned off
035: * <p>
036: * Change April 25 2001 (igeorg):<br>
037: * i. Restructuring: placed all nested class definitions at the top, then
038: * class static/non-static variables, then class private/public methods.<br>
039: * ii. Class and all nested classes are thread safe. Readers/writer lock
040: * added on <tt>NakReceiverWindow</tt> for finer grained locking.<br>
041: * iii. Internal or externally provided retransmission scheduler thread.<br>
042: * iv. Exponential backoff in time for retransmissions.<br>
043: *
044: * @author Bela Ban May 27 1999, May 2004
045: * @author John Georgiadis May 8 2001
046: */
047: public class NakReceiverWindow {
048:
049: public interface Listener {
050: void missingMessageReceived(long seqno, Message msg);
051: }
052:
053: /** The big read/write lock */
054: private final ReadWriteLock lock = new WriterPreferenceReadWriteLock();
055: //private final ReadWriteLock lock=new NullReadWriteLock();
056:
057: /** keep track of *next* seqno to remove and highest received */
058: private long head = 0;
059: private long tail = 0;
060:
061: /** lowest seqno delivered so far */
062: private long lowest_seen = 0;
063:
064: /** highest deliverable (or delivered) seqno so far */
065: private long highest_seen = 0;
066:
067: /** TreeMap<Long,Message>. Maintains messages keyed by (sorted) sequence numbers */
068: private final TreeMap received_msgs = new TreeMap();
069:
070: /** TreeMap<Long,Message>. Delivered (= seen by all members) messages. A remove() method causes a message to be
071: moved from received_msgs to delivered_msgs. Message garbage collection will gradually remove elements in this map */
072: private final TreeMap delivered_msgs = new TreeMap();
073:
074: /**
075: * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
076: * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
077: * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
078: * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
079: * around, and don't need to wait for garbage collection to remove them.
080: */
081: private boolean discard_delivered_msgs = false;
082:
083: /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
084: * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
085: */
086: private int max_xmit_buf_size = 0;
087:
088: /** if not set, no retransmitter thread will be started. Useful if
089: * protocols do their own retransmission (e.g PBCAST) */
090: private Retransmitter retransmitter = null;
091:
092: private Listener listener = null;
093:
094: protected static final Log log = LogFactory
095: .getLog(NakReceiverWindow.class);
096:
/**
 * Creates a window that starts at <code>start_seqno</code> and, if a
 * retransmit command is supplied, a retransmitter for missing messages.
 *
 * @param sender The sender associated with this instance
 * @param cmd The command used to retransmit a missing message. If null,
 * no retransmitter is created
 * @param start_seqno The first sequence number to be received
 * @param sched the external scheduler to use for retransmission requests
 * of missing msgs. If null, the retransmitter is created without one
 */
public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long start_seqno,
        TimeScheduler sched) {
    head = start_seqno;
    tail = start_seqno;

    if (cmd == null) {
        return; // no command: this window never requests retransmissions
    }
    if (sched != null) {
        retransmitter = new Retransmitter(sender, cmd, sched);
    } else {
        retransmitter = new Retransmitter(sender, cmd);
    }
}
119:
/**
 * Creates a window with the given retransmit command and no external
 * scheduler (delegates to the four-argument constructor with a null
 * scheduler).
 *
 * @param sender The sender associated with this instance
 * @param cmd The command used to retransmit a missing message. If null,
 * no retransmitter is created
 * @param start_seqno The first sequence number to be received
 */
public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long start_seqno) {
    this(sender, cmd, start_seqno, null);
}
133:
/**
 * Creates a window that never requests retransmissions: neither a
 * retransmit command nor a scheduler is passed on.
 *
 * @param sender The sender associated with this instance
 * @param start_seqno The first sequence number to be received
 */
public NakReceiverWindow(Address sender, long start_seqno) {
    this(sender, null, start_seqno, null);
}
143:
144: public void setRetransmitTimeouts(long[] timeouts) {
145: if (retransmitter != null)
146: retransmitter.setRetransmitTimeouts(timeouts);
147: }
148:
149: public void setDiscardDeliveredMessages(boolean flag) {
150: this .discard_delivered_msgs = flag;
151: }
152:
153: public int getMaxXmitBufSize() {
154: return max_xmit_buf_size;
155: }
156:
157: public void setMaxXmitBufSize(int max_xmit_buf_size) {
158: this .max_xmit_buf_size = max_xmit_buf_size;
159: }
160:
161: public void setListener(Listener l) {
162: this .listener = l;
163: }
164:
165: /**
166: * Adds a message according to its sequence number (ordered).
167: * <p>
168: * Variables <code>head</code> and <code>tail</code> mark the start and
169: * end of the messages received, but not delivered yet. When a message is
170: * received, if its seqno is smaller than <code>head</code>, it is
171: * discarded (already received). If it is bigger than <code>tail</code>,
172: * we advance <code>tail</code> and add empty elements. If it is between
173: * <code>head</code> and <code>tail</code>, we set the corresponding
174: * missing (or already present) element. If it is equal to
175: * <code>tail</code>, we advance the latter by 1 and add the message
176: * (default case).
177: */
178: public void add(long seqno, Message msg) {
179: long old_tail;
180:
181: try {
182: lock.writeLock().acquire();
183: try {
184: old_tail = tail;
185: if (seqno < head) {
186: if (log.isTraceEnabled()) {
187: StringBuffer sb = new StringBuffer("seqno ");
188: sb.append(seqno).append(" is smaller than ")
189: .append(head).append(
190: "); discarding message");
191: log.trace(sb.toString());
192: }
193: return;
194: }
195:
196: // add at end (regular expected msg)
197: if (seqno == tail) {
198: received_msgs.put(new Long(seqno), msg);
199: tail++;
200: if (highest_seen + 2 == tail) {
201: highest_seen++;
202: } else {
203: updateHighestSeen();
204: }
205:
206: // highest_seen=seqno;
207: }
208: // gap detected
209: // i. add placeholders, creating gaps
210: // ii. add real msg
211: // iii. tell retransmitter to retrieve missing msgs
212: else if (seqno > tail) {
213: for (long i = tail; i < seqno; i++) {
214: received_msgs.put(new Long(i), null);
215: // XmitEntry xmit_entry=new XmitEntry();
216: //xmits.put(new Long(i), xmit_entry);
217: tail++;
218: }
219: received_msgs.put(new Long(seqno), msg);
220: tail = seqno + 1;
221: if (retransmitter != null) {
222: retransmitter.add(old_tail, seqno - 1);
223: }
224: } else if (seqno < tail) { // finally received missing message
225: if (log.isTraceEnabled()) {
226: log
227: .trace(new StringBuffer(
228: "added missing msg ").append(
229: msg.getSrc()).append('#')
230: .append(seqno));
231: }
232: if (listener != null) {
233: try {
234: listener.missingMessageReceived(seqno, msg);
235: } catch (Throwable t) {
236: }
237: }
238:
239: Object val = received_msgs.get(new Long(seqno));
240: if (val == null) {
241: // only set message if not yet received (bela July 23 2003)
242: received_msgs.put(new Long(seqno), msg);
243:
244: if (highest_seen + 1 == seqno || seqno == head)
245: updateHighestSeen();
246:
247: //XmitEntry xmit_entry=(XmitEntry)xmits.get(new Long(seqno));
248: //if(xmit_entry != null)
249: // xmit_entry.received=System.currentTimeMillis();
250: //long xmit_diff=xmit_entry == null? -1 : xmit_entry.received - xmit_entry.created;
251: //NAKACK.addXmitResponse(msg.getSrc(), seqno);
252: if (retransmitter != null)
253: retransmitter.remove(seqno);
254: }
255: }
256: updateLowestSeen();
257: } finally {
258: lock.writeLock().release();
259: }
260: } catch (InterruptedException e) {
261: log.error("failed acquiring write lock", e);
262: }
263: }
264:
265: /** Start from the current sequence number and set highest_seen until we find a gap (null value in the entry) */
266: void updateHighestSeen() {
267: SortedMap map = received_msgs.tailMap(new Long(highest_seen));
268: Map.Entry entry;
269: for (Iterator it = map.entrySet().iterator(); it.hasNext();) {
270: entry = (Map.Entry) it.next();
271: if (entry.getValue() != null)
272: highest_seen = ((Long) entry.getKey()).longValue();
273: else
274: break;
275: }
276: }
277:
278: public Message remove() {
279: Message retval = null;
280: Long key;
281: boolean bounded_buffer_enabled = max_xmit_buf_size > 0;
282:
283: try {
284: lock.writeLock().acquire();
285: try {
286: while (received_msgs.size() > 0) {
287: key = (Long) received_msgs.firstKey();
288: retval = (Message) received_msgs.get(key);
289: if (retval != null) { // message exists and is ready for delivery
290: received_msgs.remove(key); // move from received_msgs to ...
291: if (discard_delivered_msgs == false) {
292: delivered_msgs.put(key, retval); // delivered_msgs
293: }
294: head++; // is removed from retransmitter somewhere else (when missing message is received)
295: return retval;
296: } else { // message has not yet been received (gap in the message sequence stream)
297: if (bounded_buffer_enabled
298: && received_msgs.size() > max_xmit_buf_size) {
299: received_msgs.remove(key); // move from received_msgs to ...
300: head++;
301: retransmitter.remove(key.longValue());
302: } else {
303: break;
304: }
305: }
306: }
307: return retval;
308: } finally {
309: lock.writeLock().release();
310: }
311: } catch (InterruptedException e) {
312: log.error("failed acquiring write lock", e);
313: return null;
314: }
315: }
316:
317: /**
318: * Delete all messages <= seqno (they are stable, that is, have been
319: * received at all members). Stop when a number > seqno is encountered
320: * (all messages are ordered on seqnos).
321: */
322: public void stable(long seqno) {
323: try {
324: lock.writeLock().acquire();
325: try {
326: // we need to remove all seqnos *including* seqno: because headMap() *excludes* seqno, we
327: // simply increment it, so we have to correct behavior
328: SortedMap m = delivered_msgs
329: .headMap(new Long(seqno + 1));
330: if (m.size() > 0)
331: lowest_seen = Math.max(lowest_seen, ((Long) m
332: .lastKey()).longValue());
333: m.clear(); // removes entries from delivered_msgs
334: } finally {
335: lock.writeLock().release();
336: }
337: } catch (InterruptedException e) {
338: log.error("failed acquiring write lock", e);
339: }
340: }
341:
342: /**
343: * Reset the retransmitter and the nak window<br>
344: */
345: public void reset() {
346: try {
347: lock.writeLock().acquire();
348: try {
349: if (retransmitter != null)
350: retransmitter.reset();
351: _reset();
352: } finally {
353: lock.writeLock().release();
354: }
355: } catch (InterruptedException e) {
356: log.error("failed acquiring write lock", e);
357: }
358: }
359:
360: /**
361: * Stop the retransmitter and reset the nak window<br>
362: */
363: public void destroy() {
364: try {
365: lock.writeLock().acquire();
366: try {
367: if (retransmitter != null)
368: retransmitter.stop();
369: _reset();
370: } finally {
371: lock.writeLock().release();
372: }
373: } catch (InterruptedException e) {
374: log.error("failed acquiring write lock", e);
375: }
376: }
377:
378: /**
379: * @return the highest sequence number of a message consumed by the
380: * application (by <code>remove()</code>)
381: */
382: public long getHighestDelivered() {
383: try {
384: lock.readLock().acquire();
385: try {
386: return (Math.max(head - 1, -1));
387: } finally {
388: lock.readLock().release();
389: }
390: } catch (InterruptedException e) {
391: log.error("failed acquiring read lock", e);
392: return -1;
393: }
394: }
395:
396: /**
397: * @return the lowest sequence number of a message that has been
398: * delivered or is a candidate for delivery (by the next call to
399: * <code>remove()</code>)
400: */
401: public long getLowestSeen() {
402: try {
403: lock.readLock().acquire();
404: try {
405: return (lowest_seen);
406: } finally {
407: lock.readLock().release();
408: }
409: } catch (InterruptedException e) {
410: log.error("failed acquiring read lock", e);
411: return -1;
412: }
413: }
414:
415: /**
416: * Returns the highest deliverable seqno; e.g., for 1,2,3,5,6 it would
417: * be 3.
418: *
419: * @see NakReceiverWindow#getHighestReceived
420: */
421: public long getHighestSeen() {
422: try {
423: lock.readLock().acquire();
424: try {
425: return (highest_seen);
426: } finally {
427: lock.readLock().release();
428: }
429: } catch (InterruptedException e) {
430: log.error("failed acquiring read lock", e);
431: return -1;
432: }
433: }
434:
435: /**
436: * Find all messages between 'low' and 'high' (including 'low' and
437: * 'high') that have a null msg.
438: * Return them as a list of longs
439: *
440: * @return List<Long>. A list of seqnos, sorted in ascending order.
441: * E.g. [1, 4, 7, 8]
442: */
443: public List getMissingMessages(long low, long high) {
444: List retval = new List();
445: // long my_high;
446:
447: if (low > high) {
448: if (log.isErrorEnabled())
449: log.error("invalid range: low (" + low
450: + ") is higher than high (" + high + ')');
451: return null;
452: }
453:
454: try {
455: lock.readLock().acquire();
456: try {
457:
458: // my_high=Math.max(head - 1, 0);
459: // check only received messages, because delivered messages *must* have a non-null msg
460: SortedMap m = received_msgs.subMap(new Long(low),
461: new Long(high + 1));
462: for (Iterator it = m.keySet().iterator(); it.hasNext();) {
463: retval.add(it.next());
464: }
465:
466: // if(received_msgs.size() > 0) {
467: // entry=(Entry)received_msgs.peek();
468: // if(entry != null) my_high=entry.seqno;
469: // }
470: // for(long i=my_high + 1; i <= high; i++)
471: // retval.add(new Long(i));
472:
473: return retval;
474: } finally {
475: lock.readLock().release();
476: }
477: } catch (InterruptedException e) {
478: log.error("failed acquiring read lock", e);
479: return null;
480: }
481: }
482:
483: /**
484: * Returns the highest sequence number received so far (which may be
485: * higher than the highest seqno <em>delivered</em> so far; e.g., for
486: * 1,2,3,5,6 it would be 6.
487: *
488: * @see NakReceiverWindow#getHighestSeen
489: */
490: public long getHighestReceived() {
491: try {
492: lock.readLock().acquire();
493: try {
494: return Math.max(tail - 1, -1);
495: } finally {
496: lock.readLock().release();
497: }
498: } catch (InterruptedException e) {
499: log.error("failed acquiring read lock", e);
500: return -1;
501: }
502: }
503:
504: /**
505: * Return messages that are higher than <code>seqno</code> (excluding
506: * <code>seqno</code>). Check both received <em>and</em> delivered
507: * messages.
508: * @return List<Message>. All messages that have a seqno greater than <code>seqno</code>
509: */
510: public List getMessagesHigherThan(long seqno) {
511: List retval = new List();
512:
513: try {
514: lock.readLock().acquire();
515: try {
516: // check received messages
517: SortedMap m = received_msgs
518: .tailMap(new Long(seqno + 1));
519: for (Iterator it = m.values().iterator(); it.hasNext();) {
520: retval.add((it.next()));
521: }
522:
523: // we retrieve all msgs whose seqno is strictly greater than seqno (tailMap() *includes* seqno,
524: // but we need to exclude seqno, that's why we increment it
525: m = delivered_msgs.tailMap(new Long(seqno + 1));
526: for (Iterator it = m.values().iterator(); it.hasNext();) {
527: retval.add(((Message) it.next()).copy());
528: }
529: return (retval);
530:
531: } finally {
532: lock.readLock().release();
533: }
534: } catch (InterruptedException e) {
535: log.error("failed acquiring read lock", e);
536: return null;
537: }
538: }
539:
540: /**
541: * Return all messages m for which the following holds:
542: * m > lower && m <= upper (excluding lower, including upper). Check both
543: * <code>received_msgs</code> and <code>delivered_msgs</code>.
544: */
545: public List getMessagesInRange(long lower, long upper) {
546: List retval = new List();
547:
548: try {
549: lock.readLock().acquire();
550: try {
551: // check received messages
552: SortedMap m = received_msgs.subMap(new Long(lower + 1),
553: new Long(upper + 1));
554: for (Iterator it = m.values().iterator(); it.hasNext();) {
555: retval.add(it.next());
556: }
557:
558: m = delivered_msgs.subMap(new Long(lower + 1),
559: new Long(upper + 1));
560: for (Iterator it = m.values().iterator(); it.hasNext();) {
561: retval.add(((Message) it.next()).copy());
562: }
563: return retval;
564:
565: } finally {
566: lock.readLock().release();
567: }
568: } catch (InterruptedException e) {
569: log.error("failed acquiring read lock", e);
570: return null;
571: }
572: }
573:
574: /**
575: * Return a list of all messages for which there is a seqno in
576: * <code>missing_msgs</code>. The seqnos of the argument list are
577: * supposed to be in ascending order
578: * @param missing_msgs A List<Long> of seqnos
579: * @return List<Message>
580: */
581: public List getMessagesInList(List missing_msgs) {
582: List ret = new List();
583:
584: if (missing_msgs == null) {
585: if (log.isErrorEnabled())
586: log.error("argument list is null");
587: return ret;
588: }
589:
590: try {
591: lock.readLock().acquire();
592: try {
593: Long seqno;
594: Message msg;
595: for (Enumeration en = missing_msgs.elements(); en
596: .hasMoreElements();) {
597: seqno = (Long) en.nextElement();
598: msg = (Message) delivered_msgs.get(seqno);
599: if (msg != null)
600: ret.add(msg.copy());
601: msg = (Message) received_msgs.get(seqno);
602: if (msg != null)
603: ret.add(msg.copy());
604: }
605: return ret;
606: } finally {
607: lock.readLock().release();
608: }
609: } catch (InterruptedException e) {
610: log.error("failed acquiring read lock", e);
611: return null;
612: }
613: }
614:
615: /**
616: * Returns the message from received_msgs or delivered_msgs.
617: * @param sequence_num
618: * @return Message from received_msgs or delivered_msgs.
619: */
620: public Message get(long sequence_num) {
621: Message msg;
622: Long seqno = new Long(sequence_num);
623: try {
624: lock.readLock().acquire();
625: try {
626: msg = (Message) delivered_msgs.get(seqno);
627: if (msg != null)
628: return msg;
629: msg = (Message) received_msgs.get(seqno);
630: if (msg != null)
631: return msg;
632: } finally {
633: lock.readLock().release();
634: }
635: } catch (InterruptedException e) {
636: log.error("failed acquiring read lock", e);
637: }
638: return null;
639: }
640:
641: public int size() {
642: boolean acquired = false;
643: try {
644: lock.readLock().acquire();
645: acquired = true;
646: } catch (InterruptedException e) {
647: }
648: try {
649: return received_msgs.size();
650: } finally {
651: if (acquired)
652: lock.readLock().release();
653: }
654: }
655:
656: public String toString() {
657: StringBuffer sb = new StringBuffer();
658: try {
659: lock.readLock().acquire();
660: try {
661: sb.append("received_msgs: ").append(
662: printReceivedMessages());
663: sb.append(", delivered_msgs: ").append(
664: printDeliveredMessages());
665: } finally {
666: lock.readLock().release();
667: }
668: } catch (InterruptedException e) {
669: log.error("failed acquiring read lock", e);
670: return "";
671: }
672:
673: return sb.toString();
674: }
675:
676: /**
677: * Prints delivered_msgs. Requires read lock present.
678: * @return String
679: */
680: String printDeliveredMessages() {
681: StringBuffer sb = new StringBuffer();
682: Long min = null, max = null;
683:
684: if (delivered_msgs.size() > 0) {
685: try {
686: min = (Long) delivered_msgs.firstKey();
687: } catch (NoSuchElementException ex) {
688: }
689: try {
690: max = (Long) delivered_msgs.lastKey();
691: } catch (NoSuchElementException ex) {
692: }
693: }
694: sb.append('[').append(min).append(" - ").append(max)
695: .append(']');
696: if (min != null && max != null)
697: sb.append(" (size=").append(
698: max.longValue() - min.longValue()).append(")");
699: return sb.toString();
700: }
701:
702: /**
703: * Prints received_msgs. Requires read lock to be present
704: * @return String
705: */
706: String printReceivedMessages() {
707: StringBuffer sb = new StringBuffer();
708: sb.append('[');
709: if (received_msgs.size() > 0) {
710: Long first = null, last = null;
711: try {
712: first = (Long) received_msgs.firstKey();
713: } catch (NoSuchElementException ex) {
714: }
715: try {
716: last = (Long) received_msgs.lastKey();
717: } catch (NoSuchElementException ex) {
718: }
719: sb.append(first).append(" - ").append(last);
720: int non_received = 0;
721: Map.Entry entry;
722:
723: for (Iterator it = received_msgs.entrySet().iterator(); it
724: .hasNext();) {
725: entry = (Map.Entry) it.next();
726: if (entry.getValue() == null)
727: non_received++;
728: }
729: sb.append(" (size=").append(received_msgs.size()).append(
730: ", missing=").append(non_received).append(')');
731: }
732: sb.append(']');
733: return sb.toString();
734: }
735:
736: /* ------------------------------- Private Methods -------------------------------------- */
737:
738: /**
739: * Sets the value of lowest_seen to the lowest seqno of the delivered messages (if available), otherwise
740: * to the lowest seqno of received messages.
741: */
742: private void updateLowestSeen() {
743: Long lowest_seqno = null;
744:
745: // If both delivered and received messages are empty, let the highest
746: // seen seqno be the one *before* the one which is expected to be
747: // received next by the NakReceiverWindow (head-1)
748:
749: // incorrect: if received and delivered msgs are empty, don't do anything: we may have initial values,
750: // but both lists are cleaned after some time of inactivity
751: // (bela April 19 2004)
752: /*
753: if((delivered_msgs.size() == 0) && (msgs.size() == 0)) {
754: lowest_seen=0;
755: return;
756: }
757: */
758:
759: // The lowest seqno is the first seqno of the delivered messages
760: if (delivered_msgs.size() > 0) {
761: try {
762: lowest_seqno = (Long) delivered_msgs.firstKey();
763: if (lowest_seqno != null)
764: lowest_seen = lowest_seqno.longValue();
765: } catch (NoSuchElementException ex) {
766: }
767: }
768: // If no elements in delivered messages (e.g. due to message garbage collection), use the received messages
769: else {
770: if (received_msgs.size() > 0) {
771: try {
772: lowest_seqno = (Long) received_msgs.firstKey();
773: if (received_msgs.get(lowest_seqno) != null) { // only set lowest_seen if we *have* a msg
774: lowest_seen = lowest_seqno.longValue();
775: }
776: } catch (NoSuchElementException ex) {
777: }
778: }
779: }
780: }
781:
782: /**
783: * Find the highest seqno that is deliverable or was actually delivered.
784: * Returns seqno-1 if there are no messages in the queues (the first
785: * message to be expected is always seqno).
786: */
787: // private void updateHighestSeen() {
788: // long ret=0;
789: // Map.Entry entry=null;
790: //
791: // // If both delivered and received messages are empty, let the highest
792: // // seen seqno be the one *before* the one which is expected to be
793: // // received next by the NakReceiverWindow (head-1)
794: //
795: // // changed by bela (April 19 2004): we don't change the value if received and delivered msgs are empty
796: // /*if((delivered_msgs.size() == 0) && (msgs.size() == 0)) {
797: // highest_seen=0;
798: // return;
799: // }*/
800: //
801: //
802: // // The highest seqno is the last of the delivered messages, to start with,
803: // // or again the one before the first seqno expected (if no delivered
804: // // msgs). Then iterate through the received messages, and find the highest seqno *before* a gap
805: // Long highest_seqno=null;
806: // if(delivered_msgs.size() > 0) {
807: // try {
808: // highest_seqno=(Long)delivered_msgs.lastKey();
809: // ret=highest_seqno.longValue();
810: // }
811: // catch(NoSuchElementException ex) {
812: // }
813: // }
814: // else {
815: // ret=Math.max(head - 1, 0);
816: // }
817: //
818: // // Now check the received msgs head to tail. if there is an entry
819: // // with a non-null msg, increment ret until we find an entry with
820: // // a null msg
821: // for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
822: // entry=(Map.Entry)it.next();
823: // if(entry.getValue() != null)
824: // ret=((Long)entry.getKey()).longValue();
825: // else
826: // break;
827: // }
828: // highest_seen=Math.max(ret, 0);
829: // }
830:
831: /**
832: * Reset the Nak window. Should be called from within a writeLock() context.
833: * <p>
834: * i. Delete all received entries<br>
835: * ii. Delete alll delivered entries<br>
836: * iii. Reset all indices (head, tail, etc.)<br>
837: */
838: private void _reset() {
839: received_msgs.clear();
840: delivered_msgs.clear();
841: head = 0;
842: tail = 0;
843: lowest_seen = 0;
844: highest_seen = 0;
845: }
846: /* --------------------------- End of Private Methods ----------------------------------- */
847:
848: }
|