001: /*
002:
003: * <copyright>
004: *
005: * Copyright 2002-2007 BBNT Solutions, LLC
006: * under sponsorship of the Defense Advanced Research Projects
007: * Agency (DARPA).
008: *
009: * You can redistribute this software and/or modify it under the
010: * terms of the Cougaar Open Source License as published on the
011: * Cougaar Open Source Website (www.cougaar.org).
012: *
013: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
014: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
015: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
016: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
017: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
018: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
019: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
020: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
021: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
022: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
023: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
024: *
025: * </copyright>
026:
027: */
028:
029: package org.cougaar.qos.qrs;
030:
031: import org.cougaar.util.log.Logger;
032:
033: /**
034: * This a 'rate-ifier' abstraction. It's designed to accumulate calculated
035: * instantanteous data so that a periodic poller can supply a rate to the
036: * client. The period is supplied as an argument. For now the assumption is that
037: * raw values are doubles. Instantiate subclasses must provide two methods:
038: * configureDependencies(), to set up any dependencies on other formulas, and
039: * computeValueFromDependencies, which should calculate a new value given a
040: * collection of dependency values.
041: *
042: * A useful extension is SingleKeyPollingIntegral.java, a PollingIntegral which
043: * relies on a single DataFeed value.
044: */
045: abstract class PollingIntegral extends DataFormula {
046: private double lastValue;
047: private long lastTime; // not necessarily == lastValue.timestamp
048: private double runningIntegral, lastRunningIntegral;
049: private double credibility;
050: private boolean pollingValid;
051: private DataFormula alarm;
052: private int period;
053: private Logger logger;
054:
055: protected abstract void configureDependencies();
056:
057: protected abstract DataValue computeValueFromDependencies(
058: Values values);
059:
060: protected void setArgs(String[] args) {
061: if (args == null || args.length == 0) {
062: throw new RuntimeException(
063: "No period supplied to PollingIntegral");
064: }
065: super .setArgs(args);
066: this .period = Integer.parseInt(args[0]);
067: }
068:
069: protected void initialize(ResourceContext context) {
070: super .initialize(context);
071: lastTime = 0;
072: configureDependencies();
073: String[] parameters = {};
074: ResourceContext dependency = RSS.instance().resolveSpec(
075: "Alarm", parameters);
076: alarm = registerDependency(dependency, "AlarmFormula",
077: getArgs(), "Alarm");
078: logger = Logging.getLogger(PollingIntegral.class);
079: }
080:
081: // Note that this doesn't return the value of the formula itself.
082: // The value returned here will simply be included in the
083: // integral, via newValue(). Only the poller alters the actual
084: // formula value.
085: //
086: protected DataValue doCalculation(Values values) {
087: return computeValueFromDependencies(values);
088: }
089:
090: // Some value I depend on has changed. Backward chain, but don't
091: // notify listeners!
092: public void dataValueChanged(DataFormula changedFormula) {
093: if (changedFormula == alarm) {
094: pollUpdate();
095: } else {
096: DataValue newValue = computeValue(false);
097: newValue(newValue);
098: }
099: }
100:
101: // No forward-chaining here, just use the cache
102: public DataValue blockingQuery() {
103: return getCachedValue();
104: }
105:
106: private void integrate(long time) {
107: long duration = time - lastTime;
108: runningIntegral += duration * lastValue;
109: lastTime = time;
110: }
111:
112: private boolean noData() {
113: return lastTime == 0.0;
114: }
115:
116: private synchronized void newValue(DataValue value) {
117: if (logger.isDebugEnabled()) {
118: logger.debug("newValue " + value.getDoubleValue());
119: }
120: if (noData()) {
121: // first call, no valid timestamp yet
122: runningIntegral = 0.0;
123: lastRunningIntegral = 0.0;
124: lastTime = value.getTimestamp();
125: } else {
126: integrate(value.getTimestamp());
127: }
128: credibility = value.getCredibility(); // this is arbitrary, fix later
129: lastValue = value.getDoubleValue();
130: }
131:
132: private void pollUpdate() {
133: synchronized (this ) {
134: if (noData()) {
135: // No callbacks have arrived yet. No-op
136: pollingValid = false;
137: } else if (!pollingValid) {
138: // First poll with valid data; too early to compute an average
139: // Update runningIntegral with lastValue until now
140: integrate(System.currentTimeMillis());
141: lastRunningIntegral = runningIntegral;
142: pollingValid = true;
143: } else {
144: // update runningIntegral with lastValue until now
145: integrate(System.currentTimeMillis());
146: double avg = (runningIntegral - lastRunningIntegral)
147: / period;
148: if (logger.isDebugEnabled()) {
149: logger.debug("pollUpdate:" + " last="
150: + lastRunningIntegral + " running="
151: + runningIntegral + " avg= " + avg);
152: }
153: lastRunningIntegral = runningIntegral;
154: setCachedValue(new DataValue(avg, credibility));
155: }
156: }
157:
158: }
159:
160: }
|