Source Code Cross Referenced for AckMcastSenderWindow.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » stack » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.stack 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: AckMcastSenderWindow.java,v 1.10.6.1 2007/04/27 06:26:38 belaban Exp $
002:
003:        package org.jgroups.stack;
004:
005:        import org.apache.commons.logging.Log;
006:        import org.apache.commons.logging.LogFactory;
007:        import org.jgroups.Address;
008:        import org.jgroups.Message;
009:        import org.jgroups.util.TimeScheduler;
010:
011:        import java.io.PrintWriter;
012:        import java.io.StringWriter;
013:        import java.util.*;
014:
015:        /**
016:         * Keeps track of ACKs from receivers for each message. When a new message is
017:         * sent, it is tagged with a sequence number and the receiver set (set of
018:         * members to which the message is sent) and added to a hashtable
019:         * (key = sequence number, val = message + receiver set). Each incoming ACK
020:         * is noted and when all ACKs for a specific sequence number haven been
021:         * received, the corresponding entry is removed from the hashtable. A
022:         * retransmission thread periodically re-sends the message point-to-point to
023:         * all receivers from which no ACKs have been received yet. A view change or
024:         * suspect message causes the corresponding non-existing receivers to be
025:         * removed from the hashtable.
026:         * <p>
027:         * This class may need flow control in order to avoid needless
028:         * retransmissions because of timeouts.
029:         *
030:         * @author Bela Ban June 9 1999
031:         * @author John Georgiadis May 8 2001
032:         * @version $Revision: 1.10.6.1 $
033:         */
034:        public class AckMcastSenderWindow {
035:            /**
036:             * Called by retransmitter thread whenever a message needs to be re-sent
037:             * to a destination. <code>dest</code> has to be set in the
038:             * <code>dst</code> field of <code>msg</code>, as the latter was sent
039:             * multicast, but now we are sending a unicast message. Message has to be
040:             * copied before sending it (as headers will be appended and therefore
041:             * the message changed!).
042:             */
043:            public interface RetransmitCommand {
044:                /**
045:                 * Retranmit the given msg
046:                 *
047:                 * @param seqno the sequence number associated with the message
048:                 * @param msg the msg to retransmit (it should be a copy!)
049:                 * @param dest the msg destination
050:                 */
051:                void retransmit(long seqno, Message msg, Address dest);
052:            }
053:
054:            /**
055:             * The retransmit task executed by the scheduler in regular intervals
056:             */
057:            private static abstract class Task implements  TimeScheduler.Task {
058:                private final Interval intervals;
059:                private boolean cancelled;
060:
061:                protected Task(long[] intervals) {
062:                    this .intervals = new Interval(intervals);
063:                    this .cancelled = false;
064:                }
065:
066:                public long nextInterval() {
067:                    return (intervals.next());
068:                }
069:
070:                public void cancel() {
071:                    cancelled = true;
072:                }
073:
074:                public boolean cancelled() {
075:                    return (cancelled);
076:                }
077:            }
078:
079:            /**
080:             * The entry associated with a pending msg
081:             */
082:            private class Entry extends Task {
083:                /** The msg sequence number */
084:                public final long seqno;
085:                /** The msg to retransmit */
086:                public Message msg = null;
087:                /** destination addr -> boolean (true = received, false = not) */
088:                public final Hashtable senders = new Hashtable();
089:                /** How many destinations have received the msg */
090:                public int num_received = 0;
091:
092:                public Entry(long seqno, Message msg, Vector dests,
093:                        long[] intervals) {
094:                    super (intervals);
095:                    this .seqno = seqno;
096:                    this .msg = msg;
097:                    for (int i = 0; i < dests.size(); i++)
098:                        senders.put(dests.elementAt(i), Boolean.FALSE);
099:                }
100:
101:                boolean allReceived() {
102:                    return (num_received >= senders.size());
103:                }
104:
105:                /** Retransmit this entry */
106:                public void run() {
107:                    _retransmit(this );
108:                }
109:
110:                public String toString() {
111:                    StringBuffer buff = new StringBuffer();
112:                    buff.append("num_received = ").append(num_received).append(
113:                            ", received msgs = ").append(senders);
114:                    return (buff.toString());
115:                }
116:            }
117:
118:            private static final long SEC = 1000;
119:            /** Default retransmit intervals (ms) - exponential approx. */
120:            private static final long[] RETRANSMIT_TIMEOUTS = { 2 * SEC,
121:                    3 * SEC, 5 * SEC, 8 * SEC };
122:            /** Default retransmit thread suspend timeout (ms) */
123:            private static final long SUSPEND_TIMEOUT = 2000;
124:
125:            protected static final Log log = LogFactory
126:                    .getLog(AckMcastSenderWindow.class);
127:
128:            // Msg tables related
129:            /** Table of pending msgs: seqno -> Entry */
130:            private final Hashtable msgs = new Hashtable();
131:
132:            /** List of recently suspected members. Used to cease retransmission to suspected members */
133:            private final LinkedList suspects = new LinkedList();
134:
135:            /** Max number in suspects list */
136:            private static final int max_suspects = 20;
137:
138:            /**
139:             * List of acknowledged msgs since the last call to
140:             * <code>getStableMessages()</code>
141:             */
142:            private final Vector stable_msgs = new Vector();
143:            /** Whether a call to <code>waitUntilAcksReceived()</code> is still active */
144:            private boolean waiting = false;
145:
146:            // Retransmission thread related
147:            /** Whether retransmitter is externally provided or owned by this object */
148:            private boolean retransmitter_owned;
149:            /** The retransmission scheduler */
150:            private TimeScheduler retransmitter = null;
151:            /** Retransmission intervals */
152:            private long[] retransmit_intervals;
153:            /** The callback object for retransmission */
154:            private RetransmitCommand cmd = null;
155:
156:            /**
157:             * Convert exception stack trace to string
158:             */
159:            private static String _toString(Throwable ex) {
160:                StringWriter sw = new StringWriter();
161:                PrintWriter pw = new PrintWriter(sw);
162:                ex.printStackTrace(pw);
163:                return (sw.toString());
164:            }
165:
166:            /**
167:             * @param entry the record associated with the msg to retransmit. It
168:             * contains the list of receivers that haven't yet ack reception
169:             */
170:            private void _retransmit(Entry entry) {
171:                Address sender;
172:                boolean received;
173:
174:                synchronized (entry) {
175:                    for (Enumeration e = entry.senders.keys(); e
176:                            .hasMoreElements();) {
177:                        sender = (Address) e.nextElement();
178:                        received = ((Boolean) entry.senders.get(sender))
179:                                .booleanValue();
180:                        if (!received) {
181:                            if (suspects.contains(sender)) {
182:
183:                                if (log.isWarnEnabled())
184:                                    log
185:                                            .warn("removing "
186:                                                    + sender
187:                                                    + " from retransmit list as it is in the suspect list");
188:                                remove(sender);
189:                                continue;
190:                            }
191:
192:                            if (log.isInfoEnabled())
193:                                log.info("--> retransmitting msg #"
194:                                        + entry.seqno + " to " + sender);
195:                            cmd.retransmit(entry.seqno, entry.msg.copy(),
196:                                    sender);
197:                        }
198:                    }
199:                }
200:            }
201:
202:            /**
203:             * Setup this object's state
204:             *
205:             * @param cmd the callback object for retranmissions
206:             * @param retransmit_intervals the interval between two consecutive
207:             * retransmission attempts
208:             * @param sched the external scheduler to use to schedule retransmissions
209:             * @param sched_owned if true, the scheduler is owned by this object and
210:             * can be started/stopped/destroyed. If false, the scheduler is shared
211:             * among multiple objects and start()/stop() should not be called from
212:             * within this object
213:             *
214:             * @throws IllegalArgumentException if <code>cmd</code> is null
215:             */
216:            private void init(RetransmitCommand cmd,
217:                    long[] retransmit_intervals, TimeScheduler sched,
218:                    boolean sched_owned) {
219:                if (cmd == null) {
220:                    if (log.isErrorEnabled())
221:                        log.error("command is null. Cannot retransmit "
222:                                + "messages !");
223:                    throw new IllegalArgumentException("cmd");
224:                }
225:
226:                retransmitter_owned = sched_owned;
227:                retransmitter = sched;
228:                this .retransmit_intervals = retransmit_intervals;
229:                this .cmd = cmd;
230:
231:                start();
232:            }
233:
234:            /**
235:             * Create and <b>start</b> the retransmitter
236:             *
237:             * @param cmd the callback object for retranmissions
238:             * @param retransmit_intervals the interval between two consecutive
239:             * retransmission attempts
240:             * @param sched the external scheduler to use to schedule retransmissions
241:             *
242:             * @throws IllegalArgumentException if <code>cmd</code> is null
243:             */
244:            public AckMcastSenderWindow(RetransmitCommand cmd,
245:                    long[] retransmit_intervals, TimeScheduler sched) {
246:                init(cmd, retransmit_intervals, sched, false);
247:            }
248:
249:            /**
250:             * Create and <b>start</b> the retransmitter
251:             *
252:             * @param cmd the callback object for retranmissions
253:             * @param sched the external scheduler to use to schedule retransmissions
254:             *
255:             * @throws IllegalArgumentException if <code>cmd</code> is null
256:             */
257:            public AckMcastSenderWindow(RetransmitCommand cmd,
258:                    TimeScheduler sched) {
259:                init(cmd, RETRANSMIT_TIMEOUTS, sched, false);
260:            }
261:
262:            /**
263:             * Create and <b>start</b> the retransmitter
264:             *
265:             * @param cmd the callback object for retranmissions
266:             * @param retransmit_intervals the interval between two consecutive
267:             * retransmission attempts
268:             *
269:             * @throws IllegalArgumentException if <code>cmd</code> is null
270:             */
271:            public AckMcastSenderWindow(RetransmitCommand cmd,
272:                    long[] retransmit_intervals) {
273:                init(cmd, retransmit_intervals, new TimeScheduler(), true);
274:            }
275:
276:            /**
277:             * Create and <b>start</b> the retransmitter
278:             *
279:             * @param cmd the callback object for retranmissions
280:             *
281:             * @throws IllegalArgumentException if <code>cmd</code> is null
282:             */
283:            public AckMcastSenderWindow(RetransmitCommand cmd) {
284:                this (cmd, RETRANSMIT_TIMEOUTS);
285:            }
286:
287:            /**
288:             * Adds a new message to the hash table.
289:             *
290:             * @param seqno The sequence number associated with the message
291:             * @param msg The message (should be a copy!)
292:             * @param receivers The set of addresses to which the message was sent
293:             * and from which consequently an ACK is expected
294:             */
295:            public void add(long seqno, Message msg, Vector receivers) {
296:                Entry e;
297:
298:                if (waiting)
299:                    return;
300:                if (receivers.size() == 0)
301:                    return;
302:
303:                synchronized (msgs) {
304:                    if (msgs.get(new Long(seqno)) != null)
305:                        return;
306:                    e = new Entry(seqno, msg, receivers, retransmit_intervals);
307:                    msgs.put(new Long(seqno), e);
308:                    retransmitter.add(e);
309:                }
310:            }
311:
312:            /**
313:             * An ACK has been received from <code>sender</code>. Tag the sender in
314:             * the hash table as 'received'. If all ACKs have been received, remove
315:             * the entry all together.
316:             *
317:             * @param seqno  The sequence number of the message for which an ACK has
318:             * been received.
319:             * @param sender The sender which sent the ACK
320:             */
321:            public void ack(long seqno, Address sender) {
322:                Entry entry;
323:                Boolean received;
324:
325:                synchronized (msgs) {
326:                    entry = (Entry) msgs.get(new Long(seqno));
327:                    if (entry == null)
328:                        return;
329:
330:                    synchronized (entry) {
331:                        received = (Boolean) entry.senders.get(sender);
332:                        if (received == null || received.booleanValue())
333:                            return;
334:
335:                        // If not yet received
336:                        entry.senders.put(sender, Boolean.TRUE);
337:                        entry.num_received++;
338:                        if (!entry.allReceived())
339:                            return;
340:                    }
341:
342:                    synchronized (stable_msgs) {
343:                        entry.cancel();
344:                        msgs.remove(new Long(seqno));
345:                        stable_msgs.add(new Long(seqno));
346:                    }
347:                    // wake up waitUntilAllAcksReceived() method
348:                    msgs.notifyAll();
349:                }
350:            }
351:
352:            /**
353:             * Remove <code>obj</code> from all receiver sets and wake up
354:             * retransmission thread.
355:             *
356:             * @param obj the sender to remove
357:             */
358:            public void remove(Address obj) {
359:                Long key;
360:                Entry entry;
361:
362:                synchronized (msgs) {
363:                    for (Enumeration e = msgs.keys(); e.hasMoreElements();) {
364:                        key = (Long) e.nextElement();
365:                        entry = (Entry) msgs.get(key);
366:                        synchronized (entry) {
367:                            //if (((Boolean)entry.senders.remove(obj)).booleanValue()) entry.num_received--;
368:                            //if (!entry.allReceived()) continue;
369:                            Boolean received = (Boolean) entry.senders
370:                                    .remove(obj);
371:                            if (received == null)
372:                                continue; // suspected member not in entry.senders ?
373:                            if (received.booleanValue())
374:                                entry.num_received--;
375:                            if (!entry.allReceived())
376:                                continue;
377:                        }
378:                        synchronized (stable_msgs) {
379:                            entry.cancel();
380:                            msgs.remove(key);
381:                            stable_msgs.add(key);
382:                        }
383:                        // wake up waitUntilAllAcksReceived() method
384:                        msgs.notifyAll();
385:                    }
386:                }
387:            }
388:
389:            /**
390:             * Process with address <code>suspected</code> is suspected: remove it
391:             * from all receiver sets. This means that no ACKs are expected from this
392:             * process anymore.
393:             *
394:             * @param suspected The suspected process
395:             */
396:            public void suspect(Address suspected) {
397:
398:                if (log.isInfoEnabled())
399:                    log.info("suspect is " + suspected);
400:                remove(suspected);
401:                suspects.add(suspected);
402:                if (suspects.size() >= max_suspects)
403:                    suspects.removeFirst();
404:            }
405:
406:            /**
407:             * @return a copy of stable messages, or null (if non available). Removes
408:             * all stable messages afterwards
409:             */
410:            public Vector getStableMessages() {
411:                Vector retval;
412:
413:                synchronized (stable_msgs) {
414:                    retval = (stable_msgs.size() > 0) ? (Vector) stable_msgs
415:                            .clone() : null;
416:                    if (stable_msgs.size() > 0)
417:                        stable_msgs.clear();
418:                }
419:
420:                return (retval);
421:            }
422:
423:            public void clearStableMessages() {
424:                synchronized (stable_msgs) {
425:                    stable_msgs.clear();
426:                }
427:            }
428:
429:            /**
430:             * @return the number of currently pending msgs
431:             */
432:            public long size() {
433:                synchronized (msgs) {
434:                    return (msgs.size());
435:                }
436:            }
437:
438:            /** Returns the number of members for a given entry for which acks have to be received */
439:            public long getNumberOfResponsesExpected(long seqno) {
440:                Entry entry = (Entry) msgs.get(new Long(seqno));
441:                if (entry != null)
442:                    return entry.senders.size();
443:                else
444:                    return -1;
445:            }
446:
447:            /** Returns the number of members for a given entry for which acks have been received */
448:            public long getNumberOfResponsesReceived(long seqno) {
449:                Entry entry = (Entry) msgs.get(new Long(seqno));
450:                if (entry != null)
451:                    return entry.num_received;
452:                else
453:                    return -1;
454:            }
455:
456:            /** Prints all members plus whether an ack has been received from those members for a given seqno */
457:            public String printDetails(long seqno) {
458:                Entry entry = (Entry) msgs.get(new Long(seqno));
459:                if (entry != null)
460:                    return entry.toString();
461:                else
462:                    return null;
463:            }
464:
465:            /**
466:             * Waits until all outstanding messages have been ACKed by all receivers.
467:             * Takes into account suspicions and view changes. Returns when there are
468:             * no entries left in the hashtable. <b>While waiting, no entries can be
469:             * added to the hashtable (they will be discarded).</b>
470:             *
471:             * @param timeout Miliseconds to wait. 0 means wait indefinitely.
472:             */
473:            public void waitUntilAllAcksReceived(long timeout) {
474:                long time_to_wait, start_time, current_time;
475:                Address suspect;
476:
477:                // remove all suspected members from retransmission
478:                for (Iterator it = suspects.iterator(); it.hasNext();) {
479:                    suspect = (Address) it.next();
480:                    remove(suspect);
481:                }
482:
483:                time_to_wait = timeout;
484:                waiting = true;
485:                if (timeout <= 0) {
486:                    synchronized (msgs) {
487:                        while (msgs.size() > 0)
488:                            try {
489:                                msgs.wait();
490:                            } catch (InterruptedException ex) {
491:                            }
492:                    }
493:                } else {
494:                    start_time = System.currentTimeMillis();
495:                    synchronized (msgs) {
496:                        while (msgs.size() > 0) {
497:                            current_time = System.currentTimeMillis();
498:                            time_to_wait = timeout
499:                                    - (current_time - start_time);
500:                            if (time_to_wait <= 0)
501:                                break;
502:
503:                            try {
504:                                msgs.wait(time_to_wait);
505:                            } catch (InterruptedException ex) {
506:                                if (log.isWarnEnabled())
507:                                    log.warn(ex.toString());
508:                            }
509:                        }
510:                    }
511:                }
512:                waiting = false;
513:            }
514:
515:            /**
516:             * Start the retransmitter. This has no effect, if the retransmitter
517:             * was externally provided
518:             */
519:            public void start() {
520:                if (retransmitter_owned)
521:                    retransmitter.start();
522:            }
523:
524:            /**
525:             * Stop the rentransmition and clear all pending msgs.
526:             * <p>
527:             * If this retransmitter has been provided an externally managed
528:             * scheduler, then just clear all msgs and the associated tasks, else
529:             * stop the scheduler. In this case the method blocks until the
530:             * scheduler's thread is dead. Only the owner of the scheduler should
531:             * stop it.
532:             */
533:            public void stop() {
534:                Entry entry;
535:
536:                // i. If retransmitter is owned, stop it else cancel all tasks
537:                // ii. Clear all pending msgs and notify anyone waiting
538:                synchronized (msgs) {
539:                    if (retransmitter_owned) {
540:                        try {
541:                            retransmitter.stop();
542:                        } catch (InterruptedException ex) {
543:                            if (log.isErrorEnabled())
544:                                log.error(_toString(ex));
545:                        }
546:                    } else {
547:                        for (Enumeration e = msgs.elements(); e
548:                                .hasMoreElements();) {
549:                            entry = (Entry) e.nextElement();
550:                            entry.cancel();
551:                        }
552:                    }
553:                    msgs.clear();
554:                    // wake up waitUntilAllAcksReceived() method
555:                    msgs.notifyAll();
556:                }
557:            }
558:
559:            /**
560:             * Remove all pending msgs from the hashtable. Cancel all associated
561:             * tasks in the retransmission scheduler
562:             */
563:            public void reset() {
564:                Entry entry;
565:
566:                if (waiting)
567:                    return;
568:
569:                synchronized (msgs) {
570:                    for (Enumeration e = msgs.elements(); e.hasMoreElements();) {
571:                        entry = (Entry) e.nextElement();
572:                        entry.cancel();
573:                    }
574:                    msgs.clear();
575:                    msgs.notifyAll();
576:                }
577:            }
578:
579:            public String toString() {
580:                StringBuffer ret;
581:                Entry entry;
582:                Long key;
583:
584:                ret = new StringBuffer();
585:                synchronized (msgs) {
586:                    ret.append("msgs: (").append(msgs.size()).append(')');
587:                    for (Enumeration e = msgs.keys(); e.hasMoreElements();) {
588:                        key = (Long) e.nextElement();
589:                        entry = (Entry) msgs.get(key);
590:                        ret.append("key = ").append(key).append(", value = ")
591:                                .append(entry).append('\n');
592:                    }
593:                    synchronized (stable_msgs) {
594:                        ret.append("\nstable_msgs: ").append(stable_msgs);
595:                    }
596:                }
597:
598:                return (ret.toString());
599:            }
600:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.