001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.util.ArrayList;
030: import java.util.HashMap;
031: import java.util.Iterator;
032: import java.util.List;
033: import java.util.Map;
034:
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.mts.AttributeConstants;
037: import org.cougaar.core.mts.Message;
038: import org.cougaar.core.mts.MessageAddress;
039: import org.cougaar.core.mts.MessageAttributes;
040: import org.cougaar.core.mts.MulticastMessageAddress;
041: import org.cougaar.core.service.LoggingService;
042: import org.cougaar.core.service.ThreadService;
043: import org.cougaar.core.thread.Schedulable;
044:
045: import org.cougaar.mts.base.DestinationLink;
046: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
047: import org.cougaar.mts.base.DestinationQueueProviderService;
048: import org.cougaar.mts.base.QueueListener;
049: import org.cougaar.mts.base.SendLink;
050: import org.cougaar.mts.base.SendLinkDelegateImplBase;
051: import org.cougaar.mts.base.SendQueueProviderService;
052: import org.cougaar.mts.base.StandardAspect;
053: import org.cougaar.mts.base.MisdeliveredMessageException;
054: import org.cougaar.mts.base.CommFailureException;
055: import org.cougaar.mts.base.NameLookupException;
056: import org.cougaar.mts.base.UnregisteredNameException;
057: import org.cougaar.mts.base.DontRetryException;
058:
059: /**
060: * This Aspect logs the delivery of messages, as well as the delay if
061: * the delivery takes awhile.
062: */
063: public class DeliveryVerificationAspect extends StandardAspect
064: implements AttributeConstants, Runnable, QueueListener {
065: private static final int TOO_LONG = 10000; // 10 seconds
066: private static final int SAMPLE_PERIOD = TOO_LONG / 2;
067: private static final String WARN_TIME_VALUE_STR = "100";
068: private static final String WARN_TIME_PARAM = "warn-time";
069: private static final String INFO_TIME_VALUE_STR = "10";
070: private static final String INFO_TIME_PARAM = "info-time";
071:
072: // Less ugly than writing a proper math function
073: private static final long[] LIMITS = { TOO_LONG, TOO_LONG * 3,
074: TOO_LONG * 10, TOO_LONG * 30, TOO_LONG * 100,
075: TOO_LONG * 300, TOO_LONG * 1000 };
076:
077: private HashMap pendingMessages = new HashMap();
078: private Schedulable schedulable;
079: private int timeout = TOO_LONG;
080: private int infoTime;
081: private int warnTime;
082:
083: public DeliveryVerificationAspect() {
084: }
085:
086: public void load() {
087: super .load();
088:
089: // initializeParameter(WARN_TIME_PARAM,WARN_TIME_VALUE_STR);
090: // initializeParameter(INFO_TIME_PARAM,INFO_TIME_VALUE_STR);
091: dynamicParameterChanged(WARN_TIME_PARAM, getParameter(
092: WARN_TIME_PARAM, WARN_TIME_VALUE_STR));
093: dynamicParameterChanged(INFO_TIME_PARAM, getParameter(
094: INFO_TIME_PARAM, INFO_TIME_VALUE_STR));
095:
096: }
097:
098: public void start() {
099: super .start();
100: ServiceBroker sb = getServiceBroker();
101: SendQueueProviderService sendq_fact = (SendQueueProviderService) sb
102: .getService(this , SendQueueProviderService.class, null);
103:
104: DestinationQueueProviderService destq_fact = (DestinationQueueProviderService) sb
105: .getService(this ,
106: DestinationQueueProviderService.class, null);
107:
108: LoggingService lsvc = getLoggingService();
109:
110: if (sendq_fact != null) {
111: sendq_fact.addListener(this );
112: sb.releaseService(this , SendQueueProviderService.class,
113: sendq_fact);
114: } else if (lsvc.isInfoEnabled()) {
115: lsvc.info("Couldn't get SendQueueProviderService");
116: }
117:
118: if (destq_fact != null) {
119: destq_fact.addListener(this );
120: sb.releaseService(this ,
121: DestinationQueueProviderService.class, destq_fact);
122: } else if (lsvc.isInfoEnabled()) {
123: lsvc.info("Couldn't get DestinationQueueProviderService");
124: }
125:
126: ThreadService tsvc = getThreadService();
127: schedulable = tsvc
128: .getThread(this , this , "DeliveryVerification");
129: schedulable.schedule(0, SAMPLE_PERIOD);
130:
131: }
132:
133: protected void dynamicParameterChanged(String name, String value) {
134: if (name.equals(WARN_TIME_PARAM)) {
135: warnTime = Integer.parseInt(value) * 1000; //millisecond
136: }
137: if (name.equals(INFO_TIME_PARAM)) {
138: infoTime = Integer.parseInt(value) * 1000; //millisecond
139: }
140: }
141:
142: private boolean timeToLog(long deltaT) {
143: for (int i = 0; i < LIMITS.length; i++) {
144: long lowerBound = LIMITS[i];
145: if (deltaT < lowerBound)
146: return false;
147: long upperBound = lowerBound + SAMPLE_PERIOD;
148: if (deltaT < upperBound)
149: return true;
150: }
151: return false;
152: }
153:
154: // Runnable
155: public void run() {
156: LoggingService lsvc = getLoggingService();
157: if (!lsvc.isWarnEnabled())
158: return;
159:
160: long now = System.currentTimeMillis();
161: synchronized (pendingMessages) {
162: Iterator itr = pendingMessages.entrySet().iterator();
163: while (itr.hasNext()) {
164: Map.Entry entry = (Map.Entry) itr.next();
165: AttributedMessage msg = (AttributedMessage) entry
166: .getKey();
167: long time = ((Long) entry.getValue()).longValue();
168: long deltaT = now - time;
169: if (timeToLog(deltaT)) {
170: if (deltaT >= warnTime)
171: lsvc.warn(msg.logString()
172: + " has been pending for " + deltaT
173: + "ms (Pending Messages="
174: + pendingMessages.size() + ")");
175: else if (deltaT >= infoTime && lsvc.isInfoEnabled())
176: lsvc.info(msg.logString()
177: + " has been pending for " + deltaT
178: + "ms (Pending Messages="
179: + pendingMessages.size() + ")");
180: }
181: }
182: }
183: }
184:
185: // assume caller synchronizes on pendingMessages
186: private void removeMessage(Message message) {
187: LoggingService lsvc = getLoggingService();
188: if (lsvc.isInfoEnabled()) {
189: Long time = (Long) pendingMessages.get(message);
190: if (time != null) {
191: long now = System.currentTimeMillis();
192: long deltaT = now - time.longValue();
193: if (deltaT > timeout) {
194: lsvc.info("Pending "
195: + ((AttributedMessage) message).logString()
196: + " has been sent after " + deltaT + "ms");
197: }
198: }
199: }
200:
201: pendingMessages.remove(message);
202: }
203:
204: public void messagesRemoved(List messages) {
205: LoggingService lsvc = getLoggingService();
206: if (lsvc.isInfoEnabled())
207: lsvc.info("Messages removed from queue");
208: synchronized (messages) {
209: Iterator itr = messages.iterator();
210: AttributedMessage message;
211: while (itr.hasNext()) {
212: message = (AttributedMessage) itr.next();
213: synchronized (pendingMessages) {
214: pendingMessages.remove(message);
215: }
216: if (lsvc.isInfoEnabled())
217: lsvc
218: .info("Removing message "
219: + message.logString());
220: }
221: }
222: }
223:
224: // Aspect
225: public Object getDelegate(Object delegatee, Class type) {
226: if (type == SendLink.class) {
227: SendLink link = (SendLink) delegatee;
228: return new VerificationSendLink(link);
229: } else {
230: return null;
231: }
232: }
233:
234: public Object getReverseDelegate(Object delegatee, Class type) {
235: if (type == DestinationLink.class) {
236: DestinationLink link = (DestinationLink) delegatee;
237: return new VerificationDestinationLink(link);
238: } else {
239: return null;
240: }
241: }
242:
243: private class VerificationSendLink extends SendLinkDelegateImplBase {
244:
245: private VerificationSendLink(SendLink link) {
246: super (link);
247: }
248:
249: public void sendMessage(AttributedMessage message) {
250: MessageAddress destination = message.getTarget();
251: if (destination instanceof MulticastMessageAddress) {
252: LoggingService lsvc = getLoggingService();
253: if (lsvc.isWarnEnabled()) {
254: lsvc.warn("Ignoring Multicast Message " + message);
255: }
256: } else {
257: Long now = new Long(System.currentTimeMillis());
258: synchronized (pendingMessages) {
259: pendingMessages.put(message, now);
260: }
261: }
262: super .sendMessage(message);
263: }
264:
265: public void flushMessages(ArrayList messages) {
266: super .flushMessages(messages);
267: // 'messages' now holds a list of all retracted
268: // Messages. Remove them from the pendingMessages map.
269: synchronized (pendingMessages) {
270: Iterator i = messages.iterator();
271: while (i.hasNext()) {
272: Message message = (Message) i.next();
273: removeMessage(message);
274: }
275: }
276: }
277:
278: }
279:
280: private class VerificationDestinationLink extends
281: DestinationLinkDelegateImplBase {
282: private VerificationDestinationLink(DestinationLink link) {
283: super (link);
284: }
285:
286: public MessageAttributes forwardMessage(
287: AttributedMessage message)
288: throws UnregisteredNameException, NameLookupException,
289: CommFailureException, MisdeliveredMessageException {
290: try {
291: MessageAttributes result = super
292: .forwardMessage(message);
293: // If it returns without an exception, declare the message delivered
294: // ignore delivery status.
295: synchronized (pendingMessages) {
296: removeMessage(message);
297: }
298: return result;
299: } catch (CommFailureException ex) {
300: if (ex.getCause() instanceof DontRetryException) {
301: // Security dropped message, so declared it delivered
302: synchronized (pendingMessages) {
303: removeMessage(message);
304: }
305: }
306: throw ex;
307: }
308:
309: }
310: }
311: }
|