001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.base;
028:
029: import java.util.ArrayList;
030:
031: import org.cougaar.core.component.ServiceBroker;
032: import org.cougaar.core.mts.AgentState;
033: import org.cougaar.core.mts.AttributeConstants;
034: import org.cougaar.core.mts.MessageAddress;
035: import org.cougaar.core.mts.MessageTransportClient;
036: import org.cougaar.core.service.LoggingService;
037: import org.cougaar.mts.std.AttributedMessage;
038: import org.cougaar.util.UnaryPredicate;
039:
040: /**
041: * The only implementation of {@link SendLink}, instantiated once per
042: * MessageTransportClient. It's main job is simply to place outgoing
043: * messages on the (singleton) {@link SendQueue}.
044: */
045: final public class SendLinkImpl implements SendLink {
046: static final String VERSION = "version";
047:
048: private SendQueue sendq;
049: private SendQueueProviderService sendq_factory;
050: private DestinationQueueProviderService destq_factory;
051: private MessageAddress addr;
052: private MessageTransportRegistryService registry;
053: private LoggingService loggingService;
054: private Long incarnation;
055: private Object flush_lock = new Object();
056:
057: SendLinkImpl(MessageAddress addr, long incarnation, ServiceBroker sb) {
058: this .addr = addr;
059: this .incarnation = new Long(incarnation);
060: registry = (MessageTransportRegistryService) sb.getService(
061: this , MessageTransportRegistryService.class, null);
062: sendq_factory = (SendQueueProviderService) sb.getService(this ,
063: SendQueueProviderService.class, null);
064: sendq = sendq_factory.getSendQueue(addr);
065: destq_factory = (DestinationQueueProviderService) sb
066: .getService(this ,
067: DestinationQueueProviderService.class, null);
068: loggingService = (LoggingService) sb.getService(this ,
069: LoggingService.class, null);
070: }
071:
072: // This should be locked vis-a-vis flushMessages
073: public void sendMessage(AttributedMessage message) {
074: synchronized (flush_lock) {
075: MessageAddress orig = message.getOriginator();
076: if (!addr.equals(orig)) {
077: loggingService
078: .error("SendLink saw a message whose originator ("
079: + orig
080: + ") didn't match the MessageTransportClient address ("
081: + addr + ")");
082: }
083: message.setAttribute(
084: AttributeConstants.INCARNATION_ATTRIBUTE,
085: incarnation);
086: sendq.sendMessage(message);
087: }
088: }
089:
090: private final UnaryPredicate flushPredicate = new UnaryPredicate() {
091: public boolean execute(Object m) {
092: AttributedMessage msg = (AttributedMessage) m;
093: MessageAddress primalAddress = addr.getPrimary();
094: MessageAddress src = msg.getOriginator().getPrimary();
095: return src.equals(primalAddress);
096: }
097: };
098:
099: public void flushMessages(ArrayList droppedMessages) {
100: synchronized (flush_lock) {
101: sendq_factory.removeMessages(flushPredicate,
102: droppedMessages);
103: destq_factory.removeMessages(flushPredicate,
104: droppedMessages);
105: }
106: }
107:
108: public MessageAddress getAddress() {
109: return addr;
110: }
111:
112: public void release() {
113: registry.removeAgentState(addr);
114: sendq = null;
115: registry = null;
116: }
117:
118: public boolean okToSend(AttributedMessage message) {
119: if (sendq == null)
120: return false; // client has released the service
121:
122: MessageAddress target = message.getTarget();
123: if (target == null || target.toString().equals("")) {
124: if (loggingService.isErrorEnabled())
125: loggingService.error("Malformed message: " + message);
126: return false;
127: } else {
128: return true;
129: }
130: }
131:
132: /**
133: * Redirects the request to the MessageTransportRegistry. */
134: public void registerClient(MessageTransportClient client) {
135: // Should throw an exception of client != this.client
136: registry.registerClient(client);
137: }
138:
139: /**
140: * Redirects the request to the MessageTransportRegistry. */
141: public void unregisterClient(MessageTransportClient client) {
142: // Should throw an exception of client != this.client
143:
144: registry.unregisterClient(client);
145:
146: // NB: The proxy (as opposed to the client) CANNOT be
147: // unregistered here. If it were, messageDelivered callbacks
148: // wouldn't be delivered and flush could block forever.
149: // Unregistering the proxy can only happen as part of
150: // releasing the service (see release());
151: }
152:
153: /**
154: * Redirects the request to the MessageTransportRegistry. */
155: public String getIdentifier() {
156: return registry.getIdentifier();
157: }
158:
159: /**
160: * Redirects the request to the MessageTransportRegistry. */
161: public boolean addressKnown(MessageAddress a) {
162: return registry.addressKnown(a);
163: }
164:
165: public AgentState getAgentState() {
166: return registry.getAgentState(addr);
167: }
168:
169: }
|