001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.examples;
028:
029: import java.util.Collection;
030: import java.util.Iterator;
031: import org.cougaar.core.blackboard.DirectiveMessage;
032: import org.cougaar.core.component.Component;
033: import org.cougaar.core.component.ServiceBroker;
034: import org.cougaar.core.logging.LoggingServiceWithPrefix;
035: import org.cougaar.core.mts.Message;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.core.mts.MessageStatistics;
038: import org.cougaar.core.mts.MessageTransportWatcher;
039: import org.cougaar.core.node.NodeIdentificationService;
040: import org.cougaar.core.qos.metrics.Constants;
041: import org.cougaar.core.qos.metrics.Metric;
042: import org.cougaar.core.qos.metrics.MetricsService;
043: import org.cougaar.core.service.AgentIdentificationService;
044: import org.cougaar.core.service.BlackboardMetricsService;
045: import org.cougaar.core.service.LoggingService;
046: import org.cougaar.core.service.MessageStatisticsService;
047: import org.cougaar.core.service.MessageWatcherService;
048: import org.cougaar.core.service.NodeMetricsService;
049: import org.cougaar.planning.service.PrototypeRegistryService;
050: import org.cougaar.core.service.ThreadService;
051: import org.cougaar.core.thread.Schedulable;
052: import org.cougaar.core.blackboard.Directive;
053: import org.cougaar.planning.ldm.asset.Asset;
054: import org.cougaar.planning.ldm.plan.Notification;
055: import org.cougaar.planning.ldm.plan.PlanElement;
056: import org.cougaar.planning.ldm.plan.Task;
057: import org.cougaar.util.GenericStateModelAdapter;
058:
059: /**
060: * Component that periodically logs sample metrics.
061: * <p>
062: * This component can be added to any or all agents. Load as:<pre>
063: * plugin = org.cougaar.core.examples.MetricsLoggerComponent(csv=true, delay=30000, interval=1000)
064: * </pre> See below for more parameter options.
065: * <p>
066: * MTS message byte and count requires this aspect:<pre>
067: * -Dorg.cougaar.message.transport.aspects=org.cougaar.mts.std.StatisticsAspect
068: * </pre><p>
069: * CPULoad detection requires this node-agent plugin:<pre>
070: * plugin = org.cougaar.core.thread.AgentSensorPlugin
071: * </pre><p>
072: * Here's a sample log4j configuration file that logs to a file
073: * named "metrics.csv":<pre>
074: * log4j.rootCategory=WARN, stdout
075: * log4j.appender.stdout=org.apache.log4j.ConsoleAppender
076: * log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
077: * log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p - %c{1} - %m%n
078: * log4j.appender.metrics=org.apache.log4j.RollingFileAppender
079: * log4j.appender.metrics.File=metrics.csv
080: * log4j.appender.metrics.MaxFileSize=5024KB
081: * log4j.appender.metrics.MaxBackupIndex=1
082: * log4j.appender.metrics.layout=org.apache.log4j.PatternLayout
083: * log4j.appender.metrics.layout.ConversionPattern=%m%n
084: * log4j.additivity.org.cougaar.core.examples.MetricsLoggerComponent=false
085: * log4j.category.org.cougaar.core.examples.MetricsLoggerComponent=DEBUG,metrics
086: * </pre> Save the above log4j configuration in a file named:<pre>
087: * $COUGAAR_INSTALL_PATH/configs/common/log.props
088: * </pre> and enable with this system property: <pre>
089: * -Dorg.cougaar.core.logging.config.filename=log.props
090: * </pre>
091: */
092: public class MetricsLoggerComponent extends GenericStateModelAdapter
093: implements Component {
094:
095: /**
096: * Default component parameters are:<pre>
097: * csv=false <i>log human-readable output, not CSV</i>
098: * delay=30000 <i>wait thirty seconds before starting</i>
099: * interval=1000 <i>once starting, log once per second</i>
100: * </pre>
101: */
102: private boolean useCSV = true;
103: private long delay = 10000;
104: private long interval = 1000;
105:
106: private ServiceBroker sb;
107: private LoggingService rawLogger = LoggingService.NULL;
108: private LoggingService logger = rawLogger; // "AGENT: "+logger
109: private AgentIdentificationService agentIdService;
110: private NodeIdentificationService nodeIdService;
111: private ThreadService threadService;
112: private PrototypeRegistryService protoRegistryService;
113: private BlackboardMetricsService bbMetricsService;
114: private MetricsService metricsService;
115: private MessageStatisticsService messageStatsService;
116: private MessageWatcherService messageWatcherService;
117: private NodeMetricsService nodeMetricsService;
118:
119: private MessageAddress agentId;
120: private MessageAddress nodeId;
121: private String cpuPath;
122: private String toMsgPath;
123: private String fromMsgPath;
124: protected MessageWatcher myMessageWatcher;
125:
126: // gory service load/unload is at the end.
127: //
128: // here we show the interesting stuff up front:
129:
130: private void logAllMetrics() {
131: if (logger.isInfoEnabled()) {
132: if (useCSV) {
133: logCSV();
134: } else {
135: logVerbose();
136: }
137: }
138: }
139:
140: private void logVerbose() {
141: logProtoMetrics();
142: logBlackboardMetrics();
143: logNodeMetrics();
144: logQosMetrics();
145: logMessageStatsMetrics();
146: logMessageWatcherMetrics();
147: }
148:
149: private void logProtoMetrics() {
150: if (protoRegistryService != null) {
151: logger.info("Cached Prototype Count: "
152: + protoRegistryService.getCachedPrototypeCount());
153: logger.info("Property Provider Count: "
154: + protoRegistryService.getPropertyProviderCount());
155: logger.info("Prototype Provider Count: "
156: + protoRegistryService.getPrototypeProviderCount());
157: }
158: }
159:
160: private void logBlackboardMetrics() {
161: if (bbMetricsService != null) {
162: logger.info("Asset Count: "
163: + bbMetricsService.getBlackboardCount(Asset.class));
164: logger.info("Plan Element Count: "
165: + bbMetricsService
166: .getBlackboardCount(PlanElement.class));
167: logger.info("Task Count: "
168: + bbMetricsService.getBlackboardCount(Task.class));
169: logger.info("Total Blackboard Object Count: "
170: + bbMetricsService.getBlackboardCount());
171: }
172: }
173:
174: private void logNodeMetrics() {
175: if (nodeMetricsService != null) {
176: logger.info("Active Thread Count: "
177: + nodeMetricsService.getActiveThreadCount());
178: logger.info("Free Memory: "
179: + nodeMetricsService.getFreeMemory());
180: logger.info("Total Memory: "
181: + nodeMetricsService.getTotalMemory());
182: }
183: }
184:
185: private void logQosMetrics() {
186: logger.info("CPU Load: " + getMetric(cpuPath, 0.0));
187: logger.info("Sent To: " + getMetric(toMsgPath, 0.0));
188: logger.info("Heard From: " + getMetric(fromMsgPath, 0.0));
189: }
190:
191: private void logMessageStatsMetrics() {
192: if (messageStatsService != null) {
193: MessageStatistics.Statistics stats = messageStatsService
194: .getMessageStatistics(false);
195: logger.info("Message Queue: "
196: + stats.averageMessageQueueLength);
197: logger
198: .info("Message Bytes: "
199: + stats.totalSentMessageBytes);
200: logger
201: .info("Message Count: "
202: + stats.totalSentMessageCount);
203: long[] h = stats.histogram;
204: int n = (h != null ? h.length : 0);
205: n = Math.min(n, MessageStatistics.NBINS);
206: for (int i = 0; i < n; i++) {
207: int bin = MessageStatistics.BIN_SIZES[i];
208: logger.info("Histogram[" + bin + "]: " + h[i]);
209: }
210: }
211: }
212:
213: private void logMessageWatcherMetrics() {
214: if (myMessageWatcher != null) {
215: logger.info("Directives In: "
216: + myMessageWatcher.directivesIn);
217: logger.info("Directives Out: "
218: + myMessageWatcher.directivesOut);
219: logger.info("Notifications In: "
220: + myMessageWatcher.notificationsIn);
221: logger.info("Notifications Out: "
222: + myMessageWatcher.notificationsOut);
223: }
224: }
225:
226: private void logCSVHeader() {
227: rawLogger.info(getCSVHeader());
228: }
229:
230: private String getCSVHeader() {
231: StringBuffer buf = new StringBuffer();
232: buf.append("#Agent" + ", Time_In_Millis"
233: + ", Cached_Prototype_Count"
234: + ", Property_Provider_Count"
235: + ", Prototype_Provider_Count" + ", Asset_Count"
236: + ", Plan_Element_Count" + ", Task_Count"
237: + ", Total_Blackboard_Object_Count"
238: + ", Active_Thread_Count" + ", Free_Memory"
239: + ", Total_Memory" + ", CPU_Load" + ", Sent_To"
240: + ", Heard_From" + ", Message_Queue"
241: + ", Message_Bytes" + ", Message_Count");
242: for (int i = 0; i < MessageStatistics.NBINS; i++) {
243: int bin = MessageStatistics.BIN_SIZES[i];
244: buf.append(", Histogram_").append(bin);
245: }
246: buf.append(", Directives_In" + ", Directives_Out"
247: + ", Notifications_In" + ", Notifications_Out");
248: return buf.toString();
249: }
250:
251: private void logCSV() {
252: rawLogger.info(getCSV());
253: }
254:
255: private String getCSV() {
256: StringBuffer buf = new StringBuffer();
257: buf.append(agentId.getAddress());
258: buf.append(", ").append(System.currentTimeMillis());
259: if (protoRegistryService != null) {
260: buf.append(", ").append(
261: protoRegistryService.getCachedPrototypeCount());
262: buf.append(", ").append(
263: protoRegistryService.getPropertyProviderCount());
264: buf.append(", ").append(
265: protoRegistryService.getPrototypeProviderCount());
266: } else {
267: buf.append(", 0, 0, 0");
268: }
269: if (bbMetricsService != null) {
270: buf.append(", ").append(
271: bbMetricsService.getBlackboardCount(Asset.class));
272: buf.append(", ").append(
273: bbMetricsService
274: .getBlackboardCount(PlanElement.class));
275: buf.append(", ").append(
276: bbMetricsService.getBlackboardCount(Task.class));
277: buf.append(", ").append(
278: bbMetricsService.getBlackboardCount());
279: } else {
280: buf.append(", 0, 0, 0, 0");
281: }
282: if (nodeMetricsService != null) {
283: buf.append(", ").append(
284: nodeMetricsService.getActiveThreadCount());
285: buf.append(", ").append(nodeMetricsService.getFreeMemory());
286: buf.append(", ")
287: .append(nodeMetricsService.getTotalMemory());
288: } else {
289: buf.append(", 0, 0, 0");
290: }
291: buf.append(", ").append(getMetric(cpuPath, 0.0));
292: buf.append(", ").append(getMetric(toMsgPath, 0.0));
293: buf.append(", ").append(getMetric(fromMsgPath, 0.0));
294: if (messageStatsService != null) {
295: MessageStatistics.Statistics stats = messageStatsService
296: .getMessageStatistics(false);
297: buf.append(", ").append(stats.averageMessageQueueLength);
298: buf.append(", ").append(stats.totalSentMessageBytes);
299: buf.append(", ").append(stats.totalSentMessageCount);
300: long[] h = stats.histogram;
301: for (int i = 0; i < MessageStatistics.NBINS; i++) {
302: long hi = (i < h.length ? h[i] : 0l);
303: buf.append(", ").append(hi);
304: }
305: } else {
306: buf.append(", 0, 0, 0");
307: for (int i = 0; i < MessageStatistics.NBINS; i++) {
308: buf.append(", 0");
309: }
310: }
311: if (myMessageWatcher != null) {
312: buf.append(", ").append(myMessageWatcher.directivesIn);
313: buf.append(", ").append(myMessageWatcher.directivesOut);
314: buf.append(", ").append(myMessageWatcher.notificationsIn);
315: buf.append(", ").append(myMessageWatcher.notificationsOut);
316: } else {
317: buf.append(", 0, 0, 0, 0");
318: }
319: return buf.toString();
320: }
321:
322: private double getMetric(String s, double val) {
323: Metric metric = metricsService.getValue(s);
324: return (metric != null ? metric.doubleValue() : val);
325: }
326:
327: // service load/unload:
328:
329: public void setParameter(Object o) {
330: try {
331: Collection c = (Collection) o;
332: for (Iterator iter = c.iterator(); iter.hasNext();) {
333: String s = (String) iter.next();
334: int sep = s.indexOf("=");
335: String n = s.substring(0, sep);
336: String v = s.substring(sep + 1);
337: if (n.equals("csv") || n.equals("useCSV")
338: || n.equals("cvs")) {
339: useCSV = "true".equals(v);
340: } else if (n.equals("delay")) {
341: delay = Long.parseLong(v);
342: } else if (n.equals("interval")) {
343: interval = Long.parseLong(v);
344: }
345: }
346: } catch (Exception e) {
347: throw new RuntimeException(
348: "Usage: \"csv=BOOLEAN, delay=MILLIS, interval=MILLIS\"",
349: e);
350: }
351: }
352:
353: public void setServiceBroker(ServiceBroker sb) {
354: this .sb = sb;
355: }
356:
357: public void setLoggingService(LoggingService s) {
358: if (s != null) {
359: rawLogger = s;
360: logger = s;
361: }
362: }
363:
364: public void setAgentIdentificationService(
365: AgentIdentificationService s) {
366: agentIdService = s;
367: if (agentIdService != null) {
368: agentId = agentIdService.getMessageAddress();
369: }
370: }
371:
372: public void setNodeIdentificationService(NodeIdentificationService s) {
373: nodeIdService = s;
374: if (nodeIdService != null) {
375: nodeId = nodeIdService.getMessageAddress();
376: }
377: }
378:
379: public void setThreadService(ThreadService s) {
380: threadService = s;
381: }
382:
383: public void setPrototypeRegistryService(PrototypeRegistryService s) {
384: protoRegistryService = s;
385: }
386:
387: public void setBlackboardMetricsService(BlackboardMetricsService s) {
388: bbMetricsService = s;
389: }
390:
391: public void setNodeMetricsService(NodeMetricsService s) {
392: nodeMetricsService = s;
393: }
394:
395: public void setMetricsService(MetricsService s) {
396: metricsService = s;
397: }
398:
399: public void setMessageStatisticsService(MessageStatisticsService s) {
400: messageStatsService = s;
401: }
402:
403: public void setMessageWatcherService(MessageWatcherService s) {
404: messageWatcherService = s;
405: }
406:
407: public void load() {
408: super .load();
409: if (agentId != null) {
410: logger = LoggingServiceWithPrefix.add(rawLogger, agentId
411: .getAddress()
412: + ": ");
413: }
414: if (metricsService != null && agentId != null && nodeId != null) {
415: String agentName = agentId.getAddress();
416: String nodeName = nodeId.getAddress();
417: String agentPath = "Agent(" + agentName + ")"
418: + Constants.PATH_SEPR;
419: String destPath = "Node(" + nodeName + ")"
420: + Constants.PATH_SEPR + "Destination(" + agentName
421: + ")" + Constants.PATH_SEPR;
422: cpuPath = agentPath + Constants.CPU_LOAD_AVG + "("
423: + Constants._1_SEC_AVG + ")";
424: toMsgPath = destPath + Constants.MSG_TO + "("
425: + Constants._10_SEC_AVG + ")";
426: fromMsgPath = destPath + Constants.MSG_FROM + "("
427: + Constants._10_SEC_AVG + ")";
428: }
429: if (messageWatcherService != null && agentId != null) {
430: myMessageWatcher = new MessageWatcher(agentId);
431: messageWatcherService
432: .addMessageTransportWatcher(myMessageWatcher);
433: }
434: if (logger.isWarnEnabled()) {
435: boolean b = false;
436: if (agentIdService == null) {
437: logger
438: .warn("Unable to obtain AgentIdentificationService");
439: b = true;
440: }
441: if (nodeIdService == null) {
442: logger
443: .warn("Unable to obtain NodeIdentificationService");
444: b = true;
445: }
446: if (threadService == null) {
447: logger.warn("Unable to obtain ThreadService");
448: b = true;
449: }
450: if (protoRegistryService == null) {
451: logger
452: .warn("Unable to obtain PrototypeRegistryService");
453: b = true;
454: }
455: if (bbMetricsService == null) {
456: logger
457: .warn("Unable to obtain BlackboardMetricsService");
458: b = true;
459: }
460: if (nodeMetricsService == null) {
461: logger.warn("Unable to obtain NodeMetricsService");
462: b = true;
463: }
464: if (metricsService == null) {
465: logger.warn("Unable to obtain MetricsService");
466: b = true;
467: }
468: if (messageStatsService == null) {
469: logger
470: .warn("Unable to obtain MessageStatisticsService");
471: b = true;
472: }
473: if (messageWatcherService == null) {
474: logger.warn("Unable to obtain MessageWatcherService");
475: b = true;
476: }
477: if (!b && logger.isInfoEnabled()) {
478: logger.info("Loaded all necessary services");
479: }
480: }
481:
482: if (useCSV && logger.isInfoEnabled()) {
483: logCSVHeader();
484: }
485:
486: if (threadService == null) {
487: if (logger.isErrorEnabled()) {
488: logger.error("Unable to obtain ThreadService");
489: }
490: } else {
491: Runnable poller = new Runnable() {
492: public void run() {
493: logAllMetrics();
494: }
495: };
496: Schedulable sched = threadService.getThread(this , poller);
497: sched.schedule(delay, interval);
498: }
499: }
500:
501: public void unload() {
502: if (agentIdService != null) {
503: sb.releaseService(this , AgentIdentificationService.class,
504: agentIdService);
505: agentIdService = null;
506: }
507: if (nodeIdService != null) {
508: sb.releaseService(this , NodeIdentificationService.class,
509: nodeIdService);
510: nodeIdService = null;
511: }
512: if (threadService != null) {
513: sb.releaseService(this , ThreadService.class, threadService);
514: threadService = null;
515: }
516: if (protoRegistryService != null) {
517: sb.releaseService(this , PrototypeRegistryService.class,
518: protoRegistryService);
519: protoRegistryService = null;
520: }
521: if (bbMetricsService != null) {
522: sb.releaseService(this , BlackboardMetricsService.class,
523: bbMetricsService);
524: bbMetricsService = null;
525: }
526: if (nodeMetricsService != null) {
527: sb.releaseService(this , NodeMetricsService.class,
528: nodeMetricsService);
529: nodeMetricsService = null;
530: }
531: if (metricsService != null) {
532: sb.releaseService(this , MetricsService.class,
533: metricsService);
534: metricsService = null;
535: }
536: if (messageStatsService != null) {
537: sb.releaseService(this , MessageStatisticsService.class,
538: messageStatsService);
539: messageStatsService = null;
540: }
541: if (messageWatcherService != null) {
542: if (myMessageWatcher != null) {
543: messageWatcherService
544: .removeMessageTransportWatcher(myMessageWatcher);
545: myMessageWatcher = null;
546: }
547: sb.releaseService(this , MessageWatcherService.class,
548: messageWatcherService);
549: messageWatcherService = null;
550: }
551: if (rawLogger != null && rawLogger != LoggingService.NULL) {
552: sb.releaseService(this , LoggingService.class, rawLogger);
553: rawLogger = LoggingService.NULL;
554: logger = rawLogger;
555: }
556: super .unload();
557: }
558:
559: private static class MessageWatcher implements
560: MessageTransportWatcher {
561: private final MessageAddress me;
562: public int directivesIn = 0;
563: public int directivesOut = 0;
564: public int notificationsIn = 0;
565: public int notificationsOut = 0;
566:
567: public MessageWatcher(MessageAddress me) {
568: this .me = me;
569: }
570:
571: public void messageSent(Message m) {
572: if (m.getOriginator().equals(me)) {
573: if (m instanceof DirectiveMessage) {
574: Directive[] directives = ((DirectiveMessage) m)
575: .getDirectives();
576: for (int i = 0; i < directives.length; i++) {
577: if (directives[i] instanceof Notification)
578: notificationsOut++;
579: else
580: directivesOut++;
581: }
582: }
583: }
584: }
585:
586: public void messageReceived(Message m) {
587: if (m.getTarget().equals(me)) {
588: if (m instanceof DirectiveMessage) {
589: Directive[] directives = ((DirectiveMessage) m)
590: .getDirectives();
591: for (int i = 0; i < directives.length; i++) {
592: if (directives[i] instanceof Notification)
593: notificationsIn++;
594: else
595: directivesIn++;
596: }
597: }
598: }
599: }
600: }
601: }
|