001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.node;
028:
029: import java.util.Collection;
030: import java.util.HashMap;
031: import java.util.Iterator;
032: import java.util.Map;
033: import java.util.Set;
034:
035: import org.cougaar.bootstrap.SystemProperties;
036: import org.cougaar.core.agent.AgentContainer;
037: import org.cougaar.core.component.ServiceBroker;
038: import org.cougaar.core.component.ServiceProvider;
039: import org.cougaar.core.logging.LoggingServiceWithPrefix;
040: import org.cougaar.core.mts.MessageAddress;
041: import org.cougaar.core.service.AgentIdentificationService;
042: import org.cougaar.core.service.AgentQuiescenceStateService;
043: import org.cougaar.core.service.EventService;
044: import org.cougaar.core.service.QuiescenceReportForDistributorService;
045: import org.cougaar.core.service.QuiescenceReportService;
046: import org.cougaar.core.service.ThreadService;
047: import org.cougaar.core.thread.Schedulable;
048: import org.cougaar.util.FilteredIterator;
049: import org.cougaar.util.Memo;
050: import org.cougaar.util.UnaryPredicate;
051: import org.cougaar.util.log.Logger;
052: import org.cougaar.util.log.Logging;
053:
/**
 * {@link ServiceProvider} for the {@link QuiescenceReportService}.
 * <p>
 * The QRS collects quiescence information from the Agent Distributors
 * and other QRS clients in the Node. It also matches sent and
 * received messages between agents in the Node. When the collective
 * quiescence state of the Node changes, the QRS sends an Event indicating
 * this change.
 *
 * @property org.cougaar.core.node.quiescenceAnnounceDelay specifies the
 * number of milliseconds that the Node waits, once it thinks it has become
 * quiescent, before sending an event announcing the fact. This suppresses
 * possible toggling. Default is 20 seconds.
 */
