001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.util.ArrayList;
030:
031: import org.cougaar.core.component.ServiceBroker;
032: import org.cougaar.core.mts.AttributeConstants;
033: import org.cougaar.core.mts.MessageAttributes;
034: import org.cougaar.core.service.ThreadService;
035: import org.cougaar.core.thread.Schedulable;
036: import org.cougaar.util.PropertyParser;
037: import org.cougaar.util.UnaryPredicate;
038:
039: import org.cougaar.mts.base.MessageReply;
040: import org.cougaar.mts.base.DestinationQueueProviderService;
041: import org.cougaar.mts.base.SendQueueProviderService;
042: import org.cougaar.mts.base.ReceiveLink;
043: import org.cougaar.mts.base.ReceiveLinkDelegateImplBase;
044: import org.cougaar.mts.base.MisdeliveredMessageException;
045: import org.cougaar.mts.base.CommFailureException;
046: import org.cougaar.mts.base.UnregisteredNameException;
047: import org.cougaar.mts.base.NameLookupException;
048: import org.cougaar.mts.base.DestinationLink;
049: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
050: import org.cougaar.mts.base.SendLink;
051: import org.cougaar.mts.base.SendLinkDelegateImplBase;
052: import org.cougaar.mts.base.StandardAspect;
053:
054: /**
055: * Aspect to throw out a timed out message. Necessary for MsgLog et. al.
056: * Checks every thread in MTS for timed out attributes on a message:<pre>
057: * -SendLink
058: * -DestinationLink
059: * -ReceiveLink
060: * </pre>
061: *
062: * @property org.cougaar.syncClock
063: * Is NTP clock synchronization guaranteed? default is false.
064: */
065: public final class MessageTimeoutAspect extends StandardAspect
066: implements AttributeConstants {
067: private SendQueueProviderService sendq_factory;
068: private DestinationQueueProviderService destq_factory;
069: private final UnaryPredicate timeoutPredicate = new UnaryPredicate() {
070: public boolean execute(Object x) {
071: AttributedMessage msg = (AttributedMessage) x;
072: return timedOut(msg, "Message Timeout Reclaimer");
073: }
074: };
075:
076: public static final boolean SYNC_CLOCK_AVAILABLE = PropertyParser
077: .getBoolean("org.cougaar.syncClock", false);
078:
079: public static final long RECLAIM_PERIOD = PropertyParser.getLong(
080: "org.cougaar.core.mts.timout.reclaim", 60000);
081:
082: public MessageTimeoutAspect() {
083: }
084:
085: public void load() {
086: super .load();
087:
088: ServiceBroker sb = getServiceBroker();
089: ThreadService tsvc = (ThreadService) sb.getService(this ,
090: ThreadService.class, null);
091:
092: Runnable reclaimer = new Runnable() {
093: public void run() {
094: reclaim();
095: }
096: };
097: Schedulable sched = tsvc.getThread(this , reclaimer,
098: "Message Timeout Reclaimer");
099: sched.schedule(RECLAIM_PERIOD, RECLAIM_PERIOD);
100: sb.releaseService(this , ThreadService.class, tsvc);
101:
102: }
103:
104: public void start() {
105: super .start();
106: ServiceBroker sb = getServiceBroker();
107: sendq_factory = (SendQueueProviderService) sb.getService(this ,
108: SendQueueProviderService.class, null);
109: destq_factory = (DestinationQueueProviderService) sb
110: .getService(this ,
111: DestinationQueueProviderService.class, null);
112: }
113:
114: private void reclaim() {
115: ArrayList droppedMessages = new ArrayList(); // not using this yet
116: if (sendq_factory != null)
117: sendq_factory.removeMessages(timeoutPredicate,
118: droppedMessages);
119: if (destq_factory != null)
120: destq_factory.removeMessages(timeoutPredicate,
121: droppedMessages);
122: }
123:
124: // Helper methods
125: private boolean delivered(MessageAttributes attributes) {
126: return attributes != null
127: & attributes.getAttribute(DELIVERY_ATTRIBUTE).equals(
128: DELIVERY_STATUS_DELIVERED);
129: }
130:
131: // retrieves absolute timeout that convertTimeout stored for us
132: private long getTimeout(AttributedMessage message) {
133: long the_timeout = -1;
134: Object attr = message
135: .getAttribute(MESSAGE_SEND_DEADLINE_ATTRIBUTE);
136: if (attr != null) { // check for, convert to long
137: if (attr instanceof Long) {
138: the_timeout = ((Long) attr).longValue();
139: return the_timeout;
140: }
141: }
142: return -1; // something extraordinarily large so msgs will never time out
143: }
144:
145: private boolean timedOut(AttributedMessage message, String station) {
146: long the_timeout = getTimeout(message);
147: // absolute timeout value of must be greater than 0;
148: if (the_timeout > 0) {
149: long now = System.currentTimeMillis();
150: if (the_timeout < now) {
151: // log that the message timed out
152: if (loggingService.isWarnEnabled())
153: loggingService.warn(station
154: + " threw away a message="
155: + message.logString() + " Beyond deadline="
156: + (now - the_timeout) + " ms");
157: return true;
158: }
159: }
160: return false;
161: }
162:
163: /*
164: * Aspect Code to hook into all the links in the MTS chain
165: */
166: public Object getDelegate(Object object, Class type) {
167: if (type == SendLink.class) {
168: return new SendLinkDelegate((SendLink) object);
169: } else if (type == DestinationLink.class) {
170: return new DestinationLinkDelegate((DestinationLink) object);
171: } else if (type == ReceiveLink.class) {
172: if (SYNC_CLOCK_AVAILABLE) {
173: return new ReceiveLinkDelegate((ReceiveLink) object);
174: } else {
175: return null;
176: }
177: } else {
178: return null;
179: }
180: }
181:
182: /*
183: * First thread in the msg chain to check timeout values
184: * Also computes timeout
185: */
186: private class SendLinkDelegate extends SendLinkDelegateImplBase {
187: SendLinkDelegate(SendLink link) {
188: super (link);
189: }
190:
191: // turns relative into absolute timeout
192: // stores back into absolute for other delegates to access
193: // null means no timeout was used
194: long convertTimeout(AttributedMessage message) {
195:
196: long the_timeout = -1;
197:
198: // Get either the relative or absolute timeout values here
199: // One (should) be null
200: Object attr = message
201: .getAttribute(MESSAGE_SEND_TIMEOUT_ATTRIBUTE);
202: if (attr != null) { // check for relative
203: if (attr instanceof Integer) {
204: the_timeout = ((Integer) attr).intValue();
205: the_timeout += System.currentTimeMillis(); // turn into absolute time
206: // store back into absolute attribute value
207: message.setAttribute(
208: MESSAGE_SEND_DEADLINE_ATTRIBUTE, new Long(
209: the_timeout));
210: }
211: } else if (attr == null) {
212: attr = message
213: .getAttribute(MESSAGE_SEND_DEADLINE_ATTRIBUTE);
214: if (attr != null) { // check for absolute
215: if (attr instanceof Long) {
216: the_timeout = ((Long) attr).longValue();
217: }
218: }
219: }
220: return the_timeout;
221: }
222:
223: public void sendMessage(AttributedMessage message) {
224: // convert relative timeouts to absolute
225: // long the_timeout = convertTimeout(message);
226:
227: if (timedOut(message, "SendLink")) {
228: // drop message silently
229: return;
230: }
231: super .sendMessage(message);
232: }
233: }
234:
235: private class DestinationLinkDelegate extends
236: DestinationLinkDelegateImplBase {
237: DestinationLinkDelegate(DestinationLink delegatee) {
238: super (delegatee);
239: }
240:
241: public MessageAttributes forwardMessage(
242: AttributedMessage message)
243: throws UnregisteredNameException, NameLookupException,
244: CommFailureException, MisdeliveredMessageException {
245: if (timedOut(message, "DestinationLink")) {
246: //drop message, set delivery status to dropped
247: MessageAttributes metadata = new MessageReply(message);
248: metadata.setAttribute(
249: MessageAttributes.DELIVERY_ATTRIBUTE,
250: MessageAttributes.DELIVERY_STATUS_DROPPED);
251: return metadata;
252: }
253: MessageAttributes metadata = super .forwardMessage(message);
254: return metadata;
255: }
256: }
257:
258: private class ReceiveLinkDelegate extends
259: ReceiveLinkDelegateImplBase {
260: ReceiveLinkDelegate(ReceiveLink delegatee) {
261: super (delegatee);
262: }
263:
264: public MessageAttributes deliverMessage(
265: AttributedMessage message) {
266: if (timedOut(message, "Deliverer")) {
267: //drop message, set delivery status to dropped
268: MessageAttributes metadata = new MessageReply(message);
269: metadata.setAttribute(
270: MessageAttributes.DELIVERY_ATTRIBUTE,
271: MessageAttributes.DELIVERY_STATUS_DROPPED);
272: return metadata;
273: }
274: MessageAttributes metadata = super.deliverMessage(message);
275: return metadata;
276: }
277: }
278: }
|