001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import java.util.HashMap;
030: import java.util.Iterator;
031: import java.util.Map;
032:
033: import org.cougaar.core.component.ServiceBroker;
034: import org.cougaar.core.qos.metrics.Constants;
035: import org.cougaar.core.qos.metrics.Metric;
036: import org.cougaar.core.qos.metrics.MetricImpl;
037: import org.cougaar.core.qos.metrics.MetricsUpdateService;
038: import org.cougaar.core.service.ThreadListenerService;
039: import org.cougaar.core.service.ThreadService;
040:
041: /**
042: * This class listens for events on the closest ThreadService,
043: * collects information about every consumer, and periodically uploads
044: * that information the the metrics service. It's typically
045: * instantiated by the {@link AgentControlPlugin}, which implies that
046: * it's watching the ThreadService for an Agent.
047: */
048: public class SchedulerWatcher implements ThreadListener, Constants {
049: private static final double CREDIBILITY = SECOND_MEAS_CREDIBILITY;
050: private static final String PROVENANCE = "SchedulerWatcher";
051:
052: private String agentName;
053:
054: private class ConsumerRecord {
055: // Static
056: Object consumer;
057: String name;
058: String prefix;
059: //instantanous
060: int outstanding;
061: int pending;
062: //Accumalators
063: long accumalate_timestamp;
064: long count; // accumalator counts
065: long queued; // exit counts
066: long ran; // exit counts
067: // Integrators
068: double sumPending;
069: double sumOutstanding;
070: //Rate Snapshots
071: long snapshot_timestamp;
072: double snapshot_sumOutstanding;
073: double snapshot_sumPending;
074: double snapshot_count;
075: double snapshot_ran;
076: double snapshot_queued;
077: //Rates
078: double utilization;
079: double runs_per_sec;
080: double avg_cpu_per_run;
081: double avg_latency_per_run;
082: double avg_wait_per_run;
083:
084: ConsumerRecord(Object consumer) {
085: // System.err.println("%%%% new ConsumerRecord for " +consumer);
086: this .consumer = consumer;
087: this .name = consumer.toString();
088: this .prefix = "Agent" + KEY_SEPR + agentName + KEY_SEPR
089: + "Plugin" + KEY_SEPR + this .name + KEY_SEPR;
090: }
091:
092: synchronized void snapshot() {
093: // Calculate Deltas
094: long now = System.currentTimeMillis();
095: double deltaSumOutstanding = (sumOutstanding - snapshot_sumOutstanding);
096: double deltaSumPending = (sumPending - snapshot_sumPending);
097: double deltaRuns = (ran - snapshot_ran);
098: double deltaQueued = (queued - snapshot_queued);
099: double deltaTime = (now - snapshot_timestamp);
100: double deltaLatency = deltaSumPending + deltaSumOutstanding;
101:
102: // Calculate Rates
103: utilization = deltaSumOutstanding / deltaTime;
104: runs_per_sec = 1000 * (deltaRuns / deltaTime);
105:
106: avg_latency_per_run = 0;
107: avg_cpu_per_run = 0;
108: avg_wait_per_run = 0;
109: if (deltaRuns > 0) {
110: avg_latency_per_run = deltaLatency / deltaRuns;
111: avg_cpu_per_run = deltaSumOutstanding / deltaRuns;
112: }
113: if (deltaQueued > 0) {
114: avg_wait_per_run = deltaSumPending / deltaQueued;
115: }
116:
117: // Save SnapShot
118: snapshot_timestamp = now;
119: snapshot_sumOutstanding = sumOutstanding;
120: snapshot_sumPending = sumPending;
121: snapshot_count = count;
122: snapshot_ran = ran;
123: snapshot_queued = queued;
124:
125: sendData(utilization, "utilization");
126: sendData(runs_per_sec, "runspersec");
127: sendData(avg_cpu_per_run, "avgcpuperrun");
128: sendData(avg_latency_per_run, "avglatencyperrun");
129: sendData(avg_wait_per_run, "avgwaitperrun");
130: }
131:
132: private void sendData(double value, String tag) {
133: Metric metric = new MetricImpl(new Double(value),
134: CREDIBILITY, "", PROVENANCE);
135: // metricsUpdateService.updateValue(prefix+tag, PROVENANCE, metric);
136: metricsUpdateService.updateValue(prefix + tag, metric);
137: // System.out.println("Metric:"+prefix+tag + "="+metric);
138: }
139:
140: synchronized void accumulate() {
141: ++count;
142: long now = System.currentTimeMillis();
143: if (accumalate_timestamp > 0) {
144: double deltaT = now - accumalate_timestamp;
145: sumOutstanding += deltaT * outstanding;
146: sumPending += deltaT * pending;
147: }
148: accumalate_timestamp = now;
149: }
150: }
151:
152: private class SnapShotter implements Runnable {
153: public void run() {
154: synchronized (records) {
155: Iterator itr = records.values().iterator();
156: while (itr.hasNext()) {
157: ConsumerRecord rec = (ConsumerRecord) itr.next();
158: rec.snapshot();
159: }
160: }
161: }
162: }
163:
164: private Map<Object, ConsumerRecord> records = new HashMap<Object, ConsumerRecord>();
165: private MetricsUpdateService metricsUpdateService;
166:
167: public SchedulerWatcher(ServiceBroker sb, String agent) {
168: agentName = agent;
169: metricsUpdateService = (MetricsUpdateService) sb.getService(
170: this , MetricsUpdateService.class, null);
171: ThreadListenerService tls = (ThreadListenerService) sb
172: .getService(this , ThreadListenerService.class, null);
173: if (tls != null)
174: tls.addListener(this );
175:
176: ThreadService tsvc = (ThreadService) sb.getService(this ,
177: ThreadService.class, null);
178: Runnable body = new SnapShotter();
179: Schedulable sched = tsvc.getThread(this , body);
180: sched.schedule(5000, 1000);
181: sb.releaseService(this , ThreadService.class, tsvc);
182: }
183:
184: ConsumerRecord findRecord(Object consumer) {
185: ConsumerRecord rec = null;
186: synchronized (records) {
187: rec = records.get(consumer);
188: if (rec == null) {
189: rec = new ConsumerRecord(consumer);
190: records.put(consumer, rec);
191: }
192: }
193: return rec;
194: }
195:
196: public void threadQueued(Schedulable schedulable, Object consumer) {
197: ConsumerRecord rec = findRecord(consumer);
198: rec.accumulate();
199: ++rec.pending;
200: }
201:
202: public void threadDequeued(Schedulable schedulable, Object consumer) {
203: ConsumerRecord rec = findRecord(consumer);
204: rec.accumulate();
205: --rec.pending;
206: ++rec.queued;
207: }
208:
209: public void threadStarted(Schedulable schedulable, Object consumer) {
210: ConsumerRecord rec = findRecord(consumer);
211: rec.accumulate();
212: ++rec.outstanding;
213: }
214:
215: public void threadStopped(Schedulable schedulable, Object consumer) {
216: ConsumerRecord rec = findRecord(consumer);
217: rec.accumulate();
218: --rec.outstanding;
219: ++rec.ran;
220: }
221:
222: public void rightGiven(String consumer) {
223: }
224:
225: public void rightReturned(String consumer) {
226: }
227: }
|