001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.plugin.completion;
028:
029: import java.util.Collection;
030: import java.util.Collections;
031: import java.util.Date;
032: import java.util.Iterator;
033: import java.util.Set;
034: import java.util.SortedSet;
035:
036: import org.cougaar.core.blackboard.Subscription;
037: import org.cougaar.core.component.ServiceBroker;
038: import org.cougaar.core.relay.RelayChangeReport;
039: import org.cougaar.core.service.AlarmService;
040: import org.cougaar.core.service.DemoControlService;
041: import org.cougaar.core.service.UIDService;
042: import org.cougaar.util.UnaryPredicate;
043:
044: /**
045: * This plugin gathers and integrates completion information from
046: * agents in a society to determin the "completion" of the current
047: * tasks. In most agents, it gathers the information and forwards the
048: * completion status of the agent to another agent. This process
049: * continues through a hierarchy of such plugins until the plugin at
050: * the root of the tree is reached. When the root determines that
051: * completion has been acheived (or is never going to be achieved), it
052: * advances the clock with the expectation that the advancement will
053: * engender additional activity and waits for the completion of that
054: * work.
055: **/
056:
057: public abstract class CompletionSourcePlugin extends CompletionPlugin {
058: private static final double NORMAL_TASK_COMPLETION_THRESHOLD = 0.99;
059: private static final double CPU_CONSUMPTION_THRESHOLD = 0.95;
060: private static final long NORMAL_UPDATE_INTERVAL = 5000L;
061: private static final long NORMAL_LONG_CHECK_TARGETS_INTERVAL = 120000L;
062: private static final long NORMAL_SHORT_CHECK_TARGETS_INTERVAL = 15000L;
063: private static final long DEFAULT_DEAD_NODE_TIMEOUT = 120000L;
064: private static final long DEFAULT_TIMER_SLACK = 10000;
065: private static final String UPDATE_INTERVAL_KEY = "UPDATE_INTERVAL=";
066: private static final String LONG_CHECK_TARGETS_INTERVAL_KEY = "LONG_CHECK_TARGETS_INTERVAL=";
067: private static final String SHORT_CHECK_TARGETS_INTERVAL_KEY = "SHORT_CHECK_TARGETS_INTERVAL=";
068: private static final String TASK_COMPLETION_THRESHOLD_KEY = "TASK_COMPLETION_THRESHOLD=";
069: private static final String DEAD_NODE_TIMEOUT_KEY = "DEAD_NODE_TIMEOUT=";
070: private static final String TIMER_SLACK_KEY = "TIMER_SLACK=";
071: private static final int SHORT_CHECK_TARGETS_MAX = 5;
072: private double TASK_COMPLETION_THRESHOLD = NORMAL_TASK_COMPLETION_THRESHOLD;
073: private long UPDATE_INTERVAL = NORMAL_UPDATE_INTERVAL;
074: private long LONG_CHECK_TARGETS_INTERVAL = NORMAL_LONG_CHECK_TARGETS_INTERVAL;
075: private long SHORT_CHECK_TARGETS_INTERVAL = NORMAL_SHORT_CHECK_TARGETS_INTERVAL;
076: private long DEAD_NODE_TIMEOUT = DEFAULT_DEAD_NODE_TIMEOUT;
077: private long TIMER_SLACK = DEFAULT_TIMER_SLACK;
078: private static final Class[] requiredServices = { UIDService.class,
079: DemoControlService.class, AlarmService.class, };
080: protected UIDService uidService;
081: protected DemoControlService demoControlService;
082: protected AlarmService alarmService;
083: protected long now = System.currentTimeMillis();
084: // The following are all times when we need to awaken
085: private long nextCheckTargetsTime = 0L; // Time to check the list of targets
086: private long nextUpdateTime = now; // Time to check for new laggards
087: private int shortCheckTargetsCount = 0;
088: private CompletionRelay relay; // The relay we sent
089: private Laggard selfLaggard = null;
090: private UnaryPredicate myRelayPredicate = new UnaryPredicate() {
091: public boolean execute(Object o) {
092: return o == relay;
093: }
094: };
095: private Subscription responseSubscription;
096: private long timerTimeout = 0L; // When the timer should expire.
097:
098: private static Class[] concatRequiredServices(Class[] a1, Class[] a2) {
099: Class[] result = new Class[a1.length + a2.length];
100: System.arraycopy(a1, 0, result, 0, a1.length);
101: System.arraycopy(a2, 0, result, a1.length, a2.length);
102: return result;
103: }
104:
105: protected CompletionSourcePlugin() {
106: super (requiredServices);
107: }
108:
109: protected CompletionSourcePlugin(Class[] requiredServices) {
110: super (concatRequiredServices(
111: CompletionSourcePlugin.requiredServices,
112: requiredServices));
113: }
114:
115: public void suspend() {
116: if (haveServices()) {
117: ServiceBroker sb = getServiceBroker();
118: sb.releaseService(this , UIDService.class, uidService);
119: sb.releaseService(this , DemoControlService.class,
120: demoControlService);
121: sb.releaseService(this , AlarmService.class, alarmService);
122: uidService = null;
123: }
124: super .suspend();
125: }
126:
127: protected boolean haveServices() {
128: if (uidService != null)
129: return true;
130: if (acquireServices()) {
131: ServiceBroker sb = getServiceBroker();
132: uidService = (UIDService) sb.getService(this ,
133: UIDService.class, null);
134: demoControlService = (DemoControlService) sb.getService(
135: this , DemoControlService.class, null);
136: alarmService = (AlarmService) sb.getService(this ,
137: AlarmService.class, null);
138: return true;
139: }
140: return false;
141: }
142:
143: public void setupSubscriptions() {
144: Collection params = getParameters();
145: for (Iterator i = params.iterator(); i.hasNext();) {
146: String param = (String) i.next();
147: if (param.startsWith(TASK_COMPLETION_THRESHOLD_KEY)) {
148: TASK_COMPLETION_THRESHOLD = Double.parseDouble(param
149: .substring(TASK_COMPLETION_THRESHOLD_KEY
150: .length()));
151: if (logger.isInfoEnabled())
152: logger.info("Set " + TASK_COMPLETION_THRESHOLD_KEY
153: + TASK_COMPLETION_THRESHOLD);
154: continue;
155: }
156: if (param.startsWith(UPDATE_INTERVAL_KEY)) {
157: UPDATE_INTERVAL = Long.parseLong(param
158: .substring(UPDATE_INTERVAL_KEY.length()));
159: if (logger.isInfoEnabled())
160: logger.info("Set " + UPDATE_INTERVAL_KEY
161: + UPDATE_INTERVAL);
162: continue;
163: }
164: if (param.startsWith(LONG_CHECK_TARGETS_INTERVAL_KEY)) {
165: LONG_CHECK_TARGETS_INTERVAL = Long.parseLong(param
166: .substring(LONG_CHECK_TARGETS_INTERVAL_KEY
167: .length()));
168: if (logger.isInfoEnabled())
169: logger.info("Set "
170: + LONG_CHECK_TARGETS_INTERVAL_KEY
171: + LONG_CHECK_TARGETS_INTERVAL);
172: continue;
173: }
174: if (param.startsWith(SHORT_CHECK_TARGETS_INTERVAL_KEY)) {
175: SHORT_CHECK_TARGETS_INTERVAL = Long.parseLong(param
176: .substring(SHORT_CHECK_TARGETS_INTERVAL_KEY
177: .length()));
178: if (logger.isInfoEnabled())
179: logger.info("Set "
180: + SHORT_CHECK_TARGETS_INTERVAL_KEY
181: + SHORT_CHECK_TARGETS_INTERVAL);
182: continue;
183: }
184: if (param.startsWith(DEAD_NODE_TIMEOUT_KEY)) {
185: DEAD_NODE_TIMEOUT = Long.parseLong(param
186: .substring(DEAD_NODE_TIMEOUT_KEY.length()));
187: if (logger.isInfoEnabled())
188: logger.info("Set " + DEAD_NODE_TIMEOUT_KEY
189: + DEAD_NODE_TIMEOUT);
190: continue;
191: }
192: if (param.startsWith(TIMER_SLACK_KEY)) {
193: TIMER_SLACK = Long.parseLong(param
194: .substring(TIMER_SLACK_KEY.length()));
195: if (logger.isInfoEnabled())
196: logger.info("Set " + TIMER_SLACK_KEY + TIMER_SLACK);
197: continue;
198: }
199: }
200: responseSubscription = blackboard.subscribe(myRelayPredicate);
201: if (haveServices()) {
202: checkTargets();
203: checkSelfLaggard(true);
204: resetTimer(SHORT_CHECK_TARGETS_INTERVAL);
205: timerTimeout = System.currentTimeMillis()
206: + SHORT_CHECK_TARGETS_INTERVAL + TIMER_SLACK;
207: } else {
208: timerTimeout = 0L;
209: }
210: }
211:
212: public void execute() {
213: if (haveServices()) {
214: now = System.currentTimeMillis();
215: boolean timerExpired = !hasUnexpiredTimer(); // should we execute now?
216: if (!timerExpired) {
217: if (timerTimeout > 0L && now > timerTimeout) {
218: // this shouldn't hapen
219: long actual = getTimerExpirationTime();
220: logger.error("Timer failed to fire: now="
221: + (new Date(now)) + " timeout="
222: + (new Date(timerTimeout))
223: + " realTrigger=" + (new Date(actual)));
224: timerExpired = true;
225: }
226: }
227:
228: if (timerExpired) {
229: //cancelTimer(); // using resetTimer now, no need to cancel.
230: if (now > nextCheckTargetsTime) {
231: if (checkTargets()) {
232: checkSelfLaggard(true);
233: shortCheckTargetsCount = 0; // Reset and start over
234: nextCheckTargetsTime = now
235: + SHORT_CHECK_TARGETS_INTERVAL;
236: } else if (shortCheckTargetsCount < SHORT_CHECK_TARGETS_MAX) {
237: shortCheckTargetsCount++;
238: nextCheckTargetsTime = now
239: + SHORT_CHECK_TARGETS_INTERVAL;
240: if (logger.isDebugEnabled()) {
241: logger.debug("shortCheckTargetsCount="
242: + shortCheckTargetsCount);
243: }
244: } else { // Switch to using the long interval
245: nextCheckTargetsTime = now
246: + LONG_CHECK_TARGETS_INTERVAL;
247: }
248: } else if (shortCheckTargetsCount >= SHORT_CHECK_TARGETS_MAX) {
249: checkSelfLaggard(false);
250: checkLaggards();
251: }
252: resetTimer(UPDATE_INTERVAL);
253: timerTimeout = System.currentTimeMillis()
254: + UPDATE_INTERVAL + TIMER_SLACK;
255: }
256: } else {
257: if (logger.isDebugEnabled()) {
258: logger.debug("waiting for services");
259: }
260: }
261: }
262:
263: private void checkSelfLaggard(boolean isLaggard) {
264: if (selfLaggard == null || selfLaggard.isLaggard() != isLaggard) {
265: selfLaggard = new Laggard(getAgentIdentifier(), 1.0,
266: isLaggard ? 1.0 : 0.0, isLaggard);
267: if (isLaggard)
268: handleNewLaggard(selfLaggard);
269: }
270: }
271:
272: /**
273: * Check if a new relay needs to be published due to a change in
274: * targets. We get the set of targets by calling "getTargetNames()"
275: * and compare to the set of agents that are
276: * targes of the current relay. If a difference is detected, the old
277: * relay is removed and a new one with the new agent set is
278: * published.
279: * @return true if a new relay was published (suppresses laggard checking)
280: **/
281: private boolean checkTargets() {
282: Set targets = getTargets();
283: if (relay == null) {
284: relay = new CompletionRelay(null, targets,
285: TASK_COMPLETION_THRESHOLD,
286: CPU_CONSUMPTION_THRESHOLD);
287: relay.setUID(uidService.nextUID());
288: if (logger.isInfoEnabled())
289: logger.info("New relay for " + targets);
290: blackboard.publishAdd(relay);
291: return true;
292: }
293: if (!targets.equals(relay.getTargets())) {
294: RelayChangeReport rcr = new RelayChangeReport(relay);
295: relay.setTargets(targets);
296: blackboard.publishChange(relay, Collections.singleton(rcr));
297: if (logger.isInfoEnabled())
298: logger.info("Changed relay for " + targets);
299: return true;
300: }
301: if (logger.isDebugEnabled())
302: logger.debug("Same relay for " + targets);
303: return false;
304: }
305:
306: /**
307: * Identify the worst laggard and "handle" it.
308: **/
309: private void checkLaggards() {
310: SortedSet laggards = relay.getLaggards();
311: adjustLaggards(laggards);
312: if (laggards.size() > 0) {
313: long oldestAllowedTimestamp = now
314: - (LaggardFilter.NON_LAGGARD_UPDATE_INTERVAL + DEAD_NODE_TIMEOUT);
315: for (Iterator i = laggards.iterator(); i.hasNext();) {
316: Laggard newLaggard = (Laggard) i.next();
317: long okBy = newLaggard.getTimestamp()
318: - oldestAllowedTimestamp;
319: if (okBy > 0L) {
320: if (logger.isDebugEnabled()) {
321: logger.debug("checkLaggards(" + (okBy / 1000L)
322: + ") " + newLaggard);
323: }
324: handleNewLaggard(newLaggard);
325: break;
326: } else {
327: //relay.flushOutdatedLaggard(newLaggard);
328: if (logger.isDebugEnabled()) {
329: logger.debug("checkLaggards ignoring old "
330: + newLaggard);
331: }
332: }
333: }
334: } else {
335: handleNewLaggard(selfLaggard);
336: if (logger.isDebugEnabled()) {
337: logger.debug("Waiting for relay responses");
338: }
339: }
340: }
341:
342: protected void setPersistenceNeeded() {
343: if (logger.isInfoEnabled()) {
344: logger.info("setPersistence()");
345: }
346: relay.setPersistenceNeeded();
347: blackboard.publishChange(relay);
348: }
349:
350: /**
351: * Get the addresses of all targets
352: **/
353: protected abstract Set getTargets();
354:
355: protected abstract void handleNewLaggard(Laggard worstLaggard);
356:
357: protected abstract boolean adjustLaggards(SortedSet laggards);
358: }
|