001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 Mobile Intelligence Corp
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.community;
028:
029: import java.util.Collection;
030: import java.util.Collections;
031: import java.util.ArrayList;
032: import java.util.Iterator;
033: import java.util.List;
034:
035: import org.cougaar.core.service.community.CommunityService;
036: import org.cougaar.core.service.community.Agent;
037: import org.cougaar.core.service.community.Community;
038: import org.cougaar.core.service.community.Entity;
039: import org.cougaar.core.service.community.CommunityResponse;
040: import org.cougaar.core.service.community.CommunityResponseListener;
041: import org.cougaar.core.service.community.FindCommunityCallback;
042:
043: import javax.naming.directory.Attribute;
044: import javax.naming.directory.Attributes;
045: import javax.naming.directory.DirContext;
046: import javax.naming.directory.ModificationItem;
047: import javax.naming.NamingEnumeration;
048: import javax.naming.NamingException;
049:
050: import org.cougaar.util.log.LoggerFactory;
051: import org.cougaar.util.log.Logger;
052:
053: /**
054: * This class listens for community change events and updates local
055: * CommunityMembership object to reflect current state. The
056: * CommunityMembership is used to rejoin communities on a restart and to
057: * periodically verify that this agents view of the world is in sync with
058: * that of applicable community managers.
059: */
060: public class MembershipWatcher {
061:
062: protected String this Agent;
063: protected CommunityMemberships myCommunities;
064: protected CommunityService communityService;
065: protected Logger logger;
066: protected List managedCommunities = Collections
067: .synchronizedList(new ArrayList());;
068: protected List pendingOperations = Collections
069: .synchronizedList(new ArrayList());
070:
071: public MembershipWatcher(String agentName,
072: CommunityService commSvc, CommunityMemberships memberships) {
073: this .this Agent = agentName;
074: this .myCommunities = memberships;
075: this .communityService = commSvc;
076: this .logger = LoggerFactory.getInstance().createLogger(
077: MembershipWatcher.class);
078: }
079:
080: public MembershipWatcher(String agentName, CommunityService commSvc) {
081: this .this Agent = agentName;
082: this .myCommunities = new CommunityMemberships();
083: this .communityService = commSvc;
084: this .logger = LoggerFactory.getInstance().createLogger(
085: MembershipWatcher.class);
086: }
087:
088: public void setMemberships(CommunityMemberships memberships) {
089: this .myCommunities = memberships;
090: }
091:
092: public synchronized void validate() {
093: if (logger.isDebugEnabled()) {
094: logger.debug(this Agent
095: + ": validate community memberships: " + this Agent
096: + " myCommunities=" + myCommunities);
097: }
098: for (Iterator it = myCommunities.listCommunities().iterator(); it
099: .hasNext();) {
100: final String communityName = (String) it.next();
101: Collection entities = myCommunities
102: .getEntities(communityName);
103: for (Iterator it1 = entities.iterator(); it1.hasNext();) {
104: Entity entity = (Entity) it1.next();
105: if ((entity.getName().equals(this Agent))
106: && !pendingOperations.contains(communityName)) {
107: checkCommunity(communityName, entity, true);
108: }
109: }
110: }
111: Collection parents = communityService.listParentCommunities(
112: null, (CommunityResponseListener) null);
113: parents.removeAll(myCommunities.listCommunities());
114: for (Iterator it1 = parents.iterator(); it1.hasNext();) {
115: String parentName = (String) it1.next();
116: Community parentCommunity = communityService.getCommunity(
117: parentName, null);
118: if (parentCommunity != null
119: && parentCommunity.hasEntity(this Agent)
120: && !pendingOperations.contains(parentName)) {
121: // a problem occurs when a remote agent (AgentA) joins a community on behalf
122: // of another agent (AgentB). In this case AgentB's myCommunities
123: // is not aware that it has joined the community and attempts to
124: // leave the community at this point. At some point in the future
125: // AgentA will see that AgentB has left the community and because
126: // AgentA's myCommunity list says that AgentB should be in the community
127: // it will try to re-join the community for AgentB. Updating the
128: // myCommunities list at this point resolves the problem but I am
129: // not sure if it is the correct solution
130: if (!myCommunities.contains(parentName, this Agent)) {
131: myCommunities.add(parentName, new AgentImpl(
132: this Agent));
133: }
134: checkCommunity(parentName, new AgentImpl(this Agent),
135: true);
136: }
137: }
138:
139: }
140:
141: public void addPendingOperation(String communityName) {
142: if (!pendingOperations.contains(communityName)) {
143: pendingOperations.add(communityName);
144: }
145: }
146:
147: public void removePendingOperation(String communityName) {
148: if (pendingOperations.contains(communityName)) {
149: pendingOperations.remove(communityName);
150: }
151: }
152:
153: protected void checkCommunity(final String communityName,
154: final Entity entity, final boolean isMember) {
155: if (logger.isDetailEnabled()) {
156: logger.detail(this Agent + ": checkCommunityMembership:"
157: + " community=" + communityName + " entity="
158: + entity + " isMember=" + isMember);
159: }
160: FindCommunityCallback fccb = new FindCommunityCallback() {
161: public void execute(String managerName) {
162: if (managerName != null) { // Community exists
163: Community community = communityService
164: .getCommunity(communityName,
165: new CommunityResponseListener() {
166: public void getResponse(
167: CommunityResponse resp) {
168: Object obj = resp
169: .getContent();
170: if (obj != null
171: && !(obj instanceof Community)) {
172: logger
173: .warn(this Agent
174: + ": Invalid response object, type="
175: + obj
176: .getClass()
177: .getName());
178: } else {
179: Community community = (Community) obj;
180: if (isMember
181: && (community == null || !community
182: .hasEntity(entity
183: .getName()))) {
184: rejoin(
185: communityName,
186: entity);
187: } else if (!isMember
188: && community != null
189: && community
190: .hasEntity(entity
191: .getName())) {
192: leave(
193: communityName,
194: entity
195: .getName());
196: } else if (isMember
197: && community
198: .hasEntity(entity
199: .getName())) {
200: verifyAttributes(
201: communityName,
202: entity
203: .getName(),
204: community
205: .getEntity(
206: entity
207: .getName())
208: .getAttributes(),
209: entity
210: .getAttributes());
211: }
212: }
213: }
214: });
215: if (community != null) {
216: if (isMember
217: && !community.hasEntity(entity
218: .getName())) {
219: rejoin(communityName, entity);
220: } else if (!isMember
221: && community
222: .hasEntity(entity.getName())) {
223: leave(communityName, entity.getName());
224: } else if (isMember
225: && community
226: .hasEntity(entity.getName())) {
227: verifyAttributes(communityName, entity
228: .getName(), community.getEntity(
229: entity.getName()).getAttributes(),
230: entity.getAttributes());
231: }
232: }
233: } else { // Community doesn't exist
234: rejoin(communityName, entity);
235: }
236: }
237: };
238: communityService.findCommunity(communityName, fccb, 0);
239: }
240:
241: protected void verifyAttributes(String communityName,
242: String entityName, Attributes attrsFromComm,
243: Attributes attrsFromLocalCopy) {
244: ModificationItem attrDelta[] = getAttrDelta(attrsFromComm,
245: attrsFromLocalCopy);
246: if (attrDelta.length > 0) {
247: if (logger.isInfoEnabled()) {
248: logger.info(this Agent
249: + ": Correcting attributes:"
250: + " community="
251: + communityName
252: + " entity="
253: + entityName
254: + " numAttrsCorrected="
255: + attrDelta.length
256: + " commAttrs="
257: + CommunityUtils.attrsToString(attrsFromComm)
258: + " localAttrs="
259: + CommunityUtils
260: .attrsToString(attrsFromLocalCopy));
261: }
262: communityService.modifyAttributes(communityName,
263: entityName, attrDelta, null);
264: }
265: }
266:
267: protected void rejoin(final String communityName, Entity entity) {
268: if (logger.isDebugEnabled()) {
269: logger.debug(this Agent + ": Re-joining community:"
270: + " community=" + communityName + " entity="
271: + entity.getName());
272: }
273: addPendingOperation(communityName);
274: int type = entity instanceof Agent ? CommunityService.AGENT
275: : CommunityService.COMMUNITY;
276: communityService.joinCommunity(communityName, entity.getName(),
277: type, entity.getAttributes(), false, null,
278: new CommunityResponseListener() {
279: public void getResponse(CommunityResponse resp) {
280: removePendingOperation(communityName);
281: if (logger.isDetailEnabled()) {
282: logger.detail(this Agent + ": Join status="
283: + resp);
284: }
285: }
286: });
287: }
288:
289: protected void leave(final String communityName, String entityName) {
290: if (logger.isDebugEnabled()) {
291: logger.debug(this Agent + ": Leaving community:"
292: + " community=" + communityName + " entity="
293: + entityName);
294: }
295: addPendingOperation(communityName);
296: communityService.leaveCommunity(communityName, entityName,
297: new CommunityResponseListener() {
298: public void getResponse(CommunityResponse resp) {
299: if (logger.isDetailEnabled()) {
300: logger.detail("Leave status=" + resp);
301: }
302: removePendingOperation(communityName);
303: }
304: });
305: }
306:
307: protected ModificationItem[] getAttrDelta(Attributes attrsFromComm,
308: Attributes attrsFromLocalCopy) {
309: List mods = new ArrayList();
310: if (attrsFromLocalCopy != null && attrsFromLocalCopy.size() > 0) {
311: NamingEnumeration ne = attrsFromLocalCopy.getAll();
312: try {
313: while (ne.hasMore()) {
314: Attribute attr2 = (Attribute) ne.next();
315: Attribute attr1 = attrsFromComm.get(attr2.getID());
316: if (attr1 == null) {
317: mods.add(new ModificationItem(
318: DirContext.ADD_ATTRIBUTE, attr2));
319: } else {
320: if (!attr2.equals(attr1)) {
321: mods
322: .add(new ModificationItem(
323: DirContext.REPLACE_ATTRIBUTE,
324: attr2));
325: }
326: }
327: }
328: } catch (NamingException nex) {
329: if (logger.isWarnEnabled()) {
330: logger.warn(nex.getMessage());
331: }
332: }
333: }
334: return (ModificationItem[]) mods
335: .toArray(new ModificationItem[] {});
336: }
337:
338: }
|