001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.util.Comparator;
030: import java.util.HashMap;
031: import java.util.Iterator;
032: import java.util.TreeSet;
033:
034: import org.cougaar.core.mts.AgentState;
035: import org.cougaar.core.mts.Message;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.core.mts.MessageAttributes;
038: import org.cougaar.core.mts.MessageTransportClient;
039: import org.cougaar.core.mts.SimpleMessageAttributes;
040: import org.cougaar.core.service.LoggingService;
041:
042: import org.cougaar.mts.base.ReceiveLink;
043: import org.cougaar.mts.base.ReceiveLinkDelegateImplBase;
044: import org.cougaar.mts.base.SendLink;
045: import org.cougaar.mts.base.SendLinkDelegateImplBase;
046: import org.cougaar.mts.base.StandardAspect;
047:
048: /**
049: * This Aspect adds sequence numbers to messages, and enforces
050: * sequencing based on those numbers.
051: */
052: public class SequenceAspect extends StandardAspect {
053:
054: private static final String SEQ = "org.cougaar.message.transport.sequencenumber";
055: private static final String SEQ_SEND_MAP_ATTR = "org.cougaar.message.transport.sequence.send";
056: private static final String SEQ_RECV_MAP_ATTR = "org.cougaar.message.transport.sequence.recv";
057:
058: private static final Integer ONE = new Integer(1);
059: private static final Integer TWO = new Integer(2);
060:
061: private static Comparator comparator = new MessageComparator();
062:
063: public SequenceAspect() {
064: }
065:
066: public Object getDelegate(Object delegate, Class type) {
067: if (type == SendLink.class) {
068: return new SequencedSendLink((SendLink) delegate);
069: } else {
070: return null;
071: }
072: }
073:
074: public Object getReverseDelegate(Object delegate, Class type) {
075: if (type == ReceiveLink.class) {
076: return new SequencedReceiveLink((ReceiveLink) delegate);
077: } else {
078: return null;
079: }
080: }
081:
082: private static int getSequenceNumber(Object message) {
083: AttributedMessage m = (AttributedMessage) message;
084: return ((Integer) m.getAttribute(SEQ)).intValue();
085: }
086:
087: private class SequencedSendLink extends SendLinkDelegateImplBase {
088: HashMap sequenceNumbers;
089:
090: private SequencedSendLink(SendLink link) {
091: super (link);
092: }
093:
094: // This can't be done in the constructor, since that runs when
095: // the client first requests the MessageTransportService (too
096: // early). Wait for registration.
097: public synchronized void registerClient(
098: MessageTransportClient client) {
099: super .registerClient(client);
100:
101: MessageAddress myAddress = getAddress();
102: AgentState myState = getRegistry().getAgentState(myAddress);
103:
104: synchronized (myState) {
105: sequenceNumbers = (HashMap) myState
106: .getAttribute(SEQ_SEND_MAP_ATTR);
107: if (sequenceNumbers == null) {
108: sequenceNumbers = new HashMap();
109: myState.setAttribute(SEQ_SEND_MAP_ATTR,
110: sequenceNumbers);
111: }
112: }
113: }
114:
115: private Integer nextSeq(AttributedMessage msg) {
116: // Verify that msg.getOriginator() == getAddress() ?
117: MessageAddress dest = msg.getTarget();
118: Integer next = (Integer) sequenceNumbers.get(dest
119: .getPrimary());
120: if (next == null) {
121: sequenceNumbers.put(dest.getPrimary(), TWO);
122: return ONE;
123: } else {
124: int n = next.intValue();
125: sequenceNumbers.put(dest.getPrimary(), new Integer(
126: 1 + n));
127: return next;
128: }
129: }
130:
131: public void sendMessage(AttributedMessage message) {
132: Integer sequence_number = nextSeq(message);
133: message.setAttribute(SEQ, sequence_number);
134: super .sendMessage(message);
135: }
136: }
137:
138: private static class MessageComparator implements Comparator,
139: java.io.Serializable {
140: public int compare(Object msg1, Object msg2) {
141: int seq1 = getSequenceNumber(msg1);
142: int seq2 = getSequenceNumber(msg2);
143: if (seq1 == seq2)
144: return 0;
145: else if (seq1 < seq2)
146: return -1;
147: else
148: return 1;
149: }
150:
151: public boolean equals(Object obj) {
152: return (obj == this );
153: }
154:
155: }
156:
157: private static class ConversationState implements
158: java.io.Serializable {
159: int nextSeqNum;
160: TreeSet heldMessages;
161:
162: public ConversationState() {
163: nextSeqNum = 1;
164: heldMessages = new TreeSet(comparator);
165: }
166:
167: private void stripAndDeliver(AttributedMessage message,
168: SequencedReceiveLink link) {
169: // message.removeAttribute(SEQ);
170: link.super DeliverMessage(message);
171: nextSeqNum++;
172: }
173:
174: private MessageAttributes handleNewMessage(
175: AttributedMessage message, SequencedReceiveLink link,
176: LoggingService loggingService) {
177: MessageAttributes meta = new SimpleMessageAttributes();
178: String delivery_status = null;
179: int msgSeqNum = getSequenceNumber(message);
180: if (nextSeqNum > msgSeqNum) {
181: Message contents = message.getRawMessage();
182: if (loggingService.isDebugEnabled())
183: loggingService.debug("Dropping duplicate " + " <"
184: + contents.getClass().getName() + " "
185: + contents.hashCode() + " "
186: + message.getOriginator() + "->"
187: + message.getTarget() + " #" + msgSeqNum);
188: delivery_status = MessageAttributes.DELIVERY_STATUS_DROPPED_DUPLICATE;
189: } else if (nextSeqNum == msgSeqNum) {
190: stripAndDeliver(message, link);
191: Iterator itr = heldMessages.iterator();
192: while (itr.hasNext()) {
193: AttributedMessage next = (AttributedMessage) itr
194: .next();
195: if (getSequenceNumber(next) == nextSeqNum) {
196: if (loggingService.isDebugEnabled())
197: loggingService
198: .debug("delivered held message"
199: + next);
200: stripAndDeliver(next, link);
201: itr.remove();
202: }
203: }//end while
204: delivery_status = MessageAttributes.DELIVERY_STATUS_DELIVERED;
205: } else {
206: if (loggingService.isDebugEnabled())
207: loggingService
208: .debug("holding out of sequence message"
209: + message);
210: heldMessages.add(message);
211: delivery_status = MessageAttributes.DELIVERY_STATUS_HELD;
212: }
213:
214: meta.setAttribute(MessageAttributes.DELIVERY_ATTRIBUTE,
215: delivery_status);
216: return meta;
217: }
218: }
219:
220: private class SequencedReceiveLink extends
221: ReceiveLinkDelegateImplBase {
222: HashMap conversationState;
223:
224: private SequencedReceiveLink(ReceiveLink link) {
225: super (link);
226:
227: MessageAddress myAddress = getClient().getMessageAddress();
228: AgentState myState = getRegistry().getAgentState(myAddress);
229: synchronized (myState) {
230: conversationState = (HashMap) myState
231: .getAttribute(SEQ_RECV_MAP_ATTR);
232: if (conversationState == null) {
233: conversationState = new HashMap();
234: myState.setAttribute(SEQ_RECV_MAP_ATTR,
235: conversationState);
236: }
237: }
238: // conversationState = new HashMap();
239: }
240:
241: private void super DeliverMessage(AttributedMessage message) {
242: super .deliverMessage(message);
243: }
244:
245: public MessageAttributes deliverMessage(
246: AttributedMessage message) {
247: Object seq = message.getAttribute(SEQ);
248: if (seq != null) {
249: MessageAddress src = message.getOriginator();
250: ConversationState conversation = (ConversationState) conversationState
251: .get(src);
252: if (conversation == null) {
253: conversation = new ConversationState();
254: conversationState.put(src, conversation);
255: }
256:
257: return conversation.handleNewMessage(message, this ,
258: loggingService);
259: } else {
260: if (loggingService.isErrorEnabled())
261: loggingService.error("No Sequence tag: " + message);
262: return super.deliverMessage(message);
263: }
264:
265: }
266: }
267:
268: }
|