001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.plugin.adaptivity;
028:
029: import org.cougaar.core.adaptivity.OMCRangeList;
030: import org.cougaar.core.adaptivity.SensorCondition;
031: import org.cougaar.core.blackboard.IncrementalSubscription;
032: import org.cougaar.core.component.ServiceBroker;
033: import org.cougaar.core.persist.NotPersistable;
034: import org.cougaar.core.plugin.ServiceUserPlugin;
035: import org.cougaar.core.service.ConditionService;
036: import org.cougaar.planning.ldm.plan.PlanElement;
037: import org.cougaar.planning.ldm.plan.Task;
038: import org.cougaar.util.UnaryPredicate;
039:
040: /**
041: * Plugin to sense task processing conditions. We create and maintain
042: * three Conditions: the rate of arrival of new tasks
043: * (TaskSensor.publishRate), the rate of disposal of tasks
044: * (TaskSensor.disposeRate), and the current backlog
045: * (TaskSensor.backlog). The backlog is the integral of the difference
046: * of the first two. The rate of disposal includes the rescind rate.
047: * All three are smoothed with low-pass filters having the same time
048: * constant (30 seconds currently).
049: **/
050: public class TaskSensorPlugin extends ServiceUserPlugin {
051: /** The name of the Condition we publish **/
052: private static final String PUBLISH_RATE_CONDITION_NAME = "TaskSensor.publishRate";
053: private static final String DISPOSE_RATE_CONDITION_NAME = "TaskSensor.disposeRate";
054: private static final String BACKLOG_CONDITION_NAME = "TaskSensor.backlog";
055:
056: private static final OMCRangeList POSITIVE_VALUES = new OMCRangeList(
057: new Double(0.0), new Double(Double.MAX_VALUE));
058:
059: private static final double TIME_CONSTANT = 10000.0; // Ten second time constant
060: private static final long UPDATE_INTERVAL = 5000L; // Update every 5 seconds
061:
062: private ConditionService conditionService;
063:
064: private MyCondition publishRateCondition = new MyCondition(
065: PUBLISH_RATE_CONDITION_NAME);
066: private MyCondition disposeRateCondition = new MyCondition(
067: DISPOSE_RATE_CONDITION_NAME);
068: private MyCondition backlogCondition = new MyCondition(
069: BACKLOG_CONDITION_NAME);
070:
071: private long then = System.currentTimeMillis();
072: private IncrementalSubscription tasksSubscription;
073: private UnaryPredicate tasksPredicate = new UnaryPredicate() {
074: public boolean execute(Object o) {
075: return o instanceof Task;
076: }
077: };
078: private IncrementalSubscription peSubscription;
079: private UnaryPredicate pePredicate = new UnaryPredicate() {
080: public boolean execute(Object o) {
081: return o instanceof PlanElement;
082: }
083: };
084:
085: /**
086: * Private inner class precludes use by others to set our
087: * measurement. Others can only reference the base Condition
088: * class which has no setter method.
089: **/
090: private static class MyCondition extends SensorCondition implements
091: NotPersistable {
092: private double filtered = 0.0;
093:
094: public MyCondition(String name) {
095: super (name, POSITIVE_VALUES);
096: filtered = ((Number) getValue()).doubleValue();
097: }
098:
099: public void updateRate(double sample, long elapsed) {
100: double f = Math.exp(-elapsed / TIME_CONSTANT);
101: double g = (1.0 - f) / elapsed;
102: filtered = filtered * f + sample * g;
103: }
104:
105: public void update(double sample, long elapsed) {
106: double f = Math.exp(-elapsed / TIME_CONSTANT);
107: double g = (1.0 - f);
108: filtered = filtered * f + sample * g;
109: }
110:
111: public void publish() {
112: super .setValue(new Double(filtered));
113: }
114: }
115:
116: private static final Class[] requiredServices = { ConditionService.class };
117:
118: public TaskSensorPlugin() {
119: super (requiredServices);
120: }
121:
122: public void setupSubscriptions() {
123: blackboard.publishAdd(publishRateCondition);
124: blackboard.publishAdd(disposeRateCondition);
125: blackboard.publishAdd(backlogCondition);
126: tasksSubscription = (IncrementalSubscription) blackboard
127: .subscribe(tasksPredicate);
128: peSubscription = (IncrementalSubscription) blackboard
129: .subscribe(pePredicate);
130: if (haveServices())
131: update(true);
132: }
133:
134: /**
135: * Test if all needed services have been acquired. Test the
136: * conditionService variable for null. If still null ask
137: * acquireServices to continue trying to acquire services. If true
138: * is returned, fill in the service variables and return true.
139: * Subsequent calls will return true immediately.
140: **/
141: private boolean haveServices() {
142: if (conditionService != null)
143: return true;
144: if (acquireServices()) {
145: ServiceBroker sb = getServiceBroker();
146: conditionService = (ConditionService) sb.getService(this ,
147: ConditionService.class, null);
148: return true;
149: }
150: return false;
151: }
152:
153: public void execute() {
154: logger.debug("execute");
155: if (haveServices()) {
156: update(timerExpired());
157: }
158: }
159:
160: /**
161: * Update the conditions. We have one subscription to undisposed
162: * tasks. From this subscription we obtain three values: The number
163: * of newly published tasks is just the size of the added list of
164: * the backlog subscription. The number of previously published
165: * tasks that have been disposed is the size of the removed list of
166: * the backlog subscription. And the current backlog is just the
167: * size of the backlog subscription.
168: **/
169: private void update(boolean publish) {
170: long now = System.currentTimeMillis();
171: long elapsed = now - then;
172: if (elapsed < 1)
173: elapsed = 1;
174: then = now;
175: publishRateCondition.updateRate(tasksSubscription
176: .getAddedCollection().size(), elapsed);
177: disposeRateCondition.updateRate(peSubscription
178: .getAddedCollection().size(), elapsed);
179: backlogCondition.update(tasksSubscription.size()
180: - peSubscription.size(), elapsed);
181: if (publish) {
182: cancelTimer();
183: if (logger.isDebugEnabled()) {
184: logger.debug(publishRateCondition.toString());
185: logger.debug(disposeRateCondition.toString());
186: logger.debug(backlogCondition.toString());
187: }
188: publishRateCondition.publish();
189: disposeRateCondition.publish();
190: backlogCondition.publish();
191: resetTimer(UPDATE_INTERVAL);
192: }
193: }
194: }
|