001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.qos.ca;
028:
029: import java.util.Enumeration;
030: import java.util.Properties;
031:
032: import org.cougaar.core.blackboard.IncrementalSubscription;
033: import org.cougaar.core.component.ServiceBroker;
034: import org.cougaar.core.relay.Relay;
035: import org.cougaar.core.service.BlackboardService;
036: import org.cougaar.core.util.UID;
037: import org.cougaar.multicast.AttributeBasedAddress;
038: import org.cougaar.util.UnaryPredicate;
039:
040: /**
041: * An abstraction of the response role in the
042: * QueryResponseCoordinationArtifact, as provided by {@link
043: * QueryResponseCoordinationArtifactProvider}. The {@link Facet}
044: * methods are implemented here, leaving subclasses only to implement
045: * the abstract methods of this class.
046: */
047: abstract public class ResponseFacet extends CommunityFacetImpl
048: implements QueryCoordArtConstants {
049: private String managerRole;
050: // private HashSet completedUIDs;
051: private Relay lastResponse; // for cleaning up
052: private IncrementalSubscription querySub;
053:
054: protected ResponseFacet(CoordinationArtifact owner,
055: ServiceBroker sb, ConnectionSpec spec, RolePlayer player) {
056: super (owner, sb, spec, player);
057: Properties role_parameters = spec.role_parameters;
058: // completedUIDs = new HashSet();
059:
060: String communityType = spec.ca_parameters
061: .getProperty(COMMUNITY_TYPE_ATTRIBUTE);
062: managerRole = role_parameters.getProperty(MANAGER_ATTRIBUTE);
063: String communityRole = role_parameters
064: .getProperty(RESPONDERS_COMMUNITY_ROLE_ATTRIBUTE);
065: String agentId = getAgentID().getAddress();
066:
067: String filter = CommunityFinder.makeFilter(
068: COMMUNITY_TYPE_ATTRIBUTE, communityType);
069: // String filter =
070: // "(& (" +COMMUNITY_TYPE_ATTRIBUTE+ "=" +communityType+ ")" +
071: // "(!(" +managerRole+ "=" +agentId+ ")))";
072: if (log.isDebugEnabled())
073: log.debug("Response filter is " + filter);
074: UnaryPredicate predicate = CommunityFinder.memberHasRole(
075: agentId, communityRole);
076: findCommunityForAgent(filter, predicate);
077: }
078:
079: abstract protected boolean acceptFact(Object fact);
080:
081: public AttributeBasedAddress makeABA(String communityName) {
082: return AttributeBasedAddress.getAttributeBasedAddress(
083: communityName, "Role", managerRole);
084: }
085:
086: public void setupSubscriptions(BlackboardService blackboard) {
087: querySub = (IncrementalSubscription) blackboard
088: .subscribe(QueryPred);
089: }
090:
091: public void execute(BlackboardService blackboard) {
092: if (querySub == null /* || !querySub.hasChanged() */)
093: return;
094:
095: Enumeration en;
096: // observe added relays
097: en = querySub.getAddedList();
098: while (en.hasMoreElements()) {
099: Relay.Source relay = (Relay.Source) en.nextElement();
100: processQuery(relay);
101: }
102:
103: // observe changed relays shouldn't happen, because the
104: // manager should remove them when received log seen
105: en = querySub.getChangedList();
106: while (en.hasMoreElements()) {
107: Relay tr = (Relay) en.nextElement();
108: if (log.isDebugEnabled()) {
109: log.debug("Observed changed " + tr);
110: }
111:
112: // don't pay attention
113: }
114:
115: // removed relays
116: en = querySub.getRemovedList();
117: while (en.hasMoreElements()) {
118: Relay tr = (Relay) en.nextElement();
119: if (log.isDebugEnabled()) {
120: log.debug("Observed removed relay: " + tr);
121: }
122: }
123:
124: }
125:
126: // Process facts
127: public void processFactBase(BlackboardService blackboard) {
128: if (!factsHaveChanged())
129: return;
130:
131: for (FactRevision frev = nextFact(); frev != null; frev = nextFact()) {
132: if (log.isDebugEnabled())
133: log.debug("Processing fact " + frev.getFact());
134: if (frev instanceof FactAssertion) {
135: Object fact = frev.getFact();
136: sendReply(fact, blackboard);
137: } else {
138: // no retractions yet
139: }
140: }
141: }
142:
143: private void sendReply(Object reply, BlackboardService blackboard) {
144: // Pass back response relay
145: UID uid = nextUID();
146: //String s = "Response Matrix";
147:
148: // Remove old respsonse, reassign
149: if (lastResponse != null) {
150: blackboard.publishRemove(lastResponse);
151: }
152:
153: Relay response = new ResponseRelayImpl(uid, getAgentID(),
154: getABA(), reply);
155:
156: if (log.isDebugEnabled()) {
157: log.debug("Responding to query: " // + queryUID
158: + " with new reply: " + response);
159: }
160: //blackboard.publishChange(tr);
161: blackboard.publishAdd(response);
162:
163: lastResponse = response;
164:
165: }
166:
167: // process relays
168: void processQuery(Relay.Source query) {
169: // UID query_id = query.getUID();
170: // // check cache
171: // if(completedUIDs.contains(query_id)) {
172: // // ignore seen relay
173: // if(log.isDebugEnabled()) {
174: // log.debug("Observed already seen relay: " + query);
175: // }
176: // return;
177: // } else {
178: // // add relay to seen cache
179: // completedUIDs.add(query_id);
180: // if (log.isDebugEnabled()) {
181: // log.debug("Adding relay: "+query+ " to relays cache.");
182: // }
183: // }
184:
185: if (log.isDebugEnabled()) {
186: log.debug("Observed added relay" + query);
187: }
188:
189: Object fact = query.getContent();
190: if (log.isDebugEnabled())
191: log.debug("Updated Fact" + fact);
192: getPlayer().assertFact(fact);
193: }
194:
195: private UnaryPredicate QueryPred = new UnaryPredicate() {
196: public boolean execute(Object o) {
197: if (o instanceof QueryRelayImpl) {
198: Object fact = ((Relay.Source) o).getContent();
199: return acceptFact(fact);
200: } else {
201: return false;
202: }
203: }
204: };
205:
206: }
|