FLUSH.java (JGroups 2.4.1-sp3, package org.jgroups.protocols.pbcast)

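The listing that follows blocks senders during a flush with a plain monitor and a boolean condition (blockMutex and isBlockingFlushDown, waited on in blockMessageDuringFlush()). As a reading aid, here is a minimal standalone sketch of that idea. The class name FlushGate and its methods are hypothetical and not part of JGroups. Unlike the listing, the sketch tracks a deadline so that a spurious wakeup does not count as a timeout, and it reports an interruption to the caller instead of treating it as a timeout.

```java
/**
 * Hypothetical stand-in for the blockMutex / isBlockingFlushDown pair in FLUSH.
 * Senders call awaitOpen() before sending; the flush logic calls block() and unblock().
 */
final class FlushGate {
    private final Object mutex = new Object();

    // Guarded by mutex. Starts closed, as isBlockingFlushDown starts out true in FLUSH.
    private boolean blocking = true;

    /** Closes the gate: later callers of awaitOpen() will wait. */
    void block() {
        synchronized (mutex) {
            blocking = true;
        }
    }

    /** Opens the gate and wakes up every waiting sender. */
    void unblock() {
        synchronized (mutex) {
            blocking = false;
            mutex.notifyAll();
        }
    }

    /**
     * Waits until the gate is open.
     *
     * @param timeoutMs maximum wait in milliseconds; a value of 0 or less waits forever
     * @return true if the gate was open or was opened by unblock(); false if the
     *         wait timed out (the caller then opens the gate itself) or the
     *         thread was interrupted (the gate is left unchanged)
     */
    boolean awaitOpen(long timeoutMs) {
        synchronized (mutex) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            try {
                while (blocking) {
                    if (timeoutMs <= 0) {
                        mutex.wait();
                        continue;
                    }
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        // Nobody unblocked us in time: open the gate ourselves
                        // so that other waiting senders are released as well.
                        blocking = false;
                        mutex.notifyAll();
                        return false;
                    }
                    mutex.wait(remaining);
                }
                return true;
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A sender would call awaitOpen(timeout) before passing a message down and, on a false return caused by a timeout, report that it unblocked itself, which is roughly what the listing does by sending SUSPEND_OK up and down the stack.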


package org.jgroups.protocols.pbcast;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import org.jgroups.View;
import org.jgroups.ViewId;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Promise;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;

import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;

/**
 * Flush, as its name implies, forces group members to flush their pending messages
 * while blocking them from sending any additional messages. The process of flushing
 * quiesces the group so that a state transfer or a join can be done. It is also
 * called the stop-the-world model, as nobody will be able to send messages while a
 * flush is in progress.
 *
 * <p>
 * Flush is needed for:
 * <p>
 * (1) State transfer. When a member requests state transfer, the coordinator
 *     tells everyone to stop sending messages and waits for everyone's ack. Then it asks
 *     the application for its state and ships it back to the requester. After the
 *     requester has received and set the state successfully, the coordinator tells
 *     everyone to resume sending messages.
 * <p>
 * (2) View changes (e.g. a join). Before installing a new view V2, flushing
 *     ensures that all messages *sent* in the current view V1 are indeed *delivered*
 *     in V1, rather than in V2 (in all non-faulty members). This is essentially
 *     Virtual Synchrony.
 *
 * @author Vladimir Blagojevic
 * @version $Id$
 * @since 2.4
 */
public class FLUSH extends Protocol {
    public static final String NAME = "FLUSH";

    // GuardedBy ("sharedLock")
    private View currentView;

    // GuardedBy ("sharedLock")
    private Address localAddress;

    /**
     * Group member that requested FLUSH.
     * For view installations, the flush coordinator is the group coordinator.
     * For state transfer, the flush coordinator is the state-requesting member.
     */
    // GuardedBy ("sharedLock")
    private Address flushCoordinator;

    // GuardedBy ("sharedLock")
    private final Collection flushMembers;

    // GuardedBy ("sharedLock")
    private final Set flushOkSet;

    // GuardedBy ("sharedLock")
    private final Set flushCompletedSet;

    // GuardedBy ("sharedLock")
    private final Set stopFlushOkSet;

    // GuardedBy ("sharedLock")
    private final Set suspected;

    private final Object sharedLock = new Object();

    private final Object blockMutex = new Object();

    /**
     * Indicates whether FLUSH.down() is currently blocking threads.
     * Condition predicate associated with blockMutex.
     */
    // GuardedBy ("blockMutex")
    private boolean isBlockingFlushDown = true;

    /**
     * Default timeout (in ms) for a group member to be in <code>isBlockingFlushDown</code>
     */
    private long timeout = 8000;

    /**
     * Default timeout (in ms) started when <code>Event.BLOCK</code> is passed to the
     * application. The response <code>Event.BLOCK_OK</code> should be received from
     * the application within this timeout.
     */
    private long block_timeout = 10000;

    // GuardedBy ("sharedLock")
    private boolean receivedFirstView = false;

    // GuardedBy ("sharedLock")
    private boolean receivedMoreThanOneView = false;

    private long startFlushTime;

    private long totalTimeInFlush;

    private int numberOfFlushes;

    private double averageFlushDuration;

    private final Promise flush_promise = new Promise();

    private final Promise blockok_promise = new Promise();

    private final FlushPhase flushPhase = new FlushPhase();

    /**
     * If true, configures the timeout in GMS and STATE_TRANSFER using the FLUSH timeout value
     */
    private boolean auto_flush_conf = true;

    public FLUSH() {
        super();
        currentView = new View(new ViewId(), new Vector());
        flushOkSet = new TreeSet();
        flushCompletedSet = new TreeSet();
        stopFlushOkSet = new TreeSet();
        flushMembers = new ArrayList();
        suspected = new TreeSet();
    }

    public String getName() {
        return NAME;
    }

    public boolean setProperties(Properties props) {
        super.setProperties(props);
        timeout = Util.parseLong(props, "timeout", timeout);
        block_timeout = Util.parseLong(props, "block_timeout",
                block_timeout);
        auto_flush_conf = Util.parseBoolean(props, "auto_flush_conf",
                auto_flush_conf);

        if (props.size() > 0) {
            log.error("the following properties are not recognized: "
                    + props);
            return false;
        }
        return true;
    }

    public void init() throws Exception {
        if (auto_flush_conf) {
            Map map = new HashMap();
            map.put("flush_timeout", new Long(timeout));
            passUp(new Event(Event.CONFIG, map));
            passDown(new Event(Event.CONFIG, map));
        }
    }

    public void start() throws Exception {
        Map map = new HashMap();
        map.put("flush_supported", Boolean.TRUE);
        passUp(new Event(Event.CONFIG, map));
        passDown(new Event(Event.CONFIG, map));

        synchronized (sharedLock) {
            receivedFirstView = false;
            receivedMoreThanOneView = false;
        }
        synchronized (blockMutex) {
            isBlockingFlushDown = true;
        }
    }

    public void stop() {
        synchronized (sharedLock) {
            currentView = new View(new ViewId(), new Vector());
            flushCompletedSet.clear();
            flushOkSet.clear();
            stopFlushOkSet.clear();
            flushMembers.clear();
            suspected.clear();
            flushCoordinator = null;
        }
    }

    /* ------------------- JMX attributes and operations --------------------- */

    public double getAverageFlushDuration() {
        return averageFlushDuration;
    }

    public long getTotalTimeInFlush() {
        return totalTimeInFlush;
    }

    public int getNumberOfFlushes() {
        return numberOfFlushes;
    }

    public boolean startFlush(long timeout) {
        boolean successfulFlush = false;
        // Reset the promise before triggering the flush, so that a result set
        // while SUSPEND is being processed is not wiped out by a late reset
        flush_promise.reset();
        down(new Event(Event.SUSPEND));
        try {
            flush_promise.getResultWithTimeout(timeout);
            successfulFlush = true;
        } catch (TimeoutException e) {
            // flush did not complete in time; report failure to the caller
        }
        return successfulFlush;
    }

    public void stopFlush() {
        down(new Event(Event.RESUME));
    }

    /* ------------------- end JMX attributes and operations --------------------- */

    public void down(Event evt) {
        switch (evt.getType()) {
        case Event.MSG:
            Message msg = (Message) evt.getArg();
            FlushHeader fh = (FlushHeader) msg.removeHeader(getName());
            // Messages tagged FLUSH_BYPASS pass through even during a flush;
            // all other messages wait until FLUSH.down() is unblocked
            if (fh == null || fh.type != FlushHeader.FLUSH_BYPASS) {
                blockMessageDuringFlush();
            }
            break;

        case Event.GET_STATE:
            blockMessageDuringFlush();
            break;

        case Event.CONNECT:
            boolean successfulBlock = sendBlockUpToChannel(block_timeout);
            if (successfulBlock && log.isDebugEnabled()) {
                log.debug("Blocking of channel " + localAddress
                        + " completed successfully");
            }
            break;

        case Event.SUSPEND:
            attemptSuspend(evt);
            return;

        case Event.RESUME:
            onResume();
            return;

        case Event.BLOCK_OK:
            blockok_promise.setResult(Boolean.TRUE);
            return;
        }
        passDown(evt);
    }

    private void blockMessageDuringFlush() {
        boolean shouldSuspendByItself = false;
        long start = 0, stop = 0;
        synchronized (blockMutex) {
            while (isBlockingFlushDown) {
                if (log.isDebugEnabled())
                    log.debug("FLUSH block at " + localAddress
                            + " for "
                            + (timeout <= 0 ? "ever" : timeout + "ms"));
                start = System.currentTimeMillis();
                try {
                    if (timeout <= 0)
                        blockMutex.wait();
                    else
                        blockMutex.wait(timeout);
                } catch (InterruptedException e) {
                    // handled like a timeout: fall through to the check below
                }
                // taken outside the try block so that it is also set on interruption
                stop = System.currentTimeMillis();
                if (isBlockingFlushDown) {
                    // nobody unblocked us while we waited: unblock ourselves
                    isBlockingFlushDown = false;
                    shouldSuspendByItself = true;
                    blockMutex.notifyAll();
                }
            }
        }
        if (shouldSuspendByItself) {
            log.warn("unblocking FLUSH.down() at " + localAddress
                    + " after timeout of " + (stop - start) + "ms");
            passUp(new Event(Event.SUSPEND_OK));
            passDown(new Event(Event.SUSPEND_OK));
        }
    }

    public void up(Event evt) {

        Message msg = null;
        switch (evt.getType()) {
        case Event.MSG:
            msg = (Message) evt.getArg();
            FlushHeader fh = (FlushHeader) msg.removeHeader(getName());
            if (fh != null) {
                flushPhase.lock();
                if (fh.type == FlushHeader.START_FLUSH) {
                    if (!flushPhase.isFlushInProgress()) {
                        flushPhase.setFirstPhase(true);
                        flushPhase.release();
                        boolean successfulBlock = sendBlockUpToChannel(block_timeout);
                        if (successfulBlock && log.isDebugEnabled()) {
                            log.debug("Blocking of channel "
                                    + localAddress
                                    + " completed successfully");
                        }
                        onStartFlush(msg.getSrc(), fh);
                    } else if (flushPhase.isInFirstPhase()) {
                        // concurrent flush requests: the requester with the
                        // lower address wins, the other one is rejected
                        flushPhase.release();
                        Address flushRequester = msg.getSrc();
                        Address coordinator = null;
                        synchronized (sharedLock) {
                            coordinator = flushCoordinator;
                        }

                        if (flushRequester.compareTo(coordinator) < 0) {
                            rejectFlush(fh.viewID, coordinator);
                            if (log.isDebugEnabled()) {
                                log.debug("Rejecting flush at "
                                        + localAddress
                                        + " to current flush coordinator "
                                        + coordinator
                                        + " and switching flush coordinator to "
                                        + flushRequester);
                            }
                            synchronized (sharedLock) {
                                flushCoordinator = flushRequester;
                            }
                        } else {
                            rejectFlush(fh.viewID, flushRequester);
                            if (log.isDebugEnabled()) {
                                log.debug("Rejecting flush at "
                                        + localAddress
                                        + " to flush requester "
                                        + flushRequester);
                            }
                        }
                    } else if (flushPhase.isInSecondPhase()) {
                        flushPhase.release();
                        Address flushRequester = msg.getSrc();
                        rejectFlush(fh.viewID, flushRequester);
                        if (log.isDebugEnabled()) {
                            log.debug("Rejecting flush in second phase at "
                                    + localAddress
                                    + " to flush requester "
                                    + flushRequester);
                        }
                    }
                } else if (fh.type == FlushHeader.STOP_FLUSH) {
                    flushPhase.setPhases(false, true);
                    flushPhase.release();
                    onStopFlush();
                } else if (fh.type == FlushHeader.ABORT_FLUSH) {
                    // abort current flush
                    flushPhase.release();
                    passUp(new Event(Event.SUSPEND_FAILED));
                    passDown(new Event(Event.SUSPEND_FAILED));

                } else if (isCurrentFlushMessage(fh)) {
                    flushPhase.release();
                    if (fh.type == FlushHeader.FLUSH_OK) {
                        onFlushOk(msg.getSrc(), fh.viewID);
                    } else if (fh.type == FlushHeader.STOP_FLUSH_OK) {
                        onStopFlushOk(msg.getSrc(), fh.viewID);
                    } else if (fh.type == FlushHeader.FLUSH_COMPLETED) {
                        onFlushCompleted(msg.getSrc());
                    }
                } else {
                    flushPhase.release();
                    if (log.isDebugEnabled())
                        log.debug(localAddress
                                + " received outdated FLUSH message "
                                + fh + ", ignoring it.");
                }
                return; // do not pass FLUSH msg up
            }
            break;

        case Event.VIEW_CHANGE:
            // If this is the channel's first view and it is the only member of the group,
            // the goal is to pass BLOCK, VIEW, UNBLOCK to application space on the same
            // thread as VIEW.
            View newView = (View) evt.getArg();
            boolean firstView = onViewChange(newView);
            boolean singletonMember = newView.size() == 1
                    && newView.containsMember(localAddress);
            if (firstView && singletonMember) {
                passUp(evt);
                synchronized (blockMutex) {
                    isBlockingFlushDown = false;
                    blockMutex.notifyAll();
                }
                if (log.isDebugEnabled())
                    log.debug("At "
                            + localAddress
                            + " unblocking FLUSH.down() and sending UNBLOCK up");

                passUp(new Event(Event.UNBLOCK));
                return;
            }
            break;

        case Event.SET_LOCAL_ADDRESS:
            synchronized (sharedLock) {
                localAddress = (Address) evt.getArg();
            }
            break;

        case Event.SUSPECT:
            onSuspect((Address) evt.getArg());
            break;

        case Event.SUSPEND:
            attemptSuspend(evt);
            return;

        case Event.RESUME:
            onResume();
            return;

        }

        passUp(evt);
    }

    public Vector providedDownServices() {
        Vector retval = new Vector(2);
        retval.addElement(new Integer(Event.SUSPEND));
        retval.addElement(new Integer(Event.RESUME));
        return retval;
    }

    private void attemptSuspend(Event evt) {
        View v = (View) evt.getArg();
        if (log.isDebugEnabled())
            log.debug("Received SUSPEND at " + localAddress
                    + ", view is " + v);

        flushPhase.lock();
        if (!flushPhase.isFlushInProgress()) {
            flushPhase.release();
            onSuspend(v);
        } else {
            flushPhase.release();
            passUp(new Event(Event.SUSPEND_FAILED));
            passDown(new Event(Event.SUSPEND_FAILED));
        }
    }

    private void rejectFlush(long viewId, Address flushRequester) {
        Message reject = new Message(flushRequester);
        reject.putHeader(getName(), new FlushHeader(
                FlushHeader.ABORT_FLUSH, viewId));
        passDown(new Event(Event.MSG, reject));
    }

    private boolean sendBlockUpToChannel(long btimeout) {
        boolean successfulBlock = false;
        blockok_promise.reset();

        // BLOCK is passed up on a separate thread; this thread waits for BLOCK_OK
        new Thread(Util.getGlobalThreadGroup(), new Runnable() {
            public void run() {
                passUp(new Event(Event.BLOCK));
            }
        }, "FLUSH block").start();

        try {
            blockok_promise.getResultWithTimeout(btimeout);
            successfulBlock = true;
        } catch (TimeoutException e) {
            log.warn("Blocking of channel using BLOCK event timed out after "
                    + btimeout + " msec.");
        }
        return successfulBlock;
    }

    private boolean isCurrentFlushMessage(FlushHeader fh) {
        return fh.viewID == currentViewId();
    }

    private long currentViewId() {
        long viewId = -1;
        synchronized (sharedLock) {
            ViewId view = currentView.getVid();
            if (view != null) {
                viewId = view.getId();
            }
        }
        return viewId;
    }

    private boolean onViewChange(View view) {
        boolean amINewCoordinator = false;
        boolean isThisOurFirstView = false;
        synchronized (sharedLock) {
            if (receivedFirstView) {
                receivedMoreThanOneView = true;
            } else {
                receivedFirstView = true;
            }
            isThisOurFirstView = receivedFirstView
                    && !receivedMoreThanOneView;
            suspected.retainAll(view.getMembers());
            currentView = view;
            amINewCoordinator = flushCoordinator != null
                    && !view.getMembers().contains(flushCoordinator)
                    && localAddress.equals(view.getMembers().get(0));
        }

        // If the coordinator leaves, its STOP_FLUSH message will be discarded by
        // other members at the NAKACK layer. Remaining members would hang, waiting
        // for the STOP_FLUSH message. If I am the new coordinator, I will complete the
        // FLUSH and send STOP_FLUSH on the flush caller's behalf.
        if (amINewCoordinator) {
            if (log.isDebugEnabled())
                log.debug("Coordinator left, " + localAddress
                        + " will complete flush");
            onResume();
        }

        if (log.isDebugEnabled())
            log.debug("Installing view at " + localAddress
                    + " view is " + view);

        return isThisOurFirstView;
    }

    private void onStopFlush() {
        if (stats) {
            long stopFlushTime = System.currentTimeMillis();
            totalTimeInFlush += (stopFlushTime - startFlushTime);
            if (numberOfFlushes > 0) {
                averageFlushDuration = totalTimeInFlush
                        / (double) numberOfFlushes;
            }
        }
        // ack this STOP_FLUSH
        Message msg = new Message(null);
        msg.putHeader(getName(), new FlushHeader(
                FlushHeader.STOP_FLUSH_OK, currentViewId()));
        passDown(new Event(Event.MSG, msg));

        if (log.isDebugEnabled())
            log.debug("Received STOP_FLUSH and sent STOP_FLUSH_OK from "
                    + localAddress);
    }

    private void onSuspend(View view) {
        Message msg = null;
        Collection participantsInFlush = null;
        synchronized (sharedLock) {
            // start FLUSH only on group members that we need to flush
            if (view != null) {
                participantsInFlush = new ArrayList(view.getMembers());
                participantsInFlush.retainAll(currentView.getMembers());
            } else {
                participantsInFlush = new ArrayList(currentView
                        .getMembers());
            }
            msg = new Message(null);
            msg.putHeader(getName(), new FlushHeader(
                    FlushHeader.START_FLUSH, currentViewId(),
                    participantsInFlush));
        }
        if (participantsInFlush.isEmpty()) {
            passUp(new Event(Event.SUSPEND_OK));
            passDown(new Event(Event.SUSPEND_OK));
        } else {
            passDown(new Event(Event.MSG, msg));
            if (log.isDebugEnabled())
                log.debug("Received SUSPEND at " + localAddress
                        + ", sent START_FLUSH to "
                        + participantsInFlush);
        }
    }

    private void onResume() {
        long viewID = currentViewId();
        Message msg = new Message(null);
        msg.putHeader(getName(), new FlushHeader(
                FlushHeader.STOP_FLUSH, viewID));
        passDown(new Event(Event.MSG, msg));
        if (log.isDebugEnabled())
            log.debug("Received RESUME at " + localAddress
                    + ", sent STOP_FLUSH to all");
    }

    private void onStartFlush(Address flushStarter, FlushHeader fh) {
        if (stats) {
            startFlushTime = System.currentTimeMillis();
            numberOfFlushes += 1;
        }
        synchronized (sharedLock) {
            flushCoordinator = flushStarter;
            flushMembers.clear();
            if (fh.flushParticipants != null) {
                flushMembers.addAll(fh.flushParticipants);
            }
            flushMembers.removeAll(suspected);
        }
        Message msg = new Message(null);
        msg.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_OK,
                fh.viewID));
        passDown(new Event(Event.MSG, msg));
        if (log.isDebugEnabled())
            log.debug("Received START_FLUSH at " + localAddress
                    + " responded with FLUSH_OK");
    }

    private void onFlushOk(Address address, long viewID) {

        boolean flushOkCompleted = false;
        Message m = null;
        synchronized (sharedLock) {
            flushOkSet.add(address);
            flushOkCompleted = flushOkSet.containsAll(flushMembers);
            if (flushOkCompleted) {
                m = new Message(flushCoordinator);
            }

            if (log.isDebugEnabled())
                log.debug("At " + localAddress + " FLUSH_OK from "
                        + address + ", completed " + flushOkCompleted
                        + ", flushOkSet " + flushOkSet.toString());
        }

        if (flushOkCompleted) {
            synchronized (blockMutex) {
                isBlockingFlushDown = true;
            }
            m.putHeader(getName(), new FlushHeader(
                    FlushHeader.FLUSH_COMPLETED, viewID));
            passDown(new Event(Event.MSG, m));
            if (log.isDebugEnabled())
                log.debug(localAddress
                        + " is blocking FLUSH.down(). Sent FLUSH_COMPLETED message to "
                        + flushCoordinator);
        }
    }

    /**
     * Records a STOP_FLUSH_OK from the given member. Once all non-suspected
     * members of the current view have answered, resets the flush state,
     * unblocks FLUSH.down() and passes UNBLOCK up the stack.
     */
    private void onStopFlushOk(Address address, long viewID) {
        boolean stopFlushOkCompleted = false;
        synchronized (sharedLock) {
            stopFlushOkSet.add(address);
            TreeSet membersCopy = new TreeSet(currentView.getMembers());
            membersCopy.removeAll(suspected);
            stopFlushOkCompleted = stopFlushOkSet
                    .containsAll(membersCopy);

            if (log.isDebugEnabled())
                log.debug("At " + localAddress + " STOP_FLUSH_OK from "
                        + address + ",completed "
                        + stopFlushOkCompleted + ",  stopFlushOkSet "
                        + stopFlushOkSet.toString());
        }

        if (stopFlushOkCompleted) {
            // Reset all per-flush bookkeeping before unblocking.
            synchronized (sharedLock) {
                flushCompletedSet.clear();
                flushOkSet.clear();
                stopFlushOkSet.clear();
                flushMembers.clear();
                suspected.clear();
                flushCoordinator = null;
            }
            flushPhase.lock();
            flushPhase.setSecondPhase(false);
            flushPhase.release();

            if (log.isDebugEnabled())
                log.debug("At " + localAddress
                        + " unblocking FLUSH.down() and sending UNBLOCK up");

            // Wake up any threads waiting in FLUSH.down().
            synchronized (blockMutex) {
                isBlockingFlushDown = false;
                blockMutex.notifyAll();
            }
            passUp(new Event(Event.UNBLOCK));
        }
    }

    /**
     * Records a FLUSH_COMPLETED from the given member. Once all flush members
     * have reported, completes flush_promise and passes SUSPEND_OK up and
     * down the stack.
     */
    private void onFlushCompleted(Address address) {
        boolean flushCompleted = false;
        synchronized (sharedLock) {
            flushCompletedSet.add(address);
            flushCompleted = flushCompletedSet
                    .containsAll(flushMembers);

            if (log.isDebugEnabled())
                log.debug("At " + localAddress
                        + " FLUSH_COMPLETED from " + address
                        + ",completed " + flushCompleted
                        + ",flushCompletedSet "
                        + flushCompletedSet.toString());
        }

        if (flushCompleted) {
            // needed for the JMX operation startFlush(timeout)
            flush_promise.setResult(Boolean.TRUE);
            passUp(new Event(Event.SUSPEND_OK));
            passDown(new Event(Event.SUSPEND_OK));
            if (log.isDebugEnabled())
                log.debug("All FLUSH_COMPLETED received at "
                        + localAddress + " sent SUSPEND_OK down/up");
        }
    }

    /**
     * Marks the given member as suspected and removes it from the flush
     * members. If the remaining members have all sent FLUSH_OK, sends
     * FLUSH_COMPLETED to the flush coordinator.
     */
    private void onSuspect(Address address) {
        boolean flushOkCompleted = false;
        Message m = null;
        long viewID = 0;
        synchronized (sharedLock) {
            suspected.add(address);
            flushMembers.removeAll(suspected);
            viewID = currentViewId();
            // An empty flushOkSet means no FLUSH_OK has arrived yet, so
            // there is nothing to complete.
            flushOkCompleted = !flushOkSet.isEmpty()
                    && flushOkSet.containsAll(flushMembers);
            if (flushOkCompleted) {
                m = new Message(flushCoordinator);
            }

            if (log.isDebugEnabled())
                log.debug("Suspect is " + address + ",completed "
                        + flushOkCompleted + ",  flushOkSet "
                        + flushOkSet + " flushMembers " + flushMembers);
        }
        if (flushOkCompleted) {
            m.putHeader(getName(), new FlushHeader(
                    FlushHeader.FLUSH_COMPLETED, viewID));
            passDown(new Event(Event.MSG, m));
            if (log.isDebugEnabled())
                log.debug(localAddress
                        + " sent FLUSH_COMPLETED message to "
                        + flushCoordinator);
        }
    }

    /**
     * Tracks which of the two flush phases is active. The setters and getters
     * do no locking themselves; callers bracket them with lock() and release().
     */
    private static class FlushPhase {
        private boolean inFirstFlushPhase = false;
        private boolean inSecondFlushPhase = false;
        private final ReentrantLock lock = new ReentrantLock();

        public FlushPhase() {
        }

        public void lock() {
            try {
                lock.acquire();
            } catch (InterruptedException e) {
                // Restore the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }

        public void release() {
            lock.release();
        }

        public void setFirstPhase(boolean inFirstPhase) {
            inFirstFlushPhase = inFirstPhase;
        }

        public void setSecondPhase(boolean inSecondPhase) {
            inSecondFlushPhase = inSecondPhase;
        }

        public void setPhases(boolean inFirstPhase,
                boolean inSecondPhase) {
            inFirstFlushPhase = inFirstPhase;
            inSecondFlushPhase = inSecondPhase;
        }

        public boolean isInFirstPhase() {
            return inFirstFlushPhase;
        }

        public boolean isInSecondPhase() {
            return inSecondFlushPhase;
        }

        public boolean isFlushInProgress() {
            return inFirstFlushPhase || inSecondFlushPhase;
        }
    }

    /**
     * Header attached to every FLUSH protocol message: the message type, the
     * view id it refers to and, where supplied, the flush participants.
     */
    public static class FlushHeader extends Header implements Streamable {
        public static final byte START_FLUSH = 0;

        public static final byte FLUSH_OK = 1;

        public static final byte STOP_FLUSH = 2;

        public static final byte FLUSH_COMPLETED = 3;

        public static final byte STOP_FLUSH_OK = 4;

        public static final byte ABORT_FLUSH = 5;

        public static final byte FLUSH_BYPASS = 6;

        byte type;

        long viewID;

        Collection flushParticipants;

        public FlushHeader() {
            this(START_FLUSH, 0);
        } // used for externalization

        public FlushHeader(byte type) {
            this(type, 0);
        }

        public FlushHeader(byte type, long viewID) {
            this(type, viewID, null);
        }

        public FlushHeader(byte type, long viewID, Collection flushView) {
            this.type = type;
            this.viewID = viewID;
            this.flushParticipants = flushView;
        }

        public String toString() {
            switch (type) {
            case START_FLUSH:
                return "FLUSH[type=START_FLUSH,viewId=" + viewID
                        + ",members=" + flushParticipants + "]";
            case FLUSH_OK:
                return "FLUSH[type=FLUSH_OK,viewId=" + viewID + "]";
            case STOP_FLUSH:
                return "FLUSH[type=STOP_FLUSH,viewId=" + viewID + "]";
            case STOP_FLUSH_OK:
                return "FLUSH[type=STOP_FLUSH_OK,viewId=" + viewID
                        + "]";
            case ABORT_FLUSH:
                return "FLUSH[type=ABORT_FLUSH,viewId=" + viewID + "]";
            case FLUSH_COMPLETED:
                return "FLUSH[type=FLUSH_COMPLETED,viewId=" + viewID
                        + "]";
            case FLUSH_BYPASS:
                return "FLUSH[type=FLUSH_BYPASS,viewId=" + viewID + "]";
            default:
                return "[FLUSH: unknown type (" + type + ")]";
            }
        }

        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(type);
            out.writeLong(viewID);
            out.writeObject(flushParticipants);
        }

        public void readExternal(ObjectInput in) throws IOException,
                ClassNotFoundException {
            type = in.readByte();
            viewID = in.readLong();
            flushParticipants = (Collection) in.readObject();
        }

        /**
         * Wire layout: type (1 byte), viewID (8 bytes), participant count
         * (2 bytes), then one address per participant. A null or empty
         * participant collection is written as a count of 0.
         */
        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(type);
            out.writeLong(viewID);
            if (flushParticipants != null
                    && !flushParticipants.isEmpty()) {
                out.writeShort(flushParticipants.size());
                for (Iterator iter = flushParticipants.iterator(); iter
                        .hasNext();) {
                    Address address = (Address) iter.next();
                    Util.writeAddress(address, out);
                }
            } else {
                out.writeShort(0);
            }
        }

        /**
         * Reads the layout produced by writeTo(). A count of 0 leaves
         * flushParticipants untouched.
         */
        public void readFrom(DataInputStream in) throws IOException,
                IllegalAccessException, InstantiationException {
            type = in.readByte();
            viewID = in.readLong();
            int flushParticipantsSize = in.readShort();
            if (flushParticipantsSize > 0) {
                flushParticipants = new ArrayList(flushParticipantsSize);
                for (int i = 0; i < flushParticipantsSize; i++) {
                    flushParticipants.add(Util.readAddress(in));
                }
            }
        }
    }
}
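
The wire layout used by FlushHeader.writeTo() and readFrom() above (a type byte, a long view id, a short participant count, then the participants) can be tried out in isolation. The following is only a sketch under stated assumptions: the class FlushHeaderWireSketch and its encode/decode helpers are hypothetical names that are not part of JGroups, and plain strings written with writeUTF stand in for Address objects and Util.writeAddress/readAddress, so the byte counts differ from real JGroups traffic.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-alone sketch; not part of JGroups.
final class FlushHeaderWireSketch {

    /** Decoded fields. participants stays null when the count on the wire is 0. */
    static final class Decoded {
        final byte type;
        final long viewID;
        final List<String> participants;

        Decoded(byte type, long viewID, List<String> participants) {
            this.type = type;
            this.viewID = viewID;
            this.participants = participants;
        }
    }

    /** Same field order as writeTo(): type, viewID, count, then the entries. */
    static byte[] encode(byte type, long viewID, Collection<String> participants) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(type);
            out.writeLong(viewID);
            if (participants != null && !participants.isEmpty()) {
                out.writeShort(participants.size());
                for (String participant : participants) {
                    out.writeUTF(participant); // assumption: strings replace Address
                }
            } else {
                out.writeShort(0); // null and empty look identical on the wire
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Same field order as readFrom(). */
    static Decoded decode(byte[] wire) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            byte type = in.readByte();
            long viewID = in.readLong();
            int count = in.readShort();
            List<String> participants = null;
            if (count > 0) {
                participants = new ArrayList<String>(count);
                for (int i = 0; i < count; i++) {
                    participants.add(in.readUTF());
                }
            }
            return new Decoded(type, viewID, participants);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = encode((byte) 0, 7L, Arrays.asList("A", "B"));
        Decoded d = decode(wire);
        System.out.println(wire.length + " bytes, type=" + d.type
                + ", viewID=" + d.viewID + ", participants=" + d.participants);
    }
}
```

Writing a count of 0 for both a null and an empty collection keeps the reader simple, at the cost that the receiver cannot tell the two cases apart; in this sketch, as in readFrom(), the decoded participant list is simply left unset in that case.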