001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.ldm.lps;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031:
032: import org.cougaar.core.blackboard.BlackboardServesDomain;
033: import org.cougaar.core.blackboard.Directive;
034: import org.cougaar.core.domain.DelayedLPAction;
035: import org.cougaar.core.domain.LogicProvider;
036: import org.cougaar.core.domain.MessageLogicProvider;
037: import org.cougaar.core.domain.RootPlan;
038: import org.cougaar.core.util.UID;
039: import org.cougaar.planning.ldm.LogPlan;
040: import org.cougaar.planning.ldm.PlanningFactory;
041: import org.cougaar.planning.ldm.plan.Aggregation;
042: import org.cougaar.planning.ldm.plan.Allocation;
043: import org.cougaar.planning.ldm.plan.AllocationResult;
044: import org.cougaar.planning.ldm.plan.AllocationforCollections;
045: import org.cougaar.planning.ldm.plan.AssetTransfer;
046: import org.cougaar.planning.ldm.plan.Expansion;
047: import org.cougaar.planning.ldm.plan.Notification;
048: import org.cougaar.planning.ldm.plan.PEforCollections;
049: import org.cougaar.planning.ldm.plan.PlanElement;
050: import org.cougaar.planning.ldm.plan.TaskRescind;
051: import org.cougaar.planning.ldm.plan.WorkflowImpl;
052: import org.cougaar.util.log.Logger;
053: import org.cougaar.util.log.Logging;
054:
055: /**
056: * Take an incoming Notification Directive and
057: * perform Modification to the LOGPLAN
058: **/
059: public class ReceiveNotificationLP implements LogicProvider,
060: MessageLogicProvider {
061: private static final Logger logger = Logging
062: .getLogger(ReceiveNotificationLP.class);
063:
064: private final RootPlan rootplan;
065: private final LogPlan logplan;
066: private final PlanningFactory ldmf;
067:
068: public ReceiveNotificationLP(RootPlan rootplan, LogPlan logplan,
069: PlanningFactory ldmf) {
070: this .rootplan = rootplan;
071: this .logplan = logplan;
072: this .ldmf = ldmf;
073: }
074:
075: public void init() {
076: }
077:
078: /**
079: * perform updates -- per Notification ALGORITHM --
080: *
081: **/
082: public void execute(Directive dir, Collection changes) {
083: if (dir instanceof Notification) {
084: processNotification((Notification) dir, changes);
085: }
086: }
087:
088: private void processNotification(Notification not,
089: Collection changes) {
090: UID tuid = not.getTaskUID();
091: UID childuid = not.getChildTaskUID();
092: PlanElement pe = logplan.findPlanElement(tuid);
093: boolean needToRescind = (pe == null);
094:
095: if (logger.isDebugEnabled() && needToRescind)
096: logger
097: .debug("Got notification for task with no published PlanElement - will rescind the task: "
098: + tuid);
099:
100: // verify that the pe matches the task
101: if (!needToRescind && (pe instanceof AllocationforCollections)) {
102: UID remoteTUID = ((AllocationforCollections) pe)
103: .getAllocationTaskUID();
104: if (remoteTUID == null) {
105: needToRescind = true;
106: } else {
107: if (!(remoteTUID.equals(childuid))) {
108: // this was likely due to replacing the Allocation
109: if (logger.isInfoEnabled()) {
110: logger
111: .info("Got a Notification for the wrong allocation:"
112: + "\n\tTask="
113: + tuid
114: + " ("
115: + pe.getTask().getUID()
116: + ")"
117: + "\n\tFrom="
118: + childuid
119: + " ("
120: + remoteTUID
121: + ")"
122: + "\n\tResult="
123: + not.getAllocationResult()
124: + "\n" + "\n\tPE=" + pe);
125: }
126: needToRescind = true; // Insure that the old child task is gone.
127: return;
128: }
129: }
130: }
131:
132: if (needToRescind) {
133: TaskRescind trm = ldmf.newTaskRescind(childuid, not
134: .getSource());
135: if (logger.isDebugEnabled())
136: logger.debug("Sending new TaskRescind for " + childuid);
137: rootplan.sendDirective(trm, changes);
138: } else {
139: AllocationResult ar = not.getAllocationResult();
140: propagateNotification(rootplan, logplan, pe, tuid, ar,
141: childuid, changes);
142: }
143: }
144:
145: // default protection so that NotificationLP can call this method
146: static final void propagateNotification(RootPlan rootplan,
147: LogPlan logplan, UID tuid, AllocationResult result,
148: UID childuid, Collection changes) {
149: PlanElement pe = logplan.findPlanElement(tuid);
150: if (pe != null) {
151: propagateNotification(rootplan, logplan, pe, tuid, result,
152: childuid, changes);
153: } else {
154: if (logger.isDebugEnabled()) {
155: logger
156: .debug("Received notification about unknown task: "
157: + tuid);
158: }
159: // FIXME: Doesn't this mean that the downstream Task's parent is missing or screwy,
160: // and the downstream task should be rescinded?
161: }
162: }
163:
164: // default protection so that NotificationLP can call this method
165: static final void propagateNotification(RootPlan rootplan,
166: LogPlan logplan, PlanElement pe, UID tuid,
167: AllocationResult result, UID childuid, Collection changes) {
168:
169: // In general, do not pubChange PE if nothing changed. Primary job here is to propagate Received
170: // result, so if that hasn't changed, don't publishChange. However, Expansions are
171: // different since tuid task is not same as childuid (see bug 3462).
172: // (the big win here is less work during reconciliation when all notifications are resent)
173:
174: if ((pe instanceof Allocation) || (pe instanceof AssetTransfer)
175: || (pe instanceof Aggregation)) {
176:
177: // compare getReceivedResult .isEqual with this new one -- reconciliation after restart
178: // is going to resend all the ARs, and we should avoid propagating the changes
179: AllocationResult currAR = pe.getReceivedResult();
180: if ((result == null && currAR == null)
181: || (result != null && result.isEqual(currAR))) {
182: if (logger.isInfoEnabled()) {
183: logger
184: .info("Not propagating unchanged ReceivedResult for PE "
185: + pe + ", new result: " + result);
186: }
187: return;
188: }
189:
190: ((PEforCollections) pe).setReceivedResult(result);
191: if (logger.isDebugEnabled())
192: logger
193: .debug("pubChanging local PE with new ReceivedResult: "
194: + pe);
195: rootplan.change(pe, changes);
196: } else if (pe instanceof Expansion) {
197: // Note that below we avoid pubChanging the expansion if the newly calculated AR is same as
198: // the old
199: rootplan.delayLPAction(new DelayedAggregateResults(
200: (Expansion) pe, childuid));
201:
202: /*
203: Workflow wf = ((Expansion)pe).getWorkflow();
204: AllocationResult ar = wf.aggregateAllocationResults();
205: if (ar != null) {
206: // get the TaskScoreTable used in the aggregation
207: TaskScoreTable aggTST = ((WorkflowImpl)wf).getCurrentTST();
208: // get the UID of the child task that caused this aggregation
209: ((ExpansionImpl)pe).setSubTaskResults(aggTST,childuid);
210: ((PEforCollections) pe).setReceivedResult(ar);
211: rootplan.change(pe, changes);
212: } // if we can't successfully aggregate the results - don't send a notification
213: */
214: /*
215: } else if (pe instanceof Disposition) {
216: // drop the notification on the floor - cannot possibly be valid
217: }
218: */
219: } else {
220: if (logger.isInfoEnabled()) {
221: logger
222: .info("Got a Notification for an inappropriate PE:\n"
223: + "\tTask="
224: + tuid
225: + "\n"
226: + "\tFrom="
227: + childuid
228: + "\n"
229: + "\tResult="
230: + result + "\n" + "\tPE=" + pe);
231: }
232: }
233: }
234:
235: /** delay the results aggregation of an expansion until the end in case
236: * we have lots of them to do.
237: **/
238: private final static class DelayedAggregateResults implements
239: DelayedLPAction {
240: private final Expansion pe;
241: private final ArrayList ids = new ArrayList(1);
242:
243: DelayedAggregateResults(Expansion pe, UID id) {
244: this .pe = pe;
245: ids.add(id);
246: }
247:
248: public void execute(BlackboardServesDomain bb) {
249: WorkflowImpl wf = (WorkflowImpl) pe.getWorkflow();
250:
251: // compute the new result from the subtask results.
252: try {
253: AllocationResult ar = wf
254: .aggregateAllocationResults(ids);
255: AllocationResult currAR = pe.getReceivedResult();
256: // If the newly calculated result is at all different from the previously
257: // calculated result, then make the change and publishChange the Expansion
258: if ((ar == null && currAR != null)
259: || (ar != null && !ar.isEqual(currAR))) {
260: // set the result on the Expansion
261: ((PEforCollections) pe).setReceivedResult(ar);
262:
263: // publish the change to the blackboard.
264:
265: // Note that the above setReceivedResult puts a PlanElement.ReportedResultChangeReport
266: // on this transaction.
267: bb.change(pe, null); // drop the change details.
268: //bb.change(pe, changes);
269: //Logging.printDot("=");
270: } else {
271: if (logger.isInfoEnabled())
272: logger.info("NOT publishChanging Expansion "
273: + pe
274: + " - new ReceivedResult same as old: "
275: + ar);
276: }
277: } catch (RuntimeException re) {
278: logger.error(
279: "Caught exception while processing DelayedAggregateResults for "
280: + pe, re);
281: }
282: }
283:
284: /** hashcode is the hashcode of the expansion **/
285: public int hashCode() {
286: return pe.hashCode();
287: }
288:
289: /** these guys are equal iff the they have the same PE **/
290: public boolean equals(Object e) {
291: return (e instanceof DelayedAggregateResults && ((DelayedAggregateResults) e).pe == pe);
292: }
293:
294: /** merge another one into this one **/
295: public void merge(DelayedLPAction e) {
296: // don't bother to check, since we will only be here if this.equals(e).
297: DelayedAggregateResults other = (DelayedAggregateResults) e;
298: ids.addAll(other.ids);
299: }
300: }
301: }
|