Retransmitter.java (JGroups 2.4.1 sp3, package org.jgroups.stack)


// $Id: Retransmitter.java,v 1.10.10.3 2007/04/27 06:26:38 belaban Exp $

package org.jgroups.stack;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

import java.util.*;

/**
 * Maintains a pool of sequence numbers of messages that need to be retransmitted. Messages
 * are aged and retransmission requests are sent according to age, backing off along the
 * configured retransmit timeouts. If a TimeScheduler instance is given to the constructor, it
 * will be used; otherwise Retransmitter will create its own. The retransmit timeouts have to be
 * set first thing after creating an instance.
 * The <code>add()</code> method adds a range of sequence numbers of messages to be retransmitted. The
 * <code>remove()</code> method removes a sequence number again, cancelling retransmission requests for it.
 * Whenever a message needs to be retransmitted, the <code>RetransmitCommand.retransmit()</code> method is called.
 * It can be used e.g. by an ack-based scheme (e.g. AckSenderWindow) to retransmit a message to the receiver, or
 * by a nak-based scheme to send a retransmission request to the sender of the missing message.
 *
 * @author John Giorgiadis
 * @author Bela Ban
 * @version $Revision: 1.10.10.3 $
 */
public class Retransmitter {

    private static final long SEC = 1000;
    /** Default retransmit intervals (ms): 2, 3, 5 and 8 seconds */
    private static long[] RETRANSMIT_TIMEOUTS = { 2 * SEC, 3 * SEC,
            5 * SEC, 8 * SEC };
    /** Default retransmit thread suspend timeout (ms) */
    private static final long SUSPEND_TIMEOUT = 2000;

    private Address sender = null;
    private final LinkedList msgs = new LinkedList(); // List<Entry> of elements to be retransmitted
    private RetransmitCommand cmd = null;
    private boolean timer_owned;
    private TimeScheduler timer = null;
    protected static final Log log = LogFactory
            .getLog(Retransmitter.class);

    /** Retransmit command (see Gamma et al.) used to retrieve missing messages */
    public interface RetransmitCommand {
        /**
         * Get the missing messages between sequence numbers
         * <code>first_seqno</code> and <code>last_seqno</code>. This can either be done by sending a
         * retransmit message to destination <code>sender</code> (nak-based scheme), or by
         * retransmitting the missing message(s) to <code>sender</code> (ack-based scheme).
         * @param first_seqno The sequence number of the first missing message
         * @param last_seqno  The sequence number of the last missing message
         * @param sender The destination of the member to which the retransmit request will be sent
         *               (nak-based scheme), or to which the message will be retransmitted (ack-based scheme).
         */
        void retransmit(long first_seqno, long last_seqno,
                Address sender);
    }

    /**
     * Create a new Retransmitter associated with the given sender address
     * @param sender the address from which retransmissions are expected or to which retransmissions are sent
     * @param cmd the retransmission callback reference
     * @param sched retransmissions scheduler
     */
    public Retransmitter(Address sender, RetransmitCommand cmd,
            TimeScheduler sched) {
        init(sender, cmd, sched, false);
    }

    /**
     * Create a new Retransmitter associated with the given sender address
     * @param sender the address from which retransmissions are expected or to which retransmissions are sent
     * @param cmd the retransmission callback reference
     */
    public Retransmitter(Address sender, RetransmitCommand cmd) {
        init(sender, cmd, new TimeScheduler(), true);
    }

    /**
     * Set the retransmit timeouts (ms) used for entries added from now on;
     * a null argument is ignored.
     * <p>
     * Note: the timeouts are held in a static field, so this call affects
     * every Retransmitter instance, not only this one.
     */
    public void setRetransmitTimeouts(long[] timeouts) {
        if (timeouts != null)
            RETRANSMIT_TIMEOUTS = timeouts;
    }

    /**
     * Add the given range [first_seqno, last_seqno] to the list of
     * entries eligible for retransmission. If first_seqno > last_seqno,
     * then the range [last_seqno, first_seqno] is added instead.
     * <p>
     * If the retransmitter thread is suspended, wake it up.
     */
    public void add(long first_seqno, long last_seqno) {
        Entry e;

        if (first_seqno > last_seqno) {
            long tmp = first_seqno;
            first_seqno = last_seqno;
            last_seqno = tmp;
        }
        synchronized (msgs) {
            e = new Entry(first_seqno, last_seqno, RETRANSMIT_TIMEOUTS);
            msgs.add(e);
            timer.add(e);
        }
    }

    /**
     * Remove the given sequence number from the list of seqnos eligible
     * for retransmission. If there are no more seqno intervals in the
     * respective entry, cancel the entry from the retransmission
     * scheduler and remove it from the pending entries
     */
    public void remove(long seqno) {
        Entry e;

        synchronized (msgs) {
            for (ListIterator it = msgs.listIterator(); it.hasNext();) {
                e = (Entry) it.next();
                if (seqno < e.low || seqno > e.high)
                    continue;
                e.remove(seqno);
                if (e.low > e.high) {
                    e.cancel();
                    it.remove();
                }
                break;
            }
        }
    }

    /**
     * Reset the retransmitter: clear all msgs and cancel all the
     * respective tasks
     */
    public void reset() {
        Entry entry;

        synchronized (msgs) {
            for (ListIterator it = msgs.listIterator(); it.hasNext();) {
                entry = (Entry) it.next();
                entry.cancel();
            }
            msgs.clear();
        }
    }

    /**
     * Stop the retransmission and clear all pending msgs.
     * <p>
     * If this retransmitter has been provided an externally managed
     * scheduler, then just clear all msgs and the associated tasks, else
     * stop the scheduler. In this case the method blocks until the
     * scheduler's thread is dead. Only the owner of the scheduler should
     * stop it.
     */
    public void stop() {
        Entry entry;

        // i. If the scheduler is owned, stop it, else cancel all tasks
        // ii. Clear all pending msgs
        synchronized (msgs) {
            if (timer_owned) {
                try {
                    timer.stop();
                } catch (InterruptedException ex) {
                    if (log.isErrorEnabled())
                        log.error("failed stopping retransmitter", ex);
                }
            } else {
                for (ListIterator it = msgs.listIterator(); it
                        .hasNext();) {
                    entry = (Entry) it.next();
                    entry.cancel();
                }
            }
            msgs.clear();
        }
    }

    public String toString() {
        synchronized (msgs) {
            int size = size();
            StringBuffer sb = new StringBuffer();
            sb.append(size).append(" messages to retransmit: ").append(
                    msgs);
            return sb.toString();
        }
    }

    public int size() {
        int size = 0;
        Entry entry;
        synchronized (msgs) {
            for (Iterator it = msgs.iterator(); it.hasNext();) {
                entry = (Retransmitter.Entry) it.next();
                size += entry.size();
            }
        }
        return size;
    }

    /* ------------------------------- Private Methods -------------------------------------- */

    /**
     * Init this object
     *
     * @param sender the address from which retransmissions are expected
     * @param cmd the retransmission callback reference
     * @param sched retransmissions scheduler
     * @param sched_owned whether the scheduler parameter is owned by this
     * object or is externally provided
     */
    private void init(Address sender, RetransmitCommand cmd,
            TimeScheduler sched, boolean sched_owned) {
        this.sender = sender;
        this.cmd = cmd;
        timer_owned = sched_owned;
        timer = sched;
    }

    /* ---------------------------- End of Private Methods ------------------------------------ */

    /**
     * The retransmit task executed by the scheduler in regular intervals
     */
    private static abstract class Task implements TimeScheduler.Task {
        private final Interval intervals;
        private boolean cancelled;

        protected Task(long[] intervals) {
            this.intervals = new Interval(intervals);
            this.cancelled = false;
        }

        public long nextInterval() {
            return (intervals.next());
        }

        public boolean cancelled() {
            return (cancelled);
        }

        public void cancel() {
            cancelled = true;
        }
    }

    /**
     * The entry associated with an initial group of missing messages
     * with contiguous sequence numbers and with all its subgroups.<br>
     * E.g.
     * - initial group: [5-34]
     * - msg 12 is acknowledged, now the groups are: [5-11], [13-34]
     * <p>
     * Groups are stored in a list as long[2] arrays of each group's
     * bounds. For speed and convenience, the lowest & highest bounds of
     * all the groups in this entry are also stored separately
     */
    private class Entry extends Task {
        private long low;
        private long high;
        /** List<long[2]> of ranges to be retransmitted */
        final java.util.List list = new ArrayList();

        public Entry(long low, long high, long[] intervals) {
            super(intervals);
            this.low = low;
            this.high = high;
            list.add(new long[] { low, high });
        }

        /**
         * Remove the given seqno and resize or partition groups as
         * necessary. The algorithm is as follows:<br>
         * i. Find the group with low <= seqno <= high
         * ii. If seqno == low,
         *	a. if low == high, then remove the group
         *	b. else increment the group's low
         *	Adjust global low. If global low was pointing to the group
         * deleted in the previous step, set it to point to the next group.
         * If there is no next group, set global low to be higher than
         * global high. This way the entry is invalidated and will be removed
         * altogether from the pending msgs and the task scheduler
         * iii. If seqno == high, adjust high, adjust global high if this is
         * the group at the tail of the list
         * iv. Else low < seqno < high, break [low,high] into [low,seqno-1]
         * and [seqno+1,high]
         *
         * @param seqno the sequence number to remove
         */
        public void remove(long seqno) {
            int i;
            long[] bounds = null, newBounds;

            synchronized (list) {
                for (i = 0; i < list.size(); ++i) {
                    bounds = (long[]) list.get(i);
                    if (seqno < bounds[0] || seqno > bounds[1])
                        continue;
                    break;
                }
                if (i == list.size())
                    return;

                if (seqno == bounds[0]) {
                    if (bounds[0] == bounds[1])
                        list.remove(i);
                    else
                        bounds[0]++;
                    if (i == 0)
                        low = list.isEmpty() ? high + 1
                                : ((long[]) list.get(i))[0];
                } else if (seqno == bounds[1]) {
                    bounds[1]--;
                    if (i == list.size() - 1)
                        high = ((long[]) list.get(i))[1];
                } else {
                    newBounds = new long[2];
                    newBounds[0] = seqno + 1;
                    newBounds[1] = bounds[1];
                    bounds[1] = seqno - 1;
                    list.add(i + 1, newBounds);
                }
            }
        }

        /**
         * Retransmission task:<br>
         * For each interval, call the retransmission callback command
         */
        public void run() {
            long[] bounds;
            List copy;

            synchronized (list) {
                copy = new LinkedList(list);
            }

            for (Iterator it = copy.iterator(); it.hasNext();) {
                bounds = (long[]) it.next();
                try {
                    cmd.retransmit(bounds[0], bounds[1], sender);
                } catch (Throwable t) {
                    log.error("failure asking " + cmd
                            + " for retransmission", t);
                }
            }
        }

        int size() {
            int size = 0;
            long diff;
            long[] tmp;
            synchronized (list) {
                for (Iterator it = list.iterator(); it.hasNext();) {
                    tmp = (long[]) it.next();
                    diff = tmp[1] - tmp[0] + 1;
                    size += diff;
                }
            }

            return size;
        }

        public String toString() {
            StringBuffer sb = new StringBuffer();
            synchronized (list) {
                long[] range;
                boolean first = true;
                for (Iterator it = list.iterator(); it.hasNext();) {
                    range = (long[]) it.next();
                    if (first) {
                        first = false;
                    } else {
                        sb.append(", ");
                    }
                    sb.append(range[0]).append('-').append(range[1]);
                }
            }

            return sb.toString();
        }

    }

    public static void main(String[] args) {
        Retransmitter xmitter;
        Address sender;

        try {
            sender = new org.jgroups.stack.IpAddress("localhost", 5555);
            xmitter = new Retransmitter(sender, new MyXmitter());
            xmitter.setRetransmitTimeouts(new long[] { 1000, 2000,
                    4000, 8000 });

            xmitter.add(1, 10);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(1);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(2);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(4);
            System.out.println("retransmitter: " + xmitter);

            Util.sleep(3000);
            xmitter.remove(3);
            System.out.println("retransmitter: " + xmitter);

            Util.sleep(1000);
            xmitter.remove(10);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(8);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(6);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(7);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(9);
            System.out.println("retransmitter: " + xmitter);
            xmitter.remove(5);
            System.out.println("retransmitter: " + xmitter);
        } catch (Exception e) {
            log.error(e);
        }
    }

    static class MyXmitter implements Retransmitter.RetransmitCommand {

        public void retransmit(long first_seqno, long last_seqno,
                Address sender) {
            System.out.println("-- " + new java.util.Date()
                    + ": retransmit(" + first_seqno + ", " + last_seqno
                    + ", " + sender + ')');
        }
    }

    static void sleep(long timeout) {
        Util.sleep(timeout);
    }

}
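
The range bookkeeping described in the Javadoc of Entry.remove() (shrink a group at either end, or split it in two) can be tried in isolation. The following is only a minimal sketch under stated assumptions: the class name SeqnoRanges and its methods are hypothetical and not part of JGroups, it uses generics (so it needs a newer JDK than the listing above targets), and it leaves out the scheduler, the synchronization and the cached global low/high bounds of the real Entry.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch of the range splitting done by Retransmitter.Entry. */
class SeqnoRanges {
    // Each element is a {low, high} pair of inclusive bounds, kept in ascending order.
    private final List<long[]> ranges = new ArrayList<>();

    SeqnoRanges(long low, long high) {
        ranges.add(new long[] { low, high });
    }

    /** Removes seqno, shrinking or splitting the range that contains it (no-op if absent). */
    void remove(long seqno) {
        for (int i = 0; i < ranges.size(); i++) {
            long[] b = ranges.get(i);
            if (seqno < b[0] || seqno > b[1])
                continue;                       // not in this range, try the next one
            if (b[0] == b[1]) {
                ranges.remove(i);               // single-element range disappears
            } else if (seqno == b[0]) {
                b[0]++;                         // trim the lower end
            } else if (seqno == b[1]) {
                b[1]--;                         // trim the upper end
            } else {
                // split [low,high] into [low,seqno-1] and [seqno+1,high]
                ranges.add(i + 1, new long[] { seqno + 1, b[1] });
                b[1] = seqno - 1;
            }
            return;
        }
    }

    boolean isEmpty() {
        return ranges.isEmpty();
    }

    /** Number of sequence numbers still pending. */
    long size() {
        long total = 0;
        for (long[] b : ranges)
            total += b[1] - b[0] + 1;
        return total;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (long[] b : ranges) {
            if (sb.length() > 0)
                sb.append(", ");
            sb.append(b[0]).append('-').append(b[1]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        SeqnoRanges r = new SeqnoRanges(5, 34);
        r.remove(12);
        System.out.println(r); // prints "5-11, 13-34", the example from the Entry Javadoc
    }
}
```

Keeping the ranges in ascending order is what lets the real Entry cache a global low and high and skip entries that cannot contain a given seqno; the sketch omits that cache for brevity.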