001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.ldm.lps;
028:
029: import java.util.Collection;
030: import java.util.Date;
031: import java.util.Enumeration;
032: import java.util.Iterator;
033:
034: import org.cougaar.core.blackboard.EnvelopeTuple;
035: import org.cougaar.core.domain.EnvelopeLogicProvider;
036: import org.cougaar.core.domain.LogicProvider;
037: import org.cougaar.core.domain.RestartLogicProvider;
038: import org.cougaar.core.domain.RestartLogicProviderHelper;
039: import org.cougaar.core.domain.RootPlan;
040: import org.cougaar.core.mts.MessageAddress;
041: import org.cougaar.core.service.AlarmService;
042: import org.cougaar.core.util.UID;
043: import org.cougaar.planning.ldm.LogPlan;
044: import org.cougaar.planning.ldm.PlanningFactory;
045: import org.cougaar.planning.ldm.asset.Asset;
046: import org.cougaar.planning.ldm.asset.ClusterPG;
047: import org.cougaar.planning.ldm.plan.Allocation;
048: import org.cougaar.planning.ldm.plan.AllocationforCollections;
049: import org.cougaar.planning.ldm.plan.NewTask;
050: import org.cougaar.planning.ldm.plan.PlanElement;
051: import org.cougaar.planning.ldm.plan.Task;
052: import org.cougaar.planning.plugin.util.PluginHelper;
053: import org.cougaar.util.PropertyParser;
054: import org.cougaar.util.UnaryPredicate;
055: import org.cougaar.util.log.Logger;
056: import org.cougaar.util.log.Logging;
057:
058: /** RemoteAllocationLP class provides the logic to capture
059: * Allocations against remote agents
060: *
061: **/
062:
063: public class RemoteAllocationLP implements LogicProvider,
064: EnvelopeLogicProvider, RestartLogicProvider {
065: // Tasks older than this are not sent to other agents.
066: private static final long VALID_TASK_TIME_OFFSET = 86400000L;
067:
068: private static final Logger logger = Logging
069: .getLogger(RemoteAllocationLP.class);
070:
071: private final RootPlan rootplan;
072: private final PlanningFactory ldmf;
073: private final MessageAddress self;
074: private final AlarmService alarmService;
075: private final LogPlan logplan;
076: // private final Workflow specialWorkflow = new SpecialWorkflow();
077:
078: // Whether to confirm the Task being allocated exists before sending the allocation. Defaults to true.
079: private static final boolean CHECKALLOC = PropertyParser
080: .getBoolean(
081: "org.cougaar.planning.ldm.lps.RemoteAllocationLP.checkBadTask",
082: true);
083:
084: public RemoteAllocationLP(RootPlan rootplan, PlanningFactory ldmf,
085: MessageAddress self, AlarmService alarmService,
086: LogPlan logplan) {
087: this .rootplan = rootplan;
088: this .ldmf = ldmf;
089: this .self = self;
090: this .alarmService = alarmService;
091: this .logplan = logplan;
092: // logger is static final now
093: //logger = new LoggingServiceWithPrefix(logger, self + ": ");
094: }
095:
096: public void init() {
097: }
098:
099: private long currentTimeMillis() {
100: return alarmService.currentTimeMillis();
101: }
102:
103: private void examineTask(Object obj, Collection changes) {
104: if (obj instanceof Task) {
105: Task task = (Task) obj;
106: PlanElement pe = task.getPlanElement();
107: if (logger.isDebugEnabled()) {
108: logger.debug("examineTask " + task + ", pe=" + pe);
109: }
110: examine(pe, changes);
111: } else {
112: if (logger.isDebugEnabled()) {
113: logger.debug("examine(non)Task " + obj);
114: }
115: }
116: }
117:
118: private void examine(Object obj, Collection changes) {
119: if (!(obj instanceof Allocation))
120: return;
121: AllocationforCollections all = (AllocationforCollections) obj;
122: Task task = all.getTask();
123:
124: // Confirm that the Task exists. If it does not, then
125: // do not send this.
126: if (CHECKALLOC) {
127: if (logplan.findTask(task) == null) {
128: // The Task being allocated is not on the Blackboard any more. Note that this will INFO about
129: // Allocations to local Assets as well as remote Agents
130: if (logger.isInfoEnabled())
131: logger
132: .info(self
133: + ": RemoteAllocationLP: Allocation of rescinded Task: "
134: + task + " using new Allocation "
135: + all);
136: return;
137: }
138: }
139:
140: Asset asset = all.getAsset();
141: ClusterPG cpg = asset.getClusterPG();
142: if (cpg == null)
143: return;
144: MessageAddress destination = cpg.getMessageAddress();
145: if (destination == null)
146: return;
147: if (!taskShouldBeSent(task)) {
148: if (logger.isDebugEnabled()) {
149: logger.debug("shouldNotBeSent: " + task);
150: }
151: return; // In past
152: }
153:
154: // see if we're reissuing the task... if so, we'll just use it.
155: UID copyUID = all.getAllocationTaskUID();
156: boolean isDeleted = all.isAllocationTaskDeleted();
157: Task copytask = prepareRemoteTask(task, destination, copyUID,
158: isDeleted);
159: ((AllocationforCollections) all).setAllocationTask(copytask);
160: rootplan.change(all, changes);
161:
162: // Give the task directive to the blackboard for transmission
163: sendTask(copytask, changes);
164: }
165:
166: private void sendTask(Task copytask, Collection changes) {
167: // if (copytask.getWorkflow() == null) {
168: // NewTask nt = (NewTask) copytask;
169: // nt.setWorkflow(specialWorkflow);
170: // }
171: if (logger.isDebugEnabled()) {
172: logger.debug("Send task: " + copytask);
173: }
174: rootplan.sendDirective(copytask, changes);
175: }
176:
177: /**
178: * Handle one EnvelopeTuple. Call examine to check for objects that
179: * are Allocations to a remote agent.
180: **/
181: public void execute(EnvelopeTuple o, Collection changes) {
182: Object obj = o.getObject();
183: if (o.isAdd()) {
184: examine(obj, changes);
185: } else if (o.isChange()) {
186: examineTask(obj, changes);
187: } else if (o.isBulk()) {
188: Collection c = (Collection) obj;
189: for (Iterator e = c.iterator(); e.hasNext();) {
190: examine(e.next(), changes);
191: }
192: }
193: }
194:
195: /**
196: * If a agent restarts, we resend all the tasks we sent before in
197: * case they have been lost or are out of date.
198: **/
199: public void restart(final MessageAddress cid) {
200: if (logger.isInfoEnabled()) {
201: logger.info("Reconcile with "
202: + (cid == null ? "all agents" : cid.toString()));
203: }
204: UnaryPredicate pred = new UnaryPredicate() {
205: public boolean execute(Object o) {
206: if (o instanceof Allocation) {
207: Allocation alloc = (Allocation) o;
208: Asset asset = alloc.getAsset();
209: ClusterPG cpg = asset.getClusterPG();
210: if (cpg == null)
211: return false;
212: MessageAddress destination = cpg
213: .getMessageAddress();
214: return RestartLogicProviderHelper.matchesRestart(
215: self, cid, destination);
216: }
217: return false;
218: }
219: };
220: Enumeration en = rootplan.searchBlackboard(pred);
221: while (en.hasMoreElements()) {
222: AllocationforCollections alloc = (AllocationforCollections) en
223: .nextElement();
224: UID remoteTaskUID = alloc.getAllocationTaskUID();
225: Task localTask = alloc.getTask();
226: if (remoteTaskUID != null && taskShouldBeSent(localTask)) {
227: Asset asset = alloc.getAsset();
228: ClusterPG cpg = asset.getClusterPG();
229: MessageAddress destination = cpg.getMessageAddress();
230: Task remoteTask = prepareRemoteTask(localTask,
231: destination, remoteTaskUID, false);
232: if (logger.isInfoEnabled()) {
233: logger.info("Resend" + (cid == null ? "*" : "")
234: + " task to " + remoteTask.getDestination()
235: + " with remoteUID=" + remoteTaskUID + " "
236: + localTask);
237: }
238: sendTask(remoteTask, null);
239: }
240: }
241: if (logger.isInfoEnabled()) {
242: logger.info("Reconciled");
243: }
244: }
245:
246: private boolean taskShouldBeSent(Task task) {
247: double et;
248: try {
249: et = PluginHelper.getEndTime(task);
250: } catch (RuntimeException re) {
251: et = Double.NaN;
252: }
253: if (Double.isNaN(et))
254: try {
255: et = PluginHelper.getStartTime(task);
256: } catch (RuntimeException re) {
257: et = Double.NaN;
258: }
259: if (Double.isNaN(et))
260: return true; // Can't tell, send it
261:
262: // Require end time to be later than 1 day before now. In other words,
263: // Task must be at least 1 day in the past for it to be dropped.
264: long minValidTaskTime = currentTimeMillis()
265: - VALID_TASK_TIME_OFFSET;
266: boolean shouldSend = ((long) et) >= minValidTaskTime;
267:
268: if (!shouldSend && logger.isInfoEnabled()) {
269: Date thePref = new Date((long) et);
270: long myCurrentTime = currentTimeMillis();
271: Date myCurrentTimeDate = new Date(myCurrentTime);
272: logger
273: .info(self
274: + ": "
275: + task
276: + " has end time pref earlier than minValidTime (now - 1 day) of "
277: + thePref
278: + ". Will not Allocate task due more than 1 day ago! Now is: "
279: + myCurrentTimeDate);
280: }
281:
282: return shouldSend;
283: }
284:
285: private Task prepareRemoteTask(Task task, MessageAddress dest,
286: UID uid, boolean isDeleted) {
287: NewTask nt;
288: /*
289: if (task instanceof MPTask) {
290: nt = ldmf.newMPTask();
291: ((NewMPTask)nt).setParentTasks(((MPTask)task).getParentTasks());
292: }
293: */
294: nt = ldmf.newTask(uid);
295: nt.setDeleted(isDeleted);
296: nt.setParentTask(task); // set ParenTask to original task
297:
298: // redundant: ldmf initializes it.
299: //nt.setSource(self);
300:
301: // FIXME MIK WARNING! WARNING!
302: // as a hack, we've made setDestination bark if it isn't the current
303: // agent (suspicious use). In order to prevent the below from
304: // generating barkage, we've got a (privately) muzzle...
305: //nt.setDestination(dest);
306: //
307: ((org.cougaar.planning.ldm.plan.TaskImpl) nt)
308: .privately_setDestination(dest);
309: nt.setVerb(task.getVerb());
310: nt.setDirectObject(task.getDirectObject());
311: nt.setPrepositionalPhrases(task.getPrepositionalPhrases());
312: Date commitmentDate = task.getCommitmentDate();
313: if (commitmentDate != null)
314: nt.setCommitmentDate(commitmentDate);
315: // no workflow
316: synchronized (task) {
317: nt.setPreferences(task.getPreferences());
318: }
319: nt.setPriority(task.getPriority());
320: nt.setPlan(task.getPlan());
321: nt.setAuxiliaryQueryTypes(task.getAuxiliaryQueryTypes());
322: nt.setContext(task.getContext());
323:
324: return nt;
325: }
326: }
|