001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 Mobile Intelligence Corp
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.community.manager;
028:
029: import java.net.URI;
030: import java.util.ArrayList;
031: import java.util.Collections;
032: import java.util.HashMap;
033: import java.util.HashSet;
034: import java.util.Iterator;
035: import java.util.List;
036: import java.util.Map;
037: import java.util.Set;
038:
039: import org.cougaar.community.CommunityImpl;
040: import org.cougaar.community.CommunityDescriptor;
041: import org.cougaar.community.RelayAdapter;
042: import org.cougaar.community.CommunityUpdateListener;
043: import org.cougaar.community.BlackboardClient;
044: import org.cougaar.community.CommunityServiceConstants;
045: import org.cougaar.core.component.BindingSite;
046: import org.cougaar.core.component.ServiceBroker;
047: import org.cougaar.core.component.ServiceAvailableEvent;
048: import org.cougaar.core.component.ServiceAvailableListener;
049: import org.cougaar.core.mts.MessageAddress;
050: import org.cougaar.core.service.AlarmService;
051: import org.cougaar.core.service.AgentIdentificationService;
052: import org.cougaar.core.service.LoggingService;
053: import org.cougaar.core.service.UIDService;
054: import org.cougaar.core.service.community.CommunityChangeEvent;
055: import org.cougaar.core.service.wp.AddressEntry;
056: import org.cougaar.core.service.wp.Callback;
057: import org.cougaar.core.service.wp.Response;
058: import org.cougaar.core.service.wp.WhitePagesService;
059:
060: import org.cougaar.core.util.UID;
061:
062: /**
063: * Helper class used to distribute new/updated CommunityDescriptor objects to
064: * interested nodes and agents.
065: */
066: public class CommunityDistributer implements CommunityServiceConstants {
067:
068: private long updateInterval;
069: private long cacheExpiration;
070: private boolean nodesOnly = true;
071:
072: private WhitePagesService whitePagesService;
073: private ServiceBroker serviceBroker;
074: private UIDService uidService;
075: private LoggingService logger;
076: private CommunityUpdateListener updateListener;
077: private MyBlackboardClient blackboardClient;
078: private BindingSite bindingSite;
079: private MessageAddress agentId;
080:
081: private Map communities;
082:
083: // Map of DescriptorEntry objects. Allows multiple communities to be
084: // managed.
085: private Map descriptors = Collections
086: .synchronizedMap(new HashMap());
087:
088: class DescriptorEntry {
089: String name;
090: RelayAdapter ra;
091: Set nodeTargets = Collections.synchronizedSet(new HashSet());
092: Set unresolvedAgents = Collections
093: .synchronizedSet(new HashSet());
094: long lastSent = 0;
095: boolean didChange = true;
096: boolean doRemove = false;
097:
098: DescriptorEntry(String name) {
099: this .name = name;
100: }
101: }
102:
103: /**
104: * Constructor.
105: * @param bs BindingSite from CommunityManager.
106: * @param nodesOnly True if CommunityDescriptors are only sent to node
107: * agents
108: * @param cul Listener object to receive community descriptor updates
109: * @param communities Communities managed by CommunityManager
110: *
111: */
112: public CommunityDistributer(BindingSite bs, boolean nodesOnly,
113: CommunityUpdateListener cul, Map communities) {
114: this .communities = communities;
115: this .bindingSite = bs;
116: this .nodesOnly = nodesOnly;
117: this .updateListener = cul;
118: this .blackboardClient = new MyBlackboardClient(bs);
119: this .serviceBroker = getServiceBroker();
120: this .agentId = getAgentId();
121: this .logger = (LoggingService) serviceBroker.getService(this ,
122: LoggingService.class, null);
123: this .whitePagesService = (WhitePagesService) serviceBroker
124: .getService(this , WhitePagesService.class, null);
125: getSystemProperties();
126: initUidService();
127: }
128:
129: protected void getSystemProperties() {
130: try {
131: updateInterval = Long.parseLong(System.getProperty(
132: UPDATE_INTERVAL_PROPERTY, Long
133: .toString(DEFAULT_UPDATE_INTERVAL)));
134: cacheExpiration = Long.parseLong(System.getProperty(
135: CACHE_EXPIRATION_PROPERTY, Long
136: .toString(DEFAULT_CACHE_EXPIRATION)));
137: } catch (Exception ex) {
138: if (logger.isWarnEnabled()) {
139: logger
140: .warn(
141: agentId
142: + ": Exception setting parameter from system property",
143: ex);
144: }
145: }
146: }
147:
148: protected ServiceBroker getServiceBroker() {
149: return bindingSite.getServiceBroker();
150: }
151:
152: protected MessageAddress getAgentId() {
153: AgentIdentificationService ais = (AgentIdentificationService) getServiceBroker()
154: .getService(this , AgentIdentificationService.class,
155: null);
156: MessageAddress addr = ais.getMessageAddress();
157: getServiceBroker().releaseService(this ,
158: AgentIdentificationService.class, ais);
159: return addr;
160: }
161:
162: /**
163: * Initialize UIDService using ServiceAvailableListener if service not
164: * immediately available.
165: */
166: private void initUidService() {
167: ServiceBroker sb = getServiceBroker();
168: if (sb.hasService(org.cougaar.core.service.UIDService.class)) {
169: uidService = (UIDService) sb.getService(this ,
170: UIDService.class, null);
171: } else {
172: sb.addServiceListener(new ServiceAvailableListener() {
173: public void serviceAvailable(ServiceAvailableEvent sae) {
174: if (sae.getService().equals(UIDService.class)) {
175: uidService = (UIDService) getServiceBroker()
176: .getService(this , UIDService.class,
177: null);
178: }
179: }
180: });
181: }
182: }
183:
184: /**
185: * Get Unique identifier.
186: * @return Unique ID
187: */
188: protected UID getUID() {
189: return uidService != null ? uidService.nextUID() : null;
190: }
191:
192: /**
193: * Publishes pending CommunityDescriptors.
194: */
195: private void publishDescriptors() {
196: if (logger.isDetailEnabled()) {
197: logger.detail("publishDescriptors");
198: }
199: long now = now();
200: List l;
201: synchronized (descriptors) {
202: l = new ArrayList(descriptors.values());
203: }
204: for (Iterator it = l.iterator(); it.hasNext();) {
205: DescriptorEntry de = (DescriptorEntry) it.next();
206: //CommunityImpl community =
207: // (CommunityImpl)((CommunityImpl)communities.get(de.name)).clone();
208: CommunityImpl community = (CommunityImpl) communities
209: .get(de.name);
210: community.setLastUpdate(now);
211: ((CommunityDescriptorImpl) de.ra.getContent()).community = community;
212: if (de.lastSent == 0) {
213: if (!de.nodeTargets.isEmpty()) {
214: updateTargets(de.ra, nodesOnly ? de.nodeTargets
215: : de.ra.getInterestedAgents());
216: de.didChange = false;
217: de.lastSent = now;
218: if (blackboardClient != null) {
219: blackboardClient.publish(de.ra,
220: BlackboardClient.ADD);
221: if (logger.isDebugEnabled()) {
222: logger.debug("publishAdd: "
223: + de.ra
224: + " targets="
225: + de.ra.getTargets().size()
226: + " size="
227: + ((CommunityDescriptor) de.ra
228: .getContent())
229: .getCommunity()
230: .getEntities().size());
231: }
232: }
233: if (de.nodeTargets.contains(agentId)) {
234: //updateListener.updateCommunity((CommunityImpl)community.clone());
235: updateListener.updateCommunity(community);
236: }
237: }
238: } else {
239: if ((de.didChange && (now > (de.lastSent + updateInterval)))
240: || (cacheExpiration != NEVER && (now > (de.lastSent + (cacheExpiration / 2))))) {
241: // publish changed descriptor
242: updateTargets(de.ra, nodesOnly ? de.nodeTargets
243: : de.ra.getInterestedAgents());
244: de.didChange = false;
245: de.lastSent = now;
246: if (blackboardClient != null) {
247: blackboardClient.publish(de.ra,
248: BlackboardClient.CHANGE);
249: if (logger.isDebugEnabled()) {
250: logger.debug("publishChange: "
251: + de.ra
252: + " targets="
253: + de.ra.getTargets().size()
254: + " size="
255: + ((CommunityDescriptor) de.ra
256: .getContent())
257: .getCommunity()
258: .getEntities().size());
259: }
260: }
261: if (de.nodeTargets.contains(agentId)) {
262: //updateListener.updateCommunity((CommunityImpl)community.clone());
263: updateListener.updateCommunity(community);
264: }
265: } else {
266: if (de.doRemove) { // remove descriptor
267: if (blackboardClient != null) {
268: blackboardClient.publish(de.ra,
269: BlackboardClient.REMOVE);
270: }
271: if (de.nodeTargets.contains(agentId)) {
272: //updateListener.removeCommunity((CommunityImpl)community.clone());
273: updateListener.removeCommunity(community);
274: }
275: descriptors.remove(de.name);
276: if (logger.isDebugEnabled()) {
277: logger.debug("publishRemove: " + de.ra);
278: }
279: }
280: }
281: }
282: }
283: }
284:
285: /**
286: * Enable automatic update of CommunityDescriptors for named community.
287: * @param communityName Community to update
288: * @param agents Initial set of targets
289: */
290: protected void add(String communityName, Set agents) {
291: DescriptorEntry de = (DescriptorEntry) descriptors
292: .get(communityName);
293: if (de == null) {
294: de = new DescriptorEntry(communityName);
295: CommunityDescriptorImpl cd = new CommunityDescriptorImpl(
296: agentId, null, getUID());
297: de.ra = new RelayAdapter(agentId, cd, cd.getUID());
298: descriptors.put(communityName, de);
299: addTargets(communityName, agents);
300: }
301: blackboardClient.startTimer();
302: }
303:
304: /**
305: * Enable automatic update of CommunityDescriptors for named community.
306: * @param ra RelayAdapter associated with previously created CommunityDescriptor
307: */
308: protected void add(RelayAdapter ra) {
309: CommunityDescriptorImpl cd = (CommunityDescriptorImpl) ra
310: .getContent();
311: String communityName = cd.getName();
312: DescriptorEntry de = (DescriptorEntry) descriptors
313: .get(communityName);
314: if (de == null) {
315: de = new DescriptorEntry(communityName);
316: de.ra = ra;
317: descriptors.put(communityName, de);
318: addTargets(communityName, ra.getInterestedAgents());
319: }
320: blackboardClient.startTimer();
321: }
322:
323: protected boolean contains(String communityName) {
324: return descriptors.containsKey(communityName);
325: }
326:
327: protected Set getTargets(String communityName) {
328: DescriptorEntry de = (DescriptorEntry) descriptors
329: .get(communityName);
330: if (de != null && de.ra != null) {
331: return de.ra.getTargets();
332: } else {
333: return Collections.EMPTY_SET;
334: }
335: }
336:
337: /**
338: * Adds new targets to receive CommunityDescriptor updates.
339: * @param communityName Community
340: * @param targets New targets
341: */
342: protected void addTargets(String communityName, Set targets) {
343: if (logger.isDebugEnabled()) {
344: logger.debug("addTargets:" + " community=" + communityName
345: + " agents=" + targets);
346: }
347: DescriptorEntry de = (DescriptorEntry) descriptors
348: .get(communityName);
349: if (de != null) {
350: de.ra.getInterestedAgents().addAll(targets);
351: Set agentsToAdd = new HashSet(targets);
352: for (Iterator it = agentsToAdd.iterator(); it.hasNext();) {
353: String targetName = (String) it.next();
354: findNodeTargets(MessageAddress
355: .getMessageAddress(targetName), communityName);
356: }
357: }
358: }
359:
360: /**
361: * Removes targets to receive CommunityDescriptor updates.
362: * @param communityName Community
363: * @param agentNames Targets to remove
364: */
365: protected void removeTargets(String communityName, Set agentNames) {
366: if (logger.isDetailEnabled()) {
367: logger.detail("removeTargets:" + " community="
368: + communityName + " agents=" + agentNames);
369: }
370: DescriptorEntry de = (DescriptorEntry) descriptors
371: .get(communityName);
372: if (de != null) {
373: de.ra.getInterestedAgents().removeAll(agentNames);
374: }
375: }
376:
377: /**
378: * Disables CommunityDescriptor updates for named community. If a
379: * CommunityDescriptor Relay was previously published it is rescinded via
380: * a blackboard publishRemove.
381: * @param communityName Name of community
382: */
383: protected void remove(String communityName) {
384: DescriptorEntry de = (DescriptorEntry) descriptors
385: .get(communityName);
386: if (de != null) {
387: de.doRemove = true;
388: }
389: }
390:
391: /**
392: * Update Relay target set.
393: * @param ra Relay to update
394: * @param targets Targets
395: */
396: private void updateTargets(RelayAdapter ra, Set targets) {
397: Set targetsToAdd = new HashSet();
398: synchronized (targets) {
399: targetsToAdd.addAll(targets);
400: }
401: for (Iterator it = targetsToAdd.iterator(); it.hasNext();) {
402: MessageAddress target = (MessageAddress) it.next();
403: if (!ra.getTargets().contains(target)) {
404: ra.addTarget(target);
405: }
406: }
407: resolveAgents();
408: }
409:
410: /**
411: * Notify targets of a change in community state.
412: * @param communityName Name of changed community
413: */
414: protected void update(String communityName) {
415: if (logger.isDetailEnabled()) {
416: logger.detail("update:" + " community=" + communityName);
417: }
418: DescriptorEntry de = (DescriptorEntry) descriptors
419: .get(communityName);
420: if (de != null) {
421: de.didChange = true;
422: blackboardClient.timer.expire();
423: }
424: }
425:
426: /**
427: * Notify targets of a change in community state.
428: * @param communityName Name of changed community
429: * @param type Type of change
430: * @param what Entity affected by change
431: * @deprecated
432: */
433: protected void update(String communityName, int type, String what) {
434: if (logger.isDetailEnabled()) {
435: logger.detail("update:" + " community=" + communityName
436: + " type="
437: + CommunityChangeEvent.getChangeTypeAsString(type)
438: + " whatChanged=" + what);
439: }
440: DescriptorEntry de = (DescriptorEntry) descriptors
441: .get(communityName);
442: if (de != null) {
443: de.didChange = true;
444: blackboardClient.timer.expire();
445: }
446: }
447:
448: /**
449: * Get CommunityDescriptor associated with named community.
450: * @param communityName Name of community
451: * @return CommunityDescriptor for community
452: */
453: protected CommunityDescriptor get(String communityName) {
454: DescriptorEntry de = (DescriptorEntry) descriptors
455: .get(communityName);
456: return de != null ? (CommunityDescriptor) de.ra.getContent()
457: : null;
458: }
459:
460: /**
461: * Find an agents node by looking in WhitePages. Add node address to
462: * Relay target set.
463: * @param agentId MessageAddress of agent
464: * @param communityName Name of associated community
465: */
466: private void findNodeTargets(final MessageAddress agentId,
467: final String communityName) {
468: if (logger.isDetailEnabled()) {
469: logger.detail("findNodeTargets:" + " community="
470: + communityName + " agent=" + agentId);
471: }
472: Callback cb = new Callback() {
473: public void execute(Response resp) {
474: if (resp.isAvailable()) {
475: if (resp.isSuccess()) {
476: AddressEntry entry = ((Response.Get) resp)
477: .getAddressEntry();
478: try {
479: if (entry != null) {
480: URI uri = entry.getURI();
481: MessageAddress node = MessageAddress
482: .getMessageAddress(uri
483: .getPath().substring(1));
484: DescriptorEntry de = (DescriptorEntry) descriptors
485: .get(communityName);
486: if (de != null) {
487: if (!de.nodeTargets.contains(node)) {
488: de.nodeTargets.add(node);
489: de.didChange = true;
490: }
491: }
492: } else {
493: if (logger.isDetailEnabled()) {
494: logger
495: .detail("AddressEntry is null: agent="
496: + agentId);
497: }
498: DescriptorEntry de = (DescriptorEntry) descriptors
499: .get(communityName);
500: if (de != null) {
501: de.unresolvedAgents.add(agentId);
502: }
503: }
504: } catch (Exception ex) {
505: if (logger.isErrorEnabled()) {
506: logger
507: .error(
508: "Exception in addNodeToTargets:",
509: ex);
510: }
511: } finally {
512: resp.removeCallback(this );
513: }
514: }
515: }
516: }
517: };
518: whitePagesService.get(agentId.toString(), "topology", cb);
519: }
520:
521: // Find node for any agents which are still unresolved.
522: private void resolveAgents() {
523: List l;
524: synchronized (descriptors) {
525: l = new ArrayList(descriptors.values());
526: }
527: for (Iterator it = l.iterator(); it.hasNext();) {
528: DescriptorEntry de = (DescriptorEntry) it.next();
529: if (!de.unresolvedAgents.isEmpty()) {
530: List agents;
531: synchronized (descriptors) {
532: agents = new ArrayList(de.unresolvedAgents);
533: de.unresolvedAgents.clear();
534: }
535: for (Iterator it1 = agents.iterator(); it1.hasNext();) {
536: findNodeTargets((MessageAddress) it1.next(),
537: de.name);
538: }
539: }
540: }
541: }
542:
543: /**
544: * Returns current time as a long.
545: * @return long Current time
546: */
547: private long now() {
548: return System.currentTimeMillis();
549: }
550:
551: class MyBlackboardClient extends BlackboardClient {
552:
553: private BBWakeAlarm timer;
554:
555: public MyBlackboardClient(BindingSite bs) {
556: super (bs);
557: }
558:
559: protected void startTimer() {
560: if (timer == null) {
561: AlarmService as = getAlarmService();
562: if (as != null) {
563: timer = new BBWakeAlarm(now() + updateInterval);
564: as.addRealTimeAlarm(timer);
565: }
566: }
567: }
568:
569: public void setupSubscriptions() {
570: }
571:
572: public void execute() {
573: super .execute();
574: if (timer != null && timer.hasExpired()) {
575: publishDescriptors();
576: timer = new BBWakeAlarm(now() + updateInterval);
577: alarmService.addRealTimeAlarm(timer);
578: }
579: }
580: }
581:
582: }
|