001: // $Id: Retransmitter.java,v 1.10.10.3 2007/04/27 06:26:38 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.Address;
008: import org.jgroups.util.TimeScheduler;
009: import org.jgroups.util.Util;
010:
011: import java.util.*;
012:
013: /**
014: * Maintains a pool of sequence numbers of messages that need to be retransmitted. Messages
015: * are aged and retransmission requests sent according to age (linear backoff used). If a
016: * TimeScheduler instance is given to the constructor, it will be used, otherwise Reransmitter
017: * will create its own. The retransmit timeouts have to be set first thing after creating an instance.
018: * The <code>add()</code> method adds a range of sequence numbers of messages to be retransmitted. The
019: * <code>remove()</code> method removes a sequence number again, cancelling retransmission requests for it.
020: * Whenever a message needs to be retransmitted, the <code>RetransmitCommand.retransmit()</code> method is called.
021: * It can be used e.g. by an ack-based scheme (e.g. AckSenderWindow) to retransmit a message to the receiver, or
022: * by a nak-based scheme to send a retransmission request to the sender of the missing message.
023: *
024: * @author John Giorgiadis
025: * @author Bela Ban
026: * @version $Revision: 1.10.10.3 $
027: */
028: public class Retransmitter {
029:
030: private static final long SEC = 1000;
031: /** Default retransmit intervals (ms) - exponential approx. */
032: private static long[] RETRANSMIT_TIMEOUTS = { 2 * SEC, 3 * SEC,
033: 5 * SEC, 8 * SEC };
034: /** Default retransmit thread suspend timeout (ms) */
035: private static final long SUSPEND_TIMEOUT = 2000;
036:
037: private Address sender = null;
038: private final LinkedList msgs = new LinkedList(); // List<Entry> of elements to be retransmitted
039: private RetransmitCommand cmd = null;
040: private boolean timer_owned;
041: private TimeScheduler timer = null;
042: protected static final Log log = LogFactory
043: .getLog(Retransmitter.class);
044:
045: /** Retransmit command (see Gamma et al.) used to retrieve missing messages */
046: public interface RetransmitCommand {
047: /**
048: * Get the missing messages between sequence numbers
049: * <code>first_seqno</code> and <code>last_seqno</code>. This can either be done by sending a
050: * retransmit message to destination <code>sender</code> (nak-based scheme), or by
051: * retransmitting the missing message(s) to <code>sender</code> (ack-based scheme).
052: * @param first_seqno The sequence number of the first missing message
053: * @param last_seqno The sequence number of the last missing message
054: * @param sender The destination of the member to which the retransmit request will be sent
055: * (nak-based scheme), or to which the message will be retransmitted (ack-based scheme).
056: */
057: void retransmit(long first_seqno, long last_seqno,
058: Address sender);
059: }
060:
061: /**
062: * Create a new Retransmitter associated with the given sender address
063: * @param sender the address from which retransmissions are expected or to which retransmissions are sent
064: * @param cmd the retransmission callback reference
065: * @param sched retransmissions scheduler
066: */
067: public Retransmitter(Address sender, RetransmitCommand cmd,
068: TimeScheduler sched) {
069: init(sender, cmd, sched, false);
070: }
071:
072: /**
073: * Create a new Retransmitter associated with the given sender address
074: * @param sender the address from which retransmissions are expected or to which retransmissions are sent
075: * @param cmd the retransmission callback reference
076: */
077: public Retransmitter(Address sender, RetransmitCommand cmd) {
078: init(sender, cmd, new TimeScheduler(), true);
079: }
080:
081: public void setRetransmitTimeouts(long[] timeouts) {
082: if (timeouts != null)
083: RETRANSMIT_TIMEOUTS = timeouts;
084: }
085:
086: /**
087: * Add the given range [first_seqno, last_seqno] in the list of
088: * entries eligible for retransmission. If first_seqno > last_seqno,
089: * then the range [last_seqno, first_seqno] is added instead
090: * <p>
091: * If retransmitter thread is suspended, wake it up
092: */
093: public void add(long first_seqno, long last_seqno) {
094: Entry e;
095:
096: if (first_seqno > last_seqno) {
097: long tmp = first_seqno;
098: first_seqno = last_seqno;
099: last_seqno = tmp;
100: }
101: synchronized (msgs) {
102: e = new Entry(first_seqno, last_seqno, RETRANSMIT_TIMEOUTS);
103: msgs.add(e);
104: timer.add(e);
105: }
106: }
107:
108: /**
109: * Remove the given sequence number from the list of seqnos eligible
110: * for retransmission. If there are no more seqno intervals in the
111: * respective entry, cancel the entry from the retransmission
112: * scheduler and remove it from the pending entries
113: */
114: public void remove(long seqno) {
115: Entry e;
116:
117: synchronized (msgs) {
118: for (ListIterator it = msgs.listIterator(); it.hasNext();) {
119: e = (Entry) it.next();
120: if (seqno < e.low || seqno > e.high)
121: continue;
122: e.remove(seqno);
123: if (e.low > e.high) {
124: e.cancel();
125: it.remove();
126: }
127: break;
128: }
129: }
130: }
131:
132: /**
133: * Reset the retransmitter: clear all msgs and cancel all the
134: * respective tasks
135: */
136: public void reset() {
137: Entry entry;
138:
139: synchronized (msgs) {
140: for (ListIterator it = msgs.listIterator(); it.hasNext();) {
141: entry = (Entry) it.next();
142: entry.cancel();
143: }
144: msgs.clear();
145: }
146: }
147:
148: /**
149: * Stop the rentransmition and clear all pending msgs.
150: * <p>
151: * If this retransmitter has been provided an externally managed
152: * scheduler, then just clear all msgs and the associated tasks, else
153: * stop the scheduler. In this case the method blocks until the
154: * scheduler's thread is dead. Only the owner of the scheduler should
155: * stop it.
156: */
157: public void stop() {
158: Entry entry;
159:
160: // i. If retransmitter is owned, stop it else cancel all tasks
161: // ii. Clear all pending msgs
162: synchronized (msgs) {
163: if (timer_owned) {
164: try {
165: timer.stop();
166: } catch (InterruptedException ex) {
167: if (log.isErrorEnabled())
168: log.error("failed stopping retransmitter", ex);
169: }
170: } else {
171: for (ListIterator it = msgs.listIterator(); it
172: .hasNext();) {
173: entry = (Entry) it.next();
174: entry.cancel();
175: }
176: }
177: msgs.clear();
178: }
179: }
180:
181: public String toString() {
182: synchronized (msgs) {
183: int size = size();
184: StringBuffer sb = new StringBuffer();
185: sb.append(size).append(" messages to retransmit: ").append(
186: msgs);
187: return sb.toString();
188: }
189: }
190:
191: public int size() {
192: int size = 0;
193: Entry entry;
194: synchronized (msgs) {
195: for (Iterator it = msgs.iterator(); it.hasNext();) {
196: entry = (Retransmitter.Entry) it.next();
197: size += entry.size();
198: }
199: }
200: return size;
201: }
202:
203: /* ------------------------------- Private Methods -------------------------------------- */
204:
205: /**
206: * Init this object
207: *
208: * @param sender the address from which retransmissions are expected
209: * @param cmd the retransmission callback reference
210: * @param sched retransmissions scheduler
211: * @param sched_owned whether the scheduler parameter is owned by this
212: * object or is externally provided
213: */
214: private void init(Address sender, RetransmitCommand cmd,
215: TimeScheduler sched, boolean sched_owned) {
216: this .sender = sender;
217: this .cmd = cmd;
218: timer_owned = sched_owned;
219: timer = sched;
220: }
221:
222: /* ---------------------------- End of Private Methods ------------------------------------ */
223:
224: /**
225: * The retransmit task executed by the scheduler in regular intervals
226: */
227: private static abstract class Task implements TimeScheduler.Task {
228: private final Interval intervals;
229: private boolean cancelled;
230:
231: protected Task(long[] intervals) {
232: this .intervals = new Interval(intervals);
233: this .cancelled = false;
234: }
235:
236: public long nextInterval() {
237: return (intervals.next());
238: }
239:
240: public boolean cancelled() {
241: return (cancelled);
242: }
243:
244: public void cancel() {
245: cancelled = true;
246: }
247: }
248:
249: /**
250: * The entry associated with an initial group of missing messages
251: * with contiguous sequence numbers and with all its subgroups.<br>
252: * E.g.
253: * - initial group: [5-34]
254: * - msg 12 is acknowledged, now the groups are: [5-11], [13-34]
255: * <p>
256: * Groups are stored in a list as long[2] arrays of the each group's
257: * bounds. For speed and convenience, the lowest & highest bounds of
258: * all the groups in this entry are also stored separately
259: */
260: private class Entry extends Task {
261: private long low;
262: private long high;
263: /** List<long[2]> of ranges to be retransmitted */
264: final java.util.List list = new ArrayList();
265:
266: public Entry(long low, long high, long[] intervals) {
267: super (intervals);
268: this .low = low;
269: this .high = high;
270: list.add(new long[] { low, high });
271: }
272:
273: /**
274: * Remove the given seqno and resize or partition groups as
275: * necessary. The algorithm is as follows:<br>
276: * i. Find the group with low <= seqno <= high
277: * ii. If seqno == low,
278: * a. if low == high, then remove the group
279: * Adjust global low. If global low was pointing to the group
280: * deleted in the previous step, set it to point to the next group.
281: * If there is no next group, set global low to be higher than
282: * global high. This way the entry is invalidated and will be removed
283: * all together from the pending msgs and the task scheduler
284: * iii. If seqno == high, adjust high, adjust global high if this is
285: * the group at the tail of the list
286: * iv. Else low < seqno < high, break [low,high] into [low,seqno-1]
287: * and [seqno+1,high]
288: *
289: * @param seqno the sequence number to remove
290: */
291: public void remove(long seqno) {
292: int i;
293: long[] bounds = null, newBounds;
294:
295: synchronized (list) {
296: for (i = 0; i < list.size(); ++i) {
297: bounds = (long[]) list.get(i);
298: if (seqno < bounds[0] || seqno > bounds[1])
299: continue;
300: break;
301: }
302: if (i == list.size())
303: return;
304:
305: if (seqno == bounds[0]) {
306: if (bounds[0] == bounds[1])
307: list.remove(i);
308: else
309: bounds[0]++;
310: if (i == 0)
311: low = list.isEmpty() ? high + 1
312: : ((long[]) list.get(i))[0];
313: } else if (seqno == bounds[1]) {
314: bounds[1]--;
315: if (i == list.size() - 1)
316: high = ((long[]) list.get(i))[1];
317: } else {
318: newBounds = new long[2];
319: newBounds[0] = seqno + 1;
320: newBounds[1] = bounds[1];
321: bounds[1] = seqno - 1;
322: list.add(i + 1, newBounds);
323: }
324: }
325: }
326:
327: /**
328: * Retransmission task:<br>
329: * For each interval, call the retransmission callback command
330: */
331: public void run() {
332: long[] bounds;
333: List copy;
334:
335: synchronized (list) {
336: copy = new LinkedList(list);
337: }
338:
339: for (Iterator it = copy.iterator(); it.hasNext();) {
340: bounds = (long[]) it.next();
341: try {
342: cmd.retransmit(bounds[0], bounds[1], sender);
343: } catch (Throwable t) {
344: log.error("failure asking " + cmd
345: + " for retransmission", t);
346: }
347: }
348: }
349:
350: int size() {
351: int size = 0;
352: long diff;
353: long[] tmp;
354: synchronized (list) {
355: for (Iterator it = list.iterator(); it.hasNext();) {
356: tmp = (long[]) it.next();
357: diff = tmp[1] - tmp[0] + 1;
358: size += diff;
359: }
360: }
361:
362: return size;
363: }
364:
365: public String toString() {
366: StringBuffer sb = new StringBuffer();
367: synchronized (list) {
368: long[] range;
369: boolean first = true;
370: for (Iterator it = list.iterator(); it.hasNext();) {
371: range = (long[]) it.next();
372: if (first) {
373: first = false;
374: } else {
375: sb.append(", ");
376: }
377: sb.append(range[0]).append('-').append(range[1]);
378: }
379: }
380:
381: return sb.toString();
382: }
383:
384: }
385:
386: public static void main(String[] args) {
387: Retransmitter xmitter;
388: Address sender;
389:
390: try {
391: sender = new org.jgroups.stack.IpAddress("localhost", 5555);
392: xmitter = new Retransmitter(sender, new MyXmitter());
393: xmitter.setRetransmitTimeouts(new long[] { 1000, 2000,
394: 4000, 8000 });
395:
396: xmitter.add(1, 10);
397: System.out.println("retransmitter: " + xmitter);
398: xmitter.remove(1);
399: System.out.println("retransmitter: " + xmitter);
400: xmitter.remove(2);
401: System.out.println("retransmitter: " + xmitter);
402: xmitter.remove(4);
403: System.out.println("retransmitter: " + xmitter);
404:
405: Util.sleep(3000);
406: xmitter.remove(3);
407: System.out.println("retransmitter: " + xmitter);
408:
409: Util.sleep(1000);
410: xmitter.remove(10);
411: System.out.println("retransmitter: " + xmitter);
412: xmitter.remove(8);
413: System.out.println("retransmitter: " + xmitter);
414: xmitter.remove(6);
415: System.out.println("retransmitter: " + xmitter);
416: xmitter.remove(7);
417: System.out.println("retransmitter: " + xmitter);
418: xmitter.remove(9);
419: System.out.println("retransmitter: " + xmitter);
420: xmitter.remove(5);
421: System.out.println("retransmitter: " + xmitter);
422: } catch (Exception e) {
423: log.error(e);
424: }
425: }
426:
427: static class MyXmitter implements Retransmitter.RetransmitCommand {
428:
429: public void retransmit(long first_seqno, long last_seqno,
430: Address sender) {
431: System.out.println("-- " + new java.util.Date()
432: + ": retransmit(" + first_seqno + ", " + last_seqno
433: + ", " + sender + ')');
434: }
435: }
436:
437: static void sleep(long timeout) {
438: Util.sleep(timeout);
439: }
440:
441: }
|