001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.qos.ca;
028:
029: import java.util.Enumeration;
030: import java.util.Iterator;
031: import java.util.Properties;
032:
033: import javax.naming.directory.Attribute;
034: import javax.naming.directory.Attributes;
035:
036: import org.cougaar.core.blackboard.IncrementalSubscription;
037: import org.cougaar.core.component.ServiceBroker;
038: import org.cougaar.core.relay.Relay;
039: import org.cougaar.core.service.BlackboardService;
040: import org.cougaar.core.service.community.Community;
041: import org.cougaar.core.service.community.Entity;
042: import org.cougaar.core.util.UID;
043: import org.cougaar.multicast.AttributeBasedAddress;
044: import org.cougaar.util.UnaryPredicate;
045:
046: /**
047: * An abstraction of the query role in
048: * QueryResponseCoordinationArtifacts, as provided by {@link
049: * QueryResponseCoordinationArtifactProvider}. The {@link Facet}
050: * methods are implemented here, leaving subclasses only to implement
051: * the abstract methods of this class.
052: */
053: abstract public class QueryFacet extends CommunityFacetImpl implements
054: QueryCoordArtConstants {
055: private String managerAttr;
056: private String communityRole;
057: private Relay lastQuery; // for cleaning up
058: private IncrementalSubscription responseSub;
059:
060: protected QueryFacet(CoordinationArtifact owner, ServiceBroker sb,
061: ConnectionSpec spec, RolePlayer player) {
062: super (owner, sb, spec, player);
063:
064: String communityType = spec.ca_parameters
065: .getProperty(COMMUNITY_TYPE_ATTRIBUTE);
066:
067: Properties role_parameters = spec.role_parameters;
068: managerAttr = role_parameters.getProperty(MANAGER_ATTRIBUTE);
069: communityRole = role_parameters
070: .getProperty(RESPONDERS_COMMUNITY_ROLE_ATTRIBUTE);
071: if (log.isDebugEnabled()) {
072: log.debug("Value of " + MANAGER_ATTRIBUTE + " is "
073: + managerAttr);
074: log.debug("Value of " + RESPONDERS_COMMUNITY_ROLE_ATTRIBUTE
075: + " is " + communityRole);
076: }
077:
078: String filter = CommunityFinder.makeFilter(
079: COMMUNITY_TYPE_ATTRIBUTE, communityType, managerAttr,
080: getAgentID().getAddress());
081: findCommunityForAny(filter, null);
082: }
083:
084: abstract protected boolean acceptFact(Object fact);
085:
086: public AttributeBasedAddress makeABA(String communityName) {
087: return AttributeBasedAddress.getAttributeBasedAddress(
088: communityName, "Role", communityRole);
089: }
090:
091: public void setupSubscriptions(BlackboardService blackboard) {
092: responseSub = (IncrementalSubscription) blackboard
093: .subscribe(ResponsePred);
094: }
095:
096: public void execute(BlackboardService blackboard) {
097: if (responseSub == null /* || !responseSub.hasChanged() */)
098: return;
099:
100: Enumeration en;
101:
102: // observe added relays
103: en = responseSub.getAddedList();
104: while (en.hasMoreElements()) {
105: Relay.Source response = (Relay.Source) en.nextElement();
106: if (log.isDebugEnabled()) {
107: log.debug("Observed added ResponseSub" + response);
108: }
109:
110: // Assert the data to the RolePlayer
111: processResponse(response);
112: }
113:
114: // observe changed relays
115: en = responseSub.getChangedList();
116: while (en.hasMoreElements()) {
117: Relay tr = (Relay) en.nextElement();
118: // Should not happen
119: if (log.isDebugEnabled()) {
120: log.debug("Observed changed ResponseSub " + tr);
121: }
122: }
123:
124: // removed relays
125: en = responseSub.getRemovedList();
126: while (en.hasMoreElements()) {
127: Relay tr = (Relay) en.nextElement();
128: if (log.isDebugEnabled()) {
129: log.debug("Observed removed ResponseSub" + tr);
130: }
131: }
132: }
133:
134: // Fact processing
135: public void processFactBase(BlackboardService blackboard) {
136: if (!factsHaveChanged())
137: return;
138: for (FactRevision frev = nextFact(); frev != null; frev = nextFact()) {
139: if (log.isDebugEnabled())
140: log.debug("Processing fact " + frev.getFact());
141: if (frev instanceof FactAssertion) {
142: Object fact = frev.getFact();
143: // Should only be one and should be a RequestFact
144: sendQuery(fact, blackboard);
145: } else {
146: // no retractions yet
147: }
148: }
149: }
150:
151: protected void sendQuery(Object fact, BlackboardService blackboard) {
152: if (log.isDebugEnabled()) {
153: log.debug("sendQueries()");
154: }
155:
156: AttributeBasedAddress aba = getABA();
157: if (aba == null)
158: return;
159: // too early, but this shouldn't happen
160:
161: UID uid = nextUID();
162: Relay qr = new QueryRelayImpl(uid, getAgentID(), aba, fact);
163: if (log.isInfoEnabled()) {
164: log.info("Sending QueryRelay from " + getAgentID()
165: + " to all nodes in community: " + getCommunity());
166: }
167: publishQuery(qr, blackboard);
168: }
169:
170: protected void publishQuery(Relay query,
171: BlackboardService blackboard) {
172: if (lastQuery != null) {
173: blackboard.publishRemove(lastQuery);
174: }
175: blackboard.publishAdd(query);
176: lastQuery = query;
177: }
178:
179: // Relay processing
180: /**
181: * Handle a single response. The default is to assert it
182: * immediately to the player. Subclasses can override.
183: */
184: protected void processResponse(Relay.Source response) {
185: Object responseFact = response.getContent();
186: if (log.isDebugEnabled())
187: log.debug("Tranformed " + response + " into "
188: + responseFact);
189: if (responseFact != null)
190: getPlayer().assertFact(responseFact);
191: }
192:
193: protected Receptacle makeReceptacle() {
194: return new QueryReceptacleImpl();
195: }
196:
197: private class QueryReceptacleImpl extends CommunityReceptacleImpl
198: implements QueryReceptacle {
199: public int getReceiverCount() {
200: Community community = getCommunity();
201: int count = 0;
202: Iterator itr = community.getEntities().iterator();
203: if (log.isDebugEnabled())
204: log.debug("Counting members of " + community.getName()
205: + " with Role " + communityRole);
206: while (itr.hasNext()) {
207: Entity entity = (Entity) itr.next();
208: Attributes attrs = entity.getAttributes();
209: Attribute attr = attrs.get("Role");
210: if (attr != null && attr.contains(communityRole))
211: ++count;
212: }
213: if (log.isDebugEnabled())
214: log.debug("Counted " + count + " members of "
215: + community.getName() + " with Role "
216: + communityRole);
217: return count;
218: }
219: }
220:
221: private UnaryPredicate ResponsePred = new UnaryPredicate() {
222: public boolean execute(Object o) {
223: if (o instanceof ResponseRelayImpl) {
224: Object fact = ((Relay.Source) o).getContent();
225: return acceptFact(fact);
226: } else {
227: return false;
228: }
229: }
230: };
231:
232: }
|