001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.base;
028:
029: import java.util.ArrayList;
030: import java.util.HashMap;
031: import java.util.Iterator;
032: import java.util.Map;
033:
034: import org.cougaar.core.component.ServiceBroker;
035: import org.cougaar.core.component.ServiceProvider;
036: import org.cougaar.core.mts.AgentState;
037: import org.cougaar.core.mts.MessageAddress;
038: import org.cougaar.core.mts.MessageTransportClient;
039: import org.cougaar.core.mts.MulticastMessageAddress;
040: import org.cougaar.core.mts.SimpleMessageAttributes;
041: import org.cougaar.core.service.IncarnationService;
042: import org.cougaar.core.service.LoggingService;
043:
044: /**
045: * The MessageTransportRegistry {@link ServiceProvider} singleton is a
046: * utility instance that helps certain pieces of the message transport
047: * subsystem to find one another. It provides the {@link
048: * MessageTransportRegistryService}. An inner class implements that
049: * service.
050: */
051: public final class MessageTransportRegistry implements ServiceProvider {
052:
053: public ServiceImpl service;
054:
055: public MessageTransportRegistry(String name, ServiceBroker sb) {
056: service = new ServiceImpl(name, sb);
057: }
058:
059: public Object getService(ServiceBroker sb, Object requestor,
060: Class serviceClass) {
061: if (serviceClass == MessageTransportRegistryService.class) {
062: return service;
063: } else {
064: return null;
065: }
066: }
067:
068: public void releaseService(ServiceBroker sb, Object requestor,
069: Class serviceClass, Object service) {
070: }
071:
072: static final class ServiceImpl implements
073: MessageTransportRegistryService,
074: IncarnationService.Callback {
075:
076: private String name;
077: private HashMap receiveLinks = new HashMap(89);
078: private HashMap agentStates = new HashMap();
079: private HashMap localClients = new HashMap();
080: private ArrayList linkProtocols = new ArrayList();
081: private ReceiveLinkProviderService receiveLinkProvider;
082: private NameSupport nameSupport;
083: private ServiceBroker sb;
084: private LoggingService loggingService;
085: private IncarnationService incarnationService;
086:
087: private ServiceImpl(String name, ServiceBroker sb) {
088: this .name = name;
089: this .sb = sb;
090: loggingService = (LoggingService) sb.getService(this ,
091: LoggingService.class, null);
092:
093: incarnationService = (IncarnationService) sb.getService(
094: this , IncarnationService.class, null);
095: if (incarnationService == null
096: && loggingService.isWarnEnabled())
097: loggingService.warn("Couldn't load IncarnationService");
098: }
099:
100: private NameSupport nameSupport() {
101: if (nameSupport == null)
102: nameSupport = (NameSupport) sb.getService(this ,
103: NameSupport.class, null);
104: return nameSupport;
105: }
106:
107: private ReceiveLink makeReceiveLink(
108: MessageTransportClient client) {
109: if (receiveLinkProvider == null) {
110: receiveLinkProvider = (ReceiveLinkProviderService) sb
111: .getService(this ,
112: ReceiveLinkProviderService.class, null);
113: }
114: ReceiveLink link = receiveLinkProvider
115: .getReceiveLink(client);
116: receiveLinks.put(client.getMessageAddress(), link);
117: return link;
118: }
119:
120: private void registerClientWithSociety(
121: MessageTransportClient client) {
122: // register with each component transport
123: synchronized (linkProtocols) {
124: Iterator protocols = linkProtocols.iterator();
125: while (protocols.hasNext()) {
126: LinkProtocol protocol = (LinkProtocol) protocols
127: .next();
128: protocol.registerClient(client);
129: }
130: }
131: }
132:
133: private void unregisterClientWithSociety(
134: MessageTransportClient client) {
135: // register with each component transport
136: synchronized (linkProtocols) {
137: Iterator protocols = linkProtocols.iterator();
138: while (protocols.hasNext()) {
139: LinkProtocol protocol = (LinkProtocol) protocols
140: .next();
141: protocol.unregisterClient(client);
142: }
143: }
144: }
145:
146: public synchronized void incarnationChanged(
147: MessageAddress address, long incarnation) {
148: MessageAddress key = address.getPrimary();
149: MessageTransportClient client = null;
150: client = (MessageTransportClient) localClients.get(key);
151: if (client != null
152: && client.getIncarnationNumber() < incarnation) {
153: agentStates.remove(key);
154: receiveLinks.remove(key);
155: localClients.remove(key);
156: }
157: }
158:
159: private synchronized void addLocalClient(
160: MessageTransportClient client) {
161: MessageAddress key = client.getMessageAddress();
162: localClients.put(key.getPrimary(), client);
163:
164: if (incarnationService != null) {
165: incarnationService.subscribe(key, this );
166: }
167:
168: try {
169: ReceiveLink link = findLocalReceiveLink(key);
170: if (link == null) {
171: link = makeReceiveLink(client);
172: }
173: } catch (Exception e) {
174: if (loggingService.isErrorEnabled())
175: loggingService.error(e.toString());
176: }
177: }
178:
179: private synchronized void removeLocalClient(
180: MessageTransportClient client) {
181: MessageAddress key = client.getMessageAddress();
182: try {
183: receiveLinks.remove(key);
184: localClients.remove(key.getPrimary());
185: } catch (Exception e) {
186: }
187: }
188:
189: public boolean hasLinkProtocols() {
190: synchronized (linkProtocols) {
191: return linkProtocols.size() > 0;
192: }
193: }
194:
195: public void addLinkProtocol(LinkProtocol lp) {
196: synchronized (linkProtocols) {
197: linkProtocols.add(lp);
198: }
199: }
200:
201: public String getIdentifier() {
202: return name;
203: }
204:
205: public synchronized AgentState getAgentState(MessageAddress id) {
206: MessageAddress canonical_id = id.getPrimary();
207: Object raw = agentStates.get(canonical_id);
208: if (raw == null) {
209: AgentState state = new SimpleMessageAttributes();
210: agentStates.put(canonical_id, state);
211: return state;
212: } else if (raw instanceof AgentState) {
213: return (AgentState) raw;
214: } else {
215: throw new RuntimeException("Cached state for " + id
216: + "=" + raw
217: + " which is not an AgentState instance");
218: }
219: }
220:
221: public synchronized void removeAgentState(MessageAddress id) {
222: agentStates.remove(id.getPrimary());
223: }
224:
225: public boolean isLocalClient(MessageAddress id) {
226: synchronized (this ) {
227: return receiveLinks.get(id.getPrimary()) != null
228: || id.equals(MessageAddress.MULTICAST_LOCAL);
229: }
230: }
231:
232: public ReceiveLink findLocalReceiveLink(MessageAddress id) {
233: return (ReceiveLink) receiveLinks.get(id.getPrimary());
234: }
235:
236: // this is a slow implementation, as it conses a new set each time.
237: // Better alternatives surely exist.
238: public Iterator findLocalMulticastReceivers(
239: MulticastMessageAddress addr) {
240: if (addr.hasReceiverClass()) {
241: ArrayList result = new ArrayList();
242: Class mclass = addr.getReceiverClass();
243: if (mclass != null) {
244: Iterator itr = receiveLinks.entrySet().iterator();
245: while (itr.hasNext()) {
246: Map.Entry entry = (Map.Entry) itr.next();
247: ReceiveLink link = (ReceiveLink) entry
248: .getValue();
249: MessageTransportClient client = link
250: .getClient();
251: if (mclass.isAssignableFrom(client.getClass())) {
252: result.add(entry.getKey());
253: if (loggingService.isDebugEnabled())
254: loggingService.debug("Client " + client
255: + " matches " + mclass
256: + ", added " + entry.getKey());
257: } else {
258: if (loggingService.isDebugEnabled())
259: loggingService.debug("Client " + client
260: + " doesn't match " + mclass);
261: }
262: }
263: }
264: if (loggingService.isDebugEnabled())
265: loggingService.debug("result=" + result);
266: return result.iterator();
267:
268: } else {
269: return new ArrayList(receiveLinks.keySet()).iterator();
270: }
271: }
272:
273: public Iterator findRemoteMulticastTransports(
274: MulticastMessageAddress addr) {
275: return nameSupport().lookupMulticast(addr);
276: }
277:
278: public void registerClient(MessageTransportClient client) {
279: registerClientWithSociety(client);
280: addLocalClient(client);
281: }
282:
283: public void unregisterClient(MessageTransportClient client) {
284: removeLocalClient(client);
285: unregisterClientWithSociety(client);
286: }
287:
288: public void ipAddressChanged() {
289: // inform each protocol
290: synchronized (linkProtocols) {
291: Iterator protocols = linkProtocols.iterator();
292: while (protocols.hasNext()) {
293: LinkProtocol protocol = (LinkProtocol) protocols
294: .next();
295: protocol.ipAddressChanged();
296: }
297: }
298: }
299:
300: public boolean addressKnown(MessageAddress address) {
301: synchronized (linkProtocols) {
302: Iterator protocols = linkProtocols.iterator();
303: while (protocols.hasNext()) {
304: LinkProtocol protocol = (LinkProtocol) protocols
305: .next();
306: if (protocol.addressKnown(address))
307: return true;
308: }
309: }
310: return false;
311: }
312:
313: public ArrayList getDestinationLinks(MessageAddress destination) {
314: ArrayList destinationLinks = new ArrayList();
315: synchronized (linkProtocols) {
316: Iterator itr = linkProtocols.iterator();
317: DestinationLink link;
318: while (itr.hasNext()) {
319: LinkProtocol lp = (LinkProtocol) itr.next();
320: // Class lp_class = lp.getClass();
321: link = lp.getDestinationLink(destination);
322: destinationLinks.add(link);
323: }
324: }
325: return destinationLinks;
326: }
327:
328: }
329:
330: }
|