001: package org.cougaar.core.qos.profile;
002:
003: import java.lang.reflect.*;
004: import java.io.*;
005: import java.text.*;
006: import java.util.*;
007: import java.util.regex.*;
008: import org.cougaar.core.agent.*;
009: import org.cougaar.core.component.*;
010: import org.cougaar.core.mts.*;
011: import org.cougaar.core.node.*;
012: import org.cougaar.core.qos.metrics.*;
013: import org.cougaar.core.service.*;
014: import org.cougaar.core.service.wp.*;
015: import org.cougaar.core.thread.*;
016: import org.cougaar.core.wp.resolver.*;
017: import org.cougaar.util.*;
018:
019: /**
020: * This component profiles the message traffic (message count and
021: * byte count) from the local aggregated agents to each specific
022: * remote target agent.
023: * <p>
024: * I.e. FROM (this node and it's agents) TO (a specific remote target)
025: * <p>
026: * Example output:<pre>
027: * tr_AgentB - #count, bytes,
028: * tr_AgentB - 4783, 6136339
029: * tr_NodeA - 38, 0
030: * tr_NodeB - 8, 8167
031: * </pre>
032: *
033: * @see ProfilerCoordinator required coordinator component
034: */
035: public class RemoteTraffic extends ProfilerBase {
036: private static final String[] FIELDS = new String[] { "count",
037: "bytes", };
038: private static final String HEADER = toHeader(FIELDS);
039: private static final String ALIGN = "0, 0";
040: private static final MulticastMessageAddress MMA = MulticastMessageAddress
041: .getMulticastMessageAddress("dummy");
042: private static final Comparator MESSAGE_ADDRESS_COMPARATOR = new Comparator() {
043: public int compare(Object a, Object b) {
044: String sa = ((MessageAddress) a).getAddress();
045: String sb = ((MessageAddress) b).getAddress();
046: return sa.compareTo(sb);
047: }
048: };
049: private final Map logs = new HashMap();
050:
051: public AgentStatusService as;
052: public Object dqms;
053:
054: public void load() {
055: super .load();
056: findServiceLater("as",
057: "org.cougaar.core.mts.AgentStatusService");
058: findServiceLater("dqms",
059: "org.cougaar.mts.base.DestinationQueueMonitorService");
060: }
061:
062: public void run() {
063: logTraffic();
064: }
065:
066: private void logTraffic() {
067: MessageAddress[] targets = getDestinations();
068: for (int i = 0; i < targets.length; i++) {
069: MessageAddress t = targets[i];
070: AgentStatusService.AgentState state = as
071: .getRemoteAgentState(t);
072: if (state != null) {
073: getLog(t).shout(
074: state.deliveredCount + ", "
075: + state.deliveredBytes);
076: }
077: }
078: }
079:
080: private LoggingService getLog(MessageAddress addr) {
081: LoggingService log;
082: synchronized (logs) {
083: log = (LoggingService) logs.get(addr);
084: if (log == null) {
085: String as = addr.getAddress().replace('.', '_');
086: String key = "tr_" + as;
087: log = (LoggingService) sb
088: .getService(
089: ("org.cougaar.core.qos.profile.remote_traffic." + key),
090: LoggingService.class, null);
091: logs.put(addr, log);
092: if (header) {
093: log.shout(HEADER);
094: }
095: if (align) {
096: for (int i = 0, n = getRunCount(); i < n; i++) {
097: log.shout("0, 0");
098: }
099: }
100: }
101: }
102: return log;
103: }
104:
105: private MessageAddress[] getDestinations() {
106: MessageAddress[] ret;
107: try {
108: Class cl = Class
109: .forName("org.cougaar.mts.base.DestinationQueueMonitorService");
110: Method m = cl.getMethod("getDestinations", null);
111: ret = (MessageAddress[]) m.invoke(dqms, null);
112: } catch (Exception e) {
113: System.err.println("getDestinations failed: " + e);
114: ret = null;
115: }
116: if (ret == null || ret.length == 0) {
117: return new MessageAddress[0];
118: }
119: Arrays.sort(ret, MESSAGE_ADDRESS_COMPARATOR);
120: return ret;
121: }
122: }
|