001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.base;
028:
029: import java.util.ArrayList;
030: import java.util.Iterator;
031:
032: import org.cougaar.mts.std.AttributedMessage;
033:
034: import org.cougaar.core.component.Container;
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.core.service.ThreadService;
038: import org.cougaar.util.PropertyParser;
039:
040: /**
041: * The default, and for now only, implementation of {@link
042: * DestinationQueue}. The dispatcher on this queue selects a {@link
043: * DestinationLink} based on the {@link LinkSelectionPolicy} and
044: * forwards to that link. If an exception occurs during the
045: * forwarding, it will retry the whole process , including link
046: * selection, continuously, gradually increasing the delay between
047: * retries. until the message has been successfully forwarded to the
048: *
049: **/
050: final class DestinationQueueImpl extends MessageQueue implements
051: DestinationQueue {
052: private static final int INITIAL_RETRY_TIMEOUT = PropertyParser
053: .getInt("org.cougaar.core.mts.destq.retry.initialTimeout",
054: 500); // 1/2 second
055: private static final int MAX_RETRY_TIMEOUT = PropertyParser.getInt(
056: "org.cougaar.core.mts.destq.retry.maxTimeout", 60 * 1000); // 1 minute
057: private MessageAddress destination;
058: private LinkSelectionPolicy selectionPolicy;
059: private DestinationQueue delegate;
060: private ArrayList destinationLinks;
061:
062: private class LinkIterator implements Iterator {
063: int position;
064: DestinationLink next;
065:
066: LinkIterator() {
067: position = 0;
068: findNextValidLink();
069: }
070:
071: private void findNextValidLink() {
072: while (position < destinationLinks.size()) {
073: next = (DestinationLink) destinationLinks.get(position);
074: if (next.isValid()) {
075: if (loggingService.isDebugEnabled())
076: loggingService.debug("Link "
077: + next.getProtocolClass() + " ["
078: + position + "] for "
079: + next.getDestination() + " is valid");
080: return;
081: }
082: if (loggingService.isDebugEnabled())
083: loggingService.debug("Link "
084: + next.getProtocolClass() + " [" + position
085: + "] for " + next.getDestination()
086: + " is not valid");
087: ++position;
088: }
089: next = null;
090: }
091:
092: public boolean hasNext() {
093: return next != null;
094: }
095:
096: public Object next() {
097: DestinationLink link = next;
098: ++position;
099: findNextValidLink();
100: return link;
101: }
102:
103: public void remove() {
104: throw new RuntimeException("Cannot remove link");
105: }
106:
107: }
108:
109: DestinationQueueImpl(MessageAddress destination, Container container) {
110: super (destination.toString() + "/DestQ", container);
111: this .destination = destination;
112: container.add(this );
113: }
114:
115: // workaround for bug 3723
116: private boolean loaded = false;
117:
118: protected synchronized void transitState(String op,
119: int expectedState, int endState) {
120: if (getModelState() == expectedState) {
121: super .transitState(op, expectedState, endState);
122: }
123: }
124:
125: public synchronized boolean shouldLoad() {
126: if (loaded)
127: return false;
128: loaded = true;
129: return true;
130: }
131:
132: public void load() {
133: if (!shouldLoad())
134: return;
135: super .load();
136: ServiceBroker sb = getServiceBroker();
137: selectionPolicy = (LinkSelectionPolicy) sb.getService(this ,
138: LinkSelectionPolicy.class, null);
139:
140: this .delegate = this ;
141:
142: // cache DestinationLinks, per transport
143: destinationLinks = getRegistry().getDestinationLinks(
144: destination);
145:
146: }
147:
148: int getLane() {
149: return ThreadService.WILL_BLOCK_LANE;
150: }
151:
152: public MessageAddress getDestination() {
153: return destination;
154: }
155:
156: /**
157: * Enqueues the given message. */
158: public void holdMessage(AttributedMessage message) {
159: add(message);
160: }
161:
162: public boolean matches(MessageAddress address) {
163: return destination.equals(address);
164: }
165:
166: void setDelegate(DestinationQueue delegate) {
167: this .delegate = delegate;
168: }
169:
170: // Save retry-state as instance variables
171:
172: private int retryTimeout = INITIAL_RETRY_TIMEOUT;
173: private int retryCount = 0;
174: private Exception lastException = null;
175: private AttributedMessage previous = null;
176:
177: private void resetState() {
178: retryTimeout = INITIAL_RETRY_TIMEOUT;
179: retryCount = 0;
180: lastException = null;
181: previous = null;
182: }
183:
184: /**
185: * Processes the next dequeued message. */
186: boolean dispatch(AttributedMessage message) {
187: if (message == null)
188: return true;
189: if (retryCount == 0)
190: delegate.dispatchNextMessage(message);
191: else
192: dispatchNextMessage(message);
193: return retryCount == 0;
194: }
195:
196: public void dispatchNextMessage(AttributedMessage message) {
197: if (retryCount == 0) {
198: message.snapshotAttributes();
199: previous = message;
200: } else {
201: if (loggingService.isDebugEnabled())
202: loggingService.debug("Retrying " + message);
203: }
204:
205: Iterator links = new LinkIterator();
206: if (!links.hasNext()) {
207: if (loggingService.isInfoEnabled())
208: loggingService.info("No valid links to " + destination);
209: } else {
210: DestinationLink link = selectionPolicy.selectLink(links,
211: message, previous, retryCount, lastException);
212: if (link != null) {
213: if (loggingService.isInfoEnabled())
214: loggingService.info("To Agent=" + destination
215: + " Selected Protocol "
216: + link.getProtocolClass());
217: try {
218: link.addMessageAttributes(message);
219: link.forwardMessage(message);
220: resetState();
221: return;
222: } catch (UnregisteredNameException no_name) {
223: lastException = no_name;
224: // nothing to say here
225: } catch (NameLookupException lookup_error) {
226: lastException = lookup_error;
227: if (loggingService.isErrorEnabled())
228: loggingService.error(null, lookup_error);
229: } catch (CommFailureException comm_failure) {
230: Exception cause = (Exception) comm_failure
231: .getCause();
232: if (loggingService.isWarnEnabled()) {
233: String msg = "Failure in communication, message "
234: + message + " caused by \n" + cause;
235: loggingService.warn(msg);
236: if (loggingService.isInfoEnabled()) {
237: loggingService.info("", cause);
238: }
239: }
240: if (cause instanceof DontRetryException) {
241: // Act as if the message has gone through.
242: resetState();
243: return;
244: } else {
245: // This is some other kind of CommFailure, not
246: // related to security. Retry.
247: lastException = comm_failure;
248: }
249: } catch (MisdeliveredMessageException misd) {
250: lastException = misd;
251: if (loggingService.isDebugEnabled())
252: loggingService.debug(misd.toString());
253: }
254:
255: if (!link.retryFailedMessage(message, retryCount)) {
256: resetState();
257: return;
258: }
259: } else if (loggingService.isInfoEnabled()) {
260: loggingService.info("No Protocol selected for Agent"
261: + message.getTarget());
262: }
263: }
264:
265: retryCount++;
266: previous = new AttributedMessage(message);
267: message.restoreSnapshot();
268: scheduleRestart(retryTimeout);
269: retryTimeout = Math.min(retryTimeout + retryTimeout,
270: MAX_RETRY_TIMEOUT);
271: }
272:
273: }
|