001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.qos.tmatrix;
028:
029: import java.util.HashMap;
030:
031: import org.cougaar.core.component.ServiceBroker;
032: import org.cougaar.core.mts.MessageAddress;
033: import org.cougaar.core.plugin.ComponentPlugin;
034: import org.cougaar.core.qos.metrics.Constants;
035: import org.cougaar.core.qos.metrics.DecayingHistory;
036: import org.cougaar.core.qos.metrics.Metric;
037: import org.cougaar.core.qos.metrics.MetricImpl;
038: import org.cougaar.core.qos.metrics.MetricsUpdateService;
039: import org.cougaar.core.service.LoggingService;
040: import org.cougaar.core.service.ThreadService;
041: import org.cougaar.core.thread.Schedulable;
042:
043: /* Load this Plugin at LOW priority since it needs another plugin's service
044: */
045: public class AgentFlowRatePlugin extends ComponentPlugin implements
046: Runnable, Constants {
047: private static final int BASE_PERIOD = 10; //10SecAVG
048: private LoggingService logging;
049:
050: private class AgentFlowHistory extends DecayingHistory {
051: private static final double CREDIBILITY = SECOND_MEAS_CREDIBILITY;
052: String msgKey;
053: String byteKey;
054: MessageAddress orig = null;
055: MessageAddress target = null;
056:
057: AgentFlowHistory(MessageAddress orig, MessageAddress target) {
058: super (10, 3, BASE_PERIOD);
059: this .orig = orig;
060: this .target = target;
061: String flowKey = "AgentFlow" + KEY_SEPR + orig + KEY_SEPR
062: + target + KEY_SEPR;
063: msgKey = (flowKey + MSG_RATE).intern();
064: addKey(msgKey);
065: byteKey = (flowKey + BYTE_RATE).intern();
066: addKey(byteKey);
067: }
068:
069: // done on records, not the whole matrix
070: public void newAddition(KeyMap keys,
071: DecayingHistory.SnapShot now_raw,
072: DecayingHistory.SnapShot last_raw) {
073: TrafficMatrix.TrafficRecord now = (TrafficMatrix.TrafficRecord) now_raw;
074: TrafficMatrix.TrafficRecord last = (TrafficMatrix.TrafficRecord) last_raw;
075: double deltaT = (now.timestamp - last.timestamp) / 1000.0;
076: double deltaMsgs = now.msgCount - last.msgCount;
077: double deltaBytes = now.byteCount - last.byteCount;
078:
079: String msgAvgKey = keys.getKey(msgKey);
080: String byteAvgKey = keys.getKey(byteKey);
081:
082: Metric msgAvg = new MetricImpl(new Double(deltaMsgs
083: / deltaT), CREDIBILITY, "msg/sec", "AgentFlowRate");
084: metricsUpdateService.updateValue(msgAvgKey, msgAvg);
085:
086: Metric byteAvg = new MetricImpl(new Double(deltaBytes
087: / deltaT), CREDIBILITY, "bytes/sec",
088: "AgentFlowRate");
089: metricsUpdateService.updateValue(byteAvgKey, byteAvg);
090: if (logging.isDebugEnabled())
091: logging.debug("key=" + msgAvgKey + " Value=" + msgAvg);
092: }
093: }
094:
095: private TrafficMatrixStatisticsService agentFlowService;
096: private MetricsUpdateService metricsUpdateService;
097: private Schedulable schedulable;
098: private HashMap histories;
099:
100: public AgentFlowRatePlugin() {
101: histories = new HashMap();
102: }
103:
104: // Local
105: AgentFlowHistory findOrMakeHistory(MessageAddress orig,
106: MessageAddress target) {
107: HashMap submap = (HashMap) histories.get(orig.getPrimary());
108: if (submap == null) {
109: submap = new HashMap();
110: histories.put(orig.getPrimary(), submap);
111: }
112: AgentFlowHistory history = (AgentFlowHistory) submap.get(target
113: .getPrimary());
114: if (history == null) {
115: history = new AgentFlowHistory(orig, target);
116: submap.put(target.getPrimary(), history);
117: }
118: return history;
119: }
120:
121: // Component
122: public void load() {
123: super .load();
124:
125: ServiceBroker sb = getServiceBroker();
126:
127: logging = (LoggingService) sb.getService(this ,
128: LoggingService.class, null);
129:
130: agentFlowService = (TrafficMatrixStatisticsService) sb
131: .getService(this , TrafficMatrixStatisticsService.class,
132: null);
133: if (agentFlowService == null) {
134: logging
135: .error("Can't find TrafficMatrixStatsisticsService. This plugin must be loaded at Low priority");
136: return;
137: }
138:
139: metricsUpdateService = (MetricsUpdateService) sb.getService(
140: this , MetricsUpdateService.class, null);
141:
142: ThreadService threadService = (ThreadService) sb.getService(
143: this , ThreadService.class, null);
144: schedulable = threadService.getThread(this , this ,
145: "AgentFlowRatePlugin");
146: schedulable.schedule(5000, BASE_PERIOD * 1000);
147: sb.releaseService(this , ThreadService.class, threadService);
148: }
149:
150: // Schedulable body
151: public void run() {
152: int count = 0;
153: TrafficMatrix agentFlowSnapshot = agentFlowService
154: .snapshotMatrix();
155: // print out for sanity's sake
156: if (agentFlowSnapshot != null) {
157: logging
158: .debug("AgentFlowRatePlugin.agentFlowSnapshot Looks like: "
159: + agentFlowSnapshot);
160: }
161: TrafficMatrix.TrafficIterator itr = agentFlowSnapshot
162: .getIterator();
163: while (itr.hasNext()) {
164: count++;
165: TrafficMatrix.TrafficRecord record = (TrafficMatrix.TrafficRecord) itr
166: .next();
167: MessageAddress orig = itr.getOrig();
168: MessageAddress target = itr.getTarget();
169: AgentFlowHistory history = findOrMakeHistory(orig, target);
170: history.add(record);
171: logging
172: .debug("AgentFlowRatePlugin processed TrafficRecord: "
173: + record);
174: }
175: if (logging.isDebugEnabled())
176: logging.debug("Processed Traffic Records=" + count);
177: }
178:
179: // Plugin
180: protected void setupSubscriptions() {
181: }
182:
183: protected void execute() {
184: }
185: }
|