001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.ldm.lps;
028:
029: import java.util.Collection;
030: import java.util.Date;
031: import java.util.Enumeration;
032: import java.util.HashSet;
033: import java.util.Iterator;
034: import java.util.Set;
035:
036: import org.cougaar.core.blackboard.Directive;
037: import org.cougaar.core.blackboard.SubscriberException;
038: import org.cougaar.core.domain.LogicProvider;
039: import org.cougaar.core.domain.MessageLogicProvider;
040: import org.cougaar.core.domain.RootPlan;
041: import org.cougaar.core.logging.LoggingServiceWithPrefix;
042: import org.cougaar.core.mts.MessageAddress;
043: import org.cougaar.core.service.AlarmService;
044: import org.cougaar.planning.ldm.LogPlan;
045: import org.cougaar.planning.ldm.PlanningFactory;
046: import org.cougaar.planning.ldm.plan.Context;
047: import org.cougaar.planning.ldm.plan.NewTask;
048: import org.cougaar.planning.ldm.plan.PEforCollections;
049: import org.cougaar.planning.ldm.plan.PlanElement;
050: import org.cougaar.planning.ldm.plan.Preference;
051: import org.cougaar.planning.ldm.plan.Task;
052: import org.cougaar.planning.ldm.plan.TaskImpl;
053: import org.cougaar.util.log.Logger;
054: import org.cougaar.util.log.Logging;
055:
056: /**
057: * Take an incoming Task (excepting Rescind task) and
058: * add to the LogPlan w/side-effect of also disseminating to
059: * other subscribers.
060: * Only adds tasks that haven't been seen before, allowing stability
061: * in the face of wire retransmits.
062: **/
063: public class ReceiveTaskLP implements LogicProvider,
064: MessageLogicProvider {
065: private Logger logger;
066:
067: private final RootPlan rootplan;
068: private final LogPlan logplan;
069: private final PlanningFactory ldmf;
070: private final MessageAddress self;
071: private final AlarmService alarmService;
072: private Set existingPhrases = new HashSet();
073: private Set newPhrases = new HashSet();
074:
075: public ReceiveTaskLP(RootPlan rootplan, LogPlan logplan,
076: MessageAddress self, PlanningFactory ldmf,
077: AlarmService alarmService) {
078: this .rootplan = rootplan;
079: this .logplan = logplan;
080: this .self = self;
081: this .ldmf = ldmf;
082: this .alarmService = alarmService;
083: logger = new LoggingServiceWithPrefix(Logging
084: .getLogger(ReceiveTaskLP.class), self.toString() + ": ");
085: }
086:
087: public void init() {
088: }
089:
090: private Date currentDate() {
091: return new Date(alarmService.currentTimeMillis());
092: }
093:
094: /**
095: * Adds Task to LogPlan... Side-effect = other subscribers also
096: * updated. If the task is already in the logplan, then there is
097: * probably a change in task preferences. If there is no change in
098: * task preferences, then it might be the case that the sending
099: * agent has undergone a restart and is trying to resynchronize
100: * its tasks. We need to activate the NotificationLP to send the
101: * estimated allocation result for the plan element of the task. We
102: * do this by publishing a change of the plan element (if it
103: * exists).
104: * Also, the received task may have been deleted. If so, complete
105: * the deletion process by actually removing the task from the
106: * blackboard.
107: **/
108: public void execute(Directive dir, Collection changes) {
109: if (dir instanceof Task) {
110: Task tsk = (Task) dir;
111: try {
112: Task existingTask = logplan.findTask(tsk);
113: // We don't know about this task
114: if (existingTask == null) {
115: if (tsk.isDeleted()) {
116: // Ignore received deleted tasks
117: if (logger.isDebugEnabled()) {
118: logger
119: .debug("Ignoring new, but deleted task from another node "
120: + tsk.getUID());
121: }
122: } else {
123: // Add it to our blackboard
124: if (logger.isDebugEnabled()) {
125: logger
126: .debug("Received new task from another node "
127: + tsk.getUID());
128: }
129:
130: // First check committment date - dont add if
131: // its late. (see bug 3757)
132: if (tsk.beforeCommitment(currentDate())) {
133: rootplan.add(tsk);
134: } else if (logger.isInfoEnabled()) {
135: logger
136: .info("New task arrived past commitment date ("
137: + tsk.getCommitmentDate()
138: + "), not adding " + tsk);
139: }
140: }
141: } else if (tsk.isDeleted()) {
142: if (existingTask.isDeleted()) {
143: if (logger.isDebugEnabled()) {
144: logger.debug("Removing deleted task "
145: + tsk.getUID());
146: }
147: rootplan.remove(existingTask); // Complete the removal
148: } else {
149: // Whoops! We must have restarted and reverted to a
150: // pre-deletion state.
151: if (logger.isDebugEnabled()) {
152: logger
153: .debug("Received deleted task, but blackboard task is undeleted "
154: + tsk.getUID());
155: }
156: }
157: } else if (tsk == existingTask) {
158: // This never happens any more because a new task is created
159: // for each transmission
160: if (logger.isWarnEnabled()) {
161: logger
162: .warn("Received task instance already on blackboard "
163: + tsk.getUID());
164: }
165: // First check commitment date, dont do if old. (see bug 3757)
166: if (tsk.beforeCommitment(currentDate())) {
167: rootplan.change(existingTask, changes);
168: } else if (logger.isInfoEnabled()) {
169: logger.info("Task past commitment ("
170: + existingTask.getCommitmentDate()
171: + "), not changing " + tsk);
172: }
173: } else if (!existingTask
174: .beforeCommitment(currentDate())) {
175: // Task already commited. Can't change. (see bug 3757)
176: if (logger.isInfoEnabled())
177: logger.info("Existing task already committed ("
178: + existingTask.getCommitmentDate()
179: + "), not changing " + existingTask);
180: // Note this also skips sending back a notification of the EstAR
181: // if the commitment date is past.
182: } else {
183: // Update task from received task
184: boolean changedTask = false;
185:
186: // 1: Compare preferences
187: Preference[] newPreferences = ((TaskImpl) tsk)
188: .getPreferencesAsArray();
189: Preference[] existingPreferences = ((TaskImpl) existingTask)
190: .getPreferencesAsArray();
191: if (logger.isDebugEnabled()) {
192: logger.debug("Comparing " + tsk);
193: }
194:
195: if (!java.util.Arrays.equals(newPreferences,
196: existingPreferences)) {
197: if (logger.isDebugEnabled()) {
198: logger.debug("Preferences differ "
199: + newPreferences + "!="
200: + existingPreferences);
201: }
202: ((NewTask) existingTask).setPreferences(tsk
203: .getPreferences());
204: changedTask = true;
205: } else {
206: if (logger.isDebugEnabled()) {
207: logger.debug("Preferences compare equal "
208: + newPreferences + "=="
209: + existingPreferences);
210: }
211: }
212:
213: // 2: Compare Prep Phrases
214: for (Enumeration e = existingTask
215: .getPrepositionalPhrases(); e
216: .hasMoreElements();) {
217: existingPhrases.add(e.nextElement());
218: }
219: for (Enumeration e = tsk.getPrepositionalPhrases(); e
220: .hasMoreElements();) {
221: newPhrases.add(e.nextElement());
222: }
223:
224: boolean match = false;
225:
226: if (newPhrases.size() == existingPhrases.size()) {
227: for (Iterator existingIterator = existingPhrases
228: .iterator(); existingIterator.hasNext();) {
229: Object existingPhrase = existingIterator
230: .next();
231: match = false;
232:
233: for (Iterator newIterator = newPhrases
234: .iterator(); newIterator.hasNext();) {
235: if (newIterator.next().equals(
236: existingPhrase)) {
237: match = true;
238: break;
239: }
240: }
241:
242: if (match == false) {
243: break;
244: }
245: }
246: }
247:
248: if (!match) {
249: ((NewTask) existingTask)
250: .setPrepositionalPhrases(tsk
251: .getPrepositionalPhrases());
252: changedTask = true;
253: if (logger.isDebugEnabled()) {
254: logger.debug("Phrases differ " + newPhrases
255: + "!=" + existingPhrases);
256: }
257: } else {
258: if (logger.isDebugEnabled()) {
259: logger.debug("Phrases compare equal "
260: + newPhrases + "=="
261: + existingPhrases);
262: }
263: }
264:
265: existingPhrases.clear();
266: newPhrases.clear();
267:
268: // 3: Compare context
269: Context existingContext = existingTask.getContext();
270: Context tskContext = tsk.getContext();
271: if (logger.isInfoEnabled()) {
272: if (existingContext == null) {
273: logger
274: .info("existingTask has null context: "
275: + existingTask);
276: }
277: if (tskContext == null) {
278: logger
279: .info("received Task has null context: "
280: + tsk);
281: }
282: }
283: if (((existingContext != null) && !existingContext
284: .equals(tskContext))
285: || ((existingContext == null) && (tskContext != null))) {
286: ((NewTask) existingTask).setContext(tskContext);
287: changedTask = true;
288: if (logger.isDebugEnabled()) {
289: logger.debug("Contexts differ: "
290: + tskContext + "!="
291: + existingContext);
292: }
293: }
294:
295: // 4: Compare verb (?)
296: if (!existingTask.getVerb().equals(tsk.getVerb())) {
297: ((NewTask) existingTask).setVerb(tsk.getVerb());
298: changedTask = true;
299: if (logger.isDebugEnabled()) {
300: logger.debug("Verbs differ "
301: + tsk.getVerb() + "!="
302: + existingTask.getVerb());
303: }
304: }
305:
306: // If anything differed, we changed it above, so publishChange the local task
307: if (changedTask) {
308: rootplan.change(existingTask, changes);
309: } else {
310: // Nothing changed. Use this opportunity to send back an AR notification if necessary
311: // FIXME: task.getPE is evil! Bug 3588 means this PE
312: // might have previously changed, and rehydration will only confirm
313: // that the task should be there.
314: PlanElement pe = existingTask.getPlanElement();
315: if (pe != null) {
316: if (logger.isDebugEnabled()) {
317: logger
318: .debug("Unchanged task with PE. Check shouldDoNotification");
319: }
320: // Cause a notification / estAR to be (re)sent (see bug 3338)
321: if (((PEforCollections) pe)
322: .shouldDoNotification()) {
323: if (logger.isDebugEnabled()) {
324: logger
325: .debug("Got PE.shouldDoNotification. Invoke NotificationLP to send the notification.");
326: }
327: // FIXME: Avoid doing this if commitment date on
328: // task is past? (see line 165)
329: NotificationLP.checkValues(pe, changes,
330: rootplan, logplan, ldmf, self);
331: } else {
332: if (logger.isDebugEnabled()) {
333: logger
334: .debug("Unchanged task, PE doesnt DoNotification. Do nothing (old would have done a pubChange). PE "
335: + pe.getUID());
336: }
337: }
338: //rootplan.change(pe, changes); // Cause estimated result to be resent
339: }
340: } // end block not changed task
341: } // end block to update local Task with changes
342: } catch (SubscriberException se) {
343: logger.error("Could not add Task to LogPlan: " + tsk,
344: se);
345: }
346: }
347: }
348: }
|