Source Code Cross Referenced for QuiescenceReportServiceProvider.java in  » Science » Cougaar12_4 » org » cougaar » core » node » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.node 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 1997-2004 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:
027:        package org.cougaar.core.node;
028:
029:        import java.util.Collection;
030:        import java.util.HashMap;
031:        import java.util.Iterator;
032:        import java.util.Map;
033:        import java.util.Set;
034:
035:        import org.cougaar.bootstrap.SystemProperties;
036:        import org.cougaar.core.agent.AgentContainer;
037:        import org.cougaar.core.component.ServiceBroker;
038:        import org.cougaar.core.component.ServiceProvider;
039:        import org.cougaar.core.logging.LoggingServiceWithPrefix;
040:        import org.cougaar.core.mts.MessageAddress;
041:        import org.cougaar.core.service.AgentIdentificationService;
042:        import org.cougaar.core.service.AgentQuiescenceStateService;
043:        import org.cougaar.core.service.EventService;
044:        import org.cougaar.core.service.QuiescenceReportForDistributorService;
045:        import org.cougaar.core.service.QuiescenceReportService;
046:        import org.cougaar.core.service.ThreadService;
047:        import org.cougaar.core.thread.Schedulable;
048:        import org.cougaar.util.FilteredIterator;
049:        import org.cougaar.util.Memo;
050:        import org.cougaar.util.UnaryPredicate;
051:        import org.cougaar.util.log.Logger;
052:        import org.cougaar.util.log.Logging;
053:
054:        /**
055:         * {@link ServiceProvider} for the {@link QuiescenceReportService}.
056:         * <p> 
057:         * The QRS collects quiescence information from the Agent Distributors
058:         * and other QRS clients in the Node. It also matches sent and
059:         * received messages between agents in the Node. When the collective
060:         * quiescence state of the Node changs, the QRS sends an Event indicating
061:         * this change.
062:         *
063:         * @property org.cougaar.core.node.quiescenceAnnounceDelay specifies the 
064:         * number of milliseconds that the Node waits when it thinks it has become 
065:         * quiescent to send an event announcing the fact. This suppresses 
066:         * possible toggling. Default is 20 seconds.
067:         */
068:        class QuiescenceReportServiceProvider implements  ServiceProvider {
069:            private Map quiescenceStates = new HashMap();
070:            private boolean isQuiescent = false;
071:
072:            // Predicate to get the quiscence states for agents that have enabled
073:            // their QRS and have not been marked as dead (duplicated elsewhere)
074:            private UnaryPredicate enabledQuiescenceStatePredicate = new UnaryPredicate() {
075:                public boolean execute(Object o) {
076:                    QuiescenceState qs = (QuiescenceState) o;
077:                    return (qs.isEnabled() && qs.isAlive());
078:                }
079:            };
080:            private Logger logger;
081:            private String nodeName;
082:            private AgentContainer agentContainer;
083:            private ServiceBroker sb;
084:            private QuiescenceAnnouncer quiescenceAnnouncer;
085:            private AgentQuiescenceStateService aqsService = null;
086:
087:            private static final long ANNOUNCEMENT_DELAY = SystemProperties
088:                    .getLong("org.cougaar.core.node.quiescenceAnnounceDelay",
089:                            20000);
090:
091:            //  private static final long ANNOUNCEMENT_DELAY = 30000L;
092:            private static final String EOL = " "; //SystemProperties.getProperty("line.separator");
093:
094:            QuiescenceReportServiceProvider(String nodeName,
095:                    AgentContainer agentContainer, ServiceBroker sb) {
096:                this .nodeName = nodeName;
097:                this .agentContainer = agentContainer;
098:                this .sb = sb;
099:                logger = new LoggingServiceWithPrefix(Logging
100:                        .getLogger(getClass()), nodeName + ": ");
101:                quiescenceAnnouncer = new QuiescenceAnnouncer();
102:            }
103:
104:            public Object getService(ServiceBroker xsb, Object requestor,
105:                    Class serviceClass) {
106:                if (serviceClass == QuiescenceReportService.class) {
107:                    if (requestor instanceof  MessageAddress) {
108:                        // special case, just for node-agent!
109:                        MessageAddress addr = (MessageAddress) requestor;
110:                        addr = addr.getPrimary(); // drop MessageAttributes
111:                        return new QuiescenceReportServiceImpl(addr);
112:                    } else {
113:                        String name = requestor.toString() + " hash: "
114:                                + requestor.hashCode();
115:                        return new QuiescenceReportServiceImpl(name);
116:                    }
117:                }
118:                if (serviceClass == QuiescenceReportForDistributorService.class) {
119:                    return new QuiescenceReportForDistributorServiceImpl(
120:                            requestor.toString());
121:                }
122:                if (serviceClass == AgentQuiescenceStateService.class) {
123:                    if (aqsService == null)
124:                        aqsService = new AgentQuiescenceStateServiceImpl();
125:                    return aqsService;
126:                }
127:                throw new IllegalArgumentException("Cannot provide "
128:                        + serviceClass.getName());
129:            }
130:
131:            public void releaseService(ServiceBroker xsb, Object requestor,
132:                    Class serviceClass, Object service) {
133:                if (service instanceof  QuiescenceReportService) {
134:                    QuiescenceReportService quiescenceReportService = (QuiescenceReportService) service;
135:                    quiescenceReportService.setQuiescentState();
136:                }
137:                // Nothing to do to release the aqsService
138:            }
139:
140:            protected void revokeService() {
141:                quiescenceAnnouncer.stopQuiescenceAnnouncer();
142:            }
143:
144:            private static void setMessageMap(MessageAddress me,
145:                    Map messageNumbers, Map newMap) {
146:                Map existingMap = (Map) messageNumbers.get(me);
147:                if (existingMap == null) {
148:                    existingMap = new HashMap(newMap);
149:                    messageNumbers.put(me, existingMap);
150:                } else {
151:                    existingMap.clear();
152:                    existingMap.putAll(newMap);
153:                }
154:            }
155:
156:            private synchronized QuiescenceState getQuiescenceState(
157:                    MessageAddress me) {
158:                QuiescenceState quiescenceState = (QuiescenceState) quiescenceStates
159:                        .get(me);
160:                if (quiescenceState == null) {
161:                    quiescenceState = new QuiescenceState(me, logger);
162:                    quiescenceStates.put(me, quiescenceState);
163:                }
164:                return quiescenceState;
165:            }
166:
167:            /** Just like getQuiescenceState, except does not create empty ones for misses */
168:            private synchronized QuiescenceState accessQuiescenceState(
169:                    MessageAddress me) {
170:                return (QuiescenceState) quiescenceStates.get(me);
171:            }
172:
173:            private Iterator getQuiescenceStatesIterator() {
174:                return new FilteredIterator(quiescenceStates.values()
175:                        .iterator(), enabledQuiescenceStatePredicate);
176:            }
177:
178:            private boolean isLocalAgent(MessageAddress otherAgent) {
179:                QuiescenceState otherState = (QuiescenceState) quiescenceStates
180:                        .get(otherAgent);
181:                if (otherState == null)
182:                    return false;
183:                // Also return false if the otherState.isEnabled() && otherState's agent isDead
184:                return (otherState.isEnabled() && otherState.isAlive());
185:            }
186:
187:            /**
188:             * Memoization of the quiescence state set.
189:             * Avoids bothering to make changes to 
190:             * unless there might be a difference.
191:             */
192:            private Memo _quiescenceStatesMemo = Memo.get(new Memo.Function() {
193:                public String toString() {
194:                    return "Memo<active quiescenceStates>";
195:                }
196:
197:                public Object eval(Object o) {
198:                    Set agentAddresses = (Set) o;
199:                    if (quiescenceStates.keySet().retainAll(agentAddresses)) {
200:                        // retainAll() == true means that the collection changed, so we'll return
201:                        return quiescenceStates;
202:                    } else {
203:                        // if we executed this, but it didn't change, we'll return null to 
204:                        // indicate the same end-result as if we hadn't executed at all!
205:                        return null;
206:                    }
207:                }
208:            });
209:
210:            /**
211:             * An agent has become quiescent. If all agents are quiescent and no
212:             * messages are outstanding, we announce our quiescence. Otherwise,
213:             * we cancel quiescence.
214:             */
215:            private void checkQuiescence() { // We might be quiescent...
216:                // If an agent moves out of this node, we need to clean out any
217:                // remembered message numbers.
218:                // FIXME: To be safe, should do getPrimary() on all the addresses returned
219:                // by the agentContainer, to strip off MessageAttributes
220:                _quiescenceStatesMemo.evalIfNew(agentContainer
221:                        .getAgentAddresses()); // returns non-null if it changed!
222:                // if the set of agents hasn't changed, can we skip any of the below?
223:
224:                // note that this expression short-circuits the expensive noMessagesAreOutstanding
225:                // call when !allAgentsAreQuiescent()
226:                if (allAgentsAreQuiescent() && noMessagesAreOutstanding()) {
227:                    announceQuiescence();
228:                    if (!isQuiescent && logger.isInfoEnabled()) {
229:                        logger.info("Is quiescent");
230:                    }
231:                    isQuiescent = true;
232:                } else {
233:                    if (isQuiescent) {
234:                        cancelQuiescence();
235:                    }
236:                    // else { still_quiescent(); }
237:                }
238:            }
239:
240:            private void cancelQuiescence() {
241:                if (isQuiescent) {
242:                    announceNonQuiescence();
243:                    isQuiescent = false;
244:                }
245:            }
246:
247:            private synchronized void setQuiescenceReportEnabled(
248:                    QuiescenceState quiescenceState, boolean enabled) {
249:                quiescenceState.setEnabled(enabled);
250:                if (logger.isInfoEnabled()) {
251:                    logger.info((enabled ? "Enabled " : "Disabled ")
252:                            + quiescenceState.getAgentName());
253:                }
254:                checkQuiescence();
255:            }
256:
257:            private synchronized void setMessageNumbers(
258:                    QuiescenceState quiescenceState, Map outgoing, Map incoming) {
259:                quiescenceState.setMessageNumbers(outgoing, incoming);
260:                checkQuiescence();
261:            }
262:
263:            private synchronized void setQuiescent(
264:                    QuiescenceState quiescenceState, boolean isAgentQuiescent,
265:                    String blocker) {
266:                quiescenceState.setQuiescent(isAgentQuiescent, blocker);
267:                if (isAgentQuiescent && quiescenceState.isEnabled()
268:                        && quiescenceState.isAlive()) {
269:                    checkQuiescence();
270:                } else if (quiescenceState.isEnabled()
271:                        && quiescenceState.isAlive()) {
272:                    // Only cancel quiescence if the state that announced it was not quiescent isEnabled
273:                    // This prevents early-loading plugins from toggling quiescence of the Node
274:                    cancelQuiescence();
275:                }
276:            }
277:
278:            private boolean allAgentsAreQuiescent() {
279:                boolean result = true;
280:                // Loop through each Agent whose quiescence we care about
281:                for (Iterator theseStates = getQuiescenceStatesIterator(); theseStates
282:                        .hasNext();) {
283:                    QuiescenceState this AgentState = (QuiescenceState) theseStates
284:                            .next();
285:                    if (!this AgentState.isQuiescent()) {
286:                        result = false;
287:                        if (logger.isDebugEnabled()) {
288:                            logger.debug(this AgentState.getAgentName()
289:                                    + " is not quiescent");
290:                        } else {
291:                            break;
292:                        }
293:                    } else {
294:                        if (logger.isDebugEnabled()) {
295:                            logger.debug(this AgentState.getAgentName()
296:                                    + " is quiescent");
297:                        }
298:                    }
299:                }
300:                return result;
301:            }
302:
303:            /** Check known QS to see if all locally-sent messages have been recieved */
304:            private boolean noMessagesAreOutstanding() {
305:                // Old version:  O(N^2)
306:                //   for each X in local agents {
307:                //    for each Y in local agents {
308:                //     if (X.sentTo(Y) != Y.receivedFrom(X)) return false;
309:                //   }} 
310:                //   return true;
311:                //
312:                // this is bad because:
313:                //  - major: VERY few agents talk to all (or even most) other local agents
314:                //  - minor: sentTo cannot be < receivedFrom (factor of 2)
315:                //
316:                // So now, we'll do:
317:                //   for each X in local agents {
318:                //    foreach Y in (X.sentToList) {
319:                //     if (Y.isLocal) {
320:                //      if (X.sentTo(Y) != Y.receivedFrom(X)) return false;
321:                //   }}}
322:                //   return true;
323:
324:                int local_agents = 0;
325:                int number_compares = 0;
326:                for (Iterator theseStates = getQuiescenceStatesIterator(); theseStates
327:                        .hasNext();) {
328:                    local_agents++;
329:                    QuiescenceState this AgentState = (QuiescenceState) theseStates
330:                            .next();
331:                    MessageAddress this Agent = this AgentState.getAgent();
332:                    for (Iterator theseNumbers = this AgentState
333:                            .getOutgoingEntrySet().iterator(); theseNumbers
334:                            .hasNext();) {
335:                        Map.Entry this Number = (Map.Entry) theseNumbers.next();
336:
337:                        MessageAddress thatAgent = (MessageAddress) this Number
338:                                .getKey();
339:                        QuiescenceState thatAgentState = accessQuiescenceState(thatAgent);
340:                        if (thatAgentState != null
341:                                && thatAgentState.isEnabled()
342:                                && thatAgentState.isAlive()) {
343:                            number_compares++;
344:                            Integer sentNumber = this AgentState
345:                                    .getOutgoingMessageNumber(thatAgent);
346:                            Integer rcvdNumber = thatAgentState
347:                                    .getIncomingMessageNumber(this Agent);
348:                            boolean match;
349:                            if (sentNumber == null) {
350:                                match = rcvdNumber == null;
351:                            } else {
352:                                match = sentNumber.equals(rcvdNumber);
353:                            }
354:                            if (!match) {
355:                                if (logger.isDebugEnabled()) {
356:                                    logger.debug("Quiescence prevented by "
357:                                            + this Agent + " sent " + sentNumber
358:                                            + ", but " + thatAgent + " rcvd "
359:                                            + rcvdNumber);
360:                                }
361:                                return false;
362:                            }
363:                        }
364:                    }
365:                }
366:                if (logger.isDebugEnabled()) {
367:                    logger
368:                            .debug("Quiescence message compare statistics:  locals="
369:                                    + local_agents
370:                                    + ", compares="
371:                                    + number_compares);
372:                }
373:                return true;
374:            }
375:
376:            private void appendMessageNumbers(StringBuffer ms, Map messages,
377:                    String listTag, String itemTag) {
378:                ms.append("  <").append(listTag).append(">").append(EOL);
379:                for (Iterator entries = messages.entrySet().iterator(); entries
380:                        .hasNext();) {
381:                    Map.Entry entry = (Map.Entry) entries.next();
382:                    MessageAddress otherAgent = (MessageAddress) entry.getKey();
383:                    if (isLocalAgent(otherAgent))
384:                        continue; // Exclude local agents
385:                    Integer msgnum = (Integer) entry.getValue(); // 
386:                    ms.append("   <").append(itemTag).append(" agent=\"")
387:                            .append(otherAgent).append("\" msgnum=\"").append(
388:                                    msgnum).append("\"/>").append(EOL);
389:                }
390:                ms.append("  </").append(listTag).append(">").append(EOL);
391:            }
392:
393:            private void announceQuiescence() {
394:                // Spit out the message numbers we sent to agents of other
395:                // nodes and the message numbers we received from agents
396:                // of other nodes.
397:                StringBuffer ms = new StringBuffer();
398:                ms.append("<node name=\"").append(nodeName).append(
399:                        "\" quiescent=\"true\">").append(EOL);
400:                // Loop over relevant agents to get message numbers
401:                for (Iterator states = getQuiescenceStatesIterator(); states
402:                        .hasNext();) {
403:                    QuiescenceState quiescenceState = (QuiescenceState) states
404:                            .next();
405:                    ms.append(" <agent name=\"").append(
406:                            quiescenceState.getAgentName()).append("\">")
407:                            .append(EOL);
408:                    appendMessageNumbers(ms, quiescenceState
409:                            .getOutgoingMessageNumbers(), "receivers", "dest");
410:                    appendMessageNumbers(ms, quiescenceState
411:                            .getIncomingMessageNumbers(), "senders", "src");
412:                    ms.append(" </agent>").append(EOL);
413:                }
414:                ms.append("</node>").append(EOL);
415:                quiescenceAnnouncer.announceQuiescence(ms.toString());
416:            }
417:
418:            private void announceNonQuiescence() {
419:                quiescenceAnnouncer.announceNonquiescence("<node name=\""
420:                        + nodeName + "\" quiescent=\"false\"/>" + EOL);
421:            }
422:
423:            private class QuiescenceAnnouncer implements  Runnable {
424:                private String pendingAnnouncement;
425:                private boolean lastAnnouncedQuiescence = true;
426:                private long announcementTime;
427:                private EventService eventService;
428:                private Object eventServiceLock = new Object();
429:                private Schedulable schedulable;
430:                private boolean isRunning;
431:
432:                public QuiescenceAnnouncer() {
433:                    super ();
434:                    isRunning = true;
435:                }
436:
437:                /**
438:                 * Stops the quiescence announcer.
439:                 * <p>
440:                 * 
441:                 * This method should be invoked when the quiescence report service is revoked.
442:                 */
443:                protected synchronized void stopQuiescenceAnnouncer() {
444:                    isRunning = false;
445:                    interrupt();
446:                }
447:
448:                public synchronized void interrupt() {
449:                    schedulable.cancel();
450:                }
451:
452:                public synchronized void announceNonquiescence(
453:                        String announcement) {
454:                    if (isRunning) {
455:                        // Cancel pending announcment if any
456:                        pendingAnnouncement = null;
457:                        if (lastAnnouncedQuiescence) {
458:                            event(announcement);
459:                            lastAnnouncedQuiescence = false;
460:                        }
461:                    }
462:                }
463:
464:                public synchronized void announceQuiescence(String announcement) {
465:                    if (isRunning) {
466:                        if (schedulable == null) {
467:                            // first time
468:                            ThreadService tsvc = (ThreadService) sb.getService(
469:                                    this , ThreadService.class, null);
470:                            schedulable = tsvc.getThread(this , this ,
471:                                    "Quiescence Announcer");
472:                            sb.releaseService(this , ThreadService.class, tsvc);
473:                        }
474:                        // Replace the pending announcement
475:                        pendingAnnouncement = announcement;
476:                        // and restart the timeout
477:                        announcementTime = System.currentTimeMillis()
478:                                + ANNOUNCEMENT_DELAY;
479:                        schedulable.start();
480:                    }
481:                }
482:
483:                private long getDelay() {
484:                    return announcementTime - System.currentTimeMillis();
485:                }
486:
487:                private void event(String msg) {
488:                    synchronized (eventServiceLock) {
489:                        if (eventService == null) {
490:                            eventService = (EventService) sb.getService(
491:                                    QuiescenceReportServiceProvider.this ,
492:                                    EventService.class, null);
493:                            if (eventService == null) {
494:                                logger.error("No EventService available for "
495:                                        + EOL + msg);
496:                            }
497:                        }
498:                    }
499:                    if (eventService != null)
500:                        eventService.event(msg);
501:                    else if (logger.isInfoEnabled())
502:                        logger.info(msg);
503:                }
504:
505:                public synchronized void run() {
506:                    long delay;
507:
508:                    if (isRunning) {
509:                        try {
510:                            if (pendingAnnouncement == null) {
511:                                // Nothing to announce. No-op.
512:                            } else if ((delay = getDelay()) > 0L) {
513:                                // run again later
514:                                schedulable.schedule(delay);
515:                            } else {
516:                                event(pendingAnnouncement);
517:                                pendingAnnouncement = null;
518:                                lastAnnouncedQuiescence = true;
519:                            }
520:                        } catch (Exception e) {
521:                            logger.error("QuiescenceAnnouncer", e);
522:                        }
523:                    }
524:                }
525:            }
526:
527:            private class QuiescenceReportForDistributorServiceImpl extends
528:                    QuiescenceReportServiceImpl implements 
529:                    QuiescenceReportForDistributorService {
530:                public QuiescenceReportForDistributorServiceImpl(
531:                        String requestorName) {
532:                    super (requestorName);
533:                }
534:
535:                public void setQuiescenceReportEnabled(boolean enabled) {
536:                    QuiescenceReportServiceProvider.this 
537:                            .setQuiescenceReportEnabled(quiescenceState,
538:                                    enabled);
539:                }
540:            }
541:
542:            // FIXME: Need a factory so can ensure unique requestorNames?
543:
544:            private class QuiescenceReportServiceImpl implements 
545:                    QuiescenceReportService {
546:                protected QuiescenceState quiescenceState = null;
547:
548:                // We haven't been counted as not quiescent yet
549:                // This tracks whether this particular client of the QRS says it is quiescent, where the Distributor
550:                // is just one
551:                private boolean isServiceImplQuiescent = true;
552:                private String requestorName;
553:
554:                // constructor used only by NodeAgent
555:                public QuiescenceReportServiceImpl(MessageAddress agent) {
556:                    quiescenceState = getQuiescenceState(agent); // Drop messageAttributes
557:                    this .requestorName = agent.toString();
558:                }
559:
560:                // requestorName is the toString on the service requestor object, plus uniqueID
561:                public QuiescenceReportServiceImpl(String requestorName) {
562:                    this .requestorName = requestorName;
563:                }
564:
565:                public void setAgentIdentificationService(
566:                        AgentIdentificationService ais) {
567:                    if (ais == null) {
568:                        quiescenceState = null;
569:                    } else {
570:                        quiescenceState = getQuiescenceState(ais
571:                                .getMessageAddress());
572:                    }
573:                }
574:
575:                /**
576:                 * Specify the maps of quiescence-relevant outgoing and incoming
577:                 * message numbers associated with a newly achieved state of
578:                 * quiescence. For efficiency, this method should be called before
579:                 * calling setQuiescentState().
580:                 * @param outgoingMessageNumbers a Map from agent MessageAddresses
581:                 * to Integers giving the number of the last message sent. The
582:                 * arguments must not be null.
583:                 * @param incomingMessageNumbers a Map from agent MessageAddresses
584:                 * to Integers giving the number of the last message received. The
585:                 * arguments must not be null.
586:                 */
587:                public void setMessageNumbers(Map outgoing, Map incoming) {
588:                    if (quiescenceState == null)
589:                        throw new RuntimeException(
590:                                "AgentIdentificationService has not be set");
591:                    QuiescenceReportServiceProvider.this .setMessageNumbers(
592:                            quiescenceState, outgoing, incoming);
593:                }
594:
595:                /**
596:                 * Specifies that, from this service instance point-of-view, the
597:                 * agent is quiescent.
598:                 */
599:                public void setQuiescentState() {
600:                    if (quiescenceState == null) {
601:                        throw new RuntimeException(
602:                                "AgentIdentificationService has not been set.");
603:                    }
604:
605:                    // RFE 3760: Remove this requestor from qState's list of blockers
606:                    if (isServiceImplQuiescent) {
607:                        // If this serviceImpl already thinks it is quiescent, no need to say so twice
608:                        if (logger.isDebugEnabled())
609:                            logger
610:                                    .debug("setQuiescentState for "
611:                                            + requestorName
612:                                            + " of "
613:                                            + quiescenceState.getAgentName()
614:                                            + ", but this ServiceImpl already quiescent.");
615:                    } else {
616:                        if (logger.isDebugEnabled()) {
617:                            logger.debug("setQuiescentState for "
618:                                    + requestorName + " of "
619:                                    + quiescenceState.getAgentName());
620:                        }
621:                        isServiceImplQuiescent = true;
622:                        QuiescenceReportServiceProvider.this .setQuiescent(
623:                                quiescenceState, true, requestorName);
624:                    }
625:                }
626:
627:                /**
628:                 * Specifies that this agent is no longer quiescent.
629:                 * That is, this client of the QRS (requestor) is saying that it
630:                 * has work to do, and therefore the agent cannot be quiescent.
631:                 */
632:                public void clearQuiescentState() {
633:                    if (quiescenceState == null) {
634:                        throw new RuntimeException(
635:                                "AgentIdentificationService has not been set.");
636:                    }
637:
638:                    // RFE 3760: Add this requestor to qState's list of blockers
639:                    // [Commented this out so can track all blockers of quiescence...]
640:                    if (!isServiceImplQuiescent) {
641:                        // FIXME: Without this return, 2 requestors with same name can conflict - second will make both non-q
642:                        // but also it only takes 1 to make both Q
643:                        // return;
644:
645:                        // If this serviceImpl already thinks it is blocking quiescence, no need to say so twice
646:                        if (logger.isDebugEnabled()) {
647:                            logger
648:                                    .debug("clearQuiescentState for "
649:                                            + requestorName
650:                                            + " of "
651:                                            + quiescenceState.getAgentName()
652:                                            + ", but this ServiceImpl already not quiescent.");
653:                        }
654:                    } else {
655:                        if (logger.isDebugEnabled()) {
656:                            logger.debug("clearQuiescentState for "
657:                                    + requestorName + " of "
658:                                    + quiescenceState.getAgentName());
659:                        }
660:                        isServiceImplQuiescent = false;
661:                        QuiescenceReportServiceProvider.this .setQuiescent(
662:                                quiescenceState, false, requestorName);
663:                    }
664:                }
665:            } // end of QuiescenceReportServiceImpl
666:
667:            private synchronized MessageAddress[] listQuiescenceStates() {
668:                // return a new collection from quiescenceStates.keySet();
669:                Collection states = quiescenceStates.keySet();
670:                return (MessageAddress[]) states
671:                        .toArray(new MessageAddress[states.size()]);
672:            }
673:
674:            // Put this in separate synch method so can ensure setDead is protected, and can call checkQuiescence
675:            private synchronized void setAgentDead(MessageAddress agentAddress) {
676:                QuiescenceState qState = accessQuiescenceState(agentAddress);
677:                if (qState == null)
678:                    return;
679:                qState.setDead();
680:                checkQuiescence();
681:            }
682:
683:            // Service to support servlet to mark agents as dead (and view all agent registered)
684:            private class AgentQuiescenceStateServiceImpl implements 
685:                    AgentQuiescenceStateService {
686:                /** Is the Node altogether quiescent */
687:                public boolean isNodeQuiescent() {
688:                    return QuiescenceReportServiceProvider.this .isQuiescent;
689:                }
690:
691:                /**
692:                 * List the local agents with quiescence states for the Node to consider
693:                 * @return an array of MessageAddresses registered with the Nodes QuiescenceReportService
694:                 */
695:                public MessageAddress[] listAgentsRegistered() {
696:                    return listQuiescenceStates();
697:                }
698:
699:                /**
700:                 * Is the named agent's quiescence service enabled (ie the Distributor is fully loaded)?
701:                 * @param agentAddress The agent to query
702:                 * @return true if the agent's quiescence service has been enabled and it counts towards Node quiescence, false otherwise or if it does not exist locally
703:                 */
704:                public boolean isAgentEnabled(MessageAddress agentAddress) {
705:                    if (agentAddress == null)
706:                        return false;
707:                    QuiescenceState qState = accessQuiescenceState(agentAddress);
708:                    if (qState == null)
709:                        return false;
710:                    return qState.isEnabled();
711:                }
712:
713:                /**
714:                 * Is the named agent quiescent?
715:                 * @param agentAddress The agent to query
716:                 * @return true if the Agent's Distributor is quiescent, false otherwise or if the agent does not exist locally
717:                 */
718:                public boolean isAgentQuiescent(MessageAddress agentAddress) {
719:                    if (agentAddress == null)
720:                        return false;
721:                    QuiescenceState qState = accessQuiescenceState(agentAddress);
722:                    if (qState == null)
723:                        return false;
724:                    return qState.isQuiescent();
725:                }
726:
727:                /**
728:                 * Is the named agent alive for quiescence purposes, or has it been
729:                 * marked as dead to be ignored?
730:                 * @param agentAddress The agent to query
731:                 * @return false if the agent is dead and should be ignored for local quiescence or does not exist locally
732:                 */
733:                public boolean isAgentAlive(MessageAddress agentAddress) {
734:                    if (agentAddress == null)
735:                        return false;
736:                    QuiescenceState qState = accessQuiescenceState(agentAddress);
737:                    if (qState == null)
738:                        return false;
739:                    return qState.isAlive();
740:                }
741:
742:                /**
743:                 * Mark the named agent as dead - it has been restarted elsewhere, and should
744:                 * be ignored locally for quiescence calculations.
745:                 * @param agentAddress The Agent to mark as dead
746:                 */
747:                public void setAgentDead(MessageAddress agentAddress) {
748:                    if (agentAddress == null)
749:                        return;
750:                    QuiescenceReportServiceProvider.this 
751:                            .setAgentDead(agentAddress);
752:                }
753:
754:                // Other options: list message numbers? 
755:
756:                // RFE 3760: List blockers of an agents quiescence (return a String)
757:                // FIXME: Synchronize something?
758:                public String getAgentQuiescenceBlockers(
759:                        MessageAddress agentAddress) {
760:                    if (agentAddress == null)
761:                        return "";
762:                    QuiescenceState qState = accessQuiescenceState(agentAddress);
763:                    if (qState == null)
764:                        return "";
765:                    return qState.getBlockersString();
766:                }
767:            }
768:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.