001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import org.cougaar.core.component.ServiceBroker;
030: import org.cougaar.core.mts.AttributeConstants;
031: import org.cougaar.core.mts.MessageAddress;
032: import org.cougaar.core.mts.MessageAttributes;
033: import org.cougaar.core.mts.SimpleMessageAttributes;
034: import org.cougaar.core.service.IncarnationService;
035: import org.cougaar.core.service.LoggingService;
036: import org.cougaar.mts.base.CommFailureException;
037: import org.cougaar.mts.base.DestinationLink;
038: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
039: import org.cougaar.mts.base.MessageDeliverer;
040: import org.cougaar.mts.base.MessageDelivererDelegateImplBase;
041: import org.cougaar.mts.base.MessageTransportRegistryService;
042: import org.cougaar.mts.base.MisdeliveredMessageException;
043: import org.cougaar.mts.base.NameLookupException;
044: import org.cougaar.mts.base.StandardAspect;
045: import org.cougaar.mts.base.UnregisteredNameException;
046:
047: /**
048: * This Aspect assists in the detection of out-of-date incarnations of
049: * Agents.
050: */
051: public class OldIncarnationAspect extends StandardAspect {
052:
053: private static transient MessageAttributes DummyReturn;
054: static {
055: DummyReturn = new SimpleMessageAttributes();
056: DummyReturn.setAttribute(MessageAttributes.DELIVERY_ATTRIBUTE,
057: MessageAttributes.DELIVERY_STATUS_OLD_INCARNATION);
058: }
059:
060: private MessageTransportRegistryService registry;
061: private IncarnationService incarnationSvc;
062:
063: public void load() {
064: super .load();
065: LoggingService loggingService = getLoggingService();
066: ServiceBroker sb = getServiceBroker();
067:
068: registry = (MessageTransportRegistryService) sb.getService(
069: this , MessageTransportRegistryService.class, null);
070: if (registry == null && loggingService.isWarnEnabled())
071: loggingService
072: .warn("Couldn't get MessageTransportRegistryService");
073:
074: incarnationSvc = (IncarnationService) sb.getService(this ,
075: IncarnationService.class, null);
076: if (incarnationSvc == null && loggingService.isWarnEnabled())
077: loggingService.warn("Couldn't get IncarnationService");
078: }
079:
080: public Object getDelegate(Object delegate, Class type) {
081: if (type == DestinationLink.class) {
082: return new DestinationLinkDelegate(
083: (DestinationLink) delegate);
084: } else if (type == MessageDeliverer.class) {
085: return new MessageDelivererDelegate(
086: (MessageDeliverer) delegate);
087: } else {
088: return null;
089: }
090: }
091:
092: private class MessageDelivererDelegate extends
093: MessageDelivererDelegateImplBase {
094: public MessageDelivererDelegate(MessageDeliverer deliverer) {
095: super (deliverer);
096: }
097:
098: // The sender's incarnation has to be checked for two cases.
099: // If it's new we need to inform the incarnation service of
100: // that fact. If it's an obsolete incarnation we need to avoid
101: // delivering the message, and also to notify the sender (the
102: // sender-side handling of this case is in the DestinationLink
103: // delegate, below).
104: public MessageAttributes deliverMessage(
105: AttributedMessage message, MessageAddress addr)
106: throws MisdeliveredMessageException {
107: LoggingService loggingService = getLoggingService();
108:
109: // Check sender incarnation
110: MessageAddress sender = message.getOriginator();
111: Long incarnationAttr = (Long) message
112: .getAttribute(AttributeConstants.INCARNATION_ATTRIBUTE);
113:
114: if (incarnationAttr == null) {
115: throw new RuntimeException(
116: "No incarnation number in message " + message);
117: }
118:
119: long incarnation = incarnationAttr.longValue();
120: int status = incarnationSvc.updateIncarnation(sender,
121: incarnation);
122: if (status > 0) {
123: if (loggingService.isInfoEnabled())
124: loggingService
125: .info("Detected new incarnation number "
126: + incarnation + " in message "
127: + message);
128: } else if (status < 0) {
129: // Bogus message from old incarnation. Pretend normal
130: // delivery but don't process it.
131: if (loggingService.isInfoEnabled())
132: loggingService
133: .info("Detected remote obsolete incarnation number "
134: + incarnation
135: + " in message "
136: + message);
137: long new_incarnation = incarnationSvc
138: .getIncarnation(sender);
139:
140: MessageAttributes reply = new SimpleMessageAttributes();
141: reply
142: .setAttribute(
143: MessageAttributes.DELIVERY_ATTRIBUTE,
144: MessageAttributes.DELIVERY_STATUS_OLD_INCARNATION);
145: reply.setAttribute(
146: AttributeConstants.INCARNATION_ATTRIBUTE,
147: new Long(new_incarnation));
148: return reply;
149: }
150:
151: return super .deliverMessage(message, addr);
152: }
153:
154: }
155:
156: private class DestinationLinkDelegate extends
157: DestinationLinkDelegateImplBase {
158:
159: public DestinationLinkDelegate(DestinationLink link) {
160: super (link);
161: }
162:
163: public MessageAttributes forwardMessage(
164: AttributedMessage message)
165: throws UnregisteredNameException, NameLookupException,
166: CommFailureException, MisdeliveredMessageException
167:
168: {
169: LoggingService loggingService = getLoggingService();
170:
171: // Check sender - don't allow messages from obsolete local
172: // agents.
173: if (!registry.isLocalClient(message.getOriginator())) {
174: if (loggingService.isWarnEnabled()) {
175: loggingService
176: .warn("Blocking message from local obsolete agent: "
177: + message);
178: }
179: return DummyReturn;
180: }
181:
182: // Handle the special return if the receive side thinks
183: // the sender is obsolete.
184: try {
185: MessageAttributes reply = super .forwardMessage(message);
186: Object status = reply
187: .getAttribute(MessageAttributes.DELIVERY_ATTRIBUTE);
188: if (status
189: .equals(MessageAttributes.DELIVERY_STATUS_OLD_INCARNATION)) {
190: Long new_incarnation_attr = (Long) reply
191: .getAttribute(AttributeConstants.INCARNATION_ATTRIBUTE);
192: long new_incarnation = new_incarnation_attr
193: .longValue();
194: MessageAddress sender = message.getOriginator();
195: incarnationSvc.updateIncarnation(sender,
196: new_incarnation);
197: if (loggingService.isInfoEnabled()) {
198: loggingService.info(message.getTarget()
199: + " says that "
200: + message.getOriginator()
201: + " is obsolete");
202: }
203: }
204: return reply;
205: } catch (UnregisteredNameException ex1) {
206: throw ex1;
207: } catch (NameLookupException ex2) {
208: throw ex2;
209: } catch (CommFailureException ex3) {
210: throw ex3;
211: } catch (MisdeliveredMessageException ex4) {
212: throw ex4;
213: }
214: }
215:
216: }
217:
218: }
|