001: // $Id: AckSenderWindow.java,v 1.19 2006/02/08 08:47:43 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008: import org.jgroups.Address;
009: import org.jgroups.Message;
010: import org.jgroups.util.TimeScheduler;
011: import org.jgroups.util.Util;
012:
013: import java.util.Map;
014: import java.util.TreeSet;
015:
016: /**
017: * ACK-based sliding window for a sender. Messages are added to the window keyed by seqno
018: * When an ACK is received, the corresponding message is removed. The Retransmitter
019: * continously iterates over the entries in the hashmap, retransmitting messages based on their
020: * creation time and an (increasing) timeout. When there are no more messages in the retransmission
021: * table left, the thread terminates. It will be re-activated when a new entry is added to the
022: * retransmission table.
023: * @author Bela Ban
024: */
025: public class AckSenderWindow implements Retransmitter.RetransmitCommand {
026: RetransmitCommand retransmit_command = null; // called to request XMIT of msg
027: final Map msgs = new ConcurrentReaderHashMap(); // keys: seqnos (Long), values: Messages
028: long[] interval = new long[] { 400, 800, 1200, 1600 };
029: final Retransmitter retransmitter;
030: static final Log log = LogFactory.getLog(AckSenderWindow.class);
031:
032: public interface RetransmitCommand {
033: void retransmit(long seqno, Message msg);
034: }
035:
036: /**
037: * Creates a new instance. Thre retransmission thread has to be started separately with
038: * <code>start()</code>.
039: * @param com If not null, its method <code>retransmit()</code> will be called when a message
040: * needs to be retransmitted (called by the Retransmitter).
041: */
042: public AckSenderWindow(RetransmitCommand com) {
043: retransmit_command = com;
044: retransmitter = new Retransmitter(null, this );
045: retransmitter.setRetransmitTimeouts(interval);
046: }
047:
048: public AckSenderWindow(RetransmitCommand com, long[] interval) {
049: retransmit_command = com;
050: this .interval = interval;
051: retransmitter = new Retransmitter(null, this );
052: retransmitter.setRetransmitTimeouts(interval);
053: }
054:
055: public AckSenderWindow(RetransmitCommand com, long[] interval,
056: TimeScheduler sched) {
057: retransmit_command = com;
058: this .interval = interval;
059: retransmitter = new Retransmitter(null, this , sched);
060: retransmitter.setRetransmitTimeouts(interval);
061: }
062:
063: public AckSenderWindow(RetransmitCommand com, long[] interval,
064: TimeScheduler sched, Address sender) {
065: retransmit_command = com;
066: this .interval = interval;
067: retransmitter = new Retransmitter(sender, this , sched);
068: retransmitter.setRetransmitTimeouts(interval);
069: }
070:
071: public void reset() {
072: msgs.clear();
073:
074: // moved out of sync scope: Retransmitter.reset()/add()/remove() are sync'ed anyway
075: // Bela Jan 15 2003
076: retransmitter.reset();
077: }
078:
079: /**
080: * Adds a new message to the retransmission table. If the message won't have received an ack within
081: * a certain time frame, the retransmission thread will retransmit the message to the receiver. If
082: * a sliding window protocol is used, we only add up to <code>window_size</code> messages. If the table is
083: * full, we add all new messages to a queue. Those will only be added once the table drains below a certain
084: * threshold (<code>min_threshold</code>)
085: */
086: public void add(long seqno, Message msg) {
087: Long tmp = new Long(seqno);
088: synchronized (msgs) { // the contains() and put() should be atomic
089: if (!msgs.containsKey(tmp))
090: msgs.put(tmp, msg);
091: }
092: retransmitter.add(seqno, seqno);
093: }
094:
095: /**
096: * Removes the message from <code>msgs</code>, removing them also from retransmission. If
097: * sliding window protocol is used, and was queueing, check whether we can resume adding elements.
098: * Add all elements. If this goes above window_size, stop adding and back to queueing. Else
099: * set queueing to false.
100: */
101: public void ack(long seqno) {
102: msgs.remove(new Long(seqno));
103: retransmitter.remove(seqno);
104: }
105:
106: public int size() {
107: return msgs.size();
108: }
109:
110: public String toString() {
111: StringBuffer sb = new StringBuffer();
112: sb.append(msgs.size()).append(" msgs (").append(
113: retransmitter.size()).append(" to retransmit): ");
114: TreeSet keys = new TreeSet(msgs.keySet());
115: if (keys.size() > 0)
116: sb.append(keys.first()).append(" - ").append(keys.last());
117: else
118: sb.append("[]");
119: return sb.toString();
120: }
121:
122: public String printDetails() {
123: StringBuffer sb = new StringBuffer();
124: sb.append(msgs.size()).append(" msgs (").append(
125: retransmitter.size()).append(" to retransmit): ")
126: .append(new TreeSet(msgs.keySet()));
127: return sb.toString();
128: }
129:
130: /* -------------------------------- Retransmitter.RetransmitCommand interface ------------------- */
131: public void retransmit(long first_seqno, long last_seqno,
132: Address sender) {
133: Message msg;
134:
135: if (retransmit_command != null) {
136: if (log.isTraceEnabled())
137: log.trace(new StringBuffer("retransmitting messages ")
138: .append(first_seqno).append(" - ").append(
139: last_seqno).append(" from ").append(
140: sender));
141: for (long i = first_seqno; i <= last_seqno; i++) {
142: if ((msg = (Message) msgs.get(new Long(i))) != null) { // find the message to retransmit
143: retransmit_command.retransmit(i, msg);
144: }
145: }
146: }
147: }
148:
149: /* ----------------------------- End of Retransmitter.RetransmitCommand interface ---------------- */
150:
151: /* ---------------------------------- Private methods --------------------------------------- */
152:
153: /* ------------------------------ End of Private methods ------------------------------------ */
154:
155: /** Struct used to store message alongside with its seqno in the message queue */
156: static class Entry {
157: final long seqno;
158: final Message msg;
159:
160: Entry(long seqno, Message msg) {
161: this .seqno = seqno;
162: this .msg = msg;
163: }
164: }
165:
166: static class Dummy implements RetransmitCommand {
167: static final long last_xmit_req = 0;
168: long curr_time;
169:
170: public void retransmit(long seqno, Message msg) {
171:
172: if (log.isDebugEnabled())
173: log.debug("seqno=" + seqno);
174:
175: curr_time = System.currentTimeMillis();
176: }
177: }
178:
179: public static void main(String[] args) {
180: long[] xmit_timeouts = { 1000, 2000, 3000, 4000 };
181: AckSenderWindow win = new AckSenderWindow(new Dummy(),
182: xmit_timeouts);
183:
184: final int NUM = 1000;
185:
186: for (int i = 1; i < NUM; i++)
187: win.add(i, new Message());
188:
189: System.out.println(win);
190: Util.sleep(5000);
191:
192: for (int i = 1; i < NUM; i++) {
193: if (i % 2 == 0) // ack the even seqnos
194: win.ack(i);
195: }
196:
197: System.out.println(win);
198: Util.sleep(4000);
199:
200: for (int i = 1; i < NUM; i++) {
201: if (i % 2 != 0) // ack the odd seqnos
202: win.ack(i);
203: }
204: System.out.println(win);
205:
206: win.add(3, new Message());
207: win.add(5, new Message());
208: win.add(4, new Message());
209: win.add(8, new Message());
210: win.add(9, new Message());
211: win.add(6, new Message());
212: win.add(7, new Message());
213: win.add(3, new Message());
214: System.out.println(win);
215:
216: try {
217: Thread.sleep(5000);
218: win.ack(5);
219: System.out.println("ack(5)");
220: win.ack(4);
221: System.out.println("ack(4)");
222: win.ack(6);
223: System.out.println("ack(6)");
224: win.ack(7);
225: System.out.println("ack(7)");
226: win.ack(8);
227: System.out.println("ack(8)");
228: win.ack(6);
229: System.out.println("ack(6)");
230: win.ack(9);
231: System.out.println("ack(9)");
232: System.out.println(win);
233:
234: Thread.sleep(5000);
235: win.ack(3);
236: System.out.println("ack(3)");
237: System.out.println(win);
238:
239: Thread.sleep(3000);
240: win.add(10, new Message());
241: win.add(11, new Message());
242: System.out.println(win);
243: Thread.sleep(3000);
244: win.ack(10);
245: System.out.println("ack(10)");
246: win.ack(11);
247: System.out.println("ack(11)");
248: System.out.println(win);
249:
250: win.add(12, new Message());
251: win.add(13, new Message());
252: win.add(14, new Message());
253: win.add(15, new Message());
254: win.add(16, new Message());
255: System.out.println(win);
256:
257: Util.sleep(1000);
258: win.ack(12);
259: System.out.println("ack(12)");
260: win.ack(13);
261: System.out.println("ack(13)");
262:
263: win.ack(15);
264: System.out.println("ack(15)");
265: System.out.println(win);
266:
267: Util.sleep(5000);
268: win.ack(16);
269: System.out.println("ack(16)");
270: System.out.println(win);
271:
272: Util.sleep(1000);
273:
274: win.ack(14);
275: System.out.println("ack(14)");
276: System.out.println(win);
277: } catch (Exception e) {
278: log.error(e);
279: }
280: }
281:
282: }
|