001: // $Id: AckMcastSenderWindow.java,v 1.10.6.1 2007/04/27 06:26:38 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.Address;
008: import org.jgroups.Message;
009: import org.jgroups.util.TimeScheduler;
010:
011: import java.io.PrintWriter;
012: import java.io.StringWriter;
013: import java.util.*;
014:
015: /**
016: * Keeps track of ACKs from receivers for each message. When a new message is
017: * sent, it is tagged with a sequence number and the receiver set (set of
018: * members to which the message is sent) and added to a hashtable
019: * (key = sequence number, val = message + receiver set). Each incoming ACK
020: * is noted and when all ACKs for a specific sequence number have been
021: * received, the corresponding entry is removed from the hashtable. A
022: * retransmission thread periodically re-sends the message point-to-point to
023: * all receivers from which no ACKs have been received yet. A view change or
024: * suspect message causes the corresponding non-existing receivers to be
025: * removed from the hashtable.
026: * <p>
027: * This class may need flow control in order to avoid needless
028: * retransmissions because of timeouts.
029: *
030: * @author Bela Ban June 9 1999
031: * @author John Georgiadis May 8 2001
032: * @version $Revision: 1.10.6.1 $
033: */
034: public class AckMcastSenderWindow {
035: /**
036: * Called by retransmitter thread whenever a message needs to be re-sent
037: * to a destination. <code>dest</code> has to be set in the
038: * <code>dst</code> field of <code>msg</code>, as the latter was sent
039: * multicast, but now we are sending a unicast message. Message has to be
040: * copied before sending it (as headers will be appended and therefore
041: * the message changed!).
042: */
043: public interface RetransmitCommand {
044: /**
045: * Retranmit the given msg
046: *
047: * @param seqno the sequence number associated with the message
048: * @param msg the msg to retransmit (it should be a copy!)
049: * @param dest the msg destination
050: */
051: void retransmit(long seqno, Message msg, Address dest);
052: }
053:
054: /**
055: * The retransmit task executed by the scheduler in regular intervals
056: */
057: private static abstract class Task implements TimeScheduler.Task {
058: private final Interval intervals;
059: private boolean cancelled;
060:
061: protected Task(long[] intervals) {
062: this .intervals = new Interval(intervals);
063: this .cancelled = false;
064: }
065:
066: public long nextInterval() {
067: return (intervals.next());
068: }
069:
070: public void cancel() {
071: cancelled = true;
072: }
073:
074: public boolean cancelled() {
075: return (cancelled);
076: }
077: }
078:
079: /**
080: * The entry associated with a pending msg
081: */
082: private class Entry extends Task {
083: /** The msg sequence number */
084: public final long seqno;
085: /** The msg to retransmit */
086: public Message msg = null;
087: /** destination addr -> boolean (true = received, false = not) */
088: public final Hashtable senders = new Hashtable();
089: /** How many destinations have received the msg */
090: public int num_received = 0;
091:
092: public Entry(long seqno, Message msg, Vector dests,
093: long[] intervals) {
094: super (intervals);
095: this .seqno = seqno;
096: this .msg = msg;
097: for (int i = 0; i < dests.size(); i++)
098: senders.put(dests.elementAt(i), Boolean.FALSE);
099: }
100:
101: boolean allReceived() {
102: return (num_received >= senders.size());
103: }
104:
105: /** Retransmit this entry */
106: public void run() {
107: _retransmit(this );
108: }
109:
110: public String toString() {
111: StringBuffer buff = new StringBuffer();
112: buff.append("num_received = ").append(num_received).append(
113: ", received msgs = ").append(senders);
114: return (buff.toString());
115: }
116: }
117:
118: private static final long SEC = 1000;
119: /** Default retransmit intervals (ms) - exponential approx. */
120: private static final long[] RETRANSMIT_TIMEOUTS = { 2 * SEC,
121: 3 * SEC, 5 * SEC, 8 * SEC };
122: /** Default retransmit thread suspend timeout (ms) */
123: private static final long SUSPEND_TIMEOUT = 2000;
124:
125: protected static final Log log = LogFactory
126: .getLog(AckMcastSenderWindow.class);
127:
128: // Msg tables related
129: /** Table of pending msgs: seqno -> Entry */
130: private final Hashtable msgs = new Hashtable();
131:
132: /** List of recently suspected members. Used to cease retransmission to suspected members */
133: private final LinkedList suspects = new LinkedList();
134:
135: /** Max number in suspects list */
136: private static final int max_suspects = 20;
137:
138: /**
139: * List of acknowledged msgs since the last call to
140: * <code>getStableMessages()</code>
141: */
142: private final Vector stable_msgs = new Vector();
143: /** Whether a call to <code>waitUntilAcksReceived()</code> is still active */
144: private boolean waiting = false;
145:
146: // Retransmission thread related
147: /** Whether retransmitter is externally provided or owned by this object */
148: private boolean retransmitter_owned;
149: /** The retransmission scheduler */
150: private TimeScheduler retransmitter = null;
151: /** Retransmission intervals */
152: private long[] retransmit_intervals;
153: /** The callback object for retransmission */
154: private RetransmitCommand cmd = null;
155:
156: /**
157: * Convert exception stack trace to string
158: */
159: private static String _toString(Throwable ex) {
160: StringWriter sw = new StringWriter();
161: PrintWriter pw = new PrintWriter(sw);
162: ex.printStackTrace(pw);
163: return (sw.toString());
164: }
165:
166: /**
167: * @param entry the record associated with the msg to retransmit. It
168: * contains the list of receivers that haven't yet ack reception
169: */
170: private void _retransmit(Entry entry) {
171: Address sender;
172: boolean received;
173:
174: synchronized (entry) {
175: for (Enumeration e = entry.senders.keys(); e
176: .hasMoreElements();) {
177: sender = (Address) e.nextElement();
178: received = ((Boolean) entry.senders.get(sender))
179: .booleanValue();
180: if (!received) {
181: if (suspects.contains(sender)) {
182:
183: if (log.isWarnEnabled())
184: log
185: .warn("removing "
186: + sender
187: + " from retransmit list as it is in the suspect list");
188: remove(sender);
189: continue;
190: }
191:
192: if (log.isInfoEnabled())
193: log.info("--> retransmitting msg #"
194: + entry.seqno + " to " + sender);
195: cmd.retransmit(entry.seqno, entry.msg.copy(),
196: sender);
197: }
198: }
199: }
200: }
201:
202: /**
203: * Setup this object's state
204: *
205: * @param cmd the callback object for retranmissions
206: * @param retransmit_intervals the interval between two consecutive
207: * retransmission attempts
208: * @param sched the external scheduler to use to schedule retransmissions
209: * @param sched_owned if true, the scheduler is owned by this object and
210: * can be started/stopped/destroyed. If false, the scheduler is shared
211: * among multiple objects and start()/stop() should not be called from
212: * within this object
213: *
214: * @throws IllegalArgumentException if <code>cmd</code> is null
215: */
216: private void init(RetransmitCommand cmd,
217: long[] retransmit_intervals, TimeScheduler sched,
218: boolean sched_owned) {
219: if (cmd == null) {
220: if (log.isErrorEnabled())
221: log.error("command is null. Cannot retransmit "
222: + "messages !");
223: throw new IllegalArgumentException("cmd");
224: }
225:
226: retransmitter_owned = sched_owned;
227: retransmitter = sched;
228: this .retransmit_intervals = retransmit_intervals;
229: this .cmd = cmd;
230:
231: start();
232: }
233:
234: /**
235: * Create and <b>start</b> the retransmitter
236: *
237: * @param cmd the callback object for retranmissions
238: * @param retransmit_intervals the interval between two consecutive
239: * retransmission attempts
240: * @param sched the external scheduler to use to schedule retransmissions
241: *
242: * @throws IllegalArgumentException if <code>cmd</code> is null
243: */
244: public AckMcastSenderWindow(RetransmitCommand cmd,
245: long[] retransmit_intervals, TimeScheduler sched) {
246: init(cmd, retransmit_intervals, sched, false);
247: }
248:
/**
 * Creates a window that uses the default retransmission intervals and an
 * externally provided scheduler. The scheduler is registered as not
 * owned, so this object does not start it.
 *
 * @param cmd the callback object for retransmissions
 * @param sched the external scheduler to use to schedule retransmissions
 *
 * @throws IllegalArgumentException if <code>cmd</code> is null
 */
public AckMcastSenderWindow(RetransmitCommand cmd,
        TimeScheduler sched) {
    this(cmd, RETRANSMIT_TIMEOUTS, sched);
}
261:
262: /**
263: * Create and <b>start</b> the retransmitter
264: *
265: * @param cmd the callback object for retranmissions
266: * @param retransmit_intervals the interval between two consecutive
267: * retransmission attempts
268: *
269: * @throws IllegalArgumentException if <code>cmd</code> is null
270: */
271: public AckMcastSenderWindow(RetransmitCommand cmd,
272: long[] retransmit_intervals) {
273: init(cmd, retransmit_intervals, new TimeScheduler(), true);
274: }
275:
/**
 * Creates a window with the default retransmission intervals and its own,
 * private scheduler, and <b>starts</b> that scheduler.
 *
 * @param cmd the callback object for retransmissions
 *
 * @throws IllegalArgumentException if <code>cmd</code> is null
 */
public AckMcastSenderWindow(RetransmitCommand cmd) {
    init(cmd, RETRANSMIT_TIMEOUTS, new TimeScheduler(), true);
}
286:
287: /**
288: * Adds a new message to the hash table.
289: *
290: * @param seqno The sequence number associated with the message
291: * @param msg The message (should be a copy!)
292: * @param receivers The set of addresses to which the message was sent
293: * and from which consequently an ACK is expected
294: */
295: public void add(long seqno, Message msg, Vector receivers) {
296: Entry e;
297:
298: if (waiting)
299: return;
300: if (receivers.size() == 0)
301: return;
302:
303: synchronized (msgs) {
304: if (msgs.get(new Long(seqno)) != null)
305: return;
306: e = new Entry(seqno, msg, receivers, retransmit_intervals);
307: msgs.put(new Long(seqno), e);
308: retransmitter.add(e);
309: }
310: }
311:
312: /**
313: * An ACK has been received from <code>sender</code>. Tag the sender in
314: * the hash table as 'received'. If all ACKs have been received, remove
315: * the entry all together.
316: *
317: * @param seqno The sequence number of the message for which an ACK has
318: * been received.
319: * @param sender The sender which sent the ACK
320: */
321: public void ack(long seqno, Address sender) {
322: Entry entry;
323: Boolean received;
324:
325: synchronized (msgs) {
326: entry = (Entry) msgs.get(new Long(seqno));
327: if (entry == null)
328: return;
329:
330: synchronized (entry) {
331: received = (Boolean) entry.senders.get(sender);
332: if (received == null || received.booleanValue())
333: return;
334:
335: // If not yet received
336: entry.senders.put(sender, Boolean.TRUE);
337: entry.num_received++;
338: if (!entry.allReceived())
339: return;
340: }
341:
342: synchronized (stable_msgs) {
343: entry.cancel();
344: msgs.remove(new Long(seqno));
345: stable_msgs.add(new Long(seqno));
346: }
347: // wake up waitUntilAllAcksReceived() method
348: msgs.notifyAll();
349: }
350: }
351:
352: /**
353: * Remove <code>obj</code> from all receiver sets and wake up
354: * retransmission thread.
355: *
356: * @param obj the sender to remove
357: */
358: public void remove(Address obj) {
359: Long key;
360: Entry entry;
361:
362: synchronized (msgs) {
363: for (Enumeration e = msgs.keys(); e.hasMoreElements();) {
364: key = (Long) e.nextElement();
365: entry = (Entry) msgs.get(key);
366: synchronized (entry) {
367: //if (((Boolean)entry.senders.remove(obj)).booleanValue()) entry.num_received--;
368: //if (!entry.allReceived()) continue;
369: Boolean received = (Boolean) entry.senders
370: .remove(obj);
371: if (received == null)
372: continue; // suspected member not in entry.senders ?
373: if (received.booleanValue())
374: entry.num_received--;
375: if (!entry.allReceived())
376: continue;
377: }
378: synchronized (stable_msgs) {
379: entry.cancel();
380: msgs.remove(key);
381: stable_msgs.add(key);
382: }
383: // wake up waitUntilAllAcksReceived() method
384: msgs.notifyAll();
385: }
386: }
387: }
388:
389: /**
390: * Process with address <code>suspected</code> is suspected: remove it
391: * from all receiver sets. This means that no ACKs are expected from this
392: * process anymore.
393: *
394: * @param suspected The suspected process
395: */
396: public void suspect(Address suspected) {
397:
398: if (log.isInfoEnabled())
399: log.info("suspect is " + suspected);
400: remove(suspected);
401: suspects.add(suspected);
402: if (suspects.size() >= max_suspects)
403: suspects.removeFirst();
404: }
405:
406: /**
407: * @return a copy of stable messages, or null (if non available). Removes
408: * all stable messages afterwards
409: */
410: public Vector getStableMessages() {
411: Vector retval;
412:
413: synchronized (stable_msgs) {
414: retval = (stable_msgs.size() > 0) ? (Vector) stable_msgs
415: .clone() : null;
416: if (stable_msgs.size() > 0)
417: stable_msgs.clear();
418: }
419:
420: return (retval);
421: }
422:
423: public void clearStableMessages() {
424: synchronized (stable_msgs) {
425: stable_msgs.clear();
426: }
427: }
428:
429: /**
430: * @return the number of currently pending msgs
431: */
432: public long size() {
433: synchronized (msgs) {
434: return (msgs.size());
435: }
436: }
437:
438: /** Returns the number of members for a given entry for which acks have to be received */
439: public long getNumberOfResponsesExpected(long seqno) {
440: Entry entry = (Entry) msgs.get(new Long(seqno));
441: if (entry != null)
442: return entry.senders.size();
443: else
444: return -1;
445: }
446:
447: /** Returns the number of members for a given entry for which acks have been received */
448: public long getNumberOfResponsesReceived(long seqno) {
449: Entry entry = (Entry) msgs.get(new Long(seqno));
450: if (entry != null)
451: return entry.num_received;
452: else
453: return -1;
454: }
455:
456: /** Prints all members plus whether an ack has been received from those members for a given seqno */
457: public String printDetails(long seqno) {
458: Entry entry = (Entry) msgs.get(new Long(seqno));
459: if (entry != null)
460: return entry.toString();
461: else
462: return null;
463: }
464:
465: /**
466: * Waits until all outstanding messages have been ACKed by all receivers.
467: * Takes into account suspicions and view changes. Returns when there are
468: * no entries left in the hashtable. <b>While waiting, no entries can be
469: * added to the hashtable (they will be discarded).</b>
470: *
471: * @param timeout Miliseconds to wait. 0 means wait indefinitely.
472: */
473: public void waitUntilAllAcksReceived(long timeout) {
474: long time_to_wait, start_time, current_time;
475: Address suspect;
476:
477: // remove all suspected members from retransmission
478: for (Iterator it = suspects.iterator(); it.hasNext();) {
479: suspect = (Address) it.next();
480: remove(suspect);
481: }
482:
483: time_to_wait = timeout;
484: waiting = true;
485: if (timeout <= 0) {
486: synchronized (msgs) {
487: while (msgs.size() > 0)
488: try {
489: msgs.wait();
490: } catch (InterruptedException ex) {
491: }
492: }
493: } else {
494: start_time = System.currentTimeMillis();
495: synchronized (msgs) {
496: while (msgs.size() > 0) {
497: current_time = System.currentTimeMillis();
498: time_to_wait = timeout
499: - (current_time - start_time);
500: if (time_to_wait <= 0)
501: break;
502:
503: try {
504: msgs.wait(time_to_wait);
505: } catch (InterruptedException ex) {
506: if (log.isWarnEnabled())
507: log.warn(ex.toString());
508: }
509: }
510: }
511: }
512: waiting = false;
513: }
514:
515: /**
516: * Start the retransmitter. This has no effect, if the retransmitter
517: * was externally provided
518: */
519: public void start() {
520: if (retransmitter_owned)
521: retransmitter.start();
522: }
523:
524: /**
525: * Stop the rentransmition and clear all pending msgs.
526: * <p>
527: * If this retransmitter has been provided an externally managed
528: * scheduler, then just clear all msgs and the associated tasks, else
529: * stop the scheduler. In this case the method blocks until the
530: * scheduler's thread is dead. Only the owner of the scheduler should
531: * stop it.
532: */
533: public void stop() {
534: Entry entry;
535:
536: // i. If retransmitter is owned, stop it else cancel all tasks
537: // ii. Clear all pending msgs and notify anyone waiting
538: synchronized (msgs) {
539: if (retransmitter_owned) {
540: try {
541: retransmitter.stop();
542: } catch (InterruptedException ex) {
543: if (log.isErrorEnabled())
544: log.error(_toString(ex));
545: }
546: } else {
547: for (Enumeration e = msgs.elements(); e
548: .hasMoreElements();) {
549: entry = (Entry) e.nextElement();
550: entry.cancel();
551: }
552: }
553: msgs.clear();
554: // wake up waitUntilAllAcksReceived() method
555: msgs.notifyAll();
556: }
557: }
558:
559: /**
560: * Remove all pending msgs from the hashtable. Cancel all associated
561: * tasks in the retransmission scheduler
562: */
563: public void reset() {
564: Entry entry;
565:
566: if (waiting)
567: return;
568:
569: synchronized (msgs) {
570: for (Enumeration e = msgs.elements(); e.hasMoreElements();) {
571: entry = (Entry) e.nextElement();
572: entry.cancel();
573: }
574: msgs.clear();
575: msgs.notifyAll();
576: }
577: }
578:
579: public String toString() {
580: StringBuffer ret;
581: Entry entry;
582: Long key;
583:
584: ret = new StringBuffer();
585: synchronized (msgs) {
586: ret.append("msgs: (").append(msgs.size()).append(')');
587: for (Enumeration e = msgs.keys(); e.hasMoreElements();) {
588: key = (Long) e.nextElement();
589: entry = (Entry) msgs.get(key);
590: ret.append("key = ").append(key).append(", value = ")
591: .append(entry).append('\n');
592: }
593: synchronized (stable_msgs) {
594: ret.append("\nstable_msgs: ").append(stable_msgs);
595: }
596: }
597:
598: return (ret.toString());
599: }
600: }
|