068: class QuiescenceReportServiceProvider implements ServiceProvider {
069: private Map quiescenceStates = new HashMap();
070: private boolean isQuiescent = false;
071:
072: // Predicate to get the quiscence states for agents that have enabled
073: // their QRS and have not been marked as dead (duplicated elsewhere)
074: private UnaryPredicate enabledQuiescenceStatePredicate = new UnaryPredicate() {
075: public boolean execute(Object o) {
076: QuiescenceState qs = (QuiescenceState) o;
077: return (qs.isEnabled() && qs.isAlive());
078: }
079: };
080: private Logger logger;
081: private String nodeName;
082: private AgentContainer agentContainer;
083: private ServiceBroker sb;
084: private QuiescenceAnnouncer quiescenceAnnouncer;
085: private AgentQuiescenceStateService aqsService = null;
086:
087: private static final long ANNOUNCEMENT_DELAY = SystemProperties
088: .getLong("org.cougaar.core.node.quiescenceAnnounceDelay",
089: 20000);
090:
091: // private static final long ANNOUNCEMENT_DELAY = 30000L;
092: private static final String EOL = " "; //SystemProperties.getProperty("line.separator");
093:
094: QuiescenceReportServiceProvider(String nodeName,
095: AgentContainer agentContainer, ServiceBroker sb) {
096: this .nodeName = nodeName;
097: this .agentContainer = agentContainer;
098: this .sb = sb;
099: logger = new LoggingServiceWithPrefix(Logging
100: .getLogger(getClass()), nodeName + ": ");
101: quiescenceAnnouncer = new QuiescenceAnnouncer();
102: }
103:
104: public Object getService(ServiceBroker xsb, Object requestor,
105: Class serviceClass) {
106: if (serviceClass == QuiescenceReportService.class) {
107: if (requestor instanceof MessageAddress) {
108: // special case, just for node-agent!
109: MessageAddress addr = (MessageAddress) requestor;
110: addr = addr.getPrimary(); // drop MessageAttributes
111: return new QuiescenceReportServiceImpl(addr);
112: } else {
113: String name = requestor.toString() + " hash: "
114: + requestor.hashCode();
115: return new QuiescenceReportServiceImpl(name);
116: }
117: }
118: if (serviceClass == QuiescenceReportForDistributorService.class) {
119: return new QuiescenceReportForDistributorServiceImpl(
120: requestor.toString());
121: }
122: if (serviceClass == AgentQuiescenceStateService.class) {
123: if (aqsService == null)
124: aqsService = new AgentQuiescenceStateServiceImpl();
125: return aqsService;
126: }
127: throw new IllegalArgumentException("Cannot provide "
128: + serviceClass.getName());
129: }
130:
131: public void releaseService(ServiceBroker xsb, Object requestor,
132: Class serviceClass, Object service) {
133: if (service instanceof QuiescenceReportService) {
134: QuiescenceReportService quiescenceReportService = (QuiescenceReportService) service;
135: quiescenceReportService.setQuiescentState();
136: }
137: // Nothing to do to release the aqsService
138: }
139:
140: protected void revokeService() {
141: quiescenceAnnouncer.stopQuiescenceAnnouncer();
142: }
143:
144: private static void setMessageMap(MessageAddress me,
145: Map messageNumbers, Map newMap) {
146: Map existingMap = (Map) messageNumbers.get(me);
147: if (existingMap == null) {
148: existingMap = new HashMap(newMap);
149: messageNumbers.put(me, existingMap);
150: } else {
151: existingMap.clear();
152: existingMap.putAll(newMap);
153: }
154: }
155:
156: private synchronized QuiescenceState getQuiescenceState(
157: MessageAddress me) {
158: QuiescenceState quiescenceState = (QuiescenceState) quiescenceStates
159: .get(me);
160: if (quiescenceState == null) {
161: quiescenceState = new QuiescenceState(me, logger);
162: quiescenceStates.put(me, quiescenceState);
163: }
164: return quiescenceState;
165: }
166:
167: /** Just like getQuiescenceState, except does not create empty ones for misses */
168: private synchronized QuiescenceState accessQuiescenceState(
169: MessageAddress me) {
170: return (QuiescenceState) quiescenceStates.get(me);
171: }
172:
173: private Iterator getQuiescenceStatesIterator() {
174: return new FilteredIterator(quiescenceStates.values()
175: .iterator(), enabledQuiescenceStatePredicate);
176: }
177:
178: private boolean isLocalAgent(MessageAddress otherAgent) {
179: QuiescenceState otherState = (QuiescenceState) quiescenceStates
180: .get(otherAgent);
181: if (otherState == null)
182: return false;
183: // Also return false if the otherState.isEnabled() && otherState's agent isDead
184: return (otherState.isEnabled() && otherState.isAlive());
185: }
186:
187: /**
188: * Memoization of the quiescence state set.
189: * Avoids bothering to make changes to
190: * unless there might be a difference.
191: */
192: private Memo _quiescenceStatesMemo = Memo.get(new Memo.Function() {
193: public String toString() {
194: return "Memo<active quiescenceStates>";
195: }
196:
197: public Object eval(Object o) {
198: Set agentAddresses = (Set) o;
199: if (quiescenceStates.keySet().retainAll(agentAddresses)) {
200: // retainAll() == true means that the collection changed, so we'll return
201: return quiescenceStates;
202: } else {
203: // if we executed this, but it didn't change, we'll return null to
204: // indicate the same end-result as if we hadn't executed at all!
205: return null;
206: }
207: }
208: });
209:
210: /**
211: * An agent has become quiescent. If all agents are quiescent and no
212: * messages are outstanding, we announce our quiescence. Otherwise,
213: * we cancel quiescence.
214: */
215: private void checkQuiescence() { // We might be quiescent...
216: // If an agent moves out of this node, we need to clean out any
217: // remembered message numbers.
218: // FIXME: To be safe, should do getPrimary() on all the addresses returned
219: // by the agentContainer, to strip off MessageAttributes
220: _quiescenceStatesMemo.evalIfNew(agentContainer
221: .getAgentAddresses()); // returns non-null if it changed!
222: // if the set of agents hasn't changed, can we skip any of the below?
223:
224: // note that this expression short-circuits the expensive noMessagesAreOutstanding
225: // call when !allAgentsAreQuiescent()
226: if (allAgentsAreQuiescent() && noMessagesAreOutstanding()) {
227: announceQuiescence();
228: if (!isQuiescent && logger.isInfoEnabled()) {
229: logger.info("Is quiescent");
230: }
231: isQuiescent = true;
232: } else {
233: if (isQuiescent) {
234: cancelQuiescence();
235: }
236: // else { still_quiescent(); }
237: }
238: }
239:
240: private void cancelQuiescence() {
241: if (isQuiescent) {
242: announceNonQuiescence();
243: isQuiescent = false;
244: }
245: }
246:
247: private synchronized void setQuiescenceReportEnabled(
248: QuiescenceState quiescenceState, boolean enabled) {
249: quiescenceState.setEnabled(enabled);
250: if (logger.isInfoEnabled()) {
251: logger.info((enabled ? "Enabled " : "Disabled ")
252: + quiescenceState.getAgentName());
253: }
254: checkQuiescence();
255: }
256:
257: private synchronized void setMessageNumbers(
258: QuiescenceState quiescenceState, Map outgoing, Map incoming) {
259: quiescenceState.setMessageNumbers(outgoing, incoming);
260: checkQuiescence();
261: }
262:
263: private synchronized void setQuiescent(
264: QuiescenceState quiescenceState, boolean isAgentQuiescent,
265: String blocker) {
266: quiescenceState.setQuiescent(isAgentQuiescent, blocker);
267: if (isAgentQuiescent && quiescenceState.isEnabled()
268: && quiescenceState.isAlive()) {
269: checkQuiescence();
270: } else if (quiescenceState.isEnabled()
271: && quiescenceState.isAlive()) {
272: // Only cancel quiescence if the state that announced it was not quiescent isEnabled
273: // This prevents early-loading plugins from toggling quiescence of the Node
274: cancelQuiescence();
275: }
276: }
277:
278: private boolean allAgentsAreQuiescent() {
279: boolean result = true;
280: // Loop through each Agent whose quiescence we care about
281: for (Iterator theseStates = getQuiescenceStatesIterator(); theseStates
282: .hasNext();) {
283: QuiescenceState this AgentState = (QuiescenceState) theseStates
284: .next();
285: if (!this AgentState.isQuiescent()) {
286: result = false;
287: if (logger.isDebugEnabled()) {
288: logger.debug(this AgentState.getAgentName()
289: + " is not quiescent");
290: } else {
291: break;
292: }
293: } else {
294: if (logger.isDebugEnabled()) {
295: logger.debug(this AgentState.getAgentName()
296: + " is quiescent");
297: }
298: }
299: }
300: return result;
301: }
302:
303: /** Check known QS to see if all locally-sent messages have been recieved */
304: private boolean noMessagesAreOutstanding() {
305: // Old version: O(N^2)
306: // for each X in local agents {
307: // for each Y in local agents {
308: // if (X.sentTo(Y) != Y.receivedFrom(X)) return false;
309: // }}
310: // return true;
311: //
312: // this is bad because:
313: // - major: VERY few agents talk to all (or even most) other local agents
314: // - minor: sentTo cannot be < receivedFrom (factor of 2)
315: //
316: // So now, we'll do:
317: // for each X in local agents {
318: // foreach Y in (X.sentToList) {
319: // if (Y.isLocal) {
320: // if (X.sentTo(Y) != Y.receivedFrom(X)) return false;
321: // }}}
322: // return true;
323:
324: int local_agents = 0;
325: int number_compares = 0;
326: for (Iterator theseStates = getQuiescenceStatesIterator(); theseStates
327: .hasNext();) {
328: local_agents++;
329: QuiescenceState this AgentState = (QuiescenceState) theseStates
330: .next();
331: MessageAddress this Agent = this AgentState.getAgent();
332: for (Iterator theseNumbers = this AgentState
333: .getOutgoingEntrySet().iterator(); theseNumbers
334: .hasNext();) {
335: Map.Entry this Number = (Map.Entry) theseNumbers.next();
336:
337: MessageAddress thatAgent = (MessageAddress) this Number
338: .getKey();
339: QuiescenceState thatAgentState = accessQuiescenceState(thatAgent);
340: if (thatAgentState != null
341: && thatAgentState.isEnabled()
342: && thatAgentState.isAlive()) {
343: number_compares++;
344: Integer sentNumber = this AgentState
345: .getOutgoingMessageNumber(thatAgent);
346: Integer rcvdNumber = thatAgentState
347: .getIncomingMessageNumber(this Agent);
348: boolean match;
349: if (sentNumber == null) {
350: match = rcvdNumber == null;
351: } else {
352: match = sentNumber.equals(rcvdNumber);
353: }
354: if (!match) {
355: if (logger.isDebugEnabled()) {
356: logger.debug("Quiescence prevented by "
357: + this Agent + " sent " + sentNumber
358: + ", but " + thatAgent + " rcvd "
359: + rcvdNumber);
360: }
361: return false;
362: }
363: }
364: }
365: }
366: if (logger.isDebugEnabled()) {
367: logger
368: .debug("Quiescence message compare statistics: locals="
369: + local_agents
370: + ", compares="
371: + number_compares);
372: }
373: return true;
374: }
375:
376: private void appendMessageNumbers(StringBuffer ms, Map messages,
377: String listTag, String itemTag) {
378: ms.append(" <").append(listTag).append(">").append(EOL);
379: for (Iterator entries = messages.entrySet().iterator(); entries
380: .hasNext();) {
381: Map.Entry entry = (Map.Entry) entries.next();
382: MessageAddress otherAgent = (MessageAddress) entry.getKey();
383: if (isLocalAgent(otherAgent))
384: continue; // Exclude local agents
385: Integer msgnum = (Integer) entry.getValue(); //
386: ms.append(" <").append(itemTag).append(" agent=\"")
387: .append(otherAgent).append("\" msgnum=\"").append(
388: msgnum).append("\"/>").append(EOL);
389: }
390: ms.append(" </").append(listTag).append(">").append(EOL);
391: }
392:
393: private void announceQuiescence() {
394: // Spit out the message numbers we sent to agents of other
395: // nodes and the message numbers we received from agents
396: // of other nodes.
397: StringBuffer ms = new StringBuffer();
398: ms.append("<node name=\"").append(nodeName).append(
399: "\" quiescent=\"true\">").append(EOL);
400: // Loop over relevant agents to get message numbers
401: for (Iterator states = getQuiescenceStatesIterator(); states
402: .hasNext();) {
403: QuiescenceState quiescenceState = (QuiescenceState) states
404: .next();
405: ms.append(" <agent name=\"").append(
406: quiescenceState.getAgentName()).append("\">")
407: .append(EOL);
408: appendMessageNumbers(ms, quiescenceState
409: .getOutgoingMessageNumbers(), "receivers", "dest");
410: appendMessageNumbers(ms, quiescenceState
411: .getIncomingMessageNumbers(), "senders", "src");
412: ms.append(" </agent>").append(EOL);
413: }
414: ms.append("</node>").append(EOL);
415: quiescenceAnnouncer.announceQuiescence(ms.toString());
416: }
417:
418: private void announceNonQuiescence() {
419: quiescenceAnnouncer.announceNonquiescence("<node name=\""
420: + nodeName + "\" quiescent=\"false\"/>" + EOL);
421: }
422:
423: private class QuiescenceAnnouncer implements Runnable {
424: private String pendingAnnouncement;
425: private boolean lastAnnouncedQuiescence = true;
426: private long announcementTime;
427: private EventService eventService;
428: private Object eventServiceLock = new Object();
429: private Schedulable schedulable;
430: private boolean isRunning;
431:
432: public QuiescenceAnnouncer() {
433: super ();
434: isRunning = true;
435: }
436:
437: /**
438: * Stops the quiescence announcer.
439: * <p>
440: *
441: * This method should be invoked when the quiescence report service is revoked.
442: */
443: protected synchronized void stopQuiescenceAnnouncer() {
444: isRunning = false;
445: interrupt();
446: }
447:
448: public synchronized void interrupt() {
449: schedulable.cancel();
450: }
451:
452: public synchronized void announceNonquiescence(
453: String announcement) {
454: if (isRunning) {
455: // Cancel pending announcment if any
456: pendingAnnouncement = null;
457: if (lastAnnouncedQuiescence) {
458: event(announcement);
459: lastAnnouncedQuiescence = false;
460: }
461: }
462: }
463:
464: public synchronized void announceQuiescence(String announcement) {
465: if (isRunning) {
466: if (schedulable == null) {
467: // first time
468: ThreadService tsvc = (ThreadService) sb.getService(
469: this , ThreadService.class, null);
470: schedulable = tsvc.getThread(this , this ,
471: "Quiescence Announcer");
472: sb.releaseService(this , ThreadService.class, tsvc);
473: }
474: // Replace the pending announcement
475: pendingAnnouncement = announcement;
476: // and restart the timeout
477: announcementTime = System.currentTimeMillis()
478: + ANNOUNCEMENT_DELAY;
479: schedulable.start();
480: }
481: }
482:
483: private long getDelay() {
484: return announcementTime - System.currentTimeMillis();
485: }
486:
487: private void event(String msg) {
488: synchronized (eventServiceLock) {
489: if (eventService == null) {
490: eventService = (EventService) sb.getService(
491: QuiescenceReportServiceProvider.this ,
492: EventService.class, null);
493: if (eventService == null) {
494: logger.error("No EventService available for "
495: + EOL + msg);
496: }
497: }
498: }
499: if (eventService != null)
500: eventService.event(msg);
501: else if (logger.isInfoEnabled())
502: logger.info(msg);
503: }
504:
505: public synchronized void run() {
506: long delay;
507:
508: if (isRunning) {
509: try {
510: if (pendingAnnouncement == null) {
511: // Nothing to announce. No-op.
512: } else if ((delay = getDelay()) > 0L) {
513: // run again later
514: schedulable.schedule(delay);
515: } else {
516: event(pendingAnnouncement);
517: pendingAnnouncement = null;
518: lastAnnouncedQuiescence = true;
519: }
520: } catch (Exception e) {
521: logger.error("QuiescenceAnnouncer", e);
522: }
523: }
524: }
525: }
526:
527: private class QuiescenceReportForDistributorServiceImpl extends
528: QuiescenceReportServiceImpl implements
529: QuiescenceReportForDistributorService {
530: public QuiescenceReportForDistributorServiceImpl(
531: String requestorName) {
532: super (requestorName);
533: }
534:
535: public void setQuiescenceReportEnabled(boolean enabled) {
536: QuiescenceReportServiceProvider.this
537: .setQuiescenceReportEnabled(quiescenceState,
538: enabled);
539: }
540: }
541:
542: // FIXME: Need a factory so can ensure unique requestorNames?
543:
544: private class QuiescenceReportServiceImpl implements
545: QuiescenceReportService {
546: protected QuiescenceState quiescenceState = null;
547:
548: // We haven't been counted as not quiescent yet
549: // This tracks whether this particular client of the QRS says it is quiescent, where the Distributor
550: // is just one
551: private boolean isServiceImplQuiescent = true;
552: private String requestorName;
553:
554: // constructor used only by NodeAgent
555: public QuiescenceReportServiceImpl(MessageAddress agent) {
556: quiescenceState = getQuiescenceState(agent); // Drop messageAttributes
557: this .requestorName = agent.toString();
558: }
559:
560: // requestorName is the toString on the service requestor object, plus uniqueID
561: public QuiescenceReportServiceImpl(String requestorName) {
562: this .requestorName = requestorName;
563: }
564:
565: public void setAgentIdentificationService(
566: AgentIdentificationService ais) {
567: if (ais == null) {
568: quiescenceState = null;
569: } else {
570: quiescenceState = getQuiescenceState(ais
571: .getMessageAddress());
572: }
573: }
574:
575: /**
576: * Specify the maps of quiescence-relevant outgoing and incoming
577: * message numbers associated with a newly achieved state of
578: * quiescence. For efficiency, this method should be called before
579: * calling setQuiescentState().
580: * @param outgoingMessageNumbers a Map from agent MessageAddresses
581: * to Integers giving the number of the last message sent. The
582: * arguments must not be null.
583: * @param incomingMessageNumbers a Map from agent MessageAddresses
584: * to Integers giving the number of the last message received. The
585: * arguments must not be null.
586: */
587: public void setMessageNumbers(Map outgoing, Map incoming) {
588: if (quiescenceState == null)
589: throw new RuntimeException(
590: "AgentIdentificationService has not be set");
591: QuiescenceReportServiceProvider.this .setMessageNumbers(
592: quiescenceState, outgoing, incoming);
593: }
594:
595: /**
596: * Specifies that, from this service instance point-of-view, the
597: * agent is quiescent.
598: */
599: public void setQuiescentState() {
600: if (quiescenceState == null) {
601: throw new RuntimeException(
602: "AgentIdentificationService has not been set.");
603: }
604:
605: // RFE 3760: Remove this requestor from qState's list of blockers
606: if (isServiceImplQuiescent) {
607: // If this serviceImpl already thinks it is quiescent, no need to say so twice
608: if (logger.isDebugEnabled())
609: logger
610: .debug("setQuiescentState for "
611: + requestorName
612: + " of "
613: + quiescenceState.getAgentName()
614: + ", but this ServiceImpl already quiescent.");
615: } else {
616: if (logger.isDebugEnabled()) {
617: logger.debug("setQuiescentState for "
618: + requestorName + " of "
619: + quiescenceState.getAgentName());
620: }
621: isServiceImplQuiescent = true;
622: QuiescenceReportServiceProvider.this .setQuiescent(
623: quiescenceState, true, requestorName);
624: }
625: }
626:
627: /**
628: * Specifies that this agent is no longer quiescent.
629: * That is, this client of the QRS (requestor) is saying that it
630: * has work to do, and therefore the agent cannot be quiescent.
631: */
632: public void clearQuiescentState() {
633: if (quiescenceState == null) {
634: throw new RuntimeException(
635: "AgentIdentificationService has not been set.");
636: }
637:
638: // RFE 3760: Add this requestor to qState's list of blockers
639: // [Commented this out so can track all blockers of quiescence...]
640: if (!isServiceImplQuiescent) {
641: // FIXME: Without this return, 2 requestors with same name can conflict - second will make both non-q
642: // but also it only takes 1 to make both Q
643: // return;
644:
645: // If this serviceImpl already thinks it is blocking quiescence, no need to say so twice
646: if (logger.isDebugEnabled()) {
647: logger
648: .debug("clearQuiescentState for "
649: + requestorName
650: + " of "
651: + quiescenceState.getAgentName()
652: + ", but this ServiceImpl already not quiescent.");
653: }
654: } else {
655: if (logger.isDebugEnabled()) {
656: logger.debug("clearQuiescentState for "
657: + requestorName + " of "
658: + quiescenceState.getAgentName());
659: }
660: isServiceImplQuiescent = false;
661: QuiescenceReportServiceProvider.this .setQuiescent(
662: quiescenceState, false, requestorName);
663: }
664: }
665: } // end of QuiescenceReportServiceImpl
666:
667: private synchronized MessageAddress[] listQuiescenceStates() {
668: // return a new collection from quiescenceStates.keySet();
669: Collection states = quiescenceStates.keySet();
670: return (MessageAddress[]) states
671: .toArray(new MessageAddress[states.size()]);
672: }
673:
674: // Put this in separate synch method so can ensure setDead is protected, and can call checkQuiescence
675: private synchronized void setAgentDead(MessageAddress agentAddress) {
676: QuiescenceState qState = accessQuiescenceState(agentAddress);
677: if (qState == null)
678: return;
679: qState.setDead();
680: checkQuiescence();
681: }
682:
683: // Service to support servlet to mark agents as dead (and view all agent registered)
684: private class AgentQuiescenceStateServiceImpl implements
685: AgentQuiescenceStateService {
686: /** Is the Node altogether quiescent */
687: public boolean isNodeQuiescent() {
688: return QuiescenceReportServiceProvider.this .isQuiescent;
689: }
690:
691: /**
692: * List the local agents with quiescence states for the Node to consider
693: * @return an array of MessageAddresses registered with the Nodes QuiescenceReportService
694: */
695: public MessageAddress[] listAgentsRegistered() {
696: return listQuiescenceStates();
697: }
698:
699: /**
700: * Is the named agent's quiescence service enabled (ie the Distributor is fully loaded)?
701: * @param agentAddress The agent to query
702: * @return true if the agent's quiescence service has been enabled and it counts towards Node quiescence, false otherwise or if it does not exist locally
703: */
704: public boolean isAgentEnabled(MessageAddress agentAddress) {
705: if (agentAddress == null)
706: return false;
707: QuiescenceState qState = accessQuiescenceState(agentAddress);
708: if (qState == null)
709: return false;
710: return qState.isEnabled();
711: }
712:
713: /**
714: * Is the named agent quiescent?
715: * @param agentAddress The agent to query
716: * @return true if the Agent's Distributor is quiescent, false otherwise or if the agent does not exist locally
717: */
718: public boolean isAgentQuiescent(MessageAddress agentAddress) {
719: if (agentAddress == null)
720: return false;
721: QuiescenceState qState = accessQuiescenceState(agentAddress);
722: if (qState == null)
723: return false;
724: return qState.isQuiescent();
725: }
726:
727: /**
728: * Is the named agent alive for quiescence purposes, or has it been
729: * marked as dead to be ignored?
730: * @param agentAddress The agent to query
731: * @return false if the agent is dead and should be ignored for local quiescence or does not exist locally
732: */
733: public boolean isAgentAlive(MessageAddress agentAddress) {
734: if (agentAddress == null)
735: return false;
736: QuiescenceState qState = accessQuiescenceState(agentAddress);
737: if (qState == null)
738: return false;
739: return qState.isAlive();
740: }
741:
742: /**
743: * Mark the named agent as dead - it has been restarted elsewhere, and should
744: * be ignored locally for quiescence calculations.
745: * @param agentAddress The Agent to mark as dead
746: */
747: public void setAgentDead(MessageAddress agentAddress) {
748: if (agentAddress == null)
749: return;
750: QuiescenceReportServiceProvider.this
751: .setAgentDead(agentAddress);
752: }
753:
754: // Other options: list message numbers?
755:
756: // RFE 3760: List blockers of an agents quiescence (return a String)
757: // FIXME: Synchronize something?
758: public String getAgentQuiescenceBlockers(
759: MessageAddress agentAddress) {
760: if (agentAddress == null)
761: return "";
762: QuiescenceState qState = accessQuiescenceState(agentAddress);
763: if (qState == null)
764: return "";
765: return qState.getBlockersString();
766: }
767: }
768: }
|