001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 Mobile Intelligence Corp
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.community.manager;
027:
028: import java.net.URI;
029: import java.util.Collection;
030: import java.util.Collections;
031: import java.util.HashSet;
032: import java.util.Iterator;
033: import java.util.Set;
034: import java.lang.reflect.Constructor;
035:
036: import javax.naming.directory.ModificationItem;
037:
038: import org.cougaar.community.BlackboardClient;
039: import org.cougaar.community.CommunityDescriptor;
040: import org.cougaar.community.CommunityUpdateListener;
041: import org.cougaar.community.RelayAdapter;
042: import org.cougaar.community.AbstractCommunityService;
043: import org.cougaar.community.CommunityServiceConstants;
044: import org.cougaar.community.CommunityResponseImpl;
045:
046: import org.cougaar.core.blackboard.IncrementalSubscription;
047: import org.cougaar.core.component.BindingSite;
048: import org.cougaar.core.component.ServiceBroker;
049: import org.cougaar.core.mts.MessageAddress;
050: import org.cougaar.core.service.AgentIdentificationService;
051: import org.cougaar.core.service.LoggingService;
052: import org.cougaar.core.service.community.Community;
053: import org.cougaar.core.service.community.Entity;
054: import org.cougaar.core.service.community.FindCommunityCallback;
055:
056: import org.cougaar.core.service.wp.AddressEntry;
057: import org.cougaar.core.service.wp.Callback;
058: import org.cougaar.core.service.wp.Response;
059: import org.cougaar.core.service.wp.WhitePagesService;
060: import org.cougaar.util.UnaryPredicate;
061:
062: /**
063: * Concrete implementation of CommunityManager interface that uses Blackboard
064: * Relays to perform communication with remote nodes/agents.
065: */
066: public class DefaultCommunityManagerImpl extends
067: AbstractCommunityManager implements CommunityServiceConstants {
068:
069: protected BindingSite bindingSite;
070: protected MyBlackboardClient myBlackboardClient;
071:
072: protected Set managedCommunities = Collections
073: .synchronizedSet(new HashSet());
074: protected Set communitiesToCheck = Collections
075: .synchronizedSet(new HashSet());
076:
077: // Helper class for distributing Community updates
078: protected CommunityDistributer distributer;
079:
080: // Services used
081: protected AbstractCommunityService communityService;
082: protected WhitePagesService whitePagesService;
083:
084: // This agent
085: protected MessageAddress agentId;
086: protected CommunityUpdateListener updateListener;
087:
088: protected String priorManager = null;
089:
090: protected long verifyInterval = DEFAULT_VERIFY_MGR_INTERVAL;
091:
092: protected boolean includeDescriptorInResponse = DEFAULT_INCLUDE_DESCRIPTOR_IN_RESPONSE;
093:
094: /**
095: * Construct CommunityManager component capable of communicating with remote
096: * agents via Blackboard Relays.
097: * @param bs BindingSite
098: * @param acs CommunityService reference
099: * @param cul Listener for local updates
100: */
101: public DefaultCommunityManagerImpl(BindingSite bs,
102: AbstractCommunityService acs, CommunityUpdateListener cul) {
103: this .bindingSite = bs;
104: ServiceBroker sb = getServiceBroker();
105: agentId = getAgentId();
106: agentName = agentId.toString();
107: logger = (LoggingService) sb.getService(this ,
108: LoggingService.class, null);
109: communityService = acs;
110: whitePagesService = (WhitePagesService) sb.getService(this ,
111: WhitePagesService.class, null);
112: myBlackboardClient = new MyBlackboardClient(bs);
113: getSystemProperties();
114: accessManager = getCommunityAccessManager();
115: distributer = new CommunityDistributer(bs, true, cul,
116: communities);
117: }
118:
119: /**
120: * Create a new CommunityAccessManager.
121: * @return CommunityAccessManager
122: */
123: protected CommunityAccessManager getCommunityAccessManager() {
124: ServiceBroker sb = getServiceBroker();
125: CommunityAccessManager cam = null;
126: String accessManagerClassname = System.getProperty(
127: COMMUNITY_ACCESS_MANAGER_PROPERTY,
128: DEFAULT_COMMUNITY_ACCESS_MANAGER_CLASSNAME);
129: try {
130: Class accessManagerClass = Class
131: .forName(accessManagerClassname);
132: Class args[] = new Class[] { ServiceBroker.class };
133: Constructor constructor = accessManagerClass
134: .getConstructor(args);
135: cam = (CommunityAccessManager) constructor
136: .newInstance(new Object[] { sb });
137: } catch (Exception ex) {
138: logger
139: .error("Exception creating CommunityAccessManager: "
140: + ex.getMessage()
141: + ", reverting to org.cougaar.community.manager.CommunityAccessManager");
142: cam = new CommunityAccessManager(sb);
143: }
144: return cam;
145: }
146:
147: public void manageCommunity(Community community) {
148: super .manageCommunity(community);
149: }
150:
151: public void manageCommunity(Community community, Callback callback) {
152: super .manageCommunity(community, callback);
153: }
154:
155: protected void getSystemProperties() {
156: try {
157: verifyInterval = Long.parseLong(System.getProperty(
158: VERIFY_MGR_INTERVAL_PROPERTY, Long
159: .toString(DEFAULT_VERIFY_MGR_INTERVAL)));
160: includeDescriptorInResponse = Boolean
161: .valueOf(
162: System
163: .getProperty(
164: INCLUDE_DESCRIPTOR_IN_RESPONSE_PROPERTY,
165: Boolean
166: .toString(DEFAULT_INCLUDE_DESCRIPTOR_IN_RESPONSE)))
167: .booleanValue();
168: } catch (Exception ex) {
169: if (logger.isWarnEnabled()) {
170: logger
171: .warn(
172: agentName
173: + ": Exception setting parameter from system property",
174: ex);
175: }
176: }
177: }
178:
179: protected MessageAddress getAgentId() {
180: AgentIdentificationService ais = (AgentIdentificationService) getServiceBroker()
181: .getService(this , AgentIdentificationService.class,
182: null);
183: MessageAddress addr = ais.getMessageAddress();
184: getServiceBroker().releaseService(this ,
185: AgentIdentificationService.class, ais);
186: return addr;
187: }
188:
189: protected ServiceBroker getServiceBroker() {
190: return bindingSite.getServiceBroker();
191: }
192:
193: /**
194: * Processes Requests received via Relay.
195: * @param req Request
196: */
197: protected void processRequest(Request req) {
198: if (logger.isDetailEnabled()) {
199: logger.detail(agentId + ": processRequest: " + req);
200: }
201: String source = req.getSource().toString();
202: String communityName = req.getCommunityName();
203: int reqType = req.getRequestType();
204: Entity entity = req.getEntity();
205: ModificationItem[] attrMods = req.getAttributeModifications();
206: CommunityResponseImpl resp = (CommunityResponseImpl) handleRequest(
207: source, communityName, reqType, entity, attrMods);
208:
209: if (!includeDescriptorInResponse
210: && reqType != GET_COMMUNITY_DESCRIPTOR) {
211: // Don't include community in response, instead rely on CommunityDistributer to send
212: // This decreases messaging overhead (primarily in serialization) and thus
213: // improves overally scalability
214: resp.setContent(null);
215: }
216: req.setResponse(resp);
217: myBlackboardClient.publish(req, BlackboardClient.CHANGE);
218: }
219:
220: /**
221: * Tests whether this agent is the manager for the specified community.
222: * @param communityName String
223: * @return boolean
224: */
225: protected boolean isManager(String communityName) {
226: return (managedCommunities.contains(communityName)
227: && communities.containsKey(communityName) && distributer
228: .contains(communityName));
229: }
230:
231: /**
232: * Add agents to distribution list for community updates.
233: * @param communityName Name of community
234: * @param targets Set of agent names (String) to add to distribution
235: */
236: protected void addTargets(String communityName, Set targets) {
237: distributer.addTargets(communityName, targets);
238: }
239:
240: /**
241: * Remove agents from distribution list for community updates.
242: * @param communityName Name of community
243: * @param targets Set of agent names (String) to remove from distribution
244: */
245: protected void removeTargets(String communityName, Set targets) {
246: distributer.removeTargets(communityName, targets);
247: }
248:
249: /**
250: * Send updated Community info to agents on distribution.
251: * @param communityName Name of community
252: */
253: protected void distributeUpdates(String communityName) {
254: distributer.update(communityName);
255: }
256:
257: /**
258: * Get name of community manager.
259: * @param communityName String
260: * @param fmcb FindManagerCallback
261: */
262: public void findManager(String communityName,
263: final FindCommunityCallback fmcb) {
264:
265: communityService.findCommunity(communityName, fmcb, 0);
266: }
267:
268: /**
269: * Asserts community manager role.
270: * @param communityName Community to manage
271: */
272: protected void assertCommunityManagerRole(String communityName) {
273: assertCommunityManagerRole(communityName, false);
274: }
275:
276: protected void assertCommunityManagerRole(String communityName,
277: Callback callback) {
278: assertCommunityManagerRole(communityName, false, callback);
279: }
280:
281: /**
282: * Asserts community manager role by binding address to community name in
283: * White Pages
284: * @param communityName Community to manage
285: * @param override If true any existing binding will be removed
286: * and replaced with new
287: */
288: protected void assertCommunityManagerRole(String communityName,
289: boolean override) {
290: assertCommunityManagerRole(communityName, override, null);
291: }
292:
293: /**
294: * Asserts community manager role by binding address to community name in
295: * White Pages
296: *
297: * @param communityName
298: * Community to manage
299: * @param override
300: * If true any existing binding will be removed and replaced with new
301: * @param callback
302: * Invoked when the assertion is completed
303: */
304: protected void assertCommunityManagerRole(String communityName,
305: boolean override, Callback callback) {
306: if (logger.isDetailEnabled()) {
307: logger.detail(agentName
308: + ": assertCommunityManagerRole: agent="
309: + agentId.toString() + " community="
310: + communityName);
311: }
312: try {
313: bindCommunityManager(communityName, override, callback);
314: communitiesToCheck.add(communityName);
315: myBlackboardClient.startVerifyManagerCheck();
316: } catch (Throwable ex) {
317: if (logger.isWarnEnabled()) {
318: logger
319: .warn(agentName
320: + ": Unable to (re)bind agent as community manager:"
321: + " error=" + ex + " agent=" + agentId
322: + " community=" + communityName);
323: }
324: }
325:
326: }
327:
328: /**
329: * Return current time as a long.
330: * @return Current time
331: */
332: private long now() {
333: return System.currentTimeMillis();
334: }
335:
336: /** Create a wp entry for white pages binding
337: * @param communityName Name of community to bind
338: * @return AddressEntry for new manager binding.
339: * @exception Exception Unable to create AddressEntry
340: */
341: private AddressEntry createManagerEntry(String communityName)
342: throws Exception {
343: URI uri = URI.create("agent:///" + agentId);
344: AddressEntry entry = AddressEntry.getAddressEntry(communityName
345: + ".comm", "community", uri);
346: return entry;
347: }
348:
349: /**
350: * Bind this agent to community name in White Pages.
351: * @param communityName String Name of community to bind
352: * @param override boolean Override existing binding
353: * @throws Exception
354: */
355: private void bindCommunityManager(final String communityName,
356: final boolean override, final Callback callback)
357: throws Exception {
358: final AddressEntry communityAE = createManagerEntry(communityName);
359: Callback cb = new Callback() {
360: public void execute(Response resp) {
361: Response.Bind bindResp = (Response.Bind) resp;
362: if (resp.isAvailable()) {
363: if (logger.isDebugEnabled())
364: logger.debug(agentName + ": bind: "
365: + " success=" + resp.isSuccess()
366: + " didBind=" + bindResp.didBind());
367: if (bindResp.didBind()) {
368: distributer.add(communityName, Collections
369: .singleton(agentId.toString()));
370: if (logger.isDebugEnabled()) {
371: logger.debug(agentName
372: + ": Managing community "
373: + communityName);
374: }
375: managedCommunities.add(communityName);
376: } else {
377: if (logger.isDetailEnabled())
378: logger
379: .detail(agentName
380: + ": Unable to bind agent as community manager:"
381: + " agent=" + agentId
382: + " community="
383: + communityName + " entry="
384: + communityAE
385: + " attemptingRebind="
386: + override);
387: if (override) {
388: rebindCommunityManager(communityAE,
389: communityName);
390: }
391: }
392: resp.removeCallback(this );
393: if (callback != null) {
394: callback.execute(resp);
395: }
396: }
397: }
398: };
399: whitePagesService.bind(communityAE, cb);
400: }
401:
402: private void rebindCommunityManager(AddressEntry ae,
403: final String communityName) {
404: Callback cb = new Callback() {
405: public void execute(Response resp) {
406: Response.Bind bindResp = (Response.Bind) resp;
407: if (resp.isAvailable()) {
408: if (logger.isDebugEnabled())
409: logger.debug(agentName + ": rebind: "
410: + " success=" + resp.isSuccess()
411: + " didBind=" + bindResp.didBind());
412: if (bindResp.didBind()) {
413: logger.debug(agentName
414: + ": Managing community (rebind)"
415: + communityName);
416: managedCommunities.add(communityName);
417: } else {
418: if (logger.isDebugEnabled())
419: logger
420: .debug(agentName
421: + ": Unable to rebind agent as community manager:"
422: + " agent=" + agentId
423: + " community="
424: + communityName);
425: }
426: resp.removeCallback(this );
427: }
428: }
429: };
430: whitePagesService.rebind(ae, cb);
431: }
432:
433: private static final UnaryPredicate communityDescriptorPredicate = new CommunityDescriptorPredicate();
434:
435: private static final class CommunityDescriptorPredicate implements
436: UnaryPredicate {
437: public boolean execute(Object o) {
438: return (o instanceof RelayAdapter && ((RelayAdapter) o)
439: .getContent() instanceof CommunityDescriptor);
440: }
441: }
442:
443: /**
444: * Check WPS binding to verify that the state of this community manager
445: * is in sync with the WPS bindings.
446: */
447: private void verifyManagerRole() {
448: Set l = new HashSet();
449: synchronized (communitiesToCheck) {
450: l.addAll(communitiesToCheck);
451: }
452: for (Iterator it = l.iterator(); it.hasNext();) {
453: final String communityName = (String) it.next();
454: // See if WP binding lists this agent as manager for each name
455: // in communityNames collection
456: FindCommunityCallback fmcb = new FindCommunityCallback() {
457: public void execute(String mgrName) {
458: if (logger.isDetailEnabled()) {
459: logger.detail(agentName + ": verifyWpsBinding:"
460: + " community=" + communityName
461: + " current=" + mgrName + " prior="
462: + priorManager);
463: }
464: if (isManager(communityName) && mgrName == null) {
465: assertCommunityManagerRole(communityName, true); // reassert mgr role
466: } else if (!isManager(communityName)
467: && agentName.equals(mgrName)) {
468: if (logger.isDebugEnabled()) {
469: logger.debug(agentName
470: + ": New WP binding:"
471: + " community=" + communityName
472: + " prior=" + priorManager
473: + " new=" + mgrName);
474: }
475: managedCommunities.add(communityName);
476: distributer.add(communityName, Collections
477: .singleton(agentId.toString()));
478: myBlackboardClient.startVerifyManagerCheck();
479: } else if (isManager(communityName)
480: && !agentName.equals(mgrName)) {
481: if (logger.isDebugEnabled()) {
482: logger.debug(agentName
483: + ": No longer bound in WP:"
484: + " community=" + communityName
485: + " prior=" + agentName + " new="
486: + mgrName);
487: }
488: managedCommunities.remove(communityName);
489: distributer.remove(communityName);
490: }
491: priorManager = mgrName;
492: }
493: };
494: findManager(communityName, fmcb);
495: }
496: }
497:
498: /**
499: * Predicate used to select community manager Requests sent by remote
500: * agents.
501: */
502: private IncrementalSubscription requestSub;
503: private static final UnaryPredicate requestPredicate = new RequestPredicate();
504:
505: private static final class RequestPredicate implements
506: UnaryPredicate {
507: public boolean execute(Object o) {
508: return (o instanceof Request);
509: }
510: };
511:
512: class MyBlackboardClient extends BlackboardClient {
513:
514: private BBWakeAlarm verifyMgrAlarm;
515:
516: public MyBlackboardClient(BindingSite bs) {
517: super (bs);
518: }
519:
520: protected void startVerifyManagerCheck() {
521: if (verifyMgrAlarm == null) {
522: verifyMgrAlarm = new BBWakeAlarm(now() + verifyInterval);
523: alarmService.addRealTimeAlarm(verifyMgrAlarm);
524: }
525: }
526:
527: public void setupSubscriptions() {
528: // Subscribe to CommunityManagerRequests
529: requestSub = (IncrementalSubscription) blackboard
530: .subscribe(requestPredicate);
531:
532: // Re-publish any CommunityDescriptor Relays found on BB
533: if (blackboard.didRehydrate()) {
534: Collection cds = blackboard
535: .query(communityDescriptorPredicate);
536: for (Iterator it = cds.iterator(); it.hasNext();) {
537: RelayAdapter ra = (RelayAdapter) it.next();
538: CommunityDescriptor cd = (CommunityDescriptor) ra
539: .getContent();
540: if (logger.isInfoEnabled()) {
541: logger
542: .info(agentName
543: + ": Found CommunityDescriptor Relay: community="
544: + cd.getCommunity());
545: }
546: communities.put(cd.getName(), cd.getCommunity());
547: distributer.add(ra);
548: assertCommunityManagerRole(cd.getName());
549: }
550: }
551:
552: }
553:
554: public void execute() {
555:
556: super .execute();
557:
558: // On verifyMgrAlarm expiration check WPS binding to verify that
559: // manager roles for this agent
560: if (verifyMgrAlarm != null && verifyMgrAlarm.hasExpired()) {
561: verifyManagerRole();
562: verifyMgrAlarm = new BBWakeAlarm(now() + verifyInterval);
563: alarmService.addRealTimeAlarm(verifyMgrAlarm);
564: }
565:
566: // Get CommunityManagerRequests sent by remote agents
567: Collection communityManagerRequests = requestSub
568: .getAddedCollection();
569: for (Iterator it = communityManagerRequests.iterator(); it
570: .hasNext();) {
571: Request req = (Request) it.next();
572: // Process requests sent from remote agents only
573: if (!agentName.equals(req.getSource().toString())) {
574: processRequest(req);
575: }
576: }
577: }
578:
579: }
580:
581: }
|