001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.plugin.completion;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.HashSet;
033: import java.util.Iterator;
034: import java.util.List;
035: import java.util.Map;
036: import java.util.Set;
037: import java.util.SortedSet;
038: import java.util.WeakHashMap;
039:
040: import org.cougaar.core.agent.AgentContainer;
041: import org.cougaar.core.blackboard.IncrementalSubscription;
042: import org.cougaar.core.mts.MessageAddress;
043: import org.cougaar.core.node.NodeBusyService;
044: import org.cougaar.core.node.NodeControlService;
045:
046: /**
047: * This plugin gathers and integrates completion information from
048: * agents in a node to determine the "completion" of the current
049: * tasks. It continually determines the worst laggard in the node and
050: * forwards that one laggard to the society root.
051: **/
052:
053: public class CompletionNodePlugin extends CompletionSourcePlugin {
054: private IncrementalSubscription targetRelaySubscription;
055: private Map filters = new WeakHashMap();
056: private Laggard worstLaggard = null;
057: private NodeControlService ncs;
058: private NodeBusyService nbs;
059: private static final Class[] requiredServices = {
060: NodeControlService.class, NodeBusyService.class, };
061:
062: public CompletionNodePlugin() {
063: super (requiredServices);
064: }
065:
066: public void load() {
067: super .load();
068: }
069:
070: public void unload() {
071: if (haveServices()) {
072: getServiceBroker().releaseService(this ,
073: NodeControlService.class, ncs);
074: getServiceBroker().releaseService(this ,
075: NodeBusyService.class, nbs);
076: }
077: super .unload();
078: }
079:
080: protected boolean haveServices() {
081: if (nbs != null && ncs != null)
082: return true;
083: if (super .haveServices()) {
084: ncs = (NodeControlService) getServiceBroker().getService(
085: this , NodeControlService.class, null);
086: nbs = (NodeBusyService) getServiceBroker().getService(this ,
087: NodeBusyService.class, null);
088: return true;
089: }
090: return false;
091: }
092:
093: public void setupSubscriptions() {
094: targetRelaySubscription = (IncrementalSubscription) blackboard
095: .subscribe(targetRelayPredicate);
096: super .setupSubscriptions();
097: }
098:
099: public void execute() {
100: if (targetRelaySubscription.hasChanged()) {
101: checkPersistenceNeeded(targetRelaySubscription);
102: if (logger.isDebugEnabled()) {
103: Collection newRelays = targetRelaySubscription
104: .getAddedCollection();
105: if (!newRelays.isEmpty()) {
106: for (Iterator i = newRelays.iterator(); i.hasNext();) {
107: CompletionRelay relay = (CompletionRelay) i
108: .next();
109: logger
110: .debug("New target: "
111: + relay.getSource());
112: if (worstLaggard != null) {
113: sendResponseLaggard(relay, worstLaggard);
114: }
115: }
116: }
117: }
118: }
119: super .execute();
120: }
121:
122: protected Set getTargets() {
123: // get local agent addresses
124: AgentContainer agentContainer = ncs.getRootContainer();
125: if (agentContainer == null) {
126: if (logger.isErrorEnabled()) {
127: logger.error("Unable to list local agents on node "
128: + getAgentIdentifier());
129: }
130: return Collections.EMPTY_SET;
131: } else {
132: return agentContainer.getAgentAddresses();
133: }
134: }
135:
136: private void sendResponseLaggard(CompletionRelay relay,
137: Laggard newLaggard) {
138: if (logger.isDebugEnabled()) {
139: logger.debug("Send response to " + relay.getSource() + ": "
140: + newLaggard);
141: }
142: relay.setResponseLaggard(newLaggard);
143: blackboard.publishChange(relay);
144: }
145:
146: // Adjust the set of laggards to be sure all busy agents appear to be incomplete
147: protected boolean adjustLaggards(SortedSet laggards) {
148: Set targets = new HashSet(getTargets());
149: List newLaggards = new ArrayList(targets.size());
150: for (Iterator i = laggards.iterator(); i.hasNext();) {
151: Laggard laggard = (Laggard) i.next();
152: MessageAddress target = laggard.getAgent();
153: targets.remove(target);
154: if (laggard.isLaggard())
155: continue; // Already laggard, don't care if busy
156: if (nbs.isAgentBusy(target)) {
157: if (logger.isInfoEnabled()) {
158: logger.info("adjustLaggards: " + target
159: + " is busy");
160: }
161: i.remove();
162: newLaggards.add(new Laggard(target, laggard
163: .getBlackboardCompletion(), 1.0, true));
164: }
165: }
166: if (targets.size() > 0) {
167: // Some targets were apparently missing assume they are busy
168: for (Iterator i = targets.iterator(); i.hasNext();) {
169: MessageAddress target = (MessageAddress) i.next();
170: if (nbs.isAgentBusy(target)) {
171: newLaggards
172: .add(new Laggard(target, 0.0, 1.0, true));
173: }
174: }
175: }
176: if (logger.isDebugEnabled()) {
177: logger.debug("adjustLaggards laggards: " + laggards);
178: logger.debug(" newLaggards: " + newLaggards);
179: }
180: if (newLaggards.size() > 0) {
181: laggards.addAll(newLaggards);
182: return true;
183: }
184: return false;
185: }
186:
187: protected void handleNewLaggard(Laggard newLaggard) {
188: worstLaggard = newLaggard;
189: if (targetRelaySubscription.size() > 0) {
190: for (Iterator i = targetRelaySubscription.iterator(); i
191: .hasNext();) {
192: CompletionRelay relay = (CompletionRelay) i.next();
193: LaggardFilter filter = (LaggardFilter) filters
194: .get(relay);
195: if (filter == null) {
196: filter = new LaggardFilter();
197: filters.put(relay, filter);
198: }
199: if (filter.filter(newLaggard)) {
200: sendResponseLaggard(relay, newLaggard);
201: } else {
202: if (logger.isDebugEnabled())
203: logger.debug("No new response to "
204: + relay.getSource());
205: }
206: }
207: } else {
208: if (logger.isDebugEnabled())
209: logger.debug("No relays");
210: }
211: }
212: }
|