Source Code Cross Referenced for NakReceiverWindow.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » stack » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.stack 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: NakReceiverWindow.java,v 1.28 2006/01/14 14:00:42 belaban Exp $
002:
003:        package org.jgroups.stack;
004:
005:        import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
006:        import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
007:        import org.apache.commons.logging.Log;
008:        import org.apache.commons.logging.LogFactory;
009:        import org.jgroups.Address;
010:        import org.jgroups.Message;
011:        import org.jgroups.util.List;
012:        import org.jgroups.util.TimeScheduler;
013:
014:        import java.util.*;
015:
016:        /**
017:         * Keeps track of messages according to their sequence numbers. Allows
018:         * messages to be added out of order, and with gaps between sequence numbers.
019:         * Method <code>remove()</code> removes the first message with a sequence
020:         * number that is 1 higher than <code>next_to_remove</code> (this variable is
021:         * then incremented), or it returns null if no message is present, or if no
022:         * message's sequence number is 1 higher.
023:         * <p>
024:         * When there is a gap upon adding a message, its seqno will be added to the
025:         * Retransmitter, which (using a timer) requests retransmissions of missing
026:         * messages and keeps on trying until the message has been received, or the
027:         * member who sent the message is suspected.
028:         * <p>
029:         * Started out as a copy of SlidingWindow. Main diff: RetransmitCommand is
030:         * different, and retransmission thread is only created upon detection of a
031:         * gap.
032:         * <p>
033:         * Change Nov 24 2000 (bela): for PBCAST, which has its own retransmission
034:         * (via gossip), the retransmitter thread can be turned off
035:         * <p>
036:         * Change April 25 2001 (igeorg):<br>
037:         * i. Restructuring: placed all nested class definitions at the top, then
038:         * class static/non-static variables, then class private/public methods.<br>
039:         * ii. Class and all nested classes are thread safe. Readers/writer lock
040:         * added on <tt>NakReceiverWindow</tt> for finer grained locking.<br>
041:         * iii. Internal or externally provided retransmission scheduler thread.<br>
042:         * iv. Exponential backoff in time for retransmissions.<br>
043:         *
044:         * @author Bela Ban May 27 1999, May 2004
045:         * @author John Georgiadis May 8 2001
046:         */
047:        public class NakReceiverWindow {
048:
049:            public interface Listener {
050:                void missingMessageReceived(long seqno, Message msg);
051:            }
052:
053:            /** The big read/write lock */
054:            private final ReadWriteLock lock = new WriterPreferenceReadWriteLock();
055:            //private final ReadWriteLock lock=new NullReadWriteLock();
056:
057:            /** keep track of *next* seqno to remove and highest received */
058:            private long head = 0;
059:            private long tail = 0;
060:
061:            /** lowest seqno delivered so far */
062:            private long lowest_seen = 0;
063:
064:            /** highest deliverable (or delivered) seqno so far */
065:            private long highest_seen = 0;
066:
067:            /** TreeMap<Long,Message>. Maintains messages keyed by (sorted) sequence numbers */
068:            private final TreeMap received_msgs = new TreeMap();
069:
070:            /** TreeMap<Long,Message>. Delivered (= seen by all members) messages. A remove() method causes a message to be
071:             moved from received_msgs to delivered_msgs. Message garbage collection will gradually remove elements in this map */
072:            private final TreeMap delivered_msgs = new TreeMap();
073:
074:            /**
075:             * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
076:             * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
077:             * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
078:             * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
079:             * around, and don't need to wait for garbage collection to remove them.
080:             */
081:            private boolean discard_delivered_msgs = false;
082:
083:            /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
084:             * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
085:             */
086:            private int max_xmit_buf_size = 0;
087:
088:            /** if not set, no retransmitter thread will be started. Useful if
089:             * protocols do their own retransmission (e.g PBCAST) */
090:            private Retransmitter retransmitter = null;
091:
092:            private Listener listener = null;
093:
094:            protected static final Log log = LogFactory
095:                    .getLog(NakReceiverWindow.class);
096:
097:            /**
098:             * Creates a new instance with the given retransmit command
099:             *
100:             * @param sender The sender associated with this instance
101:             * @param cmd The command used to retransmit a missing message, will
102:             * be invoked by the table. If null, the retransmit thread will not be
103:             * started
104:             * @param start_seqno The first sequence number to be received
105:             * @param sched the external scheduler to use for retransmission
106:             * requests of missing msgs. If it's not provided or is null, an internal
107:             * one is created
108:             */
109:            public NakReceiverWindow(Address sender,
110:                    Retransmitter.RetransmitCommand cmd, long start_seqno,
111:                    TimeScheduler sched) {
112:                head = start_seqno;
113:                tail = head;
114:
115:                if (cmd != null)
116:                    retransmitter = sched == null ? new Retransmitter(sender,
117:                            cmd) : new Retransmitter(sender, cmd, sched);
118:            }
119:
120:            /**
121:             * Creates a new instance with the given retransmit command
122:             *
123:             * @param sender The sender associated with this instance
124:             * @param cmd The command used to retransmit a missing message, will
125:             * be invoked by the table. If null, the retransmit thread will not be
126:             * started
127:             * @param start_seqno The first sequence number to be received
128:             */
129:            public NakReceiverWindow(Address sender,
130:                    Retransmitter.RetransmitCommand cmd, long start_seqno) {
131:                this (sender, cmd, start_seqno, null);
132:            }
133:
134:            /**
135:             * Creates a new instance without a retransmission thread
136:             *
137:             * @param sender The sender associated with this instance
138:             * @param start_seqno The first sequence number to be received
139:             */
140:            public NakReceiverWindow(Address sender, long start_seqno) {
141:                this (sender, null, start_seqno);
142:            }
143:
144:            public void setRetransmitTimeouts(long[] timeouts) {
145:                if (retransmitter != null)
146:                    retransmitter.setRetransmitTimeouts(timeouts);
147:            }
148:
149:            public void setDiscardDeliveredMessages(boolean flag) {
150:                this .discard_delivered_msgs = flag;
151:            }
152:
153:            public int getMaxXmitBufSize() {
154:                return max_xmit_buf_size;
155:            }
156:
157:            public void setMaxXmitBufSize(int max_xmit_buf_size) {
158:                this .max_xmit_buf_size = max_xmit_buf_size;
159:            }
160:
161:            public void setListener(Listener l) {
162:                this .listener = l;
163:            }
164:
165:            /**
166:             * Adds a message according to its sequence number (ordered).
167:             * <p>
168:             * Variables <code>head</code> and <code>tail</code> mark the start and
169:             * end of the messages received, but not delivered yet. When a message is
170:             * received, if its seqno is smaller than <code>head</code>, it is
171:             * discarded (already received). If it is bigger than <code>tail</code>,
172:             * we advance <code>tail</code> and add empty elements. If it is between
173:             * <code>head</code> and <code>tail</code>, we set the corresponding
174:             * missing (or already present) element. If it is equal to
175:             * <code>tail</code>, we advance the latter by 1 and add the message
176:             * (default case).
177:             */
178:            public void add(long seqno, Message msg) {
179:                long old_tail;
180:
181:                try {
182:                    lock.writeLock().acquire();
183:                    try {
184:                        old_tail = tail;
185:                        if (seqno < head) {
186:                            if (log.isTraceEnabled()) {
187:                                StringBuffer sb = new StringBuffer("seqno ");
188:                                sb.append(seqno).append(" is smaller than ")
189:                                        .append(head).append(
190:                                                "); discarding message");
191:                                log.trace(sb.toString());
192:                            }
193:                            return;
194:                        }
195:
196:                        // add at end (regular expected msg)
197:                        if (seqno == tail) {
198:                            received_msgs.put(new Long(seqno), msg);
199:                            tail++;
200:                            if (highest_seen + 2 == tail) {
201:                                highest_seen++;
202:                            } else {
203:                                updateHighestSeen();
204:                            }
205:
206:                            // highest_seen=seqno;
207:                        }
208:                        // gap detected
209:                        // i. add placeholders, creating gaps
210:                        // ii. add real msg
211:                        // iii. tell retransmitter to retrieve missing msgs
212:                        else if (seqno > tail) {
213:                            for (long i = tail; i < seqno; i++) {
214:                                received_msgs.put(new Long(i), null);
215:                                // XmitEntry xmit_entry=new XmitEntry();
216:                                //xmits.put(new Long(i), xmit_entry);
217:                                tail++;
218:                            }
219:                            received_msgs.put(new Long(seqno), msg);
220:                            tail = seqno + 1;
221:                            if (retransmitter != null) {
222:                                retransmitter.add(old_tail, seqno - 1);
223:                            }
224:                        } else if (seqno < tail) { // finally received missing message
225:                            if (log.isTraceEnabled()) {
226:                                log
227:                                        .trace(new StringBuffer(
228:                                                "added missing msg ").append(
229:                                                msg.getSrc()).append('#')
230:                                                .append(seqno));
231:                            }
232:                            if (listener != null) {
233:                                try {
234:                                    listener.missingMessageReceived(seqno, msg);
235:                                } catch (Throwable t) {
236:                                }
237:                            }
238:
239:                            Object val = received_msgs.get(new Long(seqno));
240:                            if (val == null) {
241:                                // only set message if not yet received (bela July 23 2003)
242:                                received_msgs.put(new Long(seqno), msg);
243:
244:                                if (highest_seen + 1 == seqno || seqno == head)
245:                                    updateHighestSeen();
246:
247:                                //XmitEntry xmit_entry=(XmitEntry)xmits.get(new Long(seqno));
248:                                //if(xmit_entry != null)
249:                                //  xmit_entry.received=System.currentTimeMillis();
250:                                //long xmit_diff=xmit_entry == null? -1 : xmit_entry.received - xmit_entry.created;
251:                                //NAKACK.addXmitResponse(msg.getSrc(), seqno);
252:                                if (retransmitter != null)
253:                                    retransmitter.remove(seqno);
254:                            }
255:                        }
256:                        updateLowestSeen();
257:                    } finally {
258:                        lock.writeLock().release();
259:                    }
260:                } catch (InterruptedException e) {
261:                    log.error("failed acquiring write lock", e);
262:                }
263:            }
264:
265:            /** Start from the current sequence number and set highest_seen until we find a gap (null value in the entry) */
266:            void updateHighestSeen() {
267:                SortedMap map = received_msgs.tailMap(new Long(highest_seen));
268:                Map.Entry entry;
269:                for (Iterator it = map.entrySet().iterator(); it.hasNext();) {
270:                    entry = (Map.Entry) it.next();
271:                    if (entry.getValue() != null)
272:                        highest_seen = ((Long) entry.getKey()).longValue();
273:                    else
274:                        break;
275:                }
276:            }
277:
278:            public Message remove() {
279:                Message retval = null;
280:                Long key;
281:                boolean bounded_buffer_enabled = max_xmit_buf_size > 0;
282:
283:                try {
284:                    lock.writeLock().acquire();
285:                    try {
286:                        while (received_msgs.size() > 0) {
287:                            key = (Long) received_msgs.firstKey();
288:                            retval = (Message) received_msgs.get(key);
289:                            if (retval != null) { // message exists and is ready for delivery
290:                                received_msgs.remove(key); // move from received_msgs to ...
291:                                if (discard_delivered_msgs == false) {
292:                                    delivered_msgs.put(key, retval); // delivered_msgs
293:                                }
294:                                head++; // is removed from retransmitter somewhere else (when missing message is received)
295:                                return retval;
296:                            } else { // message has not yet been received (gap in the message sequence stream)
297:                                if (bounded_buffer_enabled
298:                                        && received_msgs.size() > max_xmit_buf_size) {
299:                                    received_msgs.remove(key); // move from received_msgs to ...
300:                                    head++;
301:                                    retransmitter.remove(key.longValue());
302:                                } else {
303:                                    break;
304:                                }
305:                            }
306:                        }
307:                        return retval;
308:                    } finally {
309:                        lock.writeLock().release();
310:                    }
311:                } catch (InterruptedException e) {
312:                    log.error("failed acquiring write lock", e);
313:                    return null;
314:                }
315:            }
316:
317:            /**
318:             * Delete all messages <= seqno (they are stable, that is, have been
319:             * received at all members). Stop when a number > seqno is encountered
320:             * (all messages are ordered on seqnos).
321:             */
322:            public void stable(long seqno) {
323:                try {
324:                    lock.writeLock().acquire();
325:                    try {
326:                        // we need to remove all seqnos *including* seqno: because headMap() *excludes* seqno, we
327:                        // simply increment it, so we have to correct behavior
328:                        SortedMap m = delivered_msgs
329:                                .headMap(new Long(seqno + 1));
330:                        if (m.size() > 0)
331:                            lowest_seen = Math.max(lowest_seen, ((Long) m
332:                                    .lastKey()).longValue());
333:                        m.clear(); // removes entries from delivered_msgs
334:                    } finally {
335:                        lock.writeLock().release();
336:                    }
337:                } catch (InterruptedException e) {
338:                    log.error("failed acquiring write lock", e);
339:                }
340:            }
341:
342:            /**
343:             * Reset the retransmitter and the nak window<br>
344:             */
345:            public void reset() {
346:                try {
347:                    lock.writeLock().acquire();
348:                    try {
349:                        if (retransmitter != null)
350:                            retransmitter.reset();
351:                        _reset();
352:                    } finally {
353:                        lock.writeLock().release();
354:                    }
355:                } catch (InterruptedException e) {
356:                    log.error("failed acquiring write lock", e);
357:                }
358:            }
359:
360:            /**
361:             * Stop the retransmitter and reset the nak window<br>
362:             */
363:            public void destroy() {
364:                try {
365:                    lock.writeLock().acquire();
366:                    try {
367:                        if (retransmitter != null)
368:                            retransmitter.stop();
369:                        _reset();
370:                    } finally {
371:                        lock.writeLock().release();
372:                    }
373:                } catch (InterruptedException e) {
374:                    log.error("failed acquiring write lock", e);
375:                }
376:            }
377:
378:            /**
379:             * @return the highest sequence number of a message consumed by the
380:             * application (by <code>remove()</code>)
381:             */
382:            public long getHighestDelivered() {
383:                try {
384:                    lock.readLock().acquire();
385:                    try {
386:                        return (Math.max(head - 1, -1));
387:                    } finally {
388:                        lock.readLock().release();
389:                    }
390:                } catch (InterruptedException e) {
391:                    log.error("failed acquiring read lock", e);
392:                    return -1;
393:                }
394:            }
395:
396:            /**
397:             * @return the lowest sequence number of a message that has been
398:             * delivered or is a candidate for delivery (by the next call to
399:             * <code>remove()</code>)
400:             */
401:            public long getLowestSeen() {
402:                try {
403:                    lock.readLock().acquire();
404:                    try {
405:                        return (lowest_seen);
406:                    } finally {
407:                        lock.readLock().release();
408:                    }
409:                } catch (InterruptedException e) {
410:                    log.error("failed acquiring read lock", e);
411:                    return -1;
412:                }
413:            }
414:
415:            /**
416:             * Returns the highest deliverable seqno; e.g., for 1,2,3,5,6 it would
417:             * be 3.
418:             *
419:             * @see NakReceiverWindow#getHighestReceived
420:             */
421:            public long getHighestSeen() {
422:                try {
423:                    lock.readLock().acquire();
424:                    try {
425:                        return (highest_seen);
426:                    } finally {
427:                        lock.readLock().release();
428:                    }
429:                } catch (InterruptedException e) {
430:                    log.error("failed acquiring read lock", e);
431:                    return -1;
432:                }
433:            }
434:
435:            /**
436:             * Find all messages between 'low' and 'high' (including 'low' and
437:             * 'high') that have a null msg.
438:             * Return them as a list of longs
439:             *
440:             * @return List<Long>. A list of seqnos, sorted in ascending order.
441:             * E.g. [1, 4, 7, 8]
442:             */
443:            public List getMissingMessages(long low, long high) {
444:                List retval = new List();
445:                // long my_high;
446:
447:                if (low > high) {
448:                    if (log.isErrorEnabled())
449:                        log.error("invalid range: low (" + low
450:                                + ") is higher than high (" + high + ')');
451:                    return null;
452:                }
453:
454:                try {
455:                    lock.readLock().acquire();
456:                    try {
457:
458:                        // my_high=Math.max(head - 1, 0);
459:                        // check only received messages, because delivered messages *must* have a non-null msg
460:                        SortedMap m = received_msgs.subMap(new Long(low),
461:                                new Long(high + 1));
462:                        for (Iterator it = m.keySet().iterator(); it.hasNext();) {
463:                            retval.add(it.next());
464:                        }
465:
466:                        //            if(received_msgs.size() > 0) {
467:                        //                entry=(Entry)received_msgs.peek();
468:                        //                if(entry != null) my_high=entry.seqno;
469:                        //            }
470:                        //            for(long i=my_high + 1; i <= high; i++)
471:                        //                retval.add(new Long(i));
472:
473:                        return retval;
474:                    } finally {
475:                        lock.readLock().release();
476:                    }
477:                } catch (InterruptedException e) {
478:                    log.error("failed acquiring read lock", e);
479:                    return null;
480:                }
481:            }
482:
483:            /**
484:             * Returns the highest sequence number received so far (which may be
485:             * higher than the highest seqno <em>delivered</em> so far; e.g., for
486:             * 1,2,3,5,6 it would be 6.
487:             *
488:             * @see NakReceiverWindow#getHighestSeen
489:             */
490:            public long getHighestReceived() {
491:                try {
492:                    lock.readLock().acquire();
493:                    try {
494:                        return Math.max(tail - 1, -1);
495:                    } finally {
496:                        lock.readLock().release();
497:                    }
498:                } catch (InterruptedException e) {
499:                    log.error("failed acquiring read lock", e);
500:                    return -1;
501:                }
502:            }
503:
504:            /**
505:             * Return messages that are higher than <code>seqno</code> (excluding
506:             * <code>seqno</code>). Check both received <em>and</em> delivered
507:             * messages.
508:             * @return List<Message>. All messages that have a seqno greater than <code>seqno</code>
509:             */
510:            public List getMessagesHigherThan(long seqno) {
511:                List retval = new List();
512:
513:                try {
514:                    lock.readLock().acquire();
515:                    try {
516:                        // check received messages
517:                        SortedMap m = received_msgs
518:                                .tailMap(new Long(seqno + 1));
519:                        for (Iterator it = m.values().iterator(); it.hasNext();) {
520:                            retval.add((it.next()));
521:                        }
522:
523:                        // we retrieve all msgs whose seqno is strictly greater than seqno (tailMap() *includes* seqno,
524:                        // but we need to exclude seqno, that's why we increment it
525:                        m = delivered_msgs.tailMap(new Long(seqno + 1));
526:                        for (Iterator it = m.values().iterator(); it.hasNext();) {
527:                            retval.add(((Message) it.next()).copy());
528:                        }
529:                        return (retval);
530:
531:                    } finally {
532:                        lock.readLock().release();
533:                    }
534:                } catch (InterruptedException e) {
535:                    log.error("failed acquiring read lock", e);
536:                    return null;
537:                }
538:            }
539:
540:            /**
541:             * Return all messages m for which the following holds:
542:             * m > lower && m <= upper (excluding lower, including upper). Check both
543:             * <code>received_msgs</code> and <code>delivered_msgs</code>.
544:             */
545:            public List getMessagesInRange(long lower, long upper) {
546:                List retval = new List();
547:
548:                try {
549:                    lock.readLock().acquire();
550:                    try {
551:                        // check received messages
552:                        SortedMap m = received_msgs.subMap(new Long(lower + 1),
553:                                new Long(upper + 1));
554:                        for (Iterator it = m.values().iterator(); it.hasNext();) {
555:                            retval.add(it.next());
556:                        }
557:
558:                        m = delivered_msgs.subMap(new Long(lower + 1),
559:                                new Long(upper + 1));
560:                        for (Iterator it = m.values().iterator(); it.hasNext();) {
561:                            retval.add(((Message) it.next()).copy());
562:                        }
563:                        return retval;
564:
565:                    } finally {
566:                        lock.readLock().release();
567:                    }
568:                } catch (InterruptedException e) {
569:                    log.error("failed acquiring read lock", e);
570:                    return null;
571:                }
572:            }
573:
574:            /**
575:             * Return a list of all messages for which there is a seqno in
576:             * <code>missing_msgs</code>. The seqnos of the argument list are
577:             * supposed to be in ascending order
578:             * @param missing_msgs A List<Long> of seqnos
579:             * @return List<Message>
580:             */
581:            public List getMessagesInList(List missing_msgs) {
582:                List ret = new List();
583:
584:                if (missing_msgs == null) {
585:                    if (log.isErrorEnabled())
586:                        log.error("argument list is null");
587:                    return ret;
588:                }
589:
590:                try {
591:                    lock.readLock().acquire();
592:                    try {
593:                        Long seqno;
594:                        Message msg;
595:                        for (Enumeration en = missing_msgs.elements(); en
596:                                .hasMoreElements();) {
597:                            seqno = (Long) en.nextElement();
598:                            msg = (Message) delivered_msgs.get(seqno);
599:                            if (msg != null)
600:                                ret.add(msg.copy());
601:                            msg = (Message) received_msgs.get(seqno);
602:                            if (msg != null)
603:                                ret.add(msg.copy());
604:                        }
605:                        return ret;
606:                    } finally {
607:                        lock.readLock().release();
608:                    }
609:                } catch (InterruptedException e) {
610:                    log.error("failed acquiring read lock", e);
611:                    return null;
612:                }
613:            }
614:
615:            /**
616:             * Returns the message from received_msgs or delivered_msgs.
617:             * @param sequence_num
618:             * @return Message from received_msgs or delivered_msgs.
619:             */
620:            public Message get(long sequence_num) {
621:                Message msg;
622:                Long seqno = new Long(sequence_num);
623:                try {
624:                    lock.readLock().acquire();
625:                    try {
626:                        msg = (Message) delivered_msgs.get(seqno);
627:                        if (msg != null)
628:                            return msg;
629:                        msg = (Message) received_msgs.get(seqno);
630:                        if (msg != null)
631:                            return msg;
632:                    } finally {
633:                        lock.readLock().release();
634:                    }
635:                } catch (InterruptedException e) {
636:                    log.error("failed acquiring read lock", e);
637:                }
638:                return null;
639:            }
640:
641:            public int size() {
642:                boolean acquired = false;
643:                try {
644:                    lock.readLock().acquire();
645:                    acquired = true;
646:                } catch (InterruptedException e) {
647:                }
648:                try {
649:                    return received_msgs.size();
650:                } finally {
651:                    if (acquired)
652:                        lock.readLock().release();
653:                }
654:            }
655:
656:            public String toString() {
657:                StringBuffer sb = new StringBuffer();
658:                try {
659:                    lock.readLock().acquire();
660:                    try {
661:                        sb.append("received_msgs: ").append(
662:                                printReceivedMessages());
663:                        sb.append(", delivered_msgs: ").append(
664:                                printDeliveredMessages());
665:                    } finally {
666:                        lock.readLock().release();
667:                    }
668:                } catch (InterruptedException e) {
669:                    log.error("failed acquiring read lock", e);
670:                    return "";
671:                }
672:
673:                return sb.toString();
674:            }
675:
676:            /**
677:             * Prints delivered_msgs. Requires read lock present.
678:             * @return String
679:             */
680:            String printDeliveredMessages() {
681:                StringBuffer sb = new StringBuffer();
682:                Long min = null, max = null;
683:
684:                if (delivered_msgs.size() > 0) {
685:                    try {
686:                        min = (Long) delivered_msgs.firstKey();
687:                    } catch (NoSuchElementException ex) {
688:                    }
689:                    try {
690:                        max = (Long) delivered_msgs.lastKey();
691:                    } catch (NoSuchElementException ex) {
692:                    }
693:                }
694:                sb.append('[').append(min).append(" - ").append(max)
695:                        .append(']');
696:                if (min != null && max != null)
697:                    sb.append(" (size=").append(
698:                            max.longValue() - min.longValue()).append(")");
699:                return sb.toString();
700:            }
701:
702:            /**
703:             * Prints received_msgs. Requires read lock to be present
704:             * @return String
705:             */
706:            String printReceivedMessages() {
707:                StringBuffer sb = new StringBuffer();
708:                sb.append('[');
709:                if (received_msgs.size() > 0) {
710:                    Long first = null, last = null;
711:                    try {
712:                        first = (Long) received_msgs.firstKey();
713:                    } catch (NoSuchElementException ex) {
714:                    }
715:                    try {
716:                        last = (Long) received_msgs.lastKey();
717:                    } catch (NoSuchElementException ex) {
718:                    }
719:                    sb.append(first).append(" - ").append(last);
720:                    int non_received = 0;
721:                    Map.Entry entry;
722:
723:                    for (Iterator it = received_msgs.entrySet().iterator(); it
724:                            .hasNext();) {
725:                        entry = (Map.Entry) it.next();
726:                        if (entry.getValue() == null)
727:                            non_received++;
728:                    }
729:                    sb.append(" (size=").append(received_msgs.size()).append(
730:                            ", missing=").append(non_received).append(')');
731:                }
732:                sb.append(']');
733:                return sb.toString();
734:            }
735:
736:            /* ------------------------------- Private Methods -------------------------------------- */
737:
738:            /**
739:             * Sets the value of lowest_seen to the lowest seqno of the delivered messages (if available), otherwise
740:             * to the lowest seqno of received messages.
741:             */
742:            private void updateLowestSeen() {
743:                Long lowest_seqno = null;
744:
745:                // If both delivered and received messages are empty, let the highest
746:                // seen seqno be the one *before* the one which is expected to be
747:                // received next by the NakReceiverWindow (head-1)
748:
749:                // incorrect: if received and delivered msgs are empty, don't do anything: we may have initial values,
750:                // but both lists are cleaned after some time of inactivity
751:                // (bela April 19 2004)
752:                /*
753:                if((delivered_msgs.size() == 0) && (msgs.size() == 0)) {
754:                    lowest_seen=0;
755:                    return;
756:                }
757:                 */
758:
759:                // The lowest seqno is the first seqno of the delivered messages
760:                if (delivered_msgs.size() > 0) {
761:                    try {
762:                        lowest_seqno = (Long) delivered_msgs.firstKey();
763:                        if (lowest_seqno != null)
764:                            lowest_seen = lowest_seqno.longValue();
765:                    } catch (NoSuchElementException ex) {
766:                    }
767:                }
768:                // If no elements in delivered messages (e.g. due to message garbage collection), use the received messages
769:                else {
770:                    if (received_msgs.size() > 0) {
771:                        try {
772:                            lowest_seqno = (Long) received_msgs.firstKey();
773:                            if (received_msgs.get(lowest_seqno) != null) { // only set lowest_seen if we *have* a msg
774:                                lowest_seen = lowest_seqno.longValue();
775:                            }
776:                        } catch (NoSuchElementException ex) {
777:                        }
778:                    }
779:                }
780:            }
781:
782:            /**
783:             * Find the highest seqno that is deliverable or was actually delivered.
784:             * Returns seqno-1 if there are no messages in the queues (the first
785:             * message to be expected is always seqno).
786:             */
787:            //    private void updateHighestSeen() {
788:            //        long      ret=0;
789:            //        Map.Entry entry=null;
790:            //
791:            //        // If both delivered and received messages are empty, let the highest
792:            //        // seen seqno be the one *before* the one which is expected to be
793:            //        // received next by the NakReceiverWindow (head-1)
794:            //
795:            //        // changed by bela (April 19 2004): we don't change the value if received and delivered msgs are empty
796:            //        /*if((delivered_msgs.size() == 0) && (msgs.size() == 0)) {
797:            //            highest_seen=0;
798:            //            return;
799:            //        }*/
800:            //
801:            //
802:            //        // The highest seqno is the last of the delivered messages, to start with,
803:            //        // or again the one before the first seqno expected (if no delivered
804:            //        // msgs). Then iterate through the received messages, and find the highest seqno *before* a gap
805:            //        Long highest_seqno=null;
806:            //        if(delivered_msgs.size() > 0) {
807:            //            try {
808:            //                highest_seqno=(Long)delivered_msgs.lastKey();
809:            //                ret=highest_seqno.longValue();
810:            //            }
811:            //            catch(NoSuchElementException ex) {
812:            //            }
813:            //        }
814:            //        else {
815:            //            ret=Math.max(head - 1, 0);
816:            //        }
817:            //
818:            //        // Now check the received msgs head to tail. if there is an entry
819:            //        // with a non-null msg, increment ret until we find an entry with
820:            //        // a null msg
821:            //        for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
822:            //            entry=(Map.Entry)it.next();
823:            //            if(entry.getValue() != null)
824:            //                ret=((Long)entry.getKey()).longValue();
825:            //            else
826:            //                break;
827:            //        }
828:            //        highest_seen=Math.max(ret, 0);
829:            //    }
830:
831:            /**
832:             * Reset the Nak window. Should be called from within a writeLock() context.
833:             * <p>
834:             * i. Delete all received entries<br>
835:             * ii. Delete alll delivered entries<br>
836:             * iii. Reset all indices (head, tail, etc.)<br>
837:             */
838:            private void _reset() {
839:                received_msgs.clear();
840:                delivered_msgs.clear();
841:                head = 0;
842:                tail = 0;
843:                lowest_seen = 0;
844:                highest_seen = 0;
845:            }
846:            /* --------------------------- End of Private Methods ----------------------------------- */
847:
848:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.