001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.util.HashMap;
030: import java.util.Iterator;
031: import java.util.List;
032: import java.util.Set;
033:
034: import org.cougaar.core.component.ServiceBroker;
035: import org.cougaar.core.mts.AgentStatusService;
036: import org.cougaar.core.mts.AttributeConstants;
037: import org.cougaar.core.mts.MessageAddress;
038: import org.cougaar.core.mts.MessageAttributes;
039: import org.cougaar.core.mts.MessageTransportClient;
040: import org.cougaar.core.qos.metrics.Constants;
041: import org.cougaar.core.qos.metrics.Metric;
042: import org.cougaar.core.qos.metrics.MetricImpl;
043: import org.cougaar.core.qos.metrics.MetricsUpdateService;
044: import org.cougaar.core.service.LoggingService;
045: import org.cougaar.mts.base.CommFailureException;
046: import org.cougaar.mts.base.DestinationLink;
047: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
048: import org.cougaar.mts.base.DestinationQueueProviderService;
049: import org.cougaar.mts.base.MessageDeliverer;
050: import org.cougaar.mts.base.MessageDelivererDelegateImplBase;
051: import org.cougaar.mts.base.MisdeliveredMessageException;
052: import org.cougaar.mts.base.NameLookupException;
053: import org.cougaar.mts.base.QueueListener;
054: import org.cougaar.mts.base.SendLink;
055: import org.cougaar.mts.base.SendLinkDelegateImplBase;
056: import org.cougaar.mts.base.SendQueue;
057: import org.cougaar.mts.base.SendQueueDelegateImplBase;
058: import org.cougaar.mts.base.SendQueueProviderService;
059: import org.cougaar.mts.base.StandardAspect;
060: import org.cougaar.mts.base.UnregisteredNameException;
061:
062: /**
063: * This Aspect implements the {@link AgentStatusService}.
064: *
065: * In the <a
066: * href="../../../../../OnlineManual/MetricsService/sensors.html">Sensor
067: * Data Flow</a> pattern this class plays the role of <b>Sensor</b>
068: * for message counts and size among Agents and Nodes.
069: *
070: * @see org.cougaar.core.qos.metrics.AgentStatusRatePlugin
071: * @see org.cougaar.core.qos.metrics.AgentLoadServlet
072: *
073: */
074: public class AgentStatusAspect extends StandardAspect implements
075: AgentStatusService, QueueListener, Constants,
076: AttributeConstants {
077:
/** Credibility value attached to every metric this aspect publishes. */
private static final double SEND_CREDIBILITY = Constants.SECOND_MEAS_CREDIBILITY;

/**
 * Statistics per remote agent, keyed by MessageAddress. All access
 * in this class synchronizes on the map itself.
 */
private final HashMap remoteStates = new HashMap();

/**
 * Statistics per local agent, keyed by MessageAddress. All access
 * in this class synchronizes on the map itself.
 */
private final HashMap localStates = new HashMap();

/** Aggregate statistics for the whole node; updated under its own monitor. */
private final AgentState nodeState;

/** Sink for the time-stamp metrics; obtained from the service broker in load(). */
private MetricsUpdateService metricsUpdateService;

/**
 * Creates the aspect with empty local/remote state tables and a
 * freshly initialized node-level state.
 */
public AgentStatusAspect() {
    nodeState = newAgentState();
}
090:
091: public void load() {
092: super .load();
093:
094: ServiceBroker sb = getServiceBroker();
095: metricsUpdateService = (MetricsUpdateService) sb.getService(
096: this , MetricsUpdateService.class, null);
097: }
098:
099: public void start() {
100: super .start();
101:
102: ServiceBroker sb = getServiceBroker();
103:
104: SendQueueProviderService sendq_fact = (SendQueueProviderService) sb
105: .getService(this , SendQueueProviderService.class, null);
106:
107: DestinationQueueProviderService destq_fact = (DestinationQueueProviderService) sb
108: .getService(this ,
109: DestinationQueueProviderService.class, null);
110:
111: LoggingService lsvc = getLoggingService();
112:
113: if (sendq_fact != null) {
114: sendq_fact.addListener(this );
115: sb.releaseService(this , SendQueueProviderService.class,
116: sendq_fact);
117: } else if (lsvc.isInfoEnabled()) {
118: lsvc.info("Couldn't get SendQueueProviderService");
119: }
120:
121: if (destq_fact != null) {
122: destq_fact.addListener(this );
123: sb.releaseService(this ,
124: DestinationQueueProviderService.class, destq_fact);
125: } else if (lsvc.isInfoEnabled()) {
126: lsvc.info("Couldn't get DestinationQueueProviderService");
127: }
128: }
129:
130: public void messagesRemoved(List messages) {
131: LoggingService lsvc = getLoggingService();
132: synchronized (messages) {
133: Iterator itr = messages.iterator();
134: AttributedMessage message;
135: while (itr.hasNext()) {
136: message = (AttributedMessage) itr.next();
137: // handle removed message
138: MessageAddress remoteAddr = message.getTarget()
139: .getPrimary();
140: AgentState remoteState = ensureRemoteState(remoteAddr);
141: if (lsvc.isInfoEnabled())
142: lsvc.info("Messages removed from queue "
143: + remoteAddr);
144: synchronized (remoteState) {
145: remoteState.queueLength--;
146: }
147: }
148: }
149: }
150:
151: private AgentState ensureRemoteState(MessageAddress address) {
152: AgentState state = null;
153: synchronized (remoteStates) {
154: state = (AgentState) remoteStates.get(address);
155: if (state == null) {
156: state = newAgentState();
157: remoteStates.put(address, state);
158: }
159: }
160: return state;
161: }
162:
163: private AgentState getRemoteState(MessageAddress address) {
164: AgentState state = null;
165: synchronized (remoteStates) {
166: state = (AgentState) remoteStates.get(address);
167: }
168: return state;
169: }
170:
171: private AgentState ensureLocalState(MessageAddress address) {
172: AgentState state = null;
173: synchronized (localStates) {
174: state = (AgentState) localStates.get(address);
175: if (state == null) {
176: state = newAgentState();
177: localStates.put(address, state);
178: }
179: }
180: return state;
181: }
182:
183: private AgentState getLocalState(MessageAddress address) {
184: AgentState state = null;
185: synchronized (localStates) {
186: state = (AgentState) localStates.get(address);
187: }
188: return state;
189: }
190:
191: private AgentState newAgentState() {
192: AgentState state = new AgentState();
193: // JAZ must be a better way to initialize an object
194: state.timestamp = System.currentTimeMillis();
195: state.status = UNREGISTERED;
196: state.queueLength = 0;
197: state.receivedCount = 0;
198: state.receivedBytes = 0;
199: state.lastReceivedBytes = 0;
200: state.sendCount = 0;
201: state.deliveredCount = 0;
202: state.deliveredBytes = 0;
203: state.lastDeliveredBytes = 0;
204: state.deliveredLatencySum = 0;
205: state.lastDeliveredLatency = 0;
206: state.averageDeliveredLatency = 0;
207: state.unregisteredNameCount = 0;
208: state.nameLookupFailureCount = 0;
209: state.commFailureCount = 0;
210: state.misdeliveredMessageCount = 0;
211: state.lastLinkProtocolTried = null;
212: state.lastLinkProtocolSuccess = null;
213: state.lastHeardFrom = 0;
214: state.lastSentTo = 0;
215: state.lastFailedSend = 0;
216: return state;
217: }
218:
219: // JAZ must be a better way to clone an object
220: private AgentState snapshotState(AgentState state) {
221: AgentState result = new AgentState();
222: synchronized (state) {
223: result.timestamp = state.timestamp;
224: result.status = state.status;
225: result.queueLength = state.queueLength;
226: result.receivedCount = state.receivedCount;
227: result.receivedBytes = state.receivedBytes;
228: result.lastReceivedBytes = state.lastReceivedBytes;
229: result.sendCount = state.sendCount;
230: result.deliveredCount = state.deliveredCount;
231: result.deliveredBytes = state.deliveredBytes;
232: result.lastDeliveredBytes = state.lastDeliveredBytes;
233: result.deliveredLatencySum = state.deliveredLatencySum;
234: result.lastDeliveredLatency = state.lastDeliveredLatency;
235: result.averageDeliveredLatency = state.averageDeliveredLatency;
236: result.unregisteredNameCount = state.unregisteredNameCount;
237: result.nameLookupFailureCount = state.nameLookupFailureCount;
238: result.commFailureCount = state.commFailureCount;
239: result.misdeliveredMessageCount = state.misdeliveredMessageCount;
240: result.lastLinkProtocolTried = state.lastLinkProtocolTried;
241: result.lastLinkProtocolSuccess = state.lastLinkProtocolSuccess;
242: result.lastHeardFrom = state.lastHeardFrom;
243: result.lastSentTo = state.lastSentTo;
244: result.lastFailedSend = state.lastFailedSend;
245: }
246: return result;
247: }
248:
249: private Metric longMetric(long value) {
250: return new MetricImpl(new Long(value), SEND_CREDIBILITY, "",
251: "AgentStatusAspect");
252: }
253:
254: //
255: // Agent Status Service Public Interface
256:
257: // Deprecated: For backwards compatibility
258: public AgentState getAgentState(MessageAddress address) {
259: return getRemoteAgentState(address);
260: }
261:
262: public AgentState getNodeState() {
263: return snapshotState(nodeState);
264: }
265:
266: public AgentState getLocalAgentState(MessageAddress address) {
267: AgentState state = getLocalState(address);
268: // must snapshot state or caller will get a dynamic value.
269: if (state != null)
270: return snapshotState(state);
271: else
272: return null;
273: }
274:
275: public AgentState getRemoteAgentState(MessageAddress address) {
276: AgentState state = getRemoteState(address);
277: // must snapshot state or caller will get a dynamic value.
278: if (state != null)
279: return snapshotState(state);
280: else
281: return null;
282: }
283:
284: public Set getLocalAgents() {
285: Set result = new java.util.HashSet();
286: synchronized (localStates) {
287: result.addAll(localStates.keySet());
288: }
289: return result;
290: }
291:
292: public Set getRemoteAgents() {
293: Set result = new java.util.HashSet();
294: synchronized (remoteStates) {
295: result.addAll(remoteStates.keySet());
296: }
297: return result;
298: }
299:
300: //
301: // Aspect Code to implement Sensors
302:
303: // To gather sensible send-side statistics, this aspect's
304: // delegates need to run very late on the SendQueue (so as to
305: // count any internal messages added to the queue by other aspect
306: // delegates) but very early on the DestinationLink (because the
307: // delegate on that side is processing the return). The aspect
308: // mechanism doesn't provide for station-specific ordering. But
309: // it does provide an implicit early-vs-late switch, since
310: // reverse delegates always run early. Use that here.
311: public Object getDelegate(Object object, Class type) {
312: if (type == SendQueue.class) {
313: return new SendQueueDelegate((SendQueue) object);
314: } else if (type == SendLink.class) {
315: return new SendLinkDelegate((SendLink) object);
316: } else if (type == MessageDeliverer.class) {
317: return new MessageDelivererDelegate(
318: (MessageDeliverer) object);
319: } else {
320: return null;
321: }
322: }
323:
324: public Object getReverseDelegate(Object object, Class type) {
325: if (type == DestinationLink.class) {
326: return new DestinationLinkDelegate((DestinationLink) object);
327: } else {
328: return null;
329: }
330: }
331:
332: private class SendLinkDelegate extends SendLinkDelegateImplBase {
333: SendLinkDelegate(SendLink link) {
334: super (link);
335: }
336:
337: public void release() {
338: MessageAddress addr = getAddress().getPrimary();
339: synchronized (localStates) {
340: localStates.remove(addr);
341: }
342: }
343:
344: public void registerClient(MessageTransportClient client) {
345: super .registerClient(client);
346: ensureLocalState(getAddress().getPrimary());
347: }
348:
349: }
350:
351: private class DestinationLinkDelegate extends
352: DestinationLinkDelegateImplBase {
353: private String spoke_key, heard_key, error_key;
354:
355: public DestinationLinkDelegate(DestinationLink link) {
356: super (link);
357: String remoteAgent = link.getDestination().getAddress();
358: spoke_key = "Agent" + KEY_SEPR + remoteAgent + KEY_SEPR
359: + "SpokeTime";
360: heard_key = "Agent" + KEY_SEPR + remoteAgent + KEY_SEPR
361: + "HeardTime";
362: error_key = "Agent" + KEY_SEPR + remoteAgent + KEY_SEPR
363: + "SpokeErrorTime";
364: }
365:
366: boolean delivered(MessageAttributes attributes) {
367: return attributes != null
368: & attributes.getAttribute(DELIVERY_ATTRIBUTE)
369: .equals(DELIVERY_STATUS_DELIVERED);
370: }
371:
372: public MessageAttributes forwardMessage(
373: AttributedMessage message)
374: throws UnregisteredNameException, NameLookupException,
375: CommFailureException, MisdeliveredMessageException
376:
377: {
378: MessageAddress remoteAddr = message.getTarget()
379: .getPrimary();
380: AgentState remoteState = ensureRemoteState(remoteAddr);
381: MessageAddress localAddr = message.getOriginator()
382: .getPrimary();
383: AgentState localState = getLocalState(localAddr);
384:
385: if (localState == null) {
386: // Leftover message from an unregistered agent
387: LoggingService lsvc = getLoggingService();
388: if (lsvc.isErrorEnabled())
389: lsvc
390: .error("Forwarding leftover message from unregistered agent "
391: + localAddr);
392: return super .forwardMessage(message);
393: }
394:
395: try {
396: long startTime = System.currentTimeMillis();
397: synchronized (remoteState) {
398: remoteState.lastLinkProtocolTried = getProtocolClass()
399: .getName();
400: }
401: // Attempt to Deliver message
402: MessageAttributes meta = super .forwardMessage(message);
403:
404: //successful Delivery
405: long endTime = System.currentTimeMillis();
406: boolean success = delivered(meta);
407: if (success) {
408: metricsUpdateService.updateValue(heard_key,
409: longMetric(endTime));
410: metricsUpdateService.updateValue(spoke_key,
411: longMetric(endTime));
412: }
413:
414: int msgBytes = 0;
415: Object attr = message
416: .getAttribute(MESSAGE_BYTES_ATTRIBUTE);
417: if (attr != null && (attr instanceof Number))
418: msgBytes = ((Number) attr).intValue();
419:
420: long latency = endTime - startTime;
421: double alpha = 0.20;
422: synchronized (remoteState) {
423: if (success) {
424: remoteState.lastHeardFrom = endTime;
425: remoteState.lastSentTo = endTime;
426: }
427: remoteState.status = AgentStatusService.ACTIVE;
428: remoteState.timestamp = System.currentTimeMillis();
429: remoteState.deliveredCount++;
430: remoteState.deliveredBytes += msgBytes;
431: remoteState.lastDeliveredBytes = msgBytes;
432: remoteState.queueLength--;
433: remoteState.lastDeliveredLatency = (int) latency;
434: remoteState.deliveredLatencySum += latency;
435: remoteState.averageDeliveredLatency = (alpha * latency)
436: + ((1 - alpha) * remoteState.averageDeliveredLatency);
437: remoteState.lastLinkProtocolSuccess = getProtocolClass()
438: .getName();
439: }
440: synchronized (localState) {
441: localState.status = AgentStatusService.ACTIVE;
442: localState.timestamp = System.currentTimeMillis();
443: localState.deliveredCount++;
444: localState.deliveredBytes += msgBytes;
445: localState.lastDeliveredBytes = msgBytes;
446: }
447: synchronized (nodeState) {
448: nodeState.timestamp = System.currentTimeMillis();
449: nodeState.deliveredCount++;
450: nodeState.deliveredBytes += msgBytes;
451: }
452:
453: return meta;
454:
455: } catch (UnregisteredNameException unreg) {
456: long now = System.currentTimeMillis();
457: synchronized (remoteState) {
458: remoteState.status = UNREGISTERED;
459: remoteState.timestamp = now;
460: remoteState.unregisteredNameCount++;
461: remoteState.lastFailedSend = now;
462: }
463: metricsUpdateService.updateValue(error_key,
464: longMetric(now));
465: throw unreg;
466: } catch (NameLookupException namex) {
467: long now = System.currentTimeMillis();
468: synchronized (remoteState) {
469: remoteState.status = UNKNOWN;
470: remoteState.timestamp = now;
471: remoteState.nameLookupFailureCount++;
472: remoteState.lastFailedSend = now;
473: }
474: metricsUpdateService.updateValue(error_key,
475: longMetric(now));
476: throw namex;
477: } catch (CommFailureException commex) {
478: long now = System.currentTimeMillis();
479: synchronized (remoteState) {
480: remoteState.status = UNREACHABLE;
481: remoteState.timestamp = now;
482: remoteState.commFailureCount++;
483: remoteState.lastFailedSend = now;
484: }
485: metricsUpdateService.updateValue(error_key,
486: longMetric(now));
487: throw commex;
488: } catch (MisdeliveredMessageException misd) {
489: long now = System.currentTimeMillis();
490: synchronized (remoteState) {
491: remoteState.status = UNREGISTERED;
492: remoteState.timestamp = now;
493: remoteState.misdeliveredMessageCount++;
494: remoteState.lastFailedSend = now;
495: }
496: metricsUpdateService.updateValue(error_key,
497: longMetric(now));
498: throw misd;
499: }
500: }
501:
502: }
503:
504: private class MessageDelivererDelegate extends
505: MessageDelivererDelegateImplBase {
506:
507: MessageDelivererDelegate(MessageDeliverer delegatee) {
508: super (delegatee);
509: }
510:
511: public MessageAttributes deliverMessage(
512: AttributedMessage message, MessageAddress dest)
513: throws MisdeliveredMessageException {
514: String remoteAgent = message.getOriginator().getAddress();
515: String heard_key = "Agent" + KEY_SEPR + remoteAgent
516: + KEY_SEPR + "HeardTime";
517: long receiveTime = System.currentTimeMillis();
518: metricsUpdateService.updateValue(heard_key,
519: longMetric(receiveTime));
520:
521: int msgBytes = 0;
522: Object attr = message.getAttribute(MESSAGE_BYTES_ATTRIBUTE);
523: if (attr != null && (attr instanceof Number))
524: msgBytes = ((Number) attr).intValue();
525:
526: AgentState remoteState = ensureRemoteState(message
527: .getOriginator().getPrimary());
528: synchronized (remoteState) {
529: remoteState.receivedCount++;
530: remoteState.receivedBytes += msgBytes;
531: remoteState.lastHeardFrom = receiveTime;
532: }
533:
534: AgentState localState = getLocalState(message.getTarget()
535: .getPrimary());
536: if (localState != null) {
537: synchronized (localState) {
538: localState.receivedCount++;
539: localState.receivedBytes += msgBytes;
540: }
541: } else {
542: LoggingService lsvc = getLoggingService();
543: if (lsvc.isInfoEnabled())
544: lsvc.info("Received message for non-local agent "
545: + message.getTarget());
546: }
547:
548: synchronized (nodeState) {
549: nodeState.receivedCount++;
550: nodeState.receivedBytes += msgBytes;
551: }
552:
553: return super .deliverMessage(message, dest);
554: }
555:
556: }
557:
558: private class SendQueueDelegate extends SendQueueDelegateImplBase {
559: public SendQueueDelegate(SendQueue queue) {
560:
561: super (queue);
562: }
563:
564: public void sendMessage(AttributedMessage message) {
565: MessageAddress remoteAddr = message.getTarget()
566: .getPrimary();
567: AgentState remoteState = ensureRemoteState(remoteAddr);
568: MessageAddress localAddr = message.getOriginator()
569: .getPrimary();
570: AgentState localState = getLocalState(localAddr);
571:
572: synchronized (remoteState) {
573: remoteState.sendCount++;
574: remoteState.queueLength++;
575: }
576:
577: long receiveTime = System.currentTimeMillis();
578:
579: if (localState != null) {
580: synchronized (localState) {
581: localState.sendCount++;
582: localState.lastHeardFrom = receiveTime;
583: }
584:
585: //Local agent sending message means that the MTS has
586: //"heard from" the local agent
587: String localAgent = localAddr.getAddress();
588: String heard_key = "Agent" + KEY_SEPR + localAgent
589: + KEY_SEPR + "HeardTime";
590: metricsUpdateService.updateValue(heard_key,
591: longMetric(receiveTime));
592: } else {
593: LoggingService lsvc = getLoggingService();
594: if (lsvc.isErrorEnabled())
595: lsvc
596: .error("SendQueue sending leftover message from "
597: + localAddr);
598: }
599:
600: synchronized (nodeState) {
601: nodeState.sendCount++;
602: nodeState.lastHeardFrom = receiveTime; // ???
603: }
604:
605: super.sendMessage(message);
606: }
607: }
608: }
|