001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.base;
028:
029: import java.util.ArrayList;
030: import java.util.HashMap;
031: import java.util.List;
032:
033: import org.cougaar.core.agent.Agent;
034: import org.cougaar.core.component.ComponentDescription;
035: import org.cougaar.core.component.ComponentDescriptions;
036: import org.cougaar.core.component.ContainerSupport;
037: import org.cougaar.core.component.ServiceBroker;
038: import org.cougaar.core.component.ServiceProvider;
039: import org.cougaar.core.mts.AgentStatusService;
040: import org.cougaar.core.mts.MessageAddress;
041: import org.cougaar.core.mts.MessageTransportClient;
042: import org.cougaar.core.node.ComponentInitializerService;
043: import org.cougaar.core.node.NodeControlService;
044: import org.cougaar.core.node.NodeIdentificationService;
045: import org.cougaar.core.service.LoggingService;
046: import org.cougaar.core.service.MessageStatisticsService;
047: import org.cougaar.core.service.MessageTransportService;
048: import org.cougaar.core.service.MessageWatcherService;
049: import org.cougaar.core.thread.ThreadServiceProvider;
050:
051: import org.cougaar.mts.std.WatcherAspect;
052: import org.cougaar.mts.std.AgentStatusAspect;
053: import org.cougaar.mts.std.AspectSupport;
054: import org.cougaar.mts.std.AspectSupportImpl;
055: import org.cougaar.mts.std.MessageWatcherServiceImpl;
056: import org.cougaar.mts.std.MessageTimeoutAspect;
057: import org.cougaar.mts.std.MulticastAspect;
058: import org.cougaar.mts.std.StatisticsAspect;
059:
060: /**
061: * This Component and Container is the ServiceProvider for the
062: * {@link MessageTransportService}. This the overall organizing class
063: * for the MTS as a whole.
064: */
065: public final class MessageTransportServiceProvider extends
066: ContainerSupport implements ServiceProvider {
067: // Some special aspect classes
068: private static final String STATISTICS_ASPECT = "org.cougaar.mts.std.StatisticsAspect";
069:
070: // Services we use (more than once)
071: private AspectSupport aspectSupport;
072: private LoggingService loggingService;
073: // Hang on to these because they implement services we provide.
074: private WatcherAspect watcherAspect;
075: private AgentStatusAspect agentStatusAspect;
076:
077: private String id;
078: private final HashMap proxies = new HashMap();
079:
080: private AspectSupportImpl aspectSupportImpl;
081:
082: private MessageStreamsFactory msgFactory;
083:
084: protected String specifyContainmentPoint() {
085: return Agent.INSERTION_POINT + ".MessageTransport";
086: }
087:
088: public void load() {
089: transitState("load()", UNLOADED, LOADED);
090:
091: beforeConfig();
092:
093: addAll(readConfig());
094:
095: afterConfig();
096: }
097:
098: // disable super load sequence
099: protected void loadHighPriorityComponents() {
100: }
101:
102: protected void loadInternalPriorityComponents() {
103: }
104:
105: protected void loadBinderPriorityComponents() {
106: }
107:
108: protected void loadComponentPriorityComponents() {
109: }
110:
111: protected void loadLowPriorityComponents() {
112: }
113:
114: protected ComponentDescriptions findInitialComponentDescriptions() {
115: return null;
116: }
117:
118: protected ComponentDescriptions findExternalComponentDescriptions() {
119: return null;
120: }
121:
122: // components that must be loaded before the config's components
123: private void beforeConfig() {
124: ServiceBroker csb = getChildServiceBroker();
125:
126: NodeIdentificationService nis = (NodeIdentificationService) csb
127: .getService(this , NodeIdentificationService.class, null);
128: id = nis.getMessageAddress().toString();
129: loggingService = (LoggingService) csb.getService(this ,
130: LoggingService.class, null);
131:
132: aspectSupportImpl = new AspectSupportImpl(this , loggingService);
133: csb.addService(AspectSupport.class, aspectSupportImpl);
134:
135: // Do the standard set first, since they're assumed to be more
136: // generic than the user-specified set.
137:
138: // Drop Stale messages
139: // Needs loaded first so all other aspets have a chance the
140: // process the message before it gets dropped)
141: add(new MessageTimeoutAspect());
142:
143: // For the MessageWatcher service
144: watcherAspect = new WatcherAspect();
145: add(watcherAspect);
146:
147: // Keep track of Agent state
148: agentStatusAspect = new AgentStatusAspect();
149: add(agentStatusAspect);
150:
151: // Handling multicast messages
152: add(new MulticastAspect());
153:
154: // Could use a ComponentDescription
155: ThreadServiceProvider tsp = new ThreadServiceProvider();
156: tsp.setParameter("name=MTS");
157: add(tsp);
158:
159: MessageTransportRegistry reg = new MessageTransportRegistry(id,
160: csb);
161: csb.addService(MessageTransportRegistryService.class, reg);
162:
163: LinkSelectionProvision lsp = new LinkSelectionProvision();
164: csb.addService(LinkSelectionProvisionService.class, lsp);
165:
166: SocketControlProvision scp = new SocketControlProvision();
167: csb.addService(SocketControlProvisionService.class, scp);
168: // SocketFactory has no access to services, so set it manually
169: // in a static.
170: SocketFactory.configureProvider(csb);
171: }
172:
173: // components specified in the config files
174: private List readConfig() {
175: ComponentDescriptions descs;
176: if (loadState instanceof ComponentDescriptions) {
177: // rehydration
178: descs = (ComponentDescriptions) loadState;
179: loadState = null;
180: } else {
181: // read config
182: ServiceBroker csb = getChildServiceBroker();
183: ComponentInitializerService cis = (ComponentInitializerService) csb
184: .getService(this ,
185: ComponentInitializerService.class, null);
186: try {
187: String cp = specifyContainmentPoint();
188: descs = new ComponentDescriptions(cis
189: .getComponentDescriptions(id, cp));
190: } catch (ComponentInitializerService.InitializerException cise) {
191: if (loggingService.isInfoEnabled()) {
192: loggingService.info("\nUnable to add " + id
193: + "'s plugins ", cise);
194: }
195: descs = null;
196: } finally {
197: csb.releaseService(this ,
198: ComponentInitializerService.class, cis);
199: }
200: }
201:
202: // flatten
203: List l = new ArrayList();
204: if (descs != null) {
205: // ubug 13451:
206: l
207: .addAll(descs
208: .selectComponentDescriptions(ComponentDescription.PRIORITY_INTERNAL));
209: // typical Aspects/LinkProtocols/etc
210: l
211: .addAll(descs
212: .selectComponentDescriptions(ComponentDescription.PRIORITY_COMPONENT));
213: }
214: return l;
215: }
216:
217: // components that must be loaded after the config's components
218: private void afterConfig() {
219: ServiceBroker csb = getChildServiceBroker();
220:
221: aspectSupport = (AspectSupport) csb.getService(this ,
222: AspectSupport.class, null);
223:
224: // The rest of the services depend on aspects.
225: createNameSupport(id);
226: createFactories();
227:
228: NodeControlService ncs = (NodeControlService) csb.getService(
229: this , NodeControlService.class, null);
230:
231: ServiceBroker rootsb = ncs.getRootServiceBroker();
232: rootsb.addService(MessageTransportService.class, this );
233: rootsb.addService(MessageStatisticsService.class, this );
234: rootsb.addService(MessageWatcherService.class, this );
235: rootsb.addService(AgentStatusService.class, this );
236: }
237:
238: private void createNameSupport(String id) {
239: ServiceBroker csb = getChildServiceBroker();
240: NameSupportImpl impl = new NameSupportImpl(id, csb);
241: csb.addService(NameSupport.class, impl);
242: }
243:
244: private void createFactories() {
245: ServiceBroker csb = getChildServiceBroker();
246:
247: msgFactory = MessageStreamsFactory.makeFactory();
248: add(msgFactory);
249:
250: ReceiveLinkFactory receiveLinkFactory = new ReceiveLinkFactory();
251: add(receiveLinkFactory);
252: csb.addService(ReceiveLinkProviderService.class,
253: receiveLinkFactory);
254:
255: LinkSelectionPolicyServiceProvider lspsp = new LinkSelectionPolicyServiceProvider(
256: csb, this );
257: csb.addService(LinkSelectionPolicy.class, lspsp);
258:
259: DestinationQueueFactory destQFactory = new DestinationQueueFactory(
260: this );
261: add(destQFactory);
262: csb.addService(DestinationQueueProviderService.class,
263: destQFactory);
264: csb.addService(DestinationQueueMonitorService.class,
265: destQFactory);
266:
267: // Singletons, though produced by factories.
268: MessageDelivererFactory delivererFactory = new MessageDelivererFactory(
269: id);
270: add(delivererFactory);
271: csb.addService(MessageDeliverer.class, delivererFactory);
272:
273: RouterFactory routerFactory = new RouterFactory();
274: add(routerFactory);
275: csb.addService(Router.class, routerFactory);
276:
277: SendQueueFactory sendQFactory = new SendQueueFactory(this , id);
278: add(sendQFactory);
279: csb.addService(SendQueueProviderService.class, sendQFactory);
280:
281: // load LinkProtocols
282: new LinkProtocolFactory(this , csb);
283: }
284:
285: private Object findOrMakeProxy(Object requestor) {
286: MessageTransportClient client = (MessageTransportClient) requestor;
287: MessageAddress addr = client.getMessageAddress();
288: long incarnation = client.getIncarnationNumber();
289: MessageTransportServiceProxy proxy = (MessageTransportServiceProxy) proxies
290: .get(addr);
291: if (proxy != null
292: && proxy.getIncarnationNumber() == incarnation) {
293: return proxy;
294: }
295:
296: // Make SendLink and attach aspect delegates
297: SendLink link = new SendLinkImpl(addr, incarnation,
298: getChildServiceBroker());
299: Class c = SendLink.class;
300: Object raw = aspectSupport.attachAspects(link, c);
301: link = (SendLink) raw;
302:
303: // Make proxy
304: proxy = new MessageTransportServiceProxy(client, link);
305: proxies.put(addr, proxy);
306: if (loggingService.isDebugEnabled())
307: loggingService
308: .debug("Created MessageTransportServiceProxy for "
309: + requestor + " with address "
310: + client.getMessageAddress());
311: return proxy;
312:
313: }
314:
315: // ServiceProvider
316:
317: public Object getService(ServiceBroker sb, Object requestor,
318: Class serviceClass) {
319: if (serviceClass == MessageTransportService.class) {
320: if (requestor instanceof MessageTransportClient) {
321: return findOrMakeProxy(requestor);
322: } else {
323: throw new IllegalArgumentException(
324: "Requestor is not a MessageTransportClient");
325: }
326: } else if (serviceClass == MessageStatisticsService.class) {
327: StatisticsAspect aspect = (StatisticsAspect) aspectSupport
328: .findAspect(STATISTICS_ASPECT);
329: return aspect;
330: } else if (serviceClass == MessageWatcherService.class) {
331: return new MessageWatcherServiceImpl(watcherAspect);
332: } else if (serviceClass == AgentStatusService.class) {
333: return agentStatusAspect;
334: } else {
335: return null;
336: }
337: }
338:
339: public void releaseService(ServiceBroker sb, Object requestor,
340: Class serviceClass, Object service) {
341: if (serviceClass == MessageTransportService.class) {
342: if (requestor instanceof MessageTransportClient) {
343: MessageTransportClient client = (MessageTransportClient) requestor;
344: MessageAddress addr = client.getMessageAddress();
345: MessageTransportService svc = (MessageTransportService) proxies
346: .get(addr);
347: MessageTransportServiceProxy proxy = (MessageTransportServiceProxy) proxies
348: .get(addr);
349: if (svc != service)
350: return; // ???
351: proxies.remove(addr);
352: proxy.release();
353: }
354: } else if (serviceClass == MessageStatisticsService.class) {
355: // The only resource used here is the StatisticsAspect,
356: // which stays around.
357: } else if (serviceClass == AgentStatusService.class) {
358: // The only resource used here is the aspect, which stays
359: // around.
360: } else if (serviceClass == MessageWatcherService.class) {
361: if (service instanceof MessageWatcherServiceImpl) {
362: ((MessageWatcherServiceImpl) service).release();
363: }
364: }
365: }
366:
367: /**
368: * @see org.cougaar.core.component.ContainerSupport#unload()
369: */
370: public void unload() {
371: super.unload();
372: aspectSupportImpl.unload();
373:
374: msgFactory.releaseFactory();
375: }
376: }
|