001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.plugin.completion;
028:
029: import java.util.Collection;
030: import java.util.Collections;
031: import java.util.HashSet;
032: import java.util.Iterator;
033: import java.util.Map;
034: import java.util.Set;
035: import java.util.WeakHashMap;
036:
037: import org.cougaar.core.blackboard.ChangeReport;
038: import org.cougaar.core.blackboard.IncrementalSubscription;
039: import org.cougaar.core.persist.PersistenceNotEnabledException;
040: import org.cougaar.util.UnaryPredicate;
041:
042: /**
043: * This plugin gathers and integrates completion information from one
044: * agent to determine the "completion" of the current blackboard objects.
045: * It gathers the information and forwards the completion status of the
046: * agent to another agent. This is typically the NodeAgent of the node
047: * on which the agent is running.
048: **/
049:
050: public class CompletionTargetPlugin extends CompletionPlugin {
051: private static final long NORMAL_SLEEP_INTERVAL = 5000L;
052: private static final long NORMAL_ACTIVITY_DELAY = 600000;
053: private static final String SLEEP_INTERVAL_KEY = "SLEEP_INTERVAL=";
054: private static final String ACTIVITY_DELAY_KEY = "ACTIVITY_DELAY=";
055: private long SLEEP_INTERVAL = NORMAL_SLEEP_INTERVAL;
056: private long ACTIVITY_DELAY = NORMAL_ACTIVITY_DELAY;
057:
058: private static class MyChangeReport implements ChangeReport {
059: }
060:
061: private ChangeReport myChangeReport = new MyChangeReport();
062: private Set myChangeReports = Collections.singleton(myChangeReport);
063: private Set changedRelays = new HashSet();
064: private static final Class[] requiredServices = {};
065: protected Set ignoredVerbs = new HashSet();
066: private IncrementalSubscription relaySubscription;
067: private IncrementalSubscription activitySubscription;
068: protected long now; // Time of current execute()
069: protected long scenarioNow; // Scenario time of current execute()
070: private long lastActivity; // Time of last activity
071: private double cpuConsumption = 0.0;
072: private double blackboardCompletion = 0.0;
073: private boolean updateBlackboardCompletionPending = true;
074: private boolean debug = false;
075: private Map filters = new WeakHashMap();
076:
077: protected CompletionCalculator calc;
078:
079: public CompletionTargetPlugin() {
080: super (requiredServices);
081: }
082:
083: protected UnaryPredicate createActivityPredicate() {
084: return new CompletionActivityPredicate();
085: }
086:
087: protected CompletionCalculator getCalculator() {
088: if (calc == null) {
089: calc = new CompletionCalculator();
090: }
091: return calc;
092: }
093:
094: public void setupSubscriptions() {
095: Collection params = getParameters();
096: for (Iterator i = params.iterator(); i.hasNext();) {
097: String param = (String) i.next();
098: if (param.startsWith(SLEEP_INTERVAL_KEY)) {
099: SLEEP_INTERVAL = Long.parseLong(param
100: .substring(SLEEP_INTERVAL_KEY.length()));
101: if (logger.isInfoEnabled())
102: logger.info("Set " + SLEEP_INTERVAL_KEY
103: + SLEEP_INTERVAL);
104: continue;
105: }
106: if (param.startsWith(ACTIVITY_DELAY_KEY)) {
107: ACTIVITY_DELAY = Long.parseLong(param
108: .substring(ACTIVITY_DELAY_KEY.length()));
109: if (logger.isInfoEnabled())
110: logger.info("Set " + ACTIVITY_DELAY_KEY
111: + ACTIVITY_DELAY);
112: continue;
113: }
114: }
115: debug = true;//getMessageAddress().toString().equals("47-FSB");
116: relaySubscription = (IncrementalSubscription) blackboard
117: .subscribe(targetRelayPredicate);
118: UnaryPredicate activityPredicate = createActivityPredicate();
119: activitySubscription = (IncrementalSubscription) blackboard
120: .subscribe(activityPredicate, new AmnesiaCollection(),
121: true);
122: lastActivity = System.currentTimeMillis();
123: resetTimer(SLEEP_INTERVAL);
124: }
125:
126: public void execute() {
127: processSubscriptions();
128: if (relaySubscription.hasChanged()) {
129: changedRelays.clear();
130: changedRelays
131: .addAll(relaySubscription.getAddedCollection());
132: Collection changes = relaySubscription
133: .getChangedCollection();
134: if (changes.size() > 0) {
135: for (Iterator i = changes.iterator(); i.hasNext();) {
136: CompletionRelay relay = (CompletionRelay) i.next();
137: Set changeReports = relaySubscription
138: .getChangeReports(relay);
139: if (changeReports == null
140: || !changeReports.equals(myChangeReports)) {
141: changedRelays.add(relay);
142: }
143: }
144: }
145: if (changedRelays.size() > 0) {
146: checkPersistenceNeeded(changedRelays);
147: }
148: }
149: }
150:
151: private void processSubscriptions() {
152: boolean timerExpired = timerExpired();
153: now = System.currentTimeMillis();
154: scenarioNow = getAlarmService().currentTimeMillis();
155: if (activitySubscription.hasChanged()) {
156: lastActivity = now;
157: // Activity has changed blackboard completion
158: updateBlackboardCompletionPending = true;
159: }
160: updateCPUConsumption(now);
161: if (timerExpired) {
162: if (updateBlackboardCompletionPending) {
163: updateBlackboardCompletion();
164: }
165: cancelTimer();
166: resetTimer(SLEEP_INTERVAL);
167: }
168: maybeRespondToRelays();
169: }
170:
171: protected void setPersistenceNeeded() {
172: try {
173: blackboard.persistNow();
174: if (logger.isInfoEnabled()) {
175: logger.info("doPersistence()");
176: }
177: } catch (PersistenceNotEnabledException pnee) {
178: logger.error(pnee.getMessage(), pnee);
179: }
180: }
181:
182: private void updateCPUConsumption(long now) {
183: cpuConsumption = Math
184: .max(
185: 0.0,
186: 1.0 - (((double) (now - lastActivity)) / ACTIVITY_DELAY));
187: }
188:
189: private void updateBlackboardCompletion() {
190: CompletionCalculator cc = getCalculator();
191: Collection objs = blackboard.query(cc.getPredicate());
192: blackboardCompletion = cc.calculate(objs);
193: updateBlackboardCompletionPending = false;
194: }
195:
196: /**
197: * Create a new Laggard if the conditions warrant. The conditions
198: * warranting a new laggard are embodied in the LaggardFilter, but
199: * we want to defer recomputing blackboard completion as long as possible
200: * because it is moderately expensive. So, if the filter suppresses
201: * transmission for either value of blackboard completion, then blackboard
202: * completion is not updated transmission is suppressed. Otherwise,
203: * blackboard completion is updated and a new laggard created.
204: **/
205: private Laggard createLaggard(CompletionRelay relay) {
206: boolean cpuConsumed = cpuConsumption > relay.getCPUThreshold();
207: LaggardFilter filter = (LaggardFilter) filters.get(relay);
208: if (filter == null) {
209: filter = new LaggardFilter();
210: filters.put(relay, filter);
211: }
212: if (updateBlackboardCompletionPending) {
213: if (filter.filter(true, now) || !cpuConsumed
214: && filter.filter(false, now)) {
215: updateBlackboardCompletion();
216: }
217: }
218: boolean isBlackboardIncomplete = blackboardCompletion < relay
219: .getCompletionThreshold();
220: boolean isLaggard = cpuConsumed || isBlackboardIncomplete;
221: if (filter.filter(isLaggard, now)) {
222: Laggard newLaggard = new Laggard(getAgentIdentifier(),
223: blackboardCompletion, cpuConsumption, isLaggard);
224: filter.setOldLaggard(newLaggard);
225: return newLaggard;
226: }
227: return null;
228: }
229:
230: private void maybeRespondToRelays() {
231: if (debug && logger.isDebugEnabled()
232: && relaySubscription.size() == 0) {
233: return;
234: }
235: for (Iterator relays = relaySubscription.iterator(); relays
236: .hasNext();) {
237: CompletionRelay relay = (CompletionRelay) relays.next();
238: Laggard newLaggard = createLaggard(relay);
239: if (newLaggard != null) {
240: if (logger.isDebugEnabled())
241: logger.debug("Send response to "
242: + relay.getSource() + ": " + newLaggard);
243: relay.setResponseLaggard(newLaggard);
244: blackboard.publishChange(relay, myChangeReports);
245: } else {
246: // if (logger.isDebugEnabled()) logger.debug("No new response to " + relay.getSource());
247: }
248: }
249: }
250: }
|