0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 2001-2004 Mobile Intelligence Corp
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.community;
0028:
0029: import java.util.ArrayList;
0030: import java.util.Collection;
0031: import java.util.Collections;
0032: import java.util.HashMap;
0033: import java.util.HashSet;
0034: import java.util.Iterator;
0035: import java.util.List;
0036: import java.util.Map;
0037: import java.util.Set;
0038:
0039: import javax.naming.NamingEnumeration;
0040: import javax.naming.NamingException;
0041: import javax.naming.directory.Attribute;
0042: import javax.naming.directory.Attributes;
0043: import javax.naming.directory.ModificationItem;
0044:
0045: import org.cougaar.community.manager.CommunityManager;
0046: import org.cougaar.community.manager.DefaultCommunityManagerImpl;
0047: import org.cougaar.community.manager.Request;
0048: import org.cougaar.community.manager.RequestImpl;
0049: import org.cougaar.community.requests.ListAgentParentCommunities;
0050: import org.cougaar.core.agent.service.alarm.Alarm;
0051: import org.cougaar.core.blackboard.IncrementalSubscription;
0052: import org.cougaar.core.component.BindingSite;
0053: import org.cougaar.core.component.ServiceAvailableEvent;
0054: import org.cougaar.core.component.ServiceAvailableListener;
0055: import org.cougaar.core.component.ServiceBroker;
0056: import org.cougaar.core.mts.MessageAddress;
0057: import org.cougaar.core.service.AgentIdentificationService;
0058: import org.cougaar.core.service.LoggingService;
0059: import org.cougaar.core.service.ThreadService;
0060: import org.cougaar.core.service.UIDService;
0061: import org.cougaar.core.service.community.Community;
0062: import org.cougaar.core.service.community.CommunityResponse;
0063: import org.cougaar.core.service.community.CommunityResponseListener;
0064: import org.cougaar.core.service.community.CommunityService;
0065: import org.cougaar.core.service.community.Entity;
0066: import org.cougaar.core.service.community.FindCommunityCallback;
0067: import org.cougaar.core.service.wp.AddressEntry;
0068: import org.cougaar.core.service.wp.Callback;
0069: import org.cougaar.core.service.wp.Response;
0070: import org.cougaar.core.service.wp.WhitePagesService;
0071: import org.cougaar.core.util.UID;
0072: import org.cougaar.util.UnaryPredicate;
0073:
0074: /**
0075: * Default implementation of CommunityService that uses Blackboard Relays
0076: * for remote communication. This includes sending requests to a remote
0077: * community manager and community discovery.
0078: **/
0079: public class DefaultCommunityServiceImpl extends
0080: AbstractCommunityService implements CommunityService,
0081: java.io.Serializable, CommunityServiceConstants {
0082:
0083: protected BindingSite bindingSite; // source of the ServiceBroker (see getServiceBroker)
0084: protected UIDService uidService; // may remain null until the service is available (see initUidService)
0085:
0086: // Message address of this agent, obtained from AgentIdentificationService
0087: protected MessageAddress agentId;
0088:
0089: protected MyBlackboardClient myBlackboardClient; // blackboard access plus queued responses and find-manager requests
0090: protected static Object cacheLock = new Object(); // held by the constructor while it creates the cache if it is still null
0091: protected CommunityRequestQueue requestQueue; // receives requests submitted with a delay > 0 (see queueCommunityRequest)
0092:
0093: protected long verifyMembershipsInterval = DEFAULT_VERIFY_MEMBERSHIPS_INTERVAL; // milliseconds between membership checks; overridable by system property
0094:
/**
 * Creates the community service for the agent behind the given binding site.
 * Initialization order matters: the binding site is stored first because
 * every service lookup goes through it, and the update listener is created
 * before the community manager because the manager receives it.
 * @param bs Agents BindingSite
 */
public DefaultCommunityServiceImpl(BindingSite bs) {
    bindingSite = bs;

    // Identity and logging
    agentId = getAgentId();
    agentName = agentId.toString();
    log = (LoggingService) getServiceBroker().getService(this,
            LoggingService.class, null);

    // Collaborators
    initUidService();
    communityUpdateListener = new MyCommunityUpdateListener();
    myBlackboardClient = new MyBlackboardClient(bs);
    communityManager = getCommunityManager();
    getSystemProperties();

    // Create the cache only if nobody has created it yet
    synchronized (cacheLock) {
        if (cache == null) {
            ThreadService threadService = (ThreadService) getServiceBroker()
                    .getService(this, ThreadService.class, null);
            cache = new CommunityCache(threadService);
        }
    }

    requestQueue = new CommunityRequestQueue(getServiceBroker(), this);
    myCommunities = new CommunityMemberships();
    membershipWatcher = new MembershipWatcher(agentName, this,
            myCommunities);
}
0123:
0124: protected void getSystemProperties() {
0125: try {
0126: verifyMembershipsInterval = Long
0127: .parseLong(System
0128: .getProperty(
0129: VERIFY_MEMBERSHIPS_INTERVAL_PROPERTY,
0130: Long
0131: .toString(DEFAULT_VERIFY_MEMBERSHIPS_INTERVAL)));
0132: } catch (Exception ex) {
0133: if (log.isWarnEnabled()) {
0134: log
0135: .warn(
0136: agentName
0137: + ": Exception setting parameter from system property",
0138: ex);
0139: }
0140: }
0141: }
0142:
0143: protected MessageAddress getAgentId() {
0144: AgentIdentificationService ais = (AgentIdentificationService) getServiceBroker()
0145: .getService(this , AgentIdentificationService.class,
0146: null);
0147: MessageAddress addr = ais.getMessageAddress();
0148: getServiceBroker().releaseService(this ,
0149: AgentIdentificationService.class, ais);
0150: return addr;
0151: }
0152:
0153: protected ServiceBroker getServiceBroker() {
0154: return bindingSite.getServiceBroker();
0155: }
0156:
0157: /*
0158: * Get Unique identifier.
0159: */
0160: protected UID getUID() {
0161: return uidService != null ? uidService.nextUID() : null;
0162: }
0163:
0164: /**
0165: * Initialize UIDService using ServiceAvailableListener if service not
0166: * immediately available.
0167: */
0168: private void initUidService() {
0169: ServiceBroker sb = getServiceBroker();
0170: if (sb.hasService(org.cougaar.core.service.UIDService.class)) {
0171: uidService = (UIDService) sb.getService(this ,
0172: UIDService.class, null);
0173: } else {
0174: sb.addServiceListener(new ServiceAvailableListener() {
0175: public void serviceAvailable(ServiceAvailableEvent sae) {
0176: if (sae.getService().equals(UIDService.class)) {
0177: uidService = (UIDService) getServiceBroker()
0178: .getService(this , UIDService.class,
0179: null);
0180: }
0181: }
0182: });
0183: }
0184: }
0185:
0186: protected CommunityManager getCommunityManager() {
0187: return communityManager != null ? communityManager
0188: : new DefaultCommunityManagerImpl(bindingSite, this ,
0189: communityUpdateListener);
0190: }
0191:
0192: /**
0193: * Send a request to manager of specified community.
0194: * @param communityName String
0195: * @param requestType int
0196: * @param entity Entity
0197: * @param attrMods ModificationItem[]
0198: * @param crl CommunityResponseListener
0199: * @param delay Defines how long to wait before processing request, a value
0200: * of 0 or < 1 indicates that the request should be processed immediately
0201: */
0202: protected void queueCommunityRequest(final String communityName,
0203: final int requestType, final Entity entity,
0204: final ModificationItem[] attrMods,
0205: final CommunityResponseListener crl, final long timeout,
0206: final long delay) {
0207: if (log.isDebugEnabled()) {
0208: log.debug(agentName + ": queueCommunityRequest: "
0209: + " community=" + communityName + " type="
0210: + requestType + " entity=" + entity + " attrMods="
0211: + attrMods + " delay=" + delay);
0212: }
0213: if (delay > 0) {
0214: requestQueue.add(delay, communityName, requestType, entity,
0215: attrMods, timeout, crl);
0216: } else {
0217: sendCommunityRequest(communityName, requestType, entity,
0218: attrMods, timeout, crl);
0219: }
0220: }
0221:
0222: /**
0223: * Send request to manager.
0224: * @param communityName String
0225: * @param requestType int
0226: * @param entity Entity
0227: * @param attrMods ModificationItem[]
0228: * @param crl CommunityResponseListener
0229: */
0230: protected void sendCommunityRequest(final String communityName,
0231: final int requestType, final Entity entity,
0232: final ModificationItem[] attrMods, final long timeout,
0233: final CommunityResponseListener crl) {
0234: if (log.isDebugEnabled()) {
0235: log.debug(agentName + ": sendCommunityRequest: "
0236: + " community=" + communityName + " type="
0237: + requestType + " entity=" + entity + " attrMods="
0238: + attrMods + " timeout=" + timeout);
0239: }
0240: FindCommunityCallback fmcb = new FindCommunityCallback() {
0241: public void execute(String managerName) {
0242: if (log.isDebugEnabled()) {
0243: log.debug(agentName + ": sendCommunityRequest: "
0244: + " community=" + communityName
0245: + " manager=" + managerName);
0246: }
0247: if (managerName != null) {
0248: if (managerName.equals(agentName)) { // is this agent manager?
0249: CommunityResponse resp = communityManager
0250: .processRequest(agentName,
0251: communityName, requestType,
0252: entity, attrMods);
0253: Set listeners = Collections.singleton(crl);
0254: handleResponse(communityName, resp, listeners);
0255: } else { // Send request to remote manager agent
0256: MessageAddress managerAddr = MessageAddress
0257: .getMessageAddress(managerName);
0258: Request req = new RequestImpl(
0259: agentId, // source
0260: managerAddr, // target
0261: communityName, requestType, entity,
0262: attrMods, getUID(), crl);
0263: myBlackboardClient.publish(req,
0264: BlackboardClient.ADD);
0265: }
0266: } else {
0267: handleResponse(communityName,
0268: new CommunityResponseImpl(
0269: CommunityResponse.TIMEOUT, null),
0270: Collections.singleton(crl));
0271: }
0272: }
0273: };
0274: findCommunity(communityName, fmcb, timeout);
0275: }
0276:
0277: /**
0278: * Handle response to community request returned by manager.
0279: * @param req Request
0280: */
0281: protected void handleResponse(Request req) {
0282: handleResponse(req.getCommunityName(), (CommunityResponse) req
0283: .getResponse(), req.getCommunityResponseListeners());
0284: }
0285:
0286: protected void sendResponse(CommunityResponse resp, Set listeners) {
0287: myBlackboardClient.queueResponse(resp, listeners);
0288: }
0289:
0290: protected String getAgentName() {
0291: return agentId.toString();
0292: }
0293:
0294: /**
0295: * Lists all communities in White pages.
0296: * @return Collection of community names
0297: */
0298: public Collection listAllCommunities() {
0299: List commNames = new ArrayList();
0300: try {
0301: WhitePagesService wps = (WhitePagesService) getServiceBroker()
0302: .getService(this , WhitePagesService.class, null);
0303: recursiveFindCommunities(commNames, wps, ".comm", // all community entries end in ".comm"
0304: 0, // no timeout
0305: -1); // no recursion limit
0306: } catch (Exception e) {
0307: if (log.isDebugEnabled()) {
0308: log.error(agentName + ": Error in listAllCommunities: "
0309: + e);
0310: }
0311: }
0312: return commNames;
0313: }
0314:
0315: public void listAllCommunities(CommunityResponseListener crl) {
0316: crl.getResponse(new CommunityResponseImpl(
0317: CommunityResponse.SUCCESS, listAllCommunities()));
0318: }
0319:
0320: private static final void recursiveFindCommunities(
0321: Collection toCol, WhitePagesService wps, String suffix,
0322: long timeout, int limit) throws Exception {
0323: if (limit == 0) {
0324: // max recursion depth
0325: return;
0326: }
0327: Set names = wps.list(suffix, timeout);
0328: for (Iterator iter = names.iterator(); iter.hasNext();) {
0329: String s = (String) iter.next();
0330: if (s == null || s.length() <= 5) {
0331: // never
0332: } else if (s.charAt(0) == '.') {
0333: // hierarchical community name
0334: recursiveFindCommunities(toCol, wps, s, timeout,
0335: (limit - 1));
0336: } else {
0337: // trim the ".comm" suffix
0338: String commName = s.substring(0, s.length() - 5);
0339: toCol.add(commName);
0340: }
0341: }
0342: }
0343:
0344: /**
0345: * Invokes callback when specified community is found.
0346: * @param communityName Name of community
0347: * @param fccb Callback invoked after community is found or timeout
0348: * has lapsed
0349: * @param timeout Length of time (in milliseconds) to wait for
0350: * community to be located. A value of -1 disables
0351: * the timeout.
0352: */
0353: public void findCommunity(final String communityName,
0354: final FindCommunityCallback fccb, final long timeout) {
0355: if (log.isDetailEnabled()) {
0356: log.detail(agentName + ": findCommunity:" + " community="
0357: + communityName + " timeout=" + timeout);
0358: }
0359: long tryUntil = -1;
0360: if (timeout >= 0) {
0361: tryUntil = timeout == 0 ? 0 : System.currentTimeMillis()
0362: + timeout;
0363: }
0364: myBlackboardClient.queueFindManagerRequest(communityName, fccb,
0365: 0, tryUntil);
0366: }
0367:
0368: public void findManager(final String communityName,
0369: final FindCommunityCallback fccb, final long tryUntil) {
0370: Callback cb = new Callback() {
0371: long start = System.currentTimeMillis();
0372:
0373: public void execute(Response resp) {
0374: String name = null;
0375: if (resp.isAvailable() && resp.isSuccess()) {
0376: AddressEntry entry = ((Response.Get) resp)
0377: .getAddressEntry();
0378: if (entry != null) {
0379: name = entry.getURI().getPath().substring(1);
0380: }
0381: }
0382: long wpRespTime = System.currentTimeMillis() - start;
0383: if (log.isDebugEnabled() && wpRespTime > 10000) {
0384: log.debug(agentName + ": findManager.execute:"
0385: + " community=" + communityName
0386: + " manager=" + name + " wpRespTime="
0387: + wpRespTime);
0388: } else if (log.isDetailEnabled()) {
0389: log.detail(agentName + ": findManager.execute:"
0390: + " community=" + communityName
0391: + " manager=" + name + " wpRespTime="
0392: + wpRespTime);
0393: }
0394: if (name != null) {
0395: fccb.execute(name);
0396: } else { // retry?
0397: long now = System.currentTimeMillis();
0398: if (tryUntil < 0 || now < tryUntil) {
0399: myBlackboardClient.queueFindManagerRequest(
0400: communityName, fccb, 5000, // 5 sec delay
0401: tryUntil);
0402: } else {
0403: fccb.execute(null); // Give up
0404: }
0405: }
0406: }
0407: };
0408: WhitePagesService wps = (WhitePagesService) getServiceBroker()
0409: .getService(this , WhitePagesService.class, null);
0410: try {
0411: wps.get(communityName + ".comm", "community", cb);
0412: } catch (Exception ex) {
0413: if (log.isErrorEnabled()) {
0414: log.error(ex.getMessage());
0415: }
0416: } finally {
0417: getServiceBroker().releaseService(this ,
0418: WhitePagesService.class, wps);
0419: }
0420: }
0421:
0422: protected long now() {
0423: return System.currentTimeMillis();
0424: }
0425:
0426: class MyCommunityUpdateListener implements CommunityUpdateListener {
0427:
0428: public void updateCommunity(Community community) {
0429: if (log.isDebugEnabled()) {
0430: log.debug(agentName + ": updateCommunity:"
0431: + " community=" + community + " size="
0432: + community.getEntities().size());
0433: }
0434: cache.update(community);
0435: }
0436:
0437: public void removeCommunity(Community community) {
0438: if (log.isDebugEnabled()) {
0439: log.debug(agentName + ": remove: community="
0440: + community);
0441: }
0442: cache.remove(community.getName());
0443: //myBlackboardClient.publish(community, BlackboardClient.REMOVE);
0444: }
0445:
0446: }
0447:
0448: Map parentsForRemoteAgent = Collections
0449: .synchronizedMap(new HashMap()); // member name -> parents from a received ListAgentParentCommunities response; an entry is removed when read
0450:
0451: public Collection listParentCommunities(String member,
0452: CommunityResponseListener crl) {
0453: if (log.isDebugEnabled()) {
0454: log.debug("listParentCommunities:" + " member=" + member
0455: + " hasCRL=" + (crl != null));
0456: }
0457: String child = (member == null) ? getAgentName() : member;
0458: if (child.equals(getAgentName()) || cache.contains(child)) {
0459: // Entity is this agent or a community found in local cache
0460: return listParentCommunitiesForLocalEntity(child);
0461: } else {
0462: return listParentCommunitiesForRemoteEntity(child, crl);
0463: }
0464: }
0465:
0466: protected Collection listParentCommunitiesForLocalEntity(
0467: String member) {
0468: String child = (member == null) ? getAgentName() : member;
0469: Set parents = new HashSet();
0470: if (child.equals(getAgentName())) {
0471: // List parents of this agent
0472: parents = cache.getAncestorNames(child, false);
0473: } else if (cache.contains(child)) { // it's a community
0474: // List parents of community found in local cache
0475: Attributes attrs = cache.get(child).getAttributes();
0476: if (attrs != null) {
0477: Attribute parentAttr = attrs.get("Parent");
0478: if (parentAttr != null) {
0479: try {
0480: for (NamingEnumeration en = parentAttr.getAll(); en
0481: .hasMore();) {
0482: parents.add((String) en.next());
0483: }
0484: } catch (NamingException ne) {
0485: if (log.isErrorEnabled()) {
0486: log.error(agentName
0487: + ": Error parsing attributes for "
0488: + child, ne);
0489: }
0490: }
0491: }
0492: }
0493: }
0494: if (log.isDebugEnabled()) {
0495: log.debug("listParentCommunitiesForLocalEntity:"
0496: + " member=" + child + " parents=" + parents);
0497: }
0498: return parents;
0499: }
0500:
0501: /**
0502: * Create list of parent communities. For local agent this can easily be
0503: * obtained from cache. For any other agent/community a request must be sent
0504: * to the agent or community manager.
0505: * @param member String
0506: * @param crl CommunityResponseListener
0507: * @return Collection
0508: */
0509: public Collection listParentCommunitiesForRemoteEntity(
0510: final String member, final CommunityResponseListener crl) {
0511: Collection allCommunities = listAllCommunities();
0512: if (log.isDebugEnabled()) {
0513: log.debug("listParentCommunitiesForRemoteEntity:"
0514: + " requester=" + getAgentName() + " member="
0515: + member + " hasCRL=" + (crl != null)
0516: + " boundCommunities=" + allCommunities
0517: +
0518: //" cache=" + parentsForRemoteAgent.keySet() +
0519: " memberIsCommunity="
0520: + allCommunities.contains(member));
0521: }
0522: if (parentsForRemoteAgent.containsKey(member)) {
0523: return (Collection) parentsForRemoteAgent.remove(member);
0524: }
0525: if (allCommunities.contains(member)) {
0526: // Member is a community, send relay to community manager
0527: findManager(member, new FindCommunityCallback() {
0528: public void execute(String manager) {
0529: if (manager != null) {
0530: UID uid = uidService.nextUID();
0531: ListAgentParentCommunities tr = new ListAgentParentCommunities(
0532: agentId, uid, member);
0533: RelayAdapter relay = new RelayAdapter(agentId,
0534: tr, uid);
0535: relay.setCommunityResponseListener(crl);
0536: relay.addTarget(MessageAddress
0537: .getMessageAddress(manager));
0538: if (log.isDebugEnabled()) {
0539: log
0540: .debug("listParentCommunitiesForRemoteCommunity: "
0541: + " member="
0542: + member
0543: + " communityManager="
0544: + manager
0545: + " hasCRL="
0546: + (crl != null)
0547: + " uid="
0548: + uid);
0549: }
0550: myBlackboardClient.publish(relay,
0551: BlackboardClient.ADD);
0552: } else {
0553: if (log.isDebugEnabled()) {
0554: log
0555: .debug("listParentCommunites: TIMEOUT member="
0556: + member);
0557: }
0558: if (crl != null) {
0559: crl.getResponse(new CommunityResponseImpl(
0560: CommunityResponse.TIMEOUT,
0561: Collections.EMPTY_SET));
0562: }
0563: }
0564: }
0565: }, 5000);
0566: } else {
0567: // Member is a regular agent, send relay to agent
0568: UID uid = uidService.nextUID();
0569: ListAgentParentCommunities tr = new ListAgentParentCommunities(
0570: agentId, uid, member);
0571: RelayAdapter relay = new RelayAdapter(agentId, tr, uid);
0572: relay.setCommunityResponseListener(crl);
0573: relay.addTarget(MessageAddress.getMessageAddress(member));
0574: if (log.isDebugEnabled()) {
0575: log.debug("listParentCommunitiesForRemoteAgent: "
0576: + " member=" + member + " hasCRL="
0577: + (crl != null) + " uid=" + uid);
0578: }
0579: myBlackboardClient.publish(relay, BlackboardClient.ADD);
0580: }
0581: return null;
0582: }
0583:
0584: class MyBlackboardClient extends BlackboardClient {
0585:
0586: List findManagerRequests = Collections
0587: .synchronizedList(new ArrayList()); // FindManagerRequest entries waiting for their next lookup attempt
0588: WakeAlarm findMgrTimer; // alarm for the next find-manager retry pass; set to null in execute() when no requests remain
0589: WakeAlarm verifyMembershipsTimer; // alarm for the next membership verification; null when none is scheduled
0590: boolean myCommunitiesChanged; // set by the memberships listener, cleared after publishChange in execute()
0591:
0592: List responses = new ArrayList(); // ResponseHolder entries drained by sendCommunityResponses()
0593:
/**
 * Creates the blackboard client for the given binding site.
 * @param bs binding site handed to the BlackboardClient superclass
 */
public MyBlackboardClient(BindingSite bs) {
    super(bs);
}
0597:
0598: protected long now() {
0599: return System.currentTimeMillis();
0600: }
0601:
0602: protected void queueFindManagerRequest(String communityName,
0603: FindCommunityCallback fccb, long delay, long tryUntil) {
0604: if (log.isDetailEnabled()) {
0605: log.detail("queueFindManagerRequest: " + " community="
0606: + communityName + " delay=" + delay
0607: + " tryUntil=" + tryUntil);
0608: }
0609: findManagerRequests.add(new FindManagerRequest(now()
0610: + delay, communityName, fccb, tryUntil));
0611: if (findMgrTimer != null)
0612: findMgrTimer.expire();
0613: blackboard.signalClientActivity();
0614: }
0615:
0616: protected void queueResponse(CommunityResponse resp,
0617: Set listeners) {
0618: responses.add(new ResponseHolder(resp, listeners));
0619: if (blackboard != null)
0620: blackboard.signalClientActivity();
0621: }
0622:
0623: public void setupSubscriptions() {
0624:
0625: if (blackboard.didRehydrate()) {
0626: // Look for a persisted CommunityMemberships instance
0627: // This is used to determine what communities this agent previously joined
0628: // in order to ensure that correct memberships are maintained after a restart.
0629: Collection cms = blackboard
0630: .query(communityMembershipsPredicate);
0631: if (cms.isEmpty()) {
0632: blackboard.publishAdd(myCommunities);
0633: } else {
0634: myCommunities = (CommunityMemberships) cms
0635: .iterator().next();
0636: membershipWatcher.setMemberships(myCommunities);
0637: }
0638: }
0639:
0640: myCommunities
0641: .addListener(new CommunityMembershipsListener() {
0642: public void membershipsChanged() {
0643: myCommunitiesChanged = true;
0644: if (!myCommunities.listCommunities()
0645: .isEmpty()
0646: && verifyMembershipsTimer == null) {
0647: verifyMembershipsTimer = new WakeAlarm(
0648: System.currentTimeMillis()
0649: + verifyMembershipsInterval);
0650: alarmService
0651: .addRealTimeAlarm(verifyMembershipsTimer);
0652: }
0653: }
0654: });
0655:
0656: // Activate MembershipWatcher
0657: if (!myCommunities.listCommunities().isEmpty()
0658: && verifyMembershipsTimer == null) {
0659: verifyMembershipsTimer = new WakeAlarm(System
0660: .currentTimeMillis()
0661: + verifyMembershipsInterval);
0662: alarmService.addRealTimeAlarm(verifyMembershipsTimer);
0663: }
0664:
0665: // Subscribe to CommunityRequests
0666: communityRequestSub = (IncrementalSubscription) blackboard
0667: .subscribe(communityRequestPredicate);
0668:
0669: // Subscribe to CommunityDescriptors
0670: communityDescriptorSub = (IncrementalSubscription) blackboard
0671: .subscribe(communityDescriptorPredicate);
0672:
0673: // Subscribe to ListParentCommunities request and response
0674: listParentCommunitiesSub = (IncrementalSubscription) blackboard
0675: .subscribe(listParentCommunitiesPredicate);
0676: listParentCommunitiesResponseSub = (IncrementalSubscription) blackboard
0677: .subscribe(listParentCommunitiesResponsePredicate);
0678:
0679: }
0680:
0681: public void execute() {
0682: super .execute();
0683:
0684: sendCommunityResponses();
0685:
0686: // Resend queued FindManagerRequests
0687: if (findMgrTimer == null || findMgrTimer.hasExpired()) {
0688: performFindManagerRetries();
0689: if (!findManagerRequests.isEmpty()) {
0690: findMgrTimer = new WakeAlarm(now() + TIMER_INTERVAL);
0691: alarmService.addRealTimeAlarm(findMgrTimer);
0692: } else {
0693: findMgrTimer = null;
0694: }
0695: }
0696:
0697: // Verify agent memberships
0698: if (verifyMembershipsTimer != null
0699: && verifyMembershipsTimer.hasExpired()) {
0700: if (myCommunitiesChanged) {
0701: blackboard.publishChange(myCommunities);
0702: myCommunitiesChanged = false;
0703: }
0704: membershipWatcher.validate();
0705: if (!myCommunities.listCommunities().isEmpty()) {
0706: verifyMembershipsTimer = new WakeAlarm(now()
0707: + verifyMembershipsInterval);
0708: alarmService
0709: .addRealTimeAlarm(verifyMembershipsTimer);
0710: } else {
0711: verifyMembershipsTimer = null;
0712: }
0713: }
0714:
0715: // Process request response
0716: Collection communityRequests = communityRequestSub
0717: .getChangedCollection();
0718: for (Iterator it = communityRequests.iterator(); it
0719: .hasNext();) {
0720: Request req = (Request) it.next();
0721: if (agentId.equals(req.getSource())) {
0722: if (logger.isDetailEnabled()) {
0723: logger.detail(agentName
0724: + ": Request subscription: " + req);
0725: }
0726: blackboard.publishRemove(req); // Remove completed request from BB
0727: handleResponse(req);
0728: }
0729: }
0730:
0731: // Receives CommunityDescriptors from community managers. A CommunityDescriptor
0732: // is basically a wrapper around a Community instance that defines the
0733: // entities and attributes of a community.
0734: for (Iterator it = communityDescriptorSub
0735: .getAddedCollection().iterator(); it.hasNext();) {
0736: CommunityDescriptor cd = (CommunityDescriptor) it
0737: .next();
0738: if (logger.isDebugEnabled()) {
0739: logger.debug(agentName
0740: + ": received added CommunityDescriptor: "
0741: + cd + " size="
0742: + cd.getCommunity().getEntities().size());
0743: }
0744: communityUpdateListener.updateCommunity(cd
0745: .getCommunity());
0746: }
0747: for (Iterator it = communityDescriptorSub
0748: .getChangedCollection().iterator(); it.hasNext();) {
0749: CommunityDescriptor cd = (CommunityDescriptor) it
0750: .next();
0751: if (logger.isDebugEnabled()) {
0752: logger
0753: .debug(agentName
0754: + ": received changed CommunityDescriptor: "
0755: + cd
0756: + " size="
0757: + cd.getCommunity().getEntities()
0758: .size());
0759: }
0760: communityUpdateListener.updateCommunity(cd
0761: .getCommunity());
0762: }
0763: for (Iterator it = communityDescriptorSub
0764: .getRemovedCollection().iterator(); it.hasNext();) {
0765: CommunityDescriptor cd = (CommunityDescriptor) it
0766: .next();
0767: if (logger.isDebugEnabled()) {
0768: logger
0769: .debug(agentName
0770: + ": received removed CommunityDescriptor: "
0771: + cd
0772: + " size="
0773: + cd.getCommunity().getEntities()
0774: .size());
0775: }
0776: communityUpdateListener.removeCommunity(cd
0777: .getCommunity());
0778: }
0779:
0780: // ListParentCommunities requests
0781: for (Iterator it = listParentCommunitiesSub
0782: .getAddedCollection().iterator(); it.hasNext();) {
0783: ListAgentParentCommunities tr = (ListAgentParentCommunities) it
0784: .next();
0785: String member = tr.getMember();
0786: Collection parents = listParentCommunities(member);
0787: if (logger.isDebugEnabled()) {
0788: logger
0789: .debug("Received ListAgentParentCommunities request:"
0790: + " source="
0791: + tr.getSource()
0792: + " member="
0793: + tr.getMember()
0794: + " parents="
0795: + parents
0796: + " uid="
0797: + tr.getUID());
0798: }
0799: tr.setResponse(new CommunityResponseImpl(
0800: CommunityResponse.SUCCESS, parents));
0801: blackboard.publishChange(tr);
0802: }
0803:
0804: // ListParentCommunities responses
0805: for (Iterator it = listParentCommunitiesResponseSub
0806: .getChangedCollection().iterator(); it.hasNext();) {
0807: RelayAdapter ra = (RelayAdapter) it.next();
0808: ListAgentParentCommunities tr = (ListAgentParentCommunities) ra
0809: .getContent();
0810: String member = tr.getMember();
0811: Collection parents = (Collection) ((CommunityResponse) ra
0812: .getResponse()).getContent();
0813: parentsForRemoteAgent.put(member, parents);
0814: if (logger.isDebugEnabled()) {
0815: logger
0816: .debug("Received ListAgentParentCommunities response:"
0817: + " member="
0818: + member
0819: + " parents="
0820: + parents
0821: + " hasCallback="
0822: + (ra
0823: .getCommunityResponseListener() != null)
0824: + " uid=" + tr.getUID());
0825: }
0826: CommunityResponseListener crl = ra
0827: .getCommunityResponseListener();
0828: if (crl != null) {
0829: crl.getResponse((CommunityResponse) ra
0830: .getResponse());
0831: }
0832: blackboard.publishRemove(ra);
0833: }
0834: }
0835:
0836: private void sendCommunityResponses() {
0837: int n;
0838: List l;
0839: synchronized (responses) {
0840: n = responses.size();
0841: if (n <= 0 || blackboard == null) {
0842: return;
0843: }
0844: l = new ArrayList(responses);
0845: responses.clear();
0846: }
0847: for (int i = 0; i < n; i++) {
0848: ResponseHolder resp = (ResponseHolder) l.get(i);
0849: if (resp != null) {
0850: Set listeners = resp.getListeners();
0851: for (Iterator it = listeners.iterator(); it
0852: .hasNext();) {
0853: CommunityResponseListener crl = (CommunityResponseListener) it
0854: .next();
0855: if (crl != null) {
0856: crl.getResponse(resp.getResponse());
0857: }
0858: }
0859: }
0860: }
0861: }
0862:
0863: private void performFindManagerRetries() {
0864: int n;
0865: List l;
0866: long now = now();
0867: if (log.isDetailEnabled()) {
0868: log.detail("performFindManagerRetries: entries="
0869: + findManagerRequests.size());
0870: }
0871: synchronized (findManagerRequests) {
0872: n = findManagerRequests.size();
0873: if (n <= 0 || blackboard == null) {
0874: return;
0875: }
0876: l = new ArrayList(findManagerRequests);
0877: findManagerRequests.clear();
0878: }
0879: for (int i = 0; i < n; i++) {
0880: FindManagerRequest req = (FindManagerRequest) l.get(i);
0881: if (now >= req.getTime()) {
0882: findManager(req.getCommunityName(), req
0883: .getCallback(), req.tryUntil);
0884: } else { // requeue
0885: findManagerRequests.add(req);
0886: }
0887: }
0888: }
0889:
0890: private final UnaryPredicate communityMembershipsPredicate = new CommunityMembershipsPredicate();
0891:
0892: private class CommunityMembershipsPredicate implements
0893: UnaryPredicate {
0894: public boolean execute(Object o) {
0895: return (o instanceof CommunityMemberships);
0896: }
0897: }
0898:
0899: /**
0900: * Predicate used to list parent communities.
0901: */
0902: private IncrementalSubscription listParentCommunitiesSub;
0903: private final UnaryPredicate listParentCommunitiesPredicate = new ListParentCommunitiesPredicate();
0904:
0905: private class ListParentCommunitiesPredicate implements
0906: UnaryPredicate {
0907: public boolean execute(Object o) {
0908: return (o instanceof ListAgentParentCommunities);
0909: }
0910: }
0911:
0912: /**
0913: * Predicate used to select CommunityRequests.
0914: */
0915: private IncrementalSubscription communityRequestSub;
0916: private final UnaryPredicate communityRequestPredicate = new CommunityRequestPredicate();
0917:
0918: private class CommunityRequestPredicate implements
0919: UnaryPredicate {
0920: public boolean execute(Object o) {
0921: return (o instanceof Request);
0922: }
0923: }
0924:
0925: /**
0926: * Selects CommunityDescriptors that are sent by remote community manager
0927: * agent.
0928: */
0929: private IncrementalSubscription communityDescriptorSub;
0930: private final UnaryPredicate communityDescriptorPredicate = new CommunityDescriptorPredicate();
0931:
0932: private class CommunityDescriptorPredicate implements
0933: UnaryPredicate {
0934: public boolean execute(Object o) {
0935: return (o instanceof CommunityDescriptor);
0936: }
0937: }
0938:
0939: /**
0940: * Selects RelayAdapters containing ListParentCommunities request
0941: */
0942: private IncrementalSubscription listParentCommunitiesResponseSub;
0943: private final UnaryPredicate listParentCommunitiesResponsePredicate = new ListParentCommunitiesResponsePredicate();
0944:
0945: private class ListParentCommunitiesResponsePredicate implements
0946: UnaryPredicate {
0947: public boolean execute(Object o) {
0948: if (o instanceof RelayAdapter) {
0949: RelayAdapter ra = (RelayAdapter) o;
0950: return (ra.getContent() instanceof ListAgentParentCommunities);
0951: }
0952: return false;
0953: }
0954: }
0955:
0956: // Timer for periodically checking blackboard availability.
0957: // Blackboard activity is signaled once the blackboard service is available
0958: // to check for queued requests
0959: protected class WakeAlarm implements Alarm {
0960: private long expiresAt;
0961: private boolean expired = false;
0962:
0963: public WakeAlarm(long expirationTime) {
0964: expiresAt = expirationTime;
0965: }
0966:
0967: public long getExpirationTime() {
0968: return expiresAt;
0969: }
0970:
0971: public synchronized void expire() {
0972: if (!expired) {
0973: expired = true;
0974: blackboard.signalClientActivity();
0975: }
0976: }
0977:
0978: public boolean hasExpired() {
0979: return expired;
0980: }
0981:
0982: public synchronized boolean cancel() {
0983: boolean was = expired;
0984: expired = true;
0985: return was;
0986: }
0987: }
0988: }
0989:
0990: class FindManagerRequest {
0991: private long nextRetryTime;
0992: private String communityName;
0993: private FindCommunityCallback fmcb;
0994: private long tryUntil;
0995:
0996: FindManagerRequest(long time, String cname,
0997: FindCommunityCallback cb, long tu) {
0998: nextRetryTime = time;
0999: communityName = cname;
1000: fmcb = cb;
1001: tryUntil = tu;
1002: }
1003:
1004: protected long getTime() {
1005: return nextRetryTime;
1006: }
1007:
1008: protected String getCommunityName() {
1009: return communityName;
1010: }
1011:
1012: protected FindCommunityCallback getCallback() {
1013: return fmcb;
1014: }
1015: }
1016:
1017: class ResponseHolder {
1018: private CommunityResponse resp;
1019: private Set listeners;
1020:
1021: ResponseHolder(CommunityResponse r, Set l) {
1022: resp = r;
1023: listeners = l;
1024: }
1025:
1026: protected CommunityResponse getResponse() {
1027: return resp;
1028: }
1029:
1030: protected Set getListeners() {
1031: return listeners;
1032: }
1033: }
1034:
1035: }
